13 private static bool $isSendingScheduled =
false;
15 private static bool $backgroundContext =
false;
17 private static array $messages = [];
18 private static array $deferredMessages = [];
19 private static array $push = [];
20 private static $error =
false;
26 self::$error =
new Error(__METHOD__,
'EVENT_PARAMETERS_FORMAT', Loc::getMessage(
'PULL_EVENT_PARAMETERS_FORMAT_ERROR'),
$parameters);
31 if ($badUnicodeSymbolsPath)
33 $warning =
'Parameters array contains invalid UTF-8 characters by the path ' . $badUnicodeSymbolsPath;
34 self::$error =
new Error(__METHOD__,
'EVENT_BAD_ENCODING', $warning,
$parameters);
42 self::generateEventsForUsers($recipient,
$parameters, $channelType);
55 self::$error =
new Error(__METHOD__,
'EVENT_PARAMETERS_FORMAT', Loc::getMessage(
'PULL_EVENT_PARAMETERS_FORMAT_ERROR'),
$parameters);
64 if (!is_array($recipient))
66 $recipient = [$recipient];
69 $entities = self::getEntitiesByType($recipient);
70 if ($entities ===
null)
72 self::$error =
new Error(__METHOD__,
'RECIPIENT_FORMAT', Loc::getMessage(
'PULL_EVENT_RECIPIENT_FORMAT_ERROR'), [
73 'recipient' => $recipient,
87 if (empty($entities[
'users']) && empty($entities[
'channels']))
99 $pushParameters =
null;
104 $pushParametersCallback =
$parameters[
'pushParamsCallback'];
109 $pushParametersCallback =
null;
114 self::addMessage(self::$deferredMessages, $entities[
'channels'], $entities[
'users'],
$parameters);
118 self::addMessage(self::$messages, $entities[
'channels'], $entities[
'users'],
$parameters);
121 if (defined(
'BX_CHECK_AGENT_START') && !defined(
'BX_WITH_ON_AFTER_EPILOG'))
127 self::scheduleSending();
130 if ($pushParameters || $pushParametersCallback)
136 if ($pushParametersCallback)
138 $parameters[
'pushParamsCallback'] = $pushParametersCallback;
152 if (isset($destination[$eventCode]))
154 $waitingToReceiveUserList = $destination[$eventCode][
'users'] ?? [];
155 $newUserList = $users ?? [];
156 $destination[$eventCode][
'users'] = array_unique(array_merge($waitingToReceiveUserList, $newUserList));
158 $waitingToReceiveChannelList = $destination[$eventCode][
'channels'] ?? [];
159 $newChannelList = $channels ?? [];
160 $destination[$eventCode][
'channels'] = array_unique(array_merge($waitingToReceiveChannelList, $newChannelList));
164 $destination[$eventCode] = [
166 'users' => array_unique($users),
167 'channels' => array_unique($channels),
174 if (!is_array($recipients))
176 $recipients = [$recipients];
192 foreach ($recipients as $recipient)
194 if (isset($paramsByUser[$recipient]) && is_array($paramsByUser[$recipient]))
197 $userParams[
'params'] = array_merge(
$params, $paramsByUser[$recipient]);
198 self::addEvent($recipient, $userParams, $channelType);
200 $processed[] = $recipient;
204 $left = array_diff($recipients, $processed);
211 private static function addPush($users,
$parameters)
213 if (!\CPullOptions::GetPushStatus())
215 self::$error =
new Error(__METHOD__,
'PUSH_DISABLED', Loc::getMessage(
'PULL_EVENT_PUSH_DISABLED_ERROR'), [
216 'recipient' => $users,
222 if (!is_array($users))
227 foreach ($users as $id =>
$entity)
238 self::$error =
new Error(__METHOD__,
'RECIPIENT_FORMAT', Loc::getMessage(
'PULL_EVENT_RECIPIENT_FORMAT_ERROR'), [
239 'recipient' => $users,
272 $pushCode = self::getParamsCode(
$parameters[
'push']);
273 if (isset(self::$push[$pushCode]))
275 self::$push[$pushCode][
'users'] = array_unique(array_merge(self::$push[$pushCode][
'users'], array_values($users)));
282 self::$push[$pushCode][
'push'] =
$parameters[
'push'];
283 self::$push[$pushCode][
'extra'] =
$parameters[
'extra'];
284 self::$push[$pushCode][
'hasPushCallback'] = $hasPushCallback;
285 self::$push[$pushCode][
'users'] = array_unique(array_values($users));
288 if (defined(
'BX_CHECK_AGENT_START') && !defined(
'BX_WITH_ON_AFTER_EPILOG'))
294 self::scheduleSending();
300 private static function processDeferredMessages()
302 foreach (self::$deferredMessages as $eventCode =>
$message)
304 $callback =
$message[
'event'][
'paramsCallback'];
305 if (Main\
Loader::includeModule($callback[
'module_id']) && method_exists($callback[
'class'], $callback[
'method']))
307 $messageParameters = call_user_func_array([$callback[
'class'], $callback[
'method']], [$callback[
'params']]);
308 self::addMessage(self::$messages,
$message[
'users'],
$message[
'channels'], $messageParameters);
311 self::$deferredMessages = [];
314 private static function executePushEvent(
$parameters)
316 if (!self::$backgroundContext &&
$parameters[
'hasPushCallback'])
324 $callback =
$parameters[
'push'][
'pushParamsCallback'];
325 Main\Loader::includeModule($callback[
'module_id']);
326 if (method_exists($callback[
'class'], $callback[
'method']))
328 $data = call_user_func_array(
344 $data[
'message'] = str_replace(
"\n",
" ", trim(
$data[
'message'] ??
''));
346 $data[
'advanced_params'] =
$data[
'advanced_params'] ?? [];
353 $data[
'send_immediately'] = isset(
$data[
'send_immediately']) &&
$data[
'send_immediately'] ==
'Y' ?
'Y' :
'N';
354 $data[
'important'] = isset(
$data[
'important']) &&
$data[
'important'] ==
'Y' ?
'Y' :
'N';
370 'SKIP_USERS' => isset(
$data[
'skip_users']) && is_array(
$data[
'skip_users']) ?
$data[
'skip_users'] : [],
371 'MESSAGE' =>
$data[
'message'],
372 'EXPIRY' =>
$data[
'expiry'],
373 'PARAMS' =>
$data[
'params'],
374 'ADVANCED_PARAMS' =>
$data[
'advanced_params'],
375 'BADGE' =>
$data[
'badge'],
376 'SOUND' =>
$data[
'sound'],
377 'TAG' =>
$data[
'tag'],
378 'SUB_TAG' =>
$data[
'sub_tag'],
379 'APP_ID' =>
$data[
'app_id'],
380 'SEND_IMMEDIATELY' =>
$data[
'send_immediately'],
381 'IMPORTANT' =>
$data[
'important'],
389 if (self::$backgroundContext)
391 self::processDeferredMessages();
394 $executeResult = static::executeEvents();
395 if (!$executeResult->isSuccess())
397 foreach ($executeResult->getErrors() as $error)
399 $message = $error->getCode() ? $error->getCode() .
": " . $error->getMessage() : $error->getMessage();
400 trigger_error(
"Pull send error; {$message}; remote endpoint: {$executeResult->getRemoteAddress()}", E_USER_WARNING);
404 static::executePushEvents();
412 if (empty(self::$messages))
417 if (!\CPullOptions::GetQueueServerStatus())
419 self::$messages = [];
424 self::fillChannels(self::$messages);
426 if (Config::isJsonRpcUsed())
428 $messageList = self::convertEventsToMessages(self::$messages);
430 if ($sendResult->isSuccess())
432 self::$messages = [];
436 $result->withRemoteAddress($sendResult->getRemoteAddress());
437 $result->addErrors($sendResult->getErrors());
442 if (Config::isProtobufUsed())
445 if (!$sendResult->isSuccess())
447 $result->withRemoteAddress($sendResult->getRemoteAddress());
448 $result->addErrors($sendResult->getErrors());
453 self::sendEventsLegacy();
456 self::$messages = [];
465 foreach (self::$push as $pushCode =>
$event)
470 unset(self::$push[$pushCode]);
475 private static function sendEventsLegacy()
477 foreach (self::$messages as $eventCode =>
$event)
482 $currentHits = ceil(
count(
$event[
'channels']) / \CPullOptions::GetCommandPerHit());
483 $hitCount += $currentHits;
486 $channelCount += $currentChannelCount;
488 $currentMessagesBytes = self::getBytes(
$event[
'event']) + self::getBytes(
$event[
'channels']);
489 $messagesBytes += $currentMessagesBytes;
490 $logs[] =
'Command: ' .
$event[
'event'][
'module_id'] .
'/' .
$event[
'event'][
'command'] .
'; Hits: ' . $currentHits .
'; Channel: ' . $currentChannelCount .
'; Bytes: ' . $currentMessagesBytes .
'';
493 if (empty(
$event[
'channels']))
499 'module_id' =>
$event[
'event'][
'module_id'],
500 'command' =>
$event[
'event'][
'command'],
501 'params' => is_array(
$event[
'event'][
'params']) ?
$event[
'event'][
'params'] : [],
502 'extra' =>
$event[
'event'][
'extra'],
508 unset(self::$messages[$eventCode]);
512 if ($logs && \Bitrix\Pull\Log::isEnabled())
514 if (
count($logs) > 1)
516 $logs[] =
'Total - Hits: ' . $hitCount .
'; Channel: ' . $channelCount .
'; Messages: ' . $messagesCount .
'; Bytes: ' . $messagesBytes .
'';
519 if (
count($logs) > 1 || $hitCount > 1 || $channelCount > 1 || $messagesBytes > 1000)
521 $logTitle =
'!! Pull messages stats - important !!';
525 $logTitle =
'-- Pull messages stats --';
528 \Bitrix\Pull\Log::write(implode(
"\n", $logs), $logTitle);
534 self::scheduleSending();
540 if (self::$isSendingScheduled)
545 self::$isSendingScheduled =
true;
551 self::$backgroundContext =
true;
553 self::$isSendingScheduled =
false;
561 if (!empty($messages[
$key][
'channels']) && is_array($messages[
$key][
'channels']))
563 $messages[
$key][
'channels'] = array_merge($messages[
$key][
'channels'], self::getChannelIds($users,
$message[
'event'][
'channel_type']));
567 $messages[
$key][
'channels'] = self::getChannelIds($users,
$message[
'event'][
'channel_type']);
569 unset(
$message[
'event'][
'channel_type']);
590 $result = array_fill_keys($channels,
null);
591 $orm = \Bitrix\Pull\Model\ChannelTable::getList([
592 'select' => [
'USER_ID',
'CHANNEL_ID',
'USER_ACTIVE' =>
'USER.ACTIVE'],
594 '=CHANNEL_ID' => $channels,
597 while ($row = $orm->fetch())
599 if ($row[
'USER_ID'] > 0 && $row[
'USER_ACTIVE'] !==
'N')
601 $result[$row[
'CHANNEL_ID']] = $row[
'USER_ID'];
605 unset(
$result[$row[
'CHANNEL_ID']]);
616 self::$error =
new Error(__METHOD__,
'EVENT_PARAMETERS_FORMAT', Loc::getMessage(
'PULL_EVENT_PARAMETERS_FORMAT_ERROR'),
$parameters);
627 self::$error =
new Error(__METHOD__,
'EVENT_CALLBACK_FORMAT', Loc::getMessage(
'PULL_EVENT_CALLBACK_FORMAT_ERROR'),
$parameters);
631 if (empty(
$parameters[
'paramsCallback'][
'module_id']))
636 Main\Loader::includeModule(
$parameters[
'paramsCallback'][
'module_id']);
640 self::$error =
new Error(__METHOD__,
'EVENT_CALLBACK_NOT_FOUND', Loc::getMessage(
'PULL_EVENT_CALLBACK_FORMAT_ERROR'),
$parameters);
643 if (!isset(
$parameters[
'paramsCallback'][
'params']))
660 $parameters[
'extra'][
'server_time_unix'] ??= microtime(
true);
662 $parameters[
'extra'][
'server_name'] = Option::get(
'main',
'server_name',
$_SERVER[
'SERVER_NAME']);
677 || empty(
$parameters[
'pushParamsCallback'][
'method'])
680 self::$error =
new Error(__METHOD__,
'EVENT_PUSH_CALLBACK_FORMAT', Loc::getMessage(
'PULL_EVENT_PUSH_CALLBACK_FORMAT_ERROR'),
$parameters);
684 if (empty(
$parameters[
'pushParamsCallback'][
'module_id']))
689 Main\Loader::includeModule(
$parameters[
'pushParamsCallback'][
'module_id']);
691 if (!method_exists(
$parameters[
'pushParamsCallback'][
'class'],
$parameters[
'pushParamsCallback'][
'method']))
693 self::$error =
new Error(__METHOD__,
'EVENT_PUSH_CALLBACK_NOT_FOUND', Loc::getMessage(
'PULL_EVENT_PUSH_CALLBACK_FORMAT_ERROR'),
$parameters);
696 if (!isset(
$parameters[
'pushParamsCallback'][
'params']))
716 self::$error =
new Error(__METHOD__,
'EVENT_PUSH_PARAMETERS_FORMAT', Loc::getMessage(
'PULL_EVENT_PUSH_PARAMETERS_FORMAT_ERROR'),
$parameters);
727 $parameters[
'extra'][
'server_time_unix'] = microtime(
true);
737 return md5(
$params[
'groupId']);
743 unset($paramsWithoutTime[
'extra'][
'server_time']);
744 unset($paramsWithoutTime[
'extra'][
'server_time_unix']);
745 unset($paramsWithoutTime[
'advanced_params'][
'filterCallback']);
747 return serialize($paramsWithoutTime);
751 private static function getEntitiesByType(
array $recipientList): ?
array
758 foreach ($recipientList as
$entity)
765 else if (self::isChannelEntity(
$entity))
780 private static function getBytes($variable)
784 if (is_string($variable))
786 $bytes += mb_strlen($variable);
788 else if (is_array($variable))
790 foreach ($variable as $value)
792 $bytes += self::getBytes($value);
797 $bytes += mb_strlen((
string)$variable);
803 private static function isChannelEntity(
$entity)
812 private static function convertEventsToMessages(
array $events):
array
816 return Message::fromEvent(
$event);
static includeModule($moduleName)