Skip to content

Commit 6eef208

Browse files
authored
allow overriding sender and receiver locators (#23)
1 parent b77009f commit 6eef208

4 files changed

Lines changed: 136 additions & 16 deletions

File tree

src/Messenger/Kernel/CommandBusDependencies.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ enum CommandBusDependencies: string {
1111
case Serializer = self::class.'::Serializer';
1212
case Worker = self::class.'::Worker';
1313
case SupervisorConfigDir = self::class.'::SupervisorConfigDir';
14+
case SendersLocator = self::class.'::SendersLocator';
15+
case ReceiversLocator = self::class.'::ReceiversLocator';
1416
}

src/Messenger/Kernel/MessengerServiceFactory.php

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
use WonderNetwork\SlimKernel\Messenger\CommandBus;
3232
use WonderNetwork\SlimKernel\Messenger\QueryBus;
3333
use WonderNetwork\SlimKernel\ServiceFactory;
34+
use WonderNetwork\SlimKernel\ServiceOfExpectedType;
3435
use WonderNetwork\SlimKernel\ServicesBuilder;
3536
use WonderNetwork\SlimKernel\Supervisor\GenerateSupervisorConfigCommand;
3637
use WonderNetwork\SlimKernel\Supervisor\SupervisorConfiguration;
@@ -78,10 +79,25 @@ public function __invoke(ServicesBuilder $builder): iterable {
7879

7980
// region senders
8081
yield TransportLocatorBuilder::class => $this->transports ?? TransportLocatorBuilder::empty();
81-
yield SendersLocator::class => function (TransportLocatorBuilder $config, ContainerInterface $container) {
82+
yield CommandBusDependencies::SendersLocator->value => fn (
83+
TransportLocatorBuilder $config,
84+
ContainerInterface $container,
85+
) => $config->sendersLocator($container);
86+
yield CommandBusDependencies::ReceiversLocator->value => fn (
87+
TransportLocatorBuilder $config,
88+
ContainerInterface $container,
89+
) => $config->receiversLocator($container);
90+
91+
yield SendersLocator::class => function (ContainerInterface $container) {
92+
$sendersLocator = ServiceOfExpectedType::getFromContainer(
93+
container: $container,
94+
key: CommandBusDependencies::SendersLocator->value,
95+
expectedType: ContainerInterface::class,
96+
);
97+
8298
return new SendersLocator(
8399
sendersMap: [],
84-
sendersLocator: $config->sendersLocator($container),
100+
sendersLocator: $sendersLocator,
85101
);
86102
};
87103

@@ -101,12 +117,26 @@ public function __invoke(ServicesBuilder $builder): iterable {
101117
yield QueryBus::class => autowire();
102118

103119
yield ConsumeMessagesCommand::class => function (TransportLocatorBuilder $config, ContainerInterface $container) {
104-
/** @var LoggerInterface $logger */
105-
$logger = $container->get(CommandBusDependencies::Logger->value);
106-
/** @var CacheItemPoolInterface $pool */
107-
$pool = $container->get(CommandBusDependencies::CachePool->value);
108-
/** @var EventDispatcher $eventDispatcher */
109-
$eventDispatcher = $container->get(CommandBusDependencies::EventDispatcher->value);
120+
$logger = ServiceOfExpectedType::getFromContainer(
121+
container: $container,
122+
key: CommandBusDependencies::Logger->value,
123+
expectedType: LoggerInterface::class,
124+
);
125+
$pool = ServiceOfExpectedType::getFromContainer(
126+
container: $container,
127+
key: CommandBusDependencies::CachePool->value,
128+
expectedType: CacheItemPoolInterface::class,
129+
);
130+
$receiversLocator = ServiceOfExpectedType::getFromContainer(
131+
container: $container,
132+
key: CommandBusDependencies::ReceiversLocator->value,
133+
expectedType: ContainerInterface::class,
134+
);
135+
$eventDispatcher = ServiceOfExpectedType::getFromContainer(
136+
container: $container,
137+
key: CommandBusDependencies::EventDispatcher->value,
138+
expectedType: EventDispatcher::class,
139+
);
110140
$eventDispatcher->addSubscriber(
111141
new StopWorkerOnRestartSignalListener(
112142
cachePool: $pool,
@@ -121,9 +151,11 @@ public function __invoke(ServicesBuilder $builder): iterable {
121151
busLocator: new Container(),
122152
fallbackBus: $container->get(MessageBusInterface::class),
123153
),
124-
receiverLocator: $config->receiversLocator($container),
154+
receiverLocator: $receiversLocator,
125155
eventDispatcher: $eventDispatcher,
126156
logger: $logger,
157+
// if it works, it works.
158+
// if we override the receivers locator, then we’re out of luck
127159
receiverNames: array_keys($config->receivers),
128160
);
129161
};
@@ -140,8 +172,11 @@ public function __invoke(ServicesBuilder $builder): iterable {
140172

141173
yield CommandBusDependencies::Worker->value => function (ContainerInterface $container) {
142174
$app = new Application('worker');
143-
/** @var EventDispatcher $eventDispatcher */
144-
$eventDispatcher = $container->get(CommandBusDependencies::EventDispatcher->value);
175+
$eventDispatcher = ServiceOfExpectedType::getFromContainer(
176+
container: $container,
177+
key: CommandBusDependencies::EventDispatcher->value,
178+
expectedType: EventDispatcher::class,
179+
);
145180

146181
$app->setDispatcher($eventDispatcher);
147182
$app->addCommands(

src/ServiceOfExpectedType.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace WonderNetwork\SlimKernel;
6+
7+
use DI\DependencyException;
8+
use Psr\Container\ContainerExceptionInterface;
9+
use Psr\Container\ContainerInterface;
10+
use Psr\Container\NotFoundExceptionInterface;
11+
12+
final readonly class ServiceOfExpectedType {
13+
/**
14+
* @template T of object
15+
* @param class-string<T> $expectedType
16+
* @return T
17+
* @throws ContainerExceptionInterface
18+
* @throws NotFoundExceptionInterface
19+
*/
20+
public static function getFromContainer(ContainerInterface $container, string $key, string $expectedType): mixed {
21+
$actual = $container->get($key);
22+
23+
if (false === $actual instanceof $expectedType) {
24+
throw new DependencyException(
25+
sprintf(
26+
'Service "%s" is expected to be of type "%s", %s given',
27+
$key,
28+
$expectedType,
29+
get_debug_type($actual),
30+
),
31+
);
32+
}
33+
34+
return $actual;
35+
}
36+
}

tests/Messenger/MessengerTest.php

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,36 @@
66

77
use Acme\Sample\SideEffectsCommand;
88
use Acme\Sample\StateQuery;
9+
use DI\Container;
910
use PHPUnit\Framework\TestCase;
1011
use RuntimeException;
1112
use Symfony\Component\Console\Input\ArrayInput;
1213
use Symfony\Component\Console\Output\BufferedOutput;
1314
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
1415
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
1516
use WonderNetwork\SlimKernel\KernelBuilder;
17+
use WonderNetwork\SlimKernel\Messenger\Kernel\CommandBusDependencies;
1618
use WonderNetwork\SlimKernel\Messenger\Kernel\MessengerServiceFactory;
1719
use WonderNetwork\SlimKernel\Messenger\Kernel\TransportLocatorBuilder;
1820

1921
final class MessengerTest extends TestCase {
22+
private KernelBuilder $kernelBuilder;
23+
2024
protected function setUp(): void {
2125
if (file_exists($filename = __DIR__.'/../../.cache/CompiledContainer.php')) {
2226
unlink($filename);
2327
}
28+
29+
$this->kernelBuilder = KernelBuilder::start(
30+
realpath(__DIR__.'/../Resources/Messenger')
31+
?: throw new RuntimeException('Oops'),
32+
);
2433
}
2534

2635
public function testMessenger(): void {
2736
$transportName = 'in-memory';
2837

29-
$root = realpath(__DIR__.'/../Resources/Messenger')
30-
?: throw new RuntimeException('Oops');
31-
$container = KernelBuilder::start($root)
38+
$container = $this->kernelBuilder
3239
->useCache(__DIR__.'/../../.cache/')
3340
->register(
3441
new MessengerServiceFactory(
@@ -68,8 +75,7 @@ public function testMessenger(): void {
6875
}
6976

7077
public function testHandlersCanDependOnCommandBus(): void {
71-
$root = realpath(__DIR__.'/../Resources/Messenger') ?: throw new RuntimeException('Oops');
72-
$container = KernelBuilder::start($root)
78+
$container = $this->kernelBuilder
7379
->register(
7480
new MessengerServiceFactory(
7581
commandPath: 'src/Requeue/*Handler.php',
@@ -81,4 +87,45 @@ public function testHandlersCanDependOnCommandBus(): void {
8187
$this->expectNotToPerformAssertions();
8288
$container->get(CommandBus::class);
8389
}
90+
91+
public function testCustomTransports(): void {
92+
$transportName = 'default';
93+
94+
$defaultTransport = new InMemoryTransport();
95+
$customTransport = new InMemoryTransport();
96+
97+
$container = $this->kernelBuilder
98+
->register(
99+
new MessengerServiceFactory(
100+
commandPath: 'src/Sample/*AsyncHandler.php',
101+
queryPath: 'src/Sample/*QueryHandler.php',
102+
transports: TransportLocatorBuilder::start()
103+
->withTransport(
104+
name: $transportName,
105+
sender: InMemoryTransport::class,
106+
receiver: InMemoryTransport::class,
107+
),
108+
),
109+
)
110+
->add(
111+
[
112+
InMemoryTransport::class => $defaultTransport,
113+
CommandBusDependencies::SendersLocator->value => fn () => new Container(
114+
[
115+
$transportName => $customTransport,
116+
],
117+
),
118+
],
119+
)
120+
->build();
121+
122+
/** @var CommandBus $commandBus */
123+
$commandBus = $container->get(CommandBus::class);
124+
125+
$some = bin2hex(random_bytes(16));
126+
$commandBus->queue(new SideEffectsCommand($some), $transportName);
127+
128+
self::assertCount(0, $defaultTransport->get());
129+
self::assertCount(1, $customTransport->get());
130+
}
84131
}

0 commit comments

Comments
 (0)