Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
simple.php
1<?php
2
4
8use Bitrix\Calendar\Core;
11
13{
/** Name of the message header in which receive() stores the handled-message row id. */
private const HANDLED_MESSAGE_HEADER_NAME = '~handledMessageId';

/** Queue this consumer was constructed with. */
private Interfaces\Queue $queue;

/** Mapper instance, created on first use by getHandledMessageMapper(). */
private ?HandledMessageMapper $handledMessageMapper = null;

/**
 * Map of handled messages, filled on first use by getHandledMessageMap().
 * Declared explicitly so the class does not rely on a dynamic property
 * (deprecated since PHP 8.2).
 */
private ?Core\Base\Map $handledMessageMap = null;

/** Passed as the second argument to the mapper's getMap() when the map is loaded. */
private int $packSize = 100;
21
/**
 * Binds the consumer to a queue.
 *
 * @param Interfaces\Queue $queue Queue whose id is used to select handled messages.
 */
public function __construct(Interfaces\Queue $queue)
{
	// Kept for the lifetime of the consumer; exposed through getQueue().
	$this->queue = $queue;
}
26
/**
 * Returns the queue passed to the constructor.
 *
 * @return Interfaces\Queue
 */
public function getQueue(): Interfaces\Queue
{
	return $this->queue;
}
31
/**
 * Takes the next row from the handled-message map and returns its message.
 *
 * The row id is written to the message under HANDLED_MESSAGE_HEADER_NAME so that
 * acknowledge() and reject() can delete the corresponding row later.
 *
 * @return Interfaces\Message|null The message, or null when fetch() returns a falsy value.
 */
public function receive(): ?Interfaces\Message
{
	$row = $this->getHandledMessageMap()->fetch();
	if (!$row)
	{
		return null;
	}

	$message = $row->getMessage();
	$message->setHeader(self::HANDLED_MESSAGE_HEADER_NAME, $row->getId());

	return $message;
}
53
/**
 * Confirms a message: clears its handled-message header and deletes the stored row.
 *
 * Does nothing when the header is absent or holds a falsy value.
 *
 * @param Interfaces\Message $message Message previously returned by receive().
 */
public function acknowledge(Interfaces\Message $message): void
{
	$handledMessageId = $message->getHeader(self::HANDLED_MESSAGE_HEADER_NAME);
	if (!$handledMessageId)
	{
		return;
	}

	// Clear the header first, then remove the row it pointed at.
	$message->setHeader(self::HANDLED_MESSAGE_HEADER_NAME, null);
	$this->deleteHandledMessageByMessageId($handledMessageId);
}
62
/**
 * Rejects a message: clears its handled-message header, deletes the stored row and
 * then invokes the onAfterReject() hook.
 *
 * Does nothing when the header is absent or holds a falsy value.
 *
 * @param Interfaces\Message $message Message previously returned by receive().
 * @param bool $requeue Not read by this implementation.
 */
public function reject(Interfaces\Message $message, bool $requeue = false): void
{
	$handledMessageId = $message->getHeader(self::HANDLED_MESSAGE_HEADER_NAME);
	if (!$handledMessageId)
	{
		return;
	}

	$message->setHeader(self::HANDLED_MESSAGE_HEADER_NAME, null);
	$this->deleteHandledMessageByMessageId($handledMessageId);

	$this->onAfterReject($message);
}
79
/**
 * Returns the map of handled messages for this consumer's queue.
 *
 * The map is requested from the mapper on the first call (filtered by QUEUE_ID,
 * with the pack size as second argument, ordered by DATE_CREATE ascending) and
 * the same instance is returned on later calls.
 *
 * @return Core\Base\Map
 */
private function getHandledMessageMap(): Core\Base\Map
{
	if (empty($this->handledMessageMap))
	{
		$this->handledMessageMap = $this->getHandledMessageMapper()->getMap(
			[
				'QUEUE_ID' => $this->getQueue()->getQueueId(),
			],
			$this->getPackSize(),
			[
				'DATE_CREATE' => 'ASC',
			],
		);
	}

	// The declared return type requires an explicit return; without it every call fails.
	return $this->handledMessageMap;
}
103
/**
 * Returns the handled-message mapper, creating it on first access.
 *
 * @return HandledMessageMapper
 */
private function getHandledMessageMapper(): HandledMessageMapper
{
	// Assigns only while the property is still null.
	$this->handledMessageMapper ??= new HandledMessageMapper();

	return $this->handledMessageMapper;
}
116
/**
 * Hook called by reject() after the handled-message row has been deleted.
 *
 * Currently a no-op.
 *
 * @param Interfaces\Message $message The rejected message.
 */
private function onAfterReject(Interfaces\Message $message): void
{
	// TODO: implement it
}
126
/**
 * Deletes a handled-message row through the mapper.
 *
 * @param mixed $id Value passed to HandledMessage::setId(); callers pass the value read
 *                  from the HANDLED_MESSAGE_HEADER_NAME header.
 */
private function deleteHandledMessageByMessageId($id): void
{
	// Only the id is set on the entity before it is handed to the mapper's delete().
	$handledMessage = (new HandledMessage())->setId($id);
	$this->getHandledMessageMapper()->delete($handledMessage);
}
132
/**
 * Returns the current pack size (100 unless changed through setPackSize()).
 *
 * @return int
 */
public function getPackSize(): int
{
	return $this->packSize;
}
140
/**
 * Sets the pack size that is passed to the mapper when the handled-message map is loaded.
 *
 * The value is stored as given; no range check is performed.
 *
 * @param int $packSize New pack size.
 */
public function setPackSize(int $packSize): void
{
	$this->packSize = $packSize;
}
148}
reject(Interfaces\Message $message, bool $requeue=false)
Definition simple.php:69
__construct(Interfaces\Queue $queue)
Definition simple.php:22
acknowledge(Interfaces\Message $message)
Definition simple.php:54