1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
DbBroker.php
См. документацию.
1<?php
2
3declare(strict_types=1);
4
6
25
/**
 * Message broker that keeps queued messages in a database-backed storage.
 *
 * Writes to the storage (save / delete) are retried according to the retry
 * strategy given to the constructor. Read failures are reported as
 * BrokerReadException.
 */
final class DbBroker implements BrokerInterface
{
	public const TYPE_CODE = 'db';

	private readonly StorageInterface $storage;

	/**
	 * Strategy that governs how storage operations are retried.
	 *
	 * Typed by the interface so that every strategy accepted by the
	 * constructor can actually be stored here.
	 * NOTE(review): retry() calls getWaitingTime() on this object - confirm
	 * that the method is declared on RetryStrategyInterface.
	 */
	private RetryStrategyInterface $retryStrategy;

	/**
	 * Creates the storage for the given table entity and immediately asks it
	 * to unlock stale messages.
	 *
	 * @param Entity $tableEntity Entity handed to DbStorage.
	 * @param RetryStrategyInterface $retryStrategy Strategy used when a storage operation fails.
	 */
	public function __construct(
		Entity $tableEntity,
		RetryStrategyInterface $retryStrategy = new MultiplierRetryStrategy()
	)
	{
		$this->storage = new DbStorage($tableEntity);
		$this->retryStrategy = $retryStrategy;

		// Runs on every construction of the broker.
		$this->storage->unlockStaleMessages();
	}

	/**
	 * Wraps the message into a MessageBox and saves it to the storage.
	 *
	 * The box TTL is set to the max retry count of the retry strategy
	 * configured for the queue (not the strategy passed to the constructor).
	 *
	 * @param MessageInterface $message Message to enqueue.
	 * @param string $queueId Target queue identifier.
	 * @param array $params Items must implement ProcessingParamInterface.
	 *
	 * @throws ArgumentException When an item of $params is not a ProcessingParamInterface.
	 * @throws SendFailedException When the message could not be saved after all retries.
	 */
	public function send(MessageInterface $message, string $queueId, array $params = []): void
	{
		$queueRetryStrategy = $this->getRetryStrategy($queueId);

		$messageBox = new MessageBox($message);
		$messageBox
			->setQueueId($queueId)
			->setTtl($queueRetryStrategy->getMaxRetryCount())
		;

		$this->addProcessingParams($messageBox, $params);

		try
		{
			$this->retry($this->storage->save(...), $messageBox);
		}
		catch (PersistenceException $e)
		{
			throw new SendFailedException($messageBox, $e);
		}
	}

	/**
	 * Returns messages of the queue that the storage reports as ready.
	 *
	 * NOTE(review): if the storage returns a lazy iterable, exceptions raised
	 * while iterating it are not covered by the catch blocks below - confirm.
	 *
	 * @param string $queueId Queue identifier.
	 * @param int $limit Limit passed through to the storage.
	 *
	 * @throws BrokerReadException When the storage call fails.
	 */
	public function get(string $queueId, int $limit = 50): iterable
	{
		try
		{
			return $this->storage->getReadyMessagesOfQueue($queueId, $limit);
		}
		catch (PersistenceException $e)
		{
			throw new BrokerReadException(
				'Unable to lock messages: ' . $e->getMessage(),
				$e->getCode(),
				$e
			);
		}
		catch (SystemException $e)
		{
			throw new BrokerReadException(
				'Unable to read messages: ' . $e->getMessage(),
				$e->getCode(),
				$e
			);
		}
	}

	/**
	 * Returns a single message of the queue, or null when the storage returns none.
	 *
	 * @param string $queueId Queue identifier.
	 *
	 * @throws BrokerReadException When the storage call fails.
	 */
	public function getOne(string $queueId): ?MessageBox
	{
		try
		{
			return $this->storage->getOneByQueue($queueId);
		}
		catch (PersistenceException $e)
		{
			throw new BrokerReadException(
				'Unable to lock message: ' . $e->getMessage(),
				$e->getCode(),
				$e
			);
		}
		catch (SystemException $e)
		{
			throw new BrokerReadException(
				'Unable to read message: ' . $e->getMessage(),
				$e->getCode(),
				$e
			);
		}
	}

	/**
	 * Acknowledges the message by deleting it from the storage.
	 *
	 * @throws AckFailedException When the deletion failed after all retries.
	 */
	public function ack(MessageBox $message): void
	{
		try
		{
			$this->retry($this->storage->delete(...), $message);
		}
		catch (PersistenceException $e)
		{
			throw new AckFailedException($message, $e);
		}
	}

	/**
	 * Marks the message as rejected, then deletes it when it reports itself
	 * as died, or saves it back to the storage otherwise.
	 *
	 * @throws RejectFailedException When the storage operation failed after all retries.
	 */
	public function reject(MessageBox $message): void
	{
		$message->reject();

		try
		{
			if ($message->isDied())
			{
				$this->retry($this->storage->delete(...), $message);

				return;
			}

			$this->retry($this->storage->save(...), $message);
		}
		catch (PersistenceException $e)
		{
			throw new RejectFailedException($message, $e);
		}
	}

	/**
	 * Invokes the storage operation, repeating it after a PersistenceException
	 * until the attempt number reaches the strategy's max retry count.
	 *
	 * @param callable $callable Operation that receives the message box.
	 * @param MessageBox $messageBox Argument passed to the operation.
	 *
	 * @throws PersistenceException The last failure once attempts are exhausted.
	 */
	private function retry(callable $callable, MessageBox $messageBox): void
	{
		$retry = 1;

		while (true)
		{
			try
			{
				$callable($messageBox);

				return;
			}
			catch (PersistenceException $e)
			{
				// Attempts are numbered from 1; give up when the current one is the last allowed.
				if ($retry > $this->retryStrategy->getMaxRetryCount() - 1)
				{
					throw $e;
				}

				// usleep() takes microseconds; the factor suggests getWaitingTime()
				// yields milliseconds - TODO confirm.
				usleep($this->retryStrategy->getWaitingTime($retry) * 1000);

				$retry++;
			}
		}
	}

	/**
	 * Applies every processing param to the message box.
	 *
	 * NOTE(review): the value returned by applyTo() is kept only in the local
	 * variable. If an implementation returns a new instance instead of
	 * mutating the given one, the caller will not see the change - confirm.
	 *
	 * @param MessageBox $messageBox Box to be adjusted.
	 * @param array $params Items must implement ProcessingParamInterface.
	 *
	 * @throws ArgumentException When an item is not a ProcessingParamInterface.
	 */
	private function addProcessingParams(MessageBox $messageBox, array $params): void
	{
		foreach ($params as $param)
		{
			if (!$param instanceof ProcessingParamInterface)
			{
				throw new ArgumentException(
					sprintf('The type of message processing params should be "%s"', ProcessingParamInterface::class)
				);
			}

			$messageBox = $param->applyTo($messageBox);
		}
	}

	/**
	 * Looks up the retry strategy of the queue in the QueueConfigRegistry
	 * service obtained from the service locator.
	 */
	private function getRetryStrategy(string $queueId): RetryStrategyInterface
	{
		return ServiceLocator::getInstance()
			->get(QueueConfigRegistry::class)
			->getQueueConfig($queueId)
			->retryStrategy;
	}
}
send(MessageInterface $message, string $queueId, array $params=[])
Определения DbBroker.php:53
ack(MessageBox $message)
Определения DbBroker.php:127
reject(MessageBox $message)
Определения DbBroker.php:142
__construct(Entity $tableEntity, RetryStrategyInterface $retryStrategy=new MultiplierRetryStrategy())
Определения DbBroker.php:34
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
Определения file_new.php:804
Определения ufield.php:9
$message
Определения payment.php:8
if($inWords) echo htmlspecialcharsbx(Number2Word_Rus(roundEx($totalVatSum $params['CURRENCY']
Определения template.php:799