59 return $this->broker->getOne($this->queueId);
67 return $this->broker->get($this->queueId, $this->limit);
75 $this->broker->ack($messageBox);
83 $this->broker->reject($messageBox);
98 public function run(): void
105 $messageBox =
$context[
'message'] ??
null;
110 'ITEM_ID' => $messageBox->getItemId(),
111 'DESCRIPTION' => sprintf(
112 '%s. Message: "%s" (%s). Queue: "%s". ItemId: "%s"',
114 $messageBox->getClassName(),
115 $messageBox->getId(),
116 $messageBox->getQueueId(),
117 $messageBox->getItemId()
131 foreach ($messageBoxes as $messageBox)
135 $this->
process($messageBox->getMessage());
137 $this->
ack($messageBox);
141 $this->
reject($messageBox);
152 $this->
reject($messageBox);
156 'Message has unrecoverable case: "%s"',
160 'message' => $messageBox,
167 $messageBox->requeue($e->getRetryDelay());
169 $this->
reject($messageBox);
173 'Message has recoverable case: "%s"',
177 'message' => $messageBox,
186 $e->getMessage() .
' Message: ' . $messageBox->getId(),
191 $this->
reject($messageBox);
ack(MessageBox $messageBox)
setQueueId(string $queueId)
setBroker(BrokerInterface $broker)
process(MessageInterface $message)
reject(MessageBox $messageBox)