44import json
55
66from datetime import timedelta
7- from typing import Any , Optional
87import azure .functions as func
9- from urllib .parse import urlparse , quote
8+ from urllib .parse import urlparse , urljoin , quote
109
11- from durabletask .entities import EntityInstanceId
1210from durabletask .client import TaskHubGrpcClient
13- from durabletask .azurefunctions .internal .azurefunctions_grpc_interceptor import AzureFunctionsDefaultClientInterceptorImpl
14- from durabletask .azurefunctions .http import HttpManagementPayload
11+ from .internal .azurefunctions_grpc_interceptor import AzureFunctionsDefaultClientInterceptorImpl
12+ from .http import HttpManagementPayload
13+ import requests
1514
1615
1716# Client class used for Durable Functions
@@ -38,6 +37,31 @@ def __init__(self, client_as_string: str):
3837
3938 This string will be provided by the Durable Functions host extension upon invocation of the client trigger.
4039
40+ Args:
41+ client_as_string (str): A JSON string containing the Durable Functions client configuration.
42+
43+ Raises:
44+ json.JSONDecodeError: If the provided string is not valid JSON.
45+ """
46+ self ._parse_client_configuration (client_as_string )
47+ if self .httpBaseUrl is None :
48+ # This happens when the extension has not been configured for gRPC yet. For some reason, instead of
49+ # the client returning with null rpcBaseUrl and httpBaseUrl, it returns rpcBaseUrl with the http url.
50+ self .configure_extension_for_grpc ()
51+
52+ interceptors = [AzureFunctionsDefaultClientInterceptorImpl (self .taskHubName , self .requiredQueryStringParameters )]
53+
54+ # We pass in None for the metadata so we don't construct an additional interceptor in the parent class
55+ # Since the parent class doesn't use anything metadata for anything else, we can set it as None
56+ super ().__init__ (
57+ host_address = self .rpcBaseUrl ,
58+ secure_channel = False ,
59+ metadata = None ,
60+ interceptors = interceptors )
61+
62+ def _parse_client_configuration (self , client_as_string : str ) -> None :
63+ """Parses the client configuration JSON string and sets instance variables.
64+
4165 Args:
4266 client_as_string (str): A JSON string containing the Durable Functions client configuration.
4367
@@ -57,15 +81,31 @@ def __init__(self, client_as_string: str):
5781 self .maxGrpcMessageSizeInBytes = client .get ("maxGrpcMessageSizeInBytes" , 0 )
5882 # TODO: convert the string value back to timedelta - annoying regex?
5983 self .grpcHttpClientTimeout = client .get ("grpcHttpClientTimeout" , timedelta (seconds = 30 ))
60- interceptors = [AzureFunctionsDefaultClientInterceptorImpl (self .taskHubName , self .requiredQueryStringParameters )]
6184
62- # We pass in None for the metadata so we don't construct an additional interceptor in the parent class
63- # Since the parent class doesn't use anything metadata for anything else, we can set it as None
64- super ().__init__ (
65- host_address = self .rpcBaseUrl ,
66- secure_channel = False ,
67- metadata = None ,
68- interceptors = interceptors )
85+ def configure_extension_for_grpc (self ) -> None :
86+ """Configures the Durable Functions extension for gRPC communication.
87+
88+ Makes an HTTP request to the extension's management endpoint to enable gRPC.
89+ """
90+
91+ # Make an HTTP request to the extension to configure gRPC
92+ configure_base_url = self .httpBaseUrl
93+ if not configure_base_url :
94+ # For some reason, in the "bad" case when rpc has not been configured, the httpBaseUrl is empty and sent in rpcBaseUrl
95+ configure_base_url = self .rpcBaseUrl
96+ # configure_base_url = urlparse(configure_base_url)
97+ # url = f"{configure_base_url.scheme}://{configure_base_url.netloc}/management/configureGrpc"
98+ url = urljoin (configure_base_url , "management/configureGrpc" )
99+ params = {
100+ "taskHubName" : self .taskHubName ,
101+ "connectionName" : self .connectionName
102+ }
103+ response = requests .get (url , params = params )
104+ if response .status_code != 200 :
105+ raise Exception (f"Failed to configure gRPC for Durable Functions extension. Status code: { response .status_code } , Response: { response .text } " )
106+
107+ # Parse the response to update client configuration - it's double-encoded so we need to load it twice
108+ self ._parse_client_configuration (json .loads (response .text ))
69109
70110 def create_check_status_response (self , request : func .HttpRequest , instance_id : str ) -> func .HttpResponse :
71111 """Creates an HTTP response for checking the status of a Durable Function instance.
0 commit comments