Skip to content

Commit ed4ff18

Browse files
authored
Python: [Breaking] Additional bug fix for declarative workflows (#6489)
* Fix declarative object parsing bug * Remove unnecessary comment * Address PR comments * Address PR comments. * Fix CI failures. * declarative action approval bugfix * Address PR comments * Inlined single use variables.
1 parent 0f483fa commit ed4ff18

7 files changed

Lines changed: 601 additions & 303 deletions

File tree

python/packages/declarative/agent_framework_declarative/_workflows/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,10 @@
7676
from ._executors_tools import (
7777
FUNCTION_TOOL_REGISTRY_KEY,
7878
TOOL_ACTION_EXECUTORS,
79-
TOOL_APPROVAL_STATE_KEY,
8079
BaseToolExecutor,
8180
InvokeFunctionToolExecutor,
8281
ToolApprovalRequest,
8382
ToolApprovalResponse,
84-
ToolApprovalState,
8583
ToolInvocationResult,
8684
)
8785
from ._factory import WorkflowFactory
@@ -111,7 +109,6 @@
111109
"HTTP_ACTION_EXECUTORS",
112110
"MCP_ACTION_EXECUTORS",
113111
"TOOL_ACTION_EXECUTORS",
114-
"TOOL_APPROVAL_STATE_KEY",
115112
"TOOL_REGISTRY_KEY",
116113
"ActionComplete",
117114
"ActionTrigger",
@@ -164,7 +161,6 @@
164161
"SetVariableExecutor",
165162
"ToolApprovalRequest",
166163
"ToolApprovalResponse",
167-
"ToolApprovalState",
168164
"ToolInvocationResult",
169165
"WorkflowFactory",
170166
"WorkflowState",

python/packages/declarative/agent_framework_declarative/_workflows/_executors_mcp.py

Lines changed: 51 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,11 @@
1010
1111
Security notes:
1212
13-
- The executor never echoes header VALUES (auth tokens, API keys) into the
14-
approval request — only header NAMES are surfaced to the caller. This
15-
matches the security posture of :mod:`._executors_http` (which never logs
16-
request headers either) and prevents secrets from leaking through workflow
17-
events that are typically observable to operators / UIs.
18-
- ``_MCPToolApprovalState`` snapshots the EVALUATED values for non-secret
19-
fields (server URL, tool name, arguments) at approval-request time so that
20-
subsequent state mutations cannot make the executor "approve X then call
21-
Y". Headers are stored as the raw expression strings (not evaluated values)
22-
so secrets are not persisted in the workflow's checkpoint state. They are
23-
re-evaluated on resume.
13+
- Approval requests surface header NAMES only; header values are not echoed,
14+
matching the posture of :mod:`._executors_http`.
15+
- :class:`MCPToolApprovalRequest` carries the values the resume handler will
16+
use; header values are re-evaluated on resume to keep secrets out of
17+
checkpoint state.
2418
- Tool outputs flow back into agent conversations through ``conversationId``
2519
and through Tool-role messages emitted to ``output.messages``. They share
2620
the same prompt-injection risk surface as ``HttpRequestAction``: workflow
@@ -60,8 +54,6 @@
6054

6155
logger = logging.getLogger(__name__)
6256

63-
_MCP_APPROVAL_STATE_KEY = "_mcp_tool_approval_state"
64-
6557

6658
# ---------------------------------------------------------------------------
6759
# Request / state types
@@ -72,20 +64,16 @@
7264
class MCPToolApprovalRequest:
7365
"""Approval request emitted before invoking an MCP tool.
7466
75-
Mirrors :class:`agent_framework_declarative.ToolApprovalRequest` but for
76-
MCP-style invocations. Only header NAMES are surfaced — header values are
77-
intentionally omitted because they typically carry authentication
78-
secrets.
79-
8067
Attributes:
81-
request_id: Unique identifier for this approval request. Matches the
82-
id workflow event-emitters use.
83-
tool_name: Evaluated name of the tool to be invoked.
68+
request_id: Identifier matching the framework's pending-request key.
69+
tool_name: Evaluated tool name.
8470
server_url: Evaluated MCP server URL.
85-
server_label: Optional human-readable label for diagnostics.
86-
arguments: Evaluated arguments to be forwarded to the tool.
87-
header_names: Sorted list of outbound header names (no values). Empty
88-
when no headers are configured.
71+
server_label: Optional human-readable label.
72+
arguments: Evaluated tool arguments.
73+
header_names: Outbound header names (values withheld).
74+
connection_name: Connection identifier the invocation will use.
75+
metadata: Internal routing data pinned at approval-request time
76+
(e.g. ``conversation_id``) for use by the resume handler.
8977
"""
9078

9179
request_id: str
@@ -94,50 +82,24 @@ class MCPToolApprovalRequest:
9482
server_label: str | None
9583
arguments: dict[str, Any]
9684
header_names: list[str] = field(default_factory=lambda: [])
97-
98-
99-
@dataclass
100-
class _MCPToolApprovalState:
101-
"""Internal state saved during the approval yield for resumption.
102-
103-
Stores **evaluated** values for non-secret fields to prevent
104-
"approve X / execute Y" attacks. Stores the raw expression string for
105-
``headers`` so that secret values are NOT persisted in checkpoint state;
106-
the expressions are re-evaluated against current state on resume.
107-
"""
108-
109-
server_url: str
110-
tool_name: str
111-
server_label: str | None
112-
arguments: dict[str, Any]
113-
connection_name: str | None
114-
headers_def: Any
115-
auto_send: bool
116-
conversation_id_expr: str | None
117-
output_messages_path: str | None
118-
output_result_path: str | None
85+
connection_name: str | None = None
86+
metadata: dict[str, Any] = field(default_factory=lambda: {})
11987

12088

12189
# ---------------------------------------------------------------------------
12290
# Helpers
12391
# ---------------------------------------------------------------------------
12492

12593

126-
def _get_messages_path(state: DeclarativeWorkflowState, conversation_id_expr: str | None) -> str | None:
127-
"""Return the configured conversation messages path, if any.
128-
129-
Returns ``System.conversations.{evaluated_id}.messages`` when a
130-
``conversation_id_expr`` is configured and evaluates to a non-empty value.
131-
Returns ``None`` when no conversation id expression is configured or when
132-
the expression evaluates to ``None`` or an empty string (mirrors .NET
133-
``GetConversationId`` behaviour).
134-
"""
135-
if not conversation_id_expr:
94+
def _evaluate_conversation_id(state: DeclarativeWorkflowState, conversation_id_expr: Any) -> str | None:
95+
"""Return the evaluated ``conversationId`` string, or None when empty/unset."""
96+
if not isinstance(conversation_id_expr, str) or not conversation_id_expr:
13697
return None
13798
evaluated = state.eval_if_expression(conversation_id_expr)
138-
if evaluated is None or (isinstance(evaluated, str) and not evaluated):
99+
if evaluated is None:
139100
return None
140-
return f"System.conversations.{evaluated}.messages"
101+
text = str(evaluated)
102+
return text or None
141103

142104

143105
def _get_output_path(action_def: Mapping[str, Any], key: str) -> str | None:
@@ -260,27 +222,16 @@ async def handle_action(
260222

261223
if require_approval:
262224
request_id = str(uuid.uuid4())
263-
approval_state = _MCPToolApprovalState(
264-
server_url=server_url,
265-
tool_name=tool_name,
266-
server_label=server_label,
267-
arguments=arguments,
268-
connection_name=connection_name,
269-
headers_def=self._action_def.get("headers"),
270-
auto_send=auto_send,
271-
conversation_id_expr=conversation_id_expr if isinstance(conversation_id_expr, str) else None,
272-
output_messages_path=output_messages_path,
273-
output_result_path=output_result_path,
274-
)
275-
ctx.state.set(self._approval_key(), approval_state)
276-
225+
conversation_id = _evaluate_conversation_id(state, conversation_id_expr)
277226
request = MCPToolApprovalRequest(
278227
request_id=request_id,
279228
tool_name=tool_name,
280229
server_url=server_url,
281230
server_label=server_label,
282231
arguments=arguments,
283232
header_names=sorted(headers.keys()),
233+
connection_name=connection_name,
234+
metadata={"conversation_id": conversation_id},
284235
)
285236
logger.info(
286237
"%s: requesting approval for MCP tool '%s' on '%s'",
@@ -289,7 +240,6 @@ async def handle_action(
289240
server_url,
290241
)
291242
await ctx.request_info(request, ToolApprovalResponse, request_id=request_id)
292-
# Workflow yields here — resume in handle_approval_response.
293243
return
294244

295245
# No approval required - invoke directly.
@@ -307,7 +257,7 @@ async def handle_action(
307257
state=state,
308258
result=result,
309259
auto_send=auto_send,
310-
conversation_id_expr=conversation_id_expr if isinstance(conversation_id_expr, str) else None,
260+
conversation_id=_evaluate_conversation_id(state, conversation_id_expr),
311261
output_messages_path=output_messages_path,
312262
output_result_path=output_result_path,
313263
)
@@ -322,54 +272,46 @@ async def handle_approval_response(
322272
response: ToolApprovalResponse,
323273
ctx: WorkflowContext[ActionComplete, str],
324274
) -> None:
325-
"""Resume after the workflow yielded for an approval request."""
275+
"""Resume the invocation using the values pinned on ``original_request``."""
326276
state = self._get_state(ctx.state)
327-
approval_key = self._approval_key()
328277

329-
try:
330-
approval_state: _MCPToolApprovalState = ctx.state.get(approval_key)
331-
except KeyError:
332-
logger.error("%s: approval state missing for executor '%s'", self.__class__.__name__, self.id)
333-
await ctx.send_message(ActionComplete())
334-
return
335-
try:
336-
ctx.state.delete(approval_key)
337-
except KeyError:
338-
logger.warning("%s: approval state already deleted for '%s'", self.__class__.__name__, self.id)
278+
tool_name = original_request.tool_name
279+
metadata: dict[str, Any] = getattr(original_request, "metadata", None) or {}
280+
raw_conversation_id = metadata.get("conversation_id")
281+
conversation_id = raw_conversation_id if isinstance(raw_conversation_id, str) and raw_conversation_id else None
282+
283+
auto_send = self._get_auto_send(state)
284+
output_messages_path = _get_output_path(self._action_def, "messages")
285+
output_result_path = _get_output_path(self._action_def, "result")
339286

340287
if not response.approved:
341288
logger.info(
342289
"%s: MCP tool '%s' rejected: %s",
343290
self.__class__.__name__,
344-
approval_state.tool_name,
291+
tool_name,
345292
response.reason,
346293
)
347-
self._assign_error(
348-
state, approval_state.output_result_path, "MCP tool invocation was not approved by user."
349-
)
294+
self._assign_error(state, output_result_path, "MCP tool invocation was not approved by user.")
350295
await ctx.send_message(ActionComplete())
351296
return
352297

353-
# Approved — re-evaluate headers (not stored at approval time for security).
354-
headers = self._evaluate_headers(state, approval_state.headers_def)
355-
356298
invocation = MCPToolInvocation(
357-
server_url=approval_state.server_url,
358-
tool_name=approval_state.tool_name,
359-
server_label=approval_state.server_label,
360-
arguments=approval_state.arguments,
361-
headers=headers,
362-
connection_name=approval_state.connection_name,
299+
server_url=original_request.server_url,
300+
tool_name=tool_name,
301+
server_label=original_request.server_label,
302+
arguments=original_request.arguments,
303+
headers=self._evaluate_headers(state, self._action_def.get("headers")),
304+
connection_name=getattr(original_request, "connection_name", None),
363305
)
364306
result = await self._invoke_with_narrow_catch(invocation)
365307
await self._process_result(
366308
ctx=ctx,
367309
state=state,
368310
result=result,
369-
auto_send=approval_state.auto_send,
370-
conversation_id_expr=approval_state.conversation_id_expr,
371-
output_messages_path=approval_state.output_messages_path,
372-
output_result_path=approval_state.output_result_path,
311+
auto_send=auto_send,
312+
conversation_id=conversation_id,
313+
output_messages_path=output_messages_path,
314+
output_result_path=output_result_path,
373315
)
374316
await ctx.send_message(ActionComplete())
375317

@@ -528,7 +470,7 @@ async def _process_result(
528470
state: DeclarativeWorkflowState,
529471
result: MCPToolResult,
530472
auto_send: bool,
531-
conversation_id_expr: str | None,
473+
conversation_id: str | None,
532474
output_messages_path: str | None,
533475
output_result_path: str | None,
534476
) -> None:
@@ -557,14 +499,10 @@ async def _process_result(
557499
if auto_send and parsed_results:
558500
await ctx.yield_output(_format_outputs_for_send(parsed_results))
559501

560-
if conversation_id_expr:
561-
messages_path = _get_messages_path(state, conversation_id_expr)
562-
if messages_path is not None:
563-
# Mirrors .NET: conversation gets ASSISTANT-role message with
564-
# the same outputs (so chat history reads it as the agent's
565-
# contribution).
566-
assistant_message = Message(role="assistant", contents=list(result.outputs))
567-
state.append(messages_path, assistant_message)
502+
if conversation_id:
503+
messages_path = f"System.conversations.{conversation_id}.messages"
504+
assistant_message = Message(role="assistant", contents=list(result.outputs))
505+
state.append(messages_path, assistant_message)
568506

569507
@staticmethod
570508
def _assign_error(
@@ -577,9 +515,6 @@ def _assign_error(
577515
return
578516
state.set(output_result_path, f"Error: {error_message}")
579517

580-
def _approval_key(self) -> str:
581-
return f"{_MCP_APPROVAL_STATE_KEY}_{self.id}"
582-
583518

584519
def _parse_outputs(outputs: list[Content]) -> list[Any]:
585520
"""Parse :class:`Content` outputs into Python values for ``output.result``.

0 commit comments

Comments
 (0)