1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
pull_channel.php
См. документацию.
1<?php
2
6
8{
/** Channel type used by default for per-user channels (see Get() / GetChannel()). */
public const TYPE_PRIVATE = 'private';

/** Channel type used by default for the shared channel, which is stored with user ID 0 (see GetShared()). */
public const TYPE_SHARED = 'shared';

/** Channel lifetime in seconds: 12 hours plus 5 seconds (12 * 3600 + 5). Compared against time() in isExpired(). */
public const CHANNEL_TTL = 43205;

/** Cache table name passed to $CACHE_MANAGER in SaveToCache(). */
private const CACHE_TABLE = "b_pull_channel";

/**
 * Per-request cache of channel data.
 * The cache key is calculated with the `getLockKey` method.
 */
private static array $staticCache = [];
18
/**
 * Generates a new channel identifier.
 *
 * The identifier is an MD5 hash of uniqid(), the request's remote address and
 * server name (when present), the server unique ID reported by $APPLICATION
 * (when it is an object) and the given suffix.
 *
 * NOTE(review): uniqid() is time-based, so the result is unique rather than
 * unpredictable — confirm this is acceptable for channel identifiers.
 *
 * @param string $suffix Extra text mixed into the hash.
 * @return string 32-character hexadecimal hash.
 */
public static function GetNewChannelId($suffix = '')
{
	global $APPLICATION;

	$unique = uniqid();

	// These keys may be missing from $_SERVER (e.g. when there is no HTTP request);
	// fall back to an empty string instead of raising "undefined array key".
	$remoteAddr = $_SERVER["REMOTE_ADDR"] ?? '';
	$serverName = $_SERVER["SERVER_NAME"] ?? '';

	$serverUniqId = is_object($APPLICATION) ? $APPLICATION->GetServerUniqID() : '';

	return md5($unique.$remoteAddr.$serverName.$serverUniqId.$suffix);
}
24
/**
 * Builds a channel identifier from a tag.
 *
 * Unlike GetNewChannelId() no random part is mixed in: the hash depends only
 * on the tag, the server unique ID (when $APPLICATION is an object) and the suffix.
 *
 * @param string $tag Tag the identifier is derived from.
 * @param string $suffix Extra text mixed into the hash.
 * @return string 32-character hexadecimal hash.
 */
public static function GetNewChannelIdByTag(string $tag, string $suffix = '')
{
	global $APPLICATION;

	$serverUniqId = '';
	if (is_object($APPLICATION))
	{
		$serverUniqId = $APPLICATION->GetServerUniqID();
	}

	return md5($tag.$serverUniqId.$suffix);
}
30
/**
 * Returns the shared channel; same as GetShared() with the channel type as the first argument.
 *
 * @param string $channelType Channel type, TYPE_SHARED by default.
 * @param bool $cache Passed through to GetShared().
 * @param bool $reOpen Passed through to GetShared().
 * @return array|false Whatever GetShared() returns.
 */
public static function GetChannelShared($channelType = self::TYPE_SHARED, $cache = true, $reOpen = false)
{
	$sharedChannel = self::GetShared($cache, $reOpen, $channelType);

	return $sharedChannel;
}
35
/**
 * Returns the shared channel, i.e. the channel requested with user ID 0.
 *
 * @param bool $cache Passed through to Get().
 * @param bool $reOpen Passed through to Get().
 * @param string $channelType Channel type, TYPE_SHARED by default.
 * @return array|false Whatever Get() returns.
 */
public static function GetShared($cache = true, $reOpen = false, $channelType = self::TYPE_SHARED)
{
	// The shared channel is requested under user ID 0.
	$sharedUserId = 0;

	return self::Get($sharedUserId, $cache, $reOpen, $channelType);
}
40
/**
 * Returns the channel of a user; same as Get() with the channel type as the second argument.
 *
 * @param int $userId User ID.
 * @param string $channelType Channel type, TYPE_PRIVATE by default.
 * @param bool $cache Passed through to Get().
 * @param bool $reOpen Passed through to Get().
 * @return array|false Whatever Get() returns.
 */
public static function GetChannel($userId, $channelType = self::TYPE_PRIVATE, $cache = true, $reOpen = false)
{
	$userChannel = self::Get($userId, $cache, $reOpen, $channelType);

	return $userChannel;
}
45
/**
 * Returns the current channel for a user, creating or renewing it when needed.
 *
 * Lookup order: the per-request static cache, then the b_pull_channel table.
 * When no live (non-expired) channel is found, a new channel ID is generated
 * and the row is inserted (or the expired row is updated) under a named
 * database lock, so that concurrent processes do not create the channel twice.
 * If an expired channel was replaced, a "channel_expire" message is sent to it.
 *
 * @param int $userId User ID; 0 is used for the shared channel.
 * @param bool $cache Not used by this method; kept for backward compatibility.
 * @param bool $reOpen Not used by this method; kept for backward compatibility.
 * @param string $channelType Channel type; an empty value falls back to TYPE_PRIVATE.
 * @return array|false Array with CHANNEL_ID, CHANNEL_PUBLIC_ID, CHANNEL_TYPE, CHANNEL_DT and LAST_ID,
 *                     or false when the queue server is disabled, the user is not active,
 *                     the lock could not be obtained, or the channel could not be saved.
 */
public static function Get(int $userId, $cache = true, $reOpen = false, $channelType = self::TYPE_PRIVATE)
{
	if (!CPullOptions::GetQueueServerStatus())
	{
		return false;
	}
	if ($userId && !self::isUserActive($userId))
	{
		return false;
	}

	$channelType = (string)$channelType ?: self::TYPE_PRIVATE;
	$lockId = self::getLockKey($userId, $channelType);

	// Converts a b_pull_channel row into the structure returned to callers.
	$toChannel = static fn (array $row): array => [
		'CHANNEL_ID' => $row['CHANNEL_ID'],
		'CHANNEL_PUBLIC_ID' => $row['CHANNEL_PUBLIC_ID'],
		'CHANNEL_TYPE' => $row['CHANNEL_TYPE'],
		'CHANNEL_DT' => $row['DATE_CREATE'],
		'LAST_ID' => $row['LAST_ID'],
	];

	// The key is absent on the first call within a request.
	$cached = self::$staticCache[$lockId] ?? null;
	if ($cached && !self::isExpired($cached['CHANNEL_DT']))
	{
		return $cached;
	}

	$arResult = static::getInternal($userId, $channelType);
	if ($arResult && !self::isExpired($arResult['DATE_CREATE']))
	{
		$result = $toChannel($arResult);
		self::$staticCache[$lockId] = $result;

		return $result;
	}

	$connection = \Bitrix\Main\Application::getConnection();
	if (!$connection->lock($lockId, 2))
	{
		trigger_error("Could not get lock for creating a new channel", E_USER_WARNING);

		return false;
	}

	try
	{
		// try reading once again, because DB state could be changed in a concurrent process
		$arResult = static::getInternal($userId, $channelType);
		if ($arResult && !self::isExpired($arResult['DATE_CREATE']))
		{
			$result = $toChannel($arResult);
			self::$staticCache[$lockId] = $result;

			return $result;
		}

		$channelId = self::GetNewChannelId();
		$publicChannelId = $userId > 0 ? self::GetNewChannelId('public') : '';

		// An expired row is renewed in place; otherwise a new row is inserted.
		$saveResult = $arResult
			? self::Update($userId, $arResult['CHANNEL_ID'], $channelId, $publicChannelId, $channelType)
			: self::Add($userId, $channelId, $publicChannelId, $channelType)
		;
	}
	finally
	{
		// Release the lock on every path, including exceptions thrown while saving.
		$connection->unlock($lockId);
	}

	if (!$saveResult->isSuccess())
	{
		return false;
	}

	if (isset($arResult['CHANNEL_ID']))
	{
		// Tell subscribers of the replaced channel about the new one.
		self::sendChannelExpired($userId, $channelType, $arResult['CHANNEL_ID'], $channelId);
	}

	$result = [
		'CHANNEL_ID' => $channelId,
		'CHANNEL_PUBLIC_ID' => $publicChannelId,
		'CHANNEL_TYPE' => $channelType,
		'CHANNEL_DT' => time(),
		'LAST_ID' => 0,
	];
	self::$staticCache[$lockId] = $result;

	return $result;
}
140
/**
 * Reads the channel row of a user from b_pull_channel.
 *
 * DATE_CREATE is selected through $DB->DatetimeToTimestampFunction(), so callers
 * receive it in the form that function produces (it is compared with time() in isExpired()).
 *
 * @param int $userId User ID; 0 for the shared channel.
 * @param string $channelType Channel type to look for.
 * @return array|false Row with CHANNEL_ID, CHANNEL_PUBLIC_ID, CHANNEL_TYPE, DATE_CREATE and LAST_ID,
 *                     or whatever Fetch() returns when there is no matching row.
 */
private static function getInternal(int $userId, $channelType = self::TYPE_PRIVATE)
{
	global $DB;

	// NOTE(review): time-zone handling is switched off only while the SQL text is built,
	// presumably so the timestamp expression ignores the user's time zone — confirm.
	CTimeZone::Disable();
	$strSql = "
		SELECT C.CHANNEL_ID, C.CHANNEL_PUBLIC_ID, C.CHANNEL_TYPE, ".$DB->DatetimeToTimestampFunction('C.DATE_CREATE')." AS DATE_CREATE, C.LAST_ID
		FROM b_pull_channel C
		WHERE C.USER_ID = ".$userId." AND C.CHANNEL_TYPE = '".$DB->ForSQL($channelType)."'
	";
	CTimeZone::Enable();

	return $DB->Query($strSql)->Fetch();
}
162
/**
 * Checks that the user exists and has ACTIVE = 'Y'.
 *
 * @param int $userId User ID to look up.
 * @return bool True only when a row is found and its ACTIVE field is exactly 'Y'.
 */
private static function isUserActive(int $userId): bool
{
	$row = UserTable::query()
		->setSelect(['ACTIVE'])
		->where('ID', $userId)
		->fetch()
	;

	if (!$row)
	{
		return false;
	}

	return $row['ACTIVE'] === 'Y';
}
173
/**
 * Tells whether a channel created at the given moment has outlived CHANNEL_TTL.
 *
 * @param int $timestamp Creation time as a Unix timestamp.
 * @return bool True when creation time plus CHANNEL_TTL is not later than the current time.
 */
private static function isExpired(int $timestamp): bool
{
	$expiresAt = $timestamp + self::CHANNEL_TTL;

	return time() >= $expiresAt;
}
178
/**
 * Appends a signature to a channel ID.
 *
 * @param mixed $channelId Channel ID; expected to be a string.
 * @return mixed "<channelId>.<signature>" when a signature key is configured;
 *               the channel ID unchanged when the key is empty or the ID is not a string
 *               (the latter also raises an E_USER_WARNING).
 */
public static function SignChannel($channelId)
{
	$signatureKey = \Bitrix\Pull\Config::getSignatureKey();

	if (!is_string($channelId))
	{
		trigger_error("Channel ID must be the string", E_USER_WARNING);

		return $channelId;
	}

	// Without a configured key the channel ID is used as is.
	return $signatureKey === ""
		? $channelId
		: $channelId.".".static::GetSignature($channelId)
	;
}
195
/**
 * Appends a public signature to a public channel ID.
 *
 * @param mixed $channelId Public channel ID; expected to be a string.
 * @return string "<channelId>.<public signature>", or an empty string when no
 *                signature key is configured or the ID is not a string.
 */
public static function SignPublicChannel($channelId)
{
	$signatureKey = \Bitrix\Pull\Config::getSignatureKey();

	$canSign = $signatureKey !== "" && is_string($channelId);
	if (!$canSign)
	{
		return "";
	}

	return $channelId.".".static::GetPublicSignature($channelId);
}
206
/**
 * Calculates the signature of a value prefixed with "public:".
 *
 * @param string $value Value to sign.
 * @return mixed Result of GetSignature() for "public:<value>".
 */
public static function GetPublicSignature($value)
{
	$prefixedValue = "public:".$value;

	return static::GetSignature($prefixedValue);
}
211
/**
 * Calculates an HMAC signature of a value.
 *
 * @param mixed $value Value to sign.
 * @param string|null $signatureKey Key to sign with; when empty, the key from \Bitrix\Pull\Config is used.
 * @return mixed Signature returned by the signer.
 */
public static function GetSignature($value, $signatureKey = null)
{
	// Fall back to the configured key only when the caller did not supply one.
	$key = $signatureKey ?: \Bitrix\Pull\Config::getSignatureKey();

	$algorithm = new Sign\HmacAlgorithm();
	$algorithm->setHashAlgorithm(\CPullOptions::GetSignatureAlgorithm());

	$signer = new Sign\Signer($algorithm);
	$signer->setKey($key);

	return $signer->getSignature($value);
}
226
/**
 * Creates a channel row for the user.
 *
 * The row is inserted through \Bitrix\Pull\ChannelTable with LAST_ID = 0 and
 * DATE_CREATE set to the current date and time.
 *
 * @param int $userId Owner of the channel.
 * @param string $channelId Channel ID.
 * @param string $publicChannelId Public channel ID.
 * @param string $channelType Channel type.
 * @return \Bitrix\Main\Result Result carrying the insert errors, if any.
 */
public static function Add(int $userId, string $channelId, string $publicChannelId, string $channelType = self::TYPE_PRIVATE): \Bitrix\Main\Result
{
	$outcome = new \Bitrix\Main\Result();

	$insertResult = \Bitrix\Pull\ChannelTable::add([
		'USER_ID' => $userId,
		'CHANNEL_ID' => $channelId,
		'CHANNEL_PUBLIC_ID' => $publicChannelId,
		'CHANNEL_TYPE' => $channelType,
		'LAST_ID' => 0,
		'DATE_CREATE' => new \Bitrix\Main\Type\DateTime(),
	]);

	if ($insertResult->isSuccess())
	{
		return $outcome;
	}

	$outcome->addErrors($insertResult->getErrors());

	return $outcome;
}
249
/**
 * Replaces the IDs of an existing channel row and resets its creation date.
 *
 * The row is matched by user, previous channel ID and channel type; exactly one
 * row is expected to be updated, otherwise an error is added to the result.
 *
 * @param int $userId Owner of the channel.
 * @param string $prevChannelId Channel ID currently stored in the row.
 * @param string $channelId New channel ID.
 * @param string $publicChannelId New public channel ID.
 * @param string $channelType Channel type.
 * @return \Bitrix\Main\Result Result carrying the update errors, if any.
 */
private static function Update(int $userId, string $prevChannelId, string $channelId, string $publicChannelId, string $channelType = self::TYPE_PRIVATE) :\Bitrix\Main\Result
{
	$outcome = new \Bitrix\Main\Result();

	$filter = [
		'=USER_ID' => $userId,
		'=CHANNEL_ID' => $prevChannelId,
		'=CHANNEL_TYPE' => $channelType,
	];
	$fields = [
		'CHANNEL_ID' => $channelId,
		'CHANNEL_PUBLIC_ID' => $publicChannelId,
		'DATE_CREATE' => new \Bitrix\Main\Type\DateTime(),
	];
	$updateResult = \Bitrix\Pull\ChannelTable::updateByFilter($filter, $fields);

	if (!$updateResult->isSuccess())
	{
		$outcome->addErrors($updateResult->getErrors());

		return $outcome;
	}

	if ($updateResult->getAffectedRowsCount() != 1)
	{
		$outcome->addError(new \Bitrix\Main\Error("Expected to update 1 row; updated {$updateResult->getAffectedRowsCount()} rows"));
	}

	return $outcome;
}
277
/**
 * Removes a channel by its identifier and notifies its subscribers.
 *
 * The owner and type of the channel are looked up first; then every row of
 * that owner and type is deleted and a "channel_expire" message is pushed to
 * the removed channel. For non-private channels the message also carries the
 * shared channel to reconnect to.
 *
 * @param string $channelId Identifier of the channel to remove.
 * @return bool Always true, also when the channel does not exist.
 */
public static function Delete($channelId)
{
	global $DB;

	$strSql = "SELECT ID, USER_ID, CHANNEL_TYPE FROM b_pull_channel WHERE CHANNEL_ID = '".$DB->ForSQL($channelId)."'";
	$arRes = $DB->Query($strSql)->Fetch();
	if (!$arRes)
	{
		return true;
	}

	// Cast before concatenating into SQL, as the other queries of this class do.
	$ownerId = (int)$arRes['USER_ID'];
	$channelType = $arRes['CHANNEL_TYPE'];

	$strSql = "DELETE FROM b_pull_channel WHERE USER_ID = ".$ownerId." AND CHANNEL_TYPE = '".$DB->ForSql($channelType)."'";
	$DB->Query($strSql);

	// Drop the per-request copy too; otherwise Get() would keep returning the removed channel.
	unset(self::$staticCache[self::getLockKey($ownerId, (string)$channelType)]);

	$params = [
		'action' => $channelType != self::TYPE_PRIVATE ? 'reconnect' : 'get_config',
		'channel' => [
			'id' => self::SignChannel($channelId),
			'type' => $channelType,
		],
	];
	if ($channelType != self::TYPE_PRIVATE)
	{
		$result = self::GetShared(false);
		if (!$result)
		{
			// No shared channel is available, so there is nothing to reconnect to.
			return true;
		}
		$params['new_channel'] = [
			'id' => self::SignChannel($result['CHANNEL_ID']),
			'start' => $result['CHANNEL_DT'],
			'end' => date('c', $result['CHANNEL_DT'] + self::CHANNEL_TTL),
			'type' => $channelType,
		];
	}

	CPullStack::AddByChannel($channelId, [
		'module_id' => 'pull',
		'command' => 'channel_expire',
		'params' => $params,
	]);

	return true;
}
324
/**
 * Removes the channel of a user (by type) and notifies its subscribers.
 *
 * When $channelId is not given it is read from b_pull_channel. The rows of the
 * user with the given type are deleted and, if a channel ID is known, a
 * "channel_expire" message is pushed to it. For non-private channels the
 * message also carries the shared channel to reconnect to.
 *
 * @param int $userId User ID; 0 with TYPE_PRIVATE is treated as the shared channel.
 * @param string|null $channelId Channel ID to notify; looked up when null.
 * @param string $channelType Channel type; an empty value also matches rows with NULL type.
 * @return bool Always true.
 */
public static function DeleteByUser($userId, $channelId = null, $channelType = self::TYPE_PRIVATE)
{
	global $DB;

	$userId = intval($userId);
	if ($userId == 0 && $channelType == self::TYPE_PRIVATE)
	{
		$channelType = self::TYPE_SHARED;
	}

	if (is_null($channelId))
	{
		$strSql = "SELECT CHANNEL_ID, CHANNEL_TYPE FROM b_pull_channel WHERE USER_ID = ".$userId." AND CHANNEL_TYPE = '".$DB->ForSQL($channelType)."'";
		$res = $DB->Query($strSql);
		if ($arRes = $res->Fetch())
		{
			$channelId = $arRes['CHANNEL_ID'];
			$channelType = $arRes['CHANNEL_TYPE'];
		}
	}

	if ($channelType == '')
	{
		$channelTypeSql = "(CHANNEL_TYPE = '' OR CHANNEL_TYPE IS NULL)";
	}
	else
	{
		$channelTypeSql = "CHANNEL_TYPE = '".$DB->ForSQL($channelType)."'";
	}

	$strSql = "DELETE FROM b_pull_channel WHERE USER_ID = ".$userId." AND ".$channelTypeSql;
	$DB->Query($strSql);

	// Drop the per-request copy too; otherwise Get() would keep returning the removed channel.
	unset(self::$staticCache[self::getLockKey($userId, (string)$channelType)]);

	if ($channelId === null || $channelId === '')
	{
		// No channel was found for the user, so there is nobody to notify.
		return true;
	}

	$params = [
		'action' => $channelType != self::TYPE_PRIVATE ? 'reconnect' : 'get_config',
		'channel' => [
			'id' => self::SignChannel($channelId),
			'type' => $channelType,
		],
	];
	if ($channelType != self::TYPE_PRIVATE)
	{
		$result = self::GetShared(false);
		if (!$result)
		{
			// No shared channel is available, so there is nothing to reconnect to.
			return true;
		}
		$params['new_channel'] = [
			'id' => self::SignChannel($result['CHANNEL_ID']),
			'start' => $result['CHANNEL_DT'],
			'end' => date('c', $result['CHANNEL_DT'] + self::CHANNEL_TTL),
			'type' => $channelType,
		];
	}

	CPullStack::AddByChannel($channelId, [
		'module_id' => 'pull',
		'command' => 'channel_expire',
		'params' => $params,
	]);

	return true;
}
385
/**
 * Sends a message to one or several channels and returns the decoded server answer.
 *
 * - A single channel: one request; the answer is wrapped into {"infos": [...]}.
 * - A list of channels on a non-shared server of version 1: one request per channel;
 *   answers of successful requests are joined into {"infos": [...]}.
 * - A list of channels otherwise: channels are sent in batches of at most
 *   CPullOptions::GetCommandPerHit() per request and the "infos" of all answers are merged.
 *
 * @param string|array $channelId Channel ID or list of channel IDs.
 * @param mixed $message Message passed to SendCommand().
 * @param array $options Options passed to SendCommand().
 * @return mixed Decoded JSON (normally an object with "infos"), null when the answer
 *               is not valid JSON, or false when a single-channel request failed.
 */
public static function Send($channelId, $message, $options = array())
{
	if (!is_array($channelId))
	{
		$response = self::SendCommand($channelId, $message, $options);
		if ($response === false)
		{
			return false;
		}

		return json_decode('{"infos": ['.$response.']}');
	}

	if (CPullOptions::GetQueueServerVersion() == 1 && !CPullOptions::IsServerShared())
	{
		$responses = [];
		foreach ($channelId as $channel)
		{
			$response = self::SendCommand($channel, $message, $options);
			// A failed request must not break the JSON assembled from the other answers.
			if ($response !== false)
			{
				$responses[] = $response;
			}
		}

		return json_decode('{"infos": ['.implode(',', $responses).']}');
	}

	// Guard against a zero or negative setting, which array_chunk() rejects.
	$commandPerHit = max(1, (int)CPullOptions::GetCommandPerHit());
	if (count($channelId) <= $commandPerHit)
	{
		return json_decode((string)self::SendCommand($channelId, $message, $options));
	}

	$infos = [];
	foreach (array_chunk($channelId, $commandPerHit) as $channels)
	{
		$decoded = json_decode((string)self::SendCommand($channels, $message, $options));
		// The answer may be null (failed request) or an object without "infos".
		if (isset($decoded->infos) && is_array($decoded->infos))
		{
			$infos = array_merge($infos, $decoded->infos);
		}
	}

	return json_decode('{"infos":'.json_encode($infos).'}');
}
448
/**
 * Performs one HTTP request to the publish URL of the queue server.
 *
 * Failed requests are counted in the "nginx_error" option of the "pull" module.
 * Once 10 errors have been counted, requests are refused and an administrator
 * notification is shown; the counter is reset 120 seconds after the first error.
 *
 * @param string|array $channelId Channel ID or list of channel IDs (joined with "/").
 * @param mixed $message Data to send; prepared with CHTTP::PrepareData().
 * @param array $options Supported keys: "method" (POST or GET), "timeout" (seconds),
 *                       "dont_wait_answer" (bool), "expiry" (Message-Expiry header value).
 * @return string|false Server answer, '{}' when the answer is not awaited, or false
 *                      on invalid input, on a refused request or on a failed request.
 */
private static function SendCommand($channelId, $message, $options = array())
{
	if (!is_array($channelId))
	{
		$channelId = [$channelId];
	}

	$channelId = implode('/', array_unique($channelId));
	if ($channelId == '' || $message == '')
	{
		return false;
	}

	$options = array_merge(
		[
			"method" => "POST",
			"timeout" => 5,
			"dont_wait_answer" => true,
		],
		$options
	);

	if (!in_array($options["method"], ['POST', 'GET'], true))
	{
		return false;
	}

	$nginx_error = COption::GetOptionString("pull", "nginx_error", "N");
	if ($nginx_error != "N")
	{
		$nginx_error = unserialize($nginx_error, ["allowed_classes" => false]);
		// A value that cannot be unserialized is treated like an outdated counter and reset.
		if (!is_array($nginx_error) || intval($nginx_error['date'] ?? 0) + 120 < time())
		{
			COption::SetOptionString("pull", "nginx_error", "N");
			CAdminNotify::DeleteByTag("PULL_ERROR_SEND");
			$nginx_error = "N";
		}
		else if (($nginx_error['count'] ?? 0) >= 10)
		{
			// Too many recent failures: notify the administrator and refuse the request.
			CAdminNotify::Add([
				"MESSAGE" => Loc::getMessage('PULL_ERROR_SEND'),
				"TAG" => "PULL_ERROR_SEND",
				"MODULE_ID" => "pull",
			]);

			return false;
		}
	}

	$postdata = CHTTP::PrepareData($message);

	$httpClient = new \Bitrix\Main\Web\HttpClient([
		"socketTimeout" => (int)$options["timeout"],
		"streamTimeout" => (int)$options["timeout"],
		"waitResponse" => !$options["dont_wait_answer"],
	]);
	if (isset($options["expiry"]) && (int)$options["expiry"])
	{
		$httpClient->setHeader("Message-Expiry", (int)$options["expiry"]);
	}

	$url = \Bitrix\Pull\Config::getPublishUrl($channelId);
	if (CPullOptions::IsServerShared())
	{
		// On a shared server the request body is signed and the signature is passed in the URL.
		$signature = static::GetSignature($postdata);
		$url = \CHTTP::urlAddParams($url, ["signature" => $signature]);
	}

	// NOTE(review): certificate verification is switched off for this request.
	$httpClient->disableSslVerification();//todo: remove

	if ($httpClient->query($options["method"], $url, $postdata))
	{
		return $options["dont_wait_answer"] ? '{}' : $httpClient->getResult();
	}

	if ($nginx_error == "N")
	{
		// First failure: start a new counter.
		$nginx_error = [
			'count' => 1,
			'date' => time(),
			'date_increment' => time(),
		];
	}
	else if (intval($nginx_error['date_increment'] ?? 0) + 1 < time())
	{
		// The counter grows only when more than a second has passed since the last increment.
		$nginx_error['count'] = intval($nginx_error['count'] ?? 0) + 1;
		$nginx_error['date_increment'] = time();
	}
	COption::SetOptionString("pull", "nginx_error", serialize($nginx_error));

	return false;
}
539
/**
 * Stores data in the managed cache under the given ID.
 *
 * The entry is cleaned first, then opened with a lifetime of CHANNEL_TTL in the
 * CACHE_TABLE cache table, and the data is written immediately.
 *
 * @param string $cacheId Cache entry ID.
 * @param mixed $data Data to store.
 * @return void
 */
public static function SaveToCache($cacheId, $data)
{
	global $CACHE_MANAGER;

	$cacheTable = self::CACHE_TABLE;
	$lifetime = self::CHANNEL_TTL;

	$CACHE_MANAGER->Clean($cacheId, $cacheTable);
	$CACHE_MANAGER->Read($lifetime, $cacheId, $cacheTable);
	$CACHE_MANAGER->SetImmediate($cacheId, $data);
}
548
/**
 * Stores a new LAST_ID value for the channel.
 *
 * @param string $channelId Channel ID whose row is updated.
 * @param int $lastId New LAST_ID value; cast to an integer.
 * @return bool Always true.
 */
public static function UpdateLastId($channelId, $lastId)
{
	global $DB;

	$lastIdSql = intval($lastId);
	$channelIdSql = $DB->ForSQL($channelId);

	$DB->Query("UPDATE b_pull_channel SET LAST_ID = ".$lastIdSql." WHERE CHANNEL_ID = '".$channelIdSql."'");

	return true;
}
558
/**
 * Agent: removes channels that were created more than 13 hours ago.
 *
 * Each channel is removed under the same named lock that Get() uses to create
 * channels; the lock is requested with a zero timeout, so a channel whose lock
 * is busy is skipped and left for a later run.
 *
 * @return string The agent call, "<class>::CheckExpireAgent();".
 */
public static function CheckExpireAgent()
{
	global $DB;

	$connection = \Bitrix\Main\Application::getConnection();
	$sqlHelper = $connection->getSqlHelper();
	$sqlDateFunction = $sqlHelper->addSecondsToDateTime(-13 * 3600);

	$strSql = "
		SELECT USER_ID, CHANNEL_ID, CHANNEL_TYPE
		FROM b_pull_channel
		WHERE DATE_CREATE < {$sqlDateFunction}
	";
	$dbRes = $DB->Query($strSql);
	while ($arRes = $dbRes->Fetch())
	{
		$lockId = self::getLockKey((int)$arRes['USER_ID'], $arRes['CHANNEL_TYPE']);
		if (!$connection->lock($lockId, 0))
		{
			continue;
		}

		try
		{
			self::DeleteByUser($arRes['USER_ID'], $arRes['CHANNEL_ID'], $arRes['CHANNEL_TYPE']);
		}
		finally
		{
			// Release the lock even when the removal throws.
			$connection->unlock($lockId);
		}
	}

	return __METHOD__. '();';
}
587
/**
 * Agent: refreshes the last activity date of users who are online on the queue server.
 *
 * Private channels of users selected with USER.IS_ONLINE = 'Y' and
 * USER.IS_REAL_USER = 'Y' are collected, their online state is resolved by
 * getOnlineUsers(), and the resulting user IDs are passed to
 * CUser::SetLastActivityDateByArray().
 *
 * @return string The agent call, returned unchanged on every path.
 */
public static function CheckOnlineChannel()
{
	$agentCall = "CPullChannel::CheckOnlineChannel();";

	if (!CPullOptions::GetQueueServerStatus())
	{
		return $agentCall;
	}

	$rows = \Bitrix\Pull\ChannelTable::getList([
		'select' => [
			'USER_ID',
			'CHANNEL_ID'
		],
		'filter' => [
			'=CHANNEL_TYPE' => 'private',
			'=USER.IS_ONLINE' => 'Y',
			'=USER.IS_REAL_USER' => 'Y',
		]
	]);

	// Map: channel ID => user ID.
	$userByChannel = [];
	while ($row = $rows->fetch())
	{
		$userByChannel[$row['CHANNEL_ID']] = $row['USER_ID'];
	}

	if (!$userByChannel)
	{
		return $agentCall;
	}

	$onlineUsers = static::getOnlineUsers($userByChannel);
	if ($onlineUsers)
	{
		ksort($onlineUsers);
		CUser::SetLastActivityDateByArray($onlineUsers);
	}

	return $agentCall;
}
628
/**
 * Finds out which users are currently online.
 *
 * The current user ($USER), when it has a positive ID, is always reported as
 * online. The other users are checked through JSON-RPC (a last-seen value of 0
 * means online), the protobuf transport, or GetOnlineChannels(), depending on
 * the module configuration.
 *
 * @param array $channels Map of channel ID => user ID.
 * @return array Map of user ID => user ID for the online users; an empty array
 *               when the JSON-RPC request fails.
 */
private static function getOnlineUsers(array $channels): array
{
	global $USER;

	$onlineUsers = [];
	$agentUserId = 0;
	if (is_object($USER) && $USER->GetId() > 0)
	{
		$agentUserId = $USER->GetId();
		$onlineUsers[$agentUserId] = $agentUserId;
	}

	if (\Bitrix\Pull\Config::isJsonRpcUsed())
	{
		$userList = array_map("intval", array_values($channels));
		$lastSeenResult = (new \Bitrix\Pull\JsonRpcTransport())->getUsersLastSeen($userList);
		if (!$lastSeenResult->isSuccess())
		{
			return [];
		}

		foreach ($lastSeenResult->getData() as $userId => $lastSeen)
		{
			if ($lastSeen == 0)
			{
				$onlineUsers[$userId] = $userId;
			}
		}

		return $onlineUsers;
	}

	$channelsStatus = \Bitrix\Pull\Config::isProtobufUsed()
		? \Bitrix\Pull\ProtobufTransport::getOnlineChannels(array_keys($channels))
		: self::GetOnlineChannels(array_keys($channels))
	;

	foreach ($channelsStatus as $channelId => $onlineStatus)
	{
		$userId = $channels[$channelId];

		// The shared channel (user 0) and the current user are not checked here.
		$isSkipped = $userId == 0 || $agentUserId == $userId;
		if (!$isSkipped && $onlineStatus)
		{
			$onlineUsers[$userId] = $userId;
		}
	}

	return $onlineUsers;
}
690
/**
 * Builds the pull configuration for a user.
 *
 * The configuration contains the signed channel IDs (the user's channel and,
 * for a positive user ID, the shared channel), the URLs for listening,
 * websocket and publishing, and several flags.
 *
 * @param int $userId User ID.
 * @param bool $cache Passed through to Get() / GetShared().
 * @param bool $reopen Passed through to Get() / GetShared().
 * @param bool $mobile Not used by this method.
 * @return array|false Configuration array, or false when no channel could be obtained.
 */
public static function GetConfig($userId, $cache = true, $reopen = false, $mobile = false)
{
	$pullConfig = [];

	if (defined('BX_PULL_SKIP_LS'))
	{
		$pullConfig['LOCAL_STORAGE'] = 'N';
	}
	if (IsModuleInstalled('bitrix24'))
	{
		$pullConfig['BITRIX24'] = 'Y';
	}
	if (!CPullOptions::GetQueueServerHeaders())
	{
		$pullConfig['HEADERS'] = 'N';
	}

	$arChannel = CPullChannel::Get($userId, $cache, $reopen);
	if (!is_array($arChannel))
	{
		return false;
	}

	// Servers newer than version 3 receive "<channel>:<public channel>" when a public ID exists.
	$withPublicId = CPullOptions::GetQueueServerVersion() > 3 && $arChannel["CHANNEL_PUBLIC_ID"];
	$arChannels = [
		self::SignChannel(
			$withPublicId
				? $arChannel["CHANNEL_ID"].":".$arChannel["CHANNEL_PUBLIC_ID"]
				: $arChannel["CHANNEL_ID"]
		),
	];

	$nginxStatus = CPullOptions::GetQueueServerStatus();
	$webSocketStatus = false;

	if ($nginxStatus)
	{
		if (defined('BX_PULL_SKIP_WEBSOCKET'))
		{
			$pullConfig['WEBSOCKET'] = 'N';
		}
		else
		{
			$webSocketStatus = CPullOptions::GetWebSocketStatus();
		}

		if ($userId > 0)
		{
			$arChannelShared = CPullChannel::GetShared($cache, $reopen);
			if (is_array($arChannelShared))
			{
				$arChannels[] = self::SignChannel($arChannelShared["CHANNEL_ID"]);
				$arChannel['CHANNEL_DT'] = $arChannel['CHANNEL_DT'].'/'.$arChannelShared['CHANNEL_DT'];
			}
		}
	}

	if ($nginxStatus)
	{
		$pullPath = CMain::IsHTTPS()
			? CPullOptions::GetListenSecureUrl($arChannels)
			: CPullOptions::GetListenUrl($arChannels)
		;
	}
	else
	{
		$pullPath = '/bitrix/components/bitrix/pull.request/ajax.php?UPDATE_STATE';
	}

	$pullPathWs = '';
	if ($nginxStatus && $webSocketStatus)
	{
		$pullPathWs = CMain::IsHTTPS()
			? CPullOptions::GetWebSocketSecureUrl($arChannels)
			: CPullOptions::GetWebSocketUrl($arChannels)
		;
	}

	$pullPathPublish = '';
	if ($nginxStatus && \CPullOptions::GetPublishWebEnabled())
	{
		$pullPathPublish = CMain::IsHTTPS()
			? CPullOptions::GetPublishWebSecureUrl($arChannels)
			: CPullOptions::GetPublishWebUrl($arChannels)
		;
	}

	$signedPublicId = '';
	if (CPullOptions::GetQueueServerVersion() > 3 && $arChannel["CHANNEL_PUBLIC_ID"])
	{
		$signedPublicId = self::SignPublicChannel($arChannel["CHANNEL_PUBLIC_ID"]);
	}

	// Keys already set in $pullConfig take precedence over the ones below.
	return $pullConfig + [
		'CHANNEL_ID' => implode('/', $arChannels),
		'CHANNEL_PUBLIC_ID' => $signedPublicId,
		'CHANNEL_DT' => $arChannel['CHANNEL_DT'],
		'USER_ID' => $userId,
		'LAST_ID' => $arChannel['LAST_ID'],
		'PATH' => $pullPath,
		'PATH_PUB' => $pullPathPublish,
		'PATH_WS' => $pullPathWs,
		'PATH_COMMAND' => defined('BX_PULL_COMMAND_PATH') ? BX_PULL_COMMAND_PATH : '',
		'METHOD' => ($nginxStatus ? 'LONG' : 'PULL'),
		'REVISION' => PULL_REVISION_WEB,
		'ERROR' => '',
	];
}
778
/**
 * Asks the queue server which of the given channels have subscribers.
 *
 * A GET request is sent through Send() and its answer is awaited; every entry
 * of "infos" in the answer is mapped to a boolean.
 *
 * @param array $channels List of channel IDs.
 * @return array Map of channel => bool (true when the channel has more than 0 subscribers);
 *               empty when the server answer has no "infos".
 */
public static function GetOnlineChannels(array $channels)
{
	$command = implode('/', array_unique($channels));
	$serverResult = self::Send($channels, $command, [
		"method" => "GET",
		"dont_wait_answer" => false
	]);

	if (!is_object($serverResult) || !isset($serverResult->infos))
	{
		return [];
	}

	$statusByChannel = [];
	foreach ($serverResult->infos as $info)
	{
		$statusByChannel[$info->channel] = ($info->subscribers > 0);
	}

	return $statusByChannel;
}
801
/**
 * Builds the key that identifies a user's channel of a given type.
 *
 * The same key is used for the named database lock and for the static cache.
 *
 * @param int $userId User ID; 0 for the shared channel.
 * @param string $channelType Channel type.
 * @return string Key of the form "b_pchc_<userId>_<channelType>".
 */
private static function getLockKey(int $userId, $channelType): string
{
	return 'b_pchc_'.$userId.'_'.$channelType;
}
806
/**
 * Pushes a "channel_expire" message to a channel that has been replaced.
 *
 * Subscribers of a shared-type channel are told to reconnect, all others to
 * request the configuration again. For user ID 0 the message also describes
 * the new channel and its validity period.
 *
 * @param int $userId Owner of the channel; 0 for the shared channel.
 * @param string $channelType Channel type.
 * @param string $oldChannelId Channel the message is sent to.
 * @param string $newChannelId Channel that replaces the old one.
 * @return void
 */
public static function sendChannelExpired(int $userId, string $channelType, string $oldChannelId, string $newChannelId): void
{
	if ($channelType === self::TYPE_SHARED)
	{
		$action = 'reconnect';
	}
	else
	{
		$action = 'get_config';
	}

	$params = [
		'action' => $action,
		'channel' => [
			'id' => self::SignChannel($oldChannelId),
			'type' => $channelType,
		],
	];

	if ($userId == 0)
	{
		$params['new_channel'] = [
			'id' => self::SignChannel($newChannelId),
			'start' => date('c', time()),
			'end' => date('c', time() + self::CHANNEL_TTL),
			'type' => $channelType,
		];
	}

	CPullStack::AddByChannel($oldChannelId, [
		'module_id' => 'pull',
		'command' => 'channel_expire',
		'params' => $params
	]);
}
833}
$connection
Определения actionsdefinitions.php:38
if(empty( $fields)) foreach($fields as $field) $channelId
Определения push.php:23
global $APPLICATION
Определения include.php:80
$arResult
Определения generate_coupon.php:16
if(!is_object($USER)||! $USER->IsAuthorized()) $userId
Определения check_mail.php:18
static getConnection($name="")
Определения application.php:638
Определения user.php:48
static getSignatureKey()
Определения config.php:265
static Add($arFields)
Определения admin_notify.php:22
static DeleteByTag($tagId)
Определения admin_notify.php:146
static SetOptionString($module_id, $name, $value="", $desc=false, $site="")
Определения option.php:29
static PrepareData($arPostData, $prefix='')
Определения http.php:118
Определения pull_channel.php:8
static Get(int $userId, $cache=true, $reOpen=false, $channelType=self::TYPE_PRIVATE)
Определения pull_channel.php:46
static GetNewChannelIdByTag(string $tag, string $suffix='')
Определения pull_channel.php:25
static Delete($channelId)
Определения pull_channel.php:280
static GetConfig($userId, $cache=true, $reopen=false, $mobile=false)
Определения pull_channel.php:697
static UpdateLastId($channelId, $lastId)
Определения pull_channel.php:549
static Send($channelId, $message, $options=array())
Определения pull_channel.php:386
static GetChannel($userId, $channelType=self::TYPE_PRIVATE, $cache=true, $reOpen=false)
Определения pull_channel.php:41
static CheckOnlineChannel()
Определения pull_channel.php:588
static SignChannel($channelId)
Определения pull_channel.php:179
const TYPE_PRIVATE
Определения pull_channel.php:9
const TYPE_SHARED
Определения pull_channel.php:10
static GetChannelShared($channelType=self::TYPE_SHARED, $cache=true, $reOpen=false)
Определения pull_channel.php:31
const CHANNEL_TTL
Определения pull_channel.php:12
static GetOnlineChannels(array $channels)
Определения pull_channel.php:779
static GetNewChannelId($suffix='')
Определения pull_channel.php:19
static GetSignature($value, $signatureKey=null)
Определения pull_channel.php:212
static DeleteByUser($userId, $channelId=null, $channelType=self::TYPE_PRIVATE)
Определения pull_channel.php:325
static sendChannelExpired(int $userId, string $channelType, string $oldChannelId, string $newChannelId)
Определения pull_channel.php:807
static CheckExpireAgent()
Определения pull_channel.php:560
static SignPublicChannel($channelId)
Определения pull_channel.php:196
static SaveToCache($cacheId, $data)
Определения pull_channel.php:540
static GetPublicSignature($value)
Определения pull_channel.php:207
static Add(int $userId, string $channelId, string $publicChannelId, string $channelType=self::TYPE_PRIVATE)
Определения pull_channel.php:228
static GetShared($cache=true, $reOpen=false, $channelType=self::TYPE_SHARED)
Определения pull_channel.php:36
global $CACHE_MANAGER
Определения clear_component_cache.php:7
if(!\Bitrix\Main\Loader::includeModule('clouds')) $lastId
Определения sync.php:68
$options
Определения commerceml2.php:49
$data['IS_AVAILABLE']
Определения .description.php:13
$userList
Определения discount_coupon_list.php:276
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
Определения file_new.php:804
$res
Определения filter_act.php:7
$result
Определения get_property_values.php:14
Send()
Определения idea_email_notify.php:116
Get()
Определения idea_idea_comment.php:22
$_SERVER["DOCUMENT_ROOT"]
Определения cron_frame.php:9
global $DB
Определения cron_frame.php:29
global $USER
Определения csv_new_run.php:40
if($NS['step']==6) if( $NS[ 'step']==7) if(COption::GetOptionInt('main', 'disk_space', 0) > 0) $info
Определения backup.php:924
IsModuleInstalled($module_id)
Определения tools.php:5301
Определения collection.php:2
trait Error
Определения error.php:11
$message
Определения payment.php:8
const PULL_REVISION_WEB
Определения include.php:3
$ar
Определения options.php:199
$i
Определения factura.php:643
</p ></td >< td valign=top style='border-top:none;border-left:none;border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt;padding:0cm 2.0pt 0cm 2.0pt;height:9.0pt'>< p class=Normal align=center style='margin:0cm;margin-bottom:.0001pt;text-align:center;line-height:normal'>< a name=ТекстовоеПоле54 ></a ><?=($taxRate > count( $arTaxList) > 0) ? $taxRate."%"
Определения waybill.php:936
if($inWords) echo htmlspecialcharsbx(Number2Word_Rus(roundEx($totalVatSum $params['CURRENCY']
Определения template.php:799
$arRes
Определения options.php:104
$url
Определения iframe.php:7
$dbRes
Определения yandex_detail.php:168