diff --git a/.github/workflows/durabletask-azurefunctions.yml b/.github/workflows/durabletask-azurefunctions.yml new file mode 100644 index 00000000..6df274f1 --- /dev/null +++ b/.github/workflows/durabletask-azurefunctions.yml @@ -0,0 +1,67 @@ +name: Durable Task Scheduler SDK (azure-functions-durable) + +on: + push: + branches: + - "main" + tags: + - "azurefunctions-v*" # Only run for tags starting with "azurefunctions-v" + pull_request: + branches: + - "main" + +permissions: + contents: read + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.14 + uses: actions/setup-python@v5 + with: + python-version: 3.14 + - name: Install dependencies + working-directory: azure-functions-durable + run: | + python -m pip install --upgrade pip + pip install setuptools wheel tox + pip install flake8 + - name: Run flake8 Linter + working-directory: azure-functions-durable + run: flake8 . + - name: Run flake8 Linter + working-directory: tests/azure-functions-durable + run: flake8 . + + run-tests: + strategy: + fail-fast: false + matrix: + python-version: ["3.13", "3.14"] + needs: lint + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install durabletask locally + run: | + python -m pip install --upgrade pip + pip install pytest + pip install . --force-reinstall + + - name: Install azure-functions-durable locally + run: | + pip install ./azure-functions-durable --force-reinstall + + - name: Run unit tests + working-directory: tests/azure-functions-durable + run: | + pytest -m "not dts and not azurite" --verbose diff --git a/.github/workflows/typecheck.yml b/.github/workflows/typecheck.yml index f10463ad..cc0c2bdd 100644 --- a/.github/workflows/typecheck.yml +++ b/.github/workflows/typecheck.yml @@ -7,6 +7,7 @@ on: tags: - "v*" - "azuremanaged-v*" + - "azurefunctions-v*" pull_request: branches: - "main" @@ -36,3 +37,25 @@ jobs: - name: Run pyright (strict, Python 3.10) run: pyright + + pyright-azurefunctions: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python 3.13 (lowest supported by azure-functions-durable) + uses: actions/setup-python@v5 + with: + python-version: "3.13" + + - name: Install packages and dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -e ".[azure-blob-payloads,opentelemetry]" + pip install -e ./azure-functions-durable + pip install pyright + + - name: Run pyright (strict, Python 3.13) + run: pyright -p azure-functions-durable/pyrightconfig.json diff --git a/azure-functions-durable/CHANGELOG.md b/azure-functions-durable/CHANGELOG.md new file mode 100644 index 00000000..b9be1590 --- /dev/null +++ b/azure-functions-durable/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## v0.1.0 + +- Initial implementation diff --git a/azure-functions-durable/azure/durable_functions/__init__.py b/azure-functions-durable/azure/durable_functions/__init__.py new file mode 100644 index 00000000..01389b80 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from .internal.functions_json import install_custom_serialization +from .decorators.durable_app import Blueprint, DFApp +from .client import DurableFunctionsClient +from .orchestrator import Orchestrator + +# Ensure the durabletask JSON encoder/decoder is replaced as soon as the +# durable_functions package is imported. +install_custom_serialization() + +# IMPORTANT: DO NOT REMOVE. `azure-functions` relies on the presence and value of this variable +# for version detection +version = "2.x" + +__all__ = ["Blueprint", "DFApp", "DurableFunctionsClient", "Orchestrator", "version"] diff --git a/azure-functions-durable/azure/durable_functions/client.py b/azure-functions-durable/azure/durable_functions/client.py new file mode 100644 index 00000000..b3f7e549 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/client.py @@ -0,0 +1,115 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json + +from datetime import timedelta +import azure.functions as func +from urllib.parse import urlparse, quote + +from durabletask.client import TaskHubGrpcClient +from .internal.azurefunctions_grpc_interceptor import AzureFunctionsDefaultClientInterceptorImpl +from .http import HttpManagementPayload + + +# Client class used for Durable Functions +class DurableFunctionsClient(TaskHubGrpcClient): + """A gRPC client passed to Durable Functions durable client bindings. + + Connects to the Durable Functions runtime using gRPC and provides methods + for creating and managing Durable orchestrations, interacting with Durable entities, + and creating HTTP management payloads and check status responses for use with Durable Functions invocations. + """ + taskHubName: str + connectionName: str + creationUrls: dict[str, str] + managementUrls: dict[str, str] + baseUrl: str + requiredQueryStringParameters: str + rpcBaseUrl: str + httpBaseUrl: str + maxGrpcMessageSizeInBytes: int + grpcHttpClientTimeout: timedelta + + def __init__(self, client_as_string: str): + """Initializes a DurableFunctionsClient instance from a JSON string. + + This string will be provided by the Durable Functions host extension upon invocation of the client trigger. + + Args: + client_as_string (str): A JSON string containing the Durable Functions client configuration. + + Raises: + json.JSONDecodeError: If the provided string is not valid JSON. + """ + self._parse_client_configuration(client_as_string) + + interceptors = [AzureFunctionsDefaultClientInterceptorImpl(self.taskHubName, self.requiredQueryStringParameters)] + + # We pass in None for the metadata so we don't construct an additional interceptor in the parent class + # Since the parent class doesn't use anything metadata for anything else, we can set it as None + super().__init__( + host_address=self.rpcBaseUrl, + secure_channel=False, + metadata=None, + interceptors=interceptors) + + def _parse_client_configuration(self, client_as_string: str) -> None: + """Parses the client configuration JSON string and sets instance variables. + + Args: + client_as_string (str): A JSON string containing the Durable Functions client configuration. + + Raises: + json.JSONDecodeError: If the provided string is not valid JSON. + """ + client = json.loads(client_as_string) + + self.taskHubName = client.get("taskHubName", "") + self.connectionName = client.get("connectionName", "") + self.creationUrls = client.get("creationUrls", {}) + self.managementUrls = client.get("managementUrls", {}) + self.baseUrl = client.get("baseUrl", "") + self.requiredQueryStringParameters = client.get("requiredQueryStringParameters", "") + self.rpcBaseUrl = client.get("rpcBaseUrl", "") + self.httpBaseUrl = client.get("httpBaseUrl", "") + self.maxGrpcMessageSizeInBytes = client.get("maxGrpcMessageSizeInBytes", 0) + # TODO: convert the string value back to timedelta - annoying regex? + self.grpcHttpClientTimeout = client.get("grpcHttpClientTimeout", timedelta(seconds=30)) + + def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: + """Creates an HTTP response for checking the status of a Durable Function instance. + + Args: + request (func.HttpRequest): The incoming HTTP request. + instance_id (str): The ID of the Durable Function instance. + """ + location_url = self._get_instance_status_url(request, instance_id) + return func.HttpResponse( + body=str(self._get_client_response_links(request, instance_id)), + status_code=202, + headers={ + 'content-type': 'application/json', + 'Location': location_url, + }, + ) + + def create_http_management_payload(self, request: func.HttpRequest, instance_id: str) -> HttpManagementPayload: + """Creates an HTTP management payload for a Durable Function instance. + + Args: + instance_id (str): The ID of the Durable Function instance. + """ + return self._get_client_response_links(request, instance_id) + + def _get_client_response_links(self, request: func.HttpRequest, instance_id: str) -> HttpManagementPayload: + instance_status_url = self._get_instance_status_url(request, instance_id) + return HttpManagementPayload(instance_id, instance_status_url, self.requiredQueryStringParameters) + + @staticmethod + def _get_instance_status_url(request: func.HttpRequest, instance_id: str) -> str: + request_url = urlparse(request.url) + location_url = f"{request_url.scheme}://{request_url.netloc}" + encoded_instance_id = quote(instance_id) + location_url = location_url + "/runtime/webhooks/durabletask/instances/" + encoded_instance_id + return location_url diff --git a/azure-functions-durable/azure/durable_functions/constants.py b/azure-functions-durable/azure/durable_functions/constants.py new file mode 100644 index 00000000..fbd268a7 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/constants.py @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Constants used to determine the local running context.""" +ORCHESTRATION_TRIGGER = "orchestrationTrigger" +ACTIVITY_TRIGGER = "activityTrigger" +ENTITY_TRIGGER = "entityTrigger" +DURABLE_CLIENT = "durableClient" diff --git a/azure-functions-durable/azure/durable_functions/decorators/__init__.py b/azure-functions-durable/azure/durable_functions/decorators/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/decorators/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/azure-functions-durable/azure/durable_functions/decorators/durable_app.py b/azure-functions-durable/azure/durable_functions/decorators/durable_app.py new file mode 100644 index 00000000..4f828a11 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/decorators/durable_app.py @@ -0,0 +1,285 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from functools import wraps +from typing import Any, Callable, Optional, Union + +from azure.functions import FunctionRegister, TriggerApi, BindingApi, AuthLevel +from azure.functions.decorators.function_app import FunctionBuilder + +from durabletask import task + +from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \ + DurableClient +from ..worker import DurableFunctionsWorker +from ..orchestrator import Orchestrator + + +class Blueprint(TriggerApi, BindingApi): + """Durable Functions (DF) Blueprint container. + + It allows functions to be declared via trigger and binding decorators, + but does not automatically index/register these functions. + + To register these functions, utilize the `register_functions` method from any + :class:`FunctionRegister` subclass, such as `DFApp`. + """ + + def __init__(self, + http_auth_level: Union[AuthLevel, str] = AuthLevel.FUNCTION): + """Instantiate a Durable Functions app with which to register Functions. + + Parameters + ---------- + http_auth_level: Union[AuthLevel, str] + Authorization level required for Function invocation. + Defaults to AuthLevel.Function. + + Returns + ------- + DFApp + New instance of a Durable Functions app + """ + # The next-in-MRO base (``DecoratorApi.__init__``) is declared with + # untyped ``*args``/``**kwargs``, so pyright cannot see this call's type. + super().__init__(auth_level=http_auth_level) # pyright: ignore[reportUnknownMemberType] + + def _configure_orchestrator_callable( + self, + wrap: Callable[[Callable[..., Any]], FunctionBuilder] + ) -> Callable[[task.Orchestrator[Any, Any]], FunctionBuilder]: + """Obtain decorator to construct an Orchestrator class from a user-defined Function. + + Parameters + ---------- + wrap: Callable + The next decorator to be applied. + + Returns + ------- + Callable + The function to construct an Orchestrator class from the user-defined Function, + wrapped by the next decorator in the sequence. + """ + def decorator(orchestrator_func: task.Orchestrator[Any, Any]) -> FunctionBuilder: + # Construct an orchestrator based on the end-user code + + handle = Orchestrator.create(orchestrator_func) + + # invoke next decorator, with the Orchestrator as input + handle.__name__ = orchestrator_func.__name__ + return wrap(handle) + + return decorator + + def _configure_entity_callable( + self, + wrap: Callable[[Callable[..., Any]], FunctionBuilder] + ) -> Callable[[task.Entity[Any, Any]], FunctionBuilder]: + """Obtain decorator to construct an Entity class from a user-defined Function. + + Parameters + ---------- + wrap: Callable + The next decorator to be applied. + + Returns + ------- + Callable + The function to construct an Entity class from the user-defined Function, + wrapped by the next decorator in the sequence. + """ + def decorator(entity_func: task.Entity[Any, Any]) -> FunctionBuilder: + # Construct an orchestrator based on the end-user code + + # TODO: Because this handle method is the one actually exposed to the Functions SDK decorator, + # the parameter name will always be "context" here, even if the user specified a different name. + # We need to find a way to allow custom context names (like "ctx"). + def handle(context: Any) -> str: + return DurableFunctionsWorker().execute_entity_batch_request(entity_func, context) + + handle.entity_function = entity_func # pyright: ignore[reportFunctionMemberAccess] + + # invoke next decorator, with the Entity as input + handle.__name__ = entity_func.__name__ + return wrap(handle) + + return decorator + + def _add_rich_client( + self, + fb: FunctionBuilder, + parameter_name: str, + client_constructor: Callable[[Any], Any] + ) -> None: + # Obtain user-code and force type annotation on the client-binding parameter to be `str`. + # This ensures a passing type-check of that specific parameter, + # circumventing a limitation of the worker in type-checking rich DF Client objects. + # TODO: Once rich-binding type checking is possible, remove the annotation change. + # ``FunctionBuilder._function`` and ``Function._func`` are private to + # azure-functions with no public accessor for mutating the wrapped + # user function. Holding it as ``Any`` keeps the single private-access + # waiver here rather than spreading it across each ``._func`` use. + function_obj: Any = fb._function # pyright: ignore[reportPrivateUsage] + user_code = function_obj._func + user_code.__annotations__[parameter_name] = str + + # `wraps` This ensures we re-export the same method-signature as the decorated method + @wraps(user_code) + async def df_client_middleware(*args: Any, **kwargs: Any) -> Any: + + # Obtain JSON-string currently passed as DF Client, + # construct rich object from it, + # and assign parameter to that rich object + starter = kwargs[parameter_name] + client = client_constructor(starter) + kwargs[parameter_name] = client + + # Invoke user code with rich DF Client binding + return await user_code(*args, **kwargs) + + # TODO: Is there a better way to support retrieving the unwrapped user code? + df_client_middleware.client_function = function_obj._func # pyright: ignore[reportAttributeAccessIssue] + + function_obj._func = df_client_middleware + + def _build_function( + self, + wrap: Callable[[FunctionBuilder], FunctionBuilder] + ) -> Callable[[Callable[..., Any]], FunctionBuilder]: + """Typed equivalent of the base ``_configure_function_builder``. + + The inherited method is untyped, which would otherwise propagate + ``Unknown`` types through every decorator below. This mirrors its + behaviour exactly using the typed protected members it relies on. + """ + def decorator(func: Callable[..., Any]) -> FunctionBuilder: + fb = self._validate_type(func) + self._function_builders.append(fb) + return wrap(fb) + + return decorator + + def orchestration_trigger(self, context_name: str, + orchestration: Optional[str] = None + ) -> Callable[[task.Orchestrator[Any, Any]], FunctionBuilder]: + """Register an Orchestrator Function. + + Parameters + ---------- + context_name: str + Parameter name of the DurableOrchestrationContext object. + orchestration: Optional[str] + Name of Orchestrator Function. + The value is None by default, in which case the name of the method is used. + """ + @self._configure_orchestrator_callable + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + + def decorator() -> FunctionBuilder: + fb.add_trigger( + trigger=OrchestrationTrigger(name=context_name, + orchestration=orchestration)) + return fb + + return decorator() + + return wrap + + def activity_trigger(self, input_name: str, + activity: Optional[str] = None + ) -> Callable[[Callable[..., Any]], FunctionBuilder]: + """Register an Activity Function. + + Parameters + ---------- + input_name: str + Parameter name of the Activity input. + activity: Optional[str] + Name of Activity Function. + The value is None by default, in which case the name of the method is used. + """ + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + def decorator() -> FunctionBuilder: + fb.add_trigger( + trigger=ActivityTrigger(name=input_name, + activity=activity)) + return fb + + return decorator() + + return wrap + + def entity_trigger(self, + context_name: str, + entity_name: Optional[str] = None + ) -> Callable[[task.Entity[Any, Any]], FunctionBuilder]: + """Register an Entity Function. + + Parameters + ---------- + context_name: str + Parameter name of the Entity input. + entity_name: Optional[str] + Name of Entity Function. + The value is None by default, in which case the name of the method is used. + """ + @self._configure_entity_callable + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + def decorator() -> FunctionBuilder: + fb.add_trigger( + trigger=EntityTrigger(name=context_name, + entity_name=entity_name)) + return fb + + return decorator() + + return wrap + + def durable_client_input(self, + client_name: str, + task_hub: Optional[str] = None, + connection_name: Optional[str] = None + ) -> Callable[[Callable[..., Any]], FunctionBuilder]: + """Register a Durable-client Function. + + Parameters + ---------- + client_name: str + Parameter name of durable client. + task_hub: Optional[str] + Used in scenarios where multiple function apps share the same storage account + but need to be isolated from each other. If not specified, the default value + from host.json is used. + This value must match the value used by the target orchestrator functions. + connection_name: Optional[str] + The name of an app setting that contains a storage account connection string. + The storage account represented by this connection string must be the same one + used by the target orchestrator functions. If not specified, the default storage + account connection string for the function app is used. + """ + + @self._build_function + def wrap(fb: FunctionBuilder) -> FunctionBuilder: + def decorator() -> FunctionBuilder: + fb.add_binding( + binding=DurableClient(name=client_name, + task_hub=task_hub, + connection_name=connection_name)) + return fb + + return decorator() + + return wrap + + +class DFApp(Blueprint, FunctionRegister): + """Durable Functions (DF) app. + + Exports the decorators required to declare and index DF Function-types. + """ + + pass diff --git a/azure-functions-durable/azure/durable_functions/decorators/metadata.py b/azure-functions-durable/azure/durable_functions/decorators/metadata.py new file mode 100644 index 00000000..efe3983d --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/decorators/metadata.py @@ -0,0 +1,118 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional + +from ..constants import ORCHESTRATION_TRIGGER, \ + ACTIVITY_TRIGGER, ENTITY_TRIGGER, DURABLE_CLIENT +from azure.functions.decorators.core import Trigger, InputBinding + + +class OrchestrationTrigger(Trigger): + """OrchestrationTrigger. + + Trigger representing an Orchestration Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ORCHESTRATION_TRIGGER + + def __init__(self, + name: str, + orchestration: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.orchestration = orchestration + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class ActivityTrigger(Trigger): + """ActivityTrigger. + + Trigger representing a Durable Functions Activity. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ACTIVITY_TRIGGER + + def __init__(self, + name: str, + activity: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.activity = activity + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class EntityTrigger(Trigger): + """EntityTrigger. + + Trigger representing an Entity Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ENTITY_TRIGGER + + def __init__(self, + name: str, + entity_name: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.entity_name = entity_name + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class DurableClient(InputBinding): + """DurableClient. + + Binding representing a Durable-client object. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this Binding, as a string. + + Returns + ------- + str + The string representation of this binding. + """ + return DURABLE_CLIENT + + def __init__(self, + name: str, + task_hub: Optional[str] = None, + connection_name: Optional[str] = None, + durable_requires_grpc: bool = True, + ) -> None: + self.task_hub = task_hub + self.connection_name = connection_name + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) diff --git a/azure-functions-durable/azure/durable_functions/http/__init__.py b/azure-functions-durable/azure/durable_functions/http/__init__.py new file mode 100644 index 00000000..b4d2c355 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/http/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from ..http.http_management_payload import HttpManagementPayload + +__all__ = ["HttpManagementPayload"] diff --git a/azure-functions-durable/azure/durable_functions/http/http_management_payload.py b/azure-functions-durable/azure/durable_functions/http/http_management_payload.py new file mode 100644 index 00000000..a6836844 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/http/http_management_payload.py @@ -0,0 +1,34 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json + + +class HttpManagementPayload: + """A class representing the HTTP management payload for a Durable Function orchestration instance. + + Contains URLs for managing the instance, such as querying status, + sending events, terminating, restarting, etc. + """ + + def __init__(self, instance_id: str, instance_status_url: str, required_query_string_parameters: str): + """Initializes the HttpManagementPayload with the necessary URLs. + + Args: + instance_id (str): The ID of the Durable Function instance. + instance_status_url (str): The base URL for the instance status. + required_query_string_parameters (str): The required URL parameters provided by the Durable extension. + """ + self.urls = { + 'id': instance_id, + 'purgeHistoryDeleteUri': instance_status_url + "?" + required_query_string_parameters, + 'restartPostUri': instance_status_url + "/restart?" + required_query_string_parameters, + 'sendEventPostUri': instance_status_url + "/raiseEvent/{eventName}?" + required_query_string_parameters, + 'statusQueryGetUri': instance_status_url + "?" + required_query_string_parameters, + 'terminatePostUri': instance_status_url + "/terminate?reason={text}&" + required_query_string_parameters, + 'resumePostUri': instance_status_url + "/resume?reason={text}&" + required_query_string_parameters, + 'suspendPostUri': instance_status_url + "/suspend?reason={text}&" + required_query_string_parameters + } + + def __str__(self): + return json.dumps(self.urls) diff --git a/azure-functions-durable/azure/durable_functions/internal/__init__.py b/azure-functions-durable/azure/durable_functions/internal/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/azure-functions-durable/azure/durable_functions/internal/azurefunctions_grpc_interceptor.py b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_grpc_interceptor.py new file mode 100644 index 00000000..8736bf6f --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_grpc_interceptor.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from importlib.metadata import version + +from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl + + +class AzureFunctionsDefaultClientInterceptorImpl(DefaultClientInterceptorImpl): + """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, + StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an + interceptor to add additional headers to all calls as needed.""" + required_query_string_parameters: str + + def __init__(self, taskhub_name: str, required_query_string_parameters: str): + self.required_query_string_parameters = required_query_string_parameters + try: + # Get the version of the azurefunctions package + sdk_version = version('durabletask-azurefunctions') + except Exception: + # Fallback if version cannot be determined + sdk_version = "unknown" + user_agent = f"durabletask-python/{sdk_version}" + self._metadata = [ + ("taskhub", taskhub_name), + ("x-user-agent", user_agent)] # 'user-agent' is a reserved header in grpc, so we use 'x-user-agent' instead + super().__init__(self._metadata) diff --git a/azure-functions-durable/azure/durable_functions/internal/azurefunctions_null_stub.py b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_null_stub.py new file mode 100644 index 00000000..af8593d1 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/azurefunctions_null_stub.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typing import Any, Callable + + +class AzureFunctionsNullStub: + """A task hub sidecar stub whose every method is a no-op. + + Instances structurally satisfy the methods of + ``ProtoTaskHubSidecarServiceStub`` without inheriting from that + ``Protocol`` (a ``Protocol`` subclass cannot be instantiated). Any + attribute access resolves to a callable that ignores its arguments and + returns ``None``, which is sufficient because the Azure Functions worker + replaces the relevant completion callbacks before invoking the base + worker logic. + """ + + def __getattr__(self, name: str) -> Callable[..., None]: + def _noop(*args: Any, **kwargs: Any) -> None: + return None + + return _noop diff --git a/azure-functions-durable/azure/durable_functions/internal/functions_json.py b/azure-functions-durable/azure/durable_functions/internal/functions_json.py new file mode 100644 index 00000000..383e255d --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/internal/functions_json.py @@ -0,0 +1,37 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import importlib +import json +from typing import Any, Callable + +from durabletask.internal import shared + +# ``azure.functions`` only exposes its custom-object (de)serialization helpers +# from a private module, and they are untyped. Resolve them dynamically and +# bind them to locally-typed callables so the rest of the module stays fully +# type-checked. +_df_serializers = importlib.import_module("azure.functions._durable_functions") +_serialize_custom_object: Callable[[Any], Any] = getattr( + _df_serializers, "_serialize_custom_object") +_deserialize_custom_object: Callable[[dict[str, Any]], Any] = getattr( + _df_serializers, "_deserialize_custom_object") + + +def _to_json(obj: Any) -> str: + return json.dumps(obj, default=_serialize_custom_object) + + +def _from_json(json_str: str | bytes | bytearray) -> Any: + return json.loads(json_str, object_hook=_deserialize_custom_object) + + +def install_custom_serialization() -> None: + """Replace durabletask's global JSON (de)serialization helpers. + + Routes ``durabletask`` payload serialization through azure-functions' + custom-object (de)serializers so that user types round-trip consistently + between the Functions host and the durabletask runtime. + """ + shared.to_json = _to_json + shared.from_json = _from_json diff --git a/azure-functions-durable/azure/durable_functions/orchestrator.py b/azure-functions-durable/azure/durable_functions/orchestrator.py new file mode 100644 index 00000000..168ee61e --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/orchestrator.py @@ -0,0 +1,68 @@ +"""Durable Orchestrator. + +Responsible for orchestrating the execution of the user defined generator +function. +""" +from typing import Any, Callable, Generator + +from durabletask.task import OrchestrationContext + +from .worker import DurableFunctionsWorker + + +class Orchestrator: + """Durable Orchestration Class. + + Responsible for orchestrating the execution of the user defined generator + function. + """ + + def __init__(self, + activity_func: Callable[[OrchestrationContext, Any], Generator[Any, Any, Any]]): + """Create a new orchestrator for the user defined generator. + + Responsible for orchestrating the execution of the user defined + generator function. + :param activity_func: Generator function to orchestrate. + """ + self.fn: Callable[[OrchestrationContext, Any], Generator[Any, Any, Any]] = activity_func + + def handle(self, context: OrchestrationContext) -> str: + """Handle the orchestration of the user defined generator function. + + Parameters + ---------- + context : DurableOrchestrationContext + The DF orchestration context + + Returns + ------- + str + The JSON-formatted string representing the user's orchestration + state after this invocation + """ + self.durable_context = context + return DurableFunctionsWorker().execute_orchestration_request(self.fn, context) + + @classmethod + def create(cls, fn: Callable[[OrchestrationContext, Any], Generator[Any, Any, Any]]) \ + -> Callable[[Any], str]: + """Create an instance of the orchestration class. + + Parameters + ---------- + fn: Callable[[DurableOrchestrationContext], Iterator[Any]] + Generator function that needs orchestration + + Returns + ------- + Callable[[Any], str] + Handle function of the newly created orchestration client + """ + + def handle(context: Any) -> str: + return Orchestrator(fn).handle(context) + + handle.orchestrator_function = fn # pyright: ignore[reportFunctionMemberAccess] + + return handle diff --git a/azure-functions-durable/azure/durable_functions/worker.py b/azure-functions-durable/azure/durable_functions/worker.py new file mode 100644 index 00000000..b17f0c16 --- /dev/null +++ b/azure-functions-durable/azure/durable_functions/worker.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import base64 +from typing import Any, Optional + +from durabletask import task +from durabletask.internal.orchestrator_service_pb2 import ( + EntityBatchRequest, + EntityBatchResult, + HistoryEvent, + OrchestratorRequest, + OrchestratorResponse, +) +from durabletask.worker import TaskHubGrpcWorker +from .internal.azurefunctions_null_stub import AzureFunctionsNullStub + + +# Worker class used for Durable Task Scheduler (DTS) +class DurableFunctionsWorker(TaskHubGrpcWorker): + """A worker that can execute orchestrator and entity functions in the context of Azure Functions. + + Used internally by the Durable Functions Python SDK, and should not be visible to functionapps directly. + See TaskHubGrpcWorker for base class documentation. + """ + + def __init__(self) -> None: + # We never start the worker loop or open a gRPC channel. The base + # constructor only initialises in-memory state (registry, logger, + # concurrency options, payload store, etc.) that the inherited + # ``_execute_*`` methods rely on; work items are delivered directly by + # the methods below rather than streamed from a sidecar. + super().__init__() + + def add_named_orchestrator(self, name: str, func: task.Orchestrator[Any, Any]) -> None: + self._registry.add_named_orchestrator(name, func) + + def execute_orchestration_request(self, func: task.Orchestrator[Any, Any], context: Any) -> str: + context_body = getattr(context, "body", None) + if context_body is None: + context_body = context + orchestration_context = context_body + request = OrchestratorRequest() + request.ParseFromString(base64.b64decode(orchestration_context)) + stub: Any = AzureFunctionsNullStub() + response: Optional[OrchestratorResponse] = None + + def stub_complete(stub_response: OrchestratorResponse) -> None: + nonlocal response + response = stub_response + stub.CompleteOrchestratorTask = stub_complete + execution_started_events: list[HistoryEvent] = [] + for e in request.pastEvents: + if e.HasField("executionStarted"): + execution_started_events.append(e) + for e in request.newEvents: + if e.HasField("executionStarted"): + execution_started_events.append(e) + if len(execution_started_events) == 0: + raise Exception("No ExecutionStarted event found in orchestration request.") + + function_name = execution_started_events[-1].executionStarted.name + self.add_named_orchestrator(function_name, func) + super()._execute_orchestrator(request, stub, None) + + if response is None: + raise Exception("Orchestrator execution did not produce a response.") + # The Python worker returns the input as type "json", so double-encoding is necessary + return base64.b64encode(response.SerializeToString()).decode('utf-8') + + def execute_entity_batch_request(self, func: task.Entity[Any, Any], context: Any) -> str: + context_body = getattr(context, "body", None) + if context_body is None: + context_body = context + orchestration_context = context_body + request = EntityBatchRequest() + request.ParseFromString(base64.b64decode(orchestration_context)) + stub: Any = AzureFunctionsNullStub() + response: Optional[EntityBatchResult] = None + + def stub_complete(stub_response: EntityBatchResult) -> None: + nonlocal response + response = stub_response + stub.CompleteEntityTask = stub_complete + + self.add_entity(func) + super()._execute_entity_batch(request, stub, None) + + if response is None: + raise Exception("Entity execution did not produce a response.") + # The Python worker returns the input as type "json", so double-encoding is necessary + return base64.b64encode(response.SerializeToString()).decode('utf-8') diff --git a/azure-functions-durable/pyproject.toml b/azure-functions-durable/pyproject.toml new file mode 100644 index 00000000..46faa264 --- /dev/null +++ b/azure-functions-durable/pyproject.toml @@ -0,0 +1,43 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +# For more information on pyproject.toml, see https://peps.python.org/pep-0621/ + +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "azure-functions-durable" +version = "2.0.0dev0" +description = "Durable Task Python SDK provider implementation for Durable Azure Functions" +keywords = [ + "durable", + "task", + "workflow", + "azure", + "azure functions" +] +classifiers = [ + "Development Status :: 3 - Alpha", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", +] +requires-python = ">=3.13" +license = {file = "LICENSE"} +readme = "README.md" +dependencies = [ + "durabletask>=1.2.0dev0", + "azure-identity>=1.19.0", + "azure-functions>=1.25.0b3.dev1" +] + +[project.urls] +repository = "https://github.com/microsoft/durabletask-python" +changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md" + +[tool.setuptools.packages.find] +include = ["azure.durable_functions", "azure.durable_functions.*"] + +[tool.pytest.ini_options] +minversion = "6.0" diff --git a/azure-functions-durable/pyrightconfig.json b/azure-functions-durable/pyrightconfig.json new file mode 100644 index 00000000..fc3affe5 --- /dev/null +++ b/azure-functions-durable/pyrightconfig.json @@ -0,0 +1,16 @@ +{ + "include": [ + "azure" + ], + "extraPaths": [ + ".." + ], + "exclude": [ + "**/__pycache__", + "**/.venv*", + ".venv*", + "build" + ], + "pythonVersion": "3.13", + "typeCheckingMode": "strict" +} diff --git a/durabletask/worker.py b/durabletask/worker.py index aff8f4f1..7397e94e 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1383,8 +1383,7 @@ def _execute_entity_batch( stub.CompleteEntityTask(batch_result) except Exception as ex: self._logger.exception( - f"Failed to deliver entity response for '{entity_instance_id}' of orchestration ID '{instance_id}' to sidecar: {ex}" - ) + f"Failed to deliver entity response for '{entity_instance_id}' of orchestration ID '{instance_id}' to sidecar: {ex}") # TODO: Reset context @@ -1811,6 +1810,7 @@ def call_entity_function_helper( raise RuntimeError(error_message) encoded_input = shared.to_json(input) if input is not None else None + action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input, self.new_uuid()) self._pending_actions[id] = action @@ -2648,6 +2648,16 @@ def _handle_entity_event_raised(self, if not ph.is_empty(event.eventRaised.input): # TODO: Investigate why the event result is wrapped in a dict with "result" key result = shared.from_json(event.eventRaised.input.value)["result"] + # The result here is double-encoded somewhere, so we need to decode it again. This does not happen + # with entityOperationCompleted, so it's either part of the event entity messaging protocol in Core, + # or something done by the WebJobs extension. + if result and isinstance(result, str): + try: + result = shared.from_json(result) + except Exception as ex: + self._logger.warning(f"{ctx.instance_id}: Could not deserialize entity operation result to object " + f"for entity '{entity_id}', defaulting to encoded string." + f"Decode error: {ex}") if is_lock_event: ctx._entity_context.complete_acquire(event.eventRaised.name) # pyright: ignore[reportPrivateUsage] entity_task.complete(EntityLock(ctx)) diff --git a/eng/ci/release.yml b/eng/ci/release.yml index 7b58c7fd..335e58ee 100644 --- a/eng/ci/release.yml +++ b/eng/ci/release.yml @@ -90,3 +90,35 @@ extends: serviceendpointurl: "https://api.esrp.microsoft.com" mainpublisher: "durabletask-java" domaintenantid: "33e01921-4d64-4f8c-a055-5bdaffd5e33d" + + - job: azure_functions_durable + displayName: "Release azure-functions-durable" + templateContext: + type: releaseJob + isProduction: true + environment: durabletask-pypi-prod + inputs: + - input: pipelineArtifact + pipeline: DurableTaskPythonBuildPipeline + artifactName: drop + targetPath: $(System.DefaultWorkingDirectory)/drop + + steps: + - task: SFP.release-tasks.custom-build-release-task.EsrpRelease@9 + displayName: "ESRP Release azure-functions-durable" + inputs: + connectedservicename: "dtfx-internal-esrp-prod" + usemanagedidentity: true + keyvaultname: "durable-esrp-akv" + signcertname: "dts-esrp-cert" + clientid: "0b3ed1a4-0727-4a50-b82a-02c2bd9dec89" + intent: "PackageDistribution" + contenttype: "PyPi" + contentsource: "Folder" + folderlocation: "$(System.DefaultWorkingDirectory)/drop/buildoutputs/azure-functions-durable" + waitforreleasecompletion: true + owners: $(Build.RequestedForEmail) + approvers: $(Build.RequestedForEmail) + serviceendpointurl: "https://api.esrp.microsoft.com" + mainpublisher: "durabletask-java" + domaintenantid: "33e01921-4d64-4f8c-a055-5bdaffd5e33d" diff --git a/eng/templates/build.yml b/eng/templates/build.yml index 498ba942..c2294b04 100644 --- a/eng/templates/build.yml +++ b/eng/templates/build.yml @@ -13,9 +13,9 @@ jobs: - checkout: self - task: UsePythonVersion@0 - displayName: "Use Python 3.12" + displayName: "Use Python 3.13" inputs: - versionSpec: "3.12" + versionSpec: "3.13" addToPath: true # The 1ES pool is network-isolated, so direct pypi.org access is blocked. @@ -45,6 +45,11 @@ jobs: displayName: "flake8: durabletask-azuremanaged" workingDirectory: durabletask-azuremanaged + # Lint azurefunctions provider + - script: flake8 . + displayName: "flake8: azure-functions-durable" + workingDirectory: azure-functions-durable + # Build sdist + wheel for durabletask (core SDK) - script: | python -m build --sdist --wheel --outdir $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask . @@ -55,10 +60,16 @@ jobs: python -m build --sdist --wheel --outdir $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask-azuremanaged ./durabletask-azuremanaged displayName: "Build durabletask-azuremanaged (sdist + wheel)" + # Build sdist + wheel for azure-functions-durable + - script: | + python -m build --sdist --wheel --outdir $(Build.ArtifactStagingDirectory)/buildoutputs/azure-functions-durable ./azure-functions-durable + displayName: "Build azure-functions-durable (sdist + wheel)" + # List staged outputs for visibility in logs - script: | ls -la $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask ls -la $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask-azuremanaged + ls -la $(Build.ArtifactStagingDirectory)/buildoutputs/azure-functions-durable displayName: "List build outputs" # Install the built wheels with all declared optional extras and let @@ -89,8 +100,10 @@ jobs: # append the extras correctly. DT_WHEEL=$(ls $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask/*.whl) DT_AM_WHEEL=$(ls $(Build.ArtifactStagingDirectory)/buildoutputs/durabletask-azuremanaged/*.whl) + AF_WHEEL=$(ls $(Build.ArtifactStagingDirectory)/buildoutputs/azure-functions-durable/*.whl) python -m pip install "${DT_WHEEL}[opentelemetry,azure-blob-payloads]" python -m pip install "${DT_AM_WHEEL}[azure-blob-payloads]" + python -m pip install "${AF_WHEEL}" displayName: "Install built wheels" - script: pytest -m "not dts and not azurite" --verbose @@ -104,3 +117,12 @@ jobs: set -e python -P -c "import durabletask.azuremanaged; from durabletask.azuremanaged.client import DurableTaskSchedulerClient; from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker; print('durabletask.azuremanaged smoke import OK')" displayName: "smoke import: durabletask-azuremanaged" + + # azure-functions-durable unit tests run here. Integration tests that + # require Azurite or the Azure Functions host emulator are marked + # (azurite / dts) and excluded since those external services aren't + # provisioned in this network-isolated pool. The full suite runs in + # GitHub Actions on PRs to main and main itself. + - script: pytest -m "not dts and not azurite" --verbose + displayName: "pytest: azure-functions-durable (unit tests, no emulators)" + workingDirectory: tests/azure-functions-durable diff --git a/requirements.txt b/requirements.txt index ee1cad9c..61b4ca6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ pytest pytest-asyncio pytest-cov azure-identity +azure-functions asyncio packaging opentelemetry-api diff --git a/tests/azure-functions-durable/__init__.py b/tests/azure-functions-durable/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/azure-functions-durable/test_smoke.py b/tests/azure-functions-durable/test_smoke.py new file mode 100644 index 00000000..1046663e --- /dev/null +++ b/tests/azure-functions-durable/test_smoke.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import azure.durable_functions as df + + +def test_public_api_is_importable(): + """Smoke test: the package imports and exposes its public API. + + This is a no-op placeholder establishing the unit-test structure for the + azure-functions-durable module. Real unit tests should be added alongside + it; integration tests that require Azurite or the Azure Functions host + emulator should be marked (e.g. ``azurite``) so they can be excluded on + the network-isolated ADO build pool. + """ + assert df.version + assert df.DFApp is not None + assert df.Blueprint is not None + assert df.DurableFunctionsClient is not None + assert df.Orchestrator is not None