Bitrix-D7 23.9
 
Загрузка...
Поиск...
Не найдено
queue.php
1<?php
2
11
12use Http\Promise\Promise as PromiseInterface;
14
15class Queue extends Http\Queue
16{
18 protected array $promises = [];
19 protected int $activeQueries = 20;
20 protected int $selectTimeout = 20000;
21 protected array $readSockets = [];
22 protected array $writeSockets = [];
23
30 public function add(Promise $promise): void
31 {
32 $this->promises[$promise->getId()] = $promise;
33 }
34
35 public function delete(string $promiseId): void
36 {
37 unset($this->promises[$promiseId]);
38 unset($this->readSockets[$promiseId]);
39 unset($this->writeSockets[$promiseId]);
40 }
41
45 public function wait(?Http\Promise $targetPromise = null): array
46 {
47 $jobCounter = 0;
48 $processedPromises = [];
49
50 while (!empty($this->promises))
51 {
52 $currentPromise = current($this->promises);
53
54 if ($currentPromise === false)
55 {
56 $currentPromise = reset($this->promises);
57 $jobCounter = 0;
58 }
59
60 $removedPromises = [];
61
62 $currentPromiseId = $currentPromise->getId();
63 $currentHandler = $currentPromise->getHandler();
64
65 if ($currentHandler->getState() == Handler::PENDING)
66 {
67 // yet not connected, "connect" inside
68 $currentHandler->process($currentPromise);
69
70 if ($currentPromise->getState() !== PromiseInterface::PENDING)
71 {
72 // the promise is rejected, go to the next promise
73 $removedPromises[] = $currentPromise;
74 }
75 else
76 {
77 // now connected, can "select" the socket for writing
78 $this->writeSockets[$currentPromiseId] = $currentHandler->getSocket()->getResource();
79 }
80 }
81
82 $read = $this->readSockets;
83 $write = $this->writeSockets;
84 $except = null;
85
86 if (!empty($read) || !empty($write))
87 {
88 if (stream_select($read, $write, $except, 0, $this->selectTimeout) > 0)
89 {
90 foreach (array_merge($write, $read) as $promiseId => $dummy)
91 {
92 $promise = $this->promises[$promiseId];
93 $handler = $promise->getHandler();
94
95 // do real work
96 $handler->process($promise);
97
98 // put the socket into the reading or writing list to minimize calls
99 $this->switchSocket($promise);
100
101 if ($promise->getState() !== PromiseInterface::PENDING)
102 {
103 // job done, the promise is fullfilled or rejected
104 $removedPromises[] = $promise;
105 }
106 }
107 }
108 }
109
110 // time out control
111 foreach (array_merge($this->writeSockets, $this->readSockets) as $promiseId => $dummy)
112 {
113 $promise = $this->promises[$promiseId];
114
115 if ($promise->getState() === PromiseInterface::PENDING)
116 {
117 $handler = $promise->getHandler();
118 if ($handler->getSocket()->timedOut())
119 {
120 $exception = new Http\NetworkException($promise->getRequest(), 'Stream timeout has been reached.');
121 $promise->reject($exception);
122
123 if ($logger = $handler->getLogger())
124 {
125 $logger->error($exception->getMessage());
126 }
127
128 $removedPromises[] = $promise;
129 }
130 }
131 }
132
133 foreach ($removedPromises as $promise)
134 {
135 // job done, the promise is fullfilled or rejected
136 $processedPromises[] = $promise;
137
138 $promiseId = $promise->getId();
139
140 $this->delete($promiseId);
141 $jobCounter--;
142
143 if ($targetPromise && $promiseId === $targetPromise->getId())
144 {
145 // we were waiting for the specific promise
146 return $processedPromises;
147 }
148 }
149
150 // go to the next job in the queue
151 $jobCounter++;
152 if ($jobCounter >= $this->activeQueries)
153 {
154 $jobCounter = 0;
155 reset($this->promises);
156 }
157 elseif (isset($this->promises[$currentPromiseId]))
158 {
159 // unsetting an element the current pointer points to, moves the pointer forward
160 next($this->promises);
161 }
162 }
163
164 return $processedPromises;
165 }
166
167 protected function switchSocket(Promise $promise): void
168 {
169 $promiseId = $promise->getId();
170 $handler = $promise->getHandler();
171 $state = $handler->getState();
172
173 if ($state === Handler::BODY_SENT || $state === Handler::CONNECT_SENT)
174 {
175 // switch the socket to "reading"
176 if (isset($this->writeSockets[$promiseId]))
177 {
178 unset($this->writeSockets[$promiseId]);
179 }
180 if (!isset($this->readSockets[$promiseId]))
181 {
182 $this->readSockets[$promiseId] = $handler->getSocket()->getResource();
183 }
184 }
185 elseif ($state === Handler::CONNECT_RECEIVED)
186 {
187 // switch the socket to "writing"
188 if (isset($this->readSockets[$promiseId]))
189 {
190 unset($this->readSockets[$promiseId]);
191 }
192 if (!isset($this->writeSockets[$promiseId]))
193 {
194 $this->writeSockets[$promiseId] = $handler->getSocket()->getResource();
195 }
196 }
197 }
198}
wait(?Http\Promise $targetPromise=null)
Definition queue.php:45
switchSocket(Promise $promise)
Definition queue.php:167