42 private ?
int $groupStateId;
46 private $dataFilter = [];
47 private const PER_PAGE = 100000;
48 private const MINIMAL_PER_PAGE = 500;
49 private const SEGMENT_TABLE =
'sender_segment_data';
50 private const CONNECTOR_ENTITY = [
51 'crm_client' =>
'CONTACT',
55 private const SEGMENT_LOCK_KEY =
'segment_lock_';
56 private const SEGMENT_DATA_LOCK_KEY =
'segment_data_lock_';
59 private static $isSent = [];
72 ?
int $groupStateId =
null
75 $this->groupId = $groupId;
76 $this->filterId = $filterId;
77 $this->endpoint = $endpoint;
78 $this->groupStateId = $groupStateId;
81 private static function checkBlockers()
84SELECT b.ID, b.GROUP_ID
85FROM b_sender_group_state b
87 SELECT GROUP_ID, FILTER_ID
88 FROM b_sender_group_state
89 GROUP BY GROUP_ID, FILTER_ID
91) d ON b.GROUP_ID = d.GROUP_ID AND b.FILTER_ID = d.FILTER_ID;
94 $dbResult = \Bitrix\Main\Application::getConnection()->query($query);
96 while ($row = $dbResult->fetch()) {
97 $groupId = $row[
'GROUP_ID'];
98 if (in_array($groupId, $groups))
103 GroupStateTable::delete($id);
104 $groups[] = $groupId;
105 Runtime\SegmentDataClearJob::addEventAgent($groupId);
117 $groupState = GroupStateTable::getList(
119 'filter' => $this->groupStateId
121 '=ID' => $this->groupStateId
124 '=FILTER_ID' => $this->filterId,
125 '=GROUP_ID' => $this->groupId,
141 return GroupStateTable::getList(
144 '=GROUP_ID' => $this->groupId,
168 if (!static::checkEndpoint($this->endpoint))
174 'FILTER_ID' => $this->filterId,
175 'GROUP_ID' => $this->groupId,
176 'ENDPOINT' => json_encode(Encoding::convertEncoding($this->endpoint, SITE_CHARSET,
'utf-8')),
179 'NEW_CREATED' =>
true,
182 $dataToSet[
'ID'] = GroupStateTable::add($dataToSet)->getId();
194 'FILTER_ID' => $this->filterId,
195 'GROUP_ID' => $this->groupId,
196 'ENDPOINT' => json_encode($this->endpoint),
201 $dataToSet[
'ID'] = GroupStateTable::update($id, $dataToSet)->getId();
218 GroupStateTable::update(
221 'FILTER_ID' => $this->filterId,
222 'GROUP_ID' => $this->groupId,
240 GroupStateTable::update(
247 if (self::checkIsSegmentPrepared($this->groupId))
269 $states = GroupStateTable::getList([
271 '=GROUP_ID' => $groupId
276 foreach ($states as $state)
290 GroupTable::update($groupId, [
291 'fields' => [
'STATUS' => $currentState]
295 if (CModule::IncludeModule(
'im') && $prepared)
297 $mailings = LetterSegmentTable::getList([
300 'USER_ID' =>
'LETTER.CREATED_BY',
303 '=SEGMENT_ID' => $groupId,
304 '!=LETTER.STATUS' => LetterTable::STATUS_END
307 $group = GroupTable::getById($groupId)->fetchRaw();
309 foreach ($mailings as $mailing)
316 if (static::$isSent[$groupId][$mailing[
'USER_ID']])
321 LetterTable::update($mailing[
'ID'], [
322 'WAITING_RECIPIENT' =>
'N'
326 "NOTIFY_TYPE" => IM_NOTIFY_SYSTEM,
327 "NOTIFY_MODULE" =>
"sender",
328 "NOTIFY_EVENT" =>
"group_prepared",
329 "TO_USER_ID" => $mailing[
'USER_ID'],
330 "NOTIFY_TAG" =>
"SENDER|GROUP_PREPARED|" . $groupId .
"|" . $mailing[
'USER_ID'],
332 "SENDER_SEGMENT_BUILDER_GROUP_PREPARED",
334 "#SEGMENT_ID#" => $groupId,
335 "#SEGMENT_NAME#" => htmlspecialcharsbx($group[
'NAME'])
340 \CIMNotify::Add($messageFields);
341 static::$isSent[$groupId][$mailing[
'USER_ID']] = $mailing[
'USER_ID'];
360 GroupStateTable::update(
378 GroupStateTable::delete($groupStateId);
382 '=GROUP_ID' => $this->groupId,
383 '=FILTER_ID' => $this->filterId,
397 '=GROUP_ID' => $groupId,
401 Runtime\SegmentDataClearJob::addEventAgent($groupId);
423 while ($row = $data->
fetch())
426 'GROUP_ID' => $this->groupId,
427 'FILTER_ID' => $this->filterId,
428 'CRM_ENTITY_ID' => $row[
'CRM_ENTITY_ID'],
429 'NAME' => $row[
'NAME'],
430 'CRM_ENTITY_TYPE_ID' => $row[
'CRM_ENTITY_TYPE_ID'],
431 'CRM_ENTITY_TYPE' => $row[
'CRM_ENTITY_TYPE'],
432 'CONTACT_ID' => $row[
'CRM_CONTACT_ID'],
433 'COMPANY_ID' => $row[
'CRM_COMPANY_ID'],
434 'EMAIL' => $row[
'EMAIL'] ??
null,
435 'IM' => $row[
'IM'] ??
null,
436 'PHONE' => $row[
'PHONE'] ??
null,
437 'HAS_EMAIL' => $row[
'EMAIL'] ?
'Y' :
'N',
438 'HAS_IMOL' => $row[
'IM'] ?
'Y' :
'N',
439 'HAS_PHONE' => $row[
'PHONE'] ?
'Y' :
'N',
440 'SENDER_TYPE_ID' => $this->detectSenderType($row),
442 $detectedTypes = $this->detectSenderTypes($row);
444 foreach ($detectedTypes as $type)
446 if (!isset($rowsDataCounter[$this->groupId][$type]))
448 $rowsDataCounter[$this->groupId][$type] = 0;
450 $rowsDataCounter[$this->groupId][$type]++;
454 if ($counter === self::MINIMAL_PER_PAGE)
456 SegmentDataTable::addMulti($rows,
true);
464 $this->updateCounters($rowsDataCounter);
465 SegmentDataTable::addMulti($rows,
true);
478 public function buildData($perPage =
null): bool
480 if (!$this->connectorIterable())
497 $connector = Connector\Manager::getConnector($this->endpoint);
500 $connector->setFieldValues($this->endpoint[
'FIELDS']);
502 $lastId =
$connector->getEntityLimitInfo()[
'lastId'];
505 $threadStrategy = Runtime\Env::getGroupThreadContext();
507 $threadStrategy->setGroupStateId($groupState[
'ID']);
509 $threadState = $threadStrategy->checkThreads();
517 $threadStrategy->fillThreads();
520 $threadStrategy->setPerPage(self::PER_PAGE);
524 || $threadStrategy->isProcessLimited()
530 $offset = $threadStrategy->getOffset();
531 if (!
Locker::lock(self::SEGMENT_DATA_LOCK_KEY, $this->groupId))
536 if ($offset < $lastId)
538 $limit = $offset + self::PER_PAGE;
545 $threadStrategy->updateStatus(GroupThreadTable::STATUS_NEW);
550 if ($threadStrategy->getThreadId() < $threadStrategy->lastThreadId())
552 $threadStrategy->updateStatus(GroupThreadTable::STATUS_DONE);
556 $threadStrategy->updateStatus(GroupThreadTable::STATUS_NEW);
558 if (!$threadStrategy->finalize())
574 if (!$this->connectorIterable())
582 if (isset($groupState[
'NEW_CREATED'])
584 && ($groupState[
'ENDPOINT'] !== json_encode($this->endpoint) || $rebuild))
588 GroupCounterTable::deleteByGroupId($this->groupId);
594 GroupTable::update($this->groupId, [
598 $result =
self::run($groupState[
'ID'], self::MINIMAL_PER_PAGE);
603 return $result !==
'';
613 $query = SegmentDataTable::query();
616 '=GROUP_ID' => $this->groupId,
617 '=FILTER_ID' => $this->filterId,
628 'CRM_ENTITY_TYPE_ID',
638 private function prepareEntityTypeFilter($type)
642 return [
'=HAS_EMAIL' =>
'Y'];
644 return [
'=HAS_IMOL' =>
'Y'];
646 return [
'=HAS_PHONE' =>
'Y'];
647 case Type::CRM_COMPANY_ID:
648 return [
'!=COMPANY_ID' =>
null];
649 case Type::CRM_CONTACT_ID:
650 return [
'!=CONTACT_ID' =>
null];
651 case Type::CRM_DEAL_PRODUCT_CONTACT_ID:
652 case Type::CRM_ORDER_PRODUCT_CONTACT_ID:
653 case Type::CRM_DEAL_PRODUCT_COMPANY_ID:
654 case Type::CRM_ORDER_PRODUCT_COMPANY_ID:
662 $this->dataFilter = [];
669 'SENDER_RECIPIENT_TYPE_ID' =>
"",
672 foreach ($filter as $key => $filterValue)
674 if (!isset($whiteList[$key]) || $filterValue ==
"undefined")
679 if ($key ===
'SENDER_RECIPIENT_TYPE_ID')
681 $type = $this->prepareEntityTypeFilter($filterValue);
684 $this->dataFilter[] = $this->prepareEntityTypeFilter($filterValue);
690 $this->dataFilter[$whiteList[$key]] = $filterValue;
704 'CRM_COMPANY_ID' =>
'COMPANY_ID',
705 'CRM_CONTACT_ID' =>
'CONTACT_ID',
707 'filter' => $this->prepareFilter($useFilterId),
712 $params[
'limit'] =
$nav->getLimit();
713 $params[
'offset'] =
$nav->getOffset();
716 return SegmentDataTable::getList($params);
719 private function prepareFilter(
bool $useFilterId =
true)
722 $filter[
'=GROUP_ID'] = $this->groupId;
726 $filter[
'=FILTER_ID'] = $this->filterId;
729 if ($this->dataFilter)
731 $filter = array_merge($this->dataFilter, $filter);
743 $connector = Connector\Manager::getConnector($this->endpoint);
745 $personalizeList = array();
746 $personalizeListTmp =
$connector->getPersonalizeList();
747 foreach($personalizeListTmp as $tag)
749 if(!empty($tag[
'ITEMS']))
751 foreach ($tag[
'ITEMS'] as $item)
753 $personalizeList[$item[
'CODE']] = $item[
'CODE'];
757 if(strlen($tag[
'CODE']) > 0)
759 $personalizeList[] = $tag[
'CODE'];
764 $result->setFilterFields($personalizeList);
765 $result->setDataTypeId(
$connector->getDataTypeId());
776 return SegmentDataTable::getCount($this->prepareFilter($useFilterId));
779 private function connectorIterable()
781 $connector = Connector\Manager::getConnector($this->endpoint);
795 public static function run($groupStateId, $perPage =
null)
797 $groupState = GroupStateTable::getById($groupStateId)->fetch();
799 if (!$groupState[
'FILTER_ID'])
801 GroupStateTable::update(
808 if ($groupState[
'GROUP_ID'])
816 (
int)$groupState[
'GROUP_ID'],
817 $groupState[
'FILTER_ID'],
818 json_decode($groupState[
'ENDPOINT'],
true),
822 if (!$segmentBuilder->buildData($perPage))
846 json_decode($groupState[
'ENDPOINT'],
true)
862 if (CModule::IncludeModule(
'pull'))
864 \CPullWatch::AddToStack(
865 self::FILTER_COUNTER_TAG,
867 'module_id' =>
'sender',
868 'command' =>
'updateFilterCounter',
870 'groupId' => $this->groupId,
871 'filterId' => $this->filterId,
872 'count' => $counter->getArray(),
873 'state' => $groupState[
'STATE'],
892 $connectors = GroupConnectorTable::getList(
894 'filter' => [
'=GROUP_ID' => $this->groupId]
898 foreach ($connectors as $dbConnector)
900 $endpoint = $dbConnector[
'ENDPOINT'];
905 $this->filterId = $endpoint[
'FILTER_ID'] ??
'sender_crm_client_--filter--crmclient--';
909 $connector->setFieldValues($endpoint[
'FIELDS']);
911 $counters[] = self::CONNECTOR_ENTITY[
$connector->getCode()] ?
933 $this->endpoint = $endpoint;
946 public static function actualize(
int $groupId,
bool $rebuild =
false)
948 $states = GroupStateTable::getList([
950 '=GROUP_ID' => $groupId
954 $connectors = GroupConnectorTable::getList([
960 '=GROUP_ID' => $groupId
968 if (!static::checkEndpoint(
$connector[
'ENDPOINT']))
973 $entityConnector = \Bitrix\Sender\Connector\Manager::getConnector(
$connector[
'ENDPOINT']);
977 $connector[
'FILTER_ID'] = $entityConnector->getUiFilterId();
993 foreach ($states as $state)
995 if ($state[
'FILTER_ID'] ===
$connector[
'FILTER_ID'])
1005 $dataBuilder->prepareForAgent(
true);
1006 $dataBuilder =
null;
1010 foreach ($states as $state)
1012 $endpoint = json_decode($state[
'ENDPOINT'],
true);
1015 if (!static::checkEndpoint($endpoint))
1017 $dataBuilder->clearBuilding($state[
'ID']);
1021 if (!in_array($state[
'FILTER_ID'], $usedFilters))
1023 $dataBuilder->clearBuilding($state[
'ID']);
1026 if ($endpoints[$state[
'FILTER_ID']] && $endpoints[$state[
'FILTER_ID']] !== $endpoint)
1028 $dataBuilder->setEndpoint($endpoints[$state[
'FILTER_ID']]);
1029 $dataBuilder->prepareForAgent();
1034 $dataBuilder->prepareForAgent(
true);
1037 $dataBuilder =
null;
1043 private static function checkEndpoint(?array $endpoint): bool
1045 return $endpoint && isset($endpoint[
'FIELDS']) && !empty($endpoint[
'FIELDS']);
1050 $groupStateList = GroupStateTable::getList([
1064 while ($groupState = $groupStateList->fetch())
1067 (
int)$groupState[
'GROUP_ID'],
1068 $groupState[
'FILTER_ID'],
1069 json_decode($groupState[
'ENDPOINT'],
true)
1072 $segmentBuilder->buildData();
1079 self::checkBlockers();
1080 $groupStateList = GroupStateTable::getList([
1094 while ($groupState = $groupStateList->fetch())
1099 $groupList = GroupTable::getList([
1108 while ($group = $groupList->fetch())
1113 return '\\Bitrix\Sender\\Posting\\SegmentDataBuilder::checkNotCompleted();';
1116 private function detectSenderType(array $row)
1118 if (isset($row[
'PROD_CRM_ORDER_ID']) && $row[
'PROD_CRM_ORDER_ID']
1119 && isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 5)
1120 return Type::CRM_ORDER_PRODUCT_CONTACT_ID;
1121 if (isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 5)
1122 return Type::CRM_CONTACT_ID;
1123 if (isset($row[
'SGT_DEAL_ID']) && isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 5)
1124 return Type::CRM_DEAL_PRODUCT_CONTACT_ID;
1126 if (isset($row[
'PROD_CRM_ORDER_ID']) && isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 4)
1127 return Type::CRM_ORDER_PRODUCT_COMPANY_ID;
1128 if (isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 4)
1129 return Type::CRM_COMPANY_ID;
1130 if (isset($row[
'SGT_DEAL_ID']) && isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 4)
1131 return Type::CRM_DEAL_PRODUCT_COMPANY_ID;
1133 if (isset($row[
'IM']))
1135 if (isset($row[
'EMAIL']))
1137 if (isset($row[
'PHONE']))
1143 private function detectSenderTypes(array $row)
1146 if (isset($row[
'PROD_CRM_ORDER_ID']) && isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 5)
1148 $types[] = Type::CRM_ORDER_PRODUCT_CONTACT_ID;
1150 if (isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 5)
1152 $types[] = Type::CRM_CONTACT_ID;
1154 if (isset($row[
'SGT_DEAL_ID']) && isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 5)
1156 $types[] = Type::CRM_DEAL_PRODUCT_CONTACT_ID;
1159 if (isset($row[
'PROD_CRM_ORDER_ID']) && isset($row[
'CRM_ENTITY_TYPE_ID']) && $row[
'CRM_ENTITY_TYPE_ID'] == 4)
1161 $types[] = Type::CRM_ORDER_PRODUCT_COMPANY_ID;
1163 if (isset($row[
'CRM_ENTITY_TYPE_ID']) &&$row[
'CRM_ENTITY_TYPE_ID'] == 4)
1165 $types[] = Type::CRM_COMPANY_ID;
1167 if (isset($row[
'SGT_DEAL_ID']) && isset($row[
'CRM_ENTITY_TYPE_ID']) &&$row[
'CRM_ENTITY_TYPE_ID'] == 4)
1169 $types[] = Type::CRM_DEAL_PRODUCT_COMPANY_ID;
1172 if ($row[
'CRM_ENTITY_TYPE_ID'] === Type::CRM_LEAD_ID)
1174 $types[] = Type::CRM_LEAD_ID;
1177 if (isset($row[
'IM']))
1179 $types[] = Type::IM;
1181 if (isset($row[
'EMAIL']))
1183 $types[] = Type::EMAIL;
1185 if (isset($row[
'PHONE']))
1187 $types[] = Type::PHONE;
1193 private function updateCounters(array $rowsDataCounter)
1195 if (!
Locker::lock(self::SEGMENT_LOCK_KEY, $this->groupId))
1200 $counter = GroupCounterTable::getList([
1202 'GROUP_ID',
'TYPE_ID',
'CNT'
1205 '=GROUP_ID' => $this->groupId
1209 while ($item = $counter->fetch())
1211 if (!isset($rowsDataCounter[$item[
'GROUP_ID']][$item[
'TYPE_ID']]))
1213 $rowsDataCounter[$item[
'GROUP_ID']][$item[
'TYPE_ID']] = $item[
'CNT'];
1216 $rowsDataCounter[$item[
'GROUP_ID']][$item[
'TYPE_ID']] += $item[
'CNT'];
1219 GroupCounterTable::deleteByGroupId($this->groupId);
1220 foreach ($rowsDataCounter as $groupId => $dataCounter)
1222 foreach ($dataCounter as $typeId => $count)
1224 GroupCounterTable::add(array(
1225 'GROUP_ID' => $groupId,
1226 'TYPE_ID' => $typeId,
1239 $counter =
new Connector\DataCounter($rowsDataCounter);
1241 if (CModule::IncludeModule(
'pull'))
1243 \CPullWatch::AddToStack(
1244 self::FILTER_COUNTER_TAG,
1246 'module_id' =>
'sender',
1247 'command' =>
'updateFilterCounter',
1249 'groupId' => $this->groupId,
1250 'filterId' => $this->filterId,
1251 'count' => $counter->getArray(),
1252 'state' => $groupState[
'STATE'],
fetch(\Bitrix\Main\Text\Converter $converter=null)
static getMessage($code, $replace=null, $language=null)
static lock(string $key, int $id)
static unlock(string $key, int $id)
__construct(int $groupId, string $filterId, array $endpoint=[], ?int $groupStateId=null)
static actualize(int $groupId, bool $rebuild=false)
getData(PageNavigation $nav=null, bool $useFilterId=true)