diff --git a/application/Espo/Core/Job/AsyncPoolFactory.php b/application/Espo/Core/Job/AsyncPoolFactory.php new file mode 100644 index 0000000000..ca6e4b53fa --- /dev/null +++ b/application/Espo/Core/Job/AsyncPoolFactory.php @@ -0,0 +1,58 @@ +config = $config; + } + + public function isSupported() : bool + { + return Pool::isSupported(); + } + + public function create() : Pool + { + return Pool + ::create() + ->autoload(getcwd() . '/vendor/autoload.php') + ->concurrency($this->config->get('jobPoolConcurrencyNumber')) + ->timeout($this->config->get('jobPeriodForActiveProcess')); + } +} diff --git a/application/Espo/Core/Job/JobManager.php b/application/Espo/Core/Job/JobManager.php index 3856bdc2f6..37ed345311 100644 --- a/application/Espo/Core/Job/JobManager.php +++ b/application/Espo/Core/Job/JobManager.php @@ -30,48 +30,23 @@ namespace Espo\Core\Job; use Espo\Core\{ - Exceptions\Error, - ServiceFactory, Utils\Config, Utils\File\Manager as FileManager, - Utils\System, ORM\EntityManager, Utils\Log, - Job\JobTargeted, }; use Espo\{ Entities\Job as JobEntity, - Entities\ScheduledJob as ScheduledJobEntity, }; -use Spatie\Async\Pool as AsyncPool; - -use Cron\CronExpression; - -use Exception; use Throwable; /** - * Handles running jobs. + * Handles processing jobs. */ class JobManager { - private $queueUtil; - - private $scheduleUtil; - - private $useProcessPool = false; - - private $asSoonAsPossibleSchedulingList = [ - '*', - '* *', - '* * *', - '* * * *', - '* * * * *', - '* * * * * *', - ]; - const PENDING = 'Pending'; const READY = 'Ready'; @@ -82,6 +57,8 @@ class JobManager const FAILED = 'Failed'; + private $useProcessPool = false; + protected $lastRunTimeFile = 'data/cache/application/cronLastRunTime.php'; private $config; @@ -90,33 +67,41 @@ class JobManager private $entityManager; - private $serviceFactory; - - private $jobFactory; + private $jobRunner; private $log; + private $scheduleProcessor; + + private $queueUtil; + + private $asyncPoolFactory; + + private $queueProcessorFactory; + public function __construct( Config $config, FileManager $fileManager, EntityManager $entityManager, - ServiceFactory $serviceFactory, - JobFactory $jobFactory, + JobRunner $jobRunner, Log $log, + ScheduleProcessor $scheduleProcessor, QueueUtil $queueUtil, - ScheduleUtil $scheduleUtil + AsyncPoolFactory $asyncPoolFactory, + QueueProcessorFactory $queueProcessorFactory ) { $this->config = $config; $this->fileManager = $fileManager; $this->entityManager = $entityManager; - $this->serviceFactory = $serviceFactory; - $this->jobFactory = $jobFactory; + $this->jobRunner = $jobRunner; $this->log = $log; + $this->scheduleProcessor = $scheduleProcessor; $this->queueUtil = $queueUtil; - $this->scheduleUtil = $scheduleUtil; + $this->asyncPoolFactory = $asyncPoolFactory; + $this->queueProcessorFactory = $queueProcessorFactory; if ($this->config->get('jobRunInParallel')) { - if (AsyncPool::isSupported()) { + if ($this->asyncPoolFactory->isSupported()) { $this->useProcessPool = true; } else { @@ -162,16 +147,6 @@ class JobManager return false; } - protected function useProcessPool() : bool - { - return $this->useProcessPool; - } - - public function setUseProcessPool(bool $useProcessPool) : void - { - $this->useProcessPool = $useProcessPool; - } - /** * Process jobs. Jobs will be created according scheduling. Then pending jobs will be processed. * This method supposed to be called on every Cron run or loop iteration of the Daemon. @@ -189,11 +164,11 @@ class JobManager $this->queueUtil->markJobsFailed(); $this->queueUtil->updateFailedJobAttempts(); - $this->createJobsFromScheduledJobs(); + $this->scheduleProcessor->createJobsFromScheduledJobs(); $this->queueUtil->removePendingJobDuplicates(); - $this->processPendingJobs(); + $this->processMainQueue(); } /** @@ -201,123 +176,30 @@ class JobManager */ public function processQueue(string $queue, int $limit) : void { - $this->processPendingJobs($queue, $limit, true, true); + $params = QueueProcessorParams + ::fromNothing() + ->withQueue($queue) + ->withLimit($limit) + ->withUseProcessPool(false) + ->withNoLock(true); + + $processor = $this->queueProcessorFactory->create($params); + + $processor->process(); } - protected function processPendingJobs( - ?string $queue = null, - ?int $limit = null, - bool $poolDisabled = false, - bool $noLock = false - ) : void { - - if (is_null($limit)) { - $limit = intval($this->config->get('jobMaxPortion', 0)); - } - - $pendingJobList = $this->queueUtil->getPendingJobList($queue, $limit); - - $useProcessPool = $this->useProcessPool(); - - if ($poolDisabled) { - $useProcessPool = false; - } - - $pool = null; - - if ($useProcessPool) { - $pool = AsyncPool - ::create() - ->autoload(getcwd() . '/vendor/autoload.php') - ->concurrency($this->config->get('jobPoolConcurrencyNumber')) - ->timeout($this->config->get('jobPeriodForActiveProcess')); - } - - foreach ($pendingJobList as $job) { - $this->processPendingJob($job, $pool, $noLock); - } - - if ($useProcessPool) { - $pool->wait(); - } - } - - protected function processPendingJob(JobEntity $job, $pool = null, bool $noLock = false) : void + private function processMainQueue() : void { - $useProcessPool = (bool) $pool; + $limit = (int) $this->config->get('jobMaxPortion', 0); - $lockTable = (bool) $job->getScheduledJobId() && !$noLock; + $params = QueueProcessorParams + ::fromNothing() + ->withUseProcessPool($this->useProcessPool) + ->withLimit($limit); - $skip = false; + $processor = $this->queueProcessorFactory->create($params); - if (!$noLock) { - if ($lockTable) { - // MySQL doesn't allow to lock non-existent rows. We resort to locking an entire table. - $this->entityManager->getLocker()->lockExclusive('Job'); - } - else { - $this->entityManager->getTransactionManager()->start(); - } - } - - if ($noLock || $this->queueUtil->isJobPending($job->id)) { - if ( - $job->getScheduledJobId() && - $this->queueUtil->isScheduledJobRunning( - $job->getScheduledJobId(), - $job->getTargetId(), - $job->getTargetType() - ) - ) { - $skip = true; - } - } else { - $skip = true; - } - - if ($skip && !$noLock) { - if ($lockTable) { - $this->entityManager->getLocker()->rollback(); - } - else { - $this->entityManager->getTransactionManager()->rollback(); - } - } - - if ($skip) { - return; - } - - $job->set('startedAt', date('Y-m-d H:i:s')); - - if ($useProcessPool) { - $job->set('status', self::READY); - } - else { - $job->set('status', self::RUNNING); - $job->set('pid', System::getPid()); - } - - $this->entityManager->saveEntity($job); - - if (!$noLock) { - if ($lockTable) { - $this->entityManager->getLocker()->commit(); - } - else { - $this->entityManager->getTransactionManager()->commit(); - } - } - - if ($useProcessPool) { - $task = new JobTask($job->id); - - $pool->add($task); - - return; - } - - $this->runJobInternal($job); + $processor->process(); } /** @@ -325,30 +207,7 @@ class JobManager */ public function runJobById(string $id) : void { - if ($id === '') { - throw new Error(); - } - - $job = $this->entityManager->getEntity('Job', $id); - - if (!$job) { - throw new Error("Job {$id} not found."); - } - - if ($job->getStatus() !== self::READY) { - throw new Error("Can't run job {$id} with no status Ready."); - } - - if (!$job->getStartedAt()) { - $job->set('startedAt', date('Y-m-d H:i:s')); - } - - $job->set('status', self::RUNNING); - $job->set('pid', System::getPid()); - - $this->entityManager->saveEntity($job); - - $this->runJobInternal($job); + $this->jobRunner->runById($id); } /** @@ -358,243 +217,6 @@ class JobManager */ public function runJob(JobEntity $job) : void { - $this->runJobInternal($job, true); - } - - protected function runJobInternal(JobEntity $job, bool $throwException = false) : void - { - $isSuccess = true; - - $skipLog = false; - - $exception = null; - - try { - if ($job->getScheduledJobId()) { - $this->runScheduledJob($job); - } - else if ($job->getJob()) { - $this->runJobByName($job); - } - else if ($job->getServiceName()) { - $this->runService($job); - } - else { - $id = $job->getId(); - - throw new Error("Not runnable job '{$id}'."); - } - } - catch (Throwable $e) { - $isSuccess = false; - - if ($e->getCode() === -1) { - $job->set('attempts', 0); - - $skipLog = true; - } - else { - $this->log->error( - "JobManager: Failed job running, job '{$job->id}'. " . - $e->getMessage() . "; at " . $e->getFile() . ":" . $e->getLine() . "." - ); - } - - if ($throwException) { - $exception = $e; - } - } - - $status = $isSuccess ? self::SUCCESS : self::FAILED; - - $job->set('status', $status); - - if ($isSuccess) { - $job->set('executedAt', date('Y-m-d H:i:s')); - } - - $this->entityManager->saveEntity($job); - - if ($throwException && $exception) { - throw new $exception; - } - - if ($job->getScheduledJobId() && !$skipLog) { - $this->scheduleUtil->addLogRecord( - $job->getScheduledJobId(), - $status, - null, - $job->getTargetId(), - $job->getTargetType() - ); - } - } - - protected function runScheduledJob(JobEntity $job) : void - { - $jobName = $job->getScheduledJobJob(); - - if (!$jobName) { - throw new Error( - "Can't run job '" . $job->getId() . "'. Not a scheduled job." - ); - } - - $obj = $this->jobFactory->create($jobName); - - if ($obj instanceof JobTargeted) { - $obj->run($job->getTargetType(), $job->getTargetId(), $job->getData()); - - return; - } - - if (!method_exists($obj, 'run')) { - throw new Error("No 'run' method in job '{$jobName}'."); - } - - $obj->run(); - } - - protected function runService(JobEntity $job) : void - { - $serviceName = $job->getServiceName(); - - if (!$serviceName) { - throw new Error("Job with empty serviceName."); - } - - if (!$this->serviceFactory->checkExists($serviceName)) { - throw new Error(); - } - - $service = $this->serviceFactory->create($serviceName); - - $methodName = $job->getMethodName(); - - if (!$methodName) { - throw new Error('Job with empty methodName.'); - } - - if (!method_exists($service, $methodName)) { - throw new Error(); - } - - $service->$methodName($job->getData(), $job->getTargetId(), $job->getTargetType()); - } - - protected function runJobByName(JobEntity $job) : void - { - $jobName = $job->getJob(); - - $obj = $this->jobFactory->create($jobName); - - if ($obj instanceof JobTargeted) { - $obj->run($job->getTargetType(), $job->getTargetId(), $job->getData()); - - return; - } - - if (!method_exists($obj, 'run')) { - throw new Error("No 'run' method in job '{$jobName}'."); - } - - $obj->run(); - } - - protected function createJobsFromScheduledJobs() : void - { - $activeScheduledJobList = $this->scheduleUtil->getActiveScheduledJobList(); - - $runningScheduledJobIdList = $this->queueUtil->getRunningScheduledJobIdList(); - - foreach ($activeScheduledJobList as $scheduledJob) { - try { - $this->createJobsFromScheduledJob($scheduledJob, $runningScheduledJobIdList); - } - catch (Throwable $e) { - $id = $scheduledJob->getId(); - - $this->log->error("Scheduled Job '{$id}': " . $e->getMessage()); - } - } - } - - private function createJobsFromScheduledJob( - ScheduledJobEntity $scheduledJob, array $runningScheduledJobIdList - ) : void { - - $scheduling = $scheduledJob->getScheduling(); - - $id = $scheduledJob->getId(); - - $asSoonAsPossible = in_array($scheduling, $this->asSoonAsPossibleSchedulingList); - - if ($asSoonAsPossible) { - $executeTime = date('Y-m-d H:i:s'); - } - else { - try { - $cronExpression = CronExpression::factory($scheduling); - } - catch (Exception $e) { - $this->log->error( - "Scheduled Job '{$id}': Scheduling expression error: " . - $e->getMessage() . '.' - ); - - return; - } - - try { - $executeTime = $cronExpression->getNextRunDate()->format('Y-m-d H:i:s'); - } - catch (Exception $e) { - $this->log->error( - "Scheduled Job '{$id}': Unsupported scheduling expression '{$scheduling}'." - ); - - return; - } - - $jobAlreadyExists = $this->queueUtil->hasScheduledJobOnMinute($id, $executeTime); - - if ($jobAlreadyExists) { - return; - } - } - - $jobName = $scheduledJob->getJob(); - - if ($this->jobFactory->isPreparable($jobName)) { - $jobObj = $this->jobFactory->create($jobName); - - $jobObj->prepare($scheduledJob, $executeTime); - - return; - } - - if (in_array($id, $runningScheduledJobIdList)) { - return; - } - - $pendingCount = $this->queueUtil->getPendingCountByScheduledJobId($id); - - if ($asSoonAsPossible) { - if ($pendingCount > 0) { - return; - } - } - else { - if ($pendingCount > 1) { - return; - } - } - - $this->entityManager->createEntity('Job', [ - 'name' => $scheduledJob->getName(), - 'status' => self::PENDING, - 'scheduledJobId' => $id, - 'executeTime' => $executeTime, - ]); + $this->jobRunner->runThrowingException($job); } } diff --git a/application/Espo/Core/Job/JobRunner.php b/application/Espo/Core/Job/JobRunner.php new file mode 100644 index 0000000000..d42cfe062e --- /dev/null +++ b/application/Espo/Core/Job/JobRunner.php @@ -0,0 +1,252 @@ +jobFactory = $jobFactory; + $this->scheduleUtil = $scheduleUtil; + $this->entityManager = $entityManager; + $this->serviceFactory = $serviceFactory; + $this->log = $log; + } + + /** + * Run a job entity. Does not throw exceptions. + */ + public function run(JobEntity $job) : void + { + $this->runInternal($job, false); + } + + /** + * Run a job entity. Throws exceptions. + * + * @throws Throwable + */ + public function runThrowingException(JobEntity $job) : void + { + $this->runInternal($job, true); + } + + /** + * Run a job by ID. A job must have status 'Ready'. + * Used when running jobs in parallel processes. + */ + public function runById(string $id) : void + { + if ($id === '') { + throw new Error(); + } + + $job = $this->entityManager->getEntity('Job', $id); + + if (!$job) { + throw new Error("Job {$id} not found."); + } + + if ($job->getStatus() !== JobManager::READY) { + throw new Error("Can't run job {$id} with no status Ready."); + } + + if (!$job->getStartedAt()) { + $job->set('startedAt', date('Y-m-d H:i:s')); + } + + $job->set('status', JobManager::RUNNING); + $job->set('pid', System::getPid()); + + $this->entityManager->saveEntity($job); + + $this->run($job); + } + + private function runInternal(JobEntity $job, bool $throwException = false) : void + { + $isSuccess = true; + + $skipLog = false; + + $exception = null; + + try { + if ($job->getScheduledJobId()) { + $this->runScheduledJob($job); + } + else if ($job->getJob()) { + $this->runJobByName($job); + } + else if ($job->getServiceName()) { + $this->runService($job); + } + else { + $id = $job->getId(); + + throw new Error("Not runnable job '{$id}'."); + } + } + catch (Throwable $e) { + $isSuccess = false; + + $this->log->error( + "JobManager: Failed job running, job '{$job->id}'. " . + $e->getMessage() . "; at " . $e->getFile() . ":" . $e->getLine() . "." + ); + + if ($throwException) { + $exception = $e; + } + } + + $status = $isSuccess ? JobManager::SUCCESS : JobManager::FAILED; + + $job->set('status', $status); + + if ($isSuccess) { + $job->set('executedAt', date('Y-m-d H:i:s')); + } + + $this->entityManager->saveEntity($job); + + if ($throwException && $exception) { + throw new $exception; + } + + if ($job->getScheduledJobId() && !$skipLog) { + $this->scheduleUtil->addLogRecord( + $job->getScheduledJobId(), + $status, + null, + $job->getTargetId(), + $job->getTargetType() + ); + } + } + + protected function runJobByName(JobEntity $job) : void + { + $jobName = $job->getJob(); + + $obj = $this->jobFactory->create($jobName); + + if ($obj instanceof JobTargeted) { + $obj->run($job->getTargetType(), $job->getTargetId(), $job->getData()); + + return; + } + + if (!method_exists($obj, 'run')) { + throw new Error("No 'run' method in job '{$jobName}'."); + } + + $obj->run(); + } + + protected function runScheduledJob(JobEntity $job) : void + { + $jobName = $job->getScheduledJobJob(); + + if (!$jobName) { + throw new Error( + "Can't run job '" . $job->getId() . "'. Not a scheduled job." + ); + } + + $obj = $this->jobFactory->create($jobName); + + if ($obj instanceof JobTargeted) { + $obj->run($job->getTargetType(), $job->getTargetId(), $job->getData()); + + return; + } + + if (!method_exists($obj, 'run')) { + throw new Error("No 'run' method in job '{$jobName}'."); + } + + $obj->run(); + } + + protected function runService(JobEntity $job) : void + { + $serviceName = $job->getServiceName(); + + if (!$serviceName) { + throw new Error("Job with empty serviceName."); + } + + if (!$this->serviceFactory->checkExists($serviceName)) { + throw new Error(); + } + + $service = $this->serviceFactory->create($serviceName); + + $methodName = $job->getMethodName(); + + if (!$methodName) { + throw new Error('Job with empty methodName.'); + } + + if (!method_exists($service, $methodName)) { + throw new Error("No method '{$methodName}' in service '{$serviceName}'."); + } + + $service->$methodName($job->getData(), $job->getTargetId(), $job->getTargetType()); + } +} diff --git a/application/Espo/Core/Job/QueueProcessor.php b/application/Espo/Core/Job/QueueProcessor.php new file mode 100644 index 0000000000..81ef3d1423 --- /dev/null +++ b/application/Espo/Core/Job/QueueProcessor.php @@ -0,0 +1,170 @@ +params = $params; + $this->queueUtil = $queueUtil; + $this->jobRunner = $jobRunner; + $this->asyncPoolFactory = $asyncPoolFactory; + $this->entityManager = $entityManager; + } + + public function process() : void + { + $pool = null; + + if ($this->params->useProcessPool()) { + $pool = $this->asyncPoolFactory->create(); + } + + $pendingJobList = $this->queueUtil->getPendingJobList( + $this->params->getQueue(), + $this->params->getLimit() + ); + + foreach ($pendingJobList as $job) { + $this->processJob($job, $pool); + } + + if ($pool) { + $pool->wait(); + } + } + + protected function processJob(JobEntity $job, ?AsyncPool $pool = null) : void + { + $useProcessPool = $this->params->useProcessPool(); + + $noLock = $this->params->noLock(); + + $lockTable = (bool) $job->getScheduledJobId() && !$noLock; + + $skip = false; + + if (!$noLock) { + if ($lockTable) { + // MySQL doesn't allow to lock non-existent rows. We resort to locking an entire table. + $this->entityManager->getLocker()->lockExclusive('Job'); + } + else { + $this->entityManager->getTransactionManager()->start(); + } + } + + if ($noLock || $this->queueUtil->isJobPending($job->id)) { + if ( + $job->getScheduledJobId() && + $this->queueUtil->isScheduledJobRunning( + $job->getScheduledJobId(), + $job->getTargetId(), + $job->getTargetType() + ) + ) { + $skip = true; + } + } else { + $skip = true; + } + + if ($skip && !$noLock) { + if ($lockTable) { + $this->entityManager->getLocker()->rollback(); + } + else { + $this->entityManager->getTransactionManager()->rollback(); + } + } + + if ($skip) { + return; + } + + $job->set('startedAt', date('Y-m-d H:i:s')); + + if ($useProcessPool) { + $job->set('status', JobManager::READY); + } + else { + $job->set('status', JobManager::RUNNING); + $job->set('pid', System::getPid()); + } + + $this->entityManager->saveEntity($job); + + if (!$noLock) { + if ($lockTable) { + $this->entityManager->getLocker()->commit(); + } + else { + $this->entityManager->getTransactionManager()->commit(); + } + } + + if ($useProcessPool) { + $task = new JobTask($job->id); + + $pool->add($task); + + return; + } + + $this->jobRunner->run($job); + } +} \ No newline at end of file diff --git a/application/Espo/Core/Job/QueueProcessorFactory.php b/application/Espo/Core/Job/QueueProcessorFactory.php new file mode 100644 index 0000000000..3c5c9b5b0f --- /dev/null +++ b/application/Espo/Core/Job/QueueProcessorFactory.php @@ -0,0 +1,51 @@ +injectableFactory = $injectableFactory; + } + + public function create(QueueProcessorParams $params) : QueueProcessor + { + return $this->injectableFactory->createWith(QueueProcessor::class, [ + 'params' => $params, + ]); + } +} \ No newline at end of file diff --git a/application/Espo/Core/Job/QueueProcessorParams.php b/application/Espo/Core/Job/QueueProcessorParams.php new file mode 100644 index 0000000000..d505552ecf --- /dev/null +++ b/application/Espo/Core/Job/QueueProcessorParams.php @@ -0,0 +1,102 @@ +useProcessPool = $useProcessPool; + + return $obj; + } + + public function withNoLock(bool $noLock) : self + { + $obj = clone $this; + + $obj->noLock = $noLock; + + return $obj; + } + + public function withQueue(?string $queue) : self + { + $obj = clone $this; + + $obj->queue = $queue; + + return $obj; + } + + public function withLimit(int $limit) : self + { + $obj = clone $this; + + $obj->limit = $limit; + + return $obj; + } + + public function useProcessPool() : bool + { + return $this->useProcessPool; + } + + public function noLock() : bool + { + return $this->noLock; + } + + public function getQueue() : ?string + { + return $this->queue; + } + + public function getLimit() : int + { + return $this->limit; + } + + public static function fromNothing() : self + { + return new self(); + } +} diff --git a/application/Espo/Core/Job/ScheduleProcessor.php b/application/Espo/Core/Job/ScheduleProcessor.php new file mode 100644 index 0000000000..198c45f095 --- /dev/null +++ b/application/Espo/Core/Job/ScheduleProcessor.php @@ -0,0 +1,178 @@ +log = $log; + $this->entityManager = $entityManager; + $this->queueUtil = $queueUtil; + $this->scheduleUtil = $scheduleUtil; + $this->jobFactory = $jobFactory; + } + + public function createJobsFromScheduledJobs() : void + { + $activeScheduledJobList = $this->scheduleUtil->getActiveScheduledJobList(); + + $runningScheduledJobIdList = $this->queueUtil->getRunningScheduledJobIdList(); + + foreach ($activeScheduledJobList as $scheduledJob) { + try { + $this->createJobsFromScheduledJob($scheduledJob, $runningScheduledJobIdList); + } + catch (Throwable $e) { + $id = $scheduledJob->getId(); + + $this->log->error("Scheduled Job '{$id}': " . $e->getMessage()); + } + } + } + + private function createJobsFromScheduledJob( + ScheduledJobEntity $scheduledJob, + array $runningScheduledJobIdList + ) : void { + + $scheduling = $scheduledJob->getScheduling(); + + $id = $scheduledJob->getId(); + + $asSoonAsPossible = in_array($scheduling, $this->asSoonAsPossibleSchedulingList); + + if ($asSoonAsPossible) { + $executeTime = date('Y-m-d H:i:s'); + } + else { + try { + $cronExpression = CronExpression::factory($scheduling); + } + catch (Exception $e) { + $this->log->error( + "Scheduled Job '{$id}': Scheduling expression error: " . + $e->getMessage() . '.' + ); + + return; + } + + try { + $executeTime = $cronExpression->getNextRunDate()->format('Y-m-d H:i:s'); + } + catch (Exception $e) { + $this->log->error( + "Scheduled Job '{$id}': Unsupported scheduling expression '{$scheduling}'." + ); + + return; + } + + $jobAlreadyExists = $this->queueUtil->hasScheduledJobOnMinute($id, $executeTime); + + if ($jobAlreadyExists) { + return; + } + } + + $jobName = $scheduledJob->getJob(); + + if ($this->jobFactory->isPreparable($jobName)) { + $jobObj = $this->jobFactory->create($jobName); + + $jobObj->prepare($scheduledJob, $executeTime); + + return; + } + + if (in_array($id, $runningScheduledJobIdList)) { + return; + } + + $pendingCount = $this->queueUtil->getPendingCountByScheduledJobId($id); + + if ($asSoonAsPossible) { + if ($pendingCount > 0) { + return; + } + } + else { + if ($pendingCount > 1) { + return; + } + } + + $this->entityManager->createEntity('Job', [ + 'name' => $scheduledJob->getName(), + 'status' => JobManager::PENDING, + 'scheduledJobId' => $id, + 'executeTime' => $executeTime, + ]); + } +} diff --git a/tests/unit/Espo/Core/Job/JobManagerTest.php b/tests/unit/Espo/Core/Job/JobManagerTest.php deleted file mode 100644 index ef3948532b..0000000000 --- a/tests/unit/Espo/Core/Job/JobManagerTest.php +++ /dev/null @@ -1,130 +0,0 @@ -serviceFactory = $this->createMock(ServiceFactory::class); - $this->config = $this->createMock(Config::class); - $this->fileManager = $this->createMock(FileManager::class); - $this->jobFactory = $this->createMock(JobFactory::class); - $this->entityManager = $this->createMock(EntityManager::class); - $this->log = $this->createMock(Log::class); - $this->queueUtil = $this->createMock(QueueUtil::class); - $this->scheduleUtil = $this->createMock(ScheduleUtil::class); - - $this->manager = new JobManager( - $this->config, - $this->fileManager, - $this->entityManager, - $this->serviceFactory, - $this->jobFactory, - $this->log, - $this->queueUtil, - $this->scheduleUtil - ); - - $this->reflection = new ReflectionHelper($this->manager); - } - - protected function tearDown() : void - { - $this->manager = NULL; - } - - public function testCheckLastRunTimeFileDoesnotExist() - { - $this->fileManager - ->expects($this->once()) - ->method('getPhpContents') - ->will($this->returnValue(false)); - - $this->config - ->expects($this->any()) - ->method('get') - ->will($this->returnValue(50)); - - $this->assertTrue($this->reflection->invokeMethod('checkLastRunTime', [])); - } - - public function testCheckLastRunTime() - { - $this->fileManager - ->expects($this->once()) - ->method('getPhpContents') - ->will( - $this->returnValue([ - 'time' => time() - 60, - ]) - ); - - $this->config - ->expects($this->any()) - ->method('get') - ->will($this->returnValue(50)); - - $this->assertTrue( $this->reflection->invokeMethod('checkLastRunTime', [])); - } - - public function testCheckLastRunTimeTooFrequency() - { - $this->fileManager - ->expects($this->once()) - ->method('getPhpContents') - ->will( - $this->returnValue([ - 'time' => time() - 49, - ]) - ); - - $this->config - ->expects($this->exactly(1)) - ->method('get') - ->will($this->returnValue(50)); - - $this->assertFalse($this->reflection->invokeMethod('checkLastRunTime', [])); - } -} diff --git a/tests/unit/Espo/Core/Job/QueueProcessorParamsTest.php b/tests/unit/Espo/Core/Job/QueueProcessorParamsTest.php new file mode 100644 index 0000000000..17e040217e --- /dev/null +++ b/tests/unit/Espo/Core/Job/QueueProcessorParamsTest.php @@ -0,0 +1,70 @@ +withLimit(10); + + $this->assertFalse($params->useProcessPool()); + $this->assertFalse($params->noLock()); + + $this->assertEquals(10, $params->getLimit()); + + $this->assertNull($params->getQueue()); + } + + public function testParams2() + { + $params = QueueProcessorParams + ::fromNothing() + ->withLimit(10) + ->withUseProcessPool(true) + ->withNoLock(true) + ->withQueue('q0'); + + $this->assertTrue($params->useProcessPool()); + $this->assertTrue($params->noLock()); + + $this->assertEquals('q0', $params->getQueue()); + } +}