2use Bitrix\Main\Page\Asset;
26 $bAdminSection =
false;
28 $bAdminSection =
true;
38 'ADMIN_SECTION' =>
false,
39 'PUBLIC_SECTION' =>
false
42 $arModule = self::GetDependentModule();
54 return $bGetSectionStatus?
$arResult[$bAdminSection?
'ADMIN_SECTION':
'PUBLIC_SECTION']:
$arResult;
66 foreach(
GetModuleEvents(
"pull",
"OnGetDependentModule",
true) as $arEvent)
69 if (isset(
$ar[
'MODULE_ID']))
71 $arModule[
$ar[
'MODULE_ID']] = Array(
72 'MODULE_ID' =>
$ar[
'MODULE_ID'],
73 'ADMIN_SECTION' => isset(
$ar[
'USE']) && in_array(
'ADMIN_SECTION',
$ar[
'USE'])?
true:
false,
74 'PUBLIC_SECTION' => isset(
$ar[
'USE']) && in_array(
'PUBLIC_SECTION',
$ar[
'USE'])?
true:
false,
84 $result = COption::GetOptionString(
"pull",
"exclude_sites",
"a:0:{}", self::GetDefaultOption(
"exclude_sites"));
85 return unserialize(
$result, [
"allowed_classes" =>
false]);
93 COption::SetOptionString(
"pull",
"exclude_sites", serialize(
$sites));
103 return self::GetQueueServerStatus();
107 if(static::IsServerShared())
109 return \Bitrix\Pull\SharedServer\Config::isRegistered();
113 return COption::GetOptionString(
"pull",
"nginx", self::GetDefaultOption(
"nginx")) ==
"Y";
118 $result = COption::GetOptionString(
"pull",
"nginx_headers", self::GetDefaultOption(
"nginx_headers"));
119 return $result ==
'Y' && self::GetQueueServerVersion() < 3?
true:
false;
127 return self::SetQueueServerStatus($flag);
131 $currentValue = COption::GetOptionString(
"pull",
"nginx");
132 if($currentValue === $flag)
137 COption::SetOptionString(
"pull",
"nginx", $flag==
'Y'?
'Y':
'N');
140 CAgent::AddAgent(
"CPullChannel::CheckOnlineChannel();",
"pull",
"N", 240,
"",
"Y", ConvertTimeStamp(time()+CTimeZone::GetOffset()+240,
"FULL"));
144 CAgent::RemoveAgent(
"CPullChannel::CheckOnlineChannel();",
"pull");
152 COption::SetOptionString(
"pull",
"nginx_headers", $flag==
'Y'?
'Y':
'N');
162 return Option::get(
"pull",
"server_mode");
170 Option::set(
"pull",
"server_mode", $mode);
175 return static::GetQueueServerMode() == static::SERVER_MODE_SHARED;
180 $result = COption::GetOptionString(
"pull",
"push", self::GetDefaultOption(
"push"));
181 return $result ==
'N'?
false:
true;
186 COption::SetOptionString(
"pull",
"push", $flag==
'Y'?
'Y':
'N');
188 CAgent::AddAgent(
"CPushManager::SendAgent();",
"pull",
"N", 30);
190 CAgent::RemoveAgent(
"CPushManager::SendAgent();",
"pull");
197 Option::set(
'pull',
'shared_worker_allowed',
$val ?
'Y' :
'N') ;
202 return Option::get(
'pull',
'shared_worker_allowed',
'Y') ===
'Y';
207 return intval(COption::GetOptionInt(
"pull",
"push_message_per_hit", self::GetDefaultOption(
"push_message_per_hit")));
212 COption::SetOptionInt(
"pull",
"push_message_per_hit", intval(
$count));
219 return COption::GetOptionString(
"pull",
"guest", self::GetDefaultOption(
"guest")) ==
'Y' &&
IsModuleInstalled(
'statistic');
224 COption::SetOptionString(
"pull",
"guest",
IsModuleInstalled(
'statistic') && $flag==
'Y'?
'Y':
'N');
231 $url = COption::GetOptionString(
"pull",
"path_to_publish", self::GetDefaultOption(
"path_to_publish"));
237 $url = COption::GetOptionString(
"pull",
"path_to_json_rpc", self::GetDefaultOption(
"path_to_json_rpc"));
243 COption::SetOptionString(
"pull",
"signature_key", $signature);
250 $url = COption::GetOptionString(
"pull",
"signature_key", self::GetDefaultOption(
"signature_key"));
256 $url = COption::GetOptionString(
"pull",
"signature_algo", self::GetDefaultOption(
"signature_algo"));
264 $path = self::GetDefaultOption(
'path_to_publish');
266 COption::SetOptionString(
"pull",
"path_to_publish",
$path);
274 $path = self::GetDefaultOption(
'path_to_json_rpc');
276 COption::SetOptionString(
"pull",
"path_to_json_rpc",
$path);
289 $url = str_replace(
'#PORT#', self::GetQueueServerVersion()>1?
'':
':8893',
$url);
298 $path = self::GetDefaultOption(
'path_to_modern_listener');
300 COption::SetOptionString(
"pull",
'path_to_modern_listener',
$path);
306 return \CPullOptions::GetQueueServerVersion() > 3;
326 $path = self::GetDefaultOption(
'path_to_publish_web');
328 COption::SetOptionString(
"pull",
'path_to_publish_web',
$path);
350 $path = self::GetDefaultOption(
'path_to_publish_web_secure');
352 COption::SetOptionString(
"pull",
'path_to_publish_web_secure',
$path);
366 $url = str_replace(
'#PORT#', self::GetQueueServerVersion()>1?
'':
':8894',
$url);
375 $path = self::GetDefaultOption(
'path_to_modern_listener_secure');
377 COption::SetOptionString(
"pull",
'path_to_modern_listener_secure',
$path);
395 COption::SetOptionInt(
"pull",
"nginx_version", intval($version));
402 return intval(COption::GetOptionInt(
"pull",
"nginx_command_per_hit", self::GetDefaultOption(
"nginx_command_per_hit")));
407 COption::SetOptionInt(
"pull",
"nginx_command_per_hit", intval(
$count));
414 return self::GetWebSocket() && self::GetQueueServerVersion()>1?
true:
false;
423 || COption::GetOptionString(
"pull",
"websocket", self::GetDefaultOption(
"websocket")) ==
'Y'
434 COption::SetOptionString(
"pull",
"websocket", $flag==
'Y'?
'Y':
'N');
445 $url = COption::GetOptionString(
"pull",
"path_to_websocket", self::GetDefaultOption(
"path_to_websocket")).(count(
$channelId)>0?
'?CHANNEL_ID='.implode(
'/',
$channelId):
'');
453 $path = self::GetDefaultOption(
'path_to_websocket');
456 COption::SetOptionString(
"pull",
"path_to_websocket",
$path);
467 $url = COption::GetOptionString(
"pull",
"path_to_websocket_secure", self::GetDefaultOption(
"path_to_websocket_secure")).(count(
$channelId)>0?
'?CHANNEL_ID='.implode(
'/',
$channelId):
'');
475 $path = self::GetDefaultOption(
'path_to_websocket_secure');
478 COption::SetOptionString(
"pull",
"path_to_websocket_secure",
$path);
488 COption::SetOptionInt(
"pull",
"config_timestamp", $timestamp);
493 return COption::GetOptionInt(
"pull",
"config_timestamp", self::GetDefaultOption(
"config_timestamp"));
504 return (
int)COption::GetOptionInt(
"pull",
"config_ttl", self::GetDefaultOption(
"config_ttl"));
509 $maxPayload = (int)Option::get(
'pull', static::MAX_PAYLOAD);
510 if(!$maxPayload === 0)
512 $maxPayload = static::GetDefaultOption(static::MAX_PAYLOAD);
519 $maxChannelsPerRequest = (int)Option::get(
'pull', static::MAX_CHANNELS_PER_REQUEST);
520 if(!$maxChannelsPerRequest === 0)
522 $maxChannelsPerRequest = static::GetDefaultOption(static::MAX_CHANNELS_PER_REQUEST);
524 return $maxChannelsPerRequest;
529 $maxMessagesPerRequest = (int)Option::get(
'pull', static::MAX_MESSAGES_PER_REQUEST);
530 if(!$maxMessagesPerRequest === 0)
532 $maxMessagesPerRequest = static::GetDefaultOption(static::MAX_MESSAGES_PER_REQUEST);
534 return $maxMessagesPerRequest;
540 return (PHP_INT_SIZE >= 8 || function_exists(
'bcadd'));
545 return (Option::get(
'pull', static::PROTOBUF_ENABLED) ===
'Y');
553 'module_id' =>
'pull',
554 'command' =>
'config_expire',
557 CPullStack::AddShared($arMessage);
570 if (is_null(self::$optionDefaultConfig))
573 self::$optionDefaultConfig = is_null(
$config) ? Array() :
$config;
576 if (is_null(self::$optionDefaultModule))
578 include(
$_SERVER[
'DOCUMENT_ROOT'].
BX_ROOT.
'/modules/pull/default_option.php');
582 if (array_key_exists(
$optionName, self::$optionDefaultConfig))
587 return array_key_exists(
$optionName, self::$optionDefaultModule)? self::$optionDefaultModule[
$optionName]:
null;
594 $CModule->IsInstalled();
596 CAgent::RemoveAgent(
"CPullOptions::ClearAgent();",
"pull");
597 CAgent::AddAgent(
"CPullOptions::ClearAgent();",
"pull",
"N", 30,
"",
"Y", ConvertTimeStamp(time()+CTimeZone::GetOffset()+30,
"FULL"));
605 if (self::ModuleEnable())
607 CAgent::AddAgent(
"CPullChannel::CheckOnlineChannel();",
"pull",
"N", 240,
"",
"Y", ConvertTimeStamp(time()+CTimeZone::GetOffset()+100,
"FULL"));
608 CAgent::AddAgent(
"CPullChannel::CheckExpireAgent();",
"pull",
"N", 43200,
"",
"Y", ConvertTimeStamp(time()+CTimeZone::GetOffset() + 43200,
"FULL"));
609 CAgent::AddAgent(
"CPullWatch::CheckExpireAgent();",
"pull",
"N", 600,
"",
"Y", ConvertTimeStamp(time()+CTimeZone::GetOffset() + 600,
"FULL"));
613 CAgent::RemoveAgent(
"CPullChannel::CheckOnlineChannel();",
"pull");
614 CAgent::RemoveAgent(
"CPullChannel::CheckExpireAgent();",
"pull");
615 CAgent::RemoveAgent(
"CPullWatch::CheckExpireAgent();",
"pull");
616 CAgent::RemoveAgent(
"CPushManager::SendAgent();",
"pull");
628 if (defined(
'PULL_USER_ID'))
632 else if (is_object(
$GLOBALS[
'USER']) && intval(
$GLOBALS[
'USER']->GetID()) > 0)
636 else if (
IsModuleInstalled(
'statistic') && intval($_SESSION[
"SESS_SEARCHER_ID"] ?? 0) <= 0 && intval($_SESSION[
"SESS_GUEST_ID"] ?? 0) > 0 && COption::GetOptionString(
"pull",
"guest", self::GetDefaultOption(
"guest")) ==
'Y')
638 $userId = intval($_SESSION[
"SESS_GUEST_ID"])*-1;
641 if (!defined(
'BX_PULL_SKIP_INIT') && !(isset(
$_REQUEST[
'AJAX_CALL']) &&
$_REQUEST[
'AJAX_CALL'] ==
'Y') &&
$userId != 0 && CModule::IncludeModule(
'pull'))
643 define(
"BX_PULL_SKIP_INIT",
true);
647 Asset::getInstance()->addString(
'<script>BX.bind(window, "load", function(){BX.PULL.start();});</script>');
if(empty( $fields)) foreach($fields as $field) $channelId
if(!is_object($USER)||! $USER->IsAuthorized()) $userId
static getServerVersion()
static GetWebSocketUrl($channelId="")
static SetSignatureKey($signature)
static SetPushStatus($flag="N")
static SetSharedWorkerAllowed(bool $val)
static GetCommandPerHit()
static SetQueueServerHeaders($flag="Y")
static SetExcludeSites($sites)
static SetCommandPerHit($count)
const MAX_CHANNELS_PER_REQUEST
static SetConfigTimestamp($timestamp=0)
static GetWebSocketSecureUrl($channelId="")
static SetPublishWebSecureUrl($path="")
static GetDependentModule()
static GetPublishUrl($channelId="")
static GetMaxMessagesPerRequest()
static SetQueueServerStatus($flag="N")
static GetPublishWebUrl($channelId="")
static IsProtobufSupported()
static SetGuestStatus($flag="N")
static IsSharedWorkerAllowed()
static GetQueueServerVersion()
static SetWebSocket($flag="N")
static GetQueueServerStatus()
static $optionDefaultConfig
static GetSignatureAlgorithm()
static GetQueueServerMode()
static $optionDefaultModule
static SetQueueServerMode($mode)
static SetListenSecureUrl($path="")
static SetPublishWebUrl($path="")
static IsProtobufEnabled()
static GetListenUrl($channelId="")
static GetPublishWebSecureUrl($channelId="")
static SetJsonRpcUrl($path="")
static GetPublishWebEnabled()
static SetWebSocketSecureUrl($path="")
static GetWebSocketStatus()
static GetDefaultOption($optionName)
static SetPublishUrl($path="")
static SetWebSocketUrl($path="")
static CheckNeedRun($bGetSectionStatus=true)
static GetPushMessagePerHit()
static GetMaxChannelsPerRequest()
static GetListenSecureUrl($channelId="")
const SERVER_MODE_PERSONAL
static SetPushMessagePerHit($count)
const MAX_MESSAGES_PER_REQUEST
static SetNginxStatus($flag="N")
static SetListenUrl($path="")
static SetQueueServerVersion($version)
static GetConfigTimestamp()
static GetQueueServerHeaders()
$_SERVER["DOCUMENT_ROOT"]
ExecuteModuleEventEx($arEvent, $arParams=[])
IsModuleInstalled($module_id)
GetModuleEvents($MODULE_ID, $MESSAGE_ID, $bReturnArray=false)
if(file_exists($_SERVER["DOCUMENT_ROOT"]."/bitrix/php_interface/pull.php")) $pull_default_option
while($site=$dbSites->Fetch()) $arExcludeSites
$GLOBALS['_____370096793']