39 $this->storage =
new DbStorage($tableEntity);
40 $this->retryStrategy = $retryStrategy;
42 $this->storage->unlockStaleMessages();
55 $retryStrategy = $this->getRetryStrategy($queueId);
60 ->setQueueId($queueId)
61 ->setTtl($retryStrategy->getMaxRetryCount())
64 $this->addProcessingParams($messageBox,
$params);
68 $this->retry([$this->storage,
'save'], $messageBox);
76 public function get(
string $queueId,
int $limit = 50): iterable
80 return $this->storage->getReadyMessagesOfQueue($queueId, $limit);
85 'Unable to lock messages: ' . $e->getMessage(),
93 'Unable to read messages: ' . $e->getMessage(),
104 return $this->storage->getOneByQueue($queueId);
109 'Unable to lock message: ' . $e->getMessage(),
117 'Unable to read message: ' . $e->getMessage(),
131 $this->retry([$this->storage,
'delete'],
$message);
150 $this->retry([$this->storage,
'delete'],
$message);
155 $this->retry([$this->storage,
'save'],
$message);
166 private function retry(callable $callable,
MessageBox $messageBox): void
174 call_user_func($callable, $messageBox);
178 catch (PersistenceException $e)
180 if ($retry > $this->retryStrategy->getMaxRetryCount() - 1)
185 usleep($this->retryStrategy->getWaitingTime($retry) * 1000);
198 private function addProcessingParams(MessageBox $messageBox,
array $params): void
202 if (!$param instanceof ProcessingParamInterface)
204 throw new ArgumentException(
205 sprintf(
'The type of message processing params should be "%s"', ProcessingParamInterface::class)
209 $messageBox = $param->applyTo($messageBox);
213 private function getRetryStrategy(
string $queueId): RetryStrategyInterface
215 return ServiceLocator::getInstance()
216 ->get(QueueConfigRegistry::class)
217 ->getQueueConfig($queueId)
send(MessageInterface $message, string $queueId, array $params=[])
reject(MessageBox $message)
__construct(Entity $tableEntity, RetryStrategyInterface $retryStrategy=new MultiplierRetryStrategy())