1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
AbstractReceiver.php
См. документацию.
1<?php
2
3declare(strict_types=1);
4
6
20use Exception;
21
25abstract class AbstractReceiver implements ReceiverInterface
26{
27 protected int $limit = 50;
28
29 protected string $queueId;
30
32
/**
 * Sets the limit that getMessages() passes to the broker.
 *
 * Zero or negative values are not stored; the limit falls back to 50 instead.
 *
 * @param int $limit Requested limit.
 *
 * @return self The same receiver, for call chaining.
 */
public function setLimit(int $limit): self
{
	if ($limit <= 0)
	{
		// Non-positive input is replaced by the fallback value.
		$limit = 50;
	}

	$this->limit = $limit;

	return $this;
}
39
/**
 * Stores the identifier of the queue this receiver reads from.
 *
 * @param string $queueId Queue identifier handed to the broker on every fetch.
 *
 * @return self The same receiver, for call chaining.
 */
public function setQueueId(string $queueId): self
{
	$receiver = $this;
	$receiver->queueId = $queueId;

	return $receiver;
}
46
/**
 * Stores the broker used to fetch, acknowledge and reject messages.
 *
 * @param BrokerInterface $broker Broker instance to use.
 *
 * @return self The same receiver, for call chaining.
 */
public function setBroker(BrokerInterface $broker): self
{
	$receiver = $this;
	$receiver->broker = $broker;

	return $receiver;
}
53
/**
 * Asks the broker for a single message from the configured queue.
 *
 * @return MessageBox|null Whatever the broker's getOne() returns for the queue.
 */
protected function getMessage(): ?MessageBox
{
	$broker = $this->broker;

	return $broker->getOne($this->queueId);
}
61
/**
 * Asks the broker for messages from the configured queue, passing the
 * configured limit as the second argument.
 *
 * @return iterable Whatever the broker's get() returns; run() iterates it
 *                  and treats each item as a MessageBox.
 */
protected function getMessages(): iterable
{
	$broker = $this->broker;

	return $broker->get($this->queueId, $this->limit);
}
69
/**
 * Forwards the message box to the broker's ack().
 *
 * @param MessageBox $messageBox Message box to acknowledge.
 */
protected function ack(MessageBox $messageBox): void
{
	$broker = $this->broker;

	$broker->ack($messageBox);
}
77
/**
 * Forwards the message box to the broker's reject().
 *
 * @param MessageBox $messageBox Message box to reject.
 */
protected function reject(MessageBox $messageBox): void
{
	$broker = $this->broker;

	$broker->reject($messageBox);
}
85
/**
 * Handles one message; concrete receivers supply the implementation.
 *
 * run() calls this inside a try block with the value of
 * MessageBox::getMessage(). A normal return leads to the message box being
 * acknowledged; an exception is handled by run()'s catch handlers, each of
 * which rejects the message box.
 *
 * @param MessageInterface $message Message extracted from the message box.
 */
92 abstract protected function process(MessageInterface $message): void;
93
/**
 * Fetches messages via getMessages() and processes them one by one.
 *
 * For each message box: process() is called with the contained message and,
 * if it returns normally, the box is acknowledged. When an exception is
 * thrown, the matching handler below rejects the box and logs the failure;
 * depending on the handler it also marks the box as killed or requeues it
 * with the retry delay carried by the exception.
 *
 * NOTE(review): in this view the headers of the first three catch handlers
 * and the second argument of both writeToLog() calls are missing, so the
 * exact exception types they handle cannot be confirmed from here.
 */
98 public function run(): void
99 {
// Logger for the 'main' / 'MESSENGER_QUEUE' pair; the closure turns the log
// context into the fields of the resulting log entry.
100 $logger = new EventLogger(
101 'main',
102 'MESSENGER_QUEUE',
103 static function (array $context, string $message)
104 {
105 $messageBox = $context['message'] ?? null;
106
// With a MessageBox in the context, the description includes the message
// class name, its id, the queue id and the item id.
107 if ($messageBox instanceof MessageBox)
108 {
109 return [
110 'ITEM_ID' => $messageBox->getItemId(),
111 'DESCRIPTION' => sprintf(
112 '%s. Message: "%s" (%s). Queue: "%s". ItemId: "%s"',
113 $message,
114 $messageBox->getClassName(),
115 $messageBox->getId(),
116 $messageBox->getQueueId(),
117 $messageBox->getItemId()
118 )
119 ];
120 }
121
// No MessageBox in the context: only an empty ITEM_ID is returned.
122 return [
123 'ITEM_ID' => null,
124 ];
125 }
126 );
127
128 $messageBoxes = $this->getMessages();
129
131 foreach ($messageBoxes as $messageBox)
132 {
133 try
134 {
135 $this->process($messageBox->getMessage());
136
// Reached only when process() did not throw.
137 $this->ack($messageBox);
138 }
// NOTE(review): catch header missing from this view. This handler rejects
// the box and writes the exception to the application exception log.
140 {
141 $this->reject($messageBox);
142
143 Application::getInstance()->getExceptionHandler()->writeToLog(
144 $e,
146 );
147 }
// NOTE(review): catch header missing from this view. This handler calls
// kill() on the box before rejecting it and logs an "unrecoverable" notice.
149 {
150 $messageBox->kill();
151
152 $this->reject($messageBox);
153
154 $logger->notice(
155 sprintf(
156 'Message has unrecoverable case: "%s"',
157 $e->getMessage(),
158 ),
159 [
160 'message' => $messageBox,
161 'exception' => $e
162 ]
163 );
164 }
// NOTE(review): catch header missing from this view. This handler requeues
// the box using the exception's retry delay, rejects it and logs a
// "recoverable" notice.
166 {
167 $messageBox->requeue($e->getRetryDelay());
168
169 $this->reject($messageBox);
170
171 $logger->notice(
172 sprintf(
173 'Message has recoverable case: "%s"',
174 $e->getMessage(),
175 ),
176 [
177 'message' => $messageBox,
178 'exception' => $e
179 ]
180 );
181 }
// Any other Exception: wrap it in a ProcessingException that carries the
// message box and appends the message id to the text, keeping the original
// exception as the previous one; then reject the box and log the wrapper.
182 catch (Exception $e)
183 {
184 $e = new ProcessingException(
185 $messageBox,
186 $e->getMessage() . ' Message: ' . $messageBox->getId(),
187 $e->getCode(),
188 $e
189 );
190
191 $this->reject($messageBox);
192
193 Application::getInstance()->getExceptionHandler()->writeToLog(
194 $e,
196 );
197 }
198 }
199 }
200}
static getInstance()
Определения application.php:98
ack(MessageBox $messageBox)
Определения AbstractReceiver.php:73
setBroker(BrokerInterface $broker)
Определения AbstractReceiver.php:47
reject(MessageBox $messageBox)
Определения AbstractReceiver.php:81
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
Определения file_new.php:804
$context
Определения csv_new_setup.php:223
$message
Определения payment.php:8