1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
MessageRepository.php
См. документацию.
1<?php
2
3declare(strict_types=1);
4
5namespace Bitrix\Main\Messenger\Internals\Storage\Db;
6
7use Bitrix\Main\ArgumentException;
8use Bitrix\Main\Messenger\Entity\MessageBox;
9use Bitrix\Main\Messenger\Entity\MessageBoxCollection;
10use Bitrix\Main\Messenger\Internals\Exception\Storage\MappingException;
11use Bitrix\Main\Messenger\Internals\Exception\Storage\PersistenceException;
12use Bitrix\Main\Messenger\Internals\Storage\Db\Model\MessengerMessageTable;
13use Bitrix\Main\ObjectPropertyException;
14use Bitrix\Main\ORM\Entity;
15use Bitrix\Main\ORM\Query\Query;
16use Bitrix\Main\Messenger\Internals\Storage\Db\Model\MessageStatus;
17use Bitrix\Main\SystemException;
18use Bitrix\Main\Type\DateTime;
19
21{
/** Mapper used to convert between MessageBox entities and ORM objects of the table entity. */
private MessageMapper $mapper;

/**
 * @param Entity $tableEntity ORM entity of the message table; it is stored on the repository
 *                            and also handed to the mapper.
 */
public function __construct(private readonly Entity $tableEntity)
{
	// The promoted property is already initialised when the constructor body runs.
	$this->mapper = new MessageMapper($this->tableEntity);
}
28
/**
 * Returns the oldest ready message of the given queue, or null when there is none.
 *
 * "Ready" is whatever buildReadyMessageQuery() selects: status New, AVAILABLE_AT not in the
 * future, ordered by CREATED_AT ascending.
 *
 * A row that cannot be mapped to a MessageBox is deleted, the same way
 * getReadyMessagesOfQueue() and getStaleMessages() treat such rows. Without this the same
 * broken row would be selected on every call (the query is deterministic and limited to one
 * row) and the queue would look permanently empty.
 *
 * @param string $queueId Queue identifier matched against the QUEUE_ID column.
 *
 * @return MessageBox|null Null when no ready row exists or when the selected row was unmappable.
 */
public function getOneByQueue(string $queueId): ?MessageBox
{
	$query = $this->buildReadyMessageQuery($queueId);
	$query->setLimit(1);

	$item = $query->fetchObject();
	if ($item === null)
	{
		return null;
	}

	$messageBox = $this->getEntityFromOrmItem($item);
	if ($messageBox === null)
	{
		// Unmappable row: remove it so the next call can reach the following message.
		$item->delete();
	}

	return $messageBox;
}
49
/**
 * Loads the ready messages of a queue as a MessageBoxCollection.
 *
 * Rows that cannot be mapped to a MessageBox are deleted and left out of the result.
 *
 * @param string $queueId Queue identifier matched against the QUEUE_ID column.
 * @param int $limit Maximum number of rows to fetch. Non-positive values fall back to 50,
 *                   values above 1000 are capped at 1000.
 *
 * @return MessageBoxCollection Possibly empty collection of mapped messages.
 */
public function getReadyMessagesOfQueue(string $queueId, int $limit = 50): MessageBoxCollection
{
	$readyQuery = $this->buildReadyMessageQuery($queueId);

	$rowLimit = 50;
	if ($limit > 0)
	{
		$rowLimit = min($limit, 1000);
	}
	$readyQuery->setLimit($rowLimit);

	$rows = $readyQuery->fetchCollection();

	$messages = new MessageBoxCollection();

	if (!$rows || $rows->isEmpty())
	{
		return $messages;
	}

	foreach ($rows as $row)
	{
		$box = $this->getEntityFromOrmItem($row);

		if ($box === null)
		{
			// The row could not be converted; drop it from storage.
			$row->delete();

			continue;
		}

		$messages->add($box);
	}

	return $messages;
}
83
/**
 * Builds the query that selects the ready messages of one queue.
 *
 * A row qualifies when its QUEUE_ID equals $queueId, its STATUS is MessageStatus::New and its
 * AVAILABLE_AT is not later than the moment the query is built. Results are ordered by
 * CREATED_AT ascending. No limit is set here; callers apply their own.
 *
 * @param string $queueId Queue identifier matched against the QUEUE_ID column.
 *
 * @return Query Query object that has not been executed yet.
 */
private function buildReadyMessageQuery(string $queueId): Query
{
	$dataClass = $this->tableEntity->getDataClass();

	$readyQuery = $dataClass::query();

	$readyQuery->setSelect(['*']);
	$readyQuery->where('QUEUE_ID', '=', $queueId);
	$readyQuery->where('STATUS', '=', MessageStatus::New->value);
	$readyQuery->where('AVAILABLE_AT', '<=', new DateTime());
	$readyQuery->setOrder(['CREATED_AT' => 'ASC']);

	return $readyQuery;
}
104
/**
 * Collects messages that are in status Processing and whose UPDATED_AT is more than two days
 * old, ordered by CREATED_AT ascending.
 *
 * Rows that cannot be mapped to a MessageBox are deleted and left out of the result.
 * No row limit is applied.
 *
 * @return MessageBoxCollection Possibly empty collection of mapped messages.
 */
public function getStaleMessages(): MessageBoxCollection
{
	$staleMessages = new MessageBoxCollection();

	$dataClass = $this->tableEntity->getDataClass();

	// Two days, expressed in seconds.
	$staleAfterSeconds = 2 * 86400;
	$staleBefore = DateTime::createFromTimestamp(time() - $staleAfterSeconds);

	$staleQuery = $dataClass::query();

	$staleQuery->setSelect(['*']);
	$staleQuery->where('STATUS', '=', MessageStatus::Processing->value);
	$staleQuery->where('UPDATED_AT', '<', $staleBefore);
	$staleQuery->setOrder(['CREATED_AT' => 'ASC']);

	$rows = $staleQuery->fetchCollection();

	if ($rows && !$rows->isEmpty())
	{
		foreach ($rows as $row)
		{
			$box = $this->getEntityFromOrmItem($row);

			if ($box === null)
			{
				// The row could not be converted; drop it from storage.
				$row->delete();

				continue;
			}

			$staleMessages->add($box);
		}
	}

	return $staleMessages;
}
149
/**
 * Persists a message through its ORM representation.
 *
 * After a successful save, a message that had no id receives the id reported by the
 * save result.
 *
 * @param MessageBox $messageBox Message to store; its id may be set as a side effect.
 *
 * @throws PersistenceException When conversion or saving throws, or when the save result
 *                              reports a failure.
 */
public function save(MessageBox $messageBox): void
{
	try
	{
		$ormMessage = $this->mapper->convertToOrm($messageBox);
		$saveResult = $ormMessage->save();
	}
	catch (\Exception $e)
	{
		throw new PersistenceException($e->getMessage(), $e->getCode(), $e);
	}

	if (!$saveResult->isSuccess())
	{
		throw new PersistenceException('Unable to save message: ' . $saveResult->getError()->getMessage());
	}

	// Only reached on success: back-fill the id of a message that did not have one.
	if (!$messageBox->getId())
	{
		$messageBox->setId($saveResult->getId());
	}
}
174
/**
 * Removes a message from storage through its ORM representation.
 *
 * @param MessageBox $message Message to delete.
 *
 * @throws PersistenceException When conversion or deletion throws, or when the delete result
 *                              reports a failure.
 */
public function delete(MessageBox $message): void
{
	try
	{
		$ormMessage = $this->mapper->convertToOrm($message);
		$deleteResult = $ormMessage->delete();
	}
	catch (\Exception $e)
	{
		throw new PersistenceException($e->getMessage(), $e->getCode(), $e);
	}

	if ($deleteResult->isSuccess())
	{
		return;
	}

	throw new PersistenceException('Unable to delete message: ' . $deleteResult->getError()->getMessage());
}
194
/**
 * Sets the STATUS column of every message in the collection with one multi-row update.
 *
 * Does nothing for an empty collection.
 *
 * @param MessageBoxCollection $items Messages whose ids are passed to the update.
 * @param MessageStatus $status New status; its backing value is written to STATUS.
 *
 * @throws PersistenceException When the update throws a SystemException or its result
 *                              reports a failure.
 */
public function updateStatus(MessageBoxCollection $items, MessageStatus $status): void
{
	if ($items->isEmpty())
	{
		return;
	}

	$ids = array_map(
		static fn (MessageBox $item) => $item->getId(),
		$items->toArray()
	);

	$fields = ['STATUS' => $status->value];

	try
	{
		$dataClass = $this->tableEntity->getDataClass();

		// NOTE(review): the third argument is presumably the "ignore events" flag of
		// DataManager::updateMulti(); confirm against the ORM API.
		$updateResult = $dataClass::updateMulti($ids, $fields, true);
	}
	catch (SystemException $e)
	{
		throw new PersistenceException($e->getMessage(), $e->getCode(), $e);
	}

	if ($updateResult->isSuccess())
	{
		return;
	}

	throw new PersistenceException('Unable to update status: ' . $updateResult->getError()->getMessage());
}
230
/**
 * Converts an ORM item into a MessageBox via the mapper.
 *
 * @param mixed $ormItem ORM object fetched from the message table (passed to the mapper as is).
 *
 * @return MessageBox|null Null when the mapper throws an ArgumentException; any other
 *                         exception propagates to the caller.
 */
private function getEntityFromOrmItem($ormItem): ?MessageBox
{
	$messageBox = null;

	try
	{
		$messageBox = $this->mapper->convertFromOrm($ormItem);
	}
	catch (ArgumentException)
	{
		// Treated as "not convertible": the caller receives null.
	}

	return $messageBox;
}
242}
__construct(private readonly Entity $tableEntity)
Определения MessageRepository.php:24
getReadyMessagesOfQueue(string $queueId, int $limit=50)
Определения MessageRepository.php:54
$result
Определения get_property_values.php:14
$query
Определения get_search.php:11
$status
Определения session.php:10
Определения ufield.php:9
Определения chain.php:3
$message
Определения payment.php:8
$items
Определения template.php:224