Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
abstractthreadstrategy.php
1<?php
2
4
14
15abstract class AbstractThreadStrategy implements IThreadStrategy
16{
20 protected $threadId;
21
22 protected $postingId;
23
24 protected $select;
25
26 protected $filter;
27 protected $runtime;
28
29 public const THREAD_UNAVAILABLE = -1;
30 public const THREAD_LOCKED = -2;
31 public const THREAD_NEEDED = 1;
32
38 public function fillThreads(): void
39 {
40 $insertData = [];
41 for ($thread = 0; $thread < static::THREADS_COUNT; $thread++)
42 {
43 $insertData[] = [
44 'THREAD_ID' => $thread,
45 'POSTING_ID' => $this->postingId,
46 'THREAD_TYPE' => static::THREADS_COUNT,
47 'EXPIRE_AT' => new DateTime(),
48 ];
49 }
50
51 SqlBatch::insert(PostingThreadTable::getTableName(), $insertData);
52 }
53
62 public function getRecipients(int $limit): Result
63 {
64 static::setRuntime();
65 static::setFilter();
66 static::setSelect();
67
68 // select all recipients of posting, only not processed
69 $recipients = PostingRecipientTable::getList(
70 [
71 'select' => $this->select,
72 'filter' => $this->filter,
73 'runtime' => $this->runtime,
74 'order' => ['STATUS' => 'DESC'],
75 'limit' => $limit
76 ]
77 );
78
79 $recipients->addFetchDataModifier(
80 function($row)
81 {
82 $row['FIELDS'] = is_array($row['FIELDS']) ? $row['FIELDS'] : [];
83
84 return $row;
85 }
86 );
87
88 return $recipients;
89 }
90
91 abstract protected function setRuntime(): void;
92
93 protected function setFilter() : void
94 {
95 $this->filter = ['=IS_UNSUB' => 'N'];
96 }
97 protected function setSelect(): void
98 {
99 $this->select = [
100 '*',
101 'NAME' => 'CONTACT.NAME',
102 'CONTACT_CODE' => 'CONTACT.CODE',
103 'CONTACT_TYPE_ID' => 'CONTACT.TYPE_ID',
104 'CONTACT_IS_SEND_SUCCESS' => 'CONTACT.IS_SEND_SUCCESS',
105 'CONTACT_BLACKLISTED' => 'CONTACT.BLACKLISTED',
106 'CONTACT_UNSUBSCRIBED' => 'CONTACT.IS_UNSUB',
107 'CONTACT_CONSENT_STATUS' => 'CONTACT.CONSENT_STATUS',
108 'CONTACT_MAILING_UNSUBSCRIBED' => 'MAILING_SUB.IS_UNSUB',
109 'CONTACT_CONSENT_REQUEST' => 'CONTACT.CONSENT_REQUEST',
110 'CAMPAIGN_ID' => 'POSTING.MAILING_ID',
111 ];
112 }
113
121 public function lockThread(): void
122 {
123 if(!static::checkLock())
124 {
125 return;
126 }
127 $thread = PostingThreadTable::getList(
128 [
129 "select" => ["THREAD_ID"],
130 "filter" => [
131 '=POSTING_ID' => $this->postingId,
132 [
133 'LOGIC' => 'OR',
134 [
136 ],
137 [
139 '<EXPIRE_AT' => new DateTime()
140 ]
141 ]
142 ],
143 "limit" => 1
144 ]
145 )->fetchAll();
146
147 if (!isset($thread[0]) && !isset($thread[0]["THREAD_ID"]))
148 {
149 return;
150 }
151 $this->threadId = $thread[0]["THREAD_ID"];
153 $this->unlock();
154 }
155
163 public function checkThreads(): ?int
164 {
165 $thread = PostingThreadTable::getList(
166 [
167 "select" => ["THREAD_ID"],
168 "filter" => [
169 '=POSTING_ID' => $this->postingId,
170 ],
171 "limit" => 1
172 ]
173 )->fetchAll();
174
175 if (isset($thread[0]) && isset($thread[0]["THREAD_ID"]))
176 {
178 }
179
180 return self::THREAD_NEEDED;
181 }
182
188 protected function lock()
189 {
190 $connection = Application::getInstance()->getConnection();
191
192 return $connection->lock($this->getLockName());
193 }
194
200 public function updateStatus(string $status): bool
201 {
202 if($status === PostingThreadTable::STATUS_DONE && !$this->checkToFinalizeStatus())
203 {
205 }
206
207 try
208 {
210 $expireAt = (new \DateTime())->modify("+10 minutes")->format('Y-m-d H:i:s');
211 $updateQuery = 'UPDATE '.$tableName.'
212 SET
213 STATUS = \''.$status.'\',
214 EXPIRE_AT = \''.$expireAt.'\'
215 WHERE
216 THREAD_ID = '.$this->threadId.'
217 AND POSTING_ID = '.$this->postingId;
218 Application::getConnection()->query($updateQuery);
219 }
220 catch (\Exception $e)
221 {
222 return false;
223 }
224
225 return true;
226 }
227
233 protected function unlock()
234 {
235 $connection = Application::getInstance()->getConnection();
236
237 return $connection->unlock($this->getLockName());
238 }
239
245 private function getLockName(): string
246 {
247 return "posting_thread_$this->postingId";
248 }
249
254 public function hasUnprocessedThreads(): bool
255 {
256 try
257 {
258 $threads = PostingThreadTable::getList(
259 [
260 "select" => ["THREAD_ID"],
261 "filter" => [
262 '@STATUS' => new SqlExpression(
263 "?, ?", PostingThreadTable::STATUS_NEW, PostingThreadTable::STATUS_IN_PROGRESS
264 ),
265 '=POSTING_ID' => $this->postingId,
266 '!=THREAD_ID' => $this->threadId
267 ]
268 ]
269 )->fetchAll();
270 }
271 catch (\Exception $e)
272 {
273 }
274
275 return !empty($threads);
276 }
277
282 public function getThreadId(): ?int
283 {
284 return $this->threadId;
285 }
286
291 public function lastThreadId(): int
292 {
293 return static::THREADS_COUNT - 1;
294 }
295
301 public function setPostingId(int $postingId): IThreadStrategy
302 {
303 $this->postingId = $postingId;
304
305 return $this;
306 }
307
312 protected function checkLock()
313 {
314 for($i = 0; $i <= static::lastThreadId(); $i++)
315 {
316 if ($this->lock())
317 {
318 return true;
319 }
320 sleep(rand(1,7));
321 }
322 return false;
323 }
324
328 public function finalize()
329 {
330 if(!$this->checkToFinalizeStatus())
331 {
332 return false;
333 }
334
335 $tableName = PostingThreadTable::getTableName();
336 $sqlHelper = Application::getConnection()->getSqlHelper();
337 $query = 'DELETE FROM ' . $sqlHelper->quote($tableName) . ' WHERE POSTING_ID=' . intval($this->postingId);
338 try
339 {
340 Application::getConnection()->query($query);
341 }
342 catch (SqlQueryException $e)
343 {
344 return false;
345 }
346
347 return true;
348 }
349
350 private function checkToFinalizeStatus()
351 {
352 if($this->threadId < static::lastThreadId())
353 {
354 return true;
355 }
356
357 return !static::hasUnprocessedThreads();
358 }
359
364 public function isProcessLimited(): bool
365 {
366 return false;
367 }
368
369}