Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
queue.php
1<?php
9
11use \Bitrix\Main\Entity;
12use \Bitrix\Main\Localization\Loc;
13use \Bitrix\Main\Type\DateTime;
15
16Loc::loadMessages(__FILE__);
17
34class QueueTable extends Entity\DataManager
35{
36 const MODULE_ID = 'seo';
37 const PORTION_QUANTITY = 50;
38 const ACTION_IMPORT = 'IMP';
40 const ACTION_REMOVE = 'REM';
41 const ACTION_AUTO_REMOVE = 'ARM';
42
43 protected static $isAgentAdded = array();
44
/**
 * Returns the name of the database table this entity is stored in.
 *
 * @return string
 */
public static function getTableName()
{
    $tableName = 'b_seo_service_rtg_queue';

    return $tableName;
}
49
/**
 * Describes the entity fields of the queue table.
 *
 * NOTE(review): the DATE_INSERT default is a DateTime instance created when this
 * map is built, not when a row is inserted - confirm that is acceptable for
 * long-running processes.
 *
 * @return array Field definitions keyed by field name.
 */
public static function getMap()
{
    // Templates for the two kinds of plain string columns used below.
    $optionalString = ['data_type' => 'string'];
    $requiredString = ['data_type' => 'string', 'required' => true];

    return [
        'ID' => [
            'data_type' => 'integer',
            'primary' => true,
            'autocomplete' => true,
        ],
        'DATE_INSERT' => [
            'data_type' => 'datetime',
            'default_value' => new DateTime(),
        ],
        'TYPE' => $requiredString,
        'CLIENT_ID' => $optionalString,
        'ACCOUNT_ID' => $optionalString,
        'AUDIENCE_ID' => $requiredString,
        'PARENT_ID' => $optionalString,
        'CONTACT_TYPE' => $requiredString,
        'VALUE' => $requiredString,
        'ACTION' => [
            'data_type' => 'enum',
            'values' => [
                self::ACTION_IMPORT,
                self::ACTION_REMOVE,
                self::ACTION_IMPORT_AND_AUTO_REMOVE,
                self::ACTION_AUTO_REMOVE,
            ],
            'default_value' => self::ACTION_IMPORT_AND_AUTO_REMOVE,
            'required' => true,
        ],
        'DATE_AUTO_REMOVE' => [
            'data_type' => 'datetime',
        ],
    ];
}
105
/**
 * Builds the agent call string for the auto-remove agent of this class.
 *
 * @return string Call string of the form "<class>::processQueueAutoRemoveAgent();".
 */
protected static function processQueueAutoRemoveAgentName()
{
    return sprintf('%s::processQueueAutoRemoveAgent();', __CLASS__);
}
110
/**
 * Builds the agent call string that processes the queue of one service type.
 *
 * The type is embedded verbatim between double quotes; it is not escaped here.
 *
 * @param string $type Service type code.
 * @return string Call string of the form '<class>::processQueueAgent("<type>");'.
 */
protected static function getProcessQueueAgentName($type)
{
    return sprintf('%s::processQueueAgent("%s");', __CLASS__, $type);
}
115
/**
 * Agent callback that turns expired auto-remove rows into ordinary remove rows.
 *
 * When no row is in the auto-remove state an empty string is returned
 * (presumably this stops the agent from being re-scheduled - confirm against
 * the agent API). Otherwise every auto-remove row whose DATE_AUTO_REMOVE is not
 * later than the current database time is switched to the remove action, queue
 * agents are registered for all service types if anything changed, and this
 * agent's own call string is returned.
 *
 * @return string Agent call string, or '' when there is nothing left to watch.
 */
public static function processQueueAutoRemoveAgent()
{
    $pendingRow = static::getList([
        'select' => ['ID'],
        'filter' => ['=ACTION' => self::ACTION_AUTO_REMOVE],
        'limit' => 1,
    ])->fetch();

    if (!$pendingRow)
    {
        return '';
    }

    $connection = Application::getConnection();

    $setClause = "SET ACTION='" . self::ACTION_REMOVE . "' ";
    $whereClause = "WHERE ACTION='" . self::ACTION_AUTO_REMOVE . "' "
        . "AND DATE_AUTO_REMOVE <= " . $connection->getSqlHelper()->getCurrentDateTimeFunction();

    $connection->query("UPDATE " . self::getTableName() . " " . $setClause . $whereClause);

    if ($connection->getAffectedRowsCount() > 0)
    {
        // Some rows became removable: make sure every service type has a queue agent.
        foreach (Service::getTypes() as $serviceType)
        {
            static::addQueueAgent($serviceType);
        }
    }

    return static::processQueueAutoRemoveAgentName();
}
147
/**
 * Agent callback that processes one packet of the queue for a service type.
 *
 * Any \Exception thrown while processing is swallowed and treated as
 * "no queue left".
 *
 * @param string $type Service type code.
 * @return string This agent's call string while rows were found, '' otherwise
 *                (presumably '' stops the agent - confirm against the agent API).
 */
public static function processQueueAgent($type)
{
    try
    {
        $hasQueue = static::processQueue($type);
    }
    catch (\Exception $e)
    {
        $hasQueue = false;
    }

    return $hasQueue ? static::getProcessQueueAgentName($type) : '';
}
168
/**
 * Sends one packet of queued contacts of the given service type to the audience API.
 *
 * Steps:
 *  1. Read queue rows of this type that are in the import, remove or
 *     import-and-auto-remove state (at most the audience packet size, capped at 1000).
 *  2. Group them by type / parent / client / account / audience / direction and
 *     collect contact values per contact type.
 *  3. For each group call addContacts() or deleteContacts() on the audience.
 *  4. On success, delete the plain import/remove rows and switch the
 *     import-and-auto-remove rows to the auto-remove state (registering the
 *     auto-remove agent). On failure, purge rows of this type in those three
 *     states whose DATE_INSERT is older than one day.
 *
 * @param string $type Service type code stored in the TYPE column.
 * @return bool True when at least one queue row was fetched, false otherwise.
 */
protected static function processQueue($type)
{
    $hasQueue = false;
    $queryData = [];

    $audience = Service::getAudience($type);
    // Never read more than 1000 rows per run, whatever the audience allows.
    $maxQuantity = min($audience->getMaxContactsPerPacket(), 1000);

    $queueDb = static::getList([
        'filter' => [
            '=TYPE' => $type,
            '=ACTION' => [
                self::ACTION_IMPORT,
                self::ACTION_REMOVE,
                self::ACTION_IMPORT_AND_AUTO_REMOVE,
            ],
        ],
        'limit' => $maxQuantity,
    ]);
    while ($queueItem = $queueDb->fetch())
    {
        $hasQueue = true;

        $isRemove = $queueItem['ACTION'] == self::ACTION_REMOVE ? 'Y' : 'N';

        // Rows sharing this key can be sent in a single API call.
        $queryId = implode('_', [
            $queueItem['TYPE'],
            $queueItem['PARENT_ID'],
            $queueItem['CLIENT_ID'],
            $queueItem['ACCOUNT_ID'],
            $queueItem['AUDIENCE_ID'],
            $isRemove,
        ]);

        if (!isset($queryData[$queryId]))
        {
            $queryData[$queryId] = [
                'CLIENT_ID' => $queueItem['CLIENT_ID'],
                'ACCOUNT_ID' => $queueItem['ACCOUNT_ID'],
                'AUDIENCE_ID' => $queueItem['AUDIENCE_ID'],
                'PARENT_ID' => $queueItem['PARENT_ID'],
                'IS_REMOVE' => $isRemove,
                'CONTACTS' => [],
                'DELETE_ID_LIST' => [],
                'AUTO_REMOVE_ID_LIST' => [],
            ];
        }

        $contactType = $queueItem['CONTACT_TYPE'];
        if (!isset($queryData[$queryId]['CONTACTS'][$contactType]))
        {
            $queryData[$queryId]['CONTACTS'][$contactType] = [];
        }
        $queryData[$queryId]['CONTACTS'][$contactType][] = $queueItem['VALUE'];

        // Import-and-auto-remove rows are kept after sending; everything else is deleted.
        if ($queueItem['ACTION'] == self::ACTION_IMPORT_AND_AUTO_REMOVE)
        {
            $queryData[$queryId]['AUTO_REMOVE_ID_LIST'][] = $queueItem['ID'];
        }
        else
        {
            $queryData[$queryId]['DELETE_ID_LIST'][] = $queueItem['ID'];
        }
    }

    $lastClientId = null;
    $service = null;
    $authAdapter = Service::getAuthAdapter($type);
    foreach ($queryData as $queryId => $query)
    {
        // Drop duplicate values inside every contact type (keys are preserved).
        $query['CONTACTS'] = array_map('array_unique', $query['CONTACTS']);

        // Re-create the service only when the client changes between groups.
        if ($lastClientId != $query['CLIENT_ID'] || !$service || !$authAdapter)
        {
            $lastClientId = $query['CLIENT_ID'];

            $service = new Service();
            $service->setClientId($lastClientId);
            $authAdapter->setService($service);
        }

        $audience->setService($service);
        $audience->getRequest()->setAuthAdapter($authAdapter);

        // The contacts are sent directly from here, so queue mode is switched off.
        $audience->disableQueueMode();
        $audience->setAccountId($query['ACCOUNT_ID']);

        // Multi-type audiences get one call with an empty type; others get one call per type.
        $contactTypes = $audience->isSupportMultiTypeContacts()
            ? ['']
            : array_keys($query['CONTACTS']);

        foreach ($contactTypes as $contactType)
        {
            $options = ['type' => $contactType];
            if ($query['IS_REMOVE'] != 'Y')
            {
                $audienceImportResult = $audience->addContacts(
                    $query['AUDIENCE_ID'],
                    $query['CONTACTS'],
                    $options
                );
            }
            else
            {
                $audienceImportResult = $audience->deleteContacts(
                    $query['AUDIENCE_ID'],
                    $query['CONTACTS'],
                    $options
                );
            }

            $connection = Application::getConnection();

            if ($audienceImportResult->isSuccess())
            {
                if (!empty($query['DELETE_ID_LIST']))
                {
                    // IDs are cast to int by divideListIntoPortions(), so they are safe to inline.
                    foreach (self::divideListIntoPortions($query['DELETE_ID_LIST']) as $portion)
                    {
                        $connection->query(
                            "DELETE FROM " . self::getTableName() . " WHERE ID IN (" . implode(',', $portion) . ")"
                        );
                    }
                }

                if (!empty($query['AUTO_REMOVE_ID_LIST']))
                {
                    foreach (self::divideListIntoPortions($query['AUTO_REMOVE_ID_LIST']) as $portion)
                    {
                        $connection->query(
                            "UPDATE " . self::getTableName() . " SET ACTION='" . self::ACTION_AUTO_REMOVE . "'" .
                            " WHERE ID IN (" . implode(',', $portion) . ")"
                        );
                    }
                    static::addQueueAutoRemoveAgent();
                }
            }
            else
            {
                // Sending failed: purge rows of this type that have been waiting for more than a day.
                $connection->query(
                    "DELETE FROM " . self::getTableName() .
                    " WHERE TYPE = '" . $connection->getSqlHelper()->forSql($type) . "'" .
                    " AND ACTION in ('" . implode("', '", [self::ACTION_IMPORT, self::ACTION_IMPORT_AND_AUTO_REMOVE, self::ACTION_REMOVE]) . "')" .
                    " AND DATE_INSERT < '" . (new DateTime())->add('-1 day')->format("Y-m-d H:i:s") . "'"
                );
            }
        }
    }

    return $hasQueue;
}
320
/**
 * Splits a list of IDs into portions of at most PORTION_QUANTITY items.
 *
 * Every value is cast to int, so the portions can be inlined into SQL "IN (...)"
 * clauses. Keys of the input are ignored, which means lists with gaps or string
 * keys are handled the same way as plain sequential lists.
 *
 * @param array $list IDs to split.
 * @return array[] List of portions, each a list of int IDs; empty for empty input.
 */
protected static function divideListIntoPortions($list)
{
    $ids = array_map('intval', array_values($list));

    return array_chunk($ids, self::PORTION_QUANTITY);
}
346
/**
 * Registers the auto-remove agent unless an agent with the same name already exists.
 *
 * The outcome is remembered in static::$isAgentAdded, so repeated calls during
 * the same run skip the agent lookup.
 *
 * @return void
 */
protected static function addQueueAutoRemoveAgent()
{
    $cacheKey = 'sys.auto_remove';
    if (isset(static::$isAgentAdded[$cacheKey]))
    {
        return;
    }

    $agentName = static::processQueueAutoRemoveAgentName();
    $agent = new \CAgent();
    $agentsDb = $agent->GetList(array("ID" => "DESC"), array(
        "MODULE_ID" => self::MODULE_ID,
        "NAME" => $agentName,
    ));
    if (!$agentsDb->Fetch())
    {
        // 86400 is passed as the agent interval.
        $agent->AddAgent($agentName, self::MODULE_ID, "N", 86400, null, "Y", "");
    }

    // Remember that the check was done; this function is called from inside loops.
    static::$isAgentAdded[$cacheKey] = true;
}
365
/**
 * Registers the queue-processing agent for a service type unless it already exists.
 *
 * The type is marked as handled in static::$isAgentAdded even when it is empty,
 * so each type is checked at most once per run.
 *
 * @param string $type Service type code.
 * @return void
 */
protected static function addQueueAgent($type)
{
    if (isset(static::$isAgentAdded[$type]))
    {
        return;
    }

    $agent = new \CAgent();
    $agentName = $type ? static::getProcessQueueAgentName($type) : null;

    if ($agentName !== null)
    {
        $lookupFilter = array(
            "MODULE_ID" => self::MODULE_ID,
            "NAME" => $agentName,
        );
        $existingAgent = $agent->GetList(array("ID" => "DESC"), $lookupFilter)->Fetch();

        if (!$existingAgent)
        {
            // Yandex queues must be processed rarely, hence the longer interval.
            $interval = 30;
            if ($type == 'yandex')
            {
                $interval = 900;
            }
            $agent->AddAgent($agentName, self::MODULE_ID, "N", $interval, null, "Y", "");
        }
    }

    static::$isAgentAdded[$type] = true;
}
390
/**
 * Event handler run after a queue row is added: makes sure a processing agent
 * exists for the TYPE of the added row.
 *
 * @param Entity\Event $event Event carrying the added row under the 'fields' parameter.
 * @return void
 */
public static function onAfterAdd(Entity\Event $event)
{
    $addedFields = $event->getParameter('fields');

    static::addQueueAgent($addedFields['TYPE']);
}
397}
static getConnection($name="")
static loadMessages($file)
Definition loc.php:64
static onAfterAdd(Entity\Event $event)
Definition queue.php:391