28 private array $brokers = [];
33 private array $configs = [];
46 private function loadConfig(): void
48 $this->loadGlobalConfig();
49 $this->loadModulesConfigs();
55 private function loadGlobalConfig(): void
57 $config = Configuration::getValue(
'messenger');
66 'table' => MessengerMessageTable::class,
73 if (empty(
$config[
'brokers'][
'default']))
75 throw new ConfigurationException(
'Default broker for messenger did not configured');
80 $this->loadBrokerConfig(
$name, $broker);
87 private function loadModulesConfigs(): void
99 $this->loadBrokerConfig(
$name, $broker);
111 private function loadBrokerConfig(
string $brokerName,
array $brokerConfig): void
113 if (empty($brokerConfig[
'type']) || empty($brokerConfig[
'params']))
115 throw new ConfigurationException(
'Wrong configuration of broker ' . $brokerName);
121 throw new ConfigurationException(
'Unknown broker type ' . $brokerConfig[
'type']);
124 $this->configs[$brokerName] = $brokerConfig;
132 public function getBroker(
string $queueId): BrokerInterface
135 $registry = ServiceLocator::getInstance()->get(QueueConfigRegistry::class);
137 $queueConfig = $registry->getQueueConfig($queueId);
141 throw new ConfigurationException(sprintf(
'Queue "%s" was not configured', $queueId));
144 $brokerCode = $queueConfig->brokerCode ??
'default';
146 return $this->brokers[$brokerCode] ?? $this->initBroker($brokerCode);
154 $tableClass = $brokerConfig[
'params'][
'table'] ??
null;
156 if (empty($tableClass))
161 if ($tableClass !== MessengerMessageTable::class && !is_subclass_of($tableClass, MessengerMessageTable::class))
164 sprintf(
'Table class "%s" should be in hierarchy of "%s"', $tableClass, MessengerMessageTable::class)
178 if (!isset($this->configs[$brokerCode]))
183 $brokerConfig = $this->configs[$brokerCode];
187 if (isset($brokerConfig[
'params'][
'module']))
197 $this->brokers[$brokerCode] =
new DbBroker($tableClass::getEntity());
199 return $this->brokers[$brokerCode];
201 catch (ArgumentException $e)
203 throw new ConfigurationException($e->getMessage(), $e);
207 throw new ConfigurationException(sprintf(
'Broker "%s" could not be initialized', $brokerCode));
static includeModule($moduleName)
getDbBrokerTableClass(array $brokerConfig, string $brokerName)
static getInstalledModules()