2namespace Bitrix\MessageService;
4use Bitrix\Main\Application;
6use Bitrix\Main\Data\Cache;
8use Bitrix\Main\ORM\Query\Query;
9use Bitrix\Main\Type\DateTime;
10use Bitrix\MessageService\Internal\Entity\MessageTable;
11use Bitrix\MessageService\Sender\Result\SendMessage;
12use Bitrix\MessageService\Sender\SmsManager;
13use Bitrix\Main\Localization\Loc;
14use Bitrix\MessageService\Internal\Entity\Message\SuccessExec;
15use Bitrix\MessageService\Queue\Event\AfterProcessQueueEvent;
16use Bitrix\MessageService\Queue\Event\AfterSendMessageFromQueueEvent;
17use Bitrix\MessageService\Queue\Event\BeforeProcessQueueEvent;
18use Bitrix\MessageService\Queue\Event\BeforeSendMessageFromQueueEvent;
28 private const OPTION_QUEUE_STOP_MODULE =
'messageservice';
29 private const OPTION_QUEUE_STOP_NAME =
'queue_stopped';
31 private const CACHE_HAS_MESSAGES_TIME = 2592000;
39 $cache = Cache::createInstance();
40 if ($cache->initCache(self::CACHE_HAS_MESSAGES_TIME, self::CACHE_HAS_MESSAGES_ID, self::CACHE_HAS_MESSAGES_DIR))
42 $nextExec = $cache->getVars();
44 elseif ($cache->startDataCache())
46 $result = MessageTable::getList([
47 'select' => [
'ID',
'NEXT_EXEC'],
49 '=SUCCESS_EXEC' => SuccessExec::NO,
52 'order' => [
'ID' =>
'DESC'],
59 $nextExec =
$result[
'NEXT_EXEC'];
68 $nextExec = (
new DateTime())->add(
'+' . self::CACHE_HAS_MESSAGES_TIME .
' seconds');
71 $cache->endDataCache($nextExec);
80 public static function run()
83 defined(
'DisableMessageServiceCheck') && DisableMessageServiceCheck ===
true
85 !defined(
'DisableMessageServiceCheck')
86 && defined(
"DisableEventsCheck")
94 if (static::isStopped() || !static::hasMessages())
99 Application::getInstance()->addBackgroundJob([static::class,
"sendMessages"]);
109 if (static::isStopped())
114 $lockTag =
'b_messageservice_message';
115 if (!Application::getConnection()->lock($lockTag))
122 if (!
$event->canProcessQueue())
124 Application::getConnection()->unlock($lockTag);
129 $limit = (int)
Config\Option::get(
'messageservice',
'queue_limit');
136 MessageTable::query()
139 ->addSelect(
'SENDER_ID')
140 ->addSelect(
'AUTHOR_ID')
141 ->addSelect(
'MESSAGE_FROM')
142 ->addSelect(
'MESSAGE_TO')
143 ->addSelect(
'MESSAGE_HEADERS')
144 ->addSelect(
'MESSAGE_BODY')
145 ->addSelect(
'EXTERNAL_ID')
146 ->where(Query::filter()
148 ->where(Query::filter()
150 ->where(
'SUCCESS_EXEC', SuccessExec::NO)
151 ->where(Query::filter()
153 ->where(
'NEXT_EXEC',
'<',
new DateTime())
154 ->whereNull(
'NEXT_EXEC')
157 ->where(Query::filter()
159 ->where(
'SUCCESS_EXEC', SuccessExec::PROCESSED)
160 ->where(
'NEXT_EXEC',
'<', (
new DateTime())->add(
'-2 MINUTE'))
167 if (defined(
'BX_CLUSTER_GROUP'))
169 $query->where(
'CLUSTER_GROUP', \BX_CLUSTER_GROUP);
171 $messageFieldsList =
$query->fetchAll();
173 if (empty($messageFieldsList))
175 Application::getConnection()->unlock($lockTag);
180 $idList = array_column($messageFieldsList,
'ID');
181 MessageTable::updateMulti(
184 'SUCCESS_EXEC' => SuccessExec::PROCESSED,
185 'NEXT_EXEC' => (
new DateTime())->add(
'+2 MINUTE'),
200 $nextDay = static::getNextExecTime();
213 $serviceId = $sender->getId() .
':' .
$messageFields[
'MESSAGE_FROM'];
215 $counts[$serviceId] ??= 0;
216 if ($counts[$serviceId] >= $limit)
220 'NEXT_EXEC' => $nextDay,
226 ++$counts[$serviceId];
236 catch (\Throwable $e)
238 Application::getInstance()->getExceptionHandler()->writeToLog($e);
242 'SUCCESS_EXEC' => SuccessExec::ERROR,
244 'EXEC_ERROR' => $e->getMessage(),
254 Application::getConnection()->unlock($lockTag);
269 if (!$sendResult->isSuccess())
280 $sendResult->addError(
new Error(Loc::getMessage(
"MESSAGESERVICE_QUEUE_SENDER_NOT_FOUND")));
284 $sender->setSocketTimeout(6);
285 $sender->setStreamTimeout(18);
291 $sendResult->addError(
new Error(Loc::getMessage(
"MESSAGESERVICE_QUEUE_MESSAGE_TYPE_ERROR")));
296 $event->sendAlias(static::EVENT_SEND_RESULT);
306 private static function getNextExecTime(): DateTime
308 $nextDay = DateTime::createFromTimestamp(time() + 86400);
309 $retryTime = Sender\Limitation::getRetryTime();
310 if (!$retryTime[
'auto'])
312 if ($nextDay->getTimeZone()->getName() !== $retryTime[
'tz'])
316 $nextDay->setTimeZone(
new \DateTimeZone($retryTime[
'tz']));
318 catch (\Exception $e) {}
320 $nextDay->setTime($retryTime[
'h'], $retryTime[
'i'], 0);
325 public static function stop(): void
327 Config\Option::set(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME,
'Y');
332 Config\Option::set(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME,
'N');
337 return Config\Option::get(self::OPTION_QUEUE_STOP_MODULE, self::OPTION_QUEUE_STOP_NAME) ===
'Y';
345 $period = abs(intval(
Config\Option::get(
"messageservice",
"clean_up_period", 14)));
346 $periodInSeconds = $period * 24 * 3600;
348 if ($periodInSeconds > 0)
351 $datetime =
$connection->getSqlHelper()->addSecondsToDateTime(
'-' . $periodInSeconds);
352 $connection->queryExecute(
"DELETE FROM b_messageservice_message WHERE DATE_EXEC <= {$datetime}");
355 return __METHOD__.
'();';
static getConnection($name="")
static get($moduleId, $name, $default="", $siteId=false)
static set($moduleId, $name, $value="", $siteId="")
static getAllDailyCount()
static createFromFields(array $fields, Sender\Base $sender=null)
const CACHE_HAS_MESSAGES_DIR
const CACHE_HAS_MESSAGES_ID
static getDailyLimit($senderId, $fromId)
static getSenderById($id)
</td ></tr ></table ></td ></tr >< tr >< td class="bx-popup-label bx-width30"><?=GetMessage("PAGE_NEW_TAGS")?> array( $site)
if( $daysToExpire >=0 &&$daysToExpire< 60 elseif)( $daysToExpire< 0)