Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
baseagent.php
1<?php
2
4
7use CAgent;
8use Exception;
9
10abstract class BaseAgent
11{
12 private const TIME_LIMIT = 10;
13
14 protected bool $isEscalated = false;
18 public static function runAgent(): string
19 {
20 try
21 {
22 $runner = new static();
23 $runner->run();
24 self::modifyAgent($runner);
25
26 }
27 catch(Exception $e)
28 {
29 // TODO: log it
30 }
31
32 return static::getAgentName();
33 }
34
38 protected static function getAgentName(): string
39 {
40 return static::class . "::runAgent();";
41 }
42
46 protected static function getModule(): string
47 {
48 return 'calendar';
49 }
50
56 private static function modifyAgent(BaseAgent $runner)
57 {
58 $agent = CAgent::getList(
59 [],
60 [
61 'MODULE_ID' => self::getModule(),
62 '=NAME' =>self::getAgentName(),
63 ]
64 )->Fetch();
65 if ($agent)
66 {
67 $interval = $runner->isEscalated
68 ? $runner->getEscalatedInterval()
69 : $runner->getInterval()
70 ;
71 if ((int)$agent['AGENT_INTERVAL'] !== $interval)
72 {
73 CAgent::Update($agent['ID'],['AGENT_INTERVAL' => $interval]);
74 }
75 }
76 }
77
81 protected function run()
82 {
83 $consumer = $this->getConsumer();
84 $processor = $this->getProcessor();
85
86 $startTime = time();
87 $this->deescalateMe();
88 while ($message = $consumer->receive())
89 {
90 $result = $processor->process($message);
91
92 if ($result === Interfaces\Processor::ACK)
93 {
94 $consumer->acknowledge($message);
95 }
96 else if ($result === Interfaces\Processor::REJECT)
97 {
98 $consumer->reject($message);
99 }
100
101 $this->escalateMe();
102 if ((time() - $startTime) > $this->getTimeLimit())
103 {
104 break;
105 }
106 }
107 }
108
112 protected function escalateMe(): void
113 {
114 $this->isEscalated = true;
115 }
116
120 protected function deescalateMe(): void
121 {
122 $this->isEscalated = false;
123 }
124
128 protected function getInterval(): int
129 {
130 return 3600;
131 }
132
136 protected function getEscalatedInterval(): int
137 {
138 return 60;
139 }
140
144 protected function getTimeLimit(): int
145 {
146 return self::TIME_LIMIT;
147 }
148
152 abstract protected function getConsumer(): Interfaces\Consumer;
153
157 abstract protected function getProcessor(): Interfaces\Processor;
158}