cron job chnages: remove duplicate jobs, check if ruuning

This commit is contained in:
Taras Machyshyn
2014-09-05 15:27:17 +03:00
parent 78140f7d5e
commit ee03c686ca
3 changed files with 110 additions and 44 deletions

View File

@@ -18,7 +18,7 @@
*
* You should have received a copy of the GNU General Public License
* along with EspoCRM. If not, see http://www.gnu.org/licenses/.
************************************************************************/
************************************************************************/
namespace Espo\Core\Cron;
@@ -46,22 +46,22 @@ class Service
$serviceName = $job['service_name'];
if (!$this->getServiceFactory()->checkExists($serviceName)) {
throw new NotFound();
throw new NotFound();
}
$service = $this->getServiceFactory()->create($serviceName);
$serviceMethod = $job['method'];
$service = $this->getServiceFactory()->create($serviceName);
$serviceMethod = $job['method'];
if (!method_exists($service, $serviceMethod)) {
throw new NotFound();
}
throw new NotFound();
}
$data = $job['data'];
if (Json::isJSON($data)) {
$data = Json::decode($data, true);
}
$service->$serviceMethod($data);
$service->$serviceMethod($data);
}
}

View File

@@ -34,6 +34,11 @@ class CronManager
private $jobService;
private $scheduledJobService;
const PENDING = 'Pending';
const RUNNING = 'Running';
const SUCCESS = 'Success';
const FAILED = 'Failed';
protected $lastRunTime = 'data/cache/application/cronLastRunTime.php';
@@ -128,13 +133,12 @@ class CronManager
//Check scheduled jobs and create related jobs
$this->createJobsFromScheduledJobs();
$pendingJobs = $this->getJobService()->getPendingJobs();
foreach ($pendingJobs as $job) {
$this->getJobService()->updateEntity($job['id'], array(
'status' => 'Running',
'status' => self::RUNNING,
));
$isSuccess = true;
@@ -150,7 +154,7 @@ class CronManager
$GLOBALS['log']->error('Failed job running, job ['.$job['id'].']. Error Details: '.$e->getMessage());
}
$status = $isSuccess ? 'Success' : 'Failed';
$status = $isSuccess ? self::SUCCESS : self::FAILED;
$this->getJobService()->updateEntity($job['id'], array(
'status' => $status,
@@ -180,7 +184,6 @@ class CronManager
$cronExpression = \Cron\CronExpression::factory($scheduling);
try {
//$nextDate = $cronExpression->getNextRunDate()->format('Y-m-d H:i:s');
$prevDate = $cronExpression->getPreviousRunDate()->format('Y-m-d H:i:s');
} catch (\Exception $e) {
$GLOBALS['log']->error('ScheduledJob ['.$scheduledJob['id'].']: CronExpression - Impossible CRON expression ['.$scheduling.']');
@@ -197,7 +200,7 @@ class CronManager
//create a job
$data = array(
'name' => $scheduledJob['name'],
'status' => 'Pending',
'status' => self::PENDING,
'scheduledJobId' => $scheduledJob['id'],
'executeTime' => $prevDate,
'method' => $scheduledJob['job'],

View File

@@ -18,51 +18,75 @@
*
* You should have received a copy of the GNU General Public License
* along with EspoCRM. If not, see http://www.gnu.org/licenses/.
************************************************************************/
************************************************************************/
namespace Espo\Services;
use \PDO;
use \Espo\Core\CronManager;
class Job extends Record
{
{
public function getPendingJobs()
{
$jobConfigs = $this->getConfig()->get('cron');
/** Mark Failed old jobs and remove pending duplicates */
$this->markFailedJobs();
$this->removePendingJobDuplicates();
$jobList = $this->getActiveJobs();
$runningScheduledJobs = $this->getActiveJobs('scheduled_job_id', CronManager::RUNNING, PDO::FETCH_COLUMN);
$list = array();
foreach ($jobList as $row) {
if (!in_array($row['scheduled_job_id'], $runningScheduledJobs)) {
$list[] = $row;
}
}
return $list;
}
/**
* Get active Jobs, which execution date in jobPeriod time
*
* @param string $displayColumns
* @param string $status
* @return array
*/
protected function getActiveJobs($displayColumns = '*', $status = CronManager::PENDING, $fetchMode = PDO::FETCH_ASSOC)
{
$jobConfigs = $this->getConfig()->get('cron');
$currentTime = time();
$periodTime = $currentTime - intval($jobConfigs['jobPeriod']);
$limit = empty($jobConfigs['maxJobNumber']) ? '' : 'LIMIT '.$jobConfigs['maxJobNumber'];
$query = "SELECT * FROM job WHERE
`status` = 'Pending'
AND execute_time BETWEEN '".date('Y-m-d H:i:s', $periodTime)."' AND '".date('Y-m-d H:i:s', $currentTime)."'
ORDER BY execute_time DESC ".$limit;
$query = "SELECT " . $displayColumns . " FROM job WHERE
`status` = '" . $status . "'
AND execute_time BETWEEN '".date('Y-m-d H:i:s', $periodTime)."' AND '".date('Y-m-d H:i:s', $currentTime)."'
AND deleted = 0
ORDER BY execute_time DESC ".$limit;
$pdo = $this->getEntityManager()->getPDO();
$sth = $pdo->prepare($query);
$sth = $pdo->prepare($query);
$sth->execute();
$rows = $sth->fetchAll(PDO::FETCH_ASSOC);
$list = array();
foreach ($rows as $row) {
$list[] = $row;
}
$rows = $sth->fetchAll($fetchMode);
return $list;
}
return $rows;
}
public function getJobByScheduledJob($scheduledJobId, $date)
public function getJobByScheduledJob($scheduledJobId, $date)
{
$query = "SELECT * FROM job WHERE
scheduled_job_id = '".$scheduledJobId."'
AND execute_time = '".$date."'
LIMIT 1";
AND deleted = 0
LIMIT 1";
$pdo = $this->getEntityManager()->getPDO();
$sth = $pdo->prepare($query);
$sth = $pdo->prepare($query);
$sth->execute();
$scheduledJob = $sth->fetchAll(PDO::FETCH_ASSOC);
@@ -70,20 +94,59 @@ class Job extends Record
return $scheduledJob;
}
/**
* Mark pending jobs (all jobs that exceeded jobPeriod)
*
* @return void
*/
protected function markFailedJobs()
{
$jobConfigs = $this->getConfig()->get('cron');
$currentTime = time();
$periodTime = $currentTime - intval($jobConfigs['jobPeriod']);
//todo remove, used for tests
public function testMethod($data)
{
}
$update = "UPDATE job SET `status` = '" . CronManager::FAILED ."' WHERE
(`status` = '" . CronManager::PENDING ."' OR `status` = '" . CronManager::RUNNING ."')
AND execute_time < '".date('Y-m-d H:i:s', $periodTime)."' ";
//todo remove, used for tests
public function testFailed($data)
{
throw new \Espo\Core\Exceptions\Error();
$pdo = $this->getEntityManager()->getPDO();
$sth = $pdo->prepare($update);
$sth->execute();
}
/**
* Remove pending duplicate jobs, no need to run twice the same job
*
* @return void
*/
protected function removePendingJobDuplicates()
{
$duplicateJobs = $this->getActiveJobs('DISTINCT scheduled_job_id');
$pdo = $this->getEntityManager()->getPDO();
foreach ($duplicateJobs as $row) {
if (!empty($row['scheduled_job_id'])) {
/* no possibility to use limit in update or subqueries */
$query = "SELECT id FROM `job` WHERE scheduled_job_id = '" . $row['scheduled_job_id'] . "'
AND `status` = '" . CronManager::PENDING ."'
ORDER BY execute_time
DESC LIMIT 1, 100000 ";
$sth = $pdo->prepare($query);
$sth->execute();
$jobIds = $sth->fetchAll(PDO::FETCH_COLUMN);
$update = "UPDATE job SET deleted = 1 WHERE
id IN ('". implode("', '", $jobIds)."') ";
$sth = $pdo->prepare($update);
$sth->execute();
}
}
}
}