Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
queue.php
1<?php
3
15
16
/**
 * Outgoing message queue.
 *
 * Selects pending rows through MessageTable, hands them to the matching sender
 * and stores the outcome. Also provides a clean-up routine for old rows.
 */
class Queue
{
	/** Name of the "messageservice" module event sent after every send attempt. */
	public const EVENT_SEND_RESULT = 'messageSendResult';

	/**
	 * Tells whether at least one message is waiting to be sent.
	 *
	 * A row qualifies when SUCCESS_EXEC is 'N' and NEXT_EXEC is either empty
	 * or already in the past.
	 *
	 * @return bool
	 */
	public static function hasMessages(): bool
	{
		$result = MessageTable::getList([
			'select' => ['ID'],
			'filter' => [
				'=SUCCESS_EXEC' => 'N',
				[
					'LOGIC' => 'OR',
					'<NEXT_EXEC' => new DateTime(),
					'=NEXT_EXEC' => null,
				],
			],
			// One row is enough to answer the question.
			'limit' => 1,
		]);

		return !empty($result->fetch());
	}

	/**
	 * Schedules sendMessages() as a background job when there is something to send.
	 *
	 * @return string|null Null when processing is switched off by constants, an empty string otherwise.
	 */
	public static function run()
	{
		// DisableMessageServiceCheck, when defined, is the only switch that counts.
		// DisableEventsCheck is consulted solely as a fallback when the former is not defined.
		if (
			(defined('DisableMessageServiceCheck') && DisableMessageServiceCheck === true)
			|| (
				!defined('DisableMessageServiceCheck')
				&& defined("DisableEventsCheck")
				&& DisableEventsCheck === true
			)
		)
		{
			return null;
		}

		if (!static::hasMessages())
		{
			return "";
		}

		Application::getInstance()->addBackgroundJob([static::class, "sendMessages"]);

		return "";
	}

	/**
	 * Sends one batch of queued messages under a named connection lock.
	 *
	 * The batch size comes from the "queue_limit" module option (5 when unset or zero).
	 * Selected rows are first marked SUCCESS_EXEC = 'P' with NEXT_EXEC two minutes ahead,
	 * then processed one by one. A sender that has reached its daily limit gets its
	 * message deferred to the time returned by getNextExecTime().
	 *
	 * @return string|null An empty string when the lock could not be obtained, null otherwise.
	 */
	public static function sendMessages()
	{
		$lockTag = 'b_messageservice_message';
		if (!Application::getConnection()->lock($lockTag))
		{
			return "";
		}

		// Everything past this point runs with the lock held; the finally block
		// guarantees it is released even when a query or an update throws.
		try
		{
			$counts = Internal\Entity\MessageTable::getAllDailyCount();

			$limit = abs((int)Config\Option::get("messageservice", "queue_limit", 5));
			if (!$limit)
			{
				$limit = 5;
			}

			$query =
				MessageTable::query()
					->addSelect('ID')
					->addSelect('TYPE')
					->addSelect('SENDER_ID')
					->addSelect('AUTHOR_ID')
					->addSelect('MESSAGE_FROM')
					->addSelect('MESSAGE_TO')
					->addSelect('MESSAGE_HEADERS')
					->addSelect('MESSAGE_BODY')
					->addSelect('EXTERNAL_ID')
					->where(Query::filter()
						->logic('or')
						// Fresh rows: not sent yet and due now (or never scheduled).
						->where(Query::filter()
							->logic('and')
							->where('SUCCESS_EXEC', 'N')
							->where(Query::filter()
								->logic('or')
								->where('NEXT_EXEC', '<', new DateTime())
								->whereNull('NEXT_EXEC')
							)
						)
						// Rows still marked 'P' whose NEXT_EXEC is more than two minutes in the past.
						->where(Query::filter()
							->logic('and')
							->where('SUCCESS_EXEC', 'P')
							->where('NEXT_EXEC', '<', (new DateTime())->add('-2 MINUTE'))
						)
					)
					->addOrder('ID')
					->setLimit($limit)
			;

			if (defined('BX_CLUSTER_GROUP'))
			{
				$query->where('CLUSTER_GROUP', \BX_CLUSTER_GROUP);
			}
			$messageFieldsList = $query->fetchAll();

			if (!empty($messageFieldsList))
			{
				// Mark the whole batch as 'P' before sending anything.
				$idList = array_column($messageFieldsList, 'ID');
				MessageTable::updateMulti(
					$idList,
					[
						'SUCCESS_EXEC' => 'P',
						'NEXT_EXEC' => (new DateTime())->add('+2 MINUTE'),
					],
					true
				);
			}

			$nextDay = static::getNextExecTime();
			foreach ($messageFieldsList as $messageFields)
			{
				$serviceId = $messageFields['SENDER_ID'] . ':' . $messageFields['MESSAGE_FROM'];
				$message = Message::createFromFields($messageFields);

				if (!isset($counts[$serviceId]))
				{
					$counts[$serviceId] = 0;
				}

				$sender = $message->getSender();
				if ($sender)
				{
					$dailyLimit = Sender\Limitation::getDailyLimit($sender->getId(), $messageFields['MESSAGE_FROM']);
					$current = $counts[$serviceId];

					// A non-positive limit means "no limit".
					if ($dailyLimit > 0 && $current >= $dailyLimit)
					{
						$message->update([
							'STATUS_ID' => MessageStatus::DEFERRED,
							'NEXT_EXEC' => $nextDay,
						]);
						continue;
					}
					++$counts[$serviceId];
				}

				try
				{
					$result = static::sendMessage($messageFields);
					$message->updateWithSendResult($result, $nextDay);
				}
				catch (\Throwable $e)
				{
					Application::getInstance()->getExceptionHandler()->writeToLog($e);

					$message->update([
						'STATUS_ID' => MessageStatus::EXCEPTION,
						'SUCCESS_EXEC' => 'E',
						'DATE_EXEC' => new DateTime(),
						'EXEC_ERROR' => $e->getMessage(),
					]);
					// Stop the batch; the remaining rows keep SUCCESS_EXEC = 'P' and
					// match the second branch of the selection filter on a later run.
					break;
				}
			}
		}
		finally
		{
			Application::getConnection()->unlock($lockTag);
		}

		return null;
	}

	/**
	 * Sends a single message and broadcasts the outcome as a module event.
	 *
	 * Only MessageType::SMS is handled; any other type, or an unknown sender,
	 * yields a result object carrying an error.
	 *
	 * @param array $messageFields Row selected by sendMessages(); TYPE and SENDER_ID are read here.
	 * @return mixed Whatever the sender's sendMessage() returns, or a SendMessage result holding an error.
	 */
	private static function sendMessage(array $messageFields)
	{
		$type = $messageFields['TYPE'];
		if ($type === MessageType::SMS)
		{
			$sender = SmsManager::getSenderById($messageFields['SENDER_ID']);
			if (!$sender)
			{
				$sendResult = new SendMessage();
				$sendResult->addError(new Error(Loc::getMessage("MESSAGESERVICE_QUEUE_SENDER_NOT_FOUND")));
			}
			else
			{
				// NOTE(review): timeout units are presumably seconds — confirm against the sender API.
				$sender->setSocketTimeout(6);
				$sender->setStreamTimeout(18);
				$sendResult = $sender->sendMessage($messageFields);
			}
		}
		else
		{
			$sendResult = new SendMessage();
			$sendResult->addError(new Error(Loc::getMessage("MESSAGESERVICE_QUEUE_MESSAGE_TYPE_ERROR")));
		}

		// The event is sent for failures as well as for successful attempts.
		EventManager::getInstance()->send(new Event("messageservice", static::EVENT_SEND_RESULT, [
			'message' => $messageFields,
			'sendResult' => $sendResult,
		]));

		return $sendResult;
	}

	/**
	 * Calculates the moment a deferred message should be retried.
	 *
	 * Starts from "now + 86400 seconds". Unless the retry settings are flagged
	 * as 'auto', the value is moved to the configured time zone ('tz') and its
	 * clock part is set to the configured hour and minute ('h', 'i').
	 *
	 * @return DateTime
	 */
	private static function getNextExecTime(): DateTime
	{
		$nextDay = DateTime::createFromTimestamp(time() + 86400);
		$retryTime = Sender\Limitation::getRetryTime();
		if (!$retryTime['auto'])
		{
			if ($nextDay->getTimeZone()->getName() !== $retryTime['tz'])
			{
				try
				{
					$nextDay->setTimeZone(new \DateTimeZone($retryTime['tz']));
				}
				catch (\Exception $e)
				{
					// Deliberately ignored: an invalid time zone name in the settings
					// leaves the value in its current zone; the clock part is still applied below.
				}
			}
			$nextDay->setTime($retryTime['h'], $retryTime['i'], 0);
		}

		return $nextDay;
	}

	/**
	 * Deletes rows whose DATE_EXEC is older than the "clean_up_period" option (days, 14 by default).
	 *
	 * A period of zero disables the deletion.
	 *
	 * @return string This method's own call string (presumably so the caller can schedule it again — confirm).
	 */
	public static function cleanUpAgent(): string
	{
		$period = abs((int)Config\Option::get("messageservice", "clean_up_period", 14));
		$periodInSeconds = $period * 24 * 3600;

		if ($periodInSeconds > 0)
		{
			$connection = \Bitrix\Main\Application::getConnection();
			// $periodInSeconds is an integer computed above, so concatenating it into SQL is safe.
			$datetime = $connection->getSqlHelper()->addSecondsToDateTime('-' . $periodInSeconds);
			$connection->queryExecute("DELETE FROM b_messageservice_message WHERE DATE_EXEC <= {$datetime}");
		}

		return __METHOD__.'();';
	}
}
static getConnection($name="")
static getMessage($code, $replace=null, $language=null)
Definition loc.php:29
static createFromTimestamp($timestamp)
Definition datetime.php:246
static createFromFields(array $fields, Sender\Base $sender=null)
Definition message.php:89