// NOTE(review): PACK_SIZE is not referenced in the visible part of this listing;
// presumably the number of messages fetched per batch — confirm against getMessages().
25 private const PACK_SIZE = 100;
// Option name under which the ID of the last processed message is stored;
// read and written through \COption with "calendar" as the first argument.
27 private const LAST_PROCESSED_OPTION_NAME =
'queue_last_processed_id';
// Accessed elsewhere in the class through an empty() guard before being returned,
// so it appears to be populated lazily — the assignment itself is not visible here.
35 private Map $routedQueues;
// Use the supplied $logger, or fall back to a freshly constructed Logger when it is null.
// NOTE(review): the enclosing signature is elided from this listing — presumably the constructor.
41 $this->logger = $logger ??
new Logger();
// Fragment of a method whose signature is elided from this listing.
// Message handling is attempted only when the mutex lock() call returns a truthy value.
49 if ($this->getMutex()->lock())
53 $this->handleMessages();
// Any Throwable caught here is passed to the logger.
// NOTE(review): whether it is re-thrown afterwards is not visible in this listing.
57 catch(Throwable $exception)
59 $this->logger->log($exception);
// NOTE(review): confirm that unlock() is also reached on the failure path
// (e.g. from a finally block) — the surrounding structure is elided.
63 $this->getMutex()->unlock();
/**
 * Generator over messages read from the message mapper, starting after the
 * stored "last processed" ID.
 *
 * NOTE(review): several lines of this method are elided from the listing; the
 * yield statement(s) and any further query parameters are not visible.
 *
 * @return Generator
 */
74 private function getMessages(): Generator
// Ask the message mapper for a map of messages matching the filter below.
78 $messages = $this->getMessageMapper()->getMap(
// Filter on the stored cursor; presumably '>ID' means "ID strictly greater than" — confirm
// against the mapper's filter syntax.
80 '>ID' => $this->getLastProcessedId(),
// Store this message's ID as the new "last processed" cursor.
91 $this->setLastProcessedId(
$message->getId());
/**
 * Offers the message to every rule returned by Registry::getInstance()->getRules()
 * and persists each handled message a rule produces.
 *
 * NOTE(review): the return statements are elided from this listing; presumably the
 * result tells the caller whether at least one rule routed the message — confirm.
 *
 * @param Message $message Message to route.
 * @return bool
 */
103 private function routeMessage(Message
$message): bool
105 $rules = Registry::getInstance()->getRules();
107 foreach ($rules as $rule)
// Assignment inside the condition: a truthy result of route() is kept as the
// handled message and triggers the persistence step below.
111 if ($handledMessage = $rule->route(
$message))
// Persist through the handled-message mapper; the value returned by create()
// replaces the local variable.
114 $handledMessage = $this->getHandledMessageMapper()->create($handledMessage);
// A Throwable caught here is passed to the logger.
// NOTE(review): whether the loop continues with the next rule depends on where
// the try block sits, which is not visible in this listing.
119 catch(Throwable $exception)
121 $this->logger->log($exception);
/**
 * Reads the ID of the last processed message from the option named by
 * LAST_PROCESSED_OPTION_NAME, using "calendar" as the first \COption argument.
 *
 * @return int Stored ID; 0 is passed as the third argument, presumably the
 *             default used when the option is not set — confirm against \COption.
 */
131 private function getLastProcessedId(): int
133 return \COption::GetOptionInt(
"calendar", self::LAST_PROCESSED_OPTION_NAME, 0);
/**
 * Writes the ID of the last processed message to the option named by
 * LAST_PROCESSED_OPTION_NAME, using "calendar" as the first \COption argument.
 *
 * @param int $id ID to store; defaults to 0 when the argument is omitted.
 */
141 private function setLastProcessedId(
int $id = 0)
143 \COption::SetOptionInt(
"calendar", self::LAST_PROCESSED_OPTION_NAME, $id);
158 self::ON_QUEUE_PUSHED_EVENT_NAME,
// Fragment of a getter whose signature is elided — presumably getMessageMapper(),
// which other methods of this class call.
// The guard is true when $this->messageMapper is unset or empty; the branch body
// (presumably the lazy construction of the mapper) is not visible in this listing.
172 if (empty($this->messageMapper))
177 return $this->messageMapper;
/**
 * Returns the HandledMessageMapper held by this object, constructing it on
 * first use.
 *
 * @return HandledMessageMapper
 */
183 private function getHandledMessageMapper(): HandledMessageMapper
// Create the mapper only if none has been stored yet.
185 if (empty($this->handledMessageMapper))
187 $this->handledMessageMapper =
new HandledMessageMapper();
190 return $this->handledMessageMapper;
// Fragment of a getter whose signature is elided from this listing.
// The guard is true when $this->routedQueues is unset or empty; the branch body
// (presumably the code that fills the property) is not visible here.
198 if (empty($this->routedQueues))
203 return $this->routedQueues;
/**
 * Returns the Mutex used by this object, constructing it on first use.
 *
 * NOTE(review): the return statement is elided from this listing.
 *
 * @return Mutex
 */
209 private function getMutex():
Mutex
// Create the mutex only if none has been stored yet; this class's name is
// passed as the constructor argument.
211 if (empty($this->mutex))
213 $this->mutex =
new Mutex(self::class);
/**
 * Iterates over the messages produced by getMessages(), routes each one and
 * removes messages from the queue through the message mapper.
 *
 * NOTE(review): the lines between the routing call and the delete call are
 * elided; presumably deletion is conditional on $isRouted — confirm.
 */
225 public function handleMessages(): void
228 foreach ($this->getMessages() as
$message)
// Result of routing this message; its use is not visible in this listing.
230 $isRouted = $this->routeMessage(
$message);
// Remove the message via the message mapper.
233 $this->getMessageMapper()->delete(
$message);