Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions lib/crewai/src/crewai/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from crewai.agent.planning_config import PlanningConfig
from crewai.agent.utils import (
ahandle_knowledge_retrieval,
append_skill_context,
apply_training_data,
build_task_prompt_with_schema,
format_task_with_context,
Expand Down Expand Up @@ -549,7 +548,6 @@ def _finalize_task_prompt(
Returns:
The fully prepared task prompt.
"""
task_prompt = append_skill_context(self, task_prompt)
prepare_tools(self, tools, task)

return apply_training_data(self, task_prompt)
Expand Down Expand Up @@ -1486,8 +1484,6 @@ def _prepare_kickoff(
),
)

formatted_messages = append_skill_context(self, formatted_messages)

inputs: dict[str, Any] = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),
Expand Down
24 changes: 0 additions & 24 deletions lib/crewai/src/crewai/agent/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,30 +213,6 @@ def _combine_knowledge_context(agent: Agent) -> str:
return agent_ctx + separator + crew_ctx


def append_skill_context(agent: Agent, task_prompt: str) -> str:
"""Append activated skill context sections to the task prompt.

Args:
agent: The agent with optional skills.
task_prompt: The current task prompt.

Returns:
The task prompt with skill context appended.
"""
if not agent.skills:
return task_prompt

from crewai.skills.loader import format_skill_context
from crewai.skills.models import Skill

skill_sections = [
format_skill_context(s) for s in agent.skills if isinstance(s, Skill)
]
if skill_sections:
task_prompt += "\n\n" + "\n\n".join(skill_sections)
return task_prompt


def apply_training_data(agent: Agent, task_prompt: str) -> str:
"""Apply training data to the task prompt.

Expand Down
19 changes: 16 additions & 3 deletions lib/crewai/src/crewai/agents/crew_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,18 +174,31 @@ def _setup_messages(self, inputs: dict[str, Any]) -> None:
if provider.setup_messages(cast(ExecutorContext, cast(object, self))):
return

from crewai.llms.cache import mark_cache_breakpoint

if self.prompt is not None and "system" in self.prompt:
system_prompt = self._format_prompt(
cast(str, self.prompt.get("system", "")), inputs
)
user_prompt = self._format_prompt(
cast(str, self.prompt.get("user", "")), inputs
)
self.messages.append(format_message_for_llm(system_prompt, role="system"))
self.messages.append(format_message_for_llm(user_prompt))
# Cache breakpoints: end-of-system caches the per-agent stable
# prefix; end-of-user caches the per-task stable prefix across
# ReAct-loop iterations.
self.messages.append(
mark_cache_breakpoint(
format_message_for_llm(system_prompt, role="system")
)
)
self.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
elif self.prompt is not None:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(format_message_for_llm(user_prompt))
self.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)

provider.post_setup_messages(cast(ExecutorContext, cast(object, self)))

Expand Down
32 changes: 26 additions & 6 deletions lib/crewai/src/crewai/experimental/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2586,16 +2586,26 @@ def invoke(
self._kickoff_input = inputs.get("input", "")

if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint

prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
mark_cache_breakpoint(
format_message_for_llm(system_prompt, role="system")
)
)
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
from crewai.llms.cache import mark_cache_breakpoint

user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)

self._inject_files_from_inputs(inputs)

Expand Down Expand Up @@ -2677,16 +2687,26 @@ async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
self._kickoff_input = inputs.get("input", "")

if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint

prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
mark_cache_breakpoint(
format_message_for_llm(system_prompt, role="system")
)
)
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
from crewai.llms.cache import mark_cache_breakpoint

user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.messages.append(
mark_cache_breakpoint(format_message_for_llm(user_prompt))
)

self._inject_files_from_inputs(inputs)

Expand Down
19 changes: 16 additions & 3 deletions lib/crewai/src/crewai/llms/base_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final, Literal
from typing import TYPE_CHECKING, Any, Final, Literal, cast
import uuid

from pydantic import (
Expand Down Expand Up @@ -703,19 +703,32 @@ def _format_messages(self, messages: str | list[LLMMessage]) -> list[LLMMessage]
Raises:
ValueError: If message format is invalid
"""
from crewai.llms.cache import CACHE_BREAKPOINT_KEY
from crewai.utilities.types import LLMMessage as _LLMMessage

if isinstance(messages, str):
return [{"role": "user", "content": messages}]

# Validate message format
# Validate then copy each message, dropping the cache-breakpoint
# flag in the copy only. The caller (e.g. CrewAgentExecutor,
# experimental.AgentExecutor) reuses its messages buffer across
# many LLM calls in the tool-use loop; mutating their dicts
# in place would erase the markers after the first call and
# break prompt caching for every subsequent iteration.
cleaned: list[LLMMessage] = []
for i, msg in enumerate(messages):
if not isinstance(msg, dict):
raise ValueError(f"Message at index {i} must be a dictionary")
if "role" not in msg or "content" not in msg:
raise ValueError(
f"Message at index {i} must have 'role' and 'content' keys"
)
copy: dict[str, Any] = {
k: v for k, v in msg.items() if k != CACHE_BREAKPOINT_KEY
}
cleaned.append(cast(_LLMMessage, copy))
Comment thread
lorenzejay marked this conversation as resolved.

return self._process_message_files(messages)
return self._process_message_files(cleaned)

def _process_message_files(self, messages: list[LLMMessage]) -> list[LLMMessage]:
"""Process files attached to messages and format for the provider.
Expand Down
37 changes: 37 additions & 0 deletions lib/crewai/src/crewai/llms/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Provider-agnostic prompt-cache breakpoint marker.

Application code (prompt builders, agent executors) marks messages where a
stable prefix ends. Provider adapters then translate the marker into the
cache directive their API expects, or strip it for providers that cache
implicitly (OpenAI, Gemini) or do not cache at all.

Usage:

from crewai.llms.cache import mark_cache_breakpoint

messages = [
mark_cache_breakpoint({"role": "system", "content": stable_system}),
mark_cache_breakpoint({"role": "user", "content": stable_user_prefix}),
{"role": "user", "content": volatile_query},
]
"""

from __future__ import annotations

from typing import Any


CACHE_BREAKPOINT_KEY = "cache_breakpoint"


def mark_cache_breakpoint(message: dict[str, Any]) -> dict[str, Any]:
"""Return ``message`` with the cache-breakpoint flag set.

Returns a new dict so callers can safely pass literal dicts.
"""
return {**message, CACHE_BREAKPOINT_KEY: True}


def strip_cache_breakpoint(message: dict[str, Any]) -> None:
"""Remove the breakpoint flag from a message in place."""
message.pop(CACHE_BREAKPOINT_KEY, None)
106 changes: 102 additions & 4 deletions lib/crewai/src/crewai/llms/providers/anthropic/completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ async def acall(
def _prepare_completion_params(
self,
messages: list[LLMMessage],
system_message: str | None = None,
system_message: str | list[dict[str, Any]] | None = None,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Any] | None = None,
) -> dict[str, Any]:
Expand Down Expand Up @@ -665,7 +665,7 @@ def _convert_image_blocks(content: Any) -> Any:

def _format_messages_for_anthropic(
self, messages: str | list[LLMMessage]
) -> tuple[list[LLMMessage], str | None]:
) -> tuple[list[LLMMessage], str | list[dict[str, Any]] | None]:
"""Format messages for Anthropic API.

Anthropic has specific requirements:
Expand All @@ -679,8 +679,51 @@ def _format_messages_for_anthropic(
messages: Input messages

Returns:
Tuple of (formatted_messages, system_message)
Tuple of (formatted_messages, system_message). `system_message` is
a list of content blocks (with cache_control stamped) when any
system message in the input carried a cache_breakpoint flag;
otherwise a plain string for backwards compatibility.
"""
from crewai.llms.cache import CACHE_BREAKPOINT_KEY

# Read cache_breakpoint flags from raw input BEFORE super strips them.
# We track the CONTENT of marked user/assistant messages so we can
# locate the corresponding block in formatted_messages — Anthropic
# rewrites tool results into user messages, so positional indices
# do not survive the conversion. We must stamp the original stable
# message (typically the initial task prompt), not whatever happens
# to be the trailing user-role block after tool_result expansion.
cache_system = False
cache_match_contents: list[str] = []
if not isinstance(messages, str):
for m in messages:
if not (isinstance(m, dict) and m.get(CACHE_BREAKPOINT_KEY)):
continue
role = m.get("role")
if role == "system":
cache_system = True
continue
if role != "user":
# Only user messages survive Anthropic's role-coalescing
# in a stable, addressable position. Markers on assistant
# or tool messages have no reliable stamp target after
# tool_result expansion, so we ignore them.
continue
raw_content = m.get("content")
if isinstance(raw_content, str) and raw_content:
cache_match_contents.append(raw_content)
continue
if isinstance(raw_content, list):
# Pull text from a single-text-block list so callers that
# pre-format content blocks still match cleanly.
text_blocks = [
b.get("text")
for b in raw_content
if isinstance(b, dict) and b.get("type") == "text"
]
if len(text_blocks) == 1 and isinstance(text_blocks[0], str):
cache_match_contents.append(text_blocks[0])

# Use base class formatting first
base_formatted = super()._format_messages(messages)

Expand Down Expand Up @@ -788,7 +831,62 @@ def _format_messages_for_anthropic(
# If first message is not from user, insert a user message at the beginning
formatted_messages.insert(0, {"role": "user", "content": "Hello"})

return formatted_messages, system_message
# Stamp cache_control on the message(s) whose original content was
# marked. We scan formatted_messages in order and stamp the first
# match per marked content — Anthropic permits up to 4 cache
# breakpoints per request, which is more than enough for our usage.
# Matching by content (rather than position) handles the ReAct
# case where tool_result blocks get expanded into trailing user
# messages: the stable initial-task prompt still maps cleanly.
for needle in cache_match_contents:
for fm in formatted_messages:
if fm.get("role") != "user":
continue
content = fm.get("content")
if isinstance(content, str) and content == needle:
self._stamp_cache_control_on_message(fm)
break
if isinstance(content, list):
fm_texts: list[str] = [
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
]
if len(fm_texts) == 1 and fm_texts[0] == needle:
self._stamp_cache_control_on_message(fm)
break

# Convert system to content-block form when caching is requested.
system_payload: str | list[dict[str, Any]] | None = system_message
if system_message and cache_system:
system_payload = [
{
"type": "text",
"text": system_message,
"cache_control": {"type": "ephemeral"},
}
]

return formatted_messages, system_payload

@staticmethod
def _stamp_cache_control_on_message(message: LLMMessage) -> None:
"""Stamp cache_control on the last content block of an Anthropic message."""
msg = cast(dict[str, Any], message)
content = msg.get("content")
if isinstance(content, str):
msg["content"] = [
{
"type": "text",
"text": content,
"cache_control": {"type": "ephemeral"},
}
]
return
if isinstance(content, list) and content:
last = content[-1]
if isinstance(last, dict):
last["cache_control"] = {"type": "ephemeral"}

def _handle_completion(
self,
Expand Down
8 changes: 6 additions & 2 deletions lib/crewai/src/crewai/skills/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ def format_skill_context(skill: Skill) -> str:
At METADATA level: returns name and description only.
At INSTRUCTIONS level or above: returns full SKILL.md body.

Output is wrapped in <skill name="..."> XML tags so the block can serve
as a stable cache anchor when injected into the system prompt.

Args:
skill: The skill to format.

Expand All @@ -169,7 +172,7 @@ def format_skill_context(skill: Skill) -> str:
"""
if skill.disclosure_level >= INSTRUCTIONS and skill.instructions:
parts = [
f"## Skill: {skill.name}",
f'<skill name="{skill.name}">',
skill.description,
"",
skill.instructions,
Expand All @@ -180,5 +183,6 @@ def format_skill_context(skill: Skill) -> str:
for dir_name, files in sorted(skill.resource_files.items()):
if files:
parts.append(f"- **{dir_name}/**: {', '.join(files)}")
parts.append("</skill>")
return "\n".join(parts)
return f"## Skill: {skill.name}\n{skill.description}"
return f'<skill name="{skill.name}">\n{skill.description}\n</skill>'
Loading
Loading