config = $config; $this->fileManager = $fileManager; $this->jobRunner = $jobRunner; $this->log = $log; $this->scheduleProcessor = $scheduleProcessor; $this->queueUtil = $queueUtil; $this->asyncPoolFactory = $asyncPoolFactory; $this->queueProcessorFactory = $queueProcessorFactory; if ($this->config->get('jobRunInParallel')) { if ($this->asyncPoolFactory->isSupported()) { $this->useProcessPool = true; } else { $this->log->warning("JobManager: useProcessPool requires pcntl and posix extensions."); } } } /** * Process jobs. Jobs will be created according scheduling. Then pending jobs will be processed. * This method supposed to be called on every Cron run or loop iteration of the Daemon. */ public function process(): void { if (!$this->checkLastRunTime()) { $this->log->info('JobManager: Skip job processing. Too frequent execution.'); return; } $this->updateLastRunTime(); $this->queueUtil->markJobsFailed(); $this->queueUtil->updateFailedJobAttempts(); $this->scheduleProcessor->process(); $this->queueUtil->removePendingJobDuplicates(); $this->processMainQueue(); } /** * Process pending jobs from a specific queue. Jobs within a queue are processed one by one. */ public function processQueue(string $queue, int $limit): void { $params = QueueProcessorParams ::create() ->withQueue($queue) ->withLimit($limit) ->withUseProcessPool(false) ->withNoLock(true); $processor = $this->queueProcessorFactory->create($params); $processor->process(); } /** * Process pending jobs from a specific group. Jobs within a group are processed one by one. */ public function processGroup(string $group, int $limit): void { $params = QueueProcessorParams ::create() ->withGroup($group) ->withLimit($limit) ->withUseProcessPool(false) ->withNoLock(true); $processor = $this->queueProcessorFactory->create($params); $processor->process(); } private function processMainQueue(): void { $limit = (int) $this->config->get('jobMaxPortion', 0); $params = QueueProcessorParams ::create() ->withUseProcessPool($this->useProcessPool) ->withLimit($limit); $processor = $this->queueProcessorFactory->create($params); $processor->process(); } /** * Run a specific job by ID. A job status should be set to 'Ready'. */ public function runJobById(string $id): void { $this->jobRunner->runById($id); } /** * Run a specific job. * * @throws Throwable */ public function runJob(JobEntity $job): void { $this->jobRunner->runThrowingException($job); } private function getLastRunTime(): int { $lastRunData = $this->fileManager->getPhpContents($this->lastRunTimeFile); if (is_array($lastRunData) && !empty($lastRunData['time'])) { $lastRunTime = $lastRunData['time']; } else { $lastRunTime = time() - intval($this->config->get('cronMinInterval', 0)) - 1; } return (int) $lastRunTime; } private function updateLastRunTime(): void { $data = [ 'time' => time(), ]; $this->fileManager->putPhpContents($this->lastRunTimeFile, $data, false, true); } private function checkLastRunTime(): bool { $currentTime = time(); $lastRunTime = $this->getLastRunTime(); $cronMinInterval = $this->config->get('cronMinInterval', 0); if ($currentTime > ($lastRunTime + $cronMinInterval)) { return true; } return false; } }