From 4773325ae854a33d0a9b2e380afa4d3835725cff Mon Sep 17 00:00:00 2001 From: Ferdinand Kuhl Date: Sun, 18 Apr 2021 22:24:02 +0200 Subject: [PATCH] First working version of symfony/messenger bus --- .phpstorm.meta.php | 8 + Classes/Command/FailedCommandController.php | 101 +++++++++++++ .../Command/MessengerCommandController.php | 120 +++++++++++++++ Classes/Command/RunSymfonyCommandTrait.php | 89 +++++++++++ Classes/EventDispatcherFactory.php | 76 ++++++++++ .../StopWorkerOnRestartSignalListener.php | 70 +++++++++ Classes/HandlersLocatorFactory.php | 57 +++++++ Classes/MessageBusContainer.php | 45 ++++++ Classes/ObjectManagement/ChainedContainer.php | 35 +++++ .../ObjectManagement/RewindableGenerator.php | 60 ++++++++ Classes/Package.php | 14 ++ Classes/RetryStrategiesContainer.php | 61 ++++++++ .../FlowDoctrineTransportFactory.php | 61 ++++++++ Classes/Transport/NullTransport.php | 41 +++++ Classes/Transport/NullTransportFactory.php | 26 ++++ Classes/Transport/TransportsContainer.php | 76 ++++++++++ Configuration/Caches.yaml | 3 + Configuration/Objects.yaml | 142 ++++++++++++++++++ Configuration/Settings.yaml | 59 ++++++++ Configuration/Testing/Settings.yaml | 36 +++++ Tests/Functional/BusTest.php | 92 ++++++++++++ .../Fixtures/Message/FailingMessage.php | 7 + .../Message/FailingTestMessageHandler.php | 18 +++ .../Fixtures/Message/TestMessage.php | 24 +++ .../Fixtures/Message/TestMessageHandler.php | 18 +++ composer.json | 28 ++++ 26 files changed, 1367 insertions(+) create mode 100644 .phpstorm.meta.php create mode 100644 Classes/Command/FailedCommandController.php create mode 100644 Classes/Command/MessengerCommandController.php create mode 100644 Classes/Command/RunSymfonyCommandTrait.php create mode 100644 Classes/EventDispatcherFactory.php create mode 100644 Classes/EventListener/StopWorkerOnRestartSignalListener.php create mode 100644 Classes/HandlersLocatorFactory.php create mode 100644 Classes/MessageBusContainer.php create mode 100644 Classes/ObjectManagement/ChainedContainer.php create mode 100644 Classes/ObjectManagement/RewindableGenerator.php create mode 100644 Classes/Package.php create mode 100644 Classes/RetryStrategiesContainer.php create mode 100644 Classes/Transport/FlowDoctrineTransportFactory.php create mode 100644 Classes/Transport/NullTransport.php create mode 100644 Classes/Transport/NullTransportFactory.php create mode 100644 Classes/Transport/TransportsContainer.php create mode 100644 Configuration/Caches.yaml create mode 100644 Configuration/Objects.yaml create mode 100644 Configuration/Settings.yaml create mode 100644 Configuration/Testing/Settings.yaml create mode 100644 Tests/Functional/BusTest.php create mode 100644 Tests/Functional/Fixtures/Message/FailingMessage.php create mode 100644 Tests/Functional/Fixtures/Message/FailingTestMessageHandler.php create mode 100644 Tests/Functional/Fixtures/Message/TestMessage.php create mode 100644 Tests/Functional/Fixtures/Message/TestMessageHandler.php create mode 100644 composer.json diff --git a/.phpstorm.meta.php b/.phpstorm.meta.php new file mode 100644 index 0000000..bd6d75d --- /dev/null +++ b/.phpstorm.meta.php @@ -0,0 +1,8 @@ +%command.name% shows message that are pending in the failure transport. + * + * php %command.full_name% + * + * Or look at a specific message by its id: + * + * php %command.full_name% {id} + * + * Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask) + */ + public function showCommand() + { + $command = new FailedMessagesShowCommand( + $this->configuration['failureTransport'], + $this->receiverContainer->get($this->configuration['failureTransport']) + ); + $this->run($command); + } + + /** + * Remove given messages from the failure transport + * + * The %command.name% removes given messages that are pending in the failure transport. + * + * php %command.full_name% {id1} [{id2} ...] + * + * The specific ids can be found via the messenger:failed:show command. + * + * Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask) + */ + public function removeCommand() + { + $command = new FailedMessagesRemoveCommand( + $this->configuration['failureTransport'], + $this->receiverContainer->get($this->configuration['failureTransport']) + ); + $this->run($command); + } + + /** + * Retry one or more messages from the failure transport + * + * The command will interactively ask if each message should be retried + * or discarded. + * + * Some transports support retrying a specific message id, which comes + * from the messenger:failed:show command. + * + * php %command.full_name% {id} + * + * Or pass multiple ids at once to process multiple messages: + * + * php %command.full_name% {id1} {id2} {id3} + * + * Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask) + * + * @noinspection PhpParamsInspection + */ + public function retryCommand() + { + $command = new FailedMessagesRetryCommand( + $this->configuration['failureTransport'], + $this->receiverContainer->get($this->configuration['failureTransport']), + $this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus'), + $this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher'), + $this->objectManager->get(LoggerInterface::class) + ); + $this->run($command); + } +} diff --git a/Classes/Command/MessengerCommandController.php b/Classes/Command/MessengerCommandController.php new file mode 100644 index 0000000..9b692ff --- /dev/null +++ b/Classes/Command/MessengerCommandController.php @@ -0,0 +1,120 @@ +worker:consume receiver1 receiver2 + * + * Options are: + * --limit limits the number of messages received + * --failure-limit stop the worker when the given number of failed messages is reached + * --memory-limit stop the worker if it exceeds a given memory usage limit. You can use shorthand + * byte values [K, M, or G] + * --time-limit stop the worker when the gien time limit (in seconds) is reached. If a message is beeing handled, + * the worker will stop after the processing is finished + * --bus specify the message bus to dispatch received messages to instead of trying to determine it automatically. + * This is required if the messages didn't originate from Messenger + * + * Optional arguments are -q (quiet) and -v[v[v]] (verbosity) + */ + public function consumeCommand() + { + if ($this->receiverContainer instanceof DependencyProxy) { + $this->receiverContainer->_activateDependency(); + } + if ($this->eventDispatcher instanceof DependencyProxy) { + $this->eventDispatcher->_activateDependency(); + } + $command = new ConsumeMessagesCommand( + $this->routableBus, + $this->receiverContainer, + $this->eventDispatcher, + $this->logger, + array_keys($this->configuration['transports']) + ); + $this->run($command); + } + + + /** + * List all available receivers + */ + public function listReceiversCommand() + { + foreach (array_keys($this->configuration['transports']) as $transportName) { + $this->outputLine('- ' . $transportName); + } + } + + /** + * Stop workers after their current message + * + * Each worker command will finish the message they are currently processing + * and then exit. Worker commands are *not* automatically restarted: that + * should be handled by a process control system. + */ + public function stopWorkersCommand() + { + $cacheItem = $this->restartSignalCachePool->getItem( + StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY + ); + $cacheItem->set(microtime(true)); + $this->restartSignalCachePool->save($cacheItem); + + //TODO: Add the possibility to wait until all are exited + } +} diff --git a/Classes/Command/RunSymfonyCommandTrait.php b/Classes/Command/RunSymfonyCommandTrait.php new file mode 100644 index 0000000..68654cb --- /dev/null +++ b/Classes/Command/RunSymfonyCommandTrait.php @@ -0,0 +1,89 @@ +getDefinition(); + $definition->setArguments(array_merge( + [new InputArgument('command', InputArgument::REQUIRED)], + $definition->getArguments() + )); + $definition->setOptions(array_merge( + [ + new InputOption('--verbose', '-v|vv|vvv', InputOption::VALUE_NONE, 'Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug'), + new InputOption('--quiet', '-q', InputOption::VALUE_NONE, 'Do not output any message'), + ], + $definition->getOptions() + )); + $input = new ArgvInput(null, $command->getDefinition()); + $this->configureIO($input, $this->output->getOutput()); + $command->run($input, $this->output->getOutput()); + } + + protected function configureIO($input, $output) + { + switch ($shellVerbosity = (int) getenv('SHELL_VERBOSITY')) { + case -1: + $output->setVerbosity(OutputInterface::VERBOSITY_QUIET); + break; + case 1: + $output->setVerbosity(OutputInterface::VERBOSITY_VERBOSE); + break; + case 2: + $output->setVerbosity(OutputInterface::VERBOSITY_VERY_VERBOSE); + break; + case 3: + $output->setVerbosity(OutputInterface::VERBOSITY_DEBUG); + break; + default: + $shellVerbosity = 0; + break; + } + + if (true === $input->hasParameterOption(['--quiet', '-q'], true)) { + $output->setVerbosity(OutputInterface::VERBOSITY_QUIET); + $shellVerbosity = -1; + } else { + if ( + $input->hasParameterOption('-vvv', true) + || $input->hasParameterOption('--verbose=3', true) + || 3 === $input->getParameterOption('--verbose', false, true) + ) { + $output->setVerbosity(OutputInterface::VERBOSITY_DEBUG); + $shellVerbosity = 3; + } elseif ( + $input->hasParameterOption('-vv', true) + || $input->hasParameterOption('--verbose=2', true) + || 2 === $input->getParameterOption('--verbose', false, true) + ) { + $output->setVerbosity(OutputInterface::VERBOSITY_VERY_VERBOSE); + $shellVerbosity = 2; + } elseif ( + $input->hasParameterOption('-v', true) + || $input->hasParameterOption('--verbose=1', true) + || $input->hasParameterOption('--verbose', true) + || $input->getParameterOption('--verbose', false, true) + ) { + $output->setVerbosity(OutputInterface::VERBOSITY_VERBOSE); + $shellVerbosity = 1; + } + } + + if (-1 === $shellVerbosity) { + $input->setInteractive(false); + } + + putenv('SHELL_VERBOSITY=' . $shellVerbosity); + $_ENV['SHELL_VERBOSITY'] = $shellVerbosity; + $_SERVER['SHELL_VERBOSITY'] = $shellVerbosity; + } +} diff --git a/Classes/EventDispatcherFactory.php b/Classes/EventDispatcherFactory.php new file mode 100644 index 0000000..1b612e3 --- /dev/null +++ b/Classes/EventDispatcherFactory.php @@ -0,0 +1,76 @@ +configuration['eventDispatcher']['subscribers'] as $subscriberId => $enabled) { + if ($subscriberId === null || ! (bool) $enabled) { + continue; + } + $this->addLazySubscribers($eventDispatcher, $subscriberId); + } + return $eventDispatcher; + } + + private function addLazySubscribers(EventDispatcherInterface $eventDispatcher, $subscriberId) + { + $subscriberClass = $this->objectManager->getClassNameByObjectName($subscriberId); + if (! is_a($subscriberClass, EventSubscriberInterface::class, true)) { + throw new \RuntimeException( + 'Object with name ' . $subscriberId . ' is not an EventSubscriberInterface', + 1618753949 + ); + } + + foreach ($subscriberClass::getSubscribedEvents() as $eventName => $params) { + if (\is_string($params)) { + $callClosure = function (...$arguments) use ($subscriberId, $params) { + $subscriber = $this->objectManager->get($subscriberId); + $method = $params; + return $subscriber->$method(...$arguments); + }; + $eventDispatcher->addListener($eventName, $callClosure); + } elseif (\is_string($params[0])) { + $callClosure = function (...$arguments) use ($subscriberId, $params) { + $subscriber = $this->objectManager->get($subscriberId); + $method = $params[0]; + return $subscriber->$method(...$arguments); + }; + $eventDispatcher->addListener($eventName, $callClosure, $params[1] ?? 0); + } else { + foreach ($params as $listener) { + $callClosure = function (...$arguments) use ($subscriberId, $listener) { + $subscriber = $this->objectManager->get($subscriberId); + $method = $listener[0]; + return $subscriber->$method(...$arguments); + }; + $eventDispatcher->addListener($eventName, $callClosure, $listener[1] ?? 0); + } + } + } + } +} diff --git a/Classes/EventListener/StopWorkerOnRestartSignalListener.php b/Classes/EventListener/StopWorkerOnRestartSignalListener.php new file mode 100644 index 0000000..2104fea --- /dev/null +++ b/Classes/EventListener/StopWorkerOnRestartSignalListener.php @@ -0,0 +1,70 @@ +workerStartedAt = microtime(true); + } + + public function onWorkerRunning(WorkerRunningEvent $event): void + { + if ($this->shouldRestart()) { + $event->getWorker()->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped because a restart was requested.'); + } + } + } + + public static function getSubscribedEvents() + { + return [ + WorkerStartedEvent::class => 'onWorkerStarted', + WorkerRunningEvent::class => 'onWorkerRunning', + ]; + } + + private function shouldRestart(): bool + { + $cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY); + + if (!$cacheItem->isHit()) { + // no restart has ever been scheduled + return false; + } + + return $this->workerStartedAt < $cacheItem->get(); + } +} diff --git a/Classes/HandlersLocatorFactory.php b/Classes/HandlersLocatorFactory.php new file mode 100644 index 0000000..b7b3b50 --- /dev/null +++ b/Classes/HandlersLocatorFactory.php @@ -0,0 +1,57 @@ +reflectionService + ->getAllImplementationClassNamesForInterface(MessageSubscriberInterface::class); + $handlerDescriptors = []; + foreach ($messageHandlerClasses as $messageHandlerClass) { + foreach ($messageHandlerClass::getHandledMessages() as $messageName => $config) { + if (! is_array($config)) { + throw new \InvalidArgumentException( + 'different from doctrine, we (currently) need subscribers to always have an option array' + ); + } + if (isset($config['bus']) && $config['bus'] !== $busName) { + continue; + } + $handlerDescriptors[$messageName][] = new HandlerDescriptor( + $this->objectManager->get($messageHandlerClass), + $config + ); + } + } + // TODO: Maybe we can allow handlers to be added to bus or globally by configuration? + + return new HandlersLocator($handlerDescriptors); + } +} diff --git a/Classes/MessageBusContainer.php b/Classes/MessageBusContainer.php new file mode 100644 index 0000000..e461003 --- /dev/null +++ b/Classes/MessageBusContainer.php @@ -0,0 +1,45 @@ +buses[$id])) { + $middlewares = new RewindableGenerator($this->configuration[$id]['middleware']); + $this->buses[$id] = new MessageBus($middlewares); + } + return $this->buses[$id]; + } + + /** + * @inheritDoc + */ + public function has(string $id) + { + return isset($this->configuration[$id]); + } +} diff --git a/Classes/ObjectManagement/ChainedContainer.php b/Classes/ObjectManagement/ChainedContainer.php new file mode 100644 index 0000000..22cd004 --- /dev/null +++ b/Classes/ObjectManagement/ChainedContainer.php @@ -0,0 +1,35 @@ +childContainers = $childContainers; + } + + public function get(string $id) + { + foreach ($this->childContainers as $childContainer) { + if ($childContainer->has($id)) { + return $childContainer->get($id); + } + } + throw new \InvalidArgumentException('Service id is unknown: ' . $id); + } + + public function has(string $id) + { + foreach ($this->childContainers as $childContainer) { + if ($childContainer->has($id)) { + return true; + } + } + return false; + } +} diff --git a/Classes/ObjectManagement/RewindableGenerator.php b/Classes/ObjectManagement/RewindableGenerator.php new file mode 100644 index 0000000..5332e5d --- /dev/null +++ b/Classes/ObjectManagement/RewindableGenerator.php @@ -0,0 +1,60 @@ +serviceIds = $serviceIds; + $sortedServiceIds = array_keys( + (new PositionalArraySorter($serviceIds))->toArray() + ); + $this->generator = function () use ($sortedServiceIds) { + foreach ($sortedServiceIds as $serviceId) { + if ($serviceId === null) { + continue; + } + $object = $this->objectManager->get($serviceId); + // TODO: Thats a quite poor solution to dynamically inject the logger - but it is easy + if (method_exists($object, 'setLogger')) { + $object->setLogger($this->objectManager->get(LoggerInterface::class)); + } + yield $object; + } + }; + } + + public function getIterator() + { + $g = $this->generator; + + return $g(); + } + + public function count() + { + return count($this->serviceIds); + } +} diff --git a/Classes/Package.php b/Classes/Package.php new file mode 100644 index 0000000..77fae7d --- /dev/null +++ b/Classes/Package.php @@ -0,0 +1,14 @@ +configuration['transports'][$id])) { + throw new \InvalidArgumentException('Unknown transport name: ' . $id); + } + if (! isset($this->retryStrategies[$id])) { + $strategyDefinition = array_merge( + $this->configuration['defaultRetryStrategyOptions'], + $this->configuration['transports'][$id]['retryStrategy'] ?? [] + ); + if ($strategyDefinition['service']) { + $this->retryStrategies[$id] = $this->objectManager->get($strategyDefinition['service']); + } else { + $this->retryStrategies[$id] = new MultiplierRetryStrategy( + $strategyDefinition['maxRetries'], + $strategyDefinition['delay'], + $strategyDefinition['multiplier'], + $strategyDefinition['maxDelay'] + ); + } + } + return $this->retryStrategies[$id]; + } + + public function has(string $id) + { + return isset($this->configuration['transports'][$id]); + } +} diff --git a/Classes/Transport/FlowDoctrineTransportFactory.php b/Classes/Transport/FlowDoctrineTransportFactory.php new file mode 100644 index 0000000..d18897f --- /dev/null +++ b/Classes/Transport/FlowDoctrineTransportFactory.php @@ -0,0 +1,61 @@ +entityManager = $entityManager; + } + + public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface + { + $useNotify = ($options['use_notify'] ?? true); + unset($options['transport_name'], $options['use_notify']); + // Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver + // when LISTEN/NOTIFY isn't available + $configuration = PostgreSqlConnection::buildConfiguration($dsn, $options); + + try { + $driverConnection = $this->entityManager->getConnection(); + } catch (\InvalidArgumentException $e) { + throw new TransportException(sprintf( + 'Could not find Doctrine connection from Messenger DSN "%s".', + $dsn + ), 0, $e); + } + + if ($useNotify && $driverConnection->getDriver() instanceof AbstractPostgreSQLDriver) { + $connection = new PostgreSqlConnection($configuration, $driverConnection); + } else { + $connection = new Connection($configuration, $driverConnection); + } + + return new DoctrineTransport($connection, $serializer); + } + + /** + * @inheritDoc + */ + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'flow-doctrine://'); + } +} diff --git a/Classes/Transport/NullTransport.php b/Classes/Transport/NullTransport.php new file mode 100644 index 0000000..77973af --- /dev/null +++ b/Classes/Transport/NullTransport.php @@ -0,0 +1,41 @@ +configuration['transports'][$id])) { + throw new \InvalidArgumentException('Unknown transport name: ' . $id); + } + if (! isset($this->transports[$id])) { + $transportDefinition = array_merge([ + 'dsn' => '', + 'options' => [], + 'serializer' => $this->configuration['defaultSerializerName'], + # TODO: Probably this has to be setup elsewhere, as the transport does not care by itself + 'retry_strategy' => [ # TODO: Make the default configurable + 'max_retries' => 3, + # milliseconds delay + 'delay' => 1000, + # causes the delay to be higher before each retry + # e.g. 1 second delay, 2 seconds, 4 seconds + 'multiplier' => 2, + 'max_delay' => 0, + # override all of this with a service that + # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface + 'service' => null + ] + ], $this->configuration['transports'][$id]); + $this->transports[$id] = $this->transportFactory->createTransport( + $transportDefinition['dsn'], + $transportDefinition['options'], + $this->objectManager->get($transportDefinition['serializer']) + ); + } + return $this->transports[$id]; + } + + public function has(string $id) + { + return isset($this->configuration['transports'][$id]); + } +} diff --git a/Configuration/Caches.yaml b/Configuration/Caches.yaml new file mode 100644 index 0000000..cc0591e --- /dev/null +++ b/Configuration/Caches.yaml @@ -0,0 +1,3 @@ +DigiComp_FlowSymfony_Bridge_Messenger_RestartSignal: + frontend: 'Neos\Cache\Frontend\VariableFrontend' + backend: 'Neos\Cache\Backend\FileBackend' diff --git a/Configuration/Objects.yaml b/Configuration/Objects.yaml new file mode 100644 index 0000000..380de1e --- /dev/null +++ b/Configuration/Objects.yaml @@ -0,0 +1,142 @@ +DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus: + className: 'Symfony\Component\Messenger\RoutableMessageBus' + arguments: + 1: + object: 'DigiComp\FlowSymfonyBridge\Messenger\MessageBusContainer' + 2: + object: 'Symfony\Component\Messenger\MessageBusInterface' + +Symfony\Component\Messenger\MessageBusInterface: + className: 'Symfony\Component\Messenger\MessageBus' + factoryObjectName: 'DigiComp\FlowSymfonyBridge\Messenger\MessageBusContainer' + factoryMethodName: 'get' + arguments: + 1: + setting: 'DigiComp.FlowSymfonyBridge.Messenger.defaultBusName' + +DigiComp.FlowSymfonyBridge.Messenger:DefaultBusHandlersLocator: + className: 'Symfony\Component\Messenger\Handler\HandlersLocator' + factoryObjectName: 'DigiComp\FlowSymfonyBridge\Messenger\HandlersLocatorFactory' + factoryMethodName: 'create' + arguments: + 1: + setting: 'DigiComp.FlowSymfonyBridge.Messenger.defaultBusName' + +DigiComp.FlowSymfonyBridge.Messenger:DefaultSendersLocator: + className: 'Symfony\Component\Messenger\Transport\Sender\SendersLocator' + arguments: + 1: + # TODO: This would be the position were routes with bus specific routes could be merged + setting: 'DigiComp.FlowSymfonyBridge.Messenger.routing' + 2: + object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer' + +DigiComp.FlowSymfonyBridge.Messenger:DefaultAddBusNameStampMiddleware: + className: 'Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware' + arguments: + 1: + setting: 'DigiComp.FlowSymfonyBridge.Messenger.defaultBusName' + +DigiComp.FlowSymfonyBridge.Messenger:DefaultHandleMessageMiddleware: + className: 'Symfony\Component\Messenger\Middleware\HandleMessageMiddleware' + arguments: + 1: + object: 'DigiComp.FlowSymfonyBridge.Messenger:DefaultBusHandlersLocator' + +DigiComp.FlowSymfonyBridge.Messenger:DefaultSendMessageMiddleware: + className: 'Symfony\Component\Messenger\Middleware\SendMessageMiddleware' + arguments: + 1: + object: 'DigiComp.FlowSymfonyBridge.Messenger:DefaultSendersLocator' + 2: + object: 'DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher' + +DigiComp.FlowSymfonyBridge.Messenger:TransportFactory: + className: 'Symfony\Component\Messenger\Transport\TransportFactory' + scope: 'singleton' + arguments: + 1: + object: 'DigiComp.FlowSymfonyBridge.Messenger:DefaultTransportFactories' + +DigiComp.FlowSymfonyBridge.Messenger:DefaultTransportFactories: + className: 'DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\RewindableGenerator' + arguments: + 1: + setting: 'DigiComp.FlowSymfonyBridge.Messenger.transportFactories' + +DigiComp.FlowSymfonyBridge.Messenger:SyncTransportFactory: + className: 'Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory' + arguments: + 1: + object: 'DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus' + +DigiComp.FlowSymfonyBridge.Messenger:DefaultSerializer: + className: 'Symfony\Component\Messenger\Transport\Serialization\PhpSerializer' + +DigiComp.FlowSymfonyBridge.Messenger:SendersContainer: + className: 'DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\ChainedContainer' + scope: 'singleton' + arguments: + 1: + object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer' + # TODO: add own senders here, which are no transports + +DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer: + className: 'DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\ChainedContainer' + scope: 'singleton' + arguments: + 1: + object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer' + # TODO: add own receivers here, which are no transports + +DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher: + className: 'Symfony\Component\EventDispatcher\EventDispatcher' + scope: 'singleton' + factoryObjectName: 'DigiComp\FlowSymfonyBridge\Messenger\EventDispatcherFactory' + factoryMethodName: 'create' + +DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCache: + className: 'Neos\Cache\Frontend\FrontendInterface' + factoryObjectName: 'Neos\Flow\Cache\CacheManager' + factoryMethodName: 'getCache' + arguments: + 1: + value: 'DigiComp_FlowSymfony_Bridge_Messenger_RestartSignal' + +DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool: + className: 'Neos\Cache\Psr\Cache\CachePool' + scope: 'singleton' + arguments: + 1: + value: 'DigiComp_FlowSymfony_Bridge_Messenger_RestartSignal' + 2: + object: + factoryObjectName: 'DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCache' + factoryMethodName: 'getBackend' + +Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener: + scope: 'singleton' + arguments: + 1: + object: + name: 'DigiComp.FlowSymfonyBridge.Messenger:SendersContainer' + 2: + object: 'DigiComp\FlowSymfonyBridge\Messenger\RetryStrategiesContainer' + 3: + object: 'Psr\Log\LoggerInterface' + 4: + object: + name: 'DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher' + +Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener: + scope: 'singleton' + arguments: + 1: + object: + factoryObjectName: 'DigiComp.FlowSymfonyBridge.Messenger:SendersContainer' + factoryMethodName: 'get' + arguments: + 1: + setting: 'DigiComp.FlowSymfonyBridge.Messenger.failureTransport' + 2: + object: 'Psr\Log\LoggerInterface' diff --git a/Configuration/Settings.yaml b/Configuration/Settings.yaml new file mode 100644 index 0000000..bd43264 --- /dev/null +++ b/Configuration/Settings.yaml @@ -0,0 +1,59 @@ +Neos: + Flow: + object: + includeClasses: + symfony.messenger: + - "Symfony\\\\Component\\\\Messenger\\\\EventListener\\\\.*" + +DigiComp: + FlowSymfonyBridge: + Messenger: + defaultBusName: "default" + defaultSerializerName: "DigiComp.FlowSymfonyBridge.Messenger:DefaultSerializer" + # TODO: use this + defaultRetryStrategyOptions: + maxRetries: 3 + # milliseconds delay + delay: 1000 + # causes the delay to be higher before each retry + # e.g. 1 second delay, 2 seconds, 4 seconds + multiplier: 2 + maxDelay: 0 + # override all of this with a service that + # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface + service: null + + eventDispatcher: + subscribers: + Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener: true + Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener: true + Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener: true + DigiComp\FlowSymfonyBridge\Messenger\EventListener\StopWorkerOnRestartSignalListener: true + + buses: + default: + middleware: + DigiComp.FlowSymfonyBridge.Messenger:DefaultAddBusNameStampMiddleware: + position: "start" + Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware: true + Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware: true + Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware: true + DigiComp.FlowSymfonyBridge.Messenger:DefaultSendMessageMiddleware: + position: "end" + DigiComp.FlowSymfonyBridge.Messenger:DefaultHandleMessageMiddleware: + position: "end" + transportFactories: + DigiComp\FlowSymfonyBridge\Messenger\Transport\NullTransportFactory: true + DigiComp.FlowSymfonyBridge.Messenger:SyncTransportFactory: true + Symfony\Component\Messenger\Transport\InMemoryTransportFactory: true + DigiComp\FlowSymfonyBridge\Messenger\Transport\FlowDoctrineTransportFactory: true + + transports: + discard: + dsn: "null://" + + failureTransport: "discard" + # TODO: Receivers and Senders? (As far as I can see not possible in Symfony) + # receivers:[] + # senders: [] + routing: [] diff --git a/Configuration/Testing/Settings.yaml b/Configuration/Testing/Settings.yaml new file mode 100644 index 0000000..2a3ffdf --- /dev/null +++ b/Configuration/Testing/Settings.yaml @@ -0,0 +1,36 @@ +DigiComp: + FlowSymfonyBridge: + Messenger: + transports: + "test-in-memory-2": + dsn: "in-memory://" + "test-in-memory-1": + dsn: "in-memory://" + "test-doctrine": + dsn: "flow-doctrine://default?table_name=test_messenger_messages" + "test-sync": + dsn: "sync://" + "test-retry-doctrine": + dsn: "flow-doctrine://default?table_name=test_messenger_messages&queue_name=retry" + retryStrategy: + maxRetries: 1 + # milliseconds delay + delay: 50 + # causes the delay to be higher before each retry + # e.g. 1 second delay, 2 seconds, 4 seconds + multiplier: 2 + maxDelay: 0 + # override all of this with a service that + # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface + # service: null + "test-failed-doctrine": + dsn: "flow-doctrine://default?table_name=test_messenger_messages&queue_name=failed" + failureTransport: "test-failed-doctrine" + routing: + DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message\TestMessage: + - "test-in-memory-1" + - "test-in-memory-2" + - "test-doctrine" + - "test-sync" + DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message\FailingMessage: + - "test-retry-doctrine" diff --git a/Tests/Functional/BusTest.php b/Tests/Functional/BusTest.php new file mode 100644 index 0000000..36e01ce --- /dev/null +++ b/Tests/Functional/BusTest.php @@ -0,0 +1,92 @@ +objectManager->get(MessageBusInterface::class); + + $messageBus->dispatch(new TestMessage('Hallo Welt!')); + $sendersContainer = $this->objectManager->get(TransportsContainer::class); + + /* @var InMemoryTransport $transport1 */ + $transport1 = $sendersContainer->get('test-in-memory-1'); + /* @var InMemoryTransport $transport2 */ + $transport2 = $sendersContainer->get('test-in-memory-2'); + /* @var DoctrineTransport $transport3 */ + $transport3 = $sendersContainer->get('test-doctrine'); + $this->assertInstanceOf(InMemoryTransport::class, $transport1); + $this->assertInstanceOf(InMemoryTransport::class, $transport2); + $this->assertInstanceOf(DoctrineTransport::class, $transport3); + $this->assertCount(1, $transport1->getSent()); + $this->assertCount(1, $transport2->getSent()); + $this->assertCount(1, $transport3->all()); + $this->assertCount(0, $transport1->getAcknowledged()); + $this->assertCount(0, $transport2->getAcknowledged()); + $this->assertCount(1, $transport3->all()); + + $eventDispatcher = $this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher'); + $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + foreach (['test-in-memory-1', 'test-in-memory-2', 'test-doctrine'] as $transportId) { + $worker = new Worker([$transportId => $sendersContainer->get($transportId)], $messageBus, $eventDispatcher); + $worker->run(); + } + // TODO: Check for success on all workers - doctrine does not seem to get executed + $this->assertCount(1, $transport1->getAcknowledged()); + $this->assertCount(1, $transport2->getAcknowledged()); + $this->assertCount(0, $transport3->all()); + } + + /** + * @test + */ + public function itRetriesFailingMessages() + { + $messageBus = $this->objectManager->get(MessageBusInterface::class); + + $messageBus->dispatch(new FailingMessage()); + $sendersContainer = $this->objectManager->get(TransportsContainer::class); + + /* @var DoctrineTransport $transport1 */ + $transport1 = $sendersContainer->get('test-retry-doctrine'); + /* @var DoctrineTransport $failedTransport */ + $failedTransport = $sendersContainer->get('test-failed-doctrine'); + $this->assertCount(1, $transport1->all()); + $this->assertCount(0, $failedTransport->all()); + + $eventDispatcher = $this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher'); + $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + $worker = new Worker( + ['test-retry-doctrine' => $sendersContainer->get('test-retry-doctrine')], + $messageBus, + $eventDispatcher + ); + $worker->run(); + $this->assertCount(1, $transport1->all()); + $this->assertCount(0, $failedTransport->all()); + + $worker = new Worker( + ['test-retry-doctrine' => $sendersContainer->get('test-retry-doctrine')], + $messageBus, + $eventDispatcher + ); + $worker->run(); + $this->assertCount(0, $transport1->all()); + $this->assertCount(1, $failedTransport->all()); + } +} diff --git a/Tests/Functional/Fixtures/Message/FailingMessage.php b/Tests/Functional/Fixtures/Message/FailingMessage.php new file mode 100644 index 0000000..4e181db --- /dev/null +++ b/Tests/Functional/Fixtures/Message/FailingMessage.php @@ -0,0 +1,7 @@ + []; + } + + public function __invoke(FailingMessage $message) + { + throw new \Exception('bang!'); + } +} diff --git a/Tests/Functional/Fixtures/Message/TestMessage.php b/Tests/Functional/Fixtures/Message/TestMessage.php new file mode 100644 index 0000000..d8dec37 --- /dev/null +++ b/Tests/Functional/Fixtures/Message/TestMessage.php @@ -0,0 +1,24 @@ +message = $message; + } + + /** + * @return string + */ + public function getMessage(): string + { + return $this->message; + } +} diff --git a/Tests/Functional/Fixtures/Message/TestMessageHandler.php b/Tests/Functional/Fixtures/Message/TestMessageHandler.php new file mode 100644 index 0000000..bddf7d0 --- /dev/null +++ b/Tests/Functional/Fixtures/Message/TestMessageHandler.php @@ -0,0 +1,18 @@ + []; + } + + public function __invoke(TestMessage $message) + { + //do nothing for now + } +} diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..6ad2036 --- /dev/null +++ b/composer.json @@ -0,0 +1,28 @@ +{ + "name": "digicomp/flow-symfony-bridge-messenger", + "type": "neos-package", + "description": "Flow dependency injection bridge to symfony/messenger", + "require": { + "neos/flow": "^6.3", + "symfony/doctrine-messenger": "^5.2.5", + "symfony/event-dispatcher": "^4.2 | ^5.2" + }, + "autoload": { + "psr-4": { + "DigiComp\\FlowSymfonyBridge\\Messenger\\": "Classes/" + } + }, + "autoload-dev": { + "psr-4": { + "DigiComp\\FlowSymfonyBridge\\Messenger\\Tests\\": "Tests/" + } + }, + "extra": { + "branch-alias": { + "dev-master": "0.0.x-dev" + }, + "neos": { + "package-key": "DigiComp.FlowSymfonyBridge.Messenger" + } + } +}