1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
abstractthreadstrategy.php
См. документацию.
1<?php
2
4
15
16abstract class AbstractThreadStrategy implements ThreadStrategy
17{
21 protected $threadId;
22
23 protected $groupStateId;
24 protected $offset;
25 protected $perPage = 100000;
26 protected const GROUP_THREAD_LOCK_KEY = 'group_thread_';
27 public const THREAD_UNAVAILABLE = -1;
28 public const THREAD_LOCKED = -2;
29 public const THREAD_NEEDED = 1;
30
31
37 public function fillThreads(): void
38 {
39 $insertData = [];
40
41 \CTimeZone::Disable();
42 for ($thread = 0; $thread < static::THREADS_COUNT; $thread++)
43 {
44 $insertData[] = [
45 'THREAD_ID' => $thread,
46 'GROUP_STATE_ID' => $this->groupStateId,
47 'THREAD_TYPE' => static::THREADS_COUNT,
48 'EXPIRE_AT' => new DateTime(),
49 ];
50 }
51
52 SqlBatch::insert(GroupThreadTable::getTableName(), $insertData);
53 \CTimeZone::Enable();
54 }
55
63 public function lockThread(): ?int
64 {
65 if (!static::checkLock())
66 {
67 return self::THREAD_UNAVAILABLE;
68 }
69
70 \CTimeZone::Disable();
71 $thread = GroupThreadTable::getList(
72 [
73 "select" => [
74 "THREAD_ID",
75 "STEP"
76 ],
77 "filter" => [
78 '=GROUP_STATE_ID' => $this->groupStateId,
79 [
80 'LOGIC' => 'OR',
81 [
82 '=STATUS' => GroupThreadTable::STATUS_NEW,
83 ],
84 [
85 '=STATUS' => GroupThreadTable::STATUS_IN_PROGRESS,
86 '<EXPIRE_AT' => new DateTime()
87 ]
88 ]
89 ],
90 "limit" => 1
91 ]
92 )->fetch();
93 \CTimeZone::Enable();
94
95 if (!isset($thread["THREAD_ID"]))
96 {
97 return self::THREAD_UNAVAILABLE;
98 }
99 $this->threadId = (int)$thread["THREAD_ID"];
100 $this->offset = $this->threadId === 0 && (int)$thread["STEP"] === 0
101 ? 0 : $this->threadId * $this->perPage + (static::lastThreadId() + 1) * $this->perPage * $thread["STEP"];
102
103 $this->updateStatus(GroupThreadTable::STATUS_IN_PROGRESS);
104 Locker::unlock(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId);
105
106 return $this->threadId;
107 }
108
116 public function checkThreads(): ?int
117 {
118 if (!static::checkLock())
119 {
120 return self::THREAD_LOCKED;
121 }
122
123 $thread = GroupThreadTable::getList(
124 [
125 "select" => [
126 "THREAD_ID",
127 "STEP"
128 ],
129 "filter" => [
130 '=GROUP_STATE_ID' => $this->groupStateId
131 ],
132 "limit" => 1
133 ]
134 )->fetch();
135
136 if (isset($thread["THREAD_ID"]))
137 {
138 return self::THREAD_UNAVAILABLE;
139 }
140 Locker::unlock(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId);
141
142 return self::THREAD_NEEDED;
143 }
144
150 public function updateStatus(string $status): bool
151 {
152 if ($status === GroupThreadTable::STATUS_DONE && !$this->checkToFinalizeStatus())
153 {
154 $status = GroupThreadTable::STATUS_NEW;
155 }
156
157 try
158 {
159 \CTimeZone::Disable();
160
161 $counter = (int)($status === GroupThreadTable::STATUS_IN_PROGRESS);
162 $tableName = GroupThreadTable::getTableName();
163 $expireAt = (new \DateTime())->modify("+10 minutes")->format('Y-m-d H:i:s');
164 $updateQuery = 'UPDATE ' . $tableName . '
165 SET
166 STATUS = \'' . $status . '\',
167 STEP = STEP + \'' . $counter . '\',
168 EXPIRE_AT = \'' . $expireAt . '\'
169 WHERE
170 THREAD_ID = ' . $this->threadId . '
171 AND GROUP_STATE_ID = ' . $this->groupStateId;
172 Application::getConnection()->query($updateQuery);
173
174 } catch (\Exception $e)
175 {
176 return false;
177 }
178 finally
179 {
180 \CTimeZone::Enable();
181 }
182
183 return true;
184 }
185
190 public function hasUnprocessedThreads(): bool
191 {
192 try
193 {
194 $threads = GroupThreadTable::getList(
195 [
196 "select" => ["THREAD_ID"],
197 "filter" => [
198 '@STATUS' => new SqlExpression(
199 "?, ?", GroupThreadTable::STATUS_NEW, GroupThreadTable::STATUS_IN_PROGRESS
200 ),
201 '=GROUP_STATE_ID' => $this->groupStateId,
202 '!=THREAD_ID' => $this->threadId
203 ]
204 ]
205 )->fetchAll();
206 } catch (\Exception $e)
207 {
208 }
209
210 return !empty($threads);
211 }
212
217 public function getThreadId(): ?int
218 {
219 return $this->threadId;
220 }
221
226 public function lastThreadId(): int
227 {
228 return static::THREADS_COUNT - 1;
229 }
230
236 public function setGroupStateId(int $groupStateId): ThreadStrategy
237 {
238 $this->groupStateId = $groupStateId;
239
240 return $this;
241 }
242
247 protected function checkLock()
248 {
249 for ($i = 0; $i <= static::lastThreadId(); $i++)
250 {
251 if (Locker::lock(self::GROUP_THREAD_LOCK_KEY, $this->groupStateId))
252 {
253 return true;
254 }
255 sleep(rand(1, 7));
256 }
257 return false;
258 }
259
263 public function finalize()
264 {
265 if (!$this->checkToFinalizeStatus())
266 {
267 return false;
268 }
269
270 $tableName = GroupThreadTable::getTableName();
271 $sqlHelper = Application::getConnection()->getSqlHelper();
272 $query = 'DELETE FROM ' . $sqlHelper->quote($tableName) . ' WHERE GROUP_STATE_ID=' . intval($this->groupStateId);
273 try
274 {
275 Application::getConnection()->query($query);
276 } catch (SqlQueryException $e)
277 {
278 return false;
279 }
280
281 return true;
282 }
283
284 private function checkToFinalizeStatus()
285 {
286 if ($this->threadId < static::lastThreadId())
287 {
288 return true;
289 }
290
291 return !static::hasUnprocessedThreads();
292 }
293
294
295 public function getOffset(): ?int
296 {
297 return intval($this->offset);
298 }
299
300 public function setPerPage(int $perPage)
301 {
302 $this->perPage = $perPage;
303 }
304
309 public function isProcessLimited(): bool
310 {
311 return false;
312 }
313
314}
static unlock(string $key, int $id)
Определения locker.php:34
$status
Определения session.php:10
Определения arrayresult.php:2
$counter
Определения options.php:5