Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
eventoffline.php
1<?php
2namespace Bitrix\Rest;
3
7
39class EventOfflineTable extends Main\Entity\DataManager
40{
41 const PROCESS_ID_LIFETIME = 2952000; // 30 days
42 private const OFFLINE_EVENT_DEFAULT_TIMEOUT = 1;
43 private const OFFLINE_EVENT_CACHE_PREFIX = 'OFFLINE_EVENT_TIMEOUT';
44
45 private static $isSendOfflineEvent = false;
46
52 public static function getTableName()
53 {
54 return 'b_rest_event_offline';
55 }
56
62 public static function getMap()
63 {
64 return array(
65 'ID' => array(
66 'data_type' => 'integer',
67 'primary' => true,
68 'autocomplete' => true,
69 ),
70 'TIMESTAMP_X' => array(
71 'data_type' => 'datetime',
72 ),
73 'MESSAGE_ID' => array(
74 'data_type' => 'string',
75 'required' => true,
76 ),
77 'APP_ID' => array(
78 'data_type' => 'integer',
79 'required' => true,
80 ),
81 'EVENT_NAME' => array(
82 'data_type' => 'string',
83 'required' => true,
84 ),
85 'EVENT_DATA' => array(
86 'data_type' => 'text',
87 'serialized' => true,
88 ),
89 'EVENT_ADDITIONAL' => array(
90 'data_type' => 'text',
91 'serialized' => true,
92 ),
93 'PROCESS_ID' => array(
94 'data_type' => 'string',
95 'default_value' => '',
96 ),
97 'CONNECTOR_ID' => array(
98 'data_type' => 'string',
99 'default_value' => '',
100 ),
101 'ERROR' => array(
102 'data_type' => 'integer',
103 'default_value' => 0,
104 ),
105 );
106 }
107
108 public static function cleanProcessAgent()
109 {
110 $connection = Main\Application::getConnection();
111
112 $tableName = static::getTableName();
113 $dateTime = $connection->getSqlHelper()->addSecondsToDateTime('-' . static::PROCESS_ID_LIFETIME);
114
115 $sql = "DELETE FROM {$tableName} WHERE PROCESS_ID<>'' AND TIMESTAMP_X<{$dateTime}";
116
117 $connection->query($sql);
118
119 return "\\Bitrix\\Rest\\EventOfflineTable::cleanProcessAgent();";
120 }
121
122 public static function callEvent($fields)
123 {
124 if(!isset($fields['CONNECTOR_ID']))
125 {
126 $fields['CONNECTOR_ID'] = '';
127 }
128
129 $addFields = array(
130 'TIMESTAMP_X' => new Main\Type\DateTime(),
131 'MESSAGE_ID' => static::getMessageId($fields),
132 'APP_ID' => $fields['APP_ID'],
133 'EVENT_NAME' => $fields['EVENT_NAME'],
134 'EVENT_DATA' => serialize($fields['EVENT_DATA']),
135 'EVENT_ADDITIONAL' => serialize($fields['EVENT_ADDITIONAL']),
136 'CONNECTOR_ID' => $fields['CONNECTOR_ID'],
137 );
138
139 $updateFields = array(
140 'TIMESTAMP_X' => new Main\Type\DateTime(),
141 'EVENT_DATA' => serialize($fields['EVENT_DATA']),
142 'EVENT_ADDITIONAL' => serialize($fields['EVENT_ADDITIONAL']),
143 );
144
145 if(array_key_exists('ERROR', $fields))
146 {
147 $addFields['ERROR'] = intval($fields['ERROR']) > 0 ? 1 : 0;
148 $updateFields['ERROR'] = intval($fields['ERROR']) > 0 ? 1 : 0;
149 }
150
151 $connection = Main\Application::getConnection();
152 $queries = $connection->getSqlHelper()->prepareMerge(
153 static::getTableName(),
154 array('MESSAGE_ID', 'APP_ID', 'CONNECTOR_ID', 'PROCESS_ID'),
155 $addFields,
156 $updateFields
157 );
158
159 foreach($queries as $query)
160 {
161 $connection->queryExecute($query);
162 }
163 }
164
165 public static function markEvents($filter, $order, $limit)
166 {
167 $processId = static::getProcessId();
168
169 $limit = intval($limit);
170 $query = new EventOfflineQuery(static::getEntity());
171 $query->setOrder($order);
172 $query->setLimit($limit);
173
174 if (is_array($filter))
175 {
176 foreach ($filter as $key => $value)
177 {
178 $matches = [];
179 if (preg_match('/^([\W]{1,2})(.+)/', $key, $matches) && $matches[0] === $key)
180 {
181 if (
182 !is_string($matches[2])
183 || !is_string($matches[1])
184 )
185 {
186 throw new ArgumentTypeException('FILTER_KEYS', 'string');
187 }
188 if (is_array($value) || is_object($value))
189 {
190 throw new ArgumentTypeException($key);
191 }
192 $query->where(
193 $matches[2],
194 $matches[1],
195 $value
196 );
197 }
198 else
199 {
200 if (!is_string($key))
201 {
202 throw new ArgumentTypeException('FILTER_KEYS', 'string');
203 }
204 if (is_array($value) || is_object($value))
205 {
206 throw new ArgumentTypeException($key);
207 }
208 $query->where(
209 $key,
210 $value
211 );
212 }
213 }
214 }
215
216 $sql = $query->getMarkQuery($processId);
217
218 Main\Application::getConnection()->query($sql);
219
220 return $processId;
221 }
222
223 public static function clearEvents($processId, $appId, $connectorId, $listIds = false)
224 {
225 $connection = Main\Application::getConnection();
226
227 $tableName = static::getTableName();
228 $processId = $connection->getSqlHelper()->forSql($processId);
229 $appId = intval($appId);
230 $connectorId = $connection->getSqlHelper()->forSql($connectorId);
231
232 $sql = "DELETE FROM {$tableName} WHERE PROCESS_ID='{$processId}' AND APP_ID='{$appId}' AND CONNECTOR_ID='{$connectorId}'";
233
234 if($listIds !== false)
235 {
236 array_map('intval', $listIds);
237 $sql .= " AND ID IN ('".implode("', '", $listIds)."')";
238 }
239
240 $connection->query($sql);
241 }
242
243 public static function clearEventsByMessageId($processId, $appId, $connectorId, $listIds = false)
244 {
245 $connection = Main\Application::getConnection();
246 $helper = $connection->getSqlHelper();
247
248 $tableName = static::getTableName();
249 $processId = $connection->getSqlHelper()->forSql($processId);
250 $appId = intval($appId);
251 $connectorId = $connection->getSqlHelper()->forSql($connectorId);
252
253 $sql = "DELETE FROM {$tableName} WHERE PROCESS_ID='{$processId}' AND APP_ID='{$appId}' AND CONNECTOR_ID='{$connectorId}'";
254
255 if($listIds !== false)
256 {
257 foreach($listIds as $key => $id)
258 {
259 $listIds[$key] = $helper->forSql($id);
260 }
261
262 $sql .= " AND MESSAGE_ID IN ('".implode("', '", $listIds)."')";
263 }
264
265 $connection->query($sql);
266 }
267
268 public static function markError($processId, $appId, $connectorId, array $listIds)
269 {
270 if(count($listIds) > 0)
271 {
272 $connection = Main\Application::getConnection();
273 $helper = $connection->getSqlHelper();
274
275 foreach($listIds as $key => $id)
276 {
277 $listIds[$key] = $helper->forSql($id);
278 }
279
280 $queryWhere = array(
281 "APP_ID='".intval($appId)."'",
282 "CONNECTOR_ID='".$helper->forSql($connectorId)."'",
283 "MESSAGE_ID IN ('".implode("', '", $listIds)."')",
284 );
285
286 $sqlTable = static::getTableName();
287 $sqlWhere = implode(" AND ", $queryWhere);
288 $sqlProcessId = $helper->forSql($processId);
289
290 $sql = array();
291 $sql[] = "DELETE FROM {$sqlTable} WHERE {$sqlWhere} AND ERROR=0 AND PROCESS_ID <> '{$sqlProcessId}'";
292 $sql[] = "UPDATE {$sqlTable} SET ERROR=1, PROCESS_ID=IF(PROCESS_ID='{$sqlProcessId}', '', 'fake_process_id') WHERE {$sqlWhere} AND ERROR=0 ORDER BY PROCESS_ID ASC";
293 $sql[] = "DELETE FROM {$sqlTable} WHERE {$sqlWhere} AND PROCESS_ID='fake_process_id'";
294
295 foreach($sql as $query)
296 {
297 $connection->query($query);
298 }
299 }
300 }
301
302 protected static function getProcessId()
303 {
304 return Main\Security\Random::getString(32);
305 }
306
307 protected static function getMessageId($fields)
308 {
309 return isset($fields['MESSAGE_ID']) ? $fields['MESSAGE_ID'] : md5($fields['EVENT_NAME'].'|'.Main\Web\Json::encode($fields['EVENT_DATA']));
310 }
311
312 public static function checkSendTime($id, $timeout = null) : bool
313 {
314 $result = false;
315 $timeout = !is_null($timeout) ? (int)$timeout : static::OFFLINE_EVENT_DEFAULT_TIMEOUT;
316
317 if ($timeout > 0)
318 {
319 $key = static::OFFLINE_EVENT_CACHE_PREFIX. '|' . $id . '|'. $timeout;
320 $cache = Cache::createInstance();
321 if ($cache->initCache($timeout, $key))
322 {
323 $result = false;
324 }
325 elseif ($cache->startDataCache())
326 {
327 $result = true;
328 $data = 1;
329 $cache->endDataCache($data);
330 }
331 }
332 elseif (static::$isSendOfflineEvent === false)
333 {
334 static::$isSendOfflineEvent = true;
335 $result = true;
336 }
337
338 return $result;
339 }
340
341 public static function prepareOfflineEvent($params, $handler)
342 {
343 $data = reset($params);
344 if (!is_array($data['APP_LIST']) || !in_array((int) $handler['APP_ID'], $data['APP_LIST'], true))
345 {
346 throw new RestException('Wrong application.');
347 }
348
349 $timeout = $handler['OPTIONS']['minTimeout'] ?? null;
350 if (!static::checkSendTime($handler['ID'], $timeout))
351 {
352 throw new RestException('Time is not up.');
353 }
354
355 return null;
356 }
357}
static clearEvents($processId, $appId, $connectorId, $listIds=false)
static checkSendTime($id, $timeout=null)
static clearEventsByMessageId($processId, $appId, $connectorId, $listIds=false)
static prepareOfflineEvent($params, $handler)
static markError($processId, $appId, $connectorId, array $listIds)
static markEvents($filter, $order, $limit)