. * * The interactive user interfaces in modified source and object code versions * of this program must display Appropriate Legal Notices, as required under * Section 5 of the GNU Affero General Public License version 3. * * In accordance with Section 7(b) of the GNU Affero General Public License version 3, * these Appropriate Legal Notices must retain the display of the "EspoCRM" word. ************************************************************************/ namespace Espo\Core\Webhook; use Espo\Entities\User; use Espo\Entities\Webhook; use Espo\Entities\WebhookEventQueueItem; use Espo\Entities\WebhookQueueItem; use Espo\Core\AclManager; use Espo\Core\Utils\Config; use Espo\Core\Utils\DateTime as DateTimeUtil; use Espo\Core\Utils\Log; use Espo\ORM\EntityManager; use Espo\ORM\Query\Part\Condition as Cond; use Exception; use DateTime; use stdClass; /** * Groups occurred events into portions and sends them. A portion contains * multiple events of the same webhook. */ class Queue { private const EVENT_PORTION_SIZE = 20; private const PORTION_SIZE = 20; private const BATCH_SIZE = 50; private const MAX_ATTEMPT_NUMBER = 4; private const FAIL_ATTEMPT_PERIOD = '10 minutes'; public function __construct( private Sender $sender, private Config $config, private EntityManager $entityManager, private AclManager $aclManager, private Log $log ) {} public function process(): void { $this->processEvents(); $this->processSending(); } protected function processEvents(): void { $portionSize = $this->config->get('webhookQueueEventPortionSize', self::EVENT_PORTION_SIZE); $itemList = $this->entityManager ->getRDBRepository(WebhookEventQueueItem::ENTITY_TYPE) ->where([ 'isProcessed' => false, ]) ->order('number') ->limit(0, $portionSize) ->find(); foreach ($itemList as $item) { $this->createQueueFromEvent($item); $item->set([ 'isProcessed' => true, ]); $this->entityManager->saveEntity($item); } } protected function createQueueFromEvent(WebhookEventQueueItem $item): void { $webhookList = $this->entityManager ->getRDBRepository(Webhook::ENTITY_TYPE) ->where([ 'event' => $item->get('event'), 'isActive' => true, ]) ->order('createdAt') ->find(); foreach ($webhookList as $webhook) { $this->entityManager->createEntity(WebhookQueueItem::ENTITY_TYPE, [ 'webhookId' => $webhook->getId(), 'event' => $item->get('event'), 'targetId' => $item->get('targetId'), 'targetType' => $item->get('targetType'), 'status' => 'Pending', 'data' => $item->get('data'), 'attempts' => 0, ]); } } protected function processSending(): void { $portionSize = $this->config->get('webhookQueuePortionSize', self::PORTION_SIZE); $groupedItemList = $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->select(['webhookId', 'number']) ->where( Cond::in( Cond::column('number'), $this->entityManager ->getQueryBuilder() ->select('MIN:(number)') ->from(WebhookQueueItem::ENTITY_TYPE) ->where([ 'status' => WebhookQueueItem::STATUS_PENDING, 'OR' => [ ['processAt' => null], ['processAt<=' => DateTimeUtil::getSystemNowString()], ], ]) ->group('webhookId') ->build() ) ) ->limit(0, $portionSize) ->order('number') ->find(); foreach ($groupedItemList as $groupItem) { $this->processSendingGroup($groupItem->get('webhookId')); } } private function processSendingGroup(string $webhookId): void { $batchSize = $this->config->get('webhookBatchSize', self::BATCH_SIZE); $itemList = $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->where([ 'webhookId' => $webhookId, 'status' => WebhookQueueItem::STATUS_PENDING, 'OR' => [ ['processAt' => null], ['processAt<=' => DateTimeUtil::getSystemNowString()], ], ]) ->order('number') ->limit(0, $batchSize) ->find(); /** @var ?Webhook $webhook */ $webhook = $this->entityManager->getEntityById(Webhook::ENTITY_TYPE, $webhookId); if (!$webhook || !$webhook->isActive()) { foreach ($itemList as $item) { $this->deleteQueueItem($item); } return; } $forbiddenAttributeList = []; $user = null; if ($webhook->getUserId()) { /** @var ?User $user */ $user = $this->entityManager->getEntityById(User::ENTITY_TYPE, $webhook->getUserId()); if (!$user) { foreach ($itemList as $item) { $this->deleteQueueItem($item); } return; } $forbiddenAttributeList = $this->aclManager ->getScopeForbiddenAttributeList($user, $webhook->getTargetEntityType()); } $actualItemList = []; $dataList = []; foreach ($itemList as $item) { $data = $this->prepareItemData($item, $user, $forbiddenAttributeList); if ($data === null) { continue; } $actualItemList[] = $item; $dataList[] = $data; } if (empty($dataList)) { return; } $this->send($webhook, $dataList, $actualItemList); } /** * @param string[] $forbiddenAttributeList */ private function prepareItemData(WebhookQueueItem $item, ?User $user, array $forbiddenAttributeList): ?stdClass { $targetType = $item->get('targetType'); $target = null; if ($this->entityManager->hasRepository($targetType)) { $target = $this->entityManager ->getRDBRepository($targetType) ->where([ 'id' => $item->get('targetId') ]) ->findOne(['withDeleted' => true]); } if (!$target) { $this->deleteQueueItem($item); return null; } if ($user) { if (!$this->aclManager->check($user, $target)) { $this->deleteQueueItem($item); return null; } } $data = $item->get('data') ?? (object) []; foreach ($forbiddenAttributeList as $attribute) { unset($data->$attribute); } return $data; } /** * @param stdClass[] $dataList * @param WebhookQueueItem[] $itemList */ private function send(Webhook $webhook, array $dataList, array $itemList): void { try { $code = $this->sender->send($webhook, $dataList); } catch (Exception $e) { $this->failQueueItemList($itemList, true); $this->log->error("Webhook Queue: Webhook '{$webhook->getId()}' sending failed. Error: {$e->getMessage()}"); return; } if ($code >= 200 && $code < 400) { $this->succeedQueueItemList($itemList); } else if ($code === 410) { $this->dropWebhook($webhook); } else if (in_array($code, [0, 401, 403, 404, 405, 408, 500, 503])) { $this->failQueueItemList($itemList); } else if ($code >= 400 && $code < 500) { $this->failQueueItemList($itemList, true); } else { $this->failQueueItemList($itemList, true); } $this->logSending($webhook, $code); } protected function logSending(Webhook $webhook, int $code): void { $this->log->debug("Webhook Queue: Webhook '{$webhook->getId()}' sent, response code: $code."); } /** * @param WebhookQueueItem[] $itemList */ protected function failQueueItemList(array $itemList, bool $force = false): void { foreach ($itemList as $item) { $this->failQueueItem($item, $force); } } /** * @param WebhookQueueItem[] $itemList */ protected function succeedQueueItemList(array $itemList): void { foreach ($itemList as $item) { $this->succeedQueueItem($item); } } protected function deleteQueueItem(WebhookQueueItem $item): void { $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->deleteFromDb($item->getId()); } protected function dropWebhook(Webhook $webhook): void { $itemList = $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->where([ 'status' => WebhookQueueItem::STATUS_PENDING, 'webhookId' => $webhook->getId(), ]) ->order('number') ->find(); foreach ($itemList as $item) { $this->deleteQueueItem($item); } $this->entityManager->removeEntity($webhook); } protected function succeedQueueItem(WebhookQueueItem $item): void { $item->set([ 'attempts' => $item->getAttempts() + 1, 'status' => WebhookQueueItem::STATUS_SUCCESS, 'processedAt' => DateTimeUtil::getSystemNowString(), ]); $this->entityManager->saveEntity($item); } protected function failQueueItem(WebhookQueueItem $item, bool $force = false): void { $attempts = $item->getAttempts() + 1; $maxAttemptsNumber = $this->config->get('webhookMaxAttemptNumber', self::MAX_ATTEMPT_NUMBER); $period = $this->config->get('webhookFailAttemptPeriod', self::FAIL_ATTEMPT_PERIOD); if ($force) { $maxAttemptsNumber = 0; } $dt = new DateTime(); $dt->modify($period); $item->set([ 'attempts' => $attempts, 'processAt' => $dt->format(DateTimeUtil::SYSTEM_DATE_TIME_FORMAT), ]); if ($attempts >= $maxAttemptsNumber) { $item->set('status', WebhookQueueItem::STATUS_FAILED); /** @noinspection PhpRedundantOptionalArgumentInspection */ $item->set('processAt', null); } $this->entityManager->saveEntity($item); } }