14 private const CACHE_TABLE =
"b_pull_channel";
17 private static array $staticCache = [];
31 public static function GetChannelShared($channelType = self::TYPE_SHARED, $cache =
true, $reOpen =
false)
36 public static function GetShared($cache =
true, $reOpen =
false, $channelType = self::TYPE_SHARED)
38 return self::Get(0, $cache, $reOpen, $channelType);
41 public static function GetChannel(
$userId, $channelType = self::TYPE_PRIVATE, $cache =
true, $reOpen =
false)
46 public static function Get(
int $userId, $cache =
true, $reOpen =
false, $channelType = self::TYPE_PRIVATE)
48 if (!CPullOptions::GetQueueServerStatus())
57 $channelType = (string)$channelType ?: self::TYPE_PRIVATE;
58 $lockId = self::getLockKey(
$userId, $channelType);
60 $cached = self::$staticCache[$lockId];
61 if ($cached && !self::isExpired($cached[
'CHANNEL_DT']))
71 'CHANNEL_PUBLIC_ID' =>
$arResult[
'CHANNEL_PUBLIC_ID'],
72 'CHANNEL_TYPE' =>
$arResult[
'CHANNEL_TYPE'],
76 self::$staticCache[$lockId] =
$result;
84 trigger_error(
"Could not get lock for creating a new channel", E_USER_WARNING);
96 'CHANNEL_PUBLIC_ID' =>
$arResult[
'CHANNEL_PUBLIC_ID'],
97 'CHANNEL_TYPE' =>
$arResult[
'CHANNEL_TYPE'],
101 self::$staticCache[$lockId] =
$result;
131 'CHANNEL_PUBLIC_ID' => $publicChannelId,
132 'CHANNEL_TYPE' => $channelType,
133 'CHANNEL_DT' => time(),
136 self::$staticCache[$lockId] =
$result;
141 private static function getInternal(
int $userId, $channelType = self::TYPE_PRIVATE)
149 CTimeZone::Disable();
151 SELECT C.CHANNEL_ID, C.CHANNEL_PUBLIC_ID, C.CHANNEL_TYPE, ".$DB->DatetimeToTimestampFunction(
'C.DATE_CREATE').
" AS DATE_CREATE, C.LAST_ID
152 FROM b_pull_channel C
153 WHERE C.USER_ID = ".
$userId.
" AND C.CHANNEL_TYPE = '".
$DB->ForSQL($channelType).
"'
163 private static function isUserActive(
int $userId): bool
165 $userData = UserTable::query()
166 ->setSelect([
'ACTIVE'])
171 return $userData && $userData[
'ACTIVE'] ===
'Y';
174 private static function isExpired(
int $timestamp): bool
176 return $timestamp + self::CHANNEL_TTL <= time();
184 trigger_error(
"Channel ID must be the string", E_USER_WARNING);
188 if ($signatureKey ===
"")
199 if ($signatureKey ===
"" || !is_string(
$channelId))
209 return static::GetSignature(
"public:".$value);
218 $signatureAlgo = \CPullOptions::GetSignatureAlgorithm();
220 $hmac->setHashAlgorithm($signatureAlgo);
222 $signer->setKey($signatureKey);
224 return $signer->getSignature($value);
230 $result = new \Bitrix\Main\Result();
235 'CHANNEL_PUBLIC_ID' => $publicChannelId,
236 'CHANNEL_TYPE' => $channelType,
238 'DATE_CREATE' => new \Bitrix\Main\Type\DateTime(),
241 $insertResult = \Bitrix\Pull\ChannelTable::add($channelFields);
242 if (!$insertResult->isSuccess())
244 $result->addErrors($insertResult->getErrors());
250 private static function Update(
int $userId,
string $prevChannelId,
string $channelId,
string $publicChannelId,
string $channelType = self::TYPE_PRIVATE) :\
Bitrix\
Main\Result
252 $result = new \Bitrix\Main\Result();
253 $updateResult = \Bitrix\Pull\ChannelTable::updateByFilter(
256 '=CHANNEL_ID' => $prevChannelId,
257 '=CHANNEL_TYPE' => $channelType,
261 'CHANNEL_PUBLIC_ID' => $publicChannelId,
266 if (!$updateResult->isSuccess())
268 $result->addErrors($updateResult->getErrors());
270 else if ($updateResult->getAffectedRowsCount() != 1)
272 $result->addError(
new \Bitrix\Main\
Error(
"Expected to update 1 row; updated {$updateResult->getAffectedRowsCount()} rows"));
284 $strSql =
"SELECT ID, USER_ID, CHANNEL_TYPE FROM b_pull_channel WHERE CHANNEL_ID = '".$DB->ForSQL(
$channelId).
"'";
288 $strSql =
"DELETE FROM b_pull_channel WHERE USER_ID = ".$arRes[
'USER_ID'].
" AND CHANNEL_TYPE = '".
$DB->ForSql(
$arRes[
'CHANNEL_TYPE']).
"'";
291 $channelType =
$arRes[
'CHANNEL_TYPE'];
294 'action' => $channelType != self::TYPE_PRIVATE?
'reconnect':
'get_config',
297 'type' => $channelType,
300 if ($channelType != self::TYPE_PRIVATE)
307 $params[
'new_channel'] = Array(
308 'id' => self::SignChannel(
$result[
'CHANNEL_ID']),
309 'start' =>
$result[
'CHANNEL_DT'],
310 'end' => date(
'c',
$result[
'CHANNEL_DT']+ self::CHANNEL_TTL),
311 'type' => $channelType,
315 'module_id' =>
'pull',
316 'command' =>
'channel_expire',
319 CPullStack::AddByChannel(
$channelId, $arMessage);
330 if (
$userId == 0 && $channelType == self::TYPE_PRIVATE)
332 $channelType = self::TYPE_SHARED;
337 $strSql =
"SELECT CHANNEL_ID, CHANNEL_TYPE FROM b_pull_channel WHERE USER_ID = ".$userId.
" AND CHANNEL_TYPE = '".
$DB->ForSQL($channelType).
"'";
342 $channelType =
$arRes[
'CHANNEL_TYPE'];
346 if ($channelType ==
'')
347 $channelTypeSql =
"(CHANNEL_TYPE = '' OR CHANNEL_TYPE IS NULL)";
349 $channelTypeSql =
"CHANNEL_TYPE = '".$DB->ForSQL($channelType).
"'";
351 $strSql =
"DELETE FROM b_pull_channel WHERE USER_ID = ".$userId.
" AND ".$channelTypeSql;
355 'action' => $channelType != self::TYPE_PRIVATE?
'reconnect':
'get_config',
358 'type' => $channelType,
361 if ($channelType != self::TYPE_PRIVATE)
368 $params[
'new_channel'] = Array(
369 'id' => self::SignChannel(
$result[
'CHANNEL_ID']),
370 'start' =>
$result[
'CHANNEL_DT'],
371 'end' => date(
'c',
$result[
'CHANNEL_DT']+ self::CHANNEL_TTL),
372 'type' => $channelType,
376 'module_id' =>
'pull',
377 'command' =>
'channel_expire',
381 CPullStack::AddByChannel(
$channelId, $arMessage);
388 $result_start =
'{"infos": ['; $result_end =
']}';
389 if (is_array(
$channelId) && CPullOptions::GetQueueServerVersion() == 1 && !CPullOptions::IsServerShared())
396 $result = json_decode($result_start.implode(
',', $results).$result_end);
400 $commandPerHit = CPullOptions::GetCommandPerHit();
407 if (!isset($arGroup[
$i]))
411 if (
count($arGroup[
$i]) == $commandPerHit)
416 $arGroup[
$i][] = $channel;
419 foreach($arGroup as $channels)
422 $subresult = json_decode(
$result);
423 if (is_array($subresult->infos))
425 $results = array_merge($results, $subresult->infos);
428 $result = json_decode(
'{"infos":'.json_encode($results).
'}');
443 $result = json_decode($result_start.$result.$result_end);
459 $defaultOptions =
array(
462 "dont_wait_answer" =>
true
467 if (!in_array(
$options[
"method"], Array(
'POST',
'GET')))
470 $nginx_error = COption::GetOptionString(
"pull",
"nginx_error",
"N");
471 if ($nginx_error !=
"N")
473 $nginx_error = unserialize($nginx_error, [
"allowed_classes" =>
false]);
474 if (intval($nginx_error[
'date'])+120 < time())
476 COption::SetOptionString(
"pull",
"nginx_error",
"N");
480 else if ($nginx_error[
'count'] >= 10)
483 "MESSAGE" => Loc::getMessage(
'PULL_ERROR_SEND'),
484 "TAG" =>
"PULL_ERROR_SEND",
485 "MODULE_ID" =>
"pull",
494 $httpClient = new \Bitrix\Main\Web\HttpClient([
495 "socketTimeout" => (
int)
$options[
"timeout"],
496 "streamTimeout" => (
int)
$options[
"timeout"],
497 "waitResponse" => !
$options[
"dont_wait_answer"]
501 $httpClient->setHeader(
"Message-Expiry", (
int)
$options[
"expiry"]);
504 if(CPullOptions::IsServerShared())
506 $signature = static::GetSignature($postdata);
507 $url = \CHTTP::urlAddParams(
$url, [
"signature" => $signature]);
510 $httpClient->disableSslVerification();
512 $sendResult = $httpClient->query(
$options[
"method"],
$url, $postdata);
516 $result =
$options[
"dont_wait_answer"] ?
'{}': $httpClient->getResult();
520 if ($nginx_error ==
"N")
522 $nginx_error = Array(
525 'date_increment' => time(),
528 else if (intval($nginx_error[
'date_increment'])+1 < time())
530 $nginx_error[
'count'] = intval($nginx_error[
'count'])+1;
531 $nginx_error[
'date_increment'] = time();
545 $CACHE_MANAGER->Read(self::CHANNEL_TTL, $cacheId, self::CACHE_TABLE);
553 $strSql =
"UPDATE b_pull_channel SET LAST_ID = ".intval(
$lastId).
" WHERE CHANNEL_ID = '".
$DB->ForSQL(
$channelId).
"'";
566 $sqlDateFunction = $sqlHelper->addSecondsToDateTime(-13 * 3600);
569 SELECT USER_ID, CHANNEL_ID, CHANNEL_TYPE
571 WHERE DATE_CREATE < {$sqlDateFunction}
576 $lockId = self::getLockKey((
int)
$arRes[
'USER_ID'],
$arRes[
'CHANNEL_TYPE']);
585 return __METHOD__.
'();';
590 if (!CPullOptions::GetQueueServerStatus())
592 return "CPullChannel::CheckOnlineChannel();";
597 $orm = \Bitrix\Pull\ChannelTable::getList([
603 '=CHANNEL_TYPE' =>
'private',
604 '=USER.IS_ONLINE' =>
'Y',
605 '=USER.IS_REAL_USER' =>
'Y',
609 while (
$res = $orm->fetch())
611 $channels[
$res[
'CHANNEL_ID']] =
$res[
'USER_ID'];
614 if (
count($channels) == 0)
616 return "CPullChannel::CheckOnlineChannel();";
619 $arOnline = static::getOnlineUsers($channels);
620 if (
count($arOnline) > 0)
623 CUser::SetLastActivityDateByArray($arOnline);
626 return "CPullChannel::CheckOnlineChannel();";
633 private static function getOnlineUsers(
array $channels):
array
641 $agentUserId =
$USER->GetId();
642 $arOnline[$agentUserId] = $agentUserId;
645 if (\Bitrix\Pull\Config::isJsonRpcUsed())
647 $userList = array_map(
"intval", array_values($channels));
648 $result = (new \Bitrix\Pull\JsonRpcTransport())->getUsersLastSeen(
$userList);
663 if (\Bitrix\Pull\Config::isProtobufUsed())
665 $channelsStatus = \Bitrix\Pull\ProtobufTransport::getOnlineChannels(array_keys($channels));
672 foreach ($channelsStatus as
$channelId => $onlineStatus)
699 $pullConfig = Array();
701 if (defined(
'BX_PULL_SKIP_LS'))
702 $pullConfig[
'LOCAL_STORAGE'] =
'N';
705 $pullConfig[
'BITRIX24'] =
'Y';
707 if (!CPullOptions::GetQueueServerHeaders())
708 $pullConfig[
'HEADERS'] =
'N';
711 if (!is_array($arChannel))
718 if (CPullOptions::GetQueueServerVersion() > 3)
720 if ($arChannel[
"CHANNEL_PUBLIC_ID"])
722 $arChannels[] =
self::SignChannel($arChannel[
"CHANNEL_ID"].
":".$arChannel[
"CHANNEL_PUBLIC_ID"]);
734 $nginxStatus = CPullOptions::GetQueueServerStatus();
735 $webSocketStatus =
false;
739 if (defined(
'BX_PULL_SKIP_WEBSOCKET'))
741 $pullConfig[
'WEBSOCKET'] =
'N';
745 $webSocketStatus = CPullOptions::GetWebSocketStatus();
751 if (is_array($arChannelShared))
754 $arChannel[
'CHANNEL_DT'] = $arChannel[
'CHANNEL_DT'].
'/'.$arChannelShared[
'CHANNEL_DT'];
759 $pullPath = ($nginxStatus? (CMain::IsHTTPS()? CPullOptions::GetListenSecureUrl($arChannels): CPullOptions::GetListenUrl($arChannels)):
'/bitrix/components/bitrix/pull.request/ajax.php?UPDATE_STATE');
760 $pullPathWs = ($nginxStatus && $webSocketStatus? (CMain::IsHTTPS()? CPullOptions::GetWebSocketSecureUrl($arChannels): CPullOptions::GetWebSocketUrl($arChannels)):
'');
761 $pullPathPublish = ($nginxStatus && \CPullOptions::GetPublishWebEnabled()? (CMain::IsHTTPS()? CPullOptions::GetPublishWebSecureUrl($arChannels): CPullOptions::GetPublishWebUrl($arChannels)):
'');
763 return $pullConfig+Array(
764 'CHANNEL_ID' => implode(
'/', $arChannels),
765 'CHANNEL_PUBLIC_ID' => CPullOptions::GetQueueServerVersion() > 3 && $arChannel[
"CHANNEL_PUBLIC_ID"]? self::SignPublicChannel($arChannel[
"CHANNEL_PUBLIC_ID"]):
'',
766 'CHANNEL_DT' => $arChannel[
'CHANNEL_DT'],
768 'LAST_ID' => $arChannel[
'LAST_ID'],
770 'PATH_PUB' => $pullPathPublish,
771 'PATH_WS' => $pullPathWs,
772 'PATH_COMMAND' => defined(
'BX_PULL_COMMAND_PATH')? BX_PULL_COMMAND_PATH:
'',
773 'METHOD' => ($nginxStatus?
'LONG':
'PULL'),
783 "dont_wait_answer" =>
false
786 $command = implode(
'/', array_unique($channels));
791 if (is_object($serverResult) && isset($serverResult->infos))
793 foreach ($serverResult->infos as
$info)
802 private static function getLockKey(
int $userId, $channelType): string
804 return "b_pchc_{$userId}_{$channelType}";
810 'action' => $channelType === self::TYPE_SHARED ?
'reconnect' :
'get_config',
813 'type' => $channelType,
820 'start' => date(
'c', time()),
821 'end' => date(
'c', time() + self::CHANNEL_TTL),
822 'type' => $channelType,
826 'module_id' =>
'pull',
827 'command' =>
'channel_expire',
831 CPullStack::AddByChannel($oldChannelId, $arMessage);
if(empty( $fields)) foreach($fields as $field) $channelId
if(!is_object($USER)||! $USER->IsAuthorized()) $userId
static getConnection($name="")
static DeleteByTag($tagId)
static SetOptionString($module_id, $name, $value="", $desc=false, $site="")
static PrepareData($arPostData, $prefix='')
static Get(int $userId, $cache=true, $reOpen=false, $channelType=self::TYPE_PRIVATE)
static GetNewChannelIdByTag(string $tag, string $suffix='')
static Delete($channelId)
static GetConfig($userId, $cache=true, $reopen=false, $mobile=false)
static UpdateLastId($channelId, $lastId)
static Send($channelId, $message, $options=array())
static GetChannel($userId, $channelType=self::TYPE_PRIVATE, $cache=true, $reOpen=false)
static CheckOnlineChannel()
static SignChannel($channelId)
static GetChannelShared($channelType=self::TYPE_SHARED, $cache=true, $reOpen=false)
static GetOnlineChannels(array $channels)
static GetNewChannelId($suffix='')
static GetSignature($value, $signatureKey=null)
static DeleteByUser($userId, $channelId=null, $channelType=self::TYPE_PRIVATE)
static sendChannelExpired(int $userId, string $channelType, string $oldChannelId, string $newChannelId)
static CheckExpireAgent()
static SignPublicChannel($channelId)
static SaveToCache($cacheId, $data)
static GetPublicSignature($value)
static Add(int $userId, string $channelId, string $publicChannelId, string $channelType=self::TYPE_PRIVATE)
static GetShared($cache=true, $reOpen=false, $channelType=self::TYPE_SHARED)
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
$_SERVER["DOCUMENT_ROOT"]
if($NS['step']==6) if( $NS[ 'step']==7) if(COption::GetOptionInt('main', 'disk_space', 0) > 0) $info
IsModuleInstalled($module_id)
</p ></td >< td valign=top style='border-top:none;border-left:none;border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt;padding:0cm 2.0pt 0cm 2.0pt;height:9.0pt'>< p class=Normal align=center style='margin:0cm;margin-bottom:.0001pt;text-align:center;line-height:normal'>< a name=ТекстовоеПоле54 ></a ><?=($taxRate > count( $arTaxList) > 0) ? $taxRate."%"
if($inWords) echo htmlspecialcharsbx(Number2Word_Rus(roundEx($totalVatSum $params['CURRENCY']