1C-Bitrix
25.700.0
Загрузка...
Поиск...
Не найдено
abstractthreadstrategy.php
См. документацию.
1
<?php
2
3
namespace
Bitrix\Sender\Posting\ThreadStrategy
;
4
5
use
Bitrix\Main\Application
;
6
use
Bitrix\Main\DB
;
7
use
Bitrix\Main\DB\SqlExpression
;
8
use
Bitrix\Main\DB\SqlQueryException
;
9
use
Bitrix\Main\ORM\Query\Result
;
10
use
Bitrix\Main\Type\DateTime
;
11
use
Bitrix\Sender\Internals\Model\PostingThreadTable
;
12
use
Bitrix\Sender\Internals\SqlBatch
;
13
use
Bitrix\Sender\PostingRecipientTable
;
14
15
abstract
class
AbstractThreadStrategy
implements
IThreadStrategy
16
{
20
protected
$threadId
;
21
22
protected
$postingId
;
23
24
protected
$select
;
25
26
protected
$filter
;
27
protected
$runtime
;
28
29
public
const
THREAD_UNAVAILABLE
= -1;
30
public
const
THREAD_LOCKED
= -2;
31
public
const
THREAD_NEEDED
= 1;
32
38
public
function
fillThreads
(): void
39
{
40
$insertData = [];
41
42
\CTimeZone::Disable();
43
for
($thread = 0; $thread < static::THREADS_COUNT; $thread++)
44
{
45
$insertData[] = [
46
'THREAD_ID'
=> $thread,
47
'POSTING_ID'
=>
$this->postingId
,
48
'THREAD_TYPE'
=> static::THREADS_COUNT,
49
'EXPIRE_AT'
=>
new
DateTime
(),
50
];
51
}
52
53
SqlBatch::insert(
PostingThreadTable::getTableName
(), $insertData);
54
\CTimeZone::Enable();
55
}
56
65
public
function
getRecipients
(
int
$limit):
Result
66
{
67
static::setRuntime();
68
static::setFilter();
69
static::setSelect();
70
71
// select all recipients of posting, only not processed
72
$recipients =
PostingRecipientTable::getList
(
73
[
74
'select'
=> $this->
select
,
75
'filter'
=> $this->filter,
76
'runtime'
=> $this->runtime,
77
'order'
=> [
'STATUS'
=>
'DESC'
],
78
'limit'
=> $limit
79
]
80
);
81
82
$recipients->addFetchDataModifier(
83
function
($row)
84
{
85
$row[
'FIELDS'
] = is_array($row[
'FIELDS'
]) ? $row[
'FIELDS'
] : [];
86
87
return
$row;
88
}
89
);
90
91
return
$recipients;
92
}
93
94
abstract
protected
function
setRuntime
(): void;
95
96
protected
function
setFilter
() : void
97
{
98
$this->filter = [
'=IS_UNSUB'
=>
'N'
];
99
}
100
protected
function
setSelect
(): void
101
{
102
$this->
select
= [
103
'*'
,
104
'NAME'
=>
'CONTACT.NAME'
,
105
'CONTACT_CODE'
=>
'CONTACT.CODE'
,
106
'CONTACT_TYPE_ID'
=>
'CONTACT.TYPE_ID'
,
107
'CONTACT_IS_SEND_SUCCESS'
=>
'CONTACT.IS_SEND_SUCCESS'
,
108
'CONTACT_BLACKLISTED'
=>
'CONTACT.BLACKLISTED'
,
109
'CONTACT_UNSUBSCRIBED'
=>
'CONTACT.IS_UNSUB'
,
110
'CONTACT_CONSENT_STATUS'
=>
'CONTACT.CONSENT_STATUS'
,
111
'CONTACT_MAILING_UNSUBSCRIBED'
=>
'MAILING_SUB.IS_UNSUB'
,
112
'CONTACT_CONSENT_REQUEST'
=>
'CONTACT.CONSENT_REQUEST'
,
113
'CAMPAIGN_ID'
=>
'POSTING.MAILING_ID'
,
114
];
115
}
116
124
public
function
lockThread
(): void
125
{
126
if
(!static::checkLock())
127
{
128
return
;
129
}
130
131
\CTimeZone::Disable();
132
$thread = PostingThreadTable::getList(
133
[
134
"select"
=> [
"THREAD_ID"
],
135
"filter"
=> [
136
'=POSTING_ID'
=> $this->postingId,
137
[
138
'LOGIC'
=>
'OR'
,
139
[
140
'=STATUS'
=>
PostingThreadTable::STATUS_NEW
,
141
],
142
[
143
'=STATUS'
=>
PostingThreadTable::STATUS_IN_PROGRESS
,
144
'<EXPIRE_AT'
=>
new
DateTime
()
145
]
146
]
147
],
148
"limit"
=> 1
149
]
150
)->fetchAll();
151
\CTimeZone::enable();
152
153
if
(!isset($thread[0]) && !isset($thread[0][
"THREAD_ID"
]))
154
{
155
return
;
156
}
157
$this->threadId = $thread[0][
"THREAD_ID"
];
158
$this->
updateStatus
(
PostingThreadTable::STATUS_IN_PROGRESS
);
159
$this->
unlock
();
160
}
161
169
public
function
checkThreads
(): ?int
170
{
171
$thread = PostingThreadTable::getList(
172
[
173
"select"
=> [
"THREAD_ID"
],
174
"filter"
=> [
175
'=POSTING_ID'
=> $this->postingId,
176
],
177
"limit"
=> 1
178
]
179
)->fetchAll();
180
181
if
(isset($thread[0]) && isset($thread[0][
"THREAD_ID"
]))
182
{
183
return
self::THREAD_UNAVAILABLE;
184
}
185
186
return
self::THREAD_NEEDED;
187
}
188
194
protected
function
lock
()
195
{
196
$connection
= Application::getInstance()->getConnection();
197
198
return
$connection
->lock($this->getLockName());
199
}
200
206
public
function
updateStatus
(
string
$status
): bool
207
{
208
if
(
$status
===
PostingThreadTable::STATUS_DONE
&& !$this->checkToFinalizeStatus())
209
{
210
$status
=
PostingThreadTable::STATUS_NEW
;
211
}
212
213
try
214
{
215
\CTimeZone::Disable();
216
217
$tableName =
PostingThreadTable::getTableName
();
218
$expireAt = (new \DateTime())->modify(
"+10 minutes"
)->format(
'Y-m-d H:i:s'
);
219
$updateQuery =
'UPDATE '
.$tableName.
'
220
SET
221
STATUS = \''
.
$status
.
'\'
,
222
EXPIRE_AT = \
''
.$expireAt.
'\'
223
WHERE
224
THREAD_ID =
'.$this->threadId.'
225
AND POSTING_ID =
'.$this->postingId;
226
Application::getConnection()->query($updateQuery);
227
}
228
catch (\Exception $e)
229
{
230
return false;
231
}
232
finally
233
{
234
\CTimeZone::Enable();
235
}
236
237
return true;
238
}
239
245
protected function unlock()
246
{
247
$connection = Application::getInstance()->getConnection();
248
249
return $connection->unlock($this->getLockName());
250
}
251
257
private function getLockName(): string
258
{
259
return "posting_thread_$this->postingId";
260
}
261
266
public function hasUnprocessedThreads(): bool
267
{
268
try
269
{
270
$filter = [
271
'@STATUS
' => [PostingThreadTable::STATUS_NEW, PostingThreadTable::STATUS_IN_PROGRESS],
272
'
=POSTING_ID
' => $this->postingId,
273
];
274
275
if ($this->threadId !== null)
276
{
277
$filter['
!=THREAD_ID
'] = $this->threadId;
278
}
279
280
$threads = PostingThreadTable::getList(
281
[
282
"select" => ["THREAD_ID"],
283
"filter" => $filter,
284
]
285
)->fetchAll();
286
}
287
catch (\Exception $e)
288
{
289
}
290
291
return !empty($threads);
292
}
293
298
public function getThreadId(): ?int
299
{
300
return $this->threadId;
301
}
302
307
public function lastThreadId(): int
308
{
309
return static::THREADS_COUNT - 1;
310
}
311
317
public function setPostingId(int $postingId): IThreadStrategy
318
{
319
$this->postingId = $postingId;
320
321
return $this;
322
}
323
328
protected function checkLock()
329
{
330
for($i = 0; $i <= static::lastThreadId(); $i++)
331
{
332
if ($this->lock())
333
{
334
return true;
335
}
336
sleep(rand(1,7));
337
}
338
return false;
339
}
340
344
public function finalize()
345
{
346
if(!$this->checkToFinalizeStatus())
347
{
348
return false;
349
}
350
351
$tableName = PostingThreadTable::getTableName();
352
$sqlHelper = Application::getConnection()->getSqlHelper();
353
$query = 'DELETE FROM
' . $sqlHelper->quote($tableName) . '
WHERE POSTING_ID=
' . intval($this->postingId);
354
try
355
{
356
Application::getConnection()->query($query);
357
}
358
catch (SqlQueryException $e)
359
{
360
return false;
361
}
362
363
return true;
364
}
365
366
private function checkToFinalizeStatus()
367
{
368
if($this->threadId < static::lastThreadId())
369
{
370
return true;
371
}
372
373
return !static::hasUnprocessedThreads();
374
}
375
380
public function isProcessLimited(): bool
381
{
382
return false;
383
}
384
385
}
select
return select
Определения
access_edit.php:440
$connection
$connection
Определения
actionsdefinitions.php:38
Bitrix\Main\Application
Определения
application.php:30
Bitrix\Main\DB\Result
Определения
result.php:20
Bitrix\Main\DB\SqlExpression
Определения
sqlexpression.php:21
Bitrix\Main\DB\SqlQueryException
Определения
sqlqueryexception.php:9
Bitrix\Main\ORM\Data\DataManager\getList
static getList(array $parameters=array())
Определения
datamanager.php:431
Bitrix\Main\ORM\Data\Result
Определения
result.php:16
Bitrix\Main\Type\DateTime
Определения
datetime.php:9
Bitrix\Sender\Internals\Model\PostingThreadTable
Определения
postingthread.php:24
Bitrix\Sender\Internals\Model\PostingThreadTable\STATUS_DONE
const STATUS_DONE
Определения
postingthread.php:27
Bitrix\Sender\Internals\Model\PostingThreadTable\STATUS_IN_PROGRESS
const STATUS_IN_PROGRESS
Определения
postingthread.php:26
Bitrix\Sender\Internals\Model\PostingThreadTable\STATUS_NEW
const STATUS_NEW
Определения
postingthread.php:25
Bitrix\Sender\Internals\Model\PostingThreadTable\getTableName
static getTableName()
Определения
postingthread.php:32
Bitrix\Sender\Internals\SqlBatch
Определения
sqlbatch.php:22
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy
Определения
abstractthreadstrategy.php:16
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\setSelect
setSelect()
Определения
abstractthreadstrategy.php:100
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\lock
lock()
Определения
abstractthreadstrategy.php:194
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\getRecipients
getRecipients(int $limit)
Определения
abstractthreadstrategy.php:65
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\fillThreads
fillThreads()
Определения
abstractthreadstrategy.php:38
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\$postingId
$postingId
Определения
abstractthreadstrategy.php:22
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\$threadId
$threadId
Определения
abstractthreadstrategy.php:20
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\updateStatus
updateStatus(string $status)
Определения
abstractthreadstrategy.php:206
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\checkThreads
checkThreads()
Определения
abstractthreadstrategy.php:169
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\lockThread
lockThread()
Определения
abstractthreadstrategy.php:124
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\$select
$select
Определения
abstractthreadstrategy.php:24
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\$runtime
$runtime
Определения
abstractthreadstrategy.php:27
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\setFilter
setFilter()
Определения
abstractthreadstrategy.php:96
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\$filter
$filter
Определения
abstractthreadstrategy.php:26
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\setRuntime
setRuntime()
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\THREAD_LOCKED
const THREAD_LOCKED
Определения
abstractthreadstrategy.php:30
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\THREAD_UNAVAILABLE
const THREAD_UNAVAILABLE
Определения
abstractthreadstrategy.php:29
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\THREAD_NEEDED
const THREAD_NEEDED
Определения
abstractthreadstrategy.php:31
Bitrix\Sender\Posting\ThreadStrategy\AbstractThreadStrategy\unlock
unlock()
Определения
abstractthreadstrategy.php:245
Bitrix\Sender\PostingRecipientTable
Определения
posting.php:669
Bitrix\Sender\Posting\ThreadStrategy\IThreadStrategy
Определения
ithreadstrategy.php:8
$status
$status
Определения
session.php:10
Bitrix\Main\DB
Определения
arrayresult.php:2
Bitrix\Sender\Posting\ThreadStrategy
Определения
abstractthreadstrategy.php:3
bitrix
modules
sender
lib
posting
threadstrategy
abstractthreadstrategy.php
Создано системой
1.14.0