45 public function wait(?Http\
Promise $targetPromise =
null): array
48 $processedPromises = [];
50 while (!empty($this->promises))
52 $currentPromise = current($this->promises);
54 if ($currentPromise ===
false)
56 $currentPromise = reset($this->promises);
60 $removedPromises = [];
62 $currentPromiseId = $currentPromise->getId();
63 $currentHandler = $currentPromise->getHandler();
65 if ($currentHandler->getState() == Handler::PENDING)
68 $currentHandler->process($currentPromise);
70 if ($currentPromise->getState() !== PromiseInterface::PENDING)
73 $removedPromises[] = $currentPromise;
78 $this->writeSockets[$currentPromiseId] = $currentHandler->getSocket()->getResource();
82 $read = $this->readSockets;
83 $write = $this->writeSockets;
86 if (!empty($read) || !empty($write))
88 if (stream_select($read, $write, $except, 0, $this->selectTimeout) > 0)
90 foreach (array_merge($write, $read) as $promiseId => $dummy)
92 $promise = $this->promises[$promiseId];
93 $handler = $promise->getHandler();
96 $handler->process($promise);
101 if ($promise->getState() !== PromiseInterface::PENDING)
104 $removedPromises[] = $promise;
111 foreach (array_merge($this->writeSockets, $this->readSockets) as $promiseId => $dummy)
113 $promise = $this->promises[$promiseId];
115 if ($promise->getState() === PromiseInterface::PENDING)
117 $handler = $promise->getHandler();
118 if ($handler->getSocket()->timedOut())
121 $promise->reject($exception);
123 if ($logger = $handler->getLogger())
125 $logger->error($exception->getMessage());
128 $removedPromises[] = $promise;
133 foreach ($removedPromises as $promise)
136 $processedPromises[] = $promise;
138 $promiseId = $promise->getId();
140 $this->
delete($promiseId);
143 if ($targetPromise && $promiseId === $targetPromise->getId())
146 return $processedPromises;
152 if ($jobCounter >= $this->activeQueries)
155 reset($this->promises);
157 elseif (isset($this->promises[$currentPromiseId]))
160 next($this->promises);
164 return $processedPromises;
169 $promiseId = $promise->
getId();
171 $state = $handler->getState();
173 if ($state === Handler::BODY_SENT || $state === Handler::CONNECT_SENT)
176 if (isset($this->writeSockets[$promiseId]))
178 unset($this->writeSockets[$promiseId]);
180 if (!isset($this->readSockets[$promiseId]))
182 $this->readSockets[$promiseId] = $handler->getSocket()->getResource();
185 elseif ($state === Handler::CONNECT_RECEIVED)
188 if (isset($this->readSockets[$promiseId]))
190 unset($this->readSockets[$promiseId]);
192 if (!isset($this->writeSockets[$promiseId]))
194 $this->writeSockets[$promiseId] = $handler->getSocket()->getResource();