Files
espocrm/application/Espo/Core/Job/QueueUtil.php
Yuri Kuznetsov efd64ecded cs fix
2021-04-21 14:13:08 +03:00

439 lines
12 KiB
PHP

<?php
/************************************************************************
* This file is part of EspoCRM.
*
* EspoCRM - Open Source CRM application.
* Copyright (C) 2014-2021 Yurii Kuznietsov, Taras Machyshyn, Oleksii Avramenko
* Website: https://www.espocrm.com
*
* EspoCRM is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* EspoCRM is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with EspoCRM. If not, see http://www.gnu.org/licenses/.
*
* The interactive user interfaces in modified source and object code versions
* of this program must display Appropriate Legal Notices, as required under
* Section 5 of the GNU General Public License version 3.
*
* In accordance with Section 7(b) of the GNU General Public License version 3,
* these Appropriate Legal Notices must retain the display of the "EspoCRM" word.
************************************************************************/
namespace Espo\Core\Job;
use Espo\Core\{
Job\JobManager,
Utils\Config,
ORM\EntityManager,
Utils\System,
};
use Espo\ORM\Collection;
use DateTime;
class QueueUtil
{
private $config;
private $entityManager;
private $scheduleUtil;
private const NOT_EXISTING_PROCESS_PERIOD = 300;
private const READY_NOT_STARTED_PERIOD = 60;
public function __construct(Config $config, EntityManager $entityManager, ScheduleUtil $scheduleUtil)
{
$this->config = $config;
$this->entityManager = $entityManager;
$this->scheduleUtil = $scheduleUtil;
}
public function isJobPending(string $id): bool
{
$job = $this->entityManager
->getRepository('Job')
->select(['id', 'status'])
->where([
'id' => $id,
])
->forUpdate()
->findOne();
if (!$job) {
return false;
}
return $job->get('status') === JobManager::PENDING;
}
public function getPendingJobList(?string $queue = null, int $limit = 0): Collection
{
$builder = $this->entityManager
->getRDBRepository('Job')
->select([
'id',
'scheduledJobId',
'scheduledJobJob',
'executeTime',
'targetId',
'targetType',
'methodName',
'serviceName',
'job',
'data',
])
->where([
'status' => JobManager::PENDING,
'executeTime<=' => date('Y-m-d H:i:s'),
'queue' => $queue,
])
->order('number');
if ($limit) {
$builder->limit(0, $limit);
}
return $builder->find();
}
public function isScheduledJobRunning(
string $scheduledJobId, ?string $targetId = null, ?string $targetType = null
): bool {
$where = [
'scheduledJobId' => $scheduledJobId,
'status' => [JobManager::RUNNING, JobManager::READY],
];
if ($targetId && $targetType) {
$where['targetId'] = $targetId;
$where['targetType'] = $targetType;
}
return (booL) $this->entityManager
->getRepository('Job')
->select(['id'])
->where($where)
->findOne();
}
public function getRunningScheduledJobIdList(): array
{
$list = [];
$jobList = $this->entityManager
->getRepository('Job')
->select(['scheduledJobId'])
->where([
'status' => ['Running', 'Ready'],
'scheduledJobId!=' => null,
'targetId=' => null,
])
->order('executeTime')
->find();
foreach ($jobList as $job) {
$list[] = $job->get('scheduledJobId');
}
return $list;
}
public function hasScheduledJobOnMinute(string $scheduledJobId, string $time): bool
{
$dateObj = new DateTime($time);
$timeWithoutSeconds = $dateObj->format('Y-m-d H:i:');
$job = $this->entityManager
->getRDBRepository('Job')
->select(['id'])
->where([
'scheduledJobId' => $scheduledJobId,
'executeTime*' => $timeWithoutSeconds . '%',
])
->findOne();
return (bool) $job;
}
public function getPendingCountByScheduledJobId(string $scheduledJobId): int
{
$countPending = $this->entityManager
->getRDBRepository('Job')
->where([
'scheduledJobId' => $scheduledJobId,
'status' => JobManager::PENDING,
])
->count();
return $countPending;
}
public function markJobsFailed(): void
{
$this->markJobsFailedByNotExistingProcesses();
$this->markJobsFailedReadyNotStarted();
$this->markJobsFailedByPeriod(true);
$this->markJobsFailedByPeriod();
}
protected function markJobsFailedByNotExistingProcesses(): void
{
$timeThreshold = time() - $this->config->get(
'jobPeriodForNotExistingProcess',
self::NOT_EXISTING_PROCESS_PERIOD
);
$dateTimeThreshold = date('Y-m-d H:i:s', $timeThreshold);
$runningJobList = $this->entityManager
->getRepository('Job')
->select([
'id',
'scheduledJobId',
'executeTime',
'targetId',
'targetType',
'pid',
'startedAt',
])
->where([
'status' => JobManager::RUNNING,
'startedAt<' => $dateTimeThreshold,
])
->find();
$failedJobList = [];
foreach ($runningJobList as $job) {
if ($job->get('pid') && !System::isProcessActive($job->get('pid'))) {
$failedJobList[] = $job;
}
}
$this->markJobListFailed($failedJobList);
}
protected function markJobsFailedReadyNotStarted(): void
{
$timeThreshold = time() -
$this->config->get('jobPeriodForReadyNotStarted', SELF::READY_NOT_STARTED_PERIOD);
$dateTimeThreshold = date('Y-m-d H:i:s', $timeThreshold);
$failedJobList = $this->entityManager
->getRepository('Job')
->select([
'id',
'scheduledJobId',
'executeTime',
'targetId',
'targetType',
'pid',
'startedAt',
])
->where([
'status' => JobManager::READY,
'startedAt<' => $dateTimeThreshold,
])
->find();
$this->markJobListFailed($failedJobList);
}
protected function markJobsFailedByPeriod(bool $isForActiveProcesses = false): void
{
$period = 'jobPeriod';
if ($isForActiveProcesses) {
$period = 'jobPeriodForActiveProcess';
}
$timeThreshold = time() - $this->config->get($period, 7800);
$dateTimeThreshold = date('Y-m-d H:i:s', $timeThreshold);
$runningJobList = $this->entityManager
->getRepository('Job')
->select([
'id',
'scheduledJobId',
'executeTime',
'targetId',
'targetType',
'pid',
'startedAt'
])
->where([
'status' => JobManager::RUNNING,
'executeTime<' => $dateTimeThreshold,
])
->find();
$failedJobList = [];
foreach ($runningJobList as $job) {
if (!$isForActiveProcesses) {
if (!$job->get('pid') || !System::isProcessActive($job->get('pid'))) {
$failedJobList[] = $job;
}
} else {
$failedJobList[] = $job;
}
}
$this->markJobListFailed($failedJobList);
}
protected function markJobListFailed(iterable $jobList): void
{
if (!count($jobList)) {
return;
}
$jobIdList = [];
foreach ($jobList as $job) {
$jobIdList[] = $job->getId();
}
$updateQuery = $this->entityManager->getQueryBuilder()
->update()
->in('Job')
->set([
'status' => JobManager::FAILED,
'attempts' => 0,
])
->where([
'id' => $jobIdList,
])
->build();
$this->entityManager->getQueryExecutor()->execute($updateQuery);
foreach ($jobList as $job) {
if (!$job->get('scheduledJobId')) {
continue;
}
$this->scheduleUtil->addLogRecord(
$job->get('scheduledJobId'),
JobManager::FAILED,
$job->get('startedAt'),
$job->get('targetId'),
$job->get('targetType')
);
}
}
/**
* Remove pending duplicate jobs, no need to run twice the same job.
*/
public function removePendingJobDuplicates(): void
{
$duplicateJobList = $this->entityManager
->getRepository('Job')
->select(['scheduledJobId'])
->where([
'scheduledJobId!=' => null,
'status' => JobManager::PENDING,
'executeTime<=' => date('Y-m-d H:i:s'),
'targetId' => null,
])
->groupBy(['scheduledJobId'])
->having([
'COUNT:id>' => 1,
])
->order('MAX:executeTime')
->find();
$scheduledJobIdList = [];
foreach ($duplicateJobList as $duplicateJob) {
if (!$duplicateJob->get('scheduledJobId')) {
continue;
}
$scheduledJobIdList[] = $duplicateJob->get('scheduledJobId');
}
foreach ($scheduledJobIdList as $scheduledJobId) {
$toRemoveJobList = $this->entityManager
->getRepository('Job')
->select(['id'])
->where([
'scheduledJobId' => $scheduledJobId,
'status' => JobManager::PENDING,
])
->order('executeTime')
->limit(0, 1000)
->find();
$jobIdList = [];
foreach ($toRemoveJobList as $job) {
$jobIdList[] = $job->id;
}
if (!count($jobIdList)) {
continue;
}
$delete = $this->entityManager->getQueryBuilder()
->delete()
->from('Job')
->where([
'id' => $jobIdList,
])
->build();
$this->entityManager->getQueryExecutor()->execute($delete);
}
}
/**
* Handle job attempts. Change failed to pending if attempts left.
*/
public function updateFailedJobAttempts(): void
{
$jobCollection = $this->entityManager
->getRDBRepository('Job')
->select([
'id',
'attempts',
'failedAttempts'
])
->where([
'status' => JobManager::FAILED,
'executeTime<=' => date('Y-m-d H:i:s'),
'attempts>' => 0,
])
->find();
foreach ($jobCollection as $job) {
$failedAttempts = $job->get('failedAttempts') ?? 0;
$attempts = $job->get('attempts');
$job->set([
'status' => JobManager::PENDING,
'attempts' => $attempts - 1,
'failedAttempts' => $failedAttempts + 1,
]);
$this->entityManager->saveEntity($job);
}
}
}