Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
protobuftransport.php
1<?php
2
3namespace Bitrix\Pull;
4
9use Protobuf\MessageCollection;
10
12{
13 protected $hits = 0;
14 protected $bytes = 0;
15
/**
 * Converts the given messages to protobuf requests and POSTs them to the queue server.
 *
 * Each request batch is sent with its own HTTP client. A failed POST does not stop the
 * loop: its first reported error is added to the result and the next batch is still sent.
 *
 * @param array $messages Messages in the format accepted by convertMessages().
 * @param array $options Supported key: 'serverUrl' - overrides Config::getPublishUrl().
 * @return TransportResult Carries the remote address and any send errors.
 */
public static function sendMessages(array $messages, array $options = []): TransportResult
{
	$result = new TransportResult();

	$batches = static::createRequestBatches(
		static::createRequests(
			static::convertMessages($messages)
		)
	);

	$publishUrl = $options['serverUrl'] ?? Config::getPublishUrl();
	// The address is recorded before the binaryMode parameter is appended.
	$result->withRemoteAddress($publishUrl);

	$baseUrl = \CHTTP::urlAddParams($publishUrl, ["binaryMode" => "true"]);
	foreach ($batches as $batch)
	{
		$client = new HttpClient(["streamTimeout" => 1]);
		$payload = $batch->toStream();

		$targetUrl = $baseUrl;
		if (\CPullOptions::IsServerShared())
		{
			// For a shared server the request body is signed and the signature travels in the URL.
			$targetUrl = \CHTTP::urlAddParams(
				$baseUrl,
				["signature" => \CPullChannel::GetSignature($payload->getContents())]
			);
		}

		$client->disableSslVerification();
		if ($client->query(HttpClient::HTTP_POST, $targetUrl, $payload))
		{
			continue;
		}

		// Only the first error reported by the client is forwarded.
		$errors = $client->getError();
		$code = array_key_first($errors);
		$result->addError(new \Bitrix\Main\Error($errors[$code], $code));
	}

	return $result;
}
54
/**
 * Asks the queue server which of the given channels are online.
 *
 * Channels are grouped into ChannelStatsRequest objects of at most
 * CPullOptions::GetMaxChannelsPerRequest() channels each; every request is sent
 * in its own request batch.
 *
 * Any failure (non-200 status, empty body, undecodable body, missing response)
 * aborts the whole call and returns an empty array, discarding results already
 * collected from earlier batches.
 *
 * @param array $channels Hex-encoded channel ids.
 * @return array Map of hex-encoded channel id => true for every channel reported online.
 */
public static function getOnlineChannels(array $channels)
{
	$result = [];
	$maxChannelsPerRequest = \CPullOptions::GetMaxChannelsPerRequest();
	$channelBatches = [];
	$currentChannelBatch = 0;
	$channelsInCurrentBatch = 0;
	foreach ($channels as $channelId)
	{
		$channel = new Protobuf\ChannelId();
		$channel->setId(hex2bin($channelId));
		$channel->setIsPrivate(true);

		$channelsInCurrentBatch++;

		// Open a new batch only when this channel would exceed the limit, so every
		// batch is filled up to the limit (the previous ">=" left batches one short).
		if ($channelsInCurrentBatch > $maxChannelsPerRequest)
		{
			$currentChannelBatch++;
			$channelsInCurrentBatch = 1;
		}
		$channelBatches[$currentChannelBatch][] = $channel;
	}

	$requests = [];
	foreach ($channelBatches as $channelBatch)
	{
		$channelsStatsRequest = new Protobuf\ChannelStatsRequest();
		$channelsStatsRequest->setChannelsList(new MessageCollection($channelBatch));

		$request = new Protobuf\Request();
		$request->setChannelStats($channelsStatsRequest);
		$requests[] = $request;
	}

	$queueServerUrl = \CHTTP::urlAddParams(Config::getPublishUrl(), ["binaryMode" => "true"]);

	$requestBatches = static::createRequestBatches($requests);
	foreach ($requestBatches as $requestBatch)
	{
		$http = new HttpClient();
		$http->disableSslVerification();

		$urlWithSignature = $queueServerUrl;
		$bodyStream = $requestBatch->toStream();
		if (\CPullOptions::IsServerShared())
		{
			// For a shared server the request body is signed and the signature travels in the URL.
			$signature = \CPullChannel::GetSignature($bodyStream->getContents());
			$urlWithSignature = \CHTTP::urlAddParams($urlWithSignature, ["signature" => $signature]);
		}

		$binaryResponse = $http->post($urlWithSignature, $bodyStream);

		if ($http->getStatus() != 200)
		{
			return [];
		}
		if ((string)$binaryResponse === '')
		{
			return [];
		}

		try
		{
			$responseBatch = Protobuf\ResponseBatch::fromStream($binaryResponse);
		}
		catch (\Exception $e)
		{
			return [];
		}
		$responses = $responseBatch->getResponsesList();

		// Each request batch carries a single request, so only the first response is read.
		// "?? null" avoids a warning when the list is empty or absent.
		$response = $responses[0] ?? null;
		if (!($response instanceof Protobuf\Response))
		{
			return [];
		}

		if ($response->hasChannelStats())
		{
			$stats = $response->getChannelStats();
			foreach ($stats->getChannelsList() as $channel)
			{
				if ($channel->getIsOnline())
				{
					$channelId = bin2hex($channel->getId());
					$result[$channelId] = true;
				}
			}
		}
	}

	return $result;
}
154
/**
 * Converts a list of outgoing messages into protobuf IncomingMessage objects.
 *
 * A message is skipped when it has no non-empty 'channels' array or when its
 * 'event' lacks 'module_id' or 'command'.
 *
 * @param array $messages Each item is expected to have 'channels' (array) and 'event' (array).
 * @return array Flat list of everything convertMessage() produced, in input order.
 */
protected static function convertMessages(array $messages)
{
	$result = [];

	foreach ($messages as $message)
	{
		// Read both keys through "??" so a malformed message is skipped silently
		// instead of raising an "undefined array key" warning.
		$channels = $message['channels'] ?? null;
		$event = $message['event'] ?? null;

		if (
			!is_array($channels)
			|| count($channels) === 0
			|| !isset($event['module_id'], $event['command'])
		)
		{
			continue;
		}

		// Append in place; array_merge() in a loop re-copies the accumulated list each time.
		foreach (static::convertMessage($channels, $event) as $protobufMessage)
		{
			$result[] = $protobufMessage;
		}
	}

	return $result;
}
176
/**
 * Builds protobuf IncomingMessage objects for one event addressed to many channels.
 *
 * The receivers are split into groups of CPullOptions::GetMaxChannelsPerRequest()
 * channels; every group gets its own message with the same body, expiry and type.
 *
 * @param array $channels Hex-encoded channel ids.
 * @param array $event Requires 'module_id' and 'command'; 'params', 'extra' and 'expiry' are optional.
 * @return array List of Protobuf\IncomingMessage; empty when $channels is empty.
 */
protected static function convertMessage(array $channels, array $event)
{
	$result = [];

	// Optional keys are read through "??" so that an absent key does not raise a warning.
	$extra = $event['extra'] ?? null;
	if (!is_array($extra))
	{
		$extra = [];
	}

	$body = Common::jsonEncode([
		'module_id' => $event['module_id'],
		'command' => $event['command'],
		// Any falsy value (missing, null, '', 0) is normalised to an empty array, as before.
		'params' => ($event['params'] ?? null) ?: [],
		'extra' => $extra,
	]);

	// for statistics
	$messageType = "{$event['module_id']}_{$event['command']}";
	$messageType = preg_replace("/[^\w]/", "", $messageType);

	$expiry = $event['expiry'] ?? null;

	// Single place where a message for one group of receivers is assembled.
	$createMessage = static function (array $receivers) use ($body, $expiry, $messageType)
	{
		$message = new Protobuf\IncomingMessage();
		$message->setReceiversList(new MessageCollection($receivers));
		$message->setExpiry($expiry);
		$message->setBody($body);
		$message->setType($messageType); // for statistics

		return $message;
	};

	// The comparison below is strict, so the limit is cast to int; a numeric string
	// from the options storage would otherwise never match and disable the splitting.
	$maxChannelsPerRequest = (int)\CPullOptions::GetMaxChannelsPerRequest();
	$receivers = [];
	foreach ($channels as $channel)
	{
		$receiver = new Protobuf\Receiver();
		$receiver->setIsPrivate(true);
		$receiver->setId(hex2bin($channel));
		$receivers[] = $receiver;

		if (count($receivers) === $maxChannelsPerRequest)
		{
			$result[] = $createMessage($receivers);
			$receivers = [];
		}
	}

	// Remaining receivers that did not fill a whole group.
	if (count($receivers) > 0)
	{
		$result[] = $createMessage($receivers);
	}

	return $result;
}
235
/**
 * Wraps every request into its own Protobuf\RequestBatch.
 *
 * @param array $requests List of Protobuf\Request objects.
 * @return array List of Protobuf\RequestBatch, one per request, in the same order.
 */
protected static function createRequestBatches(array $requests)
{
	$wrap = static function ($singleRequest)
	{
		$requestBatch = new Protobuf\RequestBatch();
		$requestBatch->addRequests($singleRequest);

		return $requestBatch;
	};

	// array_values() keeps the result a plain 0..n-1 list regardless of the input keys.
	return array_values(array_map($wrap, $requests));
}
252
/**
 * Groups protobuf messages into Protobuf\Request objects.
 *
 * A request is closed and a new one started when adding the next message would
 * reach the payload limit, or when the request already holds
 * CPullOptions::GetMaxMessagesPerRequest() messages. A single message that is
 * larger than the payload limit is still sent, alone in its own request.
 *
 * @param array $messages List of Protobuf\IncomingMessage objects.
 * @return array List of Protobuf\Request objects; empty when $messages is empty.
 */
protected static function createRequests(array $messages)
{
	$result = [];

	// 200 is subtracted from the configured limit; presumably headroom for the
	// request envelope - TODO confirm.
	$maxPayload = \CPullOptions::GetMaxPayload() - 200;
	$maxMessages = \CPullOptions::GetMaxMessagesPerRequest();

	// Single place where a group of messages is wrapped into a Request.
	$wrapMessages = static function (array $messageBatch)
	{
		$incomingMessagesRequest = new Protobuf\IncomingMessagesRequest();
		$incomingMessagesRequest->setMessagesList(new MessageCollection($messageBatch));
		$request = new Protobuf\Request();
		$request->setIncomingMessages($incomingMessagesRequest);

		return $request;
	};

	$currentMessageBatch = [];
	$currentBatchSize = 0;

	foreach ($messages as $message)
	{
		$messageSize = static::getMessageSize($message);
		$limitReached =
			$currentBatchSize + $messageSize >= $maxPayload
			|| count($currentMessageBatch) >= $maxMessages
		;

		// Never flush an empty batch: that would emit a request without messages.
		if ($limitReached && !empty($currentMessageBatch))
		{
			// finalize current request and start a new one
			$result[] = $wrapMessages($currentMessageBatch);

			$currentMessageBatch = [];
			// Reset the running size of the batch (not the size of the current message),
			// otherwise the total only grows and every later message forces a flush.
			$currentBatchSize = 0;
		}

		// add the message to the current batch
		$currentMessageBatch[] = $message;
		$currentBatchSize += $messageSize;
	}

	if (!empty($currentMessageBatch))
	{
		$result[] = $wrapMessages($currentMessageBatch);
	}

	return $result;
}
299
/**
 * Splits a message into several messages with at most $maxReceivers receivers each.
 *
 * When the message already fits the limit it is returned unchanged as the only
 * element. Otherwise new messages are created that copy the body and the expiry
 * of the original and receive consecutive slices of its receiver list.
 *
 * NOTE(review): only body and expiry are copied; the type that convertMessage()
 * sets via setType() is not carried over to the sub-messages.
 *
 * @param Protobuf\IncomingMessage $message Message to split.
 * @param int $maxReceivers Maximum number of receivers per resulting message.
 * @return array List of Protobuf\IncomingMessage objects.
 */
protected static function splitReceivers(Protobuf\IncomingMessage $message, $maxReceivers)
{
	$allReceivers = $message->getReceiversList();
	if (!(count($allReceivers) > $maxReceivers))
	{
		return [$message];
	}

	// Builds one sub-message for the given slice of receivers.
	$makePart = static function (array $slice) use ($message)
	{
		$part = new Protobuf\IncomingMessage();
		$part->setBody($message->getBody());
		$part->setExpiry($message->getExpiry());
		$part->setReceiversList(new MessageCollection($slice));

		return $part;
	};

	$parts = [];
	$slice = [];

	foreach ($allReceivers as $receiver)
	{
		// The slice is flushed before the next receiver is appended.
		if (count($slice) == $maxReceivers)
		{
			$parts[] = $makePart($slice);
			$slice = [];
		}

		$slice[] = $receiver;
	}

	if (count($slice) > 0)
	{
		$parts[] = $makePart($slice);
	}

	return $parts;
}
342
/**
 * Returns the serialized size of a protobuf message, as computed by serializedSize().
 *
 * @param Protobuf\IncomingMessage $message Message to measure.
 * @return mixed Whatever serializedSize() reports for the message.
 */
protected static function getMessageSize(Protobuf\IncomingMessage $message)
{
	$sizeContext = \Protobuf\Configuration::getInstance()->createComputeSizeContext();

	return $message->serializedSize($sizeContext);
}
348}
static jsonEncode($params)
Definition common.php:6
static getMessageSize(Protobuf\IncomingMessage $message)
static createRequests(array $messages)
static sendMessages(array $messages, array $options=[])
static convertMessage(array $channels, array $event)
static convertMessages(array $messages)
static splitReceivers(Protobuf\IncomingMessage $message, $maxReceivers)
static createRequestBatches(array $requests)