Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
grouphash.php
1<?php
2
4
8use Bitrix\Calendar\Core;
13
15{
16 private const MESSAGE_LIMIT = 10;
17 private const HANDLED_MESSAGE_HEADER_ID = '~handledMessageId';
18 private const HANDLED_MESSAGE_HEADER_HASH = '~handledMessageHash';
19
20 private Interfaces\Queue $queue;
21
22 private ?HandledMessageMapper $handledMessageMapper = null;
23
24 public function __construct(Interfaces\Queue $queue)
25 {
26 $this->queue = $queue;
27 }
28
29 public function getQueue(): Interfaces\Queue
30 {
31 return $this->queue;
32 }
33
41 public function receive(): ?Interfaces\Message
42 {
44 ->addGroup('HASH')
45 ->setLimit(1)
46 ->registerRuntimeField('MAX_ID', [
47 'data_type' => 'string',
48 'expression' => ['MAX(%s)', 'ID']
49 ])
50 ->setSelect(['MAX_ID', 'HASH'])
51 ->addFilter('QUEUE_ID', $this->getQueue()->getQueueId())
52 ->exec()->fetch()
53 ;
54 if ($row)
55 {
56 $handledMessageId = (int)$row['MAX_ID'];
58 $handledMessage = $this->getHandledMessageMapper()->getById($handledMessageId);
59
60 $handledMessage->getMessage()
61 ->setHeader(self::HANDLED_MESSAGE_HEADER_ID, $handledMessageId)
62 ->setHeader(self::HANDLED_MESSAGE_HEADER_HASH, $row['HASH'])
63 ;
64
65 return $handledMessage->getMessage();
66 }
67
68 return null;
69 }
70
71 public function acknowledge(Interfaces\Message $message): void
72 {
73 global $DB;
74
75 $id = (int)$message->getHeader(self::HANDLED_MESSAGE_HEADER_ID);
76 $hash = $message->getHeader(self::HANDLED_MESSAGE_HEADER_HASH);
77 if ($id && $hash)
78 {
79 $DB->Query("
80 DELETE FROM b_calendar_queue_handled_message
81 WHERE ID <= " . $id . "
82 AND HASH = '" . $hash . "'
83 ");
84 }
85 }
86
93 public function reject(Interfaces\Message $message, bool $requeue = false): void
94 {
95 global $DB;
96
97 $id = (int)$message->getHeader(self::HANDLED_MESSAGE_HEADER_ID);
98 $hash = $message->getHeader(self::HANDLED_MESSAGE_HEADER_HASH);
99 if ($id && $hash)
100 {
101 $DB->Query("
102 DELETE FROM b_calendar_queue_handled_message
103 WHERE ID <= " . $id . "
104 AND HASH = '" . $hash . "'
105 ");
106
107 $this->onAfterReject($message);
108 }
109 }
110
114 private function getHandledMessageMapper(): HandledMessageMapper
115 {
116 if ($this->handledMessageMapper === null)
117 {
118 $this->handledMessageMapper = new HandledMessageMapper();
119 }
120
121 return $this->handledMessageMapper;
122 }
123
129 private function onAfterReject(Interfaces\Message $message)
130 {
131 // TODO: implement it
132 }
133}
reject(Interfaces\Message $message, bool $requeue=false)
Definition grouphash.php:93
acknowledge(Interfaces\Message $message)
Definition grouphash.php:71