// Lock time passed to lockConnection() by syncSection() and syncConnection().
// NOTE(review): the unit is not visible in this excerpt (presumably seconds) — confirm against Mutex::lock().
39 private const LOCK_CONNECTION_TIME = 20;
/**
 * Looks up the push record stored for the given entity.
 *
 * Queries PushTable filtered by entity type and entity id and reads the result
 * with fetchObject().
 * NOTE(review): only part of the body is visible here; how $data is turned into the
 * returned Push, and when null is returned, is not shown.
 *
 * @param string $entityType value matched against ENTITY_TYPE
 * @param int $entityId value matched against ENTITY_ID
 * @return Push|null
 */
55 public function getPush(
string $entityType,
int $entityId): ?
Push
57 $data = PushTable::query()
59 ->addFilter(
'=ENTITY_TYPE', $entityType)
// NOTE(review): this filter key has no '=' prefix, unlike the other filters visible in this
// file (e.g. '=ENTITY_ID' in getPushState()) — confirm whether the difference is intentional.
60 ->addFilter(
'ENTITY_ID', $entityId)
61 ->exec()->fetchObject();
/**
 * Stores a new push record for the given entity through PushTable::add().
 *
 * The ENTITY_TYPE and ENTITY_ID keys of $data are overwritten with the method
 * arguments before the row is added.
 * NOTE(review): the creation of $result and the success branch are not visible here.
 *
 * @param string $entityType written to $data['ENTITY_TYPE']
 * @param int $entityId written to $data['ENTITY_ID']
 * @param array $data field values handed to PushTable::add(); the expected keys are not
 *                    visible in this excerpt — verify against PushTable
 * @return Result an Error with the message below is added on the visible failure path
 */
80 public function addPush(
string $entityType,
int $entityId, array $data):
Result
// The caller-supplied values for these two keys are always replaced by the arguments.
83 $data[
'ENTITY_TYPE'] = $entityType;
84 $data[
'ENTITY_ID'] = $entityId;
// Assignment inside the condition: $addResult receives whatever getObject() returns
// and the branch is taken when that value is truthy.
87 if ($addResult = PushTable::add($data)->getObject())
// Presumably the else-branch of the check above (the branch structure is not fully visible).
95 $result->addError(
new Error(
'Error of add push info into db.'));
115 $updateResult = PushTable::update([
120 if ($updateResult->isSuccess())
129 $result->addError(
new Error(
'Error of update push in db.'));
/**
 * Handles an incoming push identified by its channel id and resource id.
 *
 * Visible sequence: the push row is looked up in PushTable by CHANNEL_ID and RESOURCE_ID;
 * the push is checked with isBlocked() and (unless forced) isUnprocessed(); blockPush()
 * is called; the sync is dispatched by entity type to syncSection() or syncConnection();
 * afterwards getPushState() is consulted and the method may call itself again with the
 * force flag set.
 * NOTE(review): the bodies of the isBlocked()/isUnprocessed() branches, the conversion of
 * $row into $push and the construction of the returned Result are not visible here.
 *
 * @param string $channel value matched against CHANNEL_ID
 * @param string $resourceId value matched against RESOURCE_ID
 * @param bool $forceUnprocessedPush when true, the isUnprocessed() check is skipped;
 *                                   the method passes true when it calls itself again
 * @return Result
 */
193 public function handlePush(
string $channel,
string $resourceId,
bool $forceUnprocessedPush =
false):
Result
196 $row = PushTable::query()
198 ->addFilter(
'=CHANNEL_ID', $channel)
199 ->addFilter(
'=RESOURCE_ID', $resourceId)
200 ->exec()->fetchObject()
// What is done for a blocked push is not visible in this excerpt.
206 if ($push->isBlocked())
// Short-circuits: isUnprocessed() is only evaluated when the force flag is false.
213 if (!$forceUnprocessedPush && $push->isUnprocessed())
// blockPush() is called before the sync is dispatched.
220 $this->blockPush($push);
// Dispatch by entity type; other entity types start no sync in the visible code.
221 if ($push->getEntityType() === self::TYPE_SECTION_CONNECTION)
223 $this->syncSection($push);
225 elseif ($push->getEntityType() === self::TYPE_CONNECTION)
227 $this->syncConnection($push);
// Re-reads the NOT_PROCESSED value for this entity after the sync (the rest of the
// condition is not visible); when it holds, the same push is handled once more with
// $forceUnprocessedPush = true.
230 if ($this->getPushState($push->getEntityType(), $push->getEntityId())
233 $this->
handlePush($channel, $resourceId,
true);
/**
 * Reads the NOT_PROCESSED column of the push row stored for the given entity.
 *
 * NOTE(review): the lines that execute the query and fetch $row are not visible here.
 * NOTE(review): no return type is declared on this method.
 *
 * @param string $entityType value matched against ENTITY_TYPE
 * @param string $entityId value matched against ENTITY_ID.
 *                         NOTE(review): declared as string here while getPush() and
 *                         addPush() declare int $entityId — confirm which is intended.
 * @return mixed the NOT_PROCESSED value of $row, or null when that key is unset or null
 */
259 private function getPushState(
string $entityType,
string $entityId)
261 $row = PushTable::query()
// Only the NOT_PROCESSED column is selected.
262 ->setSelect([
'NOT_PROCESSED'])
263 ->addFilter(
'=ENTITY_TYPE', $entityType)
264 ->addFilter(
'=ENTITY_ID', $entityId)
// The ?? fallback yields null when the key is missing (for example when no row was fetched).
267 return $row[
'NOT_PROCESSED'] ??
null;
/**
 * Runs a vendor data exchange for the section connection the push refers to.
 *
 * Visible steps: load the SectionConnection by the push entity id; try to lock its
 * connection for LOCK_CONNECTION_TIME and, if that fails, put the section link on the
 * queue instead; build a SyncSectionMap holding one SyncSection; create the vendor factory
 * and a VendorDataExchangeManager; update the connection; record the outcome through
 * markPushSuccess(); release the connection lock.
 * NOTE(review): several lines are missing from this excerpt (validation of $sectionLink,
 * the try/finally structure, the start of the manager call chain, whether the method
 * returns right after queueing) — confirm against the full file.
 *
 * @param Push $push push whose entity id identifies the section connection
 * @return void
 */
279 private function syncSection(Push $push): void
283 $sectionLink = (
new SectionConnection())->getById($push->getEntityId());
// When the lock cannot be taken, the section link is handed to the queue.
289 if (!$this->lockConnection($sectionLink->getConnection(), self::LOCK_CONNECTION_TIME))
291 $this->pushSectionToQueue($sectionLink);
294 $syncSectionMap =
new SyncSectionMap();
// Vendor name is taken from the vendor code of the section link's connection.
295 $syncSection = (
new SyncSection())
296 ->setSection($sectionLink->getSection())
297 ->setSectionConnection($sectionLink)
298 ->setVendorName($sectionLink->getConnection()->getVendor()->getCode());
// The vendor section id is one of the arguments of add(); the other argument line(s)
// are not visible here.
300 $syncSectionMap->add(
302 $syncSection->getSectionConnection()->getVendorSectionId()
// Remaining factory arguments are not visible in this excerpt.
305 $factory = FactoryBuilder::create(
306 $sectionLink->getConnection()->getVendor()->getCode(),
307 $sectionLink->getConnection(),
311 $manager =
new VendorDataExchangeManager($factory, $syncSectionMap);
// Tail of a call chain whose start is outside this excerpt.
315 ->updateConnection($sectionLink->getConnection());
317 $this->markPushSuccess($push,
true);
// A BaseException is recorded as an unsuccessful push (this line appears to sit in the catch body).
319 catch(BaseException $e)
321 $this->markPushSuccess($push,
false);
// NOTE(review): presumably inside a finally block so the lock is always released — confirm.
325 $this->unLockConnection($sectionLink->getConnection());
/**
 * Runs a vendor data exchange for the connection the push refers to.
 *
 * Visible steps: load the Connection by the push entity id and check that it exists and
 * is not deleted; try to lock it for LOCK_CONNECTION_TIME and, if that fails, put the
 * connection on the queue instead; create the vendor factory and a
 * VendorDataExchangeManager whose section map is derived from that factory; update the
 * connection.
 * NOTE(review): the branch bodies, the ArgumentException handler, the outcome recording
 * and the lock release are not visible in this excerpt — confirm against the full file.
 *
 * @param Push $push push whose entity id identifies the connection
 * @return void
 */
339 private function syncConnection(Push $push): void
344 $connection = (
new Connection())->getById($push->getEntityId());
// Covers both a missing connection and one flagged as deleted; the reaction is not visible here.
345 if (!$connection || $connection->isDeleted())
350 catch (ArgumentException $e)
// When the lock cannot be taken, the connection is handed to the queue.
358 if (!$this->
lockConnection($connection, self::LOCK_CONNECTION_TIME))
360 $this->pushConnectionToQueue($connection);
// The argument between the vendor code and the context is not visible in this excerpt.
364 $factory = FactoryBuilder::create(
365 $connection->getVendor()->getCode(),
367 new Sync\Util\Context()
371 $manager =
new VendorDataExchangeManager(
// The section map is obtained from the factory via SyncSectionFactory.
373 (
new SyncSectionFactory())->getSyncSectionMapByFactory($factory)
// Tail of a call chain whose start is outside this excerpt; the connection is read
// back from the factory rather than taken from $connection.
377 ->updateConnection($factory->getConnection())
/**
 * Records the outcome of a sync on the given push.
 *
 * NOTE(review): only one branch is visible in this excerpt; the leading if-condition,
 * the way $success is used and how the push is persisted are not shown.
 *
 * @param Push $push push to update
 * @param bool $success outcome reported by the caller (true after a completed sync,
 *                      false from the BaseException handler in syncSection())
 * @return void
 */
399 private function markPushSuccess(Push $push,
bool $success): void
// Taken only when the (not visible) preceding condition failed and the push has no
// first-push date yet: that date is then set to a new Date().
406 elseif(!$push->getFirstPushDate())
408 $push->setFirstPushDate(
new Date());
427 return $this->blockPush($push);
/**
 * Updates the stored row of the given push through PushTable::update().
 *
 * NOTE(review): the arguments of update() and the tail of the return expression are not
 * visible in this excerpt, so which fields are written — and how the declared bool is
 * derived from the update call — must be confirmed against the full file.
 *
 * @param Push $push push to mark as blocked
 * @return bool
 */
444 private function blockPush(
Push $push): bool
446 return PushTable::update(
524 return $this->getMutex($connection)->lock($time);
534 return $this->getMutex($connection)->unlock();
544 $key =
'lockPushForConnection_' . $connection->getId();
545 return new Mutex($key);
/**
 * Sends a queue message that refers to the given section connection.
 *
 * The message carries the section link id under the key
 * Sync\Push\Dictionary::PUSH_TYPE['sectionConnection'] and uses the routing key
 * QUEUE_ROUTE_KEY_SECTION.
 * NOTE(review): the setter that receives the array (and its closing lines) is not
 * visible in this excerpt.
 *
 * @param Sync\Connection\SectionConnection $sectionLink section connection whose id is queued
 * @return void
 */
557 private function pushSectionToQueue(Sync\
Connection\SectionConnection $sectionLink): void
559 $message = (
new Queue\Message\Message())
561 Sync\Push\Dictionary::PUSH_TYPE[
'sectionConnection'] => $sectionLink->getId(),
563 ->setRoutingKey(self::QUEUE_ROUTE_KEY_SECTION);
// NOTE(review): 'getProduser' is spelled this way at both call sites visible in this file;
// it belongs to Queue\Producer\Factory, declared outside this file, so it is left unchanged.
564 Queue\Producer\Factory::getProduser()->send($message);
578 $message = (
new Queue\Message\Message())
580 Sync\Push\Dictionary::PUSH_TYPE[
'connection'] => $connection->getId(),
582 ->setRoutingKey(self::QUEUE_ROUTE_KEY_CONNECTION);
583 Queue\Producer\Factory::getProduser()->send($message);