// NOTE(review): fragment of a message-sending routine; the enclosing method
// header and several intermediate lines are not visible in this excerpt.
// Pipeline: convert the incoming messages to protobuf form, wrap them into
// requests, then group those requests into batches.
22 $protobufMessages = static::convertMessages(
$messages);
23 $requests = static::createRequests($protobufMessages);
24 $requestBatches = static::createRequestBatches($requests);
// A 'serverUrl' entry in $options takes precedence; otherwise the configured
// publish URL is used.
26 $queueServerUrl =
$options[
'serverUrl'] ?? Config::getPublishUrl();
// The return value of withRemoteAddress() is discarded here — presumably it
// mutates $result in place; TODO confirm.
27 $result->withRemoteAddress($queueServerUrl);
30 "binaryMode" =>
"true",
31 "hostname" => Config::getHostname(),
// Each request batch is sent with its own HTTP POST.
33 foreach ($requestBatches as $requestBatch)
// Starts from the unsigned URL. NOTE(review): the lines that would append a
// signature for shared servers are not visible in this excerpt.
35 $urlWithSignature = $queueServerUrl;
// streamTimeout of 1 — presumably seconds; confirm against HttpClient docs.
36 $httpClient =
new HttpClient([
"streamTimeout" => 1]);
37 $bodyStream = $requestBatch->toStream();
38 if(\CPullOptions::IsServerShared())
44 $httpClient->disableSslVerification();
45 $sendResult = $httpClient->query(HttpClient::HTTP_POST, $urlWithSignature, $bodyStream);
// Picks the first error code reported by the client and its message.
// NOTE(review): array_key_first() returns null for an empty array —
// presumably this runs only after a failed query; confirm the hidden guard.
48 $errorCode = array_key_first($httpClient->getError());
49 $errorMsg = $httpClient->getError()[$errorCode];
// Extra diagnostic logging is gated by the 'pull_log_196916_enable' option of
// the 'pull' module, which defaults to 'N' (disabled).
53 if (Option::get(
'pull',
'pull_log_196916_enable',
'N') ===
'Y')
55 self::logMessageSending($httpClient, $requestBatch);
// NOTE(review): fragment that assembles a diagnostic log entry; the enclosing
// method header and parts of the log string are not visible in this excerpt.
67 $logId =
'pull-196916';
// A null requests list is treated as empty.
70 $requestList = $batch->getRequestsList() ?? [];
// Only the first request of the batch is inspected; the nullsafe chain falls
// back to an empty list when any link is missing.
72 foreach ($requestList[0]?->getIncomingMessages()?->getMessagesList() ?? [] as
$message)
74 $receiversList =
$message->getReceiversList() ?? [];
76 foreach ($receiversList as $receiver)
// The receiver id contents are hex-encoded (empty string when absent) and
// stored keyed by their own value, so repeated channels collapse to one entry.
78 $channel = bin2hex($receiver->getId()?->getContents() ??
'');
79 $channels[$channel] = $channel;
// Comma-separated list of the distinct channel ids collected above.
82 $channelsString = implode(
',', array_values($channels));
86 .
"status: {$status}\n"
87 .
"result: {$result}\n"
89 .
"channels: {$channelsString}\n"
// Asks the queue server for channel statistics for the given channels and
// inspects getIsOnline() on every channel in the response.
// NOTE(review): the loop headers, braces and return statement are missing
// from this excerpt, so the exact return shape cannot be confirmed here.
100 public static function getOnlineChannels(
array $channels)
// Channels are grouped into batches bounded by GetMaxChannelsPerRequest().
103 $maxChannelsPerRequest = \CPullOptions::GetMaxChannelsPerRequest();
104 $channelBatches = [];
105 $currentChannelBatch = 0;
106 $requestsInChannelBatch = 0;
109 $channel =
new Protobuf\ChannelId();
111 $channel->setIsPrivate(
true);
// NOTE(review): the counter is incremented before the `>=` comparison and
// restarts at 1, so — if the hidden lines are only braces — each batch
// receives at most (max - 1) channels. Harmless for the limit, but worth
// confirming that this is intended.
113 $requestsInChannelBatch++;
115 if($requestsInChannelBatch >= $maxChannelsPerRequest)
117 $currentChannelBatch++;
118 $requestsInChannelBatch = 1;
120 $channelBatches[$currentChannelBatch][] = $channel;
// One ChannelStatsRequest is built per channel batch and attached to a request.
124 foreach ($channelBatches as $channelBatchNumber => $channelBatch)
126 $channelsStatsRequest =
new Protobuf\ChannelStatsRequest();
127 $channelsStatsRequest->setChannelsList(
new MessageCollection($channelBatch));
130 $request->setChannelStats($channelsStatsRequest);
// Target URL: the configured publish URL plus the binaryMode/hostname
// query parameters.
134 $queueServerUrl = \CHTTP::urlAddParams(Config::getPublishUrl(), [
135 "binaryMode" =>
"true",
136 "hostname" => Config::getHostname(),
139 $requestBatches = static::createRequestBatches($requests);
140 foreach ($requestBatches as $requestBatch)
// A fresh client is created per batch, with SSL verification turned off.
142 $http =
new HttpClient();
143 $http->disableSslVerification();
145 $urlWithSignature = $queueServerUrl;
146 $bodyStream = $requestBatch->toStream();
// For shared servers the serialized body is signed and the signature is
// passed as a URL parameter.
147 if(\CPullOptions::IsServerShared())
149 $signature = \CPullChannel::GetSignature($bodyStream->getContents());
150 $urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, [
"signature" => $signature]);
153 $binaryResponse = $http->post($urlWithSignature, $bodyStream);
// Non-200 statuses and empty bodies get special handling; the branch bodies
// are not visible in this excerpt.
155 if($http->getStatus() != 200)
159 if(strlen($binaryResponse) == 0)
// Decodes the binary response; exceptions raised while decoding are caught
// (handler body not visible here).
166 $responseBatch = Protobuf\ResponseBatch::fromStream($binaryResponse);
168 catch (\Exception $e)
172 $responses = $responseBatch->getResponsesList();
// Entries that are not Protobuf\Response instances take a separate branch
// (body not visible here).
175 if(!(
$response instanceof Protobuf\Response))
184 foreach ($stats->getChannelsList() as $channel)
186 if($channel->getIsOnline())
// NOTE(review): fragment that turns an $event array into protobuf messages;
// the enclosing method header and several lines are not visible here.
// A non-array 'extra' value is replaced by an empty array.
230 $extra = is_array(
$event[
'extra']) ?
$event[
'extra'] : [];
233 'module_id' =>
$event[
'module_id'],
234 'command' =>
$event[
'command'],
235 'params' =>
$event[
'params'] ?: [],
// Message type is "<module_id>_<command>" with every non-word character
// stripped out.
240 $messageType =
"{$event['module_id']}_{$event['command']}";
241 $messageType = preg_replace(
"/[^\w]/",
"", $messageType);
243 $maxChannelsPerRequest = \CPullOptions::GetMaxChannelsPerRequest();
245 foreach ($channels as $channel)
248 $receiver->setIsPrivate(
true);
// Channel ids arrive hex-encoded and are converted to raw bytes for the receiver.
249 $receiver->setId(hex2bin($channel));
250 $receivers[] = $receiver;
// When the accumulated receivers reach the per-request maximum they are
// attached to a message. NOTE(review): the reset of $receivers is presumably
// on a hidden line — confirm.
252 if(
count($receivers) === $maxChannelsPerRequest)
255 $message->setReceiversList(
new MessageCollection($receivers));
// Any leftover receivers are attached to a final message.
265 if(
count($receivers) > 0)
268 $message->setReceiversList(
new MessageCollection($receivers));
// NOTE(review): fragment that packs messages into requests limited by payload
// size and message count; the enclosing method header is not visible here.
// 200 is subtracted from the configured maximum payload — presumably headroom
// for request framing; TODO confirm.
304 $maxPayload = \CPullOptions::GetMaxPayload() - 200;
305 $maxMessages = \CPullOptions::GetMaxMessagesPerRequest();
307 $currentMessageBatch = [];
308 $currentBatchSize = 0;
312 $messageSize = static::getMessageSize(
$message);
// The current batch is flushed into a request when adding this message would
// reach the payload limit, or when the batch already holds the maximum
// number of messages.
313 if($currentBatchSize + $messageSize >= $maxPayload ||
count($currentMessageBatch) >= $maxMessages)
317 $incomingMessagesRequest->setMessagesList(
new MessageCollection($currentMessageBatch));
319 $request->setIncomingMessages($incomingMessagesRequest);
// NOTE(review): the matching reset of $currentBatchSize is not visible in
// this excerpt — confirm it happens alongside this reset.
322 $currentMessageBatch = [];
328 $currentBatchSize += $messageSize;
// Messages still pending after the loop go into one last request.
331 if(!empty($currentMessageBatch))
334 $incomingMessagesRequest->setMessagesList(
new MessageCollection($currentMessageBatch));
336 $request->setIncomingMessages($incomingMessagesRequest);
// NOTE(review): fragment that splits one message's receivers into chunks of
// at most $maxReceivers; the enclosing method header is not visible here.
350 $receivers =
$message->getReceiversList();
// Messages within the receiver limit take a separate branch (body not
// visible here — presumably returned unsplit; confirm).
351 if(
count($receivers) <= $maxReceivers)
357 $currentReceivers = [];
359 foreach ($receivers as $receiver)
// When the current chunk is full, a sub-message copying the original body
// and expiry is given that chunk of receivers, and a new chunk is started.
361 if(
count($currentReceivers) == $maxReceivers)
364 $subMessage->setBody(
$message->getBody());
365 $subMessage->setExpiry(
$message->getExpiry());
366 $subMessage->setReceiversList(
new MessageCollection($currentReceivers));
368 $currentReceivers = [];
371 $currentReceivers[] = $receiver;
// The final, possibly partial, chunk gets its own sub-message as well.
374 if(
count($currentReceivers) > 0)
377 $subMessage->setBody(
$message->getBody());
378 $subMessage->setExpiry(
$message->getExpiry());
379 $subMessage->setReceiversList(
new MessageCollection($currentReceivers));
// NOTE(review): likely belongs to a later routine; it fetches a
// \Protobuf\Configuration via getInstance(), but its use is not visible here.
388 $config = \Protobuf\Configuration::getInstance();