Skip to content

Commit 86ce54f

Browse files
feat: runtime state checkpointing, event system, and executor refactor
- Pass RuntimeState through the event bus and enable entity auto-registration
- Introduce checkpointing API:
  - .checkpoint(), .from_checkpoint(), and async checkpoint support
  - Provider-based storage with BaseProvider and JsonProvider
  - Mid-task resume and kickoff() integration
- Add EventRecord tracking and full event serialization with subtype preservation
- Enable checkpoint fidelity via llm_type and executor_type discriminators
- Refactor executor architecture:
  - Convert executors, tools, prompts, and TokenProcess to BaseModel
  - Introduce proper base classes with typed fields (CrewAgentExecutorMixin, BaseAgentExecutor)
  - Add generic from_checkpoint with full LLM serialization
  - Support executor back-references and resume-safe initialization
- Refactor runtime state system:
  - Move RuntimeState into state/ module with async checkpoint support
  - Add entity serialization improvements and JSON-safe round-tripping
  - Implement event scope tracking and replay for accurate resume behavior
- Improve tool and schema handling:
  - Make BaseTool fully serializable with JSON round-trip support
  - Serialize args_schema via JSON schema and dynamically reconstruct models
  - Add automatic subclass restoration via tool_type discriminator
- Enhance Flow checkpointing:
  - Support restoring execution state and subclass-aware deserialization
- Performance improvements:
  - Cache handler signature inspection
  - Optimize event emission and metadata preparation
- General cleanup:
  - Remove dead checkpoint payload structures
  - Simplify entity registration and serialization logic
1 parent bf2f4db commit 86ce54f

64 files changed

Lines changed: 2097 additions & 730 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

lib/crewai-tools/tests/test_generate_tool_specs.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -97,6 +97,7 @@ def test_extract_init_params_schema(mock_tool_extractor):
9797
assert init_params_schema.keys() == {
9898
"$defs",
9999
"properties",
100+
"required",
100101
"title",
101102
"type",
102103
}

lib/crewai/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ dependencies = [
4343
"uv~=0.9.13",
4444
"aiosqlite~=0.21.0",
4545
"pyyaml~=6.0",
46+
"aiofiles~=24.1.0",
4647
"lancedb>=0.29.2,<0.30.1",
4748
]
4849

lib/crewai/src/crewai/__init__.py

Lines changed: 31 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
from crewai.llm import LLM
1717
from crewai.llms.base_llm import BaseLLM
1818
from crewai.process import Process
19-
from crewai.runtime_state import _entity_discriminator
2019
from crewai.task import Task
2120
from crewai.tasks.llm_guardrail import LLMGuardrail
2221
from crewai.tasks.task_output import TaskOutput
@@ -99,8 +98,8 @@ def __getattr__(name: str) -> Any:
9998

10099
try:
101100
from crewai.agents.agent_builder.base_agent import BaseAgent as _BaseAgent
102-
from crewai.agents.agent_builder.base_agent_executor_mixin import (
103-
CrewAgentExecutorMixin as _CrewAgentExecutorMixin,
101+
from crewai.agents.agent_builder.base_agent_executor import (
102+
BaseAgentExecutor as _BaseAgentExecutor,
104103
)
105104
from crewai.agents.tools_handler import ToolsHandler as _ToolsHandler
106105
from crewai.experimental.agent_executor import AgentExecutor as _AgentExecutor
@@ -118,10 +117,18 @@ def __getattr__(name: str) -> Any:
118117
"Flow": Flow,
119118
"BaseLLM": BaseLLM,
120119
"Task": Task,
121-
"CrewAgentExecutorMixin": _CrewAgentExecutorMixin,
120+
"BaseAgentExecutor": _BaseAgentExecutor,
122121
"ExecutionContext": ExecutionContext,
122+
"StandardPromptResult": _StandardPromptResult,
123+
"SystemPromptResult": _SystemPromptResult,
123124
}
124125

126+
from crewai.tools.base_tool import BaseTool as _BaseTool
127+
from crewai.tools.structured_tool import CrewStructuredTool as _CrewStructuredTool
128+
129+
_base_namespace["BaseTool"] = _BaseTool
130+
_base_namespace["CrewStructuredTool"] = _CrewStructuredTool
131+
125132
try:
126133
from crewai.a2a.config import (
127134
A2AClientConfig as _A2AClientConfig,
@@ -155,36 +162,49 @@ def __getattr__(name: str) -> Any:
155162
**sys.modules[_BaseAgent.__module__].__dict__,
156163
}
157164

165+
import crewai.state.runtime as _runtime_state_mod
166+
158167
for _mod_name in (
159168
_BaseAgent.__module__,
160169
Agent.__module__,
161170
Crew.__module__,
162171
Flow.__module__,
163172
Task.__module__,
173+
"crewai.agents.crew_agent_executor",
174+
_runtime_state_mod.__name__,
164175
_AgentExecutor.__module__,
165176
):
166177
sys.modules[_mod_name].__dict__.update(_resolve_namespace)
167178

179+
from crewai.agents.crew_agent_executor import (
180+
CrewAgentExecutor as _CrewAgentExecutor,
181+
)
168182
from crewai.tasks.conditional_task import ConditionalTask as _ConditionalTask
169183

184+
_BaseAgentExecutor.model_rebuild(force=True, _types_namespace=_full_namespace)
170185
_BaseAgent.model_rebuild(force=True, _types_namespace=_full_namespace)
171186
Task.model_rebuild(force=True, _types_namespace=_full_namespace)
172187
_ConditionalTask.model_rebuild(force=True, _types_namespace=_full_namespace)
188+
_CrewAgentExecutor.model_rebuild(force=True, _types_namespace=_full_namespace)
173189
Crew.model_rebuild(force=True, _types_namespace=_full_namespace)
174190
Flow.model_rebuild(force=True, _types_namespace=_full_namespace)
175191
_AgentExecutor.model_rebuild(force=True, _types_namespace=_full_namespace)
176192

177193
from typing import Annotated
178194

179-
from pydantic import Discriminator, RootModel, Tag
195+
from pydantic import Field
196+
197+
from crewai.state.runtime import RuntimeState
180198

181199
Entity = Annotated[
182-
Annotated[Flow, Tag("flow")] # type: ignore[type-arg]
183-
| Annotated[Crew, Tag("crew")]
184-
| Annotated[Agent, Tag("agent")],
185-
Discriminator(_entity_discriminator),
200+
Flow | Crew | Agent, # type: ignore[type-arg]
201+
Field(discriminator="entity_type"),
186202
]
187-
RuntimeState = RootModel[list[Entity]]
203+
204+
RuntimeState.model_rebuild(
205+
force=True,
206+
_types_namespace={**_full_namespace, "Entity": Entity},
207+
)
188208

189209
try:
190210
Agent.model_rebuild(force=True, _types_namespace=_full_namespace)
@@ -205,6 +225,7 @@ def __getattr__(name: str) -> Any:
205225
"BaseLLM",
206226
"Crew",
207227
"CrewOutput",
228+
"Entity",
208229
"ExecutionContext",
209230
"Flow",
210231
"Knowledge",

lib/crewai/src/crewai/agent/core.py

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@
2727
BeforeValidator,
2828
ConfigDict,
2929
Field,
30-
InstanceOf,
3130
PrivateAttr,
3231
model_validator,
3332
)
@@ -195,12 +194,12 @@ class Agent(BaseAgent):
195194
llm: Annotated[
196195
str | BaseLLM | None,
197196
BeforeValidator(_validate_llm_ref),
198-
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
197+
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
199198
] = Field(description="Language model that will run the agent.", default=None)
200199
function_calling_llm: Annotated[
201200
str | BaseLLM | None,
202201
BeforeValidator(_validate_llm_ref),
203-
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
202+
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
204203
] = Field(description="Language model that will run the agent.", default=None)
205204
system_template: str | None = Field(
206205
default=None, description="System format for the agent."
@@ -297,8 +296,8 @@ class Agent(BaseAgent):
297296
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
298297
""",
299298
)
300-
agent_executor: InstanceOf[CrewAgentExecutor] | InstanceOf[AgentExecutor] | None = (
301-
Field(default=None, description="An instance of the CrewAgentExecutor class.")
299+
agent_executor: CrewAgentExecutor | AgentExecutor | None = Field(
300+
default=None, description="An instance of the CrewAgentExecutor class."
302301
)
303302
executor_class: Annotated[
304303
type[CrewAgentExecutor] | type[AgentExecutor],
@@ -1011,10 +1010,10 @@ def create_agent_executor(
10111010
)
10121011
self.agent_executor = self.executor_class(
10131012
llm=self.llm,
1014-
task=task, # type: ignore[arg-type]
1013+
task=task,
10151014
i18n=self.i18n,
10161015
agent=self,
1017-
crew=self.crew, # type: ignore[arg-type]
1016+
crew=self.crew,
10181017
tools=parsed_tools,
10191018
prompt=prompt,
10201019
original_tools=raw_tools,
@@ -1057,7 +1056,8 @@ def _update_executor_parameters(
10571056
if self.agent_executor is None:
10581057
raise RuntimeError("Agent executor is not initialized.")
10591058

1060-
self.agent_executor.task = task
1059+
if task is not None:
1060+
self.agent_executor.task = task
10611061
self.agent_executor.tools = tools
10621062
self.agent_executor.original_tools = raw_tools
10631063
self.agent_executor.prompt = prompt
@@ -1076,7 +1076,7 @@ def _update_executor_parameters(
10761076
self.agent_executor.tools_handler = self.tools_handler
10771077
self.agent_executor.request_within_rpm_limit = rpm_limit_fn
10781078

1079-
if self.agent_executor.llm:
1079+
if isinstance(self.agent_executor.llm, BaseLLM):
10801080
existing_stop = getattr(self.agent_executor.llm, "stop", [])
10811081
self.agent_executor.llm.stop = list(
10821082
set(

lib/crewai/src/crewai/agents/agent_builder/base_agent.py

Lines changed: 87 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,8 @@
1414
BaseModel,
1515
BeforeValidator,
1616
Field,
17-
InstanceOf,
1817
PrivateAttr,
18+
SerializeAsAny,
1919
field_validator,
2020
model_validator,
2121
)
@@ -24,7 +24,7 @@
2424
from typing_extensions import Self
2525

2626
from crewai.agent.internal.meta import AgentMeta
27-
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
27+
from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor
2828
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
2929
from crewai.agents.cache.cache_handler import CacheHandler
3030
from crewai.agents.tools_handler import ToolsHandler
@@ -51,6 +51,7 @@
5151
if TYPE_CHECKING:
5252
from crewai.context import ExecutionContext
5353
from crewai.crew import Crew
54+
from crewai.state.provider.core import BaseProvider
5455

5556

5657
def _validate_crew_ref(value: Any) -> Any:
@@ -63,7 +64,31 @@ def _serialize_crew_ref(value: Any) -> str | None:
6364
return str(value.id) if hasattr(value, "id") else str(value)
6465

6566

67+
_LLM_TYPE_REGISTRY: dict[str, str] = {
68+
"base": "crewai.llms.base_llm.BaseLLM",
69+
"litellm": "crewai.llm.LLM",
70+
"openai": "crewai.llms.providers.openai.completion.OpenAICompletion",
71+
"anthropic": "crewai.llms.providers.anthropic.completion.AnthropicCompletion",
72+
"azure": "crewai.llms.providers.azure.completion.AzureCompletion",
73+
"bedrock": "crewai.llms.providers.bedrock.completion.BedrockCompletion",
74+
"gemini": "crewai.llms.providers.gemini.completion.GeminiCompletion",
75+
}
76+
77+
6678
def _validate_llm_ref(value: Any) -> Any:
79+
if isinstance(value, dict):
80+
import importlib
81+
82+
llm_type = value.get("llm_type")
83+
if not llm_type or llm_type not in _LLM_TYPE_REGISTRY:
84+
raise ValueError(
85+
f"Unknown or missing llm_type: {llm_type!r}. "
86+
f"Expected one of {list(_LLM_TYPE_REGISTRY)}"
87+
)
88+
dotted = _LLM_TYPE_REGISTRY[llm_type]
89+
mod_path, cls_name = dotted.rsplit(".", 1)
90+
cls = getattr(importlib.import_module(mod_path), cls_name)
91+
return cls(**value)
6792
return value
6893

6994

@@ -75,12 +100,37 @@ def _resolve_agent(value: Any, info: Any) -> Any:
75100
return Agent.model_validate(value, context=getattr(info, "context", None))
76101

77102

78-
def _serialize_llm_ref(value: Any) -> str | None:
103+
_EXECUTOR_TYPE_REGISTRY: dict[str, str] = {
104+
"base": "crewai.agents.agent_builder.base_agent_executor.BaseAgentExecutor",
105+
"crew": "crewai.agents.crew_agent_executor.CrewAgentExecutor",
106+
"experimental": "crewai.experimental.agent_executor.AgentExecutor",
107+
}
108+
109+
110+
def _validate_executor_ref(value: Any) -> Any:
111+
if isinstance(value, dict):
112+
import importlib
113+
114+
executor_type = value.get("executor_type")
115+
if not executor_type or executor_type not in _EXECUTOR_TYPE_REGISTRY:
116+
raise ValueError(
117+
f"Unknown or missing executor_type: {executor_type!r}. "
118+
f"Expected one of {list(_EXECUTOR_TYPE_REGISTRY)}"
119+
)
120+
dotted = _EXECUTOR_TYPE_REGISTRY[executor_type]
121+
mod_path, cls_name = dotted.rsplit(".", 1)
122+
cls = getattr(importlib.import_module(mod_path), cls_name)
123+
return cls.model_validate(value)
124+
return value
125+
126+
127+
def _serialize_llm_ref(value: Any) -> dict[str, Any] | None:
79128
if value is None:
80129
return None
81130
if isinstance(value, str):
82-
return value
83-
return getattr(value, "model", str(value))
131+
return {"model": value}
132+
result: dict[str, Any] = value.model_dump()
133+
return result
84134

85135

86136
_SLUG_RE: Final[re.Pattern[str]] = re.compile(
@@ -197,13 +247,19 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
197247
max_iter: int = Field(
198248
default=25, description="Maximum iterations for an agent to execute a task"
199249
)
200-
agent_executor: InstanceOf[CrewAgentExecutorMixin] | None = Field(
250+
agent_executor: SerializeAsAny[BaseAgentExecutor] | None = Field(
201251
default=None, description="An instance of the CrewAgentExecutor class."
202252
)
253+
254+
@field_validator("agent_executor", mode="before")
255+
@classmethod
256+
def _validate_agent_executor(cls, v: Any) -> Any:
257+
return _validate_executor_ref(v)
258+
203259
llm: Annotated[
204260
str | BaseLLM | None,
205261
BeforeValidator(_validate_llm_ref),
206-
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
262+
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
207263
] = Field(default=None, description="Language model that will run the agent.")
208264
crew: Annotated[
209265
Crew | str | None,
@@ -276,6 +332,30 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
276332
)
277333
execution_context: ExecutionContext | None = Field(default=None)
278334

335+
@classmethod
336+
def from_checkpoint(
337+
cls, path: str, *, provider: BaseProvider | None = None
338+
) -> Self:
339+
"""Restore an Agent from a checkpoint file."""
340+
from crewai.context import apply_execution_context
341+
from crewai.state.provider.json_provider import JsonProvider
342+
from crewai.state.runtime import RuntimeState
343+
344+
state = RuntimeState.from_checkpoint(
345+
path,
346+
provider=provider or JsonProvider(),
347+
context={"from_checkpoint": True},
348+
)
349+
for entity in state.root:
350+
if isinstance(entity, cls):
351+
if entity.execution_context is not None:
352+
apply_execution_context(entity.execution_context)
353+
if entity.agent_executor is not None:
354+
entity.agent_executor.agent = entity
355+
entity.agent_executor._resuming = True
356+
return entity
357+
raise ValueError(f"No {cls.__name__} found in checkpoint: {path}")
358+
279359
@model_validator(mode="before")
280360
@classmethod
281361
def process_model_config(cls, values: Any) -> dict[str, Any]:

0 commit comments

Comments
 (0)