noTableLocking = $configDataProvider->noTableLocking();
    }

    /**
     * Fetches the pending jobs selected by the given params (queue, group, limit)
     * and processes them one by one.
     *
     * When the params request a process pool, jobs are handed to the pool instead
     * of being run inline, and the method waits for the pool before returning.
     */
    public function process(Params $params): void
    {
        $pool = $params->useProcessPool() ?
            $this->asyncPoolFactory->create() :
            null;

        $pendingJobList = $this->queueUtil->getPendingJobList(
            $params->getQueue(),
            $params->getGroup(),
            $params->getLimit()
        );

        foreach ($pendingJobList as $job) {
            $this->processJob($params, $job, $pool);
        }

        // No-op when no pool is used.
        $pool?->wait();
    }

    /**
     * Marks a single job as started and either runs it inline or adds it to the pool.
     *
     * A job is skipped when locking is enabled in the params and the job is no longer
     * pending, or when it belongs to a scheduled job that is already running for the
     * same target.
     *
     * If an error is raised while the table lock is held, the lock is released
     * (best effort) and the original error is rethrown, so a failure cannot leave
     * the job table locked.
     *
     * @param Params $params Processing parameters.
     * @param JobEntity $job A job to process.
     * @param ?AsyncPool $pool A pool to add the job to. If null, the job is run inline.
     */
    private function processJob(Params $params, JobEntity $job, ?AsyncPool $pool = null): void
    {
        $noLock = $params->noLock();

        // Locking is applied only to jobs spawned by a scheduled job, and only when
        // neither the params nor the configuration disable it.
        $lockTable = $job->getScheduledJobId() && !$noLock && !$this->noTableLocking;

        if ($lockTable) {
            // MySQL doesn't allow to lock non-existent rows. We resort to locking an entire table.
            $this->entityManager->getLocker()->lockExclusive(JobEntity::ENTITY_TYPE);
        }

        try {
            $skip = !$noLock && !$this->queueUtil->isJobPending($job->getId());

            if (
                !$skip &&
                $job->getScheduledJobId() &&
                $this->queueUtil->isScheduledJobRunning(
                    $job->getScheduledJobId(),
                    $job->getTargetId(),
                    $job->getTargetType(),
                    $job->getTargetGroup()
                )
            ) {
                $skip = true;
            }

            if (!$skip) {
                $job->setStartedAtNow();

                if ($pool) {
                    // The job is to be picked up by a pool task.
                    $job->setStatus(Status::READY);
                } else {
                    $job->setStatus(Status::RUNNING);
                    $job->setPid(System::getPid());
                }

                $this->entityManager->saveEntity($job);
            }
        } catch (\Throwable $e) {
            if ($lockTable) {
                try {
                    $this->entityManager->getLocker()->rollback();
                } catch (\Throwable) {
                    // A failure to release must not mask the original error.
                }
            }

            throw $e;
        }

        if ($skip) {
            if ($lockTable) {
                $this->entityManager->getLocker()->rollback();
            }

            return;
        }

        if ($lockTable) {
            $this->entityManager->getLocker()->commit();
        }

        if (!$pool) {
            $this->jobRunner->run($job);

            return;
        }

        $task = new JobTask($job->getId());

        $pool->add($task);
    }
}