// Numeric limit of 10. Its use is not visible in this excerpt; presumably it
// caps how many messages are processed per pass - TODO confirm against usage.
16 private const MESSAGE_LIMIT = 10;
// Name of the message header that carries the handled-message row id. It is
// written onto the message with setHeader() and later read back and cast to int.
17 private const HANDLED_MESSAGE_HEADER_ID =
'~handledMessageId';
// Name of the message header that carries the HASH column value of the selected
// row. It is read back later and used as the HASH filter of the DELETE statements.
18 private const HANDLED_MESSAGE_HEADER_HASH =
'~handledMessageHash';
26 $this->queue = $queue;
46 ->registerRuntimeField(
'MAX_ID', [
47 'data_type' =>
'string',
48 'expression' => [
'MAX(%s)',
'ID']
50 ->setSelect([
'MAX_ID',
'HASH'])
51 ->addFilter(
'QUEUE_ID', $this->
getQueue()->getQueueId())
56 $handledMessageId = (int)$row[
'MAX_ID'];
58 $handledMessage = $this->getHandledMessageMapper()->getById($handledMessageId);
60 $handledMessage->getMessage()
61 ->setHeader(self::HANDLED_MESSAGE_HEADER_ID, $handledMessageId)
62 ->setHeader(self::HANDLED_MESSAGE_HEADER_HASH, $row[
'HASH'])
65 return $handledMessage->getMessage();
75 $id = (int)$message->getHeader(self::HANDLED_MESSAGE_HEADER_ID);
76 $hash = $message->getHeader(self::HANDLED_MESSAGE_HEADER_HASH);
80 DELETE FROM b_calendar_queue_handled_message
81 WHERE ID <= " . $id .
"
82 AND HASH = '" . $hash .
"'
// Rejects a message: reads the handled-message id and hash headers from it,
// uses both in a DELETE against b_calendar_queue_handled_message (rows with
// ID <= id and a matching HASH), then calls onAfterReject() with the message.
// NOTE(review): the requeue flag is not referenced in the visible lines -
// confirm whether it is honoured elsewhere or intentionally ignored.
93 public function reject(Interfaces\
Message $message,
bool $requeue =
false): void
// The id header is cast to int before being concatenated into the SQL below.
97 $id = (int)$message->getHeader(self::HANDLED_MESSAGE_HEADER_ID);
// NOTE(review): the hash header is concatenated into the SQL between quotes
// with no escaping visible here. If a message header can ever hold a value
// that did not come from the HASH column, this is an SQL injection vector -
// escape it or bind it as a parameter. TODO confirm.
98 $hash = $message->getHeader(self::HANDLED_MESSAGE_HEADER_HASH);
102 DELETE FROM b_calendar_queue_handled_message
103 WHERE ID <= " . $id .
"
104 AND HASH = '" . $hash .
"'
107 $this->onAfterReject($message);
116 if ($this->handledMessageMapper ===
null)
121 return $this->handledMessageMapper;
129 private function onAfterReject(Interfaces\Message $message)