Bitrix-D7
23.9
Загрузка...
Поиск...
Не найдено
abstractthreadstrategy.php
1
<?php
2
3
namespace
Bitrix\Sender\Posting\SegmentThreadStrategy
;
4
5
use
Bitrix\Main\Application
;
6
use
Bitrix\Main\DB
;
7
use
Bitrix\Main\DB\SqlExpression
;
8
use
Bitrix\Main\DB\SqlQueryException
;
9
use
Bitrix\Main\ORM\Query\Result
;
10
use
Bitrix\Main\Type\DateTime
;
11
use
Bitrix\Sender\Internals\Model\GroupThreadTable
;
12
use
Bitrix\Sender\Internals\SqlBatch
;
13
use
Bitrix\Sender\Posting\Locker
;
14
use
Bitrix\Sender\PostingRecipientTable
;
15
16
abstract
class
AbstractThreadStrategy
implements
ThreadStrategy
17
{
21
protected
$threadId
;
22
23
protected
$groupStateId
;
24
protected
$offset
;
25
protected
$perPage
= 100000;
26
protected
const
GROUP_THREAD_LOCK_KEY
=
'group_thread_'
;
27
public
const
THREAD_UNAVAILABLE
= -1;
28
public
const
THREAD_LOCKED
= -2;
29
public
const
THREAD_NEEDED
= 1;
30
31
37
public
function
fillThreads
(): void
38
{
39
$insertData = [];
40
for
($thread = 0; $thread < static::THREADS_COUNT; $thread++)
41
{
42
$insertData[] = [
43
'THREAD_ID'
=> $thread,
44
'GROUP_STATE_ID'
=>
$this->groupStateId
,
45
'THREAD_TYPE'
=> static::THREADS_COUNT,
46
'EXPIRE_AT'
=>
new
DateTime
(),
47
];
48
}
49
50
SqlBatch::insert(GroupThreadTable::getTableName(), $insertData);
51
}
52
60
public
function
lockThread
(): ?int
61
{
62
if
(!static::checkLock())
63
{
64
return
self::THREAD_UNAVAILABLE
;
65
}
66
$thread = GroupThreadTable::getList(
67
[
68
"select"
=> [
69
"THREAD_ID"
,
70
"STEP"
71
],
72
"filter"
=> [
73
'=GROUP_STATE_ID'
=> $this->groupStateId,
74
[
75
'LOGIC'
=>
'OR'
,
76
[
77
'=STATUS'
=> GroupThreadTable::STATUS_NEW,
78
],
79
[
80
'=STATUS'
=> GroupThreadTable::STATUS_IN_PROGRESS,
81
'<EXPIRE_AT'
=>
new
DateTime
()
82
]
83
]
84
],
85
"limit"
=> 1
86
]
87
)->fetch();
88
89
if
(!isset($thread[
"THREAD_ID"
]))
90
{
91
return
self::THREAD_UNAVAILABLE
;
92
}
93
$this->threadId = (int)$thread[
"THREAD_ID"
];
94
$this->offset = $this->threadId === 0 && (int)$thread[
"STEP"
] === 0
95
? 0 : $this->threadId * $this->perPage + (static::lastThreadId() + 1) * $this->perPage * $thread[
"STEP"
];
96
97
$this->
updateStatus
(GroupThreadTable::STATUS_IN_PROGRESS);
98
Locker::unlock
(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId);
99
100
return
$this->threadId
;
101
}
102
110
public
function
checkThreads
(): ?int
111
{
112
if
(!static::checkLock())
113
{
114
return
self::THREAD_LOCKED
;
115
}
116
117
$thread = GroupThreadTable::getList(
118
[
119
"select"
=> [
120
"THREAD_ID"
,
121
"STEP"
122
],
123
"filter"
=> [
124
'=GROUP_STATE_ID'
=> $this->groupStateId
125
],
126
"limit"
=> 1
127
]
128
)->fetch();
129
130
if
(isset($thread[
"THREAD_ID"
]))
131
{
132
return
self::THREAD_UNAVAILABLE
;
133
}
134
Locker::unlock
(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId);
135
136
return
self::THREAD_NEEDED
;
137
}
138
144
public
function
updateStatus
(
string
$status): bool
145
{
146
if
($status === GroupThreadTable::STATUS_DONE && !$this->checkToFinalizeStatus())
147
{
148
$status = GroupThreadTable::STATUS_NEW;
149
}
150
151
try
152
{
153
$counter = (int)($status === GroupThreadTable::STATUS_IN_PROGRESS);
154
$tableName = GroupThreadTable::getTableName();
155
$expireAt = (new \DateTime())->modify(
"+10 minutes"
)->format(
'Y-m-d H:i:s'
);
156
$updateQuery =
'UPDATE '
. $tableName .
'
157
SET
158
STATUS = \''
. $status .
'\'
,
159
STEP = STEP + \
''
. $counter .
'\'
,
160
EXPIRE_AT = \
''
. $expireAt .
'\'
161
WHERE
162
THREAD_ID =
' . $this->threadId . '
163
AND GROUP_STATE_ID =
' . $this->groupStateId;
164
Application::getConnection()->query($updateQuery);
165
} catch (\Exception $e)
166
{
167
return false;
168
}
169
170
return true;
171
}
172
177
public function hasUnprocessedThreads(): bool
178
{
179
try
180
{
181
$threads = GroupThreadTable::getList(
182
[
183
"select" => ["THREAD_ID"],
184
"filter" => [
185
'@STATUS
' => new SqlExpression(
186
"?, ?", GroupThreadTable::STATUS_NEW, GroupThreadTable::STATUS_IN_PROGRESS
187
),
188
'
=GROUP_STATE_ID
' => $this->groupStateId,
189
'
!=THREAD_ID
' => $this->threadId
190
]
191
]
192
)->fetchAll();
193
} catch (\Exception $e)
194
{
195
}
196
197
return !empty($threads);
198
}
199
204
public function getThreadId(): ?int
205
{
206
return $this->threadId;
207
}
208
213
public function lastThreadId(): int
214
{
215
return static::THREADS_COUNT - 1;
216
}
217
223
public function setGroupStateId(int $groupStateId): ThreadStrategy
224
{
225
$this->groupStateId = $groupStateId;
226
227
return $this;
228
}
229
234
protected function checkLock()
235
{
236
for ($i = 0; $i <= static::lastThreadId(); $i++)
237
{
238
if (Locker::lock(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId))
239
{
240
return true;
241
}
242
sleep(rand(1, 7));
243
}
244
return false;
245
}
246
250
public function finalize()
251
{
252
if (!$this->checkToFinalizeStatus())
253
{
254
return false;
255
}
256
257
$tableName = GroupThreadTable::getTableName();
258
$sqlHelper = Application::getConnection()->getSqlHelper();
259
$query = 'DELETE FROM
' . $sqlHelper->quote($tableName) . '
WHERE GROUP_STATE_ID=
' . intval($this->groupStateId);
260
try
261
{
262
Application::getConnection()->query($query);
263
} catch (SqlQueryException $e)
264
{
265
return false;
266
}
267
268
return true;
269
}
270
271
private function checkToFinalizeStatus()
272
{
273
if ($this->threadId < static::lastThreadId())
274
{
275
return true;
276
}
277
278
return !static::hasUnprocessedThreads();
279
}
280
281
282
public function getOffset(): ?int
283
{
284
return intval($this->offset);
285
}
286
287
public function setPerPage(int $perPage)
288
{
289
$this->perPage = $perPage;
290
}
291
296
public function isProcessLimited(): bool
297
{
298
return false;
299
}
300
301
}
Bitrix\Main\Application
Definition
application.php:28
Bitrix\Main\DB\SqlExpression
Definition
sqlexpression.php:19
Bitrix\Main\DB\SqlQueryException
Definition
sqlexception.php:24
Bitrix\Main\ORM\Data\Result
Definition
result.php:16
Bitrix\Main\Type\DateTime
Definition
datetime.php:9
Bitrix\Sender\Internals\Model\GroupThreadTable
Definition
groupthread.php:33
Bitrix\Sender\Internals\SqlBatch
Definition
sqlbatch.php:22
Bitrix\Sender\Posting\Locker
Definition
locker.php:10
Bitrix\Sender\Posting\Locker\unlock
static unlock(string $key, int $id)
Definition
locker.php:34
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy
Definition
abstractthreadstrategy.php:17
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\fillThreads
fillThreads()
Definition
abstractthreadstrategy.php:37
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\$threadId
$threadId
Definition
abstractthreadstrategy.php:21
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\updateStatus
updateStatus(string $status)
Definition
abstractthreadstrategy.php:144
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\checkThreads
checkThreads()
Definition
abstractthreadstrategy.php:110
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\lockThread
lockThread()
Definition
abstractthreadstrategy.php:60
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\$groupStateId
$groupStateId
Definition
abstractthreadstrategy.php:23
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\THREAD_LOCKED
const THREAD_LOCKED
Definition
abstractthreadstrategy.php:28
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\THREAD_UNAVAILABLE
const THREAD_UNAVAILABLE
Definition
abstractthreadstrategy.php:27
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\THREAD_NEEDED
const THREAD_NEEDED
Definition
abstractthreadstrategy.php:29
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\GROUP_THREAD_LOCK_KEY
const GROUP_THREAD_LOCK_KEY
Definition
abstractthreadstrategy.php:26
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\$perPage
$perPage
Definition
abstractthreadstrategy.php:25
Bitrix\Sender\Posting\SegmentThreadStrategy\AbstractThreadStrategy\$offset
$offset
Definition
abstractthreadstrategy.php:24
Bitrix\Sender\PostingRecipientTable
Definition
posting.php:663
Bitrix\Sender\Posting\SegmentThreadStrategy\ThreadStrategy
Definition
threadstrategy.php:8
Bitrix\Main\DB
Definition
arrayresult.php:2
Bitrix\Sender\Posting\SegmentThreadStrategy
Definition
abstractthreadstrategy.php:3
modules
sender
lib
posting
segmentthreadstrategy
abstractthreadstrategy.php
Создано системой
1.10.0