24 private const PACK_SIZE = 100;
26 private const LAST_PROCESSED_OPTION_NAME =
'queue_last_processed_id';
32 private Map $routedQueues;
41 if ($this->getMutex()->lock())
45 $this->handleMessages();
55 $this->getMutex()->unlock();
66 private function getMessages(): Generator
70 $messages = $this->getMessageMapper()->getMap(
72 '>ID' => $this->getLastProcessedId(),
80 foreach ($messages as $message)
83 $this->setLastProcessedId($message->getId());
87 while($messages->count());
95 private function routeMessage(Message $message): bool
97 $rules = Registry::getInstance()->getRules();
99 foreach ($rules as $rule)
103 if ($handledMessage = $rule->route($message))
106 $handledMessage = $this->getHandledMessageMapper()->create($handledMessage);
123 private function getLastProcessedId(): int
125 return \COption::GetOptionInt(
"calendar", self::LAST_PROCESSED_OPTION_NAME, 0);
133 private function setLastProcessedId(
int $id = 0)
135 \COption::SetOptionInt(
"calendar", self::LAST_PROCESSED_OPTION_NAME, $id);
144 QueueListener\Dispatcher::register();
150 self::ON_QUEUE_PUSHED_EVENT_NAME,
164 if (empty($this->messageMapper))
169 return $this->messageMapper;
175 private function getHandledMessageMapper(): HandledMessageMapper
177 if (empty($this->handledMessageMapper))
179 $this->handledMessageMapper =
new HandledMessageMapper();
182 return $this->handledMessageMapper;
190 if (empty($this->routedQueues))
195 return $this->routedQueues;
201 private function getMutex():
Mutex
203 if (empty($this->mutex))
205 $this->mutex =
new Mutex(self::class);
217 public function handleMessages(): void
220 foreach ($this->getMessages() as $message)
222 $isRouted = $this->routeMessage($message);
225 $this->getMessageMapper()->delete($message);