1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
queue.php
См. документацию.
1<?php
2namespace Bitrix\MessageService;
3
4use Bitrix\Main\Application;
5use Bitrix\Main\Config;
6use Bitrix\Main\Data\Cache;
7use Bitrix\Main\Error;
8use Bitrix\Main\ORM\Query\Query;
9use Bitrix\Main\Type\DateTime;
10use Bitrix\MessageService\Internal\Entity\MessageTable;
11use Bitrix\MessageService\Sender\Result\SendMessage;
12use Bitrix\MessageService\Sender\SmsManager;
13use Bitrix\Main\Localization\Loc;
14use Bitrix\MessageService\Internal\Entity\Message\SuccessExec;
15use Bitrix\MessageService\Queue\Event\AfterProcessQueueEvent;
16use Bitrix\MessageService\Queue\Event\AfterSendMessageFromQueueEvent;
17use Bitrix\MessageService\Queue\Event\BeforeProcessQueueEvent;
18use Bitrix\MessageService\Queue\Event\BeforeSendMessageFromQueueEvent;
19
20class Queue
21{
26 public const EVENT_SEND_RESULT = 'messageSendResult';
27
28 private const OPTION_QUEUE_STOP_MODULE = 'messageservice';
29 private const OPTION_QUEUE_STOP_NAME = 'queue_stopped';
30
31 private const CACHE_HAS_MESSAGES_TIME = 2592000;
32 public const CACHE_HAS_MESSAGES_ID = 'has_messages_cache';
33 public const CACHE_HAS_MESSAGES_DIR = '/messageservice/';
34
35 public static function hasMessages(): bool
36 {
37 $nextExec = new DateTime();
38
39 $cache = Cache::createInstance();
40 if ($cache->initCache(self::CACHE_HAS_MESSAGES_TIME, self::CACHE_HAS_MESSAGES_ID, self::CACHE_HAS_MESSAGES_DIR))
41 {
42 $nextExec = $cache->getVars();
43 }
44 elseif ($cache->startDataCache())
45 {
46 $result = MessageTable::getList([
47 'select' => ['ID', 'NEXT_EXEC'],
48 'filter' => [
49 '=SUCCESS_EXEC' => SuccessExec::NO,
50 ],
51 'limit' => 1,
52 'order' => ['ID' => 'DESC'],
53 ])->fetch();
54
55 if ($result)
56 {
57 if ($result['NEXT_EXEC'] instanceof DateTime)
58 {
59 $nextExec = $result['NEXT_EXEC'];
60 }
61 else
62 {
63 $nextExec = new DateTime();
64 }
65 }
66 else
67 {
68 $nextExec = (new DateTime())->add('+' . self::CACHE_HAS_MESSAGES_TIME . ' seconds');
69 }
70
71 $cache->endDataCache($nextExec);
72 }
73
74 return $nextExec <= new DateTime();
75 }
76
80 public static function run()
81 {
82 if (
83 defined('DisableMessageServiceCheck') && DisableMessageServiceCheck === true
84 || (
85 !defined('DisableMessageServiceCheck')
86 && defined("DisableEventsCheck")
87 && DisableEventsCheck === true
88 )
89 )
90 {
91 return null;
92 }
93
94 if (static::isStopped() || !static::hasMessages())
95 {
96 return "";
97 }
98
99 Application::getInstance()->addBackgroundJob([static::class, "sendMessages"]);
100
101 return "";
102 }
103
107 public static function sendMessages(): ?string
108 {
109 if (static::isStopped())
110 {
111 return '';
112 }
113
114 $lockTag = 'b_messageservice_message';
115 if (!Application::getConnection()->lock($lockTag))
116 {
117 return '';
118 }
119
121 $event->send();
122 if (!$event->canProcessQueue())
123 {
124 Application::getConnection()->unlock($lockTag);
125
126 return '';
127 }
128
129 $limit = (int)Config\Option::get('messageservice', 'queue_limit');
130 if ($limit < 1)
131 {
132 $limit = 5;
133 }
134
135 $query =
136 MessageTable::query()
137 ->addSelect('ID')
138 ->addSelect('TYPE')
139 ->addSelect('SENDER_ID')
140 ->addSelect('AUTHOR_ID')
141 ->addSelect('MESSAGE_FROM')
142 ->addSelect('MESSAGE_TO')
143 ->addSelect('MESSAGE_HEADERS')
144 ->addSelect('MESSAGE_BODY')
145 ->addSelect('EXTERNAL_ID')
146 ->where(Query::filter()
147 ->logic('or')
148 ->where(Query::filter()
149 ->logic('and')
150 ->where('SUCCESS_EXEC', SuccessExec::NO)
151 ->where(Query::filter()
152 ->logic('or')
153 ->where('NEXT_EXEC', '<', new DateTime())
154 ->whereNull('NEXT_EXEC')
155 )
156 )
157 ->where(Query::filter()
158 ->logic('and')
159 ->where('SUCCESS_EXEC', SuccessExec::PROCESSED)
160 ->where('NEXT_EXEC', '<', (new DateTime())->add('-2 MINUTE'))
161 )
162 )
163 ->addOrder('ID')
164 ->setLimit($limit)
165 ;
166
167 if (defined('BX_CLUSTER_GROUP'))
168 {
169 $query->where('CLUSTER_GROUP', \BX_CLUSTER_GROUP);
170 }
171 $messageFieldsList = $query->fetchAll();
172
173 if (empty($messageFieldsList))
174 {
175 Application::getConnection()->unlock($lockTag);
176
177 return null;
178 }
179
180 $idList = array_column($messageFieldsList, 'ID');
181 MessageTable::updateMulti(
182 $idList,
183 [
184 'SUCCESS_EXEC' => SuccessExec::PROCESSED,
185 'NEXT_EXEC' => (new DateTime())->add('+2 MINUTE'),
186 ],
187 true
188 );
189
190 $hasDailyLimits = Sender\Limitation::hasDailyLimits();
191 if ($hasDailyLimits)
192 {
194 }
195 else
196 {
197 $counts = [];
198 }
199
200 $nextDay = static::getNextExecTime();
201 foreach ($messageFieldsList as $messageFields)
202 {
204
205 if ($hasDailyLimits)
206 {
207 $sender = $message->getSender();
208 if ($sender)
209 {
210 $limit = Sender\Limitation::getDailyLimit($sender->getId(), $messageFields['MESSAGE_FROM']);
211 if ($limit > 0)
212 {
213 $serviceId = $sender->getId() . ':' . $messageFields['MESSAGE_FROM'];
214
215 $counts[$serviceId] ??= 0;
216 if ($counts[$serviceId] >= $limit)
217 {
218 $message->update([
219 'STATUS_ID' => MessageStatus::DEFERRED,
220 'NEXT_EXEC' => $nextDay,
221 ]);
222
223 continue;
224 }
225
226 ++$counts[$serviceId];
227 }
228 }
229 }
230
231 try
232 {
233 $result = static::sendMessage($messageFields);
234 $message->updateWithSendResult($result, $nextDay);
235 }
236 catch (\Throwable $e)
237 {
238 Application::getInstance()->getExceptionHandler()->writeToLog($e);
239
240 $message->update([
241 'STATUS_ID' => MessageStatus::EXCEPTION,
242 'SUCCESS_EXEC' => SuccessExec::ERROR,
243 'DATE_EXEC' => new DateTime(),
244 'EXEC_ERROR' => $e->getMessage(),
245 ]);
246
247 break;
248 }
249 }
250
252 $event->send();
253
254 Application::getConnection()->unlock($lockTag);
255
256 return null;
257 }
258
263 private static function sendMessage(array $messageFields)
264 {
266 $event->send();
267
268 $sendResult = $event->processResults() ?? new SendMessage;
269 if (!$sendResult->isSuccess())
270 {
271 return $sendResult;
272 }
273
274 $type = $messageFields['TYPE'];
275 if ($type === MessageType::SMS)
276 {
277 $sender = SmsManager::getSenderById($messageFields['SENDER_ID']);
278 if (!$sender)
279 {
280 $sendResult->addError(new Error(Loc::getMessage("MESSAGESERVICE_QUEUE_SENDER_NOT_FOUND")));
281 }
282 else
283 {
284 $sender->setSocketTimeout(6);
285 $sender->setStreamTimeout(18);
286 $sendResult = $sender->sendMessage($messageFields);
287 }
288 }
289 else
290 {
291 $sendResult->addError(new Error(Loc::getMessage("MESSAGESERVICE_QUEUE_MESSAGE_TYPE_ERROR")));
292 }
293
294 $event = new AfterSendMessageFromQueueEvent($messageFields, $sendResult);
295 $event->send();
296 $event->sendAlias(static::EVENT_SEND_RESULT);
297
298 return $sendResult;
299 }
300
306 private static function getNextExecTime(): DateTime
307 {
308 $nextDay = DateTime::createFromTimestamp(time() + 86400);
309 $retryTime = Sender\Limitation::getRetryTime();
310 if (!$retryTime['auto'])
311 {
312 if ($nextDay->getTimeZone()->getName() !== $retryTime['tz'])
313 {
314 try //if TZ is incorrect
315 {
316 $nextDay->setTimeZone(new \DateTimeZone($retryTime['tz']));
317 }
318 catch (\Exception $e) {}
319 }
320 $nextDay->setTime($retryTime['h'], $retryTime['i'], 0);
321 }
322 return $nextDay;
323 }
324
325 public static function stop(): void
326 {
327 Config\Option::set(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME, 'Y');
328 }
329
330 public static function resume(): void
331 {
332 Config\Option::set(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME, 'N');
333 }
334
335 public static function isStopped(): bool
336 {
337 return Config\Option::get(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME) === 'Y';
338 }
339
343 public static function cleanUpAgent(): string
344 {
345 $period = abs(intval(Config\Option::get("messageservice", "clean_up_period", 14)));
346 $periodInSeconds = $period * 24 * 3600;
347
348 if ($periodInSeconds > 0)
349 {
351 $datetime = $connection->getSqlHelper()->addSecondsToDateTime('-' . $periodInSeconds);
352 $connection->queryExecute("DELETE FROM b_messageservice_message WHERE DATE_EXEC <= {$datetime}");
353 }
354
355 return __METHOD__.'();';
356 }
357}
$connection
Определения actionsdefinitions.php:38
$type
Определения options.php:106
$messageFields
Определения callback_ednaru.php:22
static getConnection($name="")
Определения application.php:638
static get($moduleId, $name, $default="", $siteId=false)
Определения option.php:30
static set($moduleId, $name, $value="", $siteId="")
Определения option.php:261
static createFromFields(array $fields, Sender\Base $sender=null)
Определения message.php:89
static stop()
Определения queue.php:325
const EVENT_SEND_RESULT
Определения queue.php:26
const CACHE_HAS_MESSAGES_DIR
Определения queue.php:33
static hasMessages()
Определения queue.php:35
const CACHE_HAS_MESSAGES_ID
Определения queue.php:32
static isStopped()
Определения queue.php:335
static sendMessages()
Определения queue.php:107
static cleanUpAgent()
Определения queue.php:343
static run()
Определения queue.php:80
static resume()
Определения queue.php:330
static getDailyLimit($senderId, $fromId)
Определения limitation.php:53
static getSenderById($id)
Определения smsmanager.php:158
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
Определения file_new.php:804
$result
Определения get_property_values.php:14
$query
Определения get_search.php:11
const DisableEventsCheck
trait Error
Определения error.php:11
$message
Определения payment.php:8
$event
Определения prolog_after.php:141
if( $daysToExpire >=0 &&$daysToExpire< 60 elseif)( $daysToExpire< 0)
Определения prolog_main_admin.php:393