diff --git a/dramatiq/actor.py b/dramatiq/actor.py index e73d69dc..36c54344 100644 --- a/dramatiq/actor.py +++ b/dramatiq/actor.py @@ -49,6 +49,7 @@ def __init__(self, fn, *, broker, actor_name, queue_name, priority, options): self.queue_name = queue_name self.priority = priority self.options = options + self.injections = {} self.broker.declare_actor(self) def message(self, *args, **kwargs): @@ -142,7 +143,7 @@ def __call__(self, *args, **kwargs): try: self.logger.debug("Received args=%r kwargs=%r.", args, kwargs) start = time.perf_counter() - return self.fn(*args, **kwargs) + return self.fn(*args, **{**self.injections, **kwargs}) finally: delta = time.perf_counter() - start self.logger.debug("Completed after %.02fms.", delta * 1000) diff --git a/dramatiq/message.py b/dramatiq/message.py index 4b65e0e1..c396e212 100644 --- a/dramatiq/message.py +++ b/dramatiq/message.py @@ -74,11 +74,14 @@ class Message(namedtuple("Message", ( """ def __new__(cls, *, queue_name, actor_name, args, kwargs, options, message_id=None, message_timestamp=None): - return super().__new__( + self = super().__new__( cls, queue_name, actor_name, tuple(args), kwargs, options, message_id=message_id or generate_unique_id(), message_timestamp=message_timestamp or int(time.time() * 1000), ) + # This is specifically not part of namedtuple so it's not part of _asdict() and such + self.injections = {} + return self def __or__(self, other) -> pipeline: """Combine this message into a pipeline with "other". diff --git a/dramatiq/worker.py b/dramatiq/worker.py index 8d6d1d71..d0b0f6d2 100644 --- a/dramatiq/worker.py +++ b/dramatiq/worker.py @@ -479,7 +479,7 @@ def process_message(self, message): res = None if not message.failed: actor = self.broker.get_actor(message.actor_name) - res = actor(*message.args, **message.kwargs) + res = actor(*message.args, **{**message.injections, **message.kwargs}) if res is not None \ and message.options.get("pipe_target") is None \ and not has_results_middleware(self.broker):