1C-Bitrix 25.700.0
Загрузка...
Поиск...
Не найдено
consumemessagescommand.php
См. документацию.
1<?php
2
3declare(strict_types=1);
4
6
10use Symfony\Component\Console\Command\Command;
11use Symfony\Component\Console\Exception\InvalidOptionException;
12use Symfony\Component\Console\Input\InputArgument;
13use Symfony\Component\Console\Input\InputInterface;
14use Symfony\Component\Console\Input\InputOption;
15use Symfony\Component\Console\Output\OutputInterface;
16
/**
 * Console command "messenger:consume": repeatedly invokes the Messenger worker
 * from the CLI until an optional time limit is reached.
 *
 * The loop only runs when the "messenger" configuration has its "run_mode" set
 * to the CLI mode; otherwise a hint is printed and the command exits.
 */
class ConsumeMessagesCommand extends Command
{
	/** Factor converting the "sleep" option (seconds) into microseconds for usleep(). */
	private const MICROSECONDS_PER_SECOND = 1000000;

	/**
	 * Declares the command name, description, the optional "queues" argument
	 * and the "time-limit" / "sleep" options.
	 */
	protected function configure(): void
	{
		$this
			->setName('messenger:consume')
			->setDescription('Run Messenger\'s worker')
			->setDefinition(
				[
					new InputArgument(
						'queues',
						InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
						'Names of the queues to consume'
					),
					// NOTE(review): the "limit" option is currently disabled.
//					new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
					new InputOption(
						'time-limit',
						't',
						InputOption::VALUE_REQUIRED,
						'The time limit in seconds the worker can handle new messages'
					),
					new InputOption(
						'sleep',
						null,
						InputOption::VALUE_REQUIRED,
						'Seconds to sleep before asking for new messages after worker iteration',
						1
					),
				]
			)
		;
	}

	/**
	 * Runs the worker loop.
	 *
	 * @param InputInterface $input Provides the "queues" argument and the "time-limit" / "sleep" options.
	 * @param OutputInterface $output Receives the start/stop lines and the "not configured" hint.
	 *
	 * @return int Always self::SUCCESS when no exception is thrown.
	 *
	 * @throws InvalidOptionException When "time-limit" or "sleep" holds an unusable value.
	 */
	protected function execute(InputInterface $input, OutputInterface $output): int
	{
		$output->writeln('Started ' . date('Y-m-d H:i:s') . '. PID: ' . getmypid());

		// Validate all user input up front so a bad option fails with a clear
		// console error instead of a TypeError/ValueError inside the loop.
		$timeLimit = $this->resolveTimeLimit($input);
		$sleepMicroseconds = $this->resolveSleepMicroseconds($input);

		$config = Configuration::getValue('messenger');

		if (!$config)
		{
			return self::SUCCESS;
		}

		if (!empty($config['run_mode']) && $config['run_mode'] === WorkerRunMode::Cli->value)
		{
			$worker = new Worker();

			// The "sleep" entry is kept in microseconds, as before, because the
			// whole options array is handed to the worker.
			$options = [
				'sleep' => $sleepMicroseconds,
			];

			if ($queues = $input->getArgument('queues'))
			{
				$options['queues'] = $queues;
			}

			$endTime = $timeLimit !== null ? time() + $timeLimit : null;

			// Without a time limit the loop runs until the process is terminated.
			while ($endTime === null || time() < $endTime)
			{
				$worker->process($options);

				usleep($sleepMicroseconds);
			}
		}
		else
		{
			$output->writeln(
				'Messenger not configured to run in CLI. Set "messenger.run_mode" to "cli" in .setting.php'
			);
		}

		$output->writeln('Stopped ' . date('Y-m-d H:i:s') . '. PID: ' . getmypid());

		return self::SUCCESS;
	}

	/**
	 * Reads and validates the "time-limit" option.
	 *
	 * @return int|null Whole seconds (fractions are truncated), or null when the option was not passed.
	 *
	 * @throws InvalidOptionException When the value is not numeric or is less than 1.
	 */
	private function resolveTimeLimit(InputInterface $input): ?int
	{
		$timeLimit = $input->getOption('time-limit');

		if ($timeLimit === null)
		{
			return null;
		}

		if (!is_numeric($timeLimit) || $timeLimit < 1)
		{
			throw new InvalidOptionException(
				\sprintf('Option "time-limit" must be a positive integer, "%s" passed.', $timeLimit)
			);
		}

		return (int)$timeLimit;
	}

	/**
	 * Reads and validates the "sleep" option and converts it to microseconds.
	 *
	 * Fractional seconds (e.g. "0.5") are accepted; the result is rounded to an
	 * integer because usleep() requires an int under strict_types.
	 *
	 * @return int Non-negative pause length in microseconds.
	 *
	 * @throws InvalidOptionException When the value is not numeric or is negative.
	 */
	private function resolveSleepMicroseconds(InputInterface $input): int
	{
		$sleep = $input->getOption('sleep');

		if (!is_numeric($sleep) || $sleep < 0)
		{
			throw new InvalidOptionException(
				\sprintf('Option "sleep" must be a non-negative number of seconds, "%s" passed.', $sleep)
			);
		}

		return (int)round((float)$sleep * self::MICROSECONDS_PER_SECOND);
	}
}
execute(InputInterface $input, OutputInterface $output)
Определения consumemessagescommand.php:58
$startTime
Определения sync.php:69
$options
Определения commerceml2.php:49
$output
Определения options.php:436
$config
Определения quickway.php:69