// Fragment of a method body; its signature, braces and try/catch lines are not visible here.
// Records the server URL on the result object.
30 $result->withRemoteAddress($this->serverUrl);
// Builds the list of batches from $messages via createRequestBatches().
33 $batchList = static::createRequestBatches($messages);
// Turns the message and code of $e into an Error on the result and returns it
// (presumably inside a catch block that is not visible in this view).
37 return $result->addError(
new \
Bitrix\Main\
Error($e->getMessage(), $e->getCode()));
// Executes the batches one at a time against the server URL; the first batch
// that fails stops the loop and its errors are returned to the caller.
40 foreach ($batchList as $batch)
42 $executeResult = static::executeBatch($this->serverUrl, $batch);
43 if (!$executeResult->isSuccess())
45 return $result->addErrors($executeResult->getErrors());
// Fragment of a method body; its signature and some argument lines are not visible here.
// Calls the METHOD_GET_LAST_SEEN RPC method through executeMethod(), passing $userList
// under the 'userList' key.
54 $rpcResult = static::executeMethod(
56 static::METHOD_GET_LAST_SEEN,
58 'userList' => $userList
62 if (!$rpcResult->isSuccess())
// Uses the 'result' member of the RPC response as data; anything that is not an array becomes [].
// NOTE(review): 'result' is read without an existence check, so a response lacking that key
// raises an "Undefined array key" warning — consider `$response['result'] ?? null`.
67 $response = $rpcResult->getData();
68 $data = is_array($response[
'result']) ? $response[
'result'] : [];
71 return $result->setData($data);
// Fragment of a method body that packs $messages into JSON array strings ("[...]");
// its signature, braces and some initialisation lines are not visible here.
// Size budget for one batch: the configured maximum payload minus 20.
96 $maxPayload = \CPullOptions::GetMaxPayload() - 20;
// The counter starts at 2 — presumably the "[" and "]" wrapped around every batch.
100 $currentBatchSize = 2;
101 foreach ($messages as $message)
// array_values() re-indexes both lists from 0 before the message is encoded.
103 $message->userList = array_values($message->userList);
104 $message->channelList = array_values($message->channelList);
// Each message is wrapped as a METHOD_PUBLISH JSON-RPC request and encoded to JSON.
105 $jsonRpcMessage = Main\Web\Json::encode(static::createJsonRpcRequest(static::METHOD_PUBLISH, $message));
// NOTE(review): $maxPayload already had 20 subtracted when it was assigned; this comparison
// subtracts another 20 — confirm the double margin is intended.
// NOTE(review): mb_strlen() counts characters, not bytes; if the payload limit is in bytes,
// strlen() may be what is needed — confirm.
106 if (mb_strlen($jsonRpcMessage) > $maxPayload - 20)
// NOTE(review): the warning says "skipping", but no `continue` is visible in this view —
// verify that an oversized message is really left out of the batch.
108 trigger_error(
"Pull message exceeds size limit, skipping", E_USER_WARNING);
// If adding this message (plus 1, presumably for the "," separator) would exceed the budget,
// the current batch is emitted as a JSON array string and the size counter is reset.
110 if (($currentBatchSize + mb_strlen($jsonRpcMessage)) + 1> $maxPayload)
113 $result[] =
"[" . implode(
",", $currentBatch) .
"]";
115 $currentBatchSize = 2;
// The message joins the current batch and its length plus 1 is added to the counter.
117 $currentBatch[] = $jsonRpcMessage;
118 $currentBatchSize += (mb_strlen($jsonRpcMessage)) + 1;
// Emits whatever is left in the last, partially filled batch.
120 if (count($currentBatch) > 0)
122 $result[] =
"[" . implode(
",", $currentBatch) .
"]";
/**
 * Sends one JSON-RPC request to the queue server and validates the reply.
 *
 * The request is built with createJsonRpcRequest(), encoded to JSON and sent
 * through performHttpRequest(). The reply must carry a 'jsonrpc' member equal
 * to static::VERSION; a reply with an 'error' object becomes an Error.
 *
 * @param string $queueServerUrl URL passed on to performHttpRequest().
 * @param string $method JSON-RPC method name.
 * @param array $params Parameters of the method.
 * @return Main\Result Holds the whole decoded response as data on success, errors otherwise.
 */
protected static function executeMethod(string $queueServerUrl, string $method, array $params): Main\Result
{
	$result = new Main\Result();
	$rpcRequest = static::createJsonRpcRequest($method, $params);

	try
	{
		$body = Main\Web\Json::encode($rpcRequest);
	}
	catch (\Throwable $e)
	{
		// Encoding failures are reported through the result rather than thrown.
		return $result->addError(new \Bitrix\Main\Error($e->getMessage(), $e->getCode()));
	}

	$httpResult = static::performHttpRequest($queueServerUrl, $body);
	if (!$httpResult->isSuccess())
	{
		return $result->addErrors($httpResult->getErrors());
	}

	$response = $httpResult->getData();
	// Loose comparison is kept on purpose: VERSION's type is declared outside this block.
	if (!isset($response['jsonrpc']) || $response['jsonrpc'] != static::VERSION)
	{
		return $result->addError(new \Bitrix\Main\Error('Wrong response structure'));
	}

	// A successful JSON-RPC reply has no 'error' member, so the key must be
	// checked before it is read; otherwise every success raises a warning.
	if (isset($response['error']) && is_array($response['error']))
	{
		// 'message' and 'code' may be missing from a malformed error object.
		return $result->addError(new \Bitrix\Main\Error(
			$response['error']['message'] ?? 'Unknown JSON-RPC error',
			$response['error']['code'] ?? 0
		));
	}

	return $result->setData($response);
}
// Fragment of a method body; its signature and braces are not visible here.
// Sends the batch body to the queue server; HTTP-level errors are copied to the result.
175 $httpResult = static::performHttpRequest($queueServerUrl, $batchBody);
176 if (!$httpResult->isSuccess())
178 return $result->addErrors($httpResult->getErrors());
// NOTE(review): this reads data from $result, which receives no data in the visible lines,
// while the server reply is held by $httpResult — this looks like it should be
// $httpResult->getData(); confirm and fix, otherwise the reply is discarded.
180 $response = $result->getData();
182 return $result->setData($response);
// Fragment of a method body that POSTs $body to the queue server and decodes the JSON reply;
// its signature, braces, try line and the HTTP client construction are not visible here.
// The request is signed: a signature of the body and the host id are appended to the URL
// as the "signature" and "hostId" query parameters.
190 $signature = \CPullChannel::GetSignature($body);
191 $hostId = (string)Config::getHostId();
192 $urlWithSignature = \CHTTP::urlAddParams($queueServerUrl, [
"hostId" => $hostId,
"signature" => $signature]);
194 $sendResult = $httpClient->query(Main\Web\HttpClient::HTTP_POST, $urlWithSignature, $body);
// Takes the first entry of the client's error list: its key becomes the error code and its
// value the error message.
// NOTE(review): if getError() returns an empty array, array_key_first() yields null and the
// lookup below raises a warning — confirm the list is never empty on this path.
197 $errorCode = array_key_first($httpClient->getError());
198 $errorMsg = $httpClient->getError()[$errorCode];
199 return $result->addError(
new Main\
Error($errorMsg, $errorCode));
// Any HTTP status other than 200 is reported as an error.
201 $responseCode = (int)$httpClient->getStatus();
202 if ($responseCode !== 200)
204 return $result->addError(
new Main\
Error(
"Unexpected server response code {$responseCode}"));
// An empty body is reported as an error.
// NOTE(review): `==` is a loose comparison, so null also matches '' — `=== ''` would be stricter.
206 $responseBody = $httpClient->getResult();
207 if ($responseBody ==
'')
209 return $result->addError(
new Main\
Error(
'Empty server response'));
// Decodes the body as JSON; on any Throwable the raw body is put into the error message
// and the exception's own message is not used.
213 $decodedBody = Main\Web\Json::decode($responseBody);
215 catch (\Throwable $e)
217 return $result->addError(
new Main\
Error(
'Could not decode server response. Raw response: ' . $responseBody));
220 return $result->setData($decodedBody);