Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
jsonrpctransport.php
1<?php
2
3namespace Bitrix\Pull;
4
6
8{
/** JSON-RPC protocol version written to the 'jsonrpc' member of requests and required in responses. */
protected const VERSION = '2.0';

/** Remote method name used for publishing messages. */
protected const METHOD_PUBLISH = 'publish';

/** Remote method name used for reading users' last-seen data. */
protected const METHOD_GET_LAST_SEEN = 'getUsersLastSeen';

/** Remote method name used for updating users' last-seen data. */
protected const METHOD_UPDATE_LAST_SEEN = 'updateUsersLastSeen';

/** URL of the server that all requests of this transport are sent to. */
protected string $serverUrl = '';
15
/**
 * Creates the transport.
 *
 * @param array $options Supported key: 'serverUrl' - URL of the JSON-RPC server.
 *   When the key is absent or null, Config::getJsonRpcUrl() is used instead.
 */
public function __construct(array $options = [])
{
	$this->serverUrl = $options['serverUrl'] ?? Config::getJsonRpcUrl();
}
20
/**
 * Sends the given messages to the server as JSON-RPC batches.
 *
 * The messages are first serialized into batch bodies; every body is then posted
 * in turn. Processing stops at the first failure: the errors of the failed step
 * are added to the returned result and the remaining batches are not sent.
 *
 * @param array $messages Messages to publish.
 * @return TransportResult Result on which withRemoteAddress() was called with the server URL.
 */
public function sendMessages(array $messages): TransportResult
{
	$transportResult = new TransportResult();
	$transportResult->withRemoteAddress($this->serverUrl);

	try
	{
		$batches = static::createRequestBatches($messages);
	}
	catch (\Throwable $exception)
	{
		// Serialization problems are reported through the result instead of being thrown
		$error = new \Bitrix\Main\Error($exception->getMessage(), $exception->getCode());

		return $transportResult->addError($error);
	}

	foreach ($batches as $batchBody)
	{
		$batchResult = static::executeBatch($this->serverUrl, $batchBody);
		if ($batchResult->isSuccess())
		{
			continue;
		}

		return $transportResult->addErrors($batchResult->getErrors());
	}

	return $transportResult;
}
51
/**
 * Requests last-seen data for the given users from the server.
 *
 * @param array $userList Users to query; passed to the remote method as the 'userList' parameter.
 * @return Main\Result On failure, the failed RPC result itself. On success, a new result whose
 *   data is the 'result' member of the response, or an empty array when that member is
 *   missing or is not an array.
 */
public function getUsersLastSeen(array $userList): Main\Result
{
	$rpcResult = static::executeMethod(
		$this->serverUrl,
		static::METHOD_GET_LAST_SEEN,
		[
			'userList' => $userList,
		]
	);

	if (!$rpcResult->isSuccess())
	{
		return $rpcResult;
	}

	$response = $rpcResult->getData();
	// The 'result' member may be absent; read it without raising an "undefined array key" warning
	$payload = $response['result'] ?? null;
	$data = is_array($payload) ? $payload : [];

	return (new Main\Result())->setData($data);
}
73
/**
 * Sends last-seen data of users to the server.
 *
 * @param array $userTimestamps Passed unchanged as the parameters of the remote method.
 * @return Main\Result Result of the remote call.
 */
public function updateUsersLastSeen(array $userTimestamps): Main\Result
{
	$rpcResult = static::executeMethod($this->serverUrl, static::METHOD_UPDATE_LAST_SEEN, $userTimestamps);

	return $rpcResult;
}
88
/**
 * Serializes messages into JSON-RPC batch bodies that respect the payload size limit.
 *
 * Every message becomes one 'publish' request. Requests are packed into a JSON array
 * until adding the next one would exceed the limit; then a new batch is started.
 * A message that is too large on its own is reported with an E_USER_WARNING and skipped.
 *
 * Note: the userList and channelList properties of the passed message objects are
 * re-indexed in place.
 *
 * @param array $messages Message objects exposing userList and channelList array properties.
 * @return string[] JSON-encoded batch bodies; empty when there is nothing to send.
 */
protected static function createRequestBatches(array $messages): array
{
	// Stay a little below the configured limit
	$maxPayload = \CPullOptions::GetMaxPayload() - 20;

	$result = [];
	$currentBatch = [];
	$currentBatchSize = 2; // opening and closing bracket
	foreach ($messages as $message)
	{
		// Re-index so that the lists have sequential keys (presumably so they encode as JSON arrays)
		$message->userList = array_values($message->userList);
		$message->channelList = array_values($message->channelList);
		$jsonRpcMessage = Main\Web\Json::encode(static::createJsonRpcRequest(static::METHOD_PUBLISH, $message));

		// NOTE(review): mb_strlen counts characters, not bytes - confirm which unit the payload limit uses
		$messageSize = mb_strlen($jsonRpcMessage);
		if ($messageSize > $maxPayload - 20)
		{
			trigger_error("Pull message exceeds size limit, skipping", E_USER_WARNING);
			// Actually skip the message: it can never fit into a batch
			continue;
		}
		if ($currentBatchSize + $messageSize + 1 > $maxPayload)
		{
			// start new batch
			$result[] = "[" . implode(",", $currentBatch) . "]";
			$currentBatch = [];
			$currentBatchSize = 2;
		}
		$currentBatch[] = $jsonRpcMessage;
		$currentBatchSize += $messageSize + 1; // + comma
	}
	if (count($currentBatch) > 0)
	{
		$result[] = "[" . implode(",", $currentBatch) . "]";
	}

	return $result;
}
126
/**
 * Builds the array representation of a JSON-RPC request.
 *
 * @param string $method Remote method name.
 * @param mixed $params Method parameters, stored as given.
 * @return array Array with the keys 'jsonrpc', 'method' and 'params', in that order.
 */
protected static function createJsonRpcRequest(string $method, $params): array
{
	$request = [];
	$request['jsonrpc'] = static::VERSION;
	$request['method'] = $method;
	$request['params'] = $params;

	return $request;
}
140
/**
 * Calls a single remote method and validates the JSON-RPC response.
 *
 * @param string $queueServerUrl URL of the server.
 * @param string $method Remote method name.
 * @param array $params Method parameters.
 * @return Main\Result On success, the data is the decoded response. Errors are added when
 *   the request cannot be encoded, the HTTP request fails, the response does not carry the
 *   expected 'jsonrpc' version, or the response contains an 'error' object.
 */
protected static function executeMethod(string $queueServerUrl, string $method, array $params): Main\Result
{
	$result = new Main\Result();
	$rpcRequest = static::createJsonRpcRequest($method, $params);

	try
	{
		$body = Main\Web\Json::encode($rpcRequest);
	}
	catch (\Throwable $e)
	{
		return $result->addError(new \Bitrix\Main\Error($e->getMessage(), $e->getCode()));
	}

	$httpResult = static::performHttpRequest($queueServerUrl, $body);
	if (!$httpResult->isSuccess())
	{
		return $result->addErrors($httpResult->getErrors());
	}

	$response = $httpResult->getData();
	if (!isset($response['jsonrpc']) || $response['jsonrpc'] != static::VERSION)
	{
		return $result->addError(new \Bitrix\Main\Error('Wrong response structure'));
	}

	// Successful responses have no 'error' member, so it must not be read unconditionally
	$rpcError = $response['error'] ?? null;
	if (is_array($rpcError))
	{
		return $result->addError(new \Bitrix\Main\Error($rpcError['message'] ?? '', $rpcError['code'] ?? 0));
	}

	return $result->setData($response);
}
171
/**
 * Posts one already encoded JSON-RPC batch body to the server.
 *
 * @param string $queueServerUrl URL of the server.
 * @param string $batchBody JSON-encoded batch.
 * @return Main\Result On success, the data is the decoded server response;
 *   on failure, the errors of the HTTP request.
 */
protected static function executeBatch(string $queueServerUrl, string $batchBody): Main\Result
{
	$result = new Main\Result();
	$httpResult = static::performHttpRequest($queueServerUrl, $batchBody);
	if (!$httpResult->isSuccess())
	{
		return $result->addErrors($httpResult->getErrors());
	}

	// Take the response from the HTTP result; the local $result is still empty at this point
	$response = $httpResult->getData();

	return $result->setData($response);
}
184
/**
 * Posts a body to the server and decodes the JSON response.
 *
 * The URL is extended with the 'hostId' and 'signature' query parameters, where the
 * signature is obtained from \CPullChannel::GetSignature() for the body.
 *
 * @param string $queueServerUrl URL of the server.
 * @param string $body Request body.
 * @return Main\Result On success, the data is the decoded response body. An error is added
 *   when the query fails, the status code is not 200, the response body is empty, or the
 *   body cannot be decoded.
 */
protected static function performHttpRequest(string $queueServerUrl, string $body): Main\Result
{
	$requestResult = new Main\Result();
	$client = new Main\Web\HttpClient();

	$bodySignature = \CPullChannel::GetSignature($body);
	$host = (string)Config::getHostId();
	$signedUrl = \CHTTP::urlAddParams(
		$queueServerUrl,
		[
			"hostId" => $host,
			"signature" => $bodySignature,
		]
	);

	if (!$client->query(Main\Web\HttpClient::HTTP_POST, $signedUrl, $body))
	{
		// Report the first error registered by the HTTP client
		$code = array_key_first($client->getError());
		$message = $client->getError()[$code];

		return $requestResult->addError(new Main\Error($message, $code));
	}

	$status = (int)$client->getStatus();
	if ($status !== 200)
	{
		return $requestResult->addError(new Main\Error("Unexpected server response code {$status}"));
	}

	$rawResponse = $client->getResult();
	if ($rawResponse == '')
	{
		return $requestResult->addError(new Main\Error('Empty server response'));
	}

	try
	{
		$decoded = Main\Web\Json::decode($rawResponse);
	}
	catch (\Throwable $e)
	{
		$failure = new Main\Error('Could not decode server response. Raw response: ' . $rawResponse);

		return $requestResult->addError($failure);
	}

	return $requestResult->setData($decoded);
}
222}
static executeBatch(string $queueServerUrl, string $batchBody)
static performHttpRequest(string $queueServerUrl, string $body)
static createJsonRpcRequest(string $method, $params)
static executeMethod(string $queueServerUrl, string $method, array $params)
updateUsersLastSeen(array $userTimestamps)
static createRequestBatches(array $messages)