36 $query = $this->buildReadyMessageQuery($queueId);
40 $item =
$query->fetchObject();
47 return $this->getEntityFromOrmItem($item);
// Start from the shared "ready message" query for the requested queue.
56 $query = $this->buildReadyMessageQuery($queueId);
// Clamp the page size: a positive $limit is capped at 1000; zero or a negative value falls back to 50.
58 $query->setLimit($limit > 0 ? min($limit, 1000) : 50);
// Convert each fetched ORM item; items for which getEntityFromOrmItem() returns null are left out of the collection.
69 foreach (
$items as $ormItem)
71 if ($messageBox = $this->getEntityFromOrmItem($ormItem))
73 $collection->add($messageBox);
/**
 * Builds the query used by the "ready message" readers of a single queue.
 *
 * The query filters on QUEUE_ID equal to $queueId, STATUS equal to
 * MessageStatus::New and AVAILABLE_AT not later than a freshly constructed
 * DateTime, and orders the rows by CREATED_AT ascending.
 *
 * @param string $queueId Value matched against the QUEUE_ID column.
 *
 * @return Query
 */
87 private function buildReadyMessageQuery(
string $queueId):
Query
// The data class is resolved from the injected table entity, not hard-coded.
90 $tableClass = $this->tableEntity->getDataClass();
92 $query = $tableClass::query();
96 ->where(
'QUEUE_ID',
'=', $queueId)
97 ->where(
'STATUS',
'=', MessageStatus::New->value)
// NOTE(review): new DateTime() is presumably "now", so messages scheduled for later are excluded — confirm.
98 ->where(
'AVAILABLE_AT',
'<=',
new DateTime())
99 ->setOrder([
'CREATED_AT' =>
'ASC'])
/**
 * Collects messages that have stayed in the Processing status for too long.
 *
 * Matches rows with STATUS equal to MessageStatus::Processing whose
 * UPDATED_AT is earlier than two days before the current timestamp,
 * ordered by CREATED_AT ascending.
 *
 * @return MessageBoxCollection Entities converted from the matching rows;
 *                              rows for which getEntityFromOrmItem() returns null are omitted.
 */
110 public function getStaleMessages(): MessageBoxCollection
112 $collection =
new MessageBoxCollection();
115 $tableClass = $this->tableEntity->getDataClass();
// 2 * 86400 seconds = two days; anything not updated since then counts as stale.
117 $thresholdDate = DateTime::createFromTimestamp(time() - 2 * 86400);
119 $query = $tableClass::query();
123 ->where(
'STATUS',
'=', MessageStatus::Processing->value)
124 ->where(
'UPDATED_AT',
'<', $thresholdDate)
125 ->setOrder([
'CREATED_AT' =>
'ASC'])
// Only successfully converted items are added to the result.
135 foreach (
$items as $ormItem)
137 if ($messageBox = $this->getEntityFromOrmItem($ormItem))
139 $collection->add($messageBox);
157 $result = $this->mapper->convertToOrm($messageBox)->save();
208 return $item->
getId();
216 $tableClass = $this->tableEntity->getDataClass();
218 $result = $tableClass::updateMulti($ids, [
'STATUS' =>
$status->value],
true);
222 throw new PersistenceException($e->getMessage(), $e->getCode(), $e);
227 throw new PersistenceException(
'Unable to update status: ' .
$result->getError()->getMessage());
/**
 * Converts one ORM item into a MessageBox through the mapper.
 *
 * @param mixed $ormItem Item passed unchanged to the mapper's convertFromOrm().
 *
 * @return MessageBox|null Nullable by declaration.
 *                         NOTE(review): the ArgumentException handler presumably yields null — confirm.
 */
231 private function getEntityFromOrmItem($ormItem): ?MessageBox
235 return $this->mapper->convertFromOrm($ormItem);
// An ArgumentException raised during conversion is intercepted here.
237 catch (ArgumentException)
__construct(private readonly Entity $tableEntity)
getReadyMessagesOfQueue(string $queueId, int $limit=50)