// Fragment of a publishing routine (the enclosing method header is outside this excerpt).
// $messages are converted to protobuf messages, wrapped into requests and grouped into
// request batches; every batch is then sent to the queue server with an HTTP POST.
23 $protobufMessages = static::convertMessages($messages);
24 $requests = static::createRequests($protobufMessages);
25 $requestBatches = static::createRequestBatches($requests);
// The target URL may be overridden with $options['serverUrl'];
// otherwise Config::getPublishUrl() is used. The chosen URL is recorded on $result.
27 $queueServerUrl = $options[
'serverUrl'] ?? Config::getPublishUrl();
28 $result->withRemoteAddress($queueServerUrl);
// Add the binaryMode=true parameter to the URL.
30 $queueServerUrl = \CHTTP::urlAddParams($queueServerUrl, [
"binaryMode" =>
"true"]);
31 foreach ($requestBatches as $requestBatch)
// Start from the unsigned URL for every batch.
33 $urlWithSignature = $queueServerUrl;
// A fresh HttpClient is created per batch with the "streamTimeout" option set to 1.
34 $httpClient =
new HttpClient([
"streamTimeout" => 1]);
35 $bodyStream = $requestBatch->toStream();
// When \CPullOptions::IsServerShared() is true, the serialized body is signed and the
// signature is appended to the URL as the "signature" parameter.
// NOTE(review): getContents() is called on the same stream object that is later passed
// as the request body - confirm the stream is rewound before it is sent.
36 if(\CPullOptions::IsServerShared())
38 $signature = \CPullChannel::GetSignature($bodyStream->getContents());
39 $urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, [
"signature" => $signature]);
// NOTE(review): disableSslVerification() is called for every request - confirm this is intended.
42 $httpClient->disableSslVerification();
43 $sendResult = $httpClient->query(HttpClient::HTTP_POST, $urlWithSignature, $bodyStream);
// The first entry of the client's error list (code => message) is added to $result.
// NOTE(review): the condition guarding these lines is not visible in this excerpt;
// presumably they run only when $sendResult is falsy. array_key_first() returns null
// for an empty array, so this relies on getError() being non-empty here.
46 $errorCode = array_key_first($httpClient->getError());
47 $errorMsg = $httpClient->getError()[$errorCode];
48 $result->addError(
new \
Bitrix\Main\
Error($errorMsg, $errorCode));
// Asks the queue server for channel statistics and collects the channels that the
// server reports as online.
//
// $channels: list of channel ids; each id is passed to hex2bin(), so hex strings are expected.
// Result: $result is filled with hex channel id => true for every online channel
// (the return statement itself is not visible in this excerpt).
60 public static function getOnlineChannels(array $channels)
63 $maxChannelsPerRequest = \CPullOptions::GetMaxChannelsPerRequest();
// Channels are distributed over numbered batches, bounded by $maxChannelsPerRequest.
65 $currentChannelBatch = 0;
66 $requestsInChannelBatch = 0;
67 foreach ($channels as $channelId)
// The channel id is stored in binary form and the channel is flagged as private.
70 $channel->setId(hex2bin($channelId));
71 $channel->setIsPrivate(
true);
73 $requestsInChannelBatch++;
// NOTE(review): the counter is incremented before this ">=" test and the current channel
// is appended after it, so as visible here a batch is closed once the counter reaches the
// limit and each batch appears to hold at most $maxChannelsPerRequest - 1 channels.
// Confirm whether ">" was intended.
75 if($requestsInChannelBatch >= $maxChannelsPerRequest)
77 $currentChannelBatch++;
78 $requestsInChannelBatch = 1;
80 $channelBatches[$currentChannelBatch][] = $channel;
// Every channel batch becomes one ChannelStatsRequest wrapped in a Request.
84 foreach ($channelBatches as $channelBatchNumber => $channelBatch)
86 $channelsStatsRequest =
new Protobuf\ChannelStatsRequest();
87 $channelsStatsRequest->setChannelsList(
new MessageCollection($channelBatch));
89 $request =
new Protobuf\Request();
90 $request->setChannelStats($channelsStatsRequest);
91 $requests[] = $request;
// The URL is always Config::getPublishUrl() with the binaryMode=true parameter added.
94 $queueServerUrl = \CHTTP::urlAddParams(Config::getPublishUrl(), [
"binaryMode" =>
"true"]);
96 $requestBatches = static::createRequestBatches($requests);
97 foreach ($requestBatches as $requestBatch)
// A fresh HttpClient is created per request batch.
// NOTE(review): disableSslVerification() is called for every request - confirm this is intended.
99 $http =
new HttpClient();
100 $http->disableSslVerification();
102 $urlWithSignature = $queueServerUrl;
103 $bodyStream = $requestBatch->toStream();
// When \CPullOptions::IsServerShared() is true, the serialized body is signed and the
// signature is appended to the URL as the "signature" parameter.
// NOTE(review): getContents() is called on the same stream that is posted afterwards -
// confirm the stream is rewound before it is sent.
104 if(\CPullOptions::IsServerShared())
106 $signature = \CPullChannel::GetSignature($bodyStream->getContents());
107 $urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, [
"signature" => $signature]);
110 $binaryResponse = $http->post($urlWithSignature, $bodyStream);
// A non-200 status and an empty response body are both checked;
// what happens in those cases is not visible in this excerpt.
112 if($http->getStatus() != 200)
116 if(strlen($binaryResponse) == 0)
// The binary response is parsed into a ResponseBatch; \Exception is caught
// (the handler body is not visible in this excerpt).
123 $responseBatch = Protobuf\ResponseBatch::fromStream($binaryResponse);
125 catch (\Exception $e)
129 $responses = $responseBatch->getResponsesList();
// Only the first response of the batch is inspected.
// NOTE(review): index 0 is read without a visible emptiness check on $responses.
131 $response = $responses[0];
132 if(!($response instanceof Protobuf\Response))
137 if ($response->hasChannelStats())
139 $stats = $response->getChannelStats();
// Channels reported as online are recorded under their hex-encoded id.
141 foreach ($stats->getChannelsList() as $channel)
143 if($channel->getIsOnline())
145 $channelId = bin2hex($channel->getId());
146 $result[$channelId] =
true;
// Fragment that converts a list of messages (method header not visible in this excerpt;
// presumably the convertMessages() helper called elsewhere in this class).
// Each message is expected to carry 'channels' and 'event' entries.
163 foreach ($messages as $message)
165 $event = $message[
'event'] ??
null;
// A message fails this check when 'channels' is not a non-empty array or when the event
// lacks 'module_id' or 'command'; the handling of such messages is not visible here
// (presumably they are skipped).
// NOTE(review): $message['channels'] is read without isset()/??, so a message without
// that key raises an "undefined array key" warning on PHP 8.
166 if(!is_array($message[
'channels']) || count($message[
'channels']) == 0 || !isset($event[
'module_id']) || !isset($event[
'command']))
// The converted protobuf messages of every accepted message are appended to $result.
171 $result = array_merge($result, static::convertMessage($message[
'channels'], $event));
// Fragment that converts one event for a list of channels into protobuf messages
// (method header not visible in this excerpt; presumably convertMessage($channels, $event)).
// 'extra' falls back to an empty array when it is not an array.
// NOTE(review): $event['extra'] is read without isset()/??, which raises a warning on
// PHP 8 when the key is missing.
187 $extra = is_array($event[
'extra']) ? $event[
'extra'] : [];
// Entries of an array literal whose opening and closing lines are not visible here;
// 'params' falls back to an empty array when it is falsy.
190 'module_id' => $event[
'module_id'],
191 'command' => $event[
'command'],
192 'params' => $event[
'params'] ?: [],
// The message type is "<module_id>_<command>" with every non-word character removed.
197 $messageType =
"{$event['module_id']}_{$event['command']}";
198 $messageType = preg_replace(
"/[^\w]/",
"", $messageType);
200 $maxChannelsPerRequest = \CPullOptions::GetMaxChannelsPerRequest();
// Every channel (hex string) becomes a private receiver with a binary id.
202 foreach ($channels as $channel)
205 $receiver->setIsPrivate(
true);
206 $receiver->setId(hex2bin($channel));
207 $receivers[] = $receiver;
// Once the number of collected receivers equals the limit, a message for them is emitted.
// NOTE(review): the comparison is strict (===); if GetMaxChannelsPerRequest() returns a
// numeric string instead of an int this branch never matches - confirm the return type.
209 if(count($receivers) === $maxChannelsPerRequest)
212 $message->setReceiversList(
new MessageCollection($receivers));
// NOTE(review): $event['expiry'] is read without isset()/?? - confirm callers always set it.
213 $message->setExpiry($event[
'expiry']);
214 $message->setBody($body);
215 $message->setType($messageType);
217 $result[] = $message;
// Receivers left over after the loop are emitted in one final message.
222 if(count($receivers) > 0)
225 $message->setReceiversList(
new MessageCollection($receivers));
226 $message->setExpiry($event[
'expiry']);
227 $message->setBody($body);
228 $message->setType($messageType);
230 $result[] = $message;
// Fragment that adds every request to $batch (method header not visible in this excerpt;
// presumably the createRequestBatches() helper called elsewhere in this class).
// NOTE(review): how $batch is created, and whether more than one batch is produced,
// is not visible here.
243 foreach ($requests as $request)
246 $batch->addRequests($request);
// Fragment that packs protobuf messages into requests, limited by payload size and by
// message count (method header not visible in this excerpt; presumably createRequests()).
// 200 is subtracted from the configured maximum payload - presumably headroom for
// request overhead; confirm.
261 $maxPayload = \CPullOptions::GetMaxPayload() - 200;
262 $maxMessages = \CPullOptions::GetMaxMessagesPerRequest();
264 $currentMessageBatch = [];
265 $currentBatchSize = 0;
267 foreach ($messages as $message)
269 $messageSize = static::getMessageSize($message);
// The collected messages are flushed into a request when adding the current message
// would reach $maxPayload or when $maxMessages messages are already collected.
// NOTE(review): if a single message is at least $maxPayload in size while the batch is
// still empty, this branch appears to emit a request with an empty message list - confirm.
270 if($currentBatchSize + $messageSize >= $maxPayload || count($currentMessageBatch) >= $maxMessages)
274 $incomingMessagesRequest->setMessagesList(
new MessageCollection($currentMessageBatch));
276 $request->setIncomingMessages($incomingMessagesRequest);
277 $result[] = $request;
// NOTE(review): the message list is reset here, but a reset of $currentBatchSize is not
// visible in this excerpt - confirm it is reset as well, otherwise every later message
// would trigger a flush.
279 $currentMessageBatch = [];
284 $currentMessageBatch[] = $message;
285 $currentBatchSize += $messageSize;
// Messages remaining after the loop are emitted in one final request.
288 if(!empty($currentMessageBatch))
291 $incomingMessagesRequest->setMessagesList(
new MessageCollection($currentMessageBatch));
293 $request->setIncomingMessages($incomingMessagesRequest);
294 $result[] = $request;
// Fragment that splits a message with many receivers into several sub-messages of at most
// $maxReceivers receivers each (method header and name are not visible in this excerpt).
307 $receivers = $message->getReceiversList();
// Short path when the receiver count is within the limit; its body is not visible here.
308 if(count($receivers) <= $maxReceivers)
314 $currentReceivers = [];
316 foreach ($receivers as $receiver)
// When $maxReceivers receivers are collected, a sub-message with the body and expiry of the
// source message is emitted and the collection starts over.
// NOTE(review): the visible lines copy body and expiry only; a message type is not
// copied to the sub-message - confirm whether the sub-messages need it.
318 if(count($currentReceivers) == $maxReceivers)
321 $subMessage->setBody($message->getBody());
322 $subMessage->setExpiry($message->getExpiry());
323 $subMessage->setReceiversList(
new MessageCollection($currentReceivers));
324 $result[] = $subMessage;
325 $currentReceivers = [];
328 $currentReceivers[] = $receiver;
// Receivers remaining after the loop are emitted in one final sub-message.
331 if(count($currentReceivers) > 0)
334 $subMessage->setBody($message->getBody());
335 $subMessage->setExpiry($message->getExpiry());
336 $subMessage->setReceiversList(
new MessageCollection($currentReceivers));
337 $result[] = $subMessage;
// Fragment that returns the serialized size of $message (method header not visible in this
// excerpt; presumably the getMessageSize() helper called elsewhere in this class).
// The size is computed with a compute-size context created from the shared
// \Protobuf\Configuration instance.
345 $config = \Protobuf\Configuration::getInstance();
346 return $message->serializedSize($config->createComputeSizeContext());