14 private const HANDLED_MESSAGE_HEADER_NAME =
'~handledMessageId';
20 private int $packSize = 100;
24 $this->queue = $queue;
46 $message = $row->getMessage();
47 $message->setHeader(self::HANDLED_MESSAGE_HEADER_NAME, $row->getId());
56 if ($id = $message->getHeader(self::HANDLED_MESSAGE_HEADER_NAME))
58 $message->setHeader(self::HANDLED_MESSAGE_HEADER_NAME,
null);
59 $this->deleteHandledMessageByMessageId($id);
69 public function reject(Interfaces\
Message $message,
bool $requeue =
false): void
71 if ($id = $message->getHeader(self::HANDLED_MESSAGE_HEADER_NAME))
73 $message->setHeader(self::HANDLED_MESSAGE_HEADER_NAME,
null);
74 $this->deleteHandledMessageByMessageId($id);
76 $this->onAfterReject($message);
86 private function getHandledMessageMap(): Core\Base\Map
88 if (empty($this->handledMessageMap))
90 $this->handledMessageMap = $this->getHandledMessageMapper()->getMap(
92 'QUEUE_ID' => $this->
getQueue()->getQueueId(),
96 'DATE_CREATE' =>
'ASC'
107 private function getHandledMessageMapper(): HandledMessageMapper
109 if ($this->handledMessageMapper ===
null)
111 $this->handledMessageMapper =
new HandledMessageMapper();
114 return $this->handledMessageMapper;
122 private function onAfterReject(Interfaces\Message $message)
127 private function deleteHandledMessageByMessageId($id)
129 $handledMessage = (
new HandledMessage())->setId($id);
130 $this->getHandledMessageMapper()->delete($handledMessage);
138 return $this->packSize;
146 $this->packSize = $packSize;