Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
segmentdatabuilder.php
1<?php
2
4
25use CModule;
28Loc::loadMessages(__FILE__);
29
31{
35 private $groupId;
36
40 private $filterId;
41
42 private ?int $groupStateId;
43
44 private $endpoint;
45
46 private $dataFilter = [];
47 private const PER_PAGE = 100000;
48 private const MINIMAL_PER_PAGE = 500;
49 private const SEGMENT_TABLE = 'sender_segment_data';
50 private const CONNECTOR_ENTITY = [
51 'crm_client' => 'CONTACT',
52 'crm_lead' => 'LEAD'
53 ];
54
55 private const SEGMENT_LOCK_KEY = 'segment_lock_';
56 private const SEGMENT_DATA_LOCK_KEY = 'segment_data_lock_';
57
58 public const FILTER_COUNTER_TAG = 'senderGroupFilterCounter';
59 private static $isSent = [];
60
/**
 * Remembers which segment/filter pair this builder operates on.
 *
 * @param int $groupId Segment (group) identifier.
 * @param string $filterId Identifier of the filter inside the segment.
 * @param array $endpoint Connector endpoint description; `FIELDS` is the key read elsewhere in this class.
 * @param int|null $groupStateId When set, the group-state row is looked up by this id
 *                               instead of by the group/filter pair.
 */
public function __construct(
    int $groupId,
    string $filterId,
    array $endpoint = [],
    ?int $groupStateId = null
)
{
    $this->groupStateId = $groupStateId;
    $this->endpoint = $endpoint;
    $this->filterId = $filterId;
    $this->groupId = $groupId;
}
80
/**
 * Removes duplicated group-state rows that share the same GROUP_ID/FILTER_ID pair.
 *
 * For every affected group only the first duplicate row returned by the query is
 * deleted, and a segment-data clear job is scheduled for that group.
 *
 * @return void
 */
private static function checkBlockers()
{
    $query = "
SELECT b.ID, b.GROUP_ID
FROM b_sender_group_state b
INNER JOIN (
    SELECT GROUP_ID, FILTER_ID
    FROM b_sender_group_state
    GROUP BY GROUP_ID, FILTER_ID
    HAVING COUNT(*) > 1
) d ON b.GROUP_ID = d.GROUP_ID AND b.FILTER_ID = d.FILTER_ID;
";

    $dbResult = \Bitrix\Main\Application::getConnection()->query($query);

    // Keyed set of already handled groups: O(1) membership test without
    // the loose comparison semantics of in_array().
    $handledGroups = [];
    while ($row = $dbResult->fetch())
    {
        $groupId = $row['GROUP_ID'];
        if (isset($handledGroups[$groupId]))
        {
            continue;
        }

        GroupStateTable::delete($row['ID']);
        $handledGroups[$groupId] = true;
        Runtime\SegmentDataClearJob::addEventAgent($groupId);
    }
}
108
/**
 * Returns the group-state row for this builder, creating one when none exists.
 *
 * The row is looked up by id when a group-state id was passed to the constructor,
 * otherwise by the filter/group pair.
 *
 * @return array|null Row data, or the result of createGroupState() (null when the endpoint is not usable).
 */
public function getCurrentGroupState()
{
    if ($this->groupStateId)
    {
        $filter = ['=ID' => $this->groupStateId];
    }
    else
    {
        $filter = [
            '=FILTER_ID' => $this->filterId,
            '=GROUP_ID' => $this->groupId,
        ];
    }

    $existingState = GroupStateTable::getList(['filter' => $filter])->fetch();
    if ($existingState)
    {
        return $existingState;
    }

    return $this->createGroupState();
}
/**
 * Fetches every group-state row that belongs to the current group.
 *
 * @return array List of rows as returned by fetchAll().
 */
public function getAllStates()
{
    $parameters = [
        'filter' => ['=GROUP_ID' => $this->groupId],
    ];

    $stateList = GroupStateTable::getList($parameters);

    return $stateList->fetchAll();
}
149
/**
 * Tells whether building for the current group state is finished.
 *
 * A missing state (getCurrentGroupState() returned nothing) counts as completed.
 *
 * @return bool
 */
public function isBuildingCompleted(): bool
{
    $state = $this->getCurrentGroupState();
    if (!$state)
    {
        return true;
    }

    $completedCode = (int)GroupStateTable::STATES['COMPLETED'];

    return (int)$state['STATE'] === $completedCode;
}
161
/**
 * Inserts a fresh group-state row in the CREATED state for the current group/filter.
 *
 * @return array|null The inserted field set, extended with the new `ID`
 *                    (and carrying the `NEW_CREATED` marker), or null when the
 *                    endpoint fails checkEndpoint().
 */
public function createGroupState(): ?array
{
    if (!static::checkEndpoint($this->endpoint))
    {
        return null;
    }

    // The endpoint is converted from the site charset to UTF-8 before JSON encoding.
    $utfEndpoint = Encoding::convertEncoding($this->endpoint, SITE_CHARSET, 'utf-8');

    $fields = [
        'FILTER_ID' => $this->filterId,
        'GROUP_ID' => $this->groupId,
        'ENDPOINT' => json_encode($utfEndpoint),
        'OFFSET' => 0,
        'STATE' => GroupStateTable::STATES['CREATED'],
        'NEW_CREATED' => true,
    ];

    $addResult = GroupStateTable::add($fields);
    $fields['ID'] = $addResult->getId();

    return $fields;
}
186
/**
 * Resets an existing group-state row back to the CREATED state with offset 0.
 *
 * The endpoint is converted from the site charset to UTF-8 before being JSON
 * encoded, exactly as createGroupState() does; without the conversion
 * json_encode() fails on non-UTF-8 sites as soon as the endpoint contains
 * non-ASCII text.
 *
 * @param int $id Primary key of the group-state row to reset.
 * @return array The written field set, extended with `ID` taken from the update result.
 */
public function resetGroupState(int $id)
{
    $dataToSet = [
        'FILTER_ID' => $this->filterId,
        'GROUP_ID' => $this->groupId,
        'ENDPOINT' => json_encode(Encoding::convertEncoding($this->endpoint, SITE_CHARSET, 'utf-8')),
        'OFFSET' => 0,
        'STATE' => GroupStateTable::STATES['CREATED'],
    ];

    $dataToSet['ID'] = GroupStateTable::update($id, $dataToSet)->getId();

    return $dataToSet;
}
205
/**
 * Stores a new offset on the current group state and marks it IN_PROGRESS.
 *
 * Does nothing when no group state is available.
 *
 * @param int $offset Offset value to persist.
 * @return void
 */
public function updateGroupStateOffset(int $offset)
{
    $state = $this->getCurrentGroupState();
    if (!$state)
    {
        return;
    }

    $changes = [
        'FILTER_ID' => $this->filterId,
        'GROUP_ID' => $this->groupId,
        'OFFSET' => $offset,
        'STATE' => GroupStateTable::STATES['IN_PROGRESS'],
    ];

    GroupStateTable::update($state['ID'], $changes);
}
229
/**
 * Marks the current group state as COMPLETED and, once the whole segment is
 * reported as prepared, recalculates the filter counters.
 *
 * Does nothing when no group state is available.
 *
 * @return void
 */
public function completeBuilding()
{
    $state = $this->getCurrentGroupState();
    if (!$state)
    {
        return;
    }

    GroupStateTable::update(
        $state['ID'],
        ['STATE' => GroupStateTable::STATES['COMPLETED']]
    );

    $segmentReady = self::checkIsSegmentPrepared($this->groupId);
    if ($segmentReady)
    {
        $this->calculateFilterCounts();
    }
}
253
262 public static function checkIsSegmentPrepared(int $groupId)
263 {
264 if (!Locker::lock(self::SEGMENT_LOCK_KEY, $groupId))
265 {
266 return false;
267 }
268
269 $states = GroupStateTable::getList([
270 'filter' => [
271 '=GROUP_ID' => $groupId
272 ],
273 ])->fetchAll();
274
275 $currentState = GroupTable::STATUS_READY_TO_USE;
276 foreach ($states as $state)
277 {
278 if ((int)$state['STATE'] !== GroupStateTable::STATES['COMPLETED'])
279 {
280 if (!SegmentDataBuilderJob::existsInDB($state['ID']))
281 {
283 }
284
285 $currentState = GroupTable::STATUS_IN_PROGRESS;
286 break;
287 }
288 }
289
290 GroupTable::update($groupId, [
291 'fields' => ['STATUS' => $currentState]
292 ]);
293
294 $prepared = $currentState === GroupTable::STATUS_READY_TO_USE;
295 if (CModule::IncludeModule('im') && $prepared)
296 {
297 $mailings = LetterSegmentTable::getList([
298 'select' => [
299 'ID' => 'LETTER.ID',
300 'USER_ID' => 'LETTER.CREATED_BY',
301 ],
302 'filter' => [
303 '=SEGMENT_ID' => $groupId,
304 '!=LETTER.STATUS' => LetterTable::STATUS_END
305 ],
306 ]);
307 $group = GroupTable::getById($groupId)->fetchRaw();
308
309 foreach ($mailings as $mailing)
310 {
311 if (!$mailing['ID'])
312 {
313 continue;
314 }
315
316 if (static::$isSent[$groupId][$mailing['USER_ID']])
317 {
318 continue;
319 }
320
321 LetterTable::update($mailing['ID'], [
322 'WAITING_RECIPIENT' => 'N'
323 ]);
324
325 $messageFields = [
326 "NOTIFY_TYPE" => IM_NOTIFY_SYSTEM,
327 "NOTIFY_MODULE" => "sender",
328 "NOTIFY_EVENT" => "group_prepared",
329 "TO_USER_ID" => $mailing['USER_ID'],
330 "NOTIFY_TAG" => "SENDER|GROUP_PREPARED|" . $groupId . "|" . $mailing['USER_ID'],
331 "NOTIFY_MESSAGE" => Loc::getMessage(
332 "SENDER_SEGMENT_BUILDER_GROUP_PREPARED",
333 [
334 "#SEGMENT_ID#" => $groupId,
335 "#SEGMENT_NAME#" => htmlspecialcharsbx($group['NAME'])
336 ]
337 )
338 ];
339
340 \CIMNotify::Add($messageFields);
341 static::$isSent[$groupId][$mailing['USER_ID']] = $mailing['USER_ID'];
342 }
343 }
344
345 Locker::unlock(self::SEGMENT_LOCK_KEY, $groupId);
346
347 return $prepared;
348 }
349
/**
 * Switches the current group state to HALTED.
 *
 * Does nothing when no group state is available.
 *
 * @return void
 */
public function haltBuilding()
{
    $state = $this->getCurrentGroupState();
    if (!$state)
    {
        return;
    }

    $haltedCode = GroupStateTable::STATES['HALTED'];
    GroupStateTable::update($state['ID'], ['STATE' => $haltedCode]);
}
368
374 public function clearBuilding(int $groupStateId)
375 {
376 if ($groupStateId)
377 {
378 GroupStateTable::delete($groupStateId);
379 }
380
382 '=GROUP_ID' => $this->groupId,
383 '=FILTER_ID' => $this->filterId,
384 ]);
385 }
386
392 public static function clearGroupBuilding(int $groupId)
393 {
394 if ($groupId)
395 {
396 $filter = [
397 '=GROUP_ID' => $groupId,
398 ];
399
401 Runtime\SegmentDataClearJob::addEventAgent($groupId);
402 }
403 }
404
/**
 * Copies connector rows into the segment data table in batches and updates
 * the per-type recipient counters of the group.
 *
 * Rows are inserted in batches of MINIMAL_PER_PAGE. Counters are accumulated
 * over the whole result and flushed once, whenever at least one row was read.
 * Previously they were flushed only when a partial last batch remained, so a
 * result whose size was an exact multiple of the batch size never updated them.
 *
 * @param Result|null $data Connector result to read rows from; null is ignored.
 * @return void
 */
public function addToDB(?Result $data)
{
    if (!$data)
    {
        return;
    }

    $rows = [];
    $rowsDataCounter = [
        $this->groupId => [],
    ];
    $processed = 0;

    while ($row = $data->fetch())
    {
        // The contact columns may be absent from the row, hence ?? / empty()
        // instead of direct reads that would raise "undefined array key".
        $rows[] = [
            'GROUP_ID' => $this->groupId,
            'FILTER_ID' => $this->filterId,
            'CRM_ENTITY_ID' => $row['CRM_ENTITY_ID'],
            'NAME' => $row['NAME'],
            'CRM_ENTITY_TYPE_ID' => $row['CRM_ENTITY_TYPE_ID'],
            'CRM_ENTITY_TYPE' => $row['CRM_ENTITY_TYPE'],
            'CONTACT_ID' => $row['CRM_CONTACT_ID'],
            'COMPANY_ID' => $row['CRM_COMPANY_ID'],
            'EMAIL' => $row['EMAIL'] ?? null,
            'IM' => $row['IM'] ?? null,
            'PHONE' => $row['PHONE'] ?? null,
            'HAS_EMAIL' => !empty($row['EMAIL']) ? 'Y' : 'N',
            'HAS_IMOL' => !empty($row['IM']) ? 'Y' : 'N',
            'HAS_PHONE' => !empty($row['PHONE']) ? 'Y' : 'N',
            'SENDER_TYPE_ID' => $this->detectSenderType($row),
        ];

        foreach ($this->detectSenderTypes($row) as $type)
        {
            $rowsDataCounter[$this->groupId][$type] = ($rowsDataCounter[$this->groupId][$type] ?? 0) + 1;
        }
        $processed++;

        if (count($rows) >= self::MINIMAL_PER_PAGE)
        {
            SegmentDataTable::addMulti($rows, true);
            $rows = [];
        }
    }

    if ($processed > 0)
    {
        // Same order as before: counters first, then the remaining rows.
        $this->updateCounters($rowsDataCounter);
    }

    if ($rows)
    {
        SegmentDataTable::addMulti($rows, true);
    }
}
469
478 public function buildData($perPage = null): bool
479 {
480 if (!$this->connectorIterable())
481 {
482 return true;
483 }
484
485 $groupState = $this->getCurrentGroupState();
486
487 if (!$groupState)
488 {
489 return true;
490 }
491
492 if ($this->isBuildingCompleted())
493 {
494 return true;
495 }
496
497 $connector = Connector\Manager::getConnector($this->endpoint);
498 $connector->setDataTypeId(null);
499 $connector->setCheckAccessRights(false);
500 $connector->setFieldValues($this->endpoint['FIELDS']);
501
502 $lastId = $connector->getEntityLimitInfo()['lastId'];
503
505 $threadStrategy = Runtime\Env::getGroupThreadContext();
506
507 $threadStrategy->setGroupStateId($groupState['ID']);
508
509 $threadState = $threadStrategy->checkThreads();
510 if ($threadState === AbstractThreadStrategy::THREAD_LOCKED)
511 {
512 return false;
513 }
514
515 if ($threadState === AbstractThreadStrategy::THREAD_NEEDED)
516 {
517 $threadStrategy->fillThreads();
518 }
519
520 $threadStrategy->setPerPage(self::PER_PAGE);
521
522 if (
523 $threadStrategy->lockThread() === AbstractThreadStrategy::THREAD_UNAVAILABLE
524 || $threadStrategy->isProcessLimited()
525 )
526 {
527 return false;
528 }
529
530 $offset = $threadStrategy->getOffset();
531 if (!Locker::lock(self::SEGMENT_DATA_LOCK_KEY, $this->groupId))
532 {
533 return false;
534 }
535
536 if ($offset < $lastId)
537 {
538 $limit = $offset + self::PER_PAGE;
539
540 $this->addToDB(
541 $connector->getLimitedData($offset, $limit)
542 );
543
544 Locker::unlock(self::SEGMENT_DATA_LOCK_KEY, $this->groupId);
545 $threadStrategy->updateStatus(GroupThreadTable::STATUS_NEW);
546 return false;
547 }
548 Locker::unlock(self::SEGMENT_DATA_LOCK_KEY, $this->groupId);
549
550 if ($threadStrategy->getThreadId() < $threadStrategy->lastThreadId())
551 {
552 $threadStrategy->updateStatus(GroupThreadTable::STATUS_DONE);
553 return false;
554 }
555
556 $threadStrategy->updateStatus(GroupThreadTable::STATUS_NEW);
557
558 if (!$threadStrategy->finalize())
559 {
560 return false;
561 }
562
563 $this->completeBuilding();
564
565 return true;
566 }
567
572 public function prepareForAgent($rebuild = false)
573 {
574 if (!$this->connectorIterable())
575 {
576 return false;
577 }
578
579 $groupState = $this->getCurrentGroupState();
580 $result = '';
581
582 if (isset($groupState['NEW_CREATED'])
583 || $groupState
584 && ($groupState['ENDPOINT'] !== json_encode($this->endpoint) || $rebuild))
585 {
587 $this->clearBuilding($groupState['ID']);
588 GroupCounterTable::deleteByGroupId($this->groupId);
589
590 $groupState = $this->getCurrentGroupState();
591
592 if ($groupState)
593 {
594 GroupTable::update($this->groupId, [
595 'fields' => ['STATUS' => GroupTable::STATUS_IN_PROGRESS]
596 ]);
597
598 $result = self::run($groupState['ID'], self::MINIMAL_PER_PAGE);
599 SegmentDataBuilderJob::addEventAgent($groupState['ID']);
600 }
601 }
602
603 return $result !== '';
604 }
605
/**
 * Builds an ORM query over the segment data of the current group/filter.
 *
 * COMPANY_ID and CONTACT_ID are exposed under the CRM_COMPANY_ID / CRM_CONTACT_ID
 * aliases through runtime expression fields.
 *
 * @return \Bitrix\Main\ORM\Query\Query
 */
public function getQuery(): \Bitrix\Main\ORM\Query\Query
{
    $query = SegmentDataTable::query();

    $query->setFilter([
        '=GROUP_ID' => $this->groupId,
        '=FILTER_ID' => $this->filterId,
    ]);

    $aliases = [
        'CRM_COMPANY_ID' => 'COMPANY_ID',
        'CRM_CONTACT_ID' => 'CONTACT_ID',
    ];
    foreach ($aliases as $alias => $column)
    {
        $query->registerRuntimeField(new Entity\ExpressionField($alias, '%s', [$column]));
    }

    $query->setSelect([
        'CRM_ENTITY_ID',
        'NAME',
        'CRM_ENTITY_TYPE_ID',
        'CRM_ENTITY_TYPE',
        'CRM_CONTACT_ID',
        'CRM_COMPANY_ID',
    ]);

    return $query;
}
637
/**
 * Maps a recipient type to the segment-data filter condition that selects it.
 *
 * @param mixed $type Recipient type identifier (compared loosely against Type constants).
 * @return array|null Filter condition, or null for types without a dedicated condition.
 */
private function prepareEntityTypeFilter($type)
{
    // Loose comparisons mirror the semantics of a switch statement.
    if ($type == Type::EMAIL)
    {
        return ['=HAS_EMAIL' => 'Y'];
    }

    if ($type == Type::IM)
    {
        return ['=HAS_IMOL' => 'Y'];
    }

    if ($type == Type::PHONE)
    {
        return ['=HAS_PHONE' => 'Y'];
    }

    if ($type == Type::CRM_COMPANY_ID)
    {
        return ['!=COMPANY_ID' => null];
    }

    if ($type == Type::CRM_CONTACT_ID)
    {
        return ['!=CONTACT_ID' => null];
    }

    // Deal/order product types and anything unknown have no condition.
    return null;
}
659
/**
 * Replaces the additional data filter with the whitelisted part of $filter.
 *
 * Keys outside the whitelist and values equal to "undefined" are dropped.
 * SENDER_RECIPIENT_TYPE_ID is translated through prepareEntityTypeFilter()
 * and appended as a nested condition when a condition exists for the type.
 *
 * @param array $filter Raw filter values keyed by field name.
 * @return SegmentDataBuilder The same instance, for chaining.
 */
public function setDataFilter(array $filter = []): SegmentDataBuilder
{
    $this->dataFilter = [];

    $allowedKeys = [
        'EMAIL' => '=EMAIL',
        'PHONE' => '=PHONE',
        'IM' => '=IM',
        'NAME' => 'NAME',
        'SENDER_RECIPIENT_TYPE_ID' => "",
    ];

    foreach ($filter as $key => $filterValue)
    {
        $isIgnored = !isset($allowedKeys[$key]) || $filterValue == "undefined";
        if ($isIgnored)
        {
            continue;
        }

        if ($key !== 'SENDER_RECIPIENT_TYPE_ID')
        {
            $this->dataFilter[$allowedKeys[$key]] = $filterValue;
            continue;
        }

        $typeCondition = $this->prepareEntityTypeFilter($filterValue);
        if ($typeCondition)
        {
            $this->dataFilter[] = $typeCondition;
        }
    }

    return $this;
}
/**
 * Selects stored segment data rows, optionally limited by page navigation.
 *
 * @param PageNavigation|null $nav Supplies limit and offset when given.
 * @param bool $useFilterId Whether to restrict the selection to the current filter id.
 * @return Result
 */
public function getData(PageNavigation $nav = null, bool $useFilterId = true): Result
{
    $select = [
        '*',
        'CRM_COMPANY_ID' => 'COMPANY_ID',
        'CRM_CONTACT_ID' => 'CONTACT_ID',
    ];

    $params = [
        'select' => $select,
        'filter' => $this->prepareFilter($useFilterId),
    ];

    if ($nav)
    {
        $params['limit'] = $nav->getLimit();
        $params['offset'] = $nav->getOffset();
    }

    return SegmentDataTable::getList($params);
}
718
/**
 * Assembles the ORM filter for segment data queries.
 *
 * The group (and optionally filter) conditions are merged over the additional
 * data filter, so they win on identical keys.
 *
 * @param bool $useFilterId Whether to add the `=FILTER_ID` condition.
 * @return array
 */
private function prepareFilter(bool $useFilterId = true)
{
    $scope = ['=GROUP_ID' => $this->groupId];

    if ($useFilterId)
    {
        $scope['=FILTER_ID'] = $this->filterId;
    }

    if (!$this->dataFilter)
    {
        return $scope;
    }

    return array_merge($this->dataFilter, $scope);
}
736
/**
 * Wraps the stored segment data into a connector result configured with the
 * connector's personalization codes and data type.
 *
 * @return Connector\Result
 */
public function getPreparedData(): Connector\Result
{
    $connector = Connector\Manager::getConnector($this->endpoint);

    $codes = [];
    foreach ($connector->getPersonalizeList() as $tag)
    {
        if (!empty($tag['ITEMS']))
        {
            // Grouped tag: collect the codes of its items, keyed by code.
            foreach ($tag['ITEMS'] as $item)
            {
                $codes[$item['CODE']] = $item['CODE'];
            }
        }
        elseif (strlen($tag['CODE']) > 0)
        {
            $codes[] = $tag['CODE'];
        }
    }

    $result = new Connector\Result($this->getData());
    $result->setFilterFields($codes);
    $result->setDataTypeId($connector->getDataTypeId());

    return $result;
}
769
/**
 * Counts stored segment data rows matching the current filter.
 *
 * @param bool $useFilterId Whether to restrict the count to the current filter id.
 * @return int
 */
public function getDataCount(bool $useFilterId = true): int
{
    $filter = $this->prepareFilter($useFilterId);

    return SegmentDataTable::getCount($filter);
}
778
779 private function connectorIterable()
780 {
781 $connector = Connector\Manager::getConnector($this->endpoint);
782
784 }
785
/**
 * Agent entry point: performs one building step for the given group state.
 *
 * @param mixed $groupStateId Primary key of the group-state row.
 * @param mixed $perPage Passed through to buildData().
 * @return string Agent name to be scheduled again while building is unfinished,
 *                or an empty string when there is nothing more to do.
 */
public static function run($groupStateId, $perPage = null)
{
    $groupState = GroupStateTable::getById($groupStateId)->fetch();

    // The state row may already be gone; there is nothing left to build then.
    if (!$groupState)
    {
        return '';
    }

    if (!$groupState['FILTER_ID'])
    {
        GroupStateTable::update(
            $groupStateId,
            [
                'STATE' => GroupStateTable::STATES['COMPLETED'],
            ]
        );

        if ($groupState['GROUP_ID'])
        {
            self::checkIsSegmentPrepared((int)$groupState['GROUP_ID']);
        }

        return '';
    }

    // json_decode() yields null for empty or broken JSON, which the constructor's
    // `array $endpoint` parameter would reject with a TypeError.
    // NOTE(review): assumes the connector manager tolerates an empty endpoint — confirm.
    $endpoint = json_decode((string)$groupState['ENDPOINT'], true);
    if (!is_array($endpoint))
    {
        $endpoint = [];
    }

    $segmentBuilder = new SegmentDataBuilder(
        (int)$groupState['GROUP_ID'],
        $groupState['FILTER_ID'],
        $endpoint,
        (int)$groupState['ID']
    );

    if (!$segmentBuilder->buildData($perPage))
    {
        return SegmentDataBuilderJob::getAgentName($groupStateId);
    }

    return '';
}
829
837 {
838 $groupState = $this->getCurrentGroupState();
839
840 if (!$groupState)
841 {
842 return new Connector\DataCounter([]);
843 }
844
845 $connector = Connector\Manager::getConnector(
846 json_decode($groupState['ENDPOINT'], true)
847 );
848
849 if (!$connector)
850 {
851 $this->clearBuilding($groupState['ID']);
852 return new Connector\DataCounter([]);
853 }
854
856 $this->getQuery(),
857 self::SEGMENT_TABLE,
858 self::CONNECTOR_ENTITY[$connector->getCode()]
859 ));
860
861 Segment::updateAddressCounters($this->groupId, [$counter]);
862 if (CModule::IncludeModule('pull'))
863 {
864 \CPullWatch::AddToStack(
865 self::FILTER_COUNTER_TAG,
866 [
867 'module_id' => 'sender',
868 'command' => 'updateFilterCounter',
869 'params' => [
870 'groupId' => $this->groupId,
871 'filterId' => $this->filterId,
872 'count' => $counter->getArray(),
873 'state' => $groupState['STATE'],
874 'completed' => (int)$groupState['STATE'] === GroupStateTable::STATES['COMPLETED']
875 ],
876 ]
877 );
878 }
879
880 return $counter;
881 }
882
890 public function calculateFilterCounts(): array
891 {
892 $connectors = GroupConnectorTable::getList(
893 [
894 'filter' => ['=GROUP_ID' => $this->groupId]
895 ])->fetchAll();
896
897 $counters = [];
898 foreach ($connectors as $dbConnector)
899 {
900 $endpoint = $dbConnector['ENDPOINT'];
901 $connector = Connector\Manager::getConnector(
902 $endpoint
903 );
904
905 $this->filterId = $endpoint['FILTER_ID'] ?? 'sender_crm_client_--filter--crmclient--';
906 if ($connector instanceof Contact)
907 {
908 $connector->setCheckAccessRights(false);
909 $connector->setFieldValues($endpoint['FIELDS']);
910 }
911 $counters[] = self::CONNECTOR_ENTITY[$connector->getCode()] ?
913 $this->getQuery(),
914 self::SEGMENT_TABLE,
915 self::CONNECTOR_ENTITY[$connector->getCode()]
916 )) : $connector->getDataCounter()
917 ;
918
919 }
920
921 Segment::updateAddressCounters($this->groupId, $counters);
922
923 return $counters;
924 }
925
/**
 * Replaces the connector endpoint used by this builder.
 *
 * @param array $endpoint New endpoint description.
 * @return SegmentDataBuilder The same instance, for chaining.
 */
public function setEndpoint(array $endpoint): SegmentDataBuilder
{
    $builder = $this;
    $builder->endpoint = $endpoint;

    return $builder;
}
937
946 public static function actualize(int $groupId, bool $rebuild = false)
947 {
948 $states = GroupStateTable::getList([
949 'filter' => [
950 '=GROUP_ID' => $groupId
951 ],
952 ])->fetchAll();
953
954 $connectors = GroupConnectorTable::getList([
955 'select' => [
956 'FILTER_ID',
957 'ENDPOINT'
958 ],
959 'filter' => [
960 '=GROUP_ID' => $groupId
961 ],
962 ])->fetchAll();
963
964 $usedFilters = [];
965 $endpoints = [];
966 foreach ($connectors as $connector)
967 {
968 if (!static::checkEndpoint($connector['ENDPOINT']))
969 {
970 continue;
971 }
972
973 $entityConnector = \Bitrix\Sender\Connector\Manager::getConnector($connector['ENDPOINT']);
974
975 if (!$connector['FILTER_ID'] && $entityConnector instanceof Connector\BaseFilter)
976 {
977 $connector['FILTER_ID'] = $entityConnector->getUiFilterId();
978 }
979
980 if (
981 !$entityConnector instanceof IncrementallyConnector
982 || !isset($connector['FILTER_ID'])
983 )
984 {
985 continue;
986 }
987
988 $usedFilters[] = $connector['FILTER_ID'];
989 $endpoints[$connector['FILTER_ID']] = $connector['ENDPOINT'];
990
991 $isUsed = false;
992
993 foreach ($states as $state)
994 {
995 if ($state['FILTER_ID'] === $connector['FILTER_ID'])
996 {
997 $isUsed = true;
998 break;
999 }
1000 }
1001
1002 if (!$isUsed)
1003 {
1004 $dataBuilder = new SegmentDataBuilder($groupId, $connector['FILTER_ID'], $connector['ENDPOINT']);
1005 $dataBuilder->prepareForAgent(true);
1006 $dataBuilder = null;
1007 }
1008 }
1009
1010 foreach ($states as $state)
1011 {
1012 $endpoint = json_decode($state['ENDPOINT'], true);
1013 $dataBuilder = new SegmentDataBuilder($groupId, $state['FILTER_ID'], $endpoint);
1014
1015 if (!static::checkEndpoint($endpoint))
1016 {
1017 $dataBuilder->clearBuilding($state['ID']);
1018 continue;
1019 }
1020
1021 if (!in_array($state['FILTER_ID'], $usedFilters))
1022 {
1023 $dataBuilder->clearBuilding($state['ID']);
1024 }
1025
1026 if ($endpoints[$state['FILTER_ID']] && $endpoints[$state['FILTER_ID']] !== $endpoint)
1027 {
1028 $dataBuilder->setEndpoint($endpoints[$state['FILTER_ID']]);
1029 $dataBuilder->prepareForAgent();
1030 }
1031
1032 if ($rebuild)
1033 {
1034 $dataBuilder->prepareForAgent(true);
1035 }
1036
1037 $dataBuilder = null;
1038 }
1039
1041 }
1042
/**
 * Checks that an endpoint is present and carries a non-empty `FIELDS` entry.
 *
 * @param array|null $endpoint Endpoint description to validate.
 * @return bool
 */
private static function checkEndpoint(?array $endpoint): bool
{
    // empty() covers a null/empty endpoint, a missing key and an empty value at once.
    return !empty($endpoint['FIELDS']);
}
1047
/**
 * Runs a building step for every group state that is neither COMPLETED nor
 * HALTED, then re-checks whether the owning segment is prepared.
 *
 * @return void
 */
public static function checkBuild(): void
{
    $finishedStates = [
        GroupStateTable::STATES['COMPLETED'],
        GroupStateTable::STATES['HALTED'],
    ];

    $pendingStates = GroupStateTable::getList([
        'select' => [
            'GROUP_ID',
            'FILTER_ID',
            'ENDPOINT',
        ],
        'filter' => [
            '!@STATE' => $finishedStates,
        ],
    ]);

    while ($pendingState = $pendingStates->fetch())
    {
        $groupId = (int)$pendingState['GROUP_ID'];

        $builder = new SegmentDataBuilder(
            $groupId,
            $pendingState['FILTER_ID'],
            json_decode($pendingState['ENDPOINT'], true)
        );
        $builder->buildData();

        self::checkIsSegmentPrepared($groupId);
    }
}
1076
1077 public static function checkNotCompleted(): string
1078 {
1079 self::checkBlockers();
1080 $groupStateList = GroupStateTable::getList([
1081 'select' => [
1082 'GROUP_ID',
1083 'FILTER_ID',
1084 'ENDPOINT',
1085 ],
1086 'filter' => [
1087 '!@STATE' => [
1088 GroupStateTable::STATES['COMPLETED'],
1089 GroupStateTable::STATES['HALTED'],
1090 ]
1091 ],
1092 ]);
1093
1094 while ($groupState = $groupStateList->fetch())
1095 {
1096 self::checkIsSegmentPrepared((int)$groupState['GROUP_ID']);
1097 }
1098
1099 $groupList = GroupTable::getList([
1100 'select' => [
1101 'ID'
1102 ],
1103 'filter' => [
1105 ]
1106 ]);
1107
1108 while ($group = $groupList->fetch())
1109 {
1110 self::checkIsSegmentPrepared((int)$group['ID']);
1111 }
1112
1113 return '\\Bitrix\Sender\\Posting\\SegmentDataBuilder::checkNotCompleted();';
1114 }
1115
/**
 * Picks the single recipient type stored with a segment data row.
 *
 * For contacts (entity type 5) and companies (entity type 4) the most specific
 * type wins: order product, then deal product, then the plain entity type.
 * Previously the plain entity check ran before the deal-product check, which
 * made both deal-product branches unreachable.
 *
 * Rows of any other entity type fall back to the first available channel
 * (IM, EMAIL, PHONE), and finally to EMAIL.
 *
 * @param array $row Connector row.
 * @return mixed One of the Type constants.
 */
private function detectSenderType(array $row)
{
    $entityTypeId = $row['CRM_ENTITY_TYPE_ID'] ?? null;
    $hasDeal = isset($row['SGT_DEAL_ID']);

    if ($entityTypeId == 5)
    {
        // The contact branch requires a non-empty order id, as before.
        if (!empty($row['PROD_CRM_ORDER_ID']))
        {
            return Type::CRM_ORDER_PRODUCT_CONTACT_ID;
        }

        return $hasDeal ? Type::CRM_DEAL_PRODUCT_CONTACT_ID : Type::CRM_CONTACT_ID;
    }

    if ($entityTypeId == 4)
    {
        // The company branch only requires the order id to be set, as before.
        if (isset($row['PROD_CRM_ORDER_ID']))
        {
            return Type::CRM_ORDER_PRODUCT_COMPANY_ID;
        }

        return $hasDeal ? Type::CRM_DEAL_PRODUCT_COMPANY_ID : Type::CRM_COMPANY_ID;
    }

    if (isset($row['IM']))
    {
        return Type::IM;
    }

    if (isset($row['EMAIL']))
    {
        return Type::EMAIL;
    }

    if (isset($row['PHONE']))
    {
        return Type::PHONE;
    }

    return Type::EMAIL;
}
1142
/**
 * Lists every recipient type a connector row contributes to; used for the
 * per-type counters.
 *
 * The entity type id is read once through `??`, so a row without
 * CRM_ENTITY_TYPE_ID no longer triggers an "undefined array key" warning in
 * the lead check.
 *
 * @param array $row Connector row.
 * @return array List of Type constants, in a fixed order.
 */
private function detectSenderTypes(array $row)
{
    $entityTypeId = $row['CRM_ENTITY_TYPE_ID'] ?? null;
    $hasOrder = isset($row['PROD_CRM_ORDER_ID']);
    $hasDeal = isset($row['SGT_DEAL_ID']);

    $types = [];

    if ($entityTypeId == 5)
    {
        if ($hasOrder)
        {
            $types[] = Type::CRM_ORDER_PRODUCT_CONTACT_ID;
        }
        $types[] = Type::CRM_CONTACT_ID;
        if ($hasDeal)
        {
            $types[] = Type::CRM_DEAL_PRODUCT_CONTACT_ID;
        }
    }

    if ($entityTypeId == 4)
    {
        if ($hasOrder)
        {
            $types[] = Type::CRM_ORDER_PRODUCT_COMPANY_ID;
        }
        $types[] = Type::CRM_COMPANY_ID;
        if ($hasDeal)
        {
            $types[] = Type::CRM_DEAL_PRODUCT_COMPANY_ID;
        }
    }

    // NOTE(review): this comparison is strict while the contact/company checks
    // are loose; if the row carries the type id as a string it never matches —
    // confirm which is intended.
    if ($entityTypeId === Type::CRM_LEAD_ID)
    {
        $types[] = Type::CRM_LEAD_ID;
    }

    if (isset($row['IM']))
    {
        $types[] = Type::IM;
    }

    if (isset($row['EMAIL']))
    {
        $types[] = Type::EMAIL;
    }

    if (isset($row['PHONE']))
    {
        $types[] = Type::PHONE;
    }

    return $types;
}
1192
/**
 * Adds the counters gathered for the current batch to the stored per-type
 * counters of the group and pushes the new totals to pull subscribers.
 *
 * Fixes over the previous version:
 *  - a stored counter without a counterpart in the batch was seeded with its
 *    own value and then had that value added again, which doubled it;
 *  - the segment lock is released in a `finally` block, so an exception in
 *    the ORM calls no longer leaves it held.
 *
 * @param array $rowsDataCounter Counters of the batch: [groupId => [typeId => count]].
 * @return void
 */
private function updateCounters(array $rowsDataCounter)
{
    if (!Locker::lock(self::SEGMENT_LOCK_KEY, $this->groupId))
    {
        return;
    }

    try
    {
        $storedCounters = GroupCounterTable::getList([
            'select' => [
                'GROUP_ID', 'TYPE_ID', 'CNT'
            ],
            'filter' => [
                '=GROUP_ID' => $this->groupId
            ],
        ]);

        while ($item = $storedCounters->fetch())
        {
            $groupId = $item['GROUP_ID'];
            $typeId = $item['TYPE_ID'];

            // Start from zero when the batch has no entry for this type.
            $rowsDataCounter[$groupId][$typeId] = ($rowsDataCounter[$groupId][$typeId] ?? 0) + (int)$item['CNT'];
        }

        // Stored rows are replaced wholesale with the merged totals.
        GroupCounterTable::deleteByGroupId($this->groupId);
        foreach ($rowsDataCounter as $groupId => $dataCounter)
        {
            foreach ($dataCounter as $typeId => $count)
            {
                GroupCounterTable::add([
                    'GROUP_ID' => $groupId,
                    'TYPE_ID' => $typeId,
                    'CNT' => $count,
                ]);
            }
        }
    }
    finally
    {
        Locker::unlock(self::SEGMENT_LOCK_KEY, $this->groupId);
    }

    $groupState = $this->getCurrentGroupState();
    if (!$groupState)
    {
        return;
    }

    $counter = new Connector\DataCounter($rowsDataCounter);

    if (CModule::IncludeModule('pull'))
    {
        \CPullWatch::AddToStack(
            self::FILTER_COUNTER_TAG,
            [
                'module_id' => 'sender',
                'command' => 'updateFilterCounter',
                'params' => [
                    'groupId' => $this->groupId,
                    'filterId' => $this->filterId,
                    'count' => $counter->getArray(),
                    'state' => $groupState['STATE'],
                    'completed' => (int)$groupState['STATE'] === GroupStateTable::STATES['COMPLETED']
                ],
            ]
        );
    }
}
1259}
fetch(\Bitrix\Main\Text\Converter $converter=null)
Definition result.php:167
static loadMessages($file)
Definition loc.php:64
static getMessage($code, $replace=null, $language=null)
Definition loc.php:29
static updateAddressCounters($segmentId, array $counters)
Definition segment.php:378
static getPreparedCount(Entity\Query $query, string $entityDbName, string $entityName, $dataTypeId=null)
static lock(string $key, int $id)
Definition locker.php:19
static unlock(string $key, int $id)
Definition locker.php:34
__construct(int $groupId, string $filterId, array $endpoint=[], ?int $groupStateId=null)
static actualize(int $groupId, bool $rebuild=false)
static run($groupStateId, $perPage=null)
getData(PageNavigation $nav=null, bool $useFilterId=true)