Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
rulemaster.php
1<?php
2
4
17use Generator;
18use Throwable;
19
21{
// Event type sent by sendSystemEvents() once per routed queue.
22 public const ON_QUEUE_PUSHED_EVENT_NAME = 'OnPushToTargetQueue';
23
// Limit passed to the message mapper for every fetch in getMessages().
24 private const PACK_SIZE = 100;
25
// Name of the "calendar" module option that stores the id of the last processed message.
26 private const LAST_PROCESSED_OPTION_NAME = 'queue_last_processed_id';
27
// Created on first use by getMessageMapper().
28 private MessageMapper $messageMapper;
29
// Created on first use by getHandledMessageMapper().
30 private HandledMessageMapper $handledMessageMapper;
31
// Queues collected by routeMessage(); created on first use by getRoutedQueues().
32 private Map $routedQueues;
33
// Lock guarding run(); created on first use by getMutex().
34 private Mutex $mutex;
35
/**
 * Runs one routing pass under the mutex.
 *
 * When the lock is acquired, pending messages are handled and the queue
 * events are sent; the lock is released afterwards in every case. When the
 * lock cannot be acquired the call does nothing.
 *
 * Failures are not propagated to the caller; they are written to the PHP
 * error log.
 *
 * @return void
 */
public function run()
{
	$mutex = $this->getMutex();
	if (!$mutex->lock())
	{
		return;
	}

	try
	{
		$this->handleMessages();

		$this->sendSystemEvents();
	}
	catch (Throwable $e)
	{
		// The pass stays best-effort, but the failure must leave a trace
		// instead of disappearing silently.
		error_log(sprintf(
			'%s: %s: %s in %s:%d',
			__METHOD__,
			get_class($e),
			$e->getMessage(),
			$e->getFile(),
			$e->getLine()
		));
	}
	finally
	{
		$mutex->unlock();
	}
}
59
/**
 * Lazily yields messages whose id is greater than the stored last processed id.
 *
 * Messages are fetched in packs of at most PACK_SIZE, ordered by ascending id.
 * Fetching repeats until a pack comes back empty.
 *
 * @return Generator
 */
private function getMessages(): Generator
{
	$order = ['ID' => 'ASC'];

	do
	{
		$filter = ['>ID' => $this->getLastProcessedId()];
		$pack = $this->getMessageMapper()->getMap($filter, self::PACK_SIZE, $order);

		foreach ($pack as $item)
		{
			yield $item;
			// Runs only when the consumer requests the next item, so the id is
			// stored after the consumer has finished with this message.
			$this->setLastProcessedId($item->getId());
		}
	}
	while ($pack->count());
}
89
/**
 * Offers the message to every registered rule and stores each result.
 *
 * For every rule that returns a handled message, that message is created
 * through the handled-message mapper and its queue is added to the routed
 * queues. An exception raised while processing one rule is logged and does
 * not stop the remaining rules.
 *
 * NOTE(review): when a rule throws and no other rule routes the message, the
 * result is false, and handleMessages() deletes messages that were not routed.
 * Confirm that dropping the message after a rule failure is intended.
 *
 * @param Message $message Message to route.
 * @return bool True when at least one rule produced a stored handled message.
 */
private function routeMessage(Message $message): bool
{
	$isRouted = false;

	foreach (Registry::getInstance()->getRules() as $rule)
	{
		try
		{
			$handledMessage = $rule->route($message);
			if (!$handledMessage)
			{
				// This rule did not produce anything for the message.
				continue;
			}

			$handledMessage = $this->getHandledMessageMapper()->create($handledMessage);
			$this->getRoutedQueues()->add($handledMessage->getQueue());
			$isRouted = true;
		}
		catch (Throwable $e)
		{
			// Keep going with the other rules, but record what went wrong.
			error_log(sprintf(
				'%s: %s: %s in %s:%d',
				__METHOD__,
				get_class($e),
				$e->getMessage(),
				$e->getFile(),
				$e->getLine()
			));
		}
	}

	return $isRouted;
}
119
/**
 * Reads the id of the last processed message from the "calendar" module options.
 *
 * @return int Stored id; the default passed to the option lookup is 0.
 */
private function getLastProcessedId(): int
{
	$moduleId = 'calendar';

	return \COption::GetOptionInt($moduleId, self::LAST_PROCESSED_OPTION_NAME, 0);
}
127
/**
 * Stores the id of the last processed message in the "calendar" module options.
 *
 * @param int $id Message id to remember.
 */
private function setLastProcessedId(int $id = 0)
{
	$moduleId = 'calendar';

	\COption::SetOptionInt($moduleId, self::LAST_PROCESSED_OPTION_NAME, $id);
}
137
/**
 * Registers the queue listener dispatcher and sends one event per routed queue.
 *
 * Each event belongs to the "calendar" module, has the type
 * ON_QUEUE_PUSHED_EVENT_NAME and carries the queue under the 'queue' key.
 *
 * @return void
 */
public function sendSystemEvents(): void
{
	// TODO: move it to right place
	QueueListener\Dispatcher::register();

	foreach ($this->getRoutedQueues() as $routedQueue)
	{
		$parameters = ['queue' => $routedQueue];
		$queueEvent = new Event('calendar', self::ON_QUEUE_PUSHED_EVENT_NAME, $parameters);

		EventManager::getInstance()->send($queueEvent);
	}
}
158
/**
 * Returns the message mapper, creating it on first use.
 *
 * @return MessageMapper
 */
private function getMessageMapper(): MessageMapper
{
	return $this->messageMapper ??= new MessageMapper();
}
171
/**
 * Returns the handled-message mapper, creating it on first use.
 *
 * @return HandledMessageMapper
 */
private function getHandledMessageMapper(): HandledMessageMapper
{
	$this->handledMessageMapper ??= new HandledMessageMapper();

	return $this->handledMessageMapper;
}
184
/**
 * Returns the map of routed queues, creating an empty EntityMap on first use.
 *
 * @return Map
 */
public function getRoutedQueues(): Map
{
	if (isset($this->routedQueues))
	{
		return $this->routedQueues;
	}

	$this->routedQueues = new EntityMap();

	return $this->routedQueues;
}
197
/**
 * Returns the mutex for this class, creating it on first use.
 *
 * The mutex is constructed with this class name as its argument.
 *
 * @return Mutex
 */
private function getMutex(): Mutex
{
	return $this->mutex ??= new Mutex(self::class);
}
210
/**
 * Routes every pending message and deletes the ones that were not routed.
 *
 * @return void
 */
public function handleMessages(): void
{
	foreach ($this->getMessages() as $queuedMessage)
	{
		if ($this->routeMessage($queuedMessage))
		{
			continue;
		}

		// The message was not routed, so it is removed through the mapper.
		$this->getMessageMapper()->delete($queuedMessage);
	}
}
229}