1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
abstractthreadstrategy.php
См. документацию.
1<?php
2
4
14
15abstract class AbstractThreadStrategy implements IThreadStrategy
16{
/** Returned by checkThreads() when a thread row already exists for the posting. */
public const THREAD_UNAVAILABLE = -1;

/** Not referenced inside this class. NOTE(review): presumably a result code for callers - confirm. */
public const THREAD_LOCKED = -2;

/** Returned by checkThreads() when no thread row exists for the posting yet. */
public const THREAD_NEEDED = 1;

/**
 * @var int|null THREAD_ID of the row picked by lockThread(); null until a thread has been picked.
 */
protected $threadId;

/**
 * @var int Identifier of the posting, assigned through setPostingId().
 */
protected $postingId;

/**
 * @var array 'select' part of the recipients query, filled by setSelect().
 */
protected $select;

/**
 * @var array 'filter' part of the recipients query, filled by setFilter().
 */
protected $filter;

/**
 * @var mixed 'runtime' part of the recipients query, filled by setRuntime().
 */
protected $runtime;
32
/**
 * Inserts the thread rows of the current posting.
 *
 * One row is prepared for every thread id from 0 to static::THREADS_COUNT - 1 and all
 * rows are written with a single SqlBatch::insert() call. THREADS_COUNT is not declared
 * in this class; it is expected to come from the concrete strategy or the interface.
 *
 * Time zone handling is switched off while the rows are built and written, and is
 * switched back on even when the insert throws, so a failed insert cannot leave it
 * disabled for the rest of the request.
 *
 * @return void
 */
public function fillThreads(): void
{
	$insertData = [];

	\CTimeZone::Disable();
	try
	{
		for ($thread = 0; $thread < static::THREADS_COUNT; $thread++)
		{
			$insertData[] = [
				'THREAD_ID' => $thread,
				'POSTING_ID' => $this->postingId,
				// The thread count itself is stored as the thread type.
				'THREAD_TYPE' => static::THREADS_COUNT,
				'EXPIRE_AT' => new DateTime(),
			];
		}

		SqlBatch::insert(PostingThreadTable::getTableName(), $insertData);
	}
	finally
	{
		// Always pair Disable() with Enable(), including on exceptions.
		\CTimeZone::Enable();
	}
}
56
/**
 * Builds the recipients query for the posting and returns its result.
 *
 * The runtime, filter and select parts are prepared first through setRuntime(),
 * setFilter() and setSelect(); the query is ordered by STATUS descending and capped
 * at $limit rows. A fetch modifier guarantees that the FIELDS value of every row is an array.
 *
 * NOTE(review): the embedded listing numbers jump from 71 to 73, so the statement that
 * opens the call and assigns $recipients is missing from this copy - restore it from
 * the original file before relying on this method.
 *
 * @param int $limit Maximum number of rows to select.
 * @return Result Query result with the FIELDS-normalising fetch modifier attached.
 */
65 public function getRecipients(int $limit): Result
66 {
// Prepare the three query parts on this instance before they are read below.
67 static::setRuntime();
68 static::setFilter();
69 static::setSelect();
70
71 // select all recipients of posting, only not processed
73 [
74 'select' => $this->select,
75 'filter' => $this->filter,
76 'runtime' => $this->runtime,
77 'order' => ['STATUS' => 'DESC'],
78 'limit' => $limit
79 ]
80 );
81
// Replace a non-array FIELDS value with an empty array so consumers can always iterate it.
82 $recipients->addFetchDataModifier(
83 function($row)
84 {
85 $row['FIELDS'] = is_array($row['FIELDS']) ? $row['FIELDS'] : [];
86
87 return $row;
88 }
89 );
90
91 return $recipients;
92 }
93
/**
 * Fills $this->runtime, the 'runtime' part of the recipients query.
 *
 * Called by getRecipients() before the query is built; each concrete strategy
 * supplies its own implementation.
 *
 * @return void
 */
abstract protected function setRuntime(): void;
95
/**
 * Fills $this->filter, the 'filter' part of the recipients query.
 *
 * The base filter keeps only rows whose IS_UNSUB value equals 'N'.
 *
 * @return void
 */
protected function setFilter(): void
{
	$conditions = [];
	$conditions['=IS_UNSUB'] = 'N';

	$this->filter = $conditions;
}
/**
 * Fills $this->select, the 'select' part of the recipients query.
 *
 * Selects every own column ('*') plus aliased columns reached through the
 * CONTACT, MAILING_SUB and POSTING references.
 *
 * @return void
 */
protected function setSelect(): void
{
	// alias => referenced column, in the order the aliases are added to the select list
	$aliases = [
		'NAME' => 'CONTACT.NAME',
		'CONTACT_CODE' => 'CONTACT.CODE',
		'CONTACT_TYPE_ID' => 'CONTACT.TYPE_ID',
		'CONTACT_IS_SEND_SUCCESS' => 'CONTACT.IS_SEND_SUCCESS',
		'CONTACT_BLACKLISTED' => 'CONTACT.BLACKLISTED',
		'CONTACT_UNSUBSCRIBED' => 'CONTACT.IS_UNSUB',
		'CONTACT_CONSENT_STATUS' => 'CONTACT.CONSENT_STATUS',
		'CONTACT_MAILING_UNSUBSCRIBED' => 'MAILING_SUB.IS_UNSUB',
		'CONTACT_CONSENT_REQUEST' => 'CONTACT.CONSENT_REQUEST',
		'CAMPAIGN_ID' => 'POSTING.MAILING_ID',
	];

	$fields = ['*'];
	foreach ($aliases as $alias => $column)
	{
		$fields[$alias] = $column;
	}

	$this->select = $fields;
}
116
/**
 * Picks one thread row of the posting for this instance.
 *
 * After checkLock() has obtained the posting lock, one row of the thread table is
 * selected for the posting using an OR group of two conditions, its THREAD_ID is stored
 * in $this->threadId and the lock is released through unlock(). Nothing is stored when
 * the lock cannot be obtained or when no row matches.
 *
 * NOTE(review): the embedded listing numbers skip 140, 143 and 158, so parts of the
 * filter and one statement before unlock() are missing from this copy - restore them
 * from the original file.
 *
 * @return void
 */
124 public function lockThread(): void
125 {
// checkLock() retries lock(); give up silently when the lock was never obtained.
126 if(!static::checkLock())
127 {
128 return;
129 }
130
// Time zone handling is switched off around the query and the DateTime it uses.
131 \CTimeZone::Disable();
132 $thread = PostingThreadTable::getList(
133 [
134 "select" => ["THREAD_ID"],
135 "filter" => [
136 '=POSTING_ID' => $this->postingId,
137 [
138 'LOGIC' => 'OR',
139 [
// NOTE(review): first OR branch is empty here because listing line 140 is missing.
141 ],
142 [
// NOTE(review): listing line 143 is missing; only the expiry comparison is visible.
144 '<EXPIRE_AT' => new DateTime()
145 ]
146 ]
147 ],
148 "limit" => 1
149 ]
150 )->fetchAll();
151 \CTimeZone::enable();
152
// NOTE(review): this early return leaves the method without calling unlock(), although
// checkLock() succeeded above - looks like the lock stays held on this path; confirm.
// NOTE(review): with '&&' the second isset() can only matter when the first operand is
// already true; '||' was presumably intended - confirm.
153 if (!isset($thread[0]) && !isset($thread[0]["THREAD_ID"]))
154 {
155 return;
156 }
157 $this->threadId = $thread[0]["THREAD_ID"];
// NOTE(review): listing line 158 is missing between the assignment and unlock().
159 $this->unlock();
160 }
161
/**
 * Reports whether thread rows already exist for the posting.
 *
 * @return int|null self::THREAD_UNAVAILABLE when at least one thread row with a
 *                  THREAD_ID exists for the posting, self::THREAD_NEEDED otherwise.
 */
public function checkThreads(): ?int
{
	$parameters = [
		"select" => ["THREAD_ID"],
		"filter" => [
			'=POSTING_ID' => $this->postingId,
		],
		"limit" => 1,
	];
	$rows = PostingThreadTable::getList($parameters)->fetchAll();

	// Both the first row and its THREAD_ID must be set.
	$hasThread = isset($rows[0], $rows[0]["THREAD_ID"]);

	return $hasThread ? self::THREAD_UNAVAILABLE : self::THREAD_NEEDED;
}
188
/**
 * Requests the posting lock, named by getLockName(), on the application connection.
 *
 * @return mixed Whatever the connection's lock() call returns; checkLock() treats a
 *               truthy value as success.
 */
protected function lock()
{
	return Application::getInstance()
		->getConnection()
		->lock($this->getLockName());
}
200
/**
 * Writes a new status and a fresh expiry time to the current thread row.
 *
 * The row is addressed by $this->threadId and $this->postingId; EXPIRE_AT is set to
 * ten minutes from now. Time zone handling is switched off for the update and is
 * switched back on in the finally block.
 *
 * NOTE(review): the embedded listing numbers skip 210 and 217, so the body of the first
 * "if" and the assignment of $tableName are missing from this copy - restore them from
 * the original file.
 *
 * @param string $status Status value written to the STATUS column.
 * @return bool True when the query ran, false when any \Exception was thrown.
 */
206 public function updateStatus(string $status): bool
207 {
// Special handling when a thread reports STATUS_DONE but finalisation is not allowed yet.
208 if($status === PostingThreadTable::STATUS_DONE && !$this->checkToFinalizeStatus())
209 {
// NOTE(review): body missing in this copy (listing line 210).
211 }
212
213 try
214 {
215 \CTimeZone::Disable();
216
// NOTE(review): $tableName is never assigned in the visible code (listing line 217 is missing).
// NOTE(review): $status is concatenated into the SQL text without escaping; safe only
// if callers always pass the status constants - confirm, or escape it.
// The leading backslash selects PHP's native \DateTime here.
218 $expireAt = (new \DateTime())->modify("+10 minutes")->format('Y-m-d H:i:s');
219 $updateQuery = 'UPDATE '.$tableName.'
220 SET
221 STATUS = \''.$status.'\',
222 EXPIRE_AT = \''.$expireAt.'\'
223 WHERE
224 THREAD_ID = '.$this->threadId.'
225 AND POSTING_ID = '.$this->postingId;
226 Application::getConnection()->query($updateQuery);
227 }
228 catch (\Exception $e)
229 {
// Any failure is reported to the caller as false; the exception itself is dropped.
230 return false;
231 }
232 finally
233 {
// Runs on success and on failure, so the Disable() above is always paired.
234 \CTimeZone::Enable();
235 }
236
237 return true;
238 }
239
/**
 * Releases the posting lock, named by getLockName(), on the application connection.
 *
 * @return mixed Whatever the connection's unlock() call returns.
 */
protected function unlock()
{
	return Application::getInstance()
		->getConnection()
		->unlock($this->getLockName());
}
251
/**
 * Builds the lock name used by lock() and unlock() for the current posting.
 *
 * @return string 'posting_thread_' followed by the posting id.
 */
private function getLockName(): string
{
	return 'posting_thread_' . $this->postingId;
}
261
/**
 * Tells whether the posting still has unfinished threads besides the current one.
 *
 * A thread row counts as unfinished while its STATUS is STATUS_NEW or
 * STATUS_IN_PROGRESS. When a thread has been picked for this instance, that
 * thread is excluded from the check.
 *
 * Only the existence of a row matters, so the query is limited to one row.
 *
 * @return bool True when at least one such row exists; false when none exists or
 *              when the lookup throws.
 */
public function hasUnprocessedThreads(): bool
{
	// Explicit default: a failed lookup is reported the same way as an empty result.
	$threads = [];

	try
	{
		$filter = [
			'@STATUS' => [PostingThreadTable::STATUS_NEW, PostingThreadTable::STATUS_IN_PROGRESS],
			'=POSTING_ID' => $this->postingId,
		];

		if ($this->threadId !== null)
		{
			$filter['!=THREAD_ID'] = $this->threadId;
		}

		$threads = PostingThreadTable::getList(
			[
				"select" => ["THREAD_ID"],
				"filter" => $filter,
				"limit" => 1,
			]
		)->fetchAll();
	}
	catch (\Exception $e)
	{
		// NOTE(review): the error is dropped and the method answers "nothing left";
		// callers such as checkToFinalizeStatus() then allow finalisation - confirm
		// that this fallback is intended.
	}

	return !empty($threads);
}
293
/**
 * Returns the id of the thread picked for this instance.
 *
 * @return int|null The stored thread id, or null when no thread has been picked.
 */
public function getThreadId(): ?int
{
	$currentThreadId = $this->threadId;

	return $currentThreadId;
}
302
/**
 * Returns the highest thread id of the strategy.
 *
 * Thread ids start at 0, so the last one is static::THREADS_COUNT - 1.
 *
 * @return int
 */
public function lastThreadId(): int
{
	$threadsCount = static::THREADS_COUNT;

	return $threadsCount - 1;
}
311
/**
 * Stores the posting this strategy works on.
 *
 * @param int $postingId Posting identifier.
 * @return IThreadStrategy The same instance, for call chaining.
 */
public function setPostingId(int $postingId): IThreadStrategy
{
	$this->postingId = $postingId;

	return $this;
}
323
/**
 * Tries to obtain the posting lock, retrying with a random pause between attempts.
 *
 * The number of attempts equals lastThreadId() + 1. A pause of 1 to 7 seconds is made
 * between two attempts only; there is no pause after the final failed attempt, because
 * no retry follows it and waiting would only delay the caller.
 *
 * @return bool True as soon as lock() succeeds, false when every attempt failed.
 */
protected function checkLock()
{
	$attempts = $this->lastThreadId() + 1;

	for ($attempt = 1; $attempt <= $attempts; $attempt++)
	{
		if ($this->lock())
		{
			return true;
		}

		if ($attempt < $attempts)
		{
			// Random back-off so that competing processes do not retry in step.
			sleep(rand(1, 7));
		}
	}

	return false;
}
340
/**
 * Deletes every thread row of the posting once finalisation is allowed.
 *
 * @return bool False when checkToFinalizeStatus() does not allow finalisation or when
 *              the DELETE query throws SqlQueryException; true after a successful delete.
 */
public function finalize()
{
	if (!$this->checkToFinalizeStatus())
	{
		return false;
	}

	$connection = Application::getConnection();
	$quotedTable = $connection->getSqlHelper()->quote(PostingThreadTable::getTableName());
	// The posting id is cast to int before it is placed into the SQL text.
	$deleteSql = 'DELETE FROM ' . $quotedTable . ' WHERE POSTING_ID=' . (int)$this->postingId;

	try
	{
		$connection->query($deleteSql);

		return true;
	}
	catch (SqlQueryException $e)
	{
		return false;
	}
}
365
/**
 * Decides whether the current thread may finalise.
 *
 * A thread whose id is below lastThreadId() is always allowed; otherwise
 * finalisation is allowed only when hasUnprocessedThreads() reports none.
 *
 * @return bool
 */
private function checkToFinalizeStatus()
{
	// Short-circuit: the thread table is queried only when the id comparison fails.
	return $this->threadId < static::lastThreadId()
		|| !static::hasUnprocessedThreads();
}
375
/**
 * Tells whether processing is limited for this strategy.
 *
 * The base implementation always answers false.
 *
 * @return bool
 */
public function isProcessLimited(): bool
{
	$limited = false;

	return $limited;
}
384
385}
return select
Определения access_edit.php:440
$connection
Определения actionsdefinitions.php:38
Определения result.php:20
static getList(array $parameters=array())
Определения datamanager.php:431
$status
Определения session.php:10
Определения arrayresult.php:2