Skip to content

Commit b4f7262

Browse files
committed
fix: harden runner stream cleanup and refactor deerflow config
1 parent b3cab6d commit b4f7262

2 files changed

Lines changed: 194 additions & 103 deletions

File tree

astrbot/core/agent/runners/deerflow/deerflow_agent_runner.py

Lines changed: 99 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@ class DeerFlowAgentRunner(BaseAgentRunner[TContext]):
4646

4747
_MAX_VALUES_HISTORY = 200
4848

49+
@dataclass(frozen=True)
50+
class _RunnerConfig:
51+
api_base: str
52+
api_key: str
53+
auth_header: str
54+
proxy: str
55+
assistant_id: str
56+
model_name: str
57+
thinking_enabled: bool
58+
plan_mode: bool
59+
subagent_enabled: bool
60+
max_concurrent_subagents: int
61+
timeout: int
62+
recursion_limit: int
63+
4964
@dataclass
5065
class _StreamState:
5166
latest_text: str = ""
@@ -131,75 +146,80 @@ async def close(self) -> None:
131146
if isinstance(api_client, DeerFlowAPIClient) and not api_client.is_closed:
132147
await api_client.close()
133148

134-
@override
135-
async def reset(
136-
self,
137-
request: ProviderRequest,
138-
run_context: ContextWrapper[TContext],
139-
agent_hooks: BaseAgentRunHooks[TContext],
140-
provider_config: dict,
141-
**kwargs: T.Any,
142-
) -> None:
143-
self.req = request
144-
self.streaming = kwargs.get("streaming", False)
145-
self.final_llm_resp = None
146-
self._state = AgentState.IDLE
147-
self.agent_hooks = agent_hooks
148-
self.run_context = run_context
149-
150-
self.api_base = provider_config.get(
151-
"deerflow_api_base", "http://127.0.0.1:2026"
152-
)
153-
if not isinstance(self.api_base, str) or not self.api_base.startswith(
149+
def _parse_runner_config(self, provider_config: dict) -> _RunnerConfig:
150+
api_base = provider_config.get("deerflow_api_base", "http://127.0.0.1:2026")
151+
if not isinstance(api_base, str) or not api_base.startswith(
154152
("http://", "https://"),
155153
):
156154
raise ValueError(
157155
"DeerFlow API Base URL format is invalid. It must start with http:// or https://.",
158156
)
159-
self.api_key = provider_config.get("deerflow_api_key", "")
160-
self.auth_header = provider_config.get("deerflow_auth_header", "")
157+
161158
proxy = provider_config.get("proxy", "")
162-
self.proxy = proxy.strip() if isinstance(proxy, str) else ""
163-
self.assistant_id = provider_config.get("deerflow_assistant_id", "lead_agent")
164-
self.model_name = provider_config.get("deerflow_model_name", "")
165-
self.thinking_enabled = bool(
166-
provider_config.get("deerflow_thinking_enabled", False),
167-
)
168-
self.plan_mode = bool(provider_config.get("deerflow_plan_mode", False))
169-
self.subagent_enabled = bool(
170-
provider_config.get("deerflow_subagent_enabled", False),
171-
)
172-
self.max_concurrent_subagents = self._coerce_int_config(
173-
"deerflow_max_concurrent_subagents",
174-
provider_config.get(
159+
normalized_proxy = proxy.strip() if isinstance(proxy, str) else ""
160+
161+
return self._RunnerConfig(
162+
api_base=api_base,
163+
api_key=provider_config.get("deerflow_api_key", ""),
164+
auth_header=provider_config.get("deerflow_auth_header", ""),
165+
proxy=normalized_proxy,
166+
assistant_id=provider_config.get("deerflow_assistant_id", "lead_agent"),
167+
model_name=provider_config.get("deerflow_model_name", ""),
168+
thinking_enabled=bool(
169+
provider_config.get("deerflow_thinking_enabled", False),
170+
),
171+
plan_mode=bool(provider_config.get("deerflow_plan_mode", False)),
172+
subagent_enabled=bool(
173+
provider_config.get("deerflow_subagent_enabled", False),
174+
),
175+
max_concurrent_subagents=self._coerce_int_config(
175176
"deerflow_max_concurrent_subagents",
176-
3,
177+
provider_config.get("deerflow_max_concurrent_subagents", 3),
178+
default=3,
179+
min_value=1,
180+
),
181+
timeout=self._coerce_int_config(
182+
"timeout",
183+
provider_config.get("timeout", 300),
184+
default=300,
185+
min_value=1,
186+
),
187+
recursion_limit=self._coerce_int_config(
188+
"deerflow_recursion_limit",
189+
provider_config.get("deerflow_recursion_limit", 1000),
190+
default=1000,
191+
min_value=1,
177192
),
178-
default=3,
179-
min_value=1,
180193
)
181194

182-
self.timeout = self._coerce_int_config(
183-
"timeout",
184-
provider_config.get("timeout", 300),
185-
default=300,
186-
min_value=1,
187-
)
188-
self.recursion_limit = self._coerce_int_config(
189-
"deerflow_recursion_limit",
190-
provider_config.get("deerflow_recursion_limit", 1000),
191-
default=1000,
192-
min_value=1,
195+
def _apply_runner_config(self, config: _RunnerConfig) -> None:
196+
self.api_base = config.api_base
197+
self.api_key = config.api_key
198+
self.auth_header = config.auth_header
199+
self.proxy = config.proxy
200+
self.assistant_id = config.assistant_id
201+
self.model_name = config.model_name
202+
self.thinking_enabled = config.thinking_enabled
203+
self.plan_mode = config.plan_mode
204+
self.subagent_enabled = config.subagent_enabled
205+
self.max_concurrent_subagents = config.max_concurrent_subagents
206+
self.timeout = config.timeout
207+
self.recursion_limit = config.recursion_limit
208+
209+
@staticmethod
210+
def _build_client_signature(config: _RunnerConfig) -> tuple[str, str, str, str]:
211+
return (
212+
config.api_base,
213+
config.api_key,
214+
config.auth_header,
215+
config.proxy,
193216
)
194217

195-
new_client_signature = (
196-
self.api_base,
197-
self.api_key,
198-
self.auth_header,
199-
self.proxy,
200-
)
218+
async def _refresh_api_client(self, config: _RunnerConfig) -> None:
219+
new_client_signature = self._build_client_signature(config)
201220
old_client = getattr(self, "api_client", None)
202221
old_signature = getattr(self, "_api_client_signature", None)
222+
203223
if (
204224
isinstance(old_client, DeerFlowAPIClient)
205225
and old_signature == new_client_signature
@@ -217,13 +237,33 @@ async def reset(
217237
)
218238

219239
self.api_client = DeerFlowAPIClient(
220-
api_base=self.api_base,
221-
api_key=self.api_key,
222-
auth_header=self.auth_header,
223-
proxy=self.proxy,
240+
api_base=config.api_base,
241+
api_key=config.api_key,
242+
auth_header=config.auth_header,
243+
proxy=config.proxy,
224244
)
225245
self._api_client_signature = new_client_signature
226246

247+
@override
248+
async def reset(
249+
self,
250+
request: ProviderRequest,
251+
run_context: ContextWrapper[TContext],
252+
agent_hooks: BaseAgentRunHooks[TContext],
253+
provider_config: dict,
254+
**kwargs: T.Any,
255+
) -> None:
256+
self.req = request
257+
self.streaming = kwargs.get("streaming", False)
258+
self.final_llm_resp = None
259+
self._state = AgentState.IDLE
260+
self.agent_hooks = agent_hooks
261+
self.run_context = run_context
262+
263+
config = self._parse_runner_config(provider_config)
264+
self._apply_runner_config(config)
265+
await self._refresh_api_client(config)
266+
227267
@override
228268
async def step(self):
229269
if not self.req:

0 commit comments

Comments
 (0)