-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathPublishDomainEventsFromMutationsCommand.php
More file actions
94 lines (76 loc) · 2.93 KB
/
PublishDomainEventsFromMutationsCommand.php
File metadata and controls
94 lines (76 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
<?php
declare(strict_types=1);
namespace CodelyTv\Apps\Mooc\Backend\Command\DomainEvents;
use CodelyTv\Mooc\Courses\Infrastructure\Cdc\DatabaseMutationToCourseCreatedDomainEvent;
use CodelyTv\Shared\Domain\Bus\Event\EventBus;
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationAction;
use CodelyTv\Shared\Infrastructure\Cdc\DatabaseMutationToDomainEvent;
use Doctrine\ORM\EntityManager;
use Override;
use RuntimeException;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * Console command that reads rows from the `mutations` table, converts each
 * one into domain events through a transformer selected by table name and
 * operation, publishes those events on the event bus and finally deletes the
 * processed rows. Reading, publishing and deleting all run inside a single
 * database transaction.
 */
#[AsCommand(
    name: 'codely:domain-events:generate-from-mutations',
    description: 'Publish domain events from mutations',
)]
final class PublishDomainEventsFromMutationsCommand extends Command
{
    /**
     * Transformer registry indexed by table name, then by mutation operation.
     * A `null` entry means the (table, operation) pair is known but is
     * deliberately ignored.
     *
     * @var array<string, array<string, class-string<DatabaseMutationToDomainEvent>|null>>
     */
    private array $transformers;

    public function __construct(
        private readonly EntityManager $entityManager,
        private readonly EventBus $eventBus
    ) {
        parent::__construct();

        $this->transformers = [
            'courses' => [
                DatabaseMutationAction::INSERT->value => DatabaseMutationToCourseCreatedDomainEvent::class,
                DatabaseMutationAction::UPDATE->value => null,
                DatabaseMutationAction::DELETE->value => null,
            ],
        ];
    }

    #[Override]
    protected function configure(): void
    {
        $this->addArgument('quantity', InputArgument::REQUIRED, 'Quantity of mutations to process');
    }

    /**
     * Processes up to `quantity` mutations, oldest id first.
     *
     * @return int Command::SUCCESS when the batch was processed (even if it was empty),
     *             Command::INVALID when `quantity` is not a positive integer.
     */
    #[Override]
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $totalMutations = (int) $input->getArgument('quantity');

        // A zero/negative (or non-numeric, which casts to 0) quantity would produce an invalid
        // or pointless LIMIT clause, so reject it before touching the database.
        if ($totalMutations < 1) {
            $output->writeln('<error>The quantity argument must be a positive integer</error>');

            return self::INVALID;
        }

        $this->entityManager->wrapInTransaction(
            function (EntityManager $entityManager) use ($totalMutations, $output): void {
                $connection = $entityManager->getConnection();

                // $totalMutations is a validated positive int, so interpolating it into LIMIT is safe.
                $mutations = $connection
                    ->executeQuery("SELECT * FROM mutations ORDER BY id ASC LIMIT $totalMutations FOR UPDATE")
                    ->fetchAllAssociative();

                // Nothing pending: bail out, otherwise the DELETE below would render `IN ()`,
                // which is a SQL syntax error.
                if ($mutations === []) {
                    return;
                }

                foreach ($mutations as $mutation) {
                    $transformer = $this->findTransformer($mutation['table_name'], $mutation['operation']);

                    if ($transformer === null) {
                        $output->writeln(
                            sprintf('Ignoring %s %s', $mutation['table_name'], $mutation['operation'])
                        );

                        continue;
                    }

                    $domainEvents = $transformer->transform($mutation);

                    $this->eventBus->publish(...$domainEvents);
                }

                // Delete exactly the rows that were fetched, binding the ids as parameters
                // instead of concatenating them into the statement.
                $ids = array_column($mutations, 'id');
                $placeholders = implode(',', array_fill(0, count($ids), '?'));

                $connection->executeStatement(
                    sprintf('DELETE FROM mutations WHERE id IN (%s)', $placeholders),
                    $ids
                );
            }
        );

        return self::SUCCESS;
    }

    /**
     * Resolves the transformer registered for a table/operation pair.
     *
     * @return DatabaseMutationToDomainEvent|null A new transformer instance, or null when the
     *                                            pair is registered as "ignore".
     *
     * @throws RuntimeException When the table or the operation is not registered at all.
     */
    private function findTransformer(string $tableName, string $operation): ?DatabaseMutationToDomainEvent
    {
        // array_key_exists (not isset) for the operation: a registered null entry must count
        // as "known but ignored" rather than "missing".
        if (
            !array_key_exists($tableName, $this->transformers)
            || !array_key_exists($operation, $this->transformers[$tableName])
        ) {
            throw new RuntimeException("Transformer not found for table $tableName and operation $operation");
        }

        $class = $this->transformers[$tableName][$operation];

        return $class === null ? null : new $class();
    }
}