mirror of
https://github.com/espocrm/espocrm.git
synced 2026-06-28 06:56:05 +00:00
webhook refactoring and test
This commit is contained in:
@@ -38,6 +38,11 @@ use Espo\Core\{
|
||||
Utils\Log,
|
||||
};
|
||||
|
||||
use Espo\Entities\{
|
||||
WebhookEventQueueItem,
|
||||
Webhook,
|
||||
};
|
||||
|
||||
/**
|
||||
* Processes events. Holds an information about existing events.
|
||||
*/
|
||||
@@ -102,7 +107,7 @@ class Manager
|
||||
$data = [];
|
||||
|
||||
$list = $this->entityManager
|
||||
->getRDBRepository('Webhook')
|
||||
->getRDBRepository(Webhook::ENTITY_TYPE)
|
||||
->select(['event'])
|
||||
->groupBy(['event'])
|
||||
->where([
|
||||
@@ -136,7 +141,7 @@ class Manager
|
||||
public function removeEvent(string $event): void
|
||||
{
|
||||
$notExists = !$this->entityManager
|
||||
->getRDBRepository('Webhook')
|
||||
->getRDBRepository(Webhook::ENTITY_TYPE)
|
||||
->select(['id'])
|
||||
->where([
|
||||
'event' => $event,
|
||||
@@ -174,7 +179,7 @@ class Manager
|
||||
return;
|
||||
}
|
||||
|
||||
$this->entityManager->createEntity('WebhookEventQueueItem', [
|
||||
$this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [
|
||||
'event' => $event,
|
||||
'targetType' => $entity->getEntityType(),
|
||||
'targetId' => $entity->getId(),
|
||||
@@ -195,7 +200,7 @@ class Manager
|
||||
return;
|
||||
}
|
||||
|
||||
$this->entityManager->createEntity('WebhookEventQueueItem', [
|
||||
$this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [
|
||||
'event' => $event,
|
||||
'targetType' => $entity->getEntityType(),
|
||||
'targetId' => $entity->getId(),
|
||||
@@ -233,7 +238,7 @@ class Manager
|
||||
$data->id = $entity->getId();
|
||||
|
||||
if ($this->eventExists($event)) {
|
||||
$this->entityManager->createEntity('WebhookEventQueueItem', [
|
||||
$this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [
|
||||
'event' => $event,
|
||||
'targetType' => $entity->getEntityType(),
|
||||
'targetId' => $entity->getId(),
|
||||
@@ -281,7 +286,7 @@ class Manager
|
||||
$itemData->$attribute = $entity->get($attribute);
|
||||
}
|
||||
|
||||
$this->entityManager->createEntity('WebhookEventQueueItem', [
|
||||
$this->entityManager->createEntity(WebhookEventQueueItem::ENTITY_TYPE, [
|
||||
'event' => $itemEvent,
|
||||
'targetType' => $entity->getEntityType(),
|
||||
'targetId' => $entity->getId(),
|
||||
|
||||
@@ -33,18 +33,24 @@ use Espo\Entities\{
|
||||
Webhook,
|
||||
WebhookQueueItem,
|
||||
WebhookEventQueueItem,
|
||||
User,
|
||||
};
|
||||
|
||||
use Espo\Core\{
|
||||
AclManager,
|
||||
Utils\Config,
|
||||
Utils\DateTime as DateTimeUtil,
|
||||
ORM\EntityManager,
|
||||
Utils\Log,
|
||||
};
|
||||
|
||||
use Espo\ORM\{
|
||||
EntityManager,
|
||||
QueryParams\Parts\Condition as Cond,
|
||||
};
|
||||
|
||||
use Exception;
|
||||
use DateTime;
|
||||
use stdClass;
|
||||
|
||||
/**
|
||||
* Groups occurred events into portions and sends them. A portion contains
|
||||
@@ -97,7 +103,7 @@ class Queue
|
||||
$portionSize = $this->config->get('webhookQueueEventPortionSize', self::EVENT_PORTION_SIZE);
|
||||
|
||||
$itemList = $this->entityManager
|
||||
->getRDBRepository('WebhookEventQueueItem')
|
||||
->getRDBRepository(WebhookEventQueueItem::ENTITY_TYPE)
|
||||
->where([
|
||||
'isProcessed' => false,
|
||||
])
|
||||
@@ -119,7 +125,7 @@ class Queue
|
||||
protected function createQueueFromEvent(WebhookEventQueueItem $item): void
|
||||
{
|
||||
$webhookList = $this->entityManager
|
||||
->getRDBRepository('Webhook')
|
||||
->getRDBRepository(Webhook::ENTITY_TYPE)
|
||||
->where([
|
||||
'event' => $item->get('event'),
|
||||
'isActive' => true,
|
||||
@@ -128,7 +134,7 @@ class Queue
|
||||
->find();
|
||||
|
||||
foreach ($webhookList as $webhook) {
|
||||
$this->entityManager->createEntity('WebhookQueueItem', [
|
||||
$this->entityManager->createEntity(WebhookQueueItem::ENTITY_TYPE, [
|
||||
'webhookId' => $webhook->getId(),
|
||||
'event' => $item->get('event'),
|
||||
'targetId' => $item->get('targetId'),
|
||||
@@ -143,128 +149,143 @@ class Queue
|
||||
protected function processSending(): void
|
||||
{
|
||||
$portionSize = $this->config->get('webhookQueuePortionSize', self::PORTION_SIZE);
|
||||
$batchSize = $this->config->get('webhookBatchSize', self::BATCH_SIZE);
|
||||
|
||||
$groupedItemList = $this->entityManager
|
||||
->getRDBRepository('WebhookQueueItem')
|
||||
->getRDBRepository(WebhookQueueItem::ENTITY_TYPE)
|
||||
->select(['webhookId', 'number'])
|
||||
->where([
|
||||
'number=s' => [
|
||||
'entityType' => 'WebhookQueueItem',
|
||||
'selectParams' => [
|
||||
'select' => ['MIN:number'],
|
||||
'whereClause' => [
|
||||
->where(
|
||||
Cond::in(
|
||||
Cond::column('number'),
|
||||
$this->entityManager
|
||||
->getQueryBuilder()
|
||||
->select('MIN:(number)')
|
||||
->from(WebhookQueueItem::ENTITY_TYPE)
|
||||
->where([
|
||||
'status' => 'Pending',
|
||||
'OR' => [
|
||||
['processAt' => null],
|
||||
['processAt<=' => DateTimeUtil::getSystemNowString()],
|
||||
],
|
||||
],
|
||||
'groupBy' => ['webhookId'],
|
||||
]
|
||||
],
|
||||
])
|
||||
])
|
||||
->groupBy('webhookId')
|
||||
->build()
|
||||
)
|
||||
)
|
||||
->limit(0, $portionSize)
|
||||
->order('number')
|
||||
->find();
|
||||
|
||||
foreach ($groupedItemList as $group) {
|
||||
$webhookId = $group->get('webhookId');
|
||||
|
||||
$itemList = $this->entityManager
|
||||
->getRDBRepository('WebhookQueueItem')
|
||||
->where([
|
||||
'webhookId' => $webhookId,
|
||||
'status' => 'Pending',
|
||||
'OR' => [
|
||||
['processAt' => null],
|
||||
['processAt<=' => DateTimeUtil::getSystemNowString()],
|
||||
],
|
||||
])
|
||||
->order('number')
|
||||
->limit(0, $batchSize)
|
||||
->find();
|
||||
|
||||
$webhook = $this->entityManager->getEntity('Webhook', $webhookId);
|
||||
|
||||
if (!$webhook || !$webhook->get('isActive')) {
|
||||
foreach ($itemList as $item) {
|
||||
$this->deleteQueueItem($item);
|
||||
}
|
||||
}
|
||||
|
||||
$forbiddenAttributeList = [];
|
||||
|
||||
$user = null;
|
||||
|
||||
if ($webhook->get('userId')) {
|
||||
$user = $this->entityManager->getEntity('User', $webhook->get('userId'));
|
||||
|
||||
if (!$user) {
|
||||
foreach ($itemList as $item) {
|
||||
$this->deleteQueueItem($item);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
$forbiddenAttributeList = $this->aclManager
|
||||
->getScopeForbiddenAttributeList($user, $webhook->get('entityType'));
|
||||
}
|
||||
|
||||
$actualItemList = [];
|
||||
|
||||
$dataList = [];
|
||||
|
||||
foreach ($itemList as $item) {
|
||||
$targetType = $item->get('targetType');
|
||||
$target = null;
|
||||
|
||||
if ($this->entityManager->hasRepository($targetType)) {
|
||||
$target = $this->entityManager
|
||||
->getRDBRepository($targetType)
|
||||
->where([
|
||||
'id' => $item->get('targetId')
|
||||
])
|
||||
->findOne(['withDeleted' => true]);
|
||||
}
|
||||
|
||||
if (!$target) {
|
||||
$this->deleteQueueItem($item);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($user) {
|
||||
if (!$this->aclManager->check($user, $target)) {
|
||||
$this->deleteQueueItem($item);
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
$data = $item->get('data') ?? (object) [];
|
||||
|
||||
$data = clone $data;
|
||||
|
||||
foreach ($forbiddenAttributeList as $attribute) {
|
||||
unset($data->$attribute);
|
||||
}
|
||||
|
||||
$actualItemList[] = $item;
|
||||
|
||||
$dataList[] = $data;
|
||||
}
|
||||
|
||||
if (empty($dataList)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->send($webhook, $dataList, $actualItemList);
|
||||
foreach ($groupedItemList as $groupItem) {
|
||||
$this->processSendingGroup($groupItem->get('webhookId'));
|
||||
}
|
||||
}
|
||||
|
||||
protected function send(Webhook $webhook, array $dataList, array $itemList): void
|
||||
private function processSendingGroup(string $webhookId): void
|
||||
{
|
||||
$batchSize = $this->config->get('webhookBatchSize', self::BATCH_SIZE);
|
||||
|
||||
$itemList = $this->entityManager
|
||||
->getRDBRepository(WebhookQueueItem::ENTITY_TYPE)
|
||||
->where([
|
||||
'webhookId' => $webhookId,
|
||||
'status' => 'Pending',
|
||||
'OR' => [
|
||||
['processAt' => null],
|
||||
['processAt<=' => DateTimeUtil::getSystemNowString()],
|
||||
],
|
||||
])
|
||||
->order('number')
|
||||
->limit(0, $batchSize)
|
||||
->find();
|
||||
|
||||
$webhook = $this->entityManager->getEntity(Webhook::ENTITY_TYPE, $webhookId);
|
||||
|
||||
if (!$webhook || !$webhook->get('isActive')) {
|
||||
foreach ($itemList as $item) {
|
||||
$this->deleteQueueItem($item);
|
||||
}
|
||||
}
|
||||
|
||||
$forbiddenAttributeList = [];
|
||||
|
||||
$user = null;
|
||||
|
||||
if ($webhook->get('userId')) {
|
||||
$user = $this->entityManager->getEntity(User::ENTITY_TYPE, $webhook->get('userId'));
|
||||
|
||||
if (!$user) {
|
||||
foreach ($itemList as $item) {
|
||||
$this->deleteQueueItem($item);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$forbiddenAttributeList = $this->aclManager
|
||||
->getScopeForbiddenAttributeList($user, $webhook->get('entityType'));
|
||||
}
|
||||
|
||||
$actualItemList = [];
|
||||
|
||||
$dataList = [];
|
||||
|
||||
foreach ($itemList as $item) {
|
||||
$data = $this->prepareItemData($item, $user, $forbiddenAttributeList);
|
||||
|
||||
if ($data === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$actualItemList[] = $item;
|
||||
|
||||
$dataList[] = $data;
|
||||
}
|
||||
|
||||
if (empty($dataList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->send($webhook, $dataList, $actualItemList);
|
||||
}
|
||||
|
||||
private function prepareItemData(WebhookQueueItem $item, User $user, array $forbiddenAttributeList): ?stdClass
|
||||
{
|
||||
$targetType = $item->get('targetType');
|
||||
$target = null;
|
||||
|
||||
if ($this->entityManager->hasRepository($targetType)) {
|
||||
$target = $this->entityManager
|
||||
->getRDBRepository($targetType)
|
||||
->where([
|
||||
'id' => $item->get('targetId')
|
||||
])
|
||||
->findOne(['withDeleted' => true]);
|
||||
}
|
||||
|
||||
if (!$target) {
|
||||
$this->deleteQueueItem($item);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
if ($user) {
|
||||
if (!$this->aclManager->check($user, $target)) {
|
||||
$this->deleteQueueItem($item);
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
$data = $item->get('data') ?? (object) [];
|
||||
|
||||
foreach ($forbiddenAttributeList as $attribute) {
|
||||
unset($data->$attribute);
|
||||
}
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
||||
private function send(Webhook $webhook, array $dataList, array $itemList): void
|
||||
{
|
||||
try {
|
||||
$code = $this->sender->send($webhook, $dataList);
|
||||
@@ -319,13 +340,15 @@ class Queue
|
||||
|
||||
protected function deleteQueueItem(WebhookQueueItem $item): void
|
||||
{
|
||||
$this->entityManager->getRepository('WebhookQueueItem')->deleteFromDb($item->getId());
|
||||
$this->entityManager
|
||||
->getRDBRepository(WebhookQueueItem::ENTITY_TYPE)
|
||||
->deleteFromDb($item->getId());
|
||||
}
|
||||
|
||||
protected function dropWebhook(Webhook $webhook): void
|
||||
{
|
||||
$itemList = $this->entityManager
|
||||
->getRDBRepository('WebhookQueueItem')
|
||||
->getRDBRepository(WebhookQueueItem::ENTITY_TYPE)
|
||||
->where([
|
||||
'status' => 'Pending',
|
||||
'webhookId' => $webhook->getId(),
|
||||
|
||||
@@ -40,9 +40,9 @@ class Webhook
|
||||
{
|
||||
public static $order = 101;
|
||||
|
||||
protected $metadata;
|
||||
private $metadata;
|
||||
|
||||
protected $webhookManager;
|
||||
private $webhookManager;
|
||||
|
||||
public function __construct(Metadata $metadata, WebhookManager $webhookManager)
|
||||
{
|
||||
|
||||
143
tests/integration/Espo/Webhook/ProcessingTest.php
Normal file
143
tests/integration/Espo/Webhook/ProcessingTest.php
Normal file
@@ -0,0 +1,143 @@
|
||||
<?php
|
||||
/************************************************************************
|
||||
* This file is part of EspoCRM.
|
||||
*
|
||||
* EspoCRM - Open Source CRM application.
|
||||
* Copyright (C) 2014-2021 Yurii Kuznietsov, Taras Machyshyn, Oleksii Avramenko
|
||||
* Website: https://www.espocrm.com
|
||||
*
|
||||
* EspoCRM is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* EspoCRM is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with EspoCRM. If not, see http://www.gnu.org/licenses/.
|
||||
*
|
||||
* The interactive user interfaces in modified source and object code versions
|
||||
* of this program must display Appropriate Legal Notices, as required under
|
||||
* Section 5 of the GNU General Public License version 3.
|
||||
*
|
||||
* In accordance with Section 7(b) of the GNU General Public License version 3,
|
||||
* these Appropriate Legal Notices must retain the display of the "EspoCRM" word.
|
||||
************************************************************************/
|
||||
|
||||
namespace tests\integration\Espo\Webhook;
|
||||
|
||||
use Espo\Core\Webhook\Queue;
|
||||
use Espo\Core\Webhook\Sender;
|
||||
|
||||
use Espo\ORM\EntityManager;
|
||||
|
||||
use Espo\Entities\Webhook;
|
||||
|
||||
use Espo\Modules\Crm\Entities\Account;
|
||||
|
||||
class ProcessingTest extends \tests\integration\Core\BaseTestCase
|
||||
{
|
||||
public function testProcessing1(): void
|
||||
{
|
||||
$user = $this->createUser(
|
||||
[
|
||||
'userName' => 'test',
|
||||
'password' => '1',
|
||||
],
|
||||
[
|
||||
'data' => [
|
||||
'Webhook' => true,
|
||||
'Account' => [
|
||||
'create' => 'yes',
|
||||
'read' => 'own',
|
||||
],
|
||||
],
|
||||
]
|
||||
);
|
||||
|
||||
/* @var $em EntityManager */
|
||||
$em = $this->getContainer()->get('entityManager');
|
||||
|
||||
$em->createEntity(Webhook::ENTITY_TYPE, [
|
||||
'event' => 'Account.create',
|
||||
'userId' => $user->getId(),
|
||||
'url' => 'https://test',
|
||||
]);
|
||||
|
||||
$em->createEntity(Webhook::ENTITY_TYPE, [
|
||||
'event' => 'Account.update',
|
||||
'userId' => $user->getId(),
|
||||
'url' => 'https://test',
|
||||
]);
|
||||
|
||||
$this->getContainer()->get('dataManager')->clearCache();
|
||||
|
||||
$app = $this->createApplication();
|
||||
|
||||
$em = $app->getContainer()->get('entityManager');
|
||||
|
||||
$account1 = $em->createEntity(Account::ENTITY_TYPE, [
|
||||
'name' => 'test1',
|
||||
'assignedUserId' => $user->getId(),
|
||||
]);
|
||||
|
||||
$account2 = $em->createEntity(Account::ENTITY_TYPE, [
|
||||
'name' => 'test2',
|
||||
'assignedUserId' => $user->getId(),
|
||||
]);
|
||||
|
||||
$em->createEntity(Account::ENTITY_TYPE, [
|
||||
'name' => 'test3',
|
||||
]);
|
||||
|
||||
$dataList1 = [
|
||||
$account1->getValueMap(),
|
||||
$account2->getValueMap(),
|
||||
];
|
||||
|
||||
$account1->set('name', 'test-1-changed');
|
||||
|
||||
$em->saveEntity($account1);
|
||||
|
||||
$dataList2 = [
|
||||
(object) [
|
||||
'name' => $account1->get('name'),
|
||||
'modifiedById' => 'system',
|
||||
'modifiedByName' => 'System',
|
||||
'id' => $account1->getId(),
|
||||
],
|
||||
];
|
||||
|
||||
$sender = $this->createMock(Sender::class);
|
||||
|
||||
/* @var $queue Queue */
|
||||
$queue = $app->getContainer()
|
||||
->get('injectableFactory')
|
||||
->createWith(Queue::class, [
|
||||
'sender' => $sender,
|
||||
]);
|
||||
|
||||
$sender
|
||||
->expects($this->exactly(2))
|
||||
->method('send')
|
||||
->withConsecutive(
|
||||
[
|
||||
$this->isInstanceOf(Webhook::class),
|
||||
$dataList1,
|
||||
],
|
||||
[
|
||||
$this->isInstanceOf(Webhook::class),
|
||||
$dataList2,
|
||||
]
|
||||
)
|
||||
->willReturn(
|
||||
200,
|
||||
200
|
||||
);
|
||||
|
||||
$queue->process();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user