1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
pushmanager.php
См. документацию.
1<?php
2
3namespace Bitrix\Calendar\Sync\Managers;
4
5use Bitrix\Calendar\Core\Base\BaseException;
6use Bitrix\Calendar\Core\Base\Date;
7use Bitrix\Calendar\Core\Mappers\Connection;
8use Bitrix\Calendar\Core\Mappers\SectionConnection;
9use Bitrix\Calendar\Core\Queue;
10use Bitrix\Calendar\Core\Queue\Exception\InvalidDestinationException;
11use Bitrix\Calendar\Core\Queue\Exception\InvalidMessageException;
12use Bitrix\Calendar\Internals\EO_Push;
13use Bitrix\Calendar\Core\Base\Mutex;
14use Bitrix\Calendar\Internals\PushTable;
15use Bitrix\Calendar\Sync\Builders\BuilderPushFromDM;
16use Bitrix\Calendar\Sync\Dictionary;
17use Bitrix\Calendar\Sync;
18use Bitrix\Calendar\Sync\Entities\SyncSection;
19use Bitrix\Calendar\Sync\Entities\SyncSectionMap;
20use Bitrix\Calendar\Sync\Factories\FactoryBuilder;
21use Bitrix\Calendar\Sync\Factories\SyncSectionFactory;
22use Bitrix\Calendar\Sync\Push\Push;
23use Bitrix\Calendar\Sync\Util\Result;
24use Bitrix\Main\ArgumentException;
25use Bitrix\Main\Error;
26use Bitrix\Main\ObjectException;
27use Bitrix\Main\ObjectNotFoundException;
28use Bitrix\Main\ObjectPropertyException;
29use Bitrix\Main\SystemException;
30use Exception;
31use Throwable;
32
34{
35 public const TYPE_CONNECTION = 'CONNECTION';
36 public const TYPE_SECTION_CONNECTION = 'SECTION_CONNECTION';
37 public const TYPE_SECTION = 'SECTION';
38
39 private const LOCK_CONNECTION_TIME = 20;
40
41 public const QUEUE_ROUTE_KEY_SECTION = 'calendar:SyncSectionPush';
42 public const QUEUE_ROUTE_KEY_CONNECTION = 'calendar:SyncConnectionPush';
43
55 public function getPush(string $entityType, int $entityId): ?Push
56 {
57 $data = PushTable::query()
58 ->setSelect(['*'])
59 ->addFilter('=ENTITY_TYPE', $entityType)
60 ->addFilter('ENTITY_ID', $entityId)
61 ->exec()->fetchObject();
62 if ($data)
63 {
64 return (new BuilderPushFromDM($data))->build();
65 }
66
67 return null;
68 }
69
80 public function addPush(string $entityType, int $entityId, array $data): Result
81 {
82 $result = new Result();
83 $data['ENTITY_TYPE'] = $entityType;
84 $data['ENTITY_ID'] = $entityId;
85
86 if (empty($data['RESOURCE_ID']))
87 {
88 return $result->addError(new Error('Resource ID is required.'));
89 }
90
92 if ($addResult = PushTable::add($data)->getObject())
93 {
94 $result->setData([
95 'push' => (new BuilderPushFromDM($addResult))->build(),
96 ]);
97 }
98 else
99 {
100 $result->addError(new Error('Error of add push info into db.'));
101 }
102
103 return $result;
104 }
105
115 public function renewPush(Push $push, array $data): Result
116 {
117 $result = new Result();
118
119 // TODO: move this logic to push-mapper
120 $updateResult = PushTable::update([
121 'ENTITY_TYPE' => $push->getEntityType(),
122 'ENTITY_ID' => $push->getEntityId(),
123 ], $data);
124
125 if ($updateResult->isSuccess())
126 {
127 $push->setExpireDate(new Date($data['EXPIRES']));
128 $result->setData([
129 'push' => $push,
130 ]);
131 }
132 else
133 {
134 $result->addError(new Error('Error of update push in db.'));
135 }
136
137 return $result;
138 }
139
147 public function updatePush(Push $pushChannel): void
148 {
149 $data = [
150 'CHANNEL_ID' => $pushChannel->getChannelId(),
151 'RESOURCE_ID' => $pushChannel->getResourceId(),
152 'EXPIRES' => $pushChannel->getExpireDate()
153 ? $pushChannel->getExpireDate()->getDate()
154 : null
155 ,
156 'NOT_PROCESSED' => $pushChannel->getProcessStatus(),
157 'FIRST_PUSH_DATE' => $pushChannel->getFirstPushDate()
158 ? $pushChannel->getFirstPushDate()->getDate()
159 : null
160 ];
161 PushTable::update(
162 [
163 'ENTITY_TYPE' => $pushChannel->getEntityType(),
164 'ENTITY_ID' => $pushChannel->getEntityId(),
165 ],
166 $data
167 );
168 }
169
175 public function deletePush(Push $push): void
176 {
177 PushTable::delete([
178 'ENTITY_TYPE' => $push->getEntityType(),
179 'ENTITY_ID' => $push->getEntityId(),
180 ]);
181 }
182
198 public function handlePush(string $channel, string $resourceId, bool $forceUnprocessedPush = false): Result
199 {
200 $result = new Result();
201 $row = PushTable::query()
202 ->setSelect(['*'])
203 ->addFilter('=CHANNEL_ID', $channel)
204 ->addFilter('=RESOURCE_ID', $resourceId)
205 ->exec()->fetchObject()
206 ;
207 if ($row)
208 {
209 $push = (new BuilderPushFromDM($row))->build();
210
211 if ($push->isBlocked())
212 {
213 $this->setUnprocessedPush($push);
214
215 return new Result();
216 }
217
218 if (!$forceUnprocessedPush && $push->isUnprocessed())
219 {
220 return new Result();
221 }
222
223 try
224 {
225 $this->blockPush($push);
226 if ($push->getEntityType() === self::TYPE_SECTION_CONNECTION)
227 {
228 $this->syncSection($push);
229 }
230 elseif ($push->getEntityType() === self::TYPE_CONNECTION)
231 {
232 $this->syncConnection($push);
233 }
234
235 if ($this->getPushState($push->getEntityType(), $push->getEntityId())
236 === Dictionary::PUSH_STATUS_PROCESS['unprocessed'])
237 {
238 $this->handlePush($channel, $resourceId, true);
239 }
240 }
241 catch(Throwable $e)
242 {
243 }
244 finally
245 {
246 $this->setUnblockPush($push);
247 }
248
249
250 }
251
252 return $result;
253 }
254
264 private function getPushState(string $entityType, string $entityId)
265 {
266 $row = PushTable::query()
267 ->setSelect(['NOT_PROCESSED'])
268 ->addFilter('=ENTITY_TYPE', $entityType)
269 ->addFilter('=ENTITY_ID', $entityId)
270 ->exec()->fetch();
271
272 return $row['NOT_PROCESSED'] ?? null;
273 }
274
284 private function syncSection(Push $push): void
285 {
288 $sectionLink = (new SectionConnection())->getById($push->getEntityId());
289
290 if ($sectionLink)
291 {
292 try
293 {
294 if (!$this->lockConnection($sectionLink->getConnection(), self::LOCK_CONNECTION_TIME))
295 {
296 $this->pushSectionToQueue($sectionLink);
297 return;
298 }
299 $syncSectionMap = new SyncSectionMap();
300 $syncSection = (new SyncSection())
301 ->setSection($sectionLink->getSection())
302 ->setSectionConnection($sectionLink)
303 ->setVendorName($sectionLink->getConnection()->getVendor()->getCode());
304
305 $syncSectionMap->add(
306 $syncSection,
307 $syncSection->getSectionConnection()->getVendorSectionId()
308 );
309
310 $factory = FactoryBuilder::create(
311 $sectionLink->getConnection()->getVendor()->getCode(),
312 $sectionLink->getConnection(),
313 new Sync\Util\Context()
314 );
315
316 $manager = new VendorDataExchangeManager($factory, $syncSectionMap);
317
319 ->importEvents()
320 ->updateConnection($sectionLink->getConnection());
321
322 $this->markPushSuccess($push, true);
323 }
324 catch(BaseException $e)
325 {
326 $this->markPushSuccess($push, false);
327 }
328 finally
329 {
330 $this->unLockConnection($sectionLink->getConnection());
331 }
332 }
333 else
334 {
335 $this->deletePush($push);
336 }
337 }
338
344 private function syncConnection(Push $push): void
345 {
346 try
347 {
349 $connection = (new Connection())->getById($push->getEntityId());
350 if (!$connection || $connection->isDeleted())
351 {
352 return;
353 }
354 }
355 catch (ArgumentException $e)
356 {
357 return;
358 }
359
360 try
361 {
362
363 if (!$this->lockConnection($connection, self::LOCK_CONNECTION_TIME))
364 {
365 $this->pushConnectionToQueue($connection);
366 return;
367 }
368
369 $factory = FactoryBuilder::create(
370 $connection->getVendor()->getCode(),
372 new Sync\Util\Context()
373 );
374 if ($factory)
375 {
376 $manager = new VendorDataExchangeManager(
377 $factory,
378 (new SyncSectionFactory())->getSyncSectionMapByFactory($factory)
379 );
381 ->importSections()
382 ->updateConnection($factory->getConnection())
383 ;
384 }
385 }
386 catch(\Exception $e)
387 {
388 }
389 finally
390 {
392 }
393
394 }
395
404 private function markPushSuccess(Push $push, bool $success): void
405 {
406 if (!$success)
407 {
408 $push->setProcessStatus(Dictionary::PUSH_STATUS_PROCESS['unblocked']);
409 $this->updatePush($push);
410 }
411 elseif(!$push->getFirstPushDate())
412 {
413 $push->setFirstPushDate(new Date());
414 $this->updatePush($push);
415 }
416 }
417
423 public function setBlockPush(?Push $push): bool
424 {
425 if (!$push || $push->isProcessed())
426 {
427 return false;
428 }
429
430 try
431 {
432 return $this->blockPush($push);
433 }
434 catch (Exception $e)
435 {
436 return false;
437 }
438 }
439
449 private function blockPush(Push $push): bool
450 {
451 return PushTable::update(
452 [
453 'ENTITY_TYPE' => $push->getEntityType(),
454 'ENTITY_ID' => $push->getEntityId(),
455 ],
456 [
457 'NOT_PROCESSED' => Dictionary::PUSH_STATUS_PROCESS['block']
458 ]
459 )->isSuccess();
460 }
461
475 public function setUnblockPush(?Push $push): void
476 {
477 if (!$push)
478 {
479 return;
480 }
481
482 PushTable::update(
483 [
484 'ENTITY_TYPE' => $push->getEntityType(),
485 'ENTITY_ID' => $push->getEntityId(),
486 ],
487 [
488 'NOT_PROCESSED' => Dictionary::PUSH_STATUS_PROCESS['unblocked']
489 ]
490 );
491
492 if ($push->isUnprocessed())
493 {
494 $this->handlePush($push->getChannelId(), $push->getResourceId());
495 }
496 }
497
502 public function setUnprocessedPush(?Push $push): void
503 {
504 if (!$push || $push->isUnprocessed())
505 {
506 return;
507 }
508
509 PushTable::update(
510 [
511 'ENTITY_TYPE' => $push->getEntityType(),
512 'ENTITY_ID' => $push->getEntityId(),
513 ],
514 [
515 'NOT_PROCESSED' => Dictionary::PUSH_STATUS_PROCESS['unprocessed']
516 ]
517 );
518 }
519
527 public function lockConnection(Sync\Connection\Connection $connection, int $time = 30): bool
528 {
529 return $this->getMutex($connection)->lock($time);
530 }
531
538 {
539 return $this->getMutex($connection)->unlock();
540 }
541
547 private function getMutex(Sync\Connection\Connection $connection): Mutex
548 {
549 $key = 'lockPushForConnection_' . $connection->getId();
550 return new Mutex($key);
551 }
552
562 private function pushSectionToQueue(Sync\Connection\SectionConnection $sectionLink): void
563 {
564 $message = (new Queue\Message\Message())
565 ->setBody([
566 Sync\Push\Dictionary::PUSH_TYPE['sectionConnection'] => $sectionLink->getId(),
567 ])
568 ->setRoutingKey(self::QUEUE_ROUTE_KEY_SECTION);
569 Queue\Producer\Factory::getProduser()->send($message);
570 }
571
581 private function pushConnectionToQueue(Sync\Connection\Connection $connection): void
582 {
583 $message = (new Queue\Message\Message())
584 ->setBody([
585 Sync\Push\Dictionary::PUSH_TYPE['connection'] => $connection->getId(),
586 ])
587 ->setRoutingKey(self::QUEUE_ROUTE_KEY_CONNECTION);
588 Queue\Producer\Factory::getProduser()->send($message);
589 }
590}
$connection
Определения actionsdefinitions.php:38
$resourceId
Определения push.php:24
const PUSH_STATUS_PROCESS
Определения dictionary.php:33
deletePush(Push $pushChannel)
Определения pushmanager.php:84
lockConnection(Sync\Connection\Connection $connection, int $time=30)
Определения pushmanager.php:527
handlePush(string $channel, string $resourceId, bool $forceUnprocessedPush=false)
Определения pushmanager.php:198
setUnprocessedPush(?Push $push)
Определения pushmanager.php:502
setUnblockPush(?Push $push)
Определения pushmanager.php:475
updatePush(Push $pushChannel)
Определения pushmanager.php:147
getPush(string $entityType, int $entityId)
Определения pushmanager.php:55
renewPush(Push $push, array $data)
Определения pushmanager.php:115
unLockConnection(Sync\Connection\Connection $connection)
Определения pushmanager.php:537
getEntityId()
Определения push.php:60
getResourceId()
Определения push.php:96
getFirstPushDate()
Определения push.php:201
getProcessStatus()
Определения push.php:193
getChannelId()
Определения push.php:78
setExpireDate(Date $expireDate)
Определения push.php:123
getEntityType()
Определения push.php:42
isUnprocessed()
Определения push.php:163
getExpireDate()
Определения push.php:114
isProcessed()
Определения push.php:140
Определения util.php:21
Определения error.php:15
Определения date.php:9
$data['IS_AVAILABLE']
Определения .description.php:13
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
Определения file_new.php:804
$result
Определения get_property_values.php:14
$success
Определения mail_entry.php:69
Определения culture.php:9
$manager
Определения office365push.php:39
$entityId
Определения payment.php:4
$time
Определения payment.php:61
$message
Определения payment.php:8
if( $daysToExpire >=0 &&$daysToExpire< 60 elseif)( $daysToExpire< 0)
Определения prolog_main_admin.php:393
if(empty($signedUserToken)) $key
Определения quickway.php:257