Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
abstractthreadstrategy.php
1<?php
2
4
15
16abstract class AbstractThreadStrategy implements ThreadStrategy
17{
21 protected $threadId;
22
23 protected $groupStateId;
24 protected $offset;
25 protected $perPage = 100000;
26 protected const GROUP_THREAD_LOCK_KEY = 'group_thread_';
27 public const THREAD_UNAVAILABLE = -1;
28 public const THREAD_LOCKED = -2;
29 public const THREAD_NEEDED = 1;
30
31
37 public function fillThreads(): void
38 {
39 $insertData = [];
40 for ($thread = 0; $thread < static::THREADS_COUNT; $thread++)
41 {
42 $insertData[] = [
43 'THREAD_ID' => $thread,
44 'GROUP_STATE_ID' => $this->groupStateId,
45 'THREAD_TYPE' => static::THREADS_COUNT,
46 'EXPIRE_AT' => new DateTime(),
47 ];
48 }
49
50 SqlBatch::insert(GroupThreadTable::getTableName(), $insertData);
51 }
52
60 public function lockThread(): ?int
61 {
62 if (!static::checkLock())
63 {
65 }
66 $thread = GroupThreadTable::getList(
67 [
68 "select" => [
69 "THREAD_ID",
70 "STEP"
71 ],
72 "filter" => [
73 '=GROUP_STATE_ID' => $this->groupStateId,
74 [
75 'LOGIC' => 'OR',
76 [
77 '=STATUS' => GroupThreadTable::STATUS_NEW,
78 ],
79 [
80 '=STATUS' => GroupThreadTable::STATUS_IN_PROGRESS,
81 '<EXPIRE_AT' => new DateTime()
82 ]
83 ]
84 ],
85 "limit" => 1
86 ]
87 )->fetch();
88
89 if (!isset($thread["THREAD_ID"]))
90 {
92 }
93 $this->threadId = (int)$thread["THREAD_ID"];
94 $this->offset = $this->threadId === 0 && (int)$thread["STEP"] === 0
95 ? 0 : $this->threadId * $this->perPage + (static::lastThreadId() + 1) * $this->perPage * $thread["STEP"];
96
97 $this->updateStatus(GroupThreadTable::STATUS_IN_PROGRESS);
98 Locker::unlock(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId);
99
100 return $this->threadId;
101 }
102
110 public function checkThreads(): ?int
111 {
112 if (!static::checkLock())
113 {
114 return self::THREAD_LOCKED;
115 }
116
117 $thread = GroupThreadTable::getList(
118 [
119 "select" => [
120 "THREAD_ID",
121 "STEP"
122 ],
123 "filter" => [
124 '=GROUP_STATE_ID' => $this->groupStateId
125 ],
126 "limit" => 1
127 ]
128 )->fetch();
129
130 if (isset($thread["THREAD_ID"]))
131 {
133 }
134 Locker::unlock(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId);
135
136 return self::THREAD_NEEDED;
137 }
138
144 public function updateStatus(string $status): bool
145 {
146 if ($status === GroupThreadTable::STATUS_DONE && !$this->checkToFinalizeStatus())
147 {
148 $status = GroupThreadTable::STATUS_NEW;
149 }
150
151 try
152 {
153 $counter = (int)($status === GroupThreadTable::STATUS_IN_PROGRESS);
154 $tableName = GroupThreadTable::getTableName();
155 $expireAt = (new \DateTime())->modify("+10 minutes")->format('Y-m-d H:i:s');
156 $updateQuery = 'UPDATE ' . $tableName . '
157 SET
158 STATUS = \'' . $status . '\',
159 STEP = STEP + \'' . $counter . '\',
160 EXPIRE_AT = \'' . $expireAt . '\'
161 WHERE
162 THREAD_ID = ' . $this->threadId . '
163 AND GROUP_STATE_ID = ' . $this->groupStateId;
164 Application::getConnection()->query($updateQuery);
165 } catch (\Exception $e)
166 {
167 return false;
168 }
169
170 return true;
171 }
172
177 public function hasUnprocessedThreads(): bool
178 {
179 try
180 {
181 $threads = GroupThreadTable::getList(
182 [
183 "select" => ["THREAD_ID"],
184 "filter" => [
185 '@STATUS' => new SqlExpression(
186 "?, ?", GroupThreadTable::STATUS_NEW, GroupThreadTable::STATUS_IN_PROGRESS
187 ),
188 '=GROUP_STATE_ID' => $this->groupStateId,
189 '!=THREAD_ID' => $this->threadId
190 ]
191 ]
192 )->fetchAll();
193 } catch (\Exception $e)
194 {
195 }
196
197 return !empty($threads);
198 }
199
204 public function getThreadId(): ?int
205 {
206 return $this->threadId;
207 }
208
213 public function lastThreadId(): int
214 {
215 return static::THREADS_COUNT - 1;
216 }
217
223 public function setGroupStateId(int $groupStateId): ThreadStrategy
224 {
225 $this->groupStateId = $groupStateId;
226
227 return $this;
228 }
229
234 protected function checkLock()
235 {
236 for ($i = 0; $i <= static::lastThreadId(); $i++)
237 {
238 if (Locker::lock(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId))
239 {
240 return true;
241 }
242 sleep(rand(1, 7));
243 }
244 return false;
245 }
246
250 public function finalize()
251 {
252 if (!$this->checkToFinalizeStatus())
253 {
254 return false;
255 }
256
257 $tableName = GroupThreadTable::getTableName();
258 $sqlHelper = Application::getConnection()->getSqlHelper();
259 $query = 'DELETE FROM ' . $sqlHelper->quote($tableName) . ' WHERE GROUP_STATE_ID=' . intval($this->groupStateId);
260 try
261 {
262 Application::getConnection()->query($query);
263 } catch (SqlQueryException $e)
264 {
265 return false;
266 }
267
268 return true;
269 }
270
271 private function checkToFinalizeStatus()
272 {
273 if ($this->threadId < static::lastThreadId())
274 {
275 return true;
276 }
277
278 return !static::hasUnprocessedThreads();
279 }
280
281
282 public function getOffset(): ?int
283 {
284 return intval($this->offset);
285 }
286
287 public function setPerPage(int $perPage)
288 {
289 $this->perPage = $perPage;
290 }
291
296 public function isProcessLimited(): bool
297 {
298 return false;
299 }
300
301}
static unlock(string $key, int $id)
Definition locker.php:34