1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
protobuftransport.php
См. документацию.
1<?php
2
3namespace Bitrix\Pull;
4
5use Bitrix\Main\Config\Option;
6use Bitrix\Main\Web\HttpClient;
7use Bitrix\Pull\Protobuf;
8use Protobuf\MessageCollection;
9
11{
12 protected $hits = 0;
13 protected $bytes = 0;
14
19 {
21
22 $protobufMessages = static::convertMessages($messages);
23 $requests = static::createRequests($protobufMessages);
24 $requestBatches = static::createRequestBatches($requests);
25
26 $queueServerUrl = $options['serverUrl'] ?? Config::getPublishUrl();
27 $result->withRemoteAddress($queueServerUrl);
28
29 $queueServerUrl = \CHTTP::urlAddParams($queueServerUrl, [
30 "binaryMode" => "true",
31 "hostname" => Config::getHostname(),
32 ]);
33 foreach ($requestBatches as $requestBatch)
34 {
35 $urlWithSignature = $queueServerUrl;
36 $httpClient = new HttpClient(["streamTimeout" => 1]);
37 $bodyStream = $requestBatch->toStream();
38 if(\CPullOptions::IsServerShared())
39 {
40 $signature = \CPullChannel::GetSignature($bodyStream->getContents());
41 $urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, ["signature" => $signature]);
42 }
43
44 $httpClient->disableSslVerification();
45 $sendResult = $httpClient->query(HttpClient::HTTP_POST, $urlWithSignature, $bodyStream);
46 if (!$sendResult)
47 {
48 $errorCode = array_key_first($httpClient->getError());
49 $errorMsg = $httpClient->getError()[$errorCode];
50 $result->addError(new \Bitrix\Main\Error($errorMsg, $errorCode));
51 }
52
53 if (Option::get('pull', 'pull_log_196916_enable', 'N') === 'Y')
54 {
55 self::logMessageSending($httpClient, $requestBatch);
56 }
57 }
58
59 return $result;
60 }
61
/**
 * Writes a diagnostic record about one publish request to the Bitrix log.
 *
 * The record holds the values reported by the client (getStatus(), getResult(),
 * getEffectiveUrl()) and the hex-encoded ids of all receivers found in the
 * messages of the first request of the batch. Duplicate ids are listed once.
 *
 * @param HttpClient $httpClient Client that was used to perform the query.
 * @param Protobuf\RequestBatch $batch Batch that was sent with that query.
 * @return void
 */
protected static function logMessageSending(HttpClient $httpClient, Protobuf\RequestBatch $batch): void
{
	$logId = 'pull-196916';
	$status = $httpClient->getStatus();
	$result = $httpClient->getResult();
	$url = $httpClient->getEffectiveUrl();

	// Only the first request of the batch is inspected.
	$requestList = $batch->getRequestsList() ?? [];
	$incomingMessages = $requestList[0]?->getIncomingMessages()?->getMessagesList() ?? [];

	// Keyed by the hex id so that each channel appears only once.
	$channels = [];
	foreach ($incomingMessages as $incomingMessage)
	{
		/** @var Protobuf\IncomingMessage $incomingMessage */
		foreach ($incomingMessage->getReceiversList() ?? [] as $receiver)
		{
			/** @var Protobuf\Receiver $receiver */
			$hexId = bin2hex($receiver->getId()?->getContents() ?? '');
			$channels[$hexId] = $hexId;
		}
	}
	$channelsString = implode(',', $channels);

	$logText = "logId: {$logId}\n";
	$logText .= "status: {$status}\n";
	$logText .= "result: {$result}\n";
	$logText .= "url: {$url}\n";
	$logText .= "channels: {$channelsString}\n";

	AddMessage2Log($logText, 'pull', 0);
}
94
/**
 * Asks the publish endpoint which of the given channels are online.
 *
 * Channel ids are grouped, every group becomes one ChannelStatsRequest wrapped
 * into its own request batch, and each batch is posted separately.
 *
 * @param array $channels Hex-encoded channel ids (they are passed through hex2bin()).
 * @return array Map "hex channel id => true" for channels reported as online.
 *               An empty array is returned as soon as any single request fails
 *               (status other than 200, empty body, unparsable body, or a first
 *               response that is not a Protobuf\Response), even if earlier
 *               requests had already produced results.
 */
public static function getOnlineChannels(array $channels)
{
	$online = [];
	$perRequestLimit = \CPullOptions::GetMaxChannelsPerRequest();

	// Distribute the channels into groups. A new group is opened when the
	// counter reaches the limit; the channel that triggered it becomes the
	// first member of the new group.
	$groups = [];
	$groupIndex = 0;
	$counter = 0;
	foreach ($channels as $hexId)
	{
		$protoChannel = new Protobuf\ChannelId();
		$protoChannel->setId(hex2bin($hexId));
		$protoChannel->setIsPrivate(true);

		++$counter;
		if ($counter >= $perRequestLimit)
		{
			++$groupIndex;
			$counter = 1;
		}

		$groups[$groupIndex][] = $protoChannel;
	}

	// One stats request per group.
	$requests = [];
	foreach ($groups as $group)
	{
		$statsRequest = new Protobuf\ChannelStatsRequest();
		$statsRequest->setChannelsList(new MessageCollection($group));

		$request = new Protobuf\Request();
		$request->setChannelStats($statsRequest);
		$requests[] = $request;
	}

	$queueServerUrl = \CHTTP::urlAddParams(Config::getPublishUrl(), [
		"binaryMode" => "true",
		"hostname" => Config::getHostname(),
	]);

	foreach (static::createRequestBatches($requests) as $requestBatch)
	{
		$http = new HttpClient();
		$http->disableSslVerification();

		$targetUrl = $queueServerUrl;
		$bodyStream = $requestBatch->toStream();
		if (\CPullOptions::IsServerShared())
		{
			// Shared servers get the request body signature appended to the URL.
			$signature = \CPullChannel::GetSignature($bodyStream->getContents());
			$targetUrl = \CHTTP::urlAddParams($targetUrl, ["signature" => $signature]);
		}

		$binaryResponse = $http->post($targetUrl, $bodyStream);

		if ($http->getStatus() != 200 || strlen($binaryResponse) == 0)
		{
			return [];
		}

		try
		{
			$responseBatch = Protobuf\ResponseBatch::fromStream($binaryResponse);
		}
		catch (\Exception $e)
		{
			return [];
		}

		// Only the first response of the batch is examined.
		$firstResponse = $responseBatch->getResponsesList()[0];
		if (!($firstResponse instanceof Protobuf\Response))
		{
			return [];
		}

		if (!$firstResponse->hasChannelStats())
		{
			continue;
		}

		foreach ($firstResponse->getChannelStats()->getChannelsList() as $channelStats)
		{
			/** @var Protobuf\ChannelStats $channelStats */
			if ($channelStats->getIsOnline())
			{
				$online[bin2hex($channelStats->getId())] = true;
			}
		}
	}

	return $online;
}
197
/**
 * Converts a list of message descriptions into protobuf messages.
 *
 * An entry is skipped unless it has a non-empty array under 'channels' and an
 * array under 'event' with both 'module_id' and 'command' set.
 *
 * @param array $messages Entries with 'channels' and 'event' keys.
 * @return array Flat list of everything convertMessage() returned for the accepted entries.
 */
protected static function convertMessages(array $messages)
{
	$converted = [];

	foreach ($messages as $message)
	{
		// Read both keys defensively: a missing key must skip the entry,
		// not raise an "undefined array key" warning.
		$channels = $message['channels'] ?? null;
		$event = $message['event'] ?? null;

		if (
			!is_array($channels)
			|| count($channels) == 0
			|| !is_array($event)
			|| !isset($event['module_id'])
			|| !isset($event['command'])
		)
		{
			continue;
		}

		$converted[] = static::convertMessage($channels, $event);
	}

	// Merge once at the end instead of re-copying the accumulated list on every iteration.
	return $converted === [] ? [] : array_merge(...$converted);
}
219
/**
 * Builds protobuf messages that deliver one event to the given channels.
 *
 * The event is JSON-encoded once and shared by all produced messages. Receivers
 * are packed into messages of at most \CPullOptions::GetMaxChannelsPerRequest()
 * receivers each; a non-positive limit disables splitting.
 *
 * @param array $channels Hex-encoded channel ids (they are passed through hex2bin()).
 * @param array $event Event with 'module_id' and 'command'; 'params', 'extra' and 'expiry' are optional.
 * @return array List of Protobuf\IncomingMessage objects; empty when $channels is empty.
 */
protected static function convertMessage(array $channels, array $event)
{
	// Optional keys are read with ?? so that their absence does not raise a warning.
	$extra = is_array($event['extra'] ?? null) ? $event['extra'] : [];
	$expiry = $event['expiry'] ?? null;

	$body = Common::jsonEncode([
		'module_id' => $event['module_id'],
		'command' => $event['command'],
		'params' => ($event['params'] ?? null) ?: [],
		'extra' => $extra,
	]);

	// for statistics
	$messageType = "{$event['module_id']}_{$event['command']}";
	$messageType = preg_replace("/[^\w]/", "", $messageType);

	// Single place where a message is assembled from a group of receivers.
	$makeMessage = static function (array $receivers) use ($body, $expiry, $messageType)
	{
		$message = new Protobuf\IncomingMessage();
		$message->setReceiversList(new MessageCollection($receivers));
		$message->setExpiry($expiry);
		$message->setBody($body);
		$message->setType($messageType); // for statistics

		return $message;
	};

	// Cast the option value: a numeric string would never satisfy a strict comparison with count().
	$maxChannelsPerRequest = (int)\CPullOptions::GetMaxChannelsPerRequest();

	$result = [];
	$receivers = [];
	foreach ($channels as $channel)
	{
		$receiver = new Protobuf\Receiver();
		$receiver->setIsPrivate(true);
		$receiver->setId(hex2bin($channel));
		$receivers[] = $receiver;

		if ($maxChannelsPerRequest > 0 && count($receivers) >= $maxChannelsPerRequest)
		{
			$result[] = $makeMessage($receivers);
			$receivers = [];
		}
	}

	// Remaining receivers that did not fill a whole message.
	if (count($receivers) > 0)
	{
		$result[] = $makeMessage($receivers);
	}

	return $result;
}
278
/**
 * Wraps every request into a request batch of its own.
 *
 * @param array $requests Requests to wrap.
 * @return array List of Protobuf\RequestBatch objects, one per request, in the same order.
 */
protected static function createRequestBatches(array $requests)
{
	$wrapInBatch = static function ($request)
	{
		$batch = new Protobuf\RequestBatch();
		$batch->addRequests($request);

		return $batch;
	};

	// array_map() keeps the input keys; re-index to return a plain list.
	return array_values(array_map($wrapInBatch, $requests));
}
295
/**
 * Packs messages into requests, limited by payload size and by message count.
 *
 * Messages are accumulated in order. The accumulated group is turned into a
 * request before adding a message that would bring the group size to
 * (\CPullOptions::GetMaxPayload() - 200) or more, or when the group already
 * holds \CPullOptions::GetMaxMessagesPerRequest() messages. A message that is
 * too large on its own is still sent, alone in its request.
 *
 * @param array $messages Protobuf\IncomingMessage objects.
 * @return array List of Protobuf\Request objects; empty when $messages is empty.
 */
protected static function createRequests(array $messages)
{
	$result = [];

	$maxPayload = \CPullOptions::GetMaxPayload() - 200;
	$maxMessages = \CPullOptions::GetMaxMessagesPerRequest();

	// Single place where a group of messages is wrapped into a request.
	$makeRequest = static function (array $messageBatch)
	{
		$incomingMessagesRequest = new Protobuf\IncomingMessagesRequest();
		$incomingMessagesRequest->setMessagesList(new MessageCollection($messageBatch));

		$request = new Protobuf\Request();
		$request->setIncomingMessages($incomingMessagesRequest);

		return $request;
	};

	$currentMessageBatch = [];
	$currentBatchSize = 0;

	foreach ($messages as $message)
	{
		$messageSize = static::getMessageSize($message);
		$limitReached =
			$currentBatchSize + $messageSize >= $maxPayload
			|| count($currentMessageBatch) >= $maxMessages
		;

		// Never flush an empty group: that would emit a request without messages.
		if ($limitReached && !empty($currentMessageBatch))
		{
			$result[] = $makeRequest($currentMessageBatch);

			// Start a new group. The running size must be reset here, while the size of
			// the current message is kept so that it is counted in the new group.
			$currentMessageBatch = [];
			$currentBatchSize = 0;
		}

		$currentMessageBatch[] = $message;
		$currentBatchSize += $messageSize;
	}

	if (!empty($currentMessageBatch))
	{
		$result[] = $makeRequest($currentMessageBatch);
	}

	return $result;
}
342
/**
 * Splits a message into several messages by groups of receivers.
 *
 * When the receiver count does not exceed $maxReceivers the original message
 * is returned unchanged as the only element. Otherwise new messages are
 * created, each receiving a consecutive group of receivers together with the
 * body and expiry of the original message.
 * NOTE(review): the type set by convertMessage() is not copied to the parts —
 * confirm whether that is intended.
 *
 * @param Protobuf\IncomingMessage $message Message to split.
 * @param int $maxReceivers Receiver limit per message.
 * @return array List of Protobuf\IncomingMessage objects.
 */
protected static function splitReceivers(Protobuf\IncomingMessage $message, $maxReceivers)
{
	$allReceivers = $message->getReceiversList();
	if (count($allReceivers) <= $maxReceivers)
	{
		return [$message];
	}

	// Creates one part carrying the given receivers.
	$makePart = static function (array $partReceivers) use ($message)
	{
		$part = new Protobuf\IncomingMessage();
		$part->setBody($message->getBody());
		$part->setExpiry($message->getExpiry());
		$part->setReceiversList(new MessageCollection($partReceivers));

		return $part;
	};

	$parts = [];
	$pending = [];
	foreach ($allReceivers as $receiver)
	{
		// The group is full: emit it before taking the next receiver.
		if (count($pending) == $maxReceivers)
		{
			$parts[] = $makePart($pending);
			$pending = [];
		}

		$pending[] = $receiver;
	}

	if (count($pending) > 0)
	{
		$parts[] = $makePart($pending);
	}

	return $parts;
}
385
387 {
388 $config = \Protobuf\Configuration::getInstance();
389 return $message->serializedSize($config->createComputeSizeContext());
390 }
391}
if(empty( $fields)) foreach($fields as $field) $channelId
Определения push.php:23
if(!Loader::includeModule('catalog')) if(!AccessController::getCurrent() ->check(ActionDictionary::ACTION_PRICE_EDIT)) if(!check_bitrix_sessid()) $request
Определения catalog_reindex.php:36
Определения error.php:15
getEffectiveUrl()
Определения httpclient.php:673
static jsonEncode($params)
Определения common.php:6
static getMessageSize(Protobuf\IncomingMessage $message)
Определения protobuftransport.php:386
static createRequests(array $messages)
Определения protobuftransport.php:300
static sendMessages(array $messages, array $options=[])
Определения protobuftransport.php:18
static convertMessage(array $channels, array $event)
Определения protobuftransport.php:226
static convertMessages(array $messages)
Определения protobuftransport.php:202
static splitReceivers(Protobuf\IncomingMessage $message, $maxReceivers)
Определения protobuftransport.php:348
static createRequestBatches(array $requests)
Определения protobuftransport.php:283
static urlAddParams($url, $add_params, $options=[])
Определения http.php:521
static GetSignature($value, $signatureKey=null)
Определения pull_channel.php:212
$options
Определения commerceml2.php:49
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
Определения file_new.php:804
$result
Определения get_property_values.php:14
$status
Определения session.php:10
AddMessage2Log($text, $module='', $traceDepth=6, $showArgs=false)
Определения tools.php:3941
$message
Определения payment.php:8
$event
Определения prolog_after.php:141
$config
Определения quickway.php:69
$errorMsg
Определения refund.php:16
</p ></td >< td valign=top style='border-top:none;border-left:none;border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt;padding:0cm 2.0pt 0cm 2.0pt;height:9.0pt'>< p class=Normal align=center style='margin:0cm;margin-bottom:.0001pt;text-align:center;line-height:normal'>< a name=ТекстовоеПоле54 ></a ><?=($taxRate > count( $arTaxList) > 0) ? $taxRate."%"
Определения waybill.php:936
$messages
Определения template.php:8
$response
Определения result.php:21
$url
Определения iframe.php:7