// Fragment of an agent-style method (signature, braces and a few statements
// are outside this view). Visible behaviour: when at least one queue row is in
// the AUTO_REMOVE state, rows whose auto-remove date has passed are switched to
// the REMOVE state and the per-type queue agents are scheduled.

// Probe: is there any row with ACTION = ACTION_AUTO_REMOVE? Only ID is selected.
118 $queueDb = static::getList(array(
119 'select' => array(
'ID'),
120 'filter' => array(
'=ACTION' => self::ACTION_AUTO_REMOVE),
123 if ($queueDb->fetch())
// The beginning of this SQL statement (presumably `UPDATE <table>`) is not
// visible here. The visible part sets ACTION to ACTION_REMOVE for rows that are
// in ACTION_AUTO_REMOVE and whose DATE_AUTO_REMOVE is not later than the
// database's current date-time expression.
127 "SET ACTION='" . self::ACTION_REMOVE .
"' " .
128 "WHERE ACTION='" . self::ACTION_AUTO_REMOVE .
"' " .
129 "AND DATE_AUTO_REMOVE <= " . $connection->getSqlHelper()->getCurrentDateTimeFunction();
130 $connection->query($sql);
// Only when the update actually changed rows is a queue agent added for each
// entry of $types (the origin of $types is not visible in this fragment).
131 if ($connection->getAffectedRowsCount() > 0)
134 foreach ($types as $type)
136 static::addQueueAgent($type);
// Returns the string produced by processQueueAutoRemoveAgentName().
// NOTE(review): presumably the agent call string, so the agent stays
// scheduled — confirm against the Bitrix agent contract.
140 return static::processQueueAutoRemoveAgentName();
// Fragment of the queue-processing method (signature, braces and several
// statements are outside this view). Visible behaviour: queue rows are grouped
// into requests, each request is sent to the audience service as an add or a
// delete of contacts, and on success the processed rows are deleted or
// re-marked for later automatic removal.

// Accumulator: composite request key => request description.
172 $queryData = array();
// Batch size comes from the audience object and is capped at 1000 rows.
175 $maxQuantity = $audience->getMaxContactsPerPacket();
176 $maxQuantity = $maxQuantity > 1000 ? 1000 : $maxQuantity;
// NOTE(review): most of the getList() arguments (select/filter/order) are not
// visible; only one filter value and the limit can be seen.
177 $queueDb = static::getList(array(
183 self::ACTION_IMPORT_AND_AUTO_REMOVE,
186 'limit' => $maxQuantity
188 while ($queueItem = $queueDb->fetch())
// 'Y' for rows in the REMOVE state, 'N' for every other action.
192 $isRemove = $queueItem[
'ACTION'] == self::ACTION_REMOVE ?
'Y' :
'N';
// Composite grouping key: TYPE_PARENT_CLIENT_ACCOUNT_AUDIENCE_<Y|N>.
// Rows sharing the key are sent together in one request.
193 $queryId = $queueItem[
'TYPE'];
194 $queryId .=
'_' . $queueItem[
'PARENT_ID'];
195 $queryId .=
'_' . $queueItem[
'CLIENT_ID'];
196 $queryId .=
'_' . $queueItem[
'ACCOUNT_ID'];
197 $queryId .=
'_' . $queueItem[
'AUDIENCE_ID'];
198 $queryId .=
'_' . $isRemove;
// First row for this key: create the request skeleton.
200 if (!isset($queryData[$queryId]))
202 $queryData[$queryId] = array(
203 'CLIENT_ID' => $queueItem[
'CLIENT_ID'],
204 'ACCOUNT_ID' => $queueItem[
'ACCOUNT_ID'],
205 'AUDIENCE_ID' => $queueItem[
'AUDIENCE_ID'],
206 'PARENT_ID' => $queueItem[
'PARENT_ID'],
207 'IS_REMOVE' => $isRemove,
208 'CONTACTS' => array(),
209 'DELETE_ID_LIST' => array(),
210 'AUTO_REMOVE_ID_LIST' => array(),
// Contact values are bucketed by CONTACT_TYPE inside the request.
213 $contactType = $queueItem[
'CONTACT_TYPE'];
214 if (!isset($queryData[$queryId][
'CONTACTS'][$contactType]))
216 $queryData[$queryId][
'CONTACTS'][$contactType] = array();
219 $queryData[$queryId][
'CONTACTS'][$contactType][] = $queueItem[
'VALUE'];
// Rows in the IMPORT_AND_AUTO_REMOVE state are remembered in
// AUTO_REMOVE_ID_LIST; the second assignment records the row in DELETE_ID_LIST.
// NOTE(review): the keyword separating the two assignments is not visible —
// presumably an `else`, confirm.
221 if ($queueItem[
'ACTION'] == self::ACTION_IMPORT_AND_AUTO_REMOVE)
223 $queryData[$queryId][
'AUTO_REMOVE_ID_LIST'][] = $queueItem[
'ID'];
227 $queryData[$queryId][
'DELETE_ID_LIST'][] = $queueItem[
'ID'];
// Tracks the client id the service was last configured for.
231 $lastClientId =
null;
234 foreach ($queryData as $queryId => $query)
// Drop duplicate contact values within each contact type. $query is the
// loop's own copy, so $queryData itself is not modified here.
236 foreach ($query[
'CONTACTS'] as $contactType => $contacts)
238 $query[
'CONTACTS'][$contactType] = array_unique($contacts);
// Reconfigure when the client changes or when $service / $authAdapter are not
// yet set. NOTE(review): the statements that create $service and $authAdapter
// are not visible in this fragment.
241 if ($lastClientId != $query[
'CLIENT_ID'] || !$service || !$authAdapter)
243 $lastClientId = $query[
'CLIENT_ID'];
246 $service->setClientId($lastClientId);
247 $authAdapter->setService($service);
250 $audience->setService($service);
251 $audience->getRequest()->setAuthAdapter($authAdapter);
// Queue mode is switched off on the audience object before sending.
// NOTE(review): presumably so the calls below hit the service directly
// instead of re-enqueueing — confirm.
253 $audience->disableQueueMode();
254 $audience->setAccountId($query[
'ACCOUNT_ID']);
// If the audience supports multi-type contacts, a single pass with an empty
// type is made; otherwise one pass per contact type present in the request.
256 $contactTypes = $audience->isSupportMultiTypeContacts() ? array(
'') : array_keys($query[
'CONTACTS']);
257 foreach ($contactTypes as $contactType)
// IS_REMOVE other than 'Y' => addContacts(); the other call is deleteContacts().
// The contact payload argument of both calls is not visible here.
259 if ($query[
'IS_REMOVE'] !=
'Y')
261 $audienceImportResult = $audience->addContacts(
262 $query[
'AUDIENCE_ID'],
265 'type' => $contactType
271 $audienceImportResult = $audience->deleteContacts(
272 $query[
'AUDIENCE_ID'],
275 'type' => $contactType
280 if ($audienceImportResult->isSuccess())
// Successfully sent rows are deleted from the queue table in portions
// ($portions is computed on a line that is not visible).
282 if (!empty($query[
'DELETE_ID_LIST']))
285 foreach ($portions as $portion)
288 "DELETE FROM " . self::getTableName() .
" WHERE ID IN (" . implode(
',', $portion) .
")"
// Rows collected in AUTO_REMOVE_ID_LIST are switched to the AUTO_REMOVE state
// in portions, and the auto-remove agent is registered.
// NOTE(review): the literals below concatenate to `...'WHERE ID IN` with no
// whitespace between the closing quote and WHERE; a leading space in the
// "WHERE ..." literal would make the generated SQL more robust and readable.
293 if (!empty($query[
'AUTO_REMOVE_ID_LIST']))
296 foreach ($portions as $portion)
299 "UPDATE " . self::getTableName() .
" SET ACTION='" . self::ACTION_AUTO_REMOVE .
"'" .
300 "WHERE ID IN (" . implode(
',', $portion) .
")"
303 static::addQueueAutoRemoveAgent();
// Deletes rows in the IMPORT / IMPORT_AND_AUTO_REMOVE / REMOVE states whose
// DATE_INSERT is older than the moment obtained by add('-1 day').
// NOTE(review): the first WHERE condition and the condition under which this
// statement runs are not visible. DateTime is presumably the Bitrix
// Main\Type\DateTime (its add() takes a string interval) — confirm.
309 "DELETE FROM " . self::getTableName() .
311 " AND ACTION in ('" . implode(
"', '", [self::ACTION_IMPORT, self::ACTION_IMPORT_AND_AUTO_REMOVE, self::ACTION_REMOVE]) .
"')" .
312 " AND DATE_INSERT < '" . (
new DateTime())->add(
'-1 day')->format(
"Y-m-d H:i:s") .
"'"
// Fragment of a helper that splits $list into consecutive portions of at most
// PORTION_QUANTITY items (signature, braces and the initialisation of
// $deleteNum / $portions are outside this view).
325 $deleteCount = count($list);
// Number of portions, rounded up so a partial last portion is included.
326 $portionCount = ceil($deleteCount / self::PORTION_QUANTITY);
328 for ($portionNum = 0; $portionNum < $portionCount; $portionNum++)
330 $deleteList = array();
// Exclusive upper index for the current portion.
331 $deletePortionCount = ($portionNum + 1) * self::PORTION_QUANTITY;
// $deleteNum is not reset here, so it continues from where the previous
// portion stopped.
332 for (; $deleteNum < $deletePortionCount; $deleteNum++)
// Guard for the last, possibly shorter, portion.
// NOTE(review): the guarded statement is not visible — presumably `break`.
334 if ($deleteNum >= $deleteCount)
// Every item is cast to int before being stored in the portion.
338 $deleteList[] = (int) $list[$deleteNum];
341 $portions[] = $deleteList;
// Fragment of the method that registers the auto-remove agent (signature and
// braces are outside this view).

// Per-request guard keyed by 'sys.auto_remove'.
// NOTE(review): the guarded statement is not visible — presumably an early
// return; the line that sets this flag is not visible either.
349 if (isset(static::$isAgentAdded[
'sys.auto_remove']))
354 $agentName = static::processQueueAutoRemoveAgentName();
355 $agent = new \CAgent();
// Look for an already registered agent with the same module id and name.
356 $agentsDb = $agent->GetList(array(
"ID" =>
"DESC"), array(
357 "MODULE_ID" => self::MODULE_ID,
358 "NAME" => $agentName,
// Register the agent only when none was found. 86400 is passed as the fourth
// AddAgent() argument — per the CAgent::AddAgent documentation this is the
// interval in seconds (i.e. once a day); confirm against the Bitrix docs.
360 if (!$agentsDb->Fetch())
362 $agent->AddAgent($agentName, self::MODULE_ID,
"N", 86400,
null,
"Y",
"");
// Fragment of the method that registers the queue-processing agent for one
// $type (signature and braces are outside this view).

// Per-request guard keyed by $type.
// NOTE(review): the guarded statement is not visible — presumably an early
// return.
368 if (isset(static::$isAgentAdded[$type]))
373 $agent = new \CAgent();
376 $agentName = static::getProcessQueueAgentName($type);
// Look for an already registered agent with the same module id and name.
377 $agentsDb = $agent->GetList(array(
"ID" =>
"DESC"), array(
378 "MODULE_ID" => self::MODULE_ID,
379 "NAME" => $agentName,
// Register the agent only when none was found. The value passed as the
// interval argument is 900 for the 'yandex' type and 30 for every other type
// (seconds per the CAgent::AddAgent documentation — confirm).
381 if (!$agentsDb->Fetch())
383 $interval = ($type ==
'yandex' ? 900 : 30);
384 $agent->AddAgent($agentName, self::MODULE_ID,
"N", $interval,
null,
"Y",
"");
// Remember that the agent for this type has been handled.
388 static::$isAgentAdded[$type] =
true;