-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathmessagebus.py
More file actions
39 lines (30 loc) · 1.03 KB
/
messagebus.py
File metadata and controls
39 lines (30 loc) · 1.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from typing import Callable, Dict, Optional, Type
from aws_doc_sdk_examples_tools.lliam.domain import commands
from aws_doc_sdk_examples_tools.lliam.service_layer import (
create_prompts,
update_doc_gen,
run_ailly,
unit_of_work,
)
# The bus only handles Commands for now, so ``Message`` is simply an alias
# for ``commands.Command``.
Message = commands.Command
def handle(
    message: commands.Command, uow: Optional[unit_of_work.FsUnitOfWork] = None
):
    """Dispatch ``message`` through the message bus.

    The message is placed on a FIFO work list that is drained front to back.
    Nothing in this function appends further items, so as written exactly one
    message is processed per call.

    Args:
        message: The command to process.
        uow: Optional unit of work, forwarded unchanged to
            ``handle_command``. Defaults to ``None``.

    Raises:
        Exception: If an item taken from the work list is not an instance of
            ``commands.Command``.
    """
    pending = [message]
    while pending:
        current = pending.pop(0)
        # Guard clause: anything that is not a Command is rejected outright.
        if not isinstance(current, commands.Command):
            raise Exception(f"{current} was not a Command")
        handle_command(current, uow)
def handle_command(
    command: commands.Command, uow: Optional[unit_of_work.FsUnitOfWork]
):
    """Invoke the handler registered for ``command``'s exact type.

    The handler is looked up in ``COMMAND_HANDLERS`` using ``type(command)``
    as the key, so a subclass of a registered command does not match its
    parent's entry. The handler is called as ``handler(command, uow)`` and
    its return value is discarded.

    Args:
        command: The command instance to dispatch.
        uow: Unit of work (or ``None``) passed straight through to the
            handler.

    Raises:
        KeyError: If ``type(command)`` has no entry in ``COMMAND_HANDLERS``.
    """
    command_type = type(command)
    dispatch = COMMAND_HANDLERS[command_type]
    dispatch(command, uow)
# Dispatch table for the message bus: maps each concrete Command class to the
# callable that handles it. ``handle_command`` looks entries up by the exact
# type of the command and calls the handler as ``handler(command, uow)``.
COMMAND_HANDLERS: Dict[Type[commands.Command], Callable] = {
    commands.CreatePrompts: create_prompts.create_prompts,
    commands.RunAilly: run_ailly.handle_run_ailly,
    commands.UpdateReservoir: update_doc_gen.handle_update_reservoir,
}