Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
pushmanager.php
1<?php
2
4
9use Bitrix\Calendar\Core\Queue;
12use Bitrix\Calendar\Internals\EO_Push;
30use Exception;
31use Throwable;
32
34{
// Entity type values; handlePush() dispatches on TYPE_SECTION_CONNECTION and TYPE_CONNECTION.
35 public const TYPE_CONNECTION = 'CONNECTION';
36 public const TYPE_SECTION_CONNECTION = 'SECTION_CONNECTION';
37 public const TYPE_SECTION = 'SECTION';
38
// Passed as the $time argument of lockConnection() by syncSection() and syncConnection()
// (presumably seconds - confirm against the Mutex implementation).
39 private const LOCK_CONNECTION_TIME = 20;
40
// Routing keys set on the queue messages built by pushSectionToQueue() / pushConnectionToQueue().
41 public const QUEUE_ROUTE_KEY_SECTION = 'calendar:SyncSectionPush';
42 public const QUEUE_ROUTE_KEY_CONNECTION = 'calendar:SyncConnectionPush';
43
/**
 * Loads the push row stored for the given entity and builds a Push object from it.
 *
 * @param string $entityType Value matched against the ENTITY_TYPE column.
 * @param int $entityId Value matched against the ENTITY_ID column.
 *
 * @return Push|null The built push, or null when no row matches.
 */
public function getPush(string $entityType, int $entityId): ?Push
{
    // Both filters carry the explicit exact-match operator, the same way getPushState() filters.
    $row = PushTable::query()
        ->setSelect(['*'])
        ->addFilter('=ENTITY_TYPE', $entityType)
        ->addFilter('=ENTITY_ID', $entityId)
        ->exec()
        ->fetchObject()
    ;

    if (!$row)
    {
        return null;
    }

    return (new BuilderPushFromDM($row))->build();
}
69
/**
 * Inserts a push row for the given entity.
 *
 * ENTITY_TYPE and ENTITY_ID in $data are overwritten with the method arguments
 * before the row is added.
 *
 * @param string $entityType Stored in the ENTITY_TYPE column.
 * @param int $entityId Stored in the ENTITY_ID column.
 * @param array $data Remaining column values for the new row.
 *
 * @return Result Carries the built push under the 'push' key, or an error when
 *                the add result yields no object.
 */
public function addPush(string $entityType, int $entityId, array $data): Result
{
    $result = new Result();

    $data['ENTITY_TYPE'] = $entityType;
    $data['ENTITY_ID'] = $entityId;

    $createdRow = PushTable::add($data)->getObject();
    if (!$createdRow)
    {
        $result->addError(new Error('Error of add push info into db.'));

        return $result;
    }

    $result->setData([
        'push' => (new BuilderPushFromDM($createdRow))->build(),
    ]);

    return $result;
}
100
/**
 * Updates the stored push row with $data and refreshes the expiration date
 * on the passed Push object.
 *
 * @param Push $push Push whose entity type and id identify the row to update.
 * @param array $data Column values to write; 'EXPIRES' is used for the new expiration date.
 *
 * @return Result Carries the push under the 'push' key on success, or an error
 *                when the table update fails.
 */
public function renewPush(Push $push, array $data): Result
{
    $result = new Result();

    // TODO: move this logic to push-mapper
    $updateResult = PushTable::update(
        [
            'ENTITY_TYPE' => $push->getEntityType(),
            'ENTITY_ID' => $push->getEntityId(),
        ],
        $data
    );

    if (!$updateResult->isSuccess())
    {
        $result->addError(new Error('Error of update push in db.'));

        return $result;
    }

    // Touch the in-memory expiration only when a value was actually written:
    // reading a missing 'EXPIRES' key would pass null to Date and leave the
    // object out of step with the stored row.
    if (isset($data['EXPIRES']))
    {
        $push->setExpireDate(new Date($data['EXPIRES']));
    }

    $result->setData([
        'push' => $push,
    ]);

    return $result;
}
134
/**
 * Writes the channel id, resource id, expiration date, process status and
 * first push date of the given push to its table row.
 *
 * Missing dates are stored as null. The update result is not inspected.
 *
 * @param Push $pushChannel Push whose current state is persisted.
 *
 * @return void
 */
public function updatePush(Push $pushChannel): void
{
    $expireDate = $pushChannel->getExpireDate();
    $firstPushDate = $pushChannel->getFirstPushDate();

    $fields = [
        'CHANNEL_ID' => $pushChannel->getChannelId(),
        'RESOURCE_ID' => $pushChannel->getResourceId(),
        'EXPIRES' => $expireDate ? $expireDate->getDate() : null,
        'NOT_PROCESSED' => $pushChannel->getProcessStatus(),
        'FIRST_PUSH_DATE' => $firstPushDate ? $firstPushDate->getDate() : null,
    ];

    $primaryKey = [
        'ENTITY_TYPE' => $pushChannel->getEntityType(),
        'ENTITY_ID' => $pushChannel->getEntityId(),
    ];

    PushTable::update($primaryKey, $fields);
}
164
/**
 * Deletes the table row identified by the push's entity type and entity id.
 *
 * @param Push $push Push whose row is removed.
 *
 * @return void
 */
public function deletePush(Push $push): void
{
    $primaryKey = [
        'ENTITY_TYPE' => $push->getEntityType(),
        'ENTITY_ID' => $push->getEntityId(),
    ];

    PushTable::delete($primaryKey);
}
177
/**
 * Processes an incoming push notification for the given channel and resource.
 *
 * Flow:
 *  - no matching row: nothing is done;
 *  - the push is blocked: it is marked as unprocessed and the call ends;
 *  - the push is unprocessed and $forceUnprocessedPush is false: the call ends;
 *  - otherwise the push is blocked, the section or connection is synchronized,
 *    and if the stored state became 'unprocessed' meanwhile the method calls
 *    itself again with $forceUnprocessedPush = true.
 * The push is unblocked in every case once the synchronization attempt ends.
 *
 * @param string $channel Value matched against the CHANNEL_ID column.
 * @param string $resourceId Value matched against the RESOURCE_ID column.
 * @param bool $forceUnprocessedPush Process the push even if it is flagged as unprocessed.
 *
 * @return Result No data or errors are put into the returned Result in any branch.
 */
public function handlePush(string $channel, string $resourceId, bool $forceUnprocessedPush = false): Result
{
    $result = new Result();

    $row = PushTable::query()
        ->setSelect(['*'])
        ->addFilter('=CHANNEL_ID', $channel)
        ->addFilter('=RESOURCE_ID', $resourceId)
        ->exec()
        ->fetchObject()
    ;
    if (!$row)
    {
        return $result;
    }

    $push = (new BuilderPushFromDM($row))->build();

    if ($push->isBlocked())
    {
        // Another run holds the push: leave a marker so it gets re-run later.
        $this->setUnprocessedPush($push);

        return new Result();
    }

    $mustSkip = !$forceUnprocessedPush && $push->isUnprocessed();
    if ($mustSkip)
    {
        return new Result();
    }

    try
    {
        $this->blockPush($push);

        $entityType = $push->getEntityType();
        if ($entityType === self::TYPE_SECTION_CONNECTION)
        {
            $this->syncSection($push);
        }
        elseif ($entityType === self::TYPE_CONNECTION)
        {
            $this->syncConnection($push);
        }

        // Re-read the stored state: it may have been switched to 'unprocessed' during the sync.
        $storedState = $this->getPushState($entityType, $push->getEntityId());
        if ($storedState === Dictionary::PUSH_STATUS_PROCESS['unprocessed'])
        {
            $this->handlePush($channel, $resourceId, true);
        }
    }
    catch (Throwable $e)
    {
        // Failures are swallowed on purpose here; the finally block still unblocks the push.
    }
    finally
    {
        $this->setUnblockPush($push);
    }

    return $result;
}
249
/**
 * Reads the NOT_PROCESSED value currently stored for the given entity.
 *
 * @param string $entityType Value matched against the ENTITY_TYPE column.
 * @param string $entityId Value matched against the ENTITY_ID column.
 *
 * @return mixed|null The stored NOT_PROCESSED value, or null when no row is found
 *                    or the value is null.
 */
private function getPushState(string $entityType, string $entityId)
{
    $stateRow = PushTable::query()
        ->setSelect(['NOT_PROCESSED'])
        ->addFilter('=ENTITY_TYPE', $entityType)
        ->addFilter('=ENTITY_ID', $entityId)
        ->exec()
        ->fetch()
    ;

    if (isset($stateRow['NOT_PROCESSED']))
    {
        return $stateRow['NOT_PROCESSED'];
    }

    return null;
}
269
/**
 * Imports events for the section connection referenced by the push.
 *
 * If the section connection no longer exists, the push row is deleted.
 * If the connection lock cannot be taken, the section is sent to the queue
 * and nothing else is done. Otherwise events are imported, the connection
 * is updated and the push is marked as successful; a BaseException marks it
 * as failed instead.
 *
 * @param Push $push Push whose entity id is the section connection id.
 *
 * @return void
 */
private function syncSection(Push $push): void
{
    $sectionLink = (new SectionConnection())->getById($push->getEntityId());
    if (!$sectionLink)
    {
        $this->deletePush($push);

        return;
    }

    // Remember whether this call took the lock, so that it is only released
    // by the call that acquired it.
    $isLocked = false;

    try
    {
        $isLocked = $this->lockConnection($sectionLink->getConnection(), self::LOCK_CONNECTION_TIME);
        if (!$isLocked)
        {
            $this->pushSectionToQueue($sectionLink);

            return;
        }

        $syncSectionMap = new SyncSectionMap();
        $syncSection = (new SyncSection())
            ->setSection($sectionLink->getSection())
            ->setSectionConnection($sectionLink)
            ->setVendorName($sectionLink->getConnection()->getVendor()->getCode())
        ;

        $syncSectionMap->add(
            $syncSection,
            $syncSection->getSectionConnection()->getVendorSectionId()
        );

        $factory = FactoryBuilder::create(
            $sectionLink->getConnection()->getVendor()->getCode(),
            $sectionLink->getConnection(),
            new Sync\Util\Context()
        );

        $manager = new VendorDataExchangeManager($factory, $syncSectionMap);

        $manager
            ->importEvents()
            ->updateConnection($sectionLink->getConnection())
        ;

        $this->markPushSuccess($push, true);
    }
    catch (BaseException $e)
    {
        $this->markPushSuccess($push, false);
    }
    finally
    {
        // The early return above also passes through here, hence the flag check.
        if ($isLocked)
        {
            $this->unLockConnection($sectionLink->getConnection());
        }
    }
}
333
/**
 * Imports sections for the connection referenced by the push.
 *
 * Nothing is done when the connection is missing, is deleted, or loading it
 * raises an ArgumentException. If the connection lock cannot be taken, the
 * connection is sent to the queue instead. Exceptions raised during the
 * import are swallowed.
 *
 * @param Push $push Push whose entity id is the connection id.
 *
 * @return void
 */
private function syncConnection(Push $push): void
{
    try
    {
        $connection = (new Connection())->getById($push->getEntityId());
        if (!$connection || $connection->isDeleted())
        {
            return;
        }
    }
    catch (ArgumentException $e)
    {
        return;
    }

    // Remember whether this call took the lock, so that it is only released
    // by the call that acquired it.
    $isLocked = false;

    try
    {
        $isLocked = $this->lockConnection($connection, self::LOCK_CONNECTION_TIME);
        if (!$isLocked)
        {
            $this->pushConnectionToQueue($connection);

            return;
        }

        $factory = FactoryBuilder::create(
            $connection->getVendor()->getCode(),
            $connection,
            new Sync\Util\Context()
        );
        if ($factory)
        {
            $manager = new VendorDataExchangeManager(
                $factory,
                (new SyncSectionFactory())->getSyncSectionMapByFactory($factory)
            );
            $manager
                ->importSections()
                ->updateConnection($factory->getConnection())
            ;
        }
    }
    catch (\Exception $e)
    {
        // Import failures are ignored here, as before.
    }
    finally
    {
        // The early return above also passes through here, hence the flag check.
        if ($isLocked)
        {
            $this->unLockConnection($connection);
        }
    }
}
390
/**
 * Persists the outcome of a synchronization attempt on the push.
 *
 * On failure the process status is set to 'unblocked' and saved. On success
 * the first push date is set to a new Date and saved, but only when the push
 * has no first push date yet.
 *
 * @param Push $push Push to update.
 * @param bool $success Whether the synchronization succeeded.
 *
 * @return void
 */
private function markPushSuccess(Push $push, bool $success): void
{
    if (!$success)
    {
        $push->setProcessStatus(Dictionary::PUSH_STATUS_PROCESS['unblocked']);
        $this->updatePush($push);

        return;
    }

    if ($push->getFirstPushDate())
    {
        return;
    }

    $push->setFirstPushDate(new Date());
    $this->updatePush($push);
}
412
/**
 * Marks the push as blocked unless there is no push or it reports isProcessed().
 *
 * @param Push|null $push Push to block.
 *
 * @return bool Result of the table update; false when the push is skipped
 *              or an Exception is raised.
 */
public function setBlockPush(?Push $push): bool
{
    if ($push === null || $push->isProcessed())
    {
        return false;
    }

    try
    {
        $isBlocked = $this->blockPush($push);
    }
    catch (Exception $e)
    {
        $isBlocked = false;
    }

    return $isBlocked;
}
434
/**
 * Writes the 'block' status into the NOT_PROCESSED column of the push row.
 *
 * @param Push $push Push whose row is updated.
 *
 * @return bool Whether the table update succeeded.
 */
private function blockPush(Push $push): bool
{
    $primaryKey = [
        'ENTITY_TYPE' => $push->getEntityType(),
        'ENTITY_ID' => $push->getEntityId(),
    ];
    $fields = [
        'NOT_PROCESSED' => Dictionary::PUSH_STATUS_PROCESS['block'],
    ];

    $updateResult = PushTable::update($primaryKey, $fields);

    return $updateResult->isSuccess();
}
456
/**
 * Writes the 'unblocked' status into the push row and, when the passed Push
 * object reports isUnprocessed(), runs handlePush() for its channel and resource.
 *
 * @param Push|null $push Push to unblock; null is ignored.
 *
 * @return void
 */
public function setUnblockPush(?Push $push): void
{
    if ($push === null)
    {
        return;
    }

    $primaryKey = [
        'ENTITY_TYPE' => $push->getEntityType(),
        'ENTITY_ID' => $push->getEntityId(),
    ];
    $fields = [
        'NOT_PROCESSED' => Dictionary::PUSH_STATUS_PROCESS['unblocked'],
    ];
    PushTable::update($primaryKey, $fields);

    if (!$push->isUnprocessed())
    {
        return;
    }

    $this->handlePush($push->getChannelId(), $push->getResourceId());
}
492
/**
 * Writes the 'unprocessed' status into the push row.
 *
 * Nothing is written when there is no push or it already reports isUnprocessed().
 *
 * @param Push|null $push Push to flag.
 *
 * @return void
 */
public function setUnprocessedPush(?Push $push): void
{
    if ($push === null || $push->isUnprocessed())
    {
        return;
    }

    $primaryKey = [
        'ENTITY_TYPE' => $push->getEntityType(),
        'ENTITY_ID' => $push->getEntityId(),
    ];
    $fields = [
        'NOT_PROCESSED' => Dictionary::PUSH_STATUS_PROCESS['unprocessed'],
    ];

    PushTable::update($primaryKey, $fields);
}
514
/**
 * Tries to take the mutex associated with the connection.
 *
 * @param Sync\Connection\Connection $connection Connection to lock.
 * @param int $time Value forwarded to Mutex::lock().
 *
 * @return bool Value returned by Mutex::lock().
 */
public function lockConnection(Sync\Connection\Connection $connection, int $time = 30): bool
{
    $mutex = $this->getMutex($connection);

    return $mutex->lock($time);
}
526
/**
 * Releases the mutex associated with the connection.
 *
 * @param Sync\Connection\Connection $connection Connection to unlock.
 *
 * @return bool Value returned by Mutex::unlock().
 */
public function unLockConnection(Sync\Connection\Connection $connection): bool
{
    $mutex = $this->getMutex($connection);

    return $mutex->unlock();
}
536
/**
 * Creates the mutex for a connection; its key is built from the connection id.
 *
 * @param Sync\Connection\Connection $connection Connection the mutex belongs to.
 *
 * @return Mutex
 */
private function getMutex(Sync\Connection\Connection $connection): Mutex
{
    return new Mutex('lockPushForConnection_' . $connection->getId());
}
547
/**
 * Sends a queue message carrying the section connection id, routed with
 * QUEUE_ROUTE_KEY_SECTION.
 *
 * @param Sync\Connection\SectionConnection $sectionLink Section connection to enqueue.
 *
 * @return void
 */
private function pushSectionToQueue(Sync\Connection\SectionConnection $sectionLink): void
{
    $message = new Queue\Message\Message();
    $message = $message
        ->setBody([
            Sync\Push\Dictionary::PUSH_TYPE['sectionConnection'] => $sectionLink->getId(),
        ])
        ->setRoutingKey(self::QUEUE_ROUTE_KEY_SECTION)
    ;

    $producer = Queue\Producer\Factory::getProduser();
    $producer->send($message);
}
566
/**
 * Sends a queue message carrying the connection id, routed with
 * QUEUE_ROUTE_KEY_CONNECTION.
 *
 * @param Sync\Connection\Connection $connection Connection to enqueue.
 *
 * @return void
 */
private function pushConnectionToQueue(Sync\Connection\Connection $connection): void
{
    $message = new Queue\Message\Message();
    $message = $message
        ->setBody([
            Sync\Push\Dictionary::PUSH_TYPE['connection'] => $connection->getId(),
        ])
        ->setRoutingKey(self::QUEUE_ROUTE_KEY_CONNECTION)
    ;

    $producer = Queue\Producer\Factory::getProduser();
    $producer->send($message);
}
585}
lockConnection(Sync\Connection\Connection $connection, int $time=30)
handlePush(string $channel, string $resourceId, bool $forceUnprocessedPush=false)
getPush(string $entityType, int $entityId)
unLockConnection(Sync\Connection\Connection $connection)
setExpireDate(Date $expireDate)
Definition push.php:123