71 $lockTag =
'b_messageservice_message';
77 $counts = Internal\Entity\MessageTable::getAllDailyCount();
79 $limit = abs((
int)
Config\Option::get(
"messageservice",
"queue_limit", 5));
89 ->addSelect(
'SENDER_ID')
90 ->addSelect(
'AUTHOR_ID')
91 ->addSelect(
'MESSAGE_FROM')
92 ->addSelect(
'MESSAGE_TO')
93 ->addSelect(
'MESSAGE_HEADERS')
94 ->addSelect(
'MESSAGE_BODY')
95 ->addSelect(
'EXTERNAL_ID')
96 ->where(Query::filter()
98 ->where(Query::filter()
100 ->where(
'SUCCESS_EXEC',
'N')
101 ->where(Query::filter()
103 ->where(
'NEXT_EXEC',
'<',
new DateTime())
104 ->whereNull(
'NEXT_EXEC')
107 ->where(Query::filter()
109 ->where(
'SUCCESS_EXEC',
'P')
110 ->where(
'NEXT_EXEC',
'<', (
new DateTime())->add(
'-2 MINUTE'))
117 if (defined(
'BX_CLUSTER_GROUP'))
119 $query->where(
'CLUSTER_GROUP', \BX_CLUSTER_GROUP);
121 $messageFieldsList = $query->fetchAll();
123 if (!empty($messageFieldsList))
125 $idList = array_column($messageFieldsList,
'ID');
126 MessageTable::updateMulti(
129 'SUCCESS_EXEC' =>
'P',
130 'NEXT_EXEC' => (
new DateTime())->add(
'+2 MINUTE'),
136 $nextDay = static::getNextExecTime();
137 foreach ($messageFieldsList as $messageFields)
139 $serviceId = $messageFields[
'SENDER_ID'] .
':' . $messageFields[
'MESSAGE_FROM'];
142 if (!isset($counts[$serviceId]))
144 $counts[$serviceId] = 0;
147 $sender = $message->getSender();
150 $limit = Sender\Limitation::getDailyLimit($sender->getId(), $messageFields[
'MESSAGE_FROM']);
151 $current = $counts[$serviceId];
153 if ($limit > 0 && $current >= $limit)
157 'NEXT_EXEC' => $nextDay,
161 ++$counts[$serviceId];
166 $result = static::sendMessage($messageFields);
167 $message->updateWithSendResult($result, $nextDay);
169 catch (\Throwable $e)
175 'SUCCESS_EXEC' =>
'E',
177 'EXEC_ERROR' => $e->getMessage(),
216 EventManager::getInstance()->send(
new Event(
"messageservice", static::EVENT_SEND_RESULT, [