// Flag read by the queueing/sending code to tell whether it runs in the background-job context; starts as false.
14 private static bool $backgroundContext =
false;
// Queue of pull events waiting to be sent; addMessage() fills it keyed by event code.
16 private static array $messages = [];
// Events queued separately from $messages; processDeferredMessages() resolves them through their 'paramsCallback'.
17 private static array $deferredMessages = [];
// Queue of push notifications keyed by push code; entries hold 'push', 'extra', 'hasPushCallback' and 'users'.
18 private static array $push = [];
// Most recent Error recorded by the methods of this class; false until one is recorded.
19 private static $error =
false;
/**
 * Public entry point: registers a pull event for one or more recipients.
 *
 * Visible steps: $parameters is checked for invalid UTF-8 (an
 * EVENT_BAD_ENCODING Error is stored in self::$error when a bad path is
 * reported), then the event is handed either to generateEventsForUsers()
 * or to addEvent().
 * NOTE(review): the condition that selects between those two calls and the
 * return statements are not visible in this excerpt.
 *
 * @param mixed $recipient Recipient or list of recipients.
 * @param array $parameters Event parameters.
 * @param mixed $channelType Channel type; defaults to \CPullChannel::TYPE_PRIVATE.
 */
21 public static function add($recipient, array
$parameters, $channelType = \CPullChannel::TYPE_PRIVATE)
// A truthy result is used below as the path of the offending value inside $parameters.
29 $badUnicodeSymbolsPath = Common::findInvalidUnicodeSymbols(
$parameters);
30 if ($badUnicodeSymbolsPath)
32 $warning =
'Parameters array contains invalid UTF-8 characters by the path ' . $badUnicodeSymbolsPath;
33 self::$error =
new Error(__METHOD__,
'EVENT_BAD_ENCODING', $warning,
$parameters);
// Per-user fan-out path.
41 self::generateEventsForUsers($recipient,
$parameters, $channelType);
// Single shared event path.
45 $result = self::addEvent($recipient,
$parameters, $channelType);
/**
 * Queues one event for a set of recipients.
 *
 * Visible steps: the recipient is normalised to an array and split into
 * users and channels by getEntitiesByType(); a RECIPIENT_FORMAT Error is
 * recorded when that split yields null; the event is then queued through
 * addMessage() into either self::$deferredMessages or self::$messages.
 * NOTE(review): the conditions guarding each branch, the push handling and
 * the return values are only partially visible in this excerpt.
 *
 * @param mixed $recipient Recipient or list of recipients.
 * @param mixed $parameters Event parameters (indexed as an array below).
 * @param mixed $channelType Channel type; defaults to \CPullChannel::TYPE_PRIVATE.
 */
61 private static function addEvent($recipient,
$parameters, $channelType = \CPullChannel::TYPE_PRIVATE)
// Accept a scalar recipient by wrapping it into a one-element list.
63 if (!is_array($recipient))
65 $recipient = [$recipient];
68 $entities = self::getEntitiesByType($recipient);
// getEntitiesByType() returns null when it counted no entities.
69 if ($entities ===
null)
71 self::$error =
new Error(__METHOD__,
'RECIPIENT_FORMAT',
Loc::getMessage(
'PULL_EVENT_RECIPIENT_FORMAT_ERROR'), [
72 'recipient' => $recipient,
// Neither users nor channels were resolved.
86 if (empty($entities[
'users']) && empty($entities[
'channels']))
98 $pushParameters =
null;
// The push callback is held in a local variable here and written back into $parameters further down.
103 $pushParametersCallback =
$parameters[
'pushParamsCallback'];
108 $pushParametersCallback =
null;
// Deferred queue: channels are passed before users, matching addMessage()'s signature.
113 self::addMessage(self::$deferredMessages, $entities[
'channels'], $entities[
'users'],
$parameters);
// Regular queue.
117 self::addMessage(self::$messages, $entities[
'channels'], $entities[
'users'],
$parameters);
// `&&` binds tighter than `||`: background context, OR (BX_CHECK_AGENT_START defined AND BX_WITH_ON_AFTER_EPILOG not defined).
121 self::$backgroundContext
122 || defined(
'BX_CHECK_AGENT_START') && !defined(
'BX_WITH_ON_AFTER_EPILOG')
128 if ($pushParameters || $pushParametersCallback)
134 if ($pushParametersCallback)
136 $parameters[
'pushParamsCallback'] = $pushParametersCallback;
/**
 * Adds an event to the given queue, merging recipients when an entry with
 * the same event code is already queued.
 *
 * NOTE(review): the computation of $eventCode is not visible in this excerpt.
 *
 * @param array $destination Queue to modify (passed by reference).
 * @param array $channels Channel recipients. Note the order: channels come before users.
 * @param array $users User recipients.
 * @param array $parameters Event parameters.
 */
145 private static function addMessage(array &$destination, array $channels, array $users, array
$parameters)
// Same event already queued: extend its recipient lists instead of adding a duplicate entry.
150 if (isset($destination[$eventCode]))
152 $waitingToReceiveUserList = $destination[$eventCode][
'users'] ?? [];
153 $newUserList = $users ?? [];
154 $destination[$eventCode][
'users'] = array_unique(array_merge($waitingToReceiveUserList, $newUserList));
156 $waitingToReceiveChannelList = $destination[$eventCode][
'channels'] ?? [];
157 $newChannelList = $channels ?? [];
158 $destination[$eventCode][
'channels'] = array_unique(array_merge($waitingToReceiveChannelList, $newChannelList));
// First occurrence of this event code: create the queue entry with de-duplicated recipients.
162 $destination[$eventCode] = [
164 'users' => array_unique($users),
165 'channels' => array_unique($channels),
/**
 * Queues events for recipients that have per-user parameter overrides.
 *
 * For every recipient with an array entry in $paramsByUser, the shared
 * params are merged with that recipient's overrides and a dedicated event
 * is added through addEvent(). Recipients handled this way are collected
 * in $processed; the remainder is computed with array_diff().
 * NOTE(review): where $params, $paramsByUser and $userParams come from, and
 * what happens with $left, is not visible in this excerpt.
 *
 * @param mixed $recipients Recipient or list of recipients.
 * @param mixed $parameters Event parameters.
 * @param mixed $channelType Channel type; defaults to \CPullChannel::TYPE_PRIVATE.
 */
170 private static function generateEventsForUsers($recipients,
$parameters, $channelType = \CPullChannel::TYPE_PRIVATE)
// Accept a scalar recipient by wrapping it into a one-element list.
172 if (!is_array($recipients))
174 $recipients = [$recipients];
190 foreach ($recipients as $recipient)
192 if (isset($paramsByUser[$recipient]) && is_array($paramsByUser[$recipient]))
// Per-user values override the shared ones on key collisions (array_merge semantics).
195 $userParams[
'params'] = array_merge($params, $paramsByUser[$recipient]);
196 self::addEvent($recipient, $userParams, $channelType);
198 $processed[] = $recipient;
// Recipients that had no per-user overrides.
202 $left = array_diff($recipients, $processed);
/**
 * Queues a push notification for the given users in self::$push.
 *
 * Visible steps: a PUSH_DISABLED Error is recorded when
 * \CPullOptions::GetPushStatus() is falsy; user ids are cast with intval();
 * a RECIPIENT_FORMAT Error can be recorded; the push is stored under a code
 * derived from $parameters['push'] via getParamsCode(), merging the user
 * list when that code is already queued.
 * NOTE(review): validation details, the origin of $hasPushCallback and the
 * return values are not visible in this excerpt.
 *
 * @param mixed $users User id or list of user ids.
 * @param mixed $parameters Event parameters; 'push' and 'extra' are read below.
 */
209 private static function addPush($users,
$parameters)
211 if (!\CPullOptions::GetPushStatus())
213 self::$error =
new Error(__METHOD__,
'PUSH_DISABLED',
Loc::getMessage(
'PULL_EVENT_PUSH_DISABLED_ERROR'), [
214 'recipient' => $users,
220 if (!is_array($users))
225 foreach ($users as $id => $entity)
227 $entity = intval($entity);
236 self::$error =
new Error(__METHOD__,
'RECIPIENT_FORMAT',
Loc::getMessage(
'PULL_EVENT_RECIPIENT_FORMAT_ERROR'), [
237 'recipient' => $users,
255 foreach ($users as $userId)
270 $pushCode = self::getParamsCode(
$parameters[
'push']);
// NOTE(review): this reads self::$push[$pushCode] without isset(); presumably raises an undefined-key warning the first time a code is seen - confirm.
271 if (self::$push[$pushCode])
// Already queued: only extend the recipient list.
273 self::$push[$pushCode][
'users'] = array_unique(array_merge(self::$push[$pushCode][
'users'], array_values($users)));
// New entry for this push code.
280 self::$push[$pushCode][
'push'] =
$parameters[
'push'];
281 self::$push[$pushCode][
'extra'] =
$parameters[
'extra'];
282 self::$push[$pushCode][
'hasPushCallback'] = $hasPushCallback;
283 self::$push[$pushCode][
'users'] = array_unique(array_values($users));
// `&&` binds tighter than `||`: background context, OR (BX_CHECK_AGENT_START defined AND BX_WITH_ON_AFTER_EPILOG not defined).
287 self::$backgroundContext
288 || defined(
'BX_CHECK_AGENT_START') && !defined(
'BX_WITH_ON_AFTER_EPILOG')
/**
 * Resolves every deferred event through its 'paramsCallback' and moves the
 * result into the regular message queue.
 *
 * For each entry of self::$deferredMessages the callback's module is loaded
 * and, if the callback method exists, it is invoked with its stored 'params'.
 * The parameters it returns are queued in self::$messages for the same
 * channels and users. Entries whose module cannot be loaded or whose method
 * does not exist are dropped. The deferred queue is emptied afterwards.
 *
 * @return void
 */
private static function processDeferredMessages()
{
	foreach (self::$deferredMessages as $message)
	{
		$callback = $message['event']['paramsCallback'];
		if (
			!Main\Loader::includeModule($callback['module_id'])
			|| !method_exists($callback['class'], $callback['method'])
		)
		{
			continue;
		}

		$messageParameters = call_user_func_array(
			[$callback['class'], $callback['method']],
			[$callback['params']]
		);

		// addMessage() takes channels BEFORE users; passing them the other way
		// round silently swaps the two recipient lists of the queued event.
		self::addMessage(self::$messages, $message['channels'], $message['users'], $messageParameters);
	}

	self::$deferredMessages = [];
}
/**
 * Sends one queued push entry through \CPushManager.
 *
 * Visible steps: when the entry has a push callback, the callback's module
 * is loaded and the callback is invoked to obtain the push data; the data
 * is then normalised (message whitespace, default values, 'Y'/'N' flags)
 * and mapped onto the upper-case keys handed to the push manager.
 * NOTE(review): the branches taken outside the background context, the
 * origin of $data when no callback is used, and the return value are not
 * visible in this excerpt.
 *
 * @param mixed $parameters One entry of self::$push ('push', 'extra', 'hasPushCallback', ...).
 */
311 private static function executePushEvent(
$parameters)
// Entry relies on a callback, but we are not in the background context.
313 if (!self::$backgroundContext &&
$parameters[
'hasPushCallback'])
321 $callback =
$parameters[
'push'][
'pushParamsCallback'];
322 Main\Loader::includeModule($callback[
'module_id']);
323 if (method_exists($callback[
'class'], $callback[
'method']))
325 $data = call_user_func_array(
// Newlines are replaced with spaces in the message after trimming it.
341 $data[
'message'] = str_replace(
"\n",
" ", trim($data[
'message']));
// Defaults for optional keys.
342 $data[
'params'] = $data[
'params'] ?? [];
343 $data[
'advanced_params'] = $data[
'advanced_params'] ?? [];
344 $data[
'advanced_params'][
'extra'] =
$parameters[
'extra'] ?? [];
// Badge is cast to int when present, otherwise an empty string is used.
345 $data[
'badge'] = isset($data[
'badge']) ? (int)$data[
'badge'] :
'';
346 $data[
'sound'] = $data[
'sound'] ??
'';
347 $data[
'tag'] = $data[
'tag'] ??
'';
348 $data[
'sub_tag'] = $data[
'sub_tag'] ??
'';
349 $data[
'app_id'] = $data[
'app_id'] ??
'';
// Anything other than 'Y' collapses to 'N'.
350 $data[
'send_immediately'] = $data[
'send_immediately'] ==
'Y' ?
'Y' :
'N';
351 $data[
'important'] = $data[
'important'] ==
'Y' ?
'Y' :
'N';
364 $manager = new \CPushManager();
// Mapping of the normalised data onto the upper-case keys of the push message.
367 'SKIP_USERS' => is_array($data[
'skip_users']) ? $data[
'skip_users'] : [],
368 'MESSAGE' => $data[
'message'],
369 'EXPIRY' => $data[
'expiry'],
370 'PARAMS' => $data[
'params'],
371 'ADVANCED_PARAMS' => $data[
'advanced_params'],
372 'BADGE' => $data[
'badge'],
373 'SOUND' => $data[
'sound'],
374 'TAG' => $data[
'tag'],
375 'SUB_TAG' => $data[
'sub_tag'],
376 'APP_ID' => $data[
'app_id'],
377 'SEND_IMMEDIATELY' => $data[
'send_immediately'],
378 'IMPORTANT' => $data[
'important'],
386 if (self::$backgroundContext)
388 self::processDeferredMessages();
391 $executeResult = static::executeEvents();
392 if (!$executeResult->isSuccess())
394 foreach ($executeResult->getErrors() as $error)
396 $message = $error->getCode() ? $error->getCode() .
": " . $error->getMessage() : $error->getMessage();
397 trigger_error(
"Pull send error; {$message}; remote endpoint: {$executeResult->getRemoteAddress()}", E_USER_WARNING);
401 static::executePushEvents();
409 if (empty(self::$messages))
414 if (!\CPullOptions::GetQueueServerStatus())
416 self::$messages = [];
421 self::fillChannels(self::$messages);
423 if (Config::isJsonRpcUsed())
425 $messageList = self::convertEventsToMessages(self::$messages);
427 if ($sendResult->isSuccess())
429 self::$messages = [];
433 $result->withRemoteAddress($sendResult->getRemoteAddress());
434 $result->addErrors($sendResult->getErrors());
439 if (Config::isProtobufUsed())
441 $sendResult = ProtobufTransport::sendMessages(self::$messages);
442 if (!$sendResult->isSuccess())
444 $result->withRemoteAddress($sendResult->getRemoteAddress());
445 $result->addErrors($sendResult->getErrors());
450 self::sendEventsLegacy();
453 self::$messages = [];
462 foreach (self::$push as $pushCode => $event)
464 $result = self::executePushEvent($event);
465 if (!is_null($result))
467 unset(self::$push[$pushCode]);
/**
 * Sends every queued event in self::$messages through \CPullChannel::Send()
 * and removes the events that were sent successfully.
 *
 * When \Bitrix\Pull\Log is enabled, per-event statistics (hits, channel
 * count, byte size) are accumulated and written to the log at the end.
 * NOTE(review): the initialisation of the counters and of $logs, and the
 * handling of events without channels, are not visible in this excerpt.
 */
472 private static function sendEventsLegacy()
474 foreach (self::$messages as $eventCode => $event)
476 if (\
Bitrix\Pull\Log::isEnabled())
// Number of hits needed for this event: channel count divided by the per-hit command limit, rounded up.
479 $currentHits = ceil(count($event[
'channels']) / \CPullOptions::GetCommandPerHit());
480 $hitCount += $currentHits;
482 $currentChannelCount = count($event[
'channels']);
483 $channelCount += $currentChannelCount;
485 $currentMessagesBytes = self::getBytes($event[
'event']) + self::getBytes($event[
'channels']);
486 $messagesBytes += $currentMessagesBytes;
487 $logs[] =
'Command: ' . $event[
'event'][
'module_id'] .
'/' . $event[
'event'][
'command'] .
'; Hits: ' . $currentHits .
'; Channel: ' . $currentChannelCount .
'; Bytes: ' . $currentMessagesBytes .
'';
490 if (empty($event[
'channels']))
// Payload fields taken from the queued event; 'params' falls back to an empty array when it is not an array.
496 'module_id' => $event[
'event'][
'module_id'],
497 'command' => $event[
'event'][
'command'],
498 'params' => is_array($event[
'event'][
'params']) ? $event[
'event'][
'params'] : [],
499 'extra' => $event[
'event'][
'extra'],
501 $options = [
'expiry' => $event[
'event'][
'expiry']];
// The event is dropped from the queue only when Send() reports success.
503 if (\CPullChannel::Send($event[
'channels'], \
Bitrix\Pull\Common::jsonEncode($data), $options))
505 unset(self::$messages[$eventCode]);
509 if ($logs && \
Bitrix\Pull\Log::isEnabled())
// A totals line is appended only when more than one event was logged.
511 if (count($logs) > 1)
// NOTE(review): $messagesCount is not assigned anywhere in the visible lines - confirm it is initialised.
513 $logs[] =
'Total - Hits: ' . $hitCount .
'; Channel: ' . $channelCount .
'; Messages: ' . $messagesCount .
'; Bytes: ' . $messagesBytes .
'';
// Thresholds that select the "important" log title.
516 if (count($logs) > 1 || $hitCount > 1 || $channelCount > 1 || $messagesBytes > 1000)
518 $logTitle =
'!! Pull messages stats - important !!';
522 $logTitle =
'-- Pull messages stats --';
525 \Bitrix\Pull\Log::write(implode(
"\n", $logs), $logTitle);
531 Main\Application::getInstance()->addBackgroundJob([__CLASS__,
"sendInBackground"]);
537 self::$backgroundContext =
true;
543 foreach ($messages as $key => &$message)
545 $users = $message[
'users'] ?? [];
546 if (!empty($messages[$key][
'channels']) && is_array($messages[$key][
'channels']))
548 $messages[$key][
'channels'] = array_merge($messages[$key][
'channels'], self::getChannelIds($users, $message[
'event'][
'channel_type']));
552 $messages[$key][
'channels'] = self::getChannelIds($users, $message[
'event'][
'channel_type']);
554 unset($message[
'event'][
'channel_type']);
561 foreach ($users as $userId)
563 $data = \CPullChannel::Get($userId,
true,
false,
$type);
566 $result[] = $data[
'CHANNEL_ID'];
575 $result = array_fill_keys($channels,
null);
576 $orm = \Bitrix\Pull\Model\ChannelTable::getList([
577 'select' => [
'USER_ID',
'CHANNEL_ID',
'USER_ACTIVE' =>
'USER.ACTIVE'],
579 '=CHANNEL_ID' => $channels,
582 while ($row = $orm->fetch())
584 if ($row[
'USER_ID'] > 0 && $row[
'USER_ACTIVE'] !==
'N')
586 $result[$row[
'CHANNEL_ID']] = $row[
'USER_ID'];
590 unset($result[$row[
'CHANNEL_ID']]);
/**
 * Validates and completes event parameters before they are queued.
 *
 * Visible steps: an EVENT_CALLBACK_FORMAT Error can be recorded; the
 * 'paramsCallback' entry is checked ('module_id', 'params') and its module
 * is loaded; the 'extra' section is filled with server time, server name
 * and the web/mobile revisions.
 * NOTE(review): the exact validation conditions and the return value are
 * not visible in this excerpt.
 *
 * @param array $parameters Raw event parameters.
 */
597 private static function prepareParameters(array
$parameters)
612 self::$error =
new Error(__METHOD__,
'EVENT_CALLBACK_FORMAT',
Loc::getMessage(
'PULL_EVENT_CALLBACK_FORMAT_ERROR'),
$parameters);
616 if (empty(
$parameters[
'paramsCallback'][
'module_id']))
621 Main\Loader::includeModule(
$parameters[
'paramsCallback'][
'module_id']);
628 if (!isset(
$parameters[
'paramsCallback'][
'params']))
// `??=` keeps a timestamp that is already set (and not null).
645 $parameters[
'extra'][
'server_time_unix'] ??= microtime(
true);
// $_SERVER['SERVER_NAME'] is passed as the default value to Option::get().
647 $parameters[
'extra'][
'server_name'] = Option::get(
'main',
'server_name', $_SERVER[
'SERVER_NAME']);
648 $parameters[
'extra'][
'revision_web'] = PULL_REVISION_WEB;
649 $parameters[
'extra'][
'revision_mobile'] = PULL_REVISION_MOBILE;
/**
 * Validates and completes push parameters before they are queued.
 *
 * Visible steps: the 'pushParamsCallback' entry is checked ('method',
 * 'module_id', 'params'), its module is loaded and the existence of the
 * callback method is verified; 'extra.server_time_unix' is set.
 * NOTE(review): the full validation conditions, the non-callback branch and
 * the return value are not visible in this excerpt.
 *
 * @param array $parameters Raw event parameters.
 */
654 private static function preparePushParameters(array
$parameters)
662 || empty(
$parameters[
'pushParamsCallback'][
'method'])
669 if (empty(
$parameters[
'pushParamsCallback'][
'module_id']))
674 Main\Loader::includeModule(
$parameters[
'pushParamsCallback'][
'module_id']);
676 if (!method_exists(
$parameters[
'pushParamsCallback'][
'class'],
$parameters[
'pushParamsCallback'][
'method']))
681 if (!isset(
$parameters[
'pushParamsCallback'][
'params']))
// Unlike prepareParameters(), this assignment is unconditional and overwrites any existing value.
712 $parameters[
'extra'][
'server_time_unix'] = microtime(
true);
720 if (isset($params[
'groupId']) && !empty($params[
'groupId']))
722 return md5($params[
'groupId']);
726 $paramsWithoutTime = $params;
728 unset($paramsWithoutTime[
'extra'][
'server_time']);
729 unset($paramsWithoutTime[
'extra'][
'server_time_unix']);
730 unset($paramsWithoutTime[
'advanced_params'][
'filterCallback']);
732 return serialize($paramsWithoutTime);
/**
 * Splits a recipient list into channel ids and user ids.
 *
 * Entries exposing getPrivateId() contribute that id as a channel; entries
 * accepted by isChannelEntity() are used as channel ids directly; the
 * remaining entries are cast with intval() and treated as user ids.
 * NOTE(review): the first branch's condition and the maintenance of
 * $result['count'] are not visible in this excerpt.
 *
 * @param array $recipientList Mixed list of recipients.
 * @return array|null Array with 'users', 'channels' and 'count' keys, or null when 'count' is not positive.
 */
736 private static function getEntitiesByType(array $recipientList): ?array
743 foreach ($recipientList as $entity)
747 $result[
'channels'][] = $entity->getPrivateId();
750 else if (self::isChannelEntity($entity))
752 $result[
'channels'][] = $entity;
757 $result[
'users'][] = intval($entity);
762 return $result[
'count'] > 0 ? $result :
null;
/**
 * Estimates the size of a value by summing string lengths: strings are
 * measured directly, arrays recursively, and any other value after a
 * string cast.
 *
 * NOTE(review): mb_strlen() returns a character count, which differs from
 * the byte count for multi-byte strings - confirm this is intended given
 * the method name.
 * NOTE(review): the initialisation and return of $bytes are not visible in
 * this excerpt.
 *
 * @param mixed $variable Value to measure.
 */
765 private static function getBytes($variable)
769 if (is_string($variable))
771 $bytes += mb_strlen($variable);
773 else if (is_array($variable))
775 foreach ($variable as $value)
// Recurse into nested values; array keys are not counted.
777 $bytes += self::getBytes($value);
782 $bytes += mb_strlen((
string)$variable);
/**
 * Tells whether a recipient entry looks like a channel id, i.e. it is a
 * string that is exactly 32 characters long.
 *
 * @param mixed $entity Recipient entry to inspect.
 * @return bool
 */
private static function isChannelEntity($entity)
{
	// Non-strings (user ids, objects) can never be channel ids.
	if (!is_string($entity))
	{
		return false;
	}

	return mb_strlen($entity) == 32;
}
/**
 * Converts queued events into Message objects using Message::fromEvent().
 *
 * NOTE(review): only the per-event conversion is visible here; the
 * iteration that supplies $event and the assembly of the returned array
 * are not visible in this excerpt.
 *
 * @param array $events Queued events.
 * @return array
 */
797 private static function convertEventsToMessages(array $events): array
801 return Message::fromEvent($event);