48 $processedPromises = [];
50 while (!empty($this->promises))
52 $currentPromise = current($this->promises);
54 if ($currentPromise ===
false)
56 $currentPromise = reset($this->promises);
60 $removedPromises = [];
62 $currentPromiseId = $currentPromise->getId();
63 $currentHandler = $currentPromise->getHandler();
68 $currentHandler->process($currentPromise);
70 if ($currentPromise->getState() !== PromiseInterface::PENDING)
73 $removedPromises[] = $currentPromise;
78 $this->writeSockets[$currentPromiseId] = $currentHandler->getSocket()->getResource();
82 $read = $this->readSockets;
83 $write = $this->writeSockets;
86 if (!empty($read) || !empty($write))
88 if (stream_select($read, $write, $except, 0, $this->selectTimeout) > 0)
90 foreach (array_merge($write, $read) as $promiseId => $dummy)
92 $promise = $this->promises[$promiseId];
93 $handler = $promise->getHandler();
96 $handler->process($promise);
101 if ($promise->getState() !== PromiseInterface::PENDING)
104 $removedPromises[] = $promise;
111 foreach (array_merge($this->writeSockets, $this->readSockets) as $promiseId => $dummy)
113 $promise = $this->promises[$promiseId];
115 if ($promise->getState() === PromiseInterface::PENDING)
117 $handler = $promise->getHandler();
118 if ($handler->getSocket()->timedOut())
121 $promise->reject($exception);
123 $handler->getLogger()?->error($exception->getMessage());
125 $removedPromises[] = $promise;
130 foreach ($removedPromises as $promise)
133 $processedPromises[] = $promise;
135 $promiseId = $promise->getId();
137 $this->
delete($promiseId);
140 if ($targetPromise && $promiseId === $targetPromise->getId())
143 return $processedPromises;
149 if ($jobCounter >= $this->activeQueries)
152 reset($this->promises);
154 elseif (isset($this->promises[$currentPromiseId]))
157 next($this->promises);
161 return $processedPromises;
166 $promiseId = $promise->
getId();
168 $state = $handler->getState();
173 if (isset($this->writeSockets[$promiseId]))
175 unset($this->writeSockets[$promiseId]);
177 if (!isset($this->readSockets[$promiseId]))
179 $this->readSockets[$promiseId] = $handler->getSocket()->getResource();
185 if (isset($this->readSockets[$promiseId]))
187 unset($this->readSockets[$promiseId]);
189 if (!isset($this->writeSockets[$promiseId]))
191 $this->writeSockets[$promiseId] = $handler->getSocket()->getResource();