42 private const OFFLINE_EVENT_DEFAULT_TIMEOUT = 1;
43 private const OFFLINE_EVENT_CACHE_PREFIX =
'OFFLINE_EVENT_TIMEOUT';
45 private static $isSendOfflineEvent =
false;
54 return 'b_rest_event_offline';
66 'data_type' =>
'integer',
68 'autocomplete' =>
true,
70 'TIMESTAMP_X' => array(
71 'data_type' =>
'datetime',
73 'MESSAGE_ID' => array(
74 'data_type' =>
'string',
78 'data_type' =>
'integer',
81 'EVENT_NAME' => array(
82 'data_type' =>
'string',
85 'EVENT_DATA' => array(
86 'data_type' =>
'text',
89 'EVENT_ADDITIONAL' => array(
90 'data_type' =>
'text',
93 'PROCESS_ID' => array(
94 'data_type' =>
'string',
95 'default_value' =>
'',
97 'CONNECTOR_ID' => array(
98 'data_type' =>
'string',
99 'default_value' =>
'',
102 'data_type' =>
'integer',
103 'default_value' => 0,
110 $connection = Main\Application::getConnection();
112 $tableName = static::getTableName();
113 $dateTime = $connection->getSqlHelper()->addSecondsToDateTime(
'-' . static::PROCESS_ID_LIFETIME);
115 $sql =
"DELETE FROM {$tableName} WHERE PROCESS_ID<>'' AND TIMESTAMP_X<{$dateTime}";
117 $connection->query($sql);
119 return "\\Bitrix\\Rest\\EventOfflineTable::cleanProcessAgent();";
124 if(!isset($fields[
'CONNECTOR_ID']))
126 $fields[
'CONNECTOR_ID'] =
'';
131 'MESSAGE_ID' => static::getMessageId($fields),
132 'APP_ID' => $fields[
'APP_ID'],
133 'EVENT_NAME' => $fields[
'EVENT_NAME'],
134 'EVENT_DATA' => serialize($fields[
'EVENT_DATA']),
135 'EVENT_ADDITIONAL' => serialize($fields[
'EVENT_ADDITIONAL']),
136 'CONNECTOR_ID' => $fields[
'CONNECTOR_ID'],
139 $updateFields = array(
141 'EVENT_DATA' => serialize($fields[
'EVENT_DATA']),
142 'EVENT_ADDITIONAL' => serialize($fields[
'EVENT_ADDITIONAL']),
145 if(array_key_exists(
'ERROR', $fields))
147 $addFields[
'ERROR'] = intval($fields[
'ERROR']) > 0 ? 1 : 0;
148 $updateFields[
'ERROR'] = intval($fields[
'ERROR']) > 0 ? 1 : 0;
151 $connection = Main\Application::getConnection();
152 $queries = $connection->getSqlHelper()->prepareMerge(
153 static::getTableName(),
154 array(
'MESSAGE_ID',
'APP_ID',
'CONNECTOR_ID',
'PROCESS_ID'),
159 foreach($queries as $query)
161 $connection->queryExecute($query);
167 $processId = static::getProcessId();
169 $limit = intval($limit);
171 $query->setOrder($order);
172 $query->setLimit($limit);
174 if (is_array($filter))
176 foreach ($filter as $key => $value)
179 if (preg_match(
'/^([\W]{1,2})(.+)/', $key, $matches) && $matches[0] === $key)
182 !is_string($matches[2])
183 || !is_string($matches[1])
188 if (is_array($value) || is_object($value))
200 if (!is_string($key))
204 if (is_array($value) || is_object($value))
216 $sql = $query->getMarkQuery($processId);
218 Main\Application::getConnection()->query($sql);
/**
 * Deletes offline events that belong to the given process, application and connector.
 *
 * @param string $processId Process identifier; escaped with the SQL helper before use.
 * @param int $appId Application identifier; cast to an integer before use.
 * @param string $connectorId Connector identifier; escaped with the SQL helper before use.
 * @param array|false $listIds Row IDs to restrict the deletion to, or false to delete
 *                             every row matching the process/application/connector triple.
 */
public static function clearEvents($processId, $appId, $connectorId, $listIds = false)
{
	$connection = Main\Application::getConnection();
	$helper = $connection->getSqlHelper();

	$tableName = static::getTableName();
	$processId = $helper->forSql($processId);
	$appId = intval($appId);
	$connectorId = $helper->forSql($connectorId);

	$sql = "DELETE FROM {$tableName} WHERE PROCESS_ID='{$processId}' AND APP_ID='{$appId}' AND CONNECTOR_ID='{$connectorId}'";

	if($listIds !== false)
	{
		// array_map() returns a new array and leaves its argument untouched, so the
		// result has to be assigned back; otherwise raw caller-supplied values would
		// be concatenated into the IN (...) clause below.
		$listIds = array_map('intval', $listIds);

		if(!$listIds)
		{
			// An explicit but empty ID list means there is nothing to delete;
			// skip the query instead of emitting "ID IN ('')".
			return;
		}

		$sql .= " AND ID IN ('".implode("', '", $listIds)."')";
	}

	$connection->query($sql);
}
245 $connection = Main\Application::getConnection();
246 $helper = $connection->getSqlHelper();
248 $tableName = static::getTableName();
249 $processId = $connection->getSqlHelper()->forSql($processId);
250 $appId = intval($appId);
251 $connectorId = $connection->getSqlHelper()->forSql($connectorId);
253 $sql =
"DELETE FROM {$tableName} WHERE PROCESS_ID='{$processId}' AND APP_ID='{$appId}' AND CONNECTOR_ID='{$connectorId}'";
255 if($listIds !==
false)
257 foreach($listIds as $key => $id)
259 $listIds[$key] = $helper->forSql($id);
262 $sql .=
" AND MESSAGE_ID IN ('".implode(
"', '", $listIds).
"')";
265 $connection->query($sql);
268 public static function markError($processId, $appId, $connectorId, array $listIds)
270 if(count($listIds) > 0)
272 $connection = Main\Application::getConnection();
273 $helper = $connection->getSqlHelper();
275 foreach($listIds as $key => $id)
277 $listIds[$key] = $helper->forSql($id);
281 "APP_ID='".intval($appId).
"'",
282 "CONNECTOR_ID='".$helper->forSql($connectorId).
"'",
283 "MESSAGE_ID IN ('".implode(
"', '", $listIds).
"')",
286 $sqlTable = static::getTableName();
287 $sqlWhere = implode(
" AND ", $queryWhere);
288 $sqlProcessId = $helper->forSql($processId);
291 $sql[] =
"DELETE FROM {$sqlTable} WHERE {$sqlWhere} AND ERROR=0 AND PROCESS_ID <> '{$sqlProcessId}'";
292 $sql[] =
"UPDATE {$sqlTable} SET ERROR=1, PROCESS_ID=IF(PROCESS_ID='{$sqlProcessId}', '', 'fake_process_id') WHERE {$sqlWhere} AND ERROR=0 ORDER BY PROCESS_ID ASC";
293 $sql[] =
"DELETE FROM {$sqlTable} WHERE {$sqlWhere} AND PROCESS_ID='fake_process_id'";
295 foreach($sql as $query)
297 $connection->query($query);
304 return Main\Security\Random::getString(32);
309 return isset($fields[
'MESSAGE_ID']) ? $fields[
'MESSAGE_ID'] : md5($fields[
'EVENT_NAME'].
'|'.Main\Web\Json::encode($fields[
'EVENT_DATA']));
315 $timeout = !is_null($timeout) ? (int)$timeout : static::OFFLINE_EVENT_DEFAULT_TIMEOUT;
319 $key = static::OFFLINE_EVENT_CACHE_PREFIX.
'|' . $id .
'|'. $timeout;
320 $cache = Cache::createInstance();
321 if ($cache->initCache($timeout, $key))
325 elseif ($cache->startDataCache())
329 $cache->endDataCache($data);
332 elseif (static::$isSendOfflineEvent ===
false)
334 static::$isSendOfflineEvent =
true;
343 $data = reset($params);
344 if (!is_array($data[
'APP_LIST']) || !in_array((
int) $handler[
'APP_ID'], $data[
'APP_LIST'],
true))
349 $timeout = $handler[
'OPTIONS'][
'minTimeout'] ??
null;
350 if (!static::checkSendTime($handler[
'ID'], $timeout))
static clearEvents($processId, $appId, $connectorId, $listIds=false)
static checkSendTime($id, $timeout=null)
static cleanProcessAgent()
const PROCESS_ID_LIFETIME
static clearEventsByMessageId($processId, $appId, $connectorId, $listIds=false)
static prepareOfflineEvent($params, $handler)
static callEvent($fields)
static getMessageId($fields)
static markError($processId, $appId, $connectorId, array $listIds)
static markEvents($filter, $order, $limit)