1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
queue.php
См. документацию.
1<?php
2
9
10namespace Bitrix\Main\Web\Http\Socket;
11
12use Http\Promise\Promise as PromiseInterface;
13use Bitrix\Main\Web\Http;
14
/**
 * Queue of socket-based HTTP promises.
 *
 * Promises are registered with add() and driven to completion by wait(),
 * which multiplexes their sockets through stream_select().
 */
class Queue extends Http\Queue
{
	/** @var Promise[] Promises that are still in the queue, keyed by promise id. */
	protected array $promises = [];

	/** @var int Job counter limit in wait(): once reached, iteration restarts from the first promise. */
	protected int $activeQueries = 20;

	/** @var int Passed as the microseconds argument of stream_select() (the seconds argument is 0). */
	protected int $selectTimeout = 20000;

	/** @var array Socket resources selected for reading, keyed by promise id. */
	protected array $readSockets = [];

	/** @var array Socket resources selected for writing, keyed by promise id. */
	protected array $writeSockets = [];

	/**
	 * Puts a promise into the queue under its own id.
	 *
	 * @param Promise $promise
	 * @return void
	 */
	public function add(Promise $promise): void
	{
		$this->promises[$promise->getId()] = $promise;
	}

	/**
	 * Removes a promise and its watched sockets from the queue.
	 *
	 * @param string $promiseId
	 * @return void
	 */
	public function delete(string $promiseId): void
	{
		unset(
			$this->promises[$promiseId],
			$this->readSockets[$promiseId],
			$this->writeSockets[$promiseId]
		);
	}

	/**
	 * Processes the queued promises until the queue is empty or, when a target
	 * promise is given, until that promise is fulfilled or rejected.
	 *
	 * @param Http\Promise|null $targetPromise Promise to wait for; null means "process everything".
	 * @return Http\Promise[] Promises that left the queue during this call, in completion order.
	 */
	public function wait(?Http\Promise $targetPromise = null): array
	{
		$jobCounter = 0;
		$processedPromises = [];

		while (!empty($this->promises))
		{
			$currentPromise = current($this->promises);

			if ($currentPromise === false)
			{
				// the internal pointer ran past the end, start over from the first promise
				$currentPromise = reset($this->promises);
				$jobCounter = 0;
			}

			$removedPromises = [];

			$currentPromiseId = $currentPromise->getId();
			$currentHandler = $currentPromise->getHandler();

			if ($currentHandler->getState() === Handler::PENDING)
			{
				// yet not connected, "connect" inside
				$currentHandler->process($currentPromise);

				if ($currentPromise->getState() !== PromiseInterface::PENDING)
				{
					// the promise is rejected, go to the next promise
					$removedPromises[] = $currentPromise;
				}
				else
				{
					// now connected, can "select" the socket for writing
					$this->writeSockets[$currentPromiseId] = $currentHandler->getSocket()->getResource();
				}
			}

			// stream_select() modifies its arguments, so work on copies
			$read = $this->readSockets;
			$write = $this->writeSockets;
			$except = null;

			if (!empty($read) || !empty($write))
			{
				if (stream_select($read, $write, $except, 0, $this->selectTimeout) > 0)
				{
					foreach (array_merge($write, $read) as $promiseId => $dummy)
					{
						$promise = $this->promises[$promiseId];
						$handler = $promise->getHandler();

						// do real work
						$handler->process($promise);

						// put the socket into the reading or writing list to minimize calls
						$this->switchSocket($promise);

						if ($promise->getState() !== PromiseInterface::PENDING)
						{
							// job done, the promise is fulfilled or rejected
							$removedPromises[] = $promise;
						}
					}
				}
			}

			// time out control: reject every still pending promise whose socket has timed out
			foreach (array_merge($this->writeSockets, $this->readSockets) as $promiseId => $dummy)
			{
				$promise = $this->promises[$promiseId];

				if ($promise->getState() === PromiseInterface::PENDING)
				{
					$handler = $promise->getHandler();
					if ($handler->getSocket()->timedOut())
					{
						$exception = new Http\NetworkException($promise->getRequest(), 'Stream timeout has been reached.');
						$promise->reject($exception);

						$handler->getLogger()?->error($exception->getMessage());

						$removedPromises[] = $promise;
					}
				}
			}

			foreach ($removedPromises as $promise)
			{
				// job done, the promise is fulfilled or rejected
				$processedPromises[] = $promise;

				$promiseId = $promise->getId();

				$this->delete($promiseId);
				$jobCounter--;

				if ($targetPromise && $promiseId === $targetPromise->getId())
				{
					// we were waiting for the specific promise
					return $processedPromises;
				}
			}

			// go to the next job in the queue
			$jobCounter++;
			if ($jobCounter >= $this->activeQueries)
			{
				// the limit of simultaneously processed promises is reached, return to the first one
				$jobCounter = 0;
				reset($this->promises);
			}
			elseif (isset($this->promises[$currentPromiseId]))
			{
				// unsetting an element the current pointer points to, moves the pointer forward,
				// so the pointer is advanced manually only when the current promise is still queued
				next($this->promises);
			}
		}

		return $processedPromises;
	}

	/**
	 * Moves the promise's socket between the reading and the writing lists
	 * according to the current state of its handler. In any other state the
	 * lists are left as they are.
	 *
	 * @param Promise $promise
	 * @return void
	 */
	protected function switchSocket(Promise $promise): void
	{
		$promiseId = $promise->getId();
		$handler = $promise->getHandler();
		$state = $handler->getState();

		if ($state === Handler::BODY_SENT || $state === Handler::CONNECT_SENT)
		{
			// switch the socket to "reading"
			unset($this->writeSockets[$promiseId]);
			$this->readSockets[$promiseId] ??= $handler->getSocket()->getResource();
		}
		elseif ($state === Handler::CONNECT_RECEIVED)
		{
			// switch the socket to "writing"
			// NOTE(review): this condition is missing from the listing and is restored from the
			// Handler::CONNECT_RECEIVED cross-reference - confirm against the upstream file.
			unset($this->readSockets[$promiseId]);
			$this->writeSockets[$promiseId] ??= $handler->getSocket()->getResource();
		}
	}
}
const CONNECT_RECEIVED
Определения handler.php:31
int $selectTimeout
Определения queue.php:20
add(Promise $promise)
Определения queue.php:30
int $activeQueries
Определения queue.php:19
array $writeSockets
Определения queue.php:22
wait(?Http\Promise $targetPromise=null)
Определения queue.php:45
switchSocket(Promise $promise)
Определения queue.php:164
array $promises
Определения queue.php:18
array $readSockets
Определения queue.php:21
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
Определения file_new.php:804
if( $daysToExpire >=0 &&$daysToExpire< 60 elseif)( $daysToExpire< 0)
Определения prolog_main_admin.php:393