39 private const LOCK_CONNECTION_TIME = 20;
57 $data = PushTable::query()
59 ->addFilter(
'=ENTITY_TYPE', $entityType)
61 ->exec()->fetchObject();
83 $data[
'ENTITY_TYPE'] = $entityType;
86 if (empty(
$data[
'RESOURCE_ID']))
88 return $result->addError(
new Error(
'Resource ID is required.'));
92 if ($addResult = PushTable::add(
$data)->getObject())
95 'push' => (
new BuilderPushFromDM($addResult))->build(),
100 $result->addError(
new Error(
'Error of add push info into db.'));
120 $updateResult = PushTable::update([
125 if ($updateResult->isSuccess())
134 $result->addError(
new Error(
'Error of update push in db.'));
201 $row = PushTable::query()
203 ->addFilter(
'=CHANNEL_ID', $channel)
205 ->exec()->fetchObject()
211 if ($push->isBlocked())
218 if (!$forceUnprocessedPush && $push->isUnprocessed())
225 $this->blockPush($push);
226 if ($push->getEntityType() === self::TYPE_SECTION_CONNECTION)
228 $this->syncSection($push);
230 elseif ($push->getEntityType() === self::TYPE_CONNECTION)
232 $this->syncConnection($push);
235 if ($this->getPushState($push->getEntityType(), $push->getEntityId())
264 private function getPushState(
string $entityType,
string $entityId)
266 $row = PushTable::query()
267 ->setSelect([
'NOT_PROCESSED'])
268 ->addFilter(
'=ENTITY_TYPE', $entityType)
272 return $row[
'NOT_PROCESSED'] ??
null;
284 private function syncSection(
Push $push): void
288 $sectionLink = (
new SectionConnection())->getById($push->getEntityId());
294 if (!$this->lockConnection($sectionLink->getConnection(), self::LOCK_CONNECTION_TIME))
296 $this->pushSectionToQueue($sectionLink);
299 $syncSectionMap =
new SyncSectionMap();
300 $syncSection = (
new SyncSection())
301 ->setSection($sectionLink->getSection())
302 ->setSectionConnection($sectionLink)
303 ->setVendorName($sectionLink->getConnection()->getVendor()->getCode());
305 $syncSectionMap->add(
307 $syncSection->getSectionConnection()->getVendorSectionId()
310 $factory = FactoryBuilder::create(
311 $sectionLink->getConnection()->getVendor()->getCode(),
312 $sectionLink->getConnection(),
316 $manager =
new VendorDataExchangeManager($factory, $syncSectionMap);
320 ->updateConnection($sectionLink->getConnection());
322 $this->markPushSuccess($push,
true);
324 catch(BaseException $e)
326 $this->markPushSuccess($push,
false);
330 $this->unLockConnection($sectionLink->getConnection());
344 private function syncConnection(Push $push): void
349 $connection = (
new Connection())->getById($push->getEntityId());
355 catch (ArgumentException $e)
369 $factory = FactoryBuilder::create(
372 new Sync\Util\Context()
376 $manager =
new VendorDataExchangeManager(
378 (
new SyncSectionFactory())->getSyncSectionMapByFactory($factory)
382 ->updateConnection($factory->getConnection())
404 private function markPushSuccess(Push $push,
bool $success): void
411 elseif(!$push->getFirstPushDate())
413 $push->setFirstPushDate(
new Date());
432 return $this->blockPush($push);
449 private function blockPush(
Push $push): bool
451 return PushTable::update(
562 private function pushSectionToQueue(
Sync\
Connection\SectionConnection $sectionLink): void
564 $message = (
new Queue\Message\Message())
566 Sync\
Push\Dictionary::PUSH_TYPE[
'sectionConnection'] => $sectionLink->getId(),
568 ->setRoutingKey(self::QUEUE_ROUTE_KEY_SECTION);
569 Queue\Producer\Factory::getProduser()->send(
$message);
583 $message = (
new Queue\Message\Message())
587 ->setRoutingKey(self::QUEUE_ROUTE_KEY_CONNECTION);
588 Queue\Producer\Factory::getProduser()->send(
$message);
if( $daysToExpire >=0 &&$daysToExpire< 60 elseif)( $daysToExpire< 0)