sender = $sender; $this->config = $config; $this->entityManager = $entityManager; $this->aclManager = $aclManager; $this->log = $log; } public function process(): void { $this->processEvents(); $this->processSending(); } protected function processEvents(): void { $portionSize = $this->config->get('webhookQueueEventPortionSize', self::EVENT_PORTION_SIZE); $itemList = $this->entityManager ->getRDBRepository(WebhookEventQueueItem::ENTITY_TYPE) ->where([ 'isProcessed' => false, ]) ->order('number') ->limit(0, $portionSize) ->find(); foreach ($itemList as $item) { $this->createQueueFromEvent($item); $item->set([ 'isProcessed' => true, ]); $this->entityManager->saveEntity($item); } } protected function createQueueFromEvent(WebhookEventQueueItem $item): void { $webhookList = $this->entityManager ->getRDBRepository(Webhook::ENTITY_TYPE) ->where([ 'event' => $item->get('event'), 'isActive' => true, ]) ->order('createdAt') ->find(); foreach ($webhookList as $webhook) { $this->entityManager->createEntity(WebhookQueueItem::ENTITY_TYPE, [ 'webhookId' => $webhook->getId(), 'event' => $item->get('event'), 'targetId' => $item->get('targetId'), 'targetType' => $item->get('targetType'), 'status' => 'Pending', 'data' => $item->get('data'), 'attempts' => 0, ]); } } protected function processSending(): void { $portionSize = $this->config->get('webhookQueuePortionSize', self::PORTION_SIZE); $groupedItemList = $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->select(['webhookId', 'number']) ->where( Cond::in( Cond::column('number'), $this->entityManager ->getQueryBuilder() ->select('MIN:(number)') ->from(WebhookQueueItem::ENTITY_TYPE) ->where([ 'status' => 'Pending', 'OR' => [ ['processAt' => null], ['processAt<=' => DateTimeUtil::getSystemNowString()], ], ]) ->group('webhookId') ->build() ) ) ->limit(0, $portionSize) ->order('number') ->find(); foreach ($groupedItemList as $groupItem) { $this->processSendingGroup($groupItem->get('webhookId')); } } private function processSendingGroup(string $webhookId): void { $batchSize = $this->config->get('webhookBatchSize', self::BATCH_SIZE); $itemList = $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->where([ 'webhookId' => $webhookId, 'status' => 'Pending', 'OR' => [ ['processAt' => null], ['processAt<=' => DateTimeUtil::getSystemNowString()], ], ]) ->order('number') ->limit(0, $batchSize) ->find(); $webhook = $this->entityManager->getEntity(Webhook::ENTITY_TYPE, $webhookId); if (!$webhook || !$webhook->get('isActive')) { foreach ($itemList as $item) { $this->deleteQueueItem($item); } } $forbiddenAttributeList = []; $user = null; if ($webhook->get('userId')) { $user = $this->entityManager->getEntity(User::ENTITY_TYPE, $webhook->get('userId')); if (!$user) { foreach ($itemList as $item) { $this->deleteQueueItem($item); } return; } $forbiddenAttributeList = $this->aclManager ->getScopeForbiddenAttributeList($user, $webhook->get('entityType')); } $actualItemList = []; $dataList = []; foreach ($itemList as $item) { $data = $this->prepareItemData($item, $user, $forbiddenAttributeList); if ($data === null) { continue; } $actualItemList[] = $item; $dataList[] = $data; } if (empty($dataList)) { return; } $this->send($webhook, $dataList, $actualItemList); } /** * @param string[] $forbiddenAttributeList */ private function prepareItemData(WebhookQueueItem $item, ?User $user, array $forbiddenAttributeList): ?stdClass { $targetType = $item->get('targetType'); $target = null; if ($this->entityManager->hasRepository($targetType)) { $target = $this->entityManager ->getRDBRepository($targetType) ->where([ 'id' => $item->get('targetId') ]) ->findOne(['withDeleted' => true]); } if (!$target) { $this->deleteQueueItem($item); return null; } if ($user) { if (!$this->aclManager->check($user, $target)) { $this->deleteQueueItem($item); return null; } } $data = $item->get('data') ?? (object) []; foreach ($forbiddenAttributeList as $attribute) { unset($data->$attribute); } return $data; } /** * @param mixed[] $dataList * @param WebhookQueueItem[] $itemList */ private function send(Webhook $webhook, array $dataList, array $itemList): void { try { $code = $this->sender->send($webhook, $dataList); } catch (Exception $e) { $this->failQueueItemList($itemList, true); $this->log->error( "Webhook Queue: Webhook '" . $webhook->getId() . "' sending failed. Error: " . $e->getMessage() ); return; } if ($code >= 200 && $code < 400) { $this->succeedQueueItemList($itemList); } else if ($code === 410) { $this->dropWebhook($webhook); } else if (in_array($code, [0, 401, 403, 404, 405, 408, 500, 503])) { $this->failQueueItemList($itemList); } else if ($code >= 400 && $code < 500) { $this->failQueueItemList($itemList, true); } else { $this->failQueueItemList($itemList, true); } $this->logSending($webhook, $code); } protected function logSending(Webhook $webhook, int $code): void { $this->log->debug("Webhook Queue: Webhook '" . $webhook->getId() . "' sent, response code: {$code}."); } /** * @param WebhookQueueItem[] $itemList */ protected function failQueueItemList(array $itemList, bool $force = false): void { foreach ($itemList as $item) { $this->failQueueItem($item, $force); } } /** * @param WebhookQueueItem[] $itemList */ protected function succeedQueueItemList(array $itemList): void { foreach ($itemList as $item) { $this->succeedQueueItem($item); } } protected function deleteQueueItem(WebhookQueueItem $item): void { $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->deleteFromDb($item->getId()); } protected function dropWebhook(Webhook $webhook): void { $itemList = $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->where([ 'status' => 'Pending', 'webhookId' => $webhook->getId(), ]) ->order('number') ->find(); foreach ($itemList as $item) { $this->deleteQueueItem($item); } $this->entityManager->removeEntity($webhook); } protected function succeedQueueItem(WebhookQueueItem $item): void { $item->set([ 'attempts' => $item->get('attempts') + 1, 'status' => 'Success', 'processedAt' => DateTimeUtil::getSystemNowString(), ]); $this->entityManager->saveEntity($item); } protected function failQueueItem(WebhookQueueItem $item, bool $force = false): void { $attempts = $item->get('attempts') + 1; $maxAttemptsNumber = $this->config->get('webhookMaxAttemptNumber', self::MAX_ATTEMPT_NUMBER); $period = $this->config->get('webhookFailAttemptPeriod', self::FAIL_ATTEMPT_PERIOD); if ($force) { $maxAttemptsNumber = 0; } $dt = new DateTime(); $dt->modify($period); $item->set([ 'attempts' => $attempts, 'processAt' => $dt->format(DateTimeUtil::SYSTEM_DATE_TIME_FORMAT), ]); if ($attempts >= $maxAttemptsNumber) { $item->set('status', 'Failed'); $item->set('processAt', null); } $this->entityManager->saveEntity($item); } }