diff --git a/application/Espo/Core/Webhook/Manager.php b/application/Espo/Core/Webhook/Manager.php index a9041d6643..fe7a2da858 100644 --- a/application/Espo/Core/Webhook/Manager.php +++ b/application/Espo/Core/Webhook/Manager.php @@ -38,6 +38,11 @@ use Espo\Core\{ Utils\Log, }; +use Espo\Entities\{ + WebhookEventQueueItem, + Webhook, +}; + /** * Processes events. Holds an information about existing events. */ @@ -102,7 +107,7 @@ class Manager $data = []; $list = $this->entityManager - ->getRDBRepository('Webhook') + ->getRDBRepository(Webhook::ENTITY_TYPE) ->select(['event']) ->groupBy(['event']) ->where([ @@ -136,7 +141,7 @@ class Manager public function removeEvent(string $event): void { $notExists = !$this->entityManager - ->getRDBRepository('Webhook') + ->getRDBRepository(Webhook::ENTITY_TYPE) ->select(['id']) ->where([ 'event' => $event, @@ -174,7 +179,7 @@ class Manager return; } - $this->entityManager->createEntity('WebhookEventQueueItem', [ + $this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [ 'event' => $event, 'targetType' => $entity->getEntityType(), 'targetId' => $entity->getId(), @@ -195,7 +200,7 @@ class Manager return; } - $this->entityManager->createEntity('WebhookEventQueueItem', [ + $this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [ 'event' => $event, 'targetType' => $entity->getEntityType(), 'targetId' => $entity->getId(), @@ -233,7 +238,7 @@ class Manager $data->id = $entity->getId(); if ($this->eventExists($event)) { - $this->entityManager->createEntity('WebhookEventQueueItem', [ + $this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [ 'event' => $event, 'targetType' => $entity->getEntityType(), 'targetId' => $entity->getId(), @@ -281,7 +286,7 @@ class Manager $itemData->$attribute = $entity->get($attribute); } - $this->entityManager->createEntity('WebhookEventQueueItem', [ + $this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [ 'event' => $itemEvent, 'targetType' => $entity->getEntityType(), 'targetId' => $entity->getId(), diff --git a/application/Espo/Core/Webhook/Queue.php b/application/Espo/Core/Webhook/Queue.php index f1d18648c6..22bab6ad29 100644 --- a/application/Espo/Core/Webhook/Queue.php +++ b/application/Espo/Core/Webhook/Queue.php @@ -33,18 +33,24 @@ use Espo\Entities\{ Webhook, WebhookQueueItem, WebhookEventQueueItem, + User, }; use Espo\Core\{ AclManager, Utils\Config, Utils\DateTime as DateTimeUtil, - ORM\EntityManager, Utils\Log, }; +use Espo\ORM\{ + EntityManager, + QueryParams\Parts\Condition as Cond, +}; + use Exception; use DateTime; +use stdClass; /** * Groups occurred events into portions and sends them. A portion contains @@ -97,7 +103,7 @@ class Queue $portionSize = $this->config->get('webhookQueueEventPortionSize', self::EVENT_PORTION_SIZE); $itemList = $this->entityManager - ->getRDBRepository('WebhookEventQueueItem') + ->getRDBRepository(WebhookEventQueueItem::ENTITY_TYPE) ->where([ 'isProcessed' => false, ]) @@ -119,7 +125,7 @@ class Queue protected function createQueueFromEvent(WebhookEventQueueItem $item): void { $webhookList = $this->entityManager - ->getRDBRepository('Webhook') + ->getRDBRepository(Webhook::ENTITY_TYPE) ->where([ 'event' => $item->get('event'), 'isActive' => true, @@ -128,7 +134,7 @@ class Queue ->find(); foreach ($webhookList as $webhook) { - $this->entityManager->createEntity('WebhookQueueItem', [ + $this->entityManager->createEntity(WebhookQueueItem::ENTITY_TYPE, [ 'webhookId' => $webhook->getId(), 'event' => $item->get('event'), 'targetId' => $item->get('targetId'), @@ -143,128 +149,143 @@ class Queue protected function processSending(): void { $portionSize = $this->config->get('webhookQueuePortionSize', self::PORTION_SIZE); - $batchSize = $this->config->get('webhookBatchSize', self::BATCH_SIZE); $groupedItemList = $this->entityManager - ->getRDBRepository('WebhookQueueItem') + ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->select(['webhookId', 'number']) - ->where([ - 'number=s' => [ - 'entityType' => 'WebhookQueueItem', - 'selectParams' => [ - 'select' => ['MIN:number'], - 'whereClause' => [ + ->where( + Cond::in( + Cond::column('number'), + $this->entityManager + ->getQueryBuilder() + ->select('MIN:(number)') + ->from(WebhookQueueItem::ENTITY_TYPE) + ->where([ 'status' => 'Pending', 'OR' => [ ['processAt' => null], ['processAt<=' => DateTimeUtil::getSystemNowString()], ], - ], - 'groupBy' => ['webhookId'], - ] - ], - ]) + ]) + ->groupBy('webhookId') + ->build() + ) + ) ->limit(0, $portionSize) ->order('number') ->find(); - foreach ($groupedItemList as $group) { - $webhookId = $group->get('webhookId'); - - $itemList = $this->entityManager - ->getRDBRepository('WebhookQueueItem') - ->where([ - 'webhookId' => $webhookId, - 'status' => 'Pending', - 'OR' => [ - ['processAt' => null], - ['processAt<=' => DateTimeUtil::getSystemNowString()], - ], - ]) - ->order('number') - ->limit(0, $batchSize) - ->find(); - - $webhook = $this->entityManager->getEntity('Webhook', $webhookId); - - if (!$webhook || !$webhook->get('isActive')) { - foreach ($itemList as $item) { - $this->deleteQueueItem($item); - } - } - - $forbiddenAttributeList = []; - - $user = null; - - if ($webhook->get('userId')) { - $user = $this->entityManager->getEntity('User', $webhook->get('userId')); - - if (!$user) { - foreach ($itemList as $item) { - $this->deleteQueueItem($item); - } - - continue; - } - - $forbiddenAttributeList = $this->aclManager - ->getScopeForbiddenAttributeList($user, $webhook->get('entityType')); - } - - $actualItemList = []; - - $dataList = []; - - foreach ($itemList as $item) { - $targetType = $item->get('targetType'); - $target = null; - - if ($this->entityManager->hasRepository($targetType)) { - $target = $this->entityManager - ->getRDBRepository($targetType) - ->where([ - 'id' => $item->get('targetId') - ]) - ->findOne(['withDeleted' => true]); - } - - if (!$target) { - $this->deleteQueueItem($item); - - continue; - } - - if ($user) { - if (!$this->aclManager->check($user, $target)) { - $this->deleteQueueItem($item); - - continue; - } - } - - $data = $item->get('data') ?? (object) []; - - $data = clone $data; - - foreach ($forbiddenAttributeList as $attribute) { - unset($data->$attribute); - } - - $actualItemList[] = $item; - - $dataList[] = $data; - } - - if (empty($dataList)) { - continue; - } - - $this->send($webhook, $dataList, $actualItemList); + foreach ($groupedItemList as $groupItem) { + $this->processSendingGroup($groupItem->get('webhookId')); } } - protected function send(Webhook $webhook, array $dataList, array $itemList): void + private function processSendingGroup(string $webhookId): void + { + $batchSize = $this->config->get('webhookBatchSize', self::BATCH_SIZE); + + $itemList = $this->entityManager + ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) + ->where([ + 'webhookId' => $webhookId, + 'status' => 'Pending', + 'OR' => [ + ['processAt' => null], + ['processAt<=' => DateTimeUtil::getSystemNowString()], + ], + ]) + ->order('number') + ->limit(0, $batchSize) + ->find(); + + $webhook = $this->entityManager->getEntity(Webhook::ENTITY_TYPE, $webhookId); + + if (!$webhook || !$webhook->get('isActive')) { + foreach ($itemList as $item) { + $this->deleteQueueItem($item); + } + } + + $forbiddenAttributeList = []; + + $user = null; + + if ($webhook->get('userId')) { + $user = $this->entityManager->getEntity(User::ENTITY_TYPE, $webhook->get('userId')); + + if (!$user) { + foreach ($itemList as $item) { + $this->deleteQueueItem($item); + } + + return; + } + + $forbiddenAttributeList = $this->aclManager + ->getScopeForbiddenAttributeList($user, $webhook->get('entityType')); + } + + $actualItemList = []; + + $dataList = []; + + foreach ($itemList as $item) { + $data = $this->prepareItemData($item, $user, $forbiddenAttributeList); + + if ($data === null) { + continue; + } + + $actualItemList[] = $item; + + $dataList[] = $data; + } + + if (empty($dataList)) { + return; + } + + $this->send($webhook, $dataList, $actualItemList); + } + + private function prepareItemData(WebhookQueueItem $item, User $user, array $forbiddenAttributeList): ?stdClass + { + $targetType = $item->get('targetType'); + $target = null; + + if ($this->entityManager->hasRepository($targetType)) { + $target = $this->entityManager + ->getRDBRepository($targetType) + ->where([ + 'id' => $item->get('targetId') + ]) + ->findOne(['withDeleted' => true]); + } + + if (!$target) { + $this->deleteQueueItem($item); + + return null; + } + + if ($user) { + if (!$this->aclManager->check($user, $target)) { + $this->deleteQueueItem($item); + + return null; + } + } + + $data = $item->get('data') ?? (object) []; + + foreach ($forbiddenAttributeList as $attribute) { + unset($data->$attribute); + } + + return $data; + } + + private function send(Webhook $webhook, array $dataList, array $itemList): void { try { $code = $this->sender->send($webhook, $dataList); @@ -319,13 +340,15 @@ class Queue protected function deleteQueueItem(WebhookQueueItem $item): void { - $this->entityManager->getRepository('WebhookQueueItem')->deleteFromDb($item->getId()); + $this->entityManager + ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) + ->deleteFromDb($item->getId()); } protected function dropWebhook(Webhook $webhook): void { $itemList = $this->entityManager - ->getRDBRepository('WebhookQueueItem') + ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->where([ 'status' => 'Pending', 'webhookId' => $webhook->getId(), diff --git a/application/Espo/Hooks/Common/Webhook.php b/application/Espo/Hooks/Common/Webhook.php index e5ce506b54..b4d941c23f 100644 --- a/application/Espo/Hooks/Common/Webhook.php +++ b/application/Espo/Hooks/Common/Webhook.php @@ -40,9 +40,9 @@ class Webhook { public static $order = 101; - protected $metadata; + private $metadata; - protected $webhookManager; + private $webhookManager; public function __construct(Metadata $metadata, WebhookManager $webhookManager) { diff --git a/tests/integration/Espo/Webhook/ProcessingTest.php b/tests/integration/Espo/Webhook/ProcessingTest.php new file mode 100644 index 0000000000..3023ce02dc --- /dev/null +++ b/tests/integration/Espo/Webhook/ProcessingTest.php @@ -0,0 +1,143 @@ +createUser( + [ + 'userName' => 'test', + 'password' => '1', + ], + [ + 'data' => [ + 'Webhook' => true, + 'Account' => [ + 'create' => 'yes', + 'read' => 'own', + ], + ], + ] + ); + + /* @var $em EntityManager */ + $em = $this->getContainer()->get('entityManager'); + + $em->createEntity(Webhook::ENTITY_TYPE, [ + 'event' => 'Account.create', + 'userId' => $user->getId(), + 'url' => 'https://test', + ]); + + $em->createEntity(Webhook::ENTITY_TYPE, [ + 'event' => 'Account.update', + 'userId' => $user->getId(), + 'url' => 'https://test', + ]); + + $this->getContainer()->get('dataManager')->clearCache(); + + $app = $this->createApplication(); + + $em = $app->getContainer()->get('entityManager'); + + $account1 = $em->createEntity(Account::ENTITY_TYPE, [ + 'name' => 'test1', + 'assignedUserId' => $user->getId(), + ]); + + $account2 = $em->createEntity(Account::ENTITY_TYPE, [ + 'name' => 'test2', + 'assignedUserId' => $user->getId(), + ]); + + $em->createEntity(Account::ENTITY_TYPE, [ + 'name' => 'test3', + ]); + + $dataList1 = [ + $account1->getValueMap(), + $account2->getValueMap(), + ]; + + $account1->set('name', 'test-1-changed'); + + $em->saveEntity($account1); + + $dataList2 = [ + (object) [ + 'name' => $account1->get('name'), + 'modifiedById' => 'system', + 'modifiedByName' => 'System', + 'id' => $account1->getId(), + ], + ]; + + $sender = $this->createMock(Sender::class); + + /* @var $queue Queue */ + $queue = $app->getContainer() + ->get('injectableFactory') + ->createWith(Queue::class, [ + 'sender' => $sender, + ]); + + $sender + ->expects($this->exactly(2)) + ->method('send') + ->withConsecutive( + [ + $this->isInstanceOf(Webhook::class), + $dataList1, + ], + [ + $this->isInstanceOf(Webhook::class), + $dataList2, + ] + ) + ->willReturn( + 200, + 200 + ); + + $queue->process(); + } +}