Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 109 additions & 8 deletions src/strands/agent/conversation_manager/conversation_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
"""Abstract interface for conversation history management."""

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any

from ...hooks.events import BeforeModelCallEvent
from ...hooks.registry import HookProvider, HookRegistry
from ...types.content import Message

if TYPE_CHECKING:
from ...agent.agent import Agent

logger = logging.getLogger(__name__)

DEFAULT_CONTEXT_WINDOW_LIMIT = 200_000
Comment thread
opieter-aws marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: This constant introduces implicit behavior that diverges from the design doc, which states: "Users must set contextWindowLimit on their model config until we ship per-provider default lookup tables." A model with a 32k context window using this 200k fallback would never trigger proactive compression until the real overflow occurs — defeating the purpose of the feature.

The warning at line 140 is good, but it only fires once and at WARNING level which may be missed in production logging. The previous iteration disabled compression entirely when context_window_limit was None, which was a safer fail-closed approach.

Suggestion: Consider one of:

  1. Fail-closed (safer): Skip the threshold check when context_window_limit is None (log warning, return early). This matches the design doc and avoids silent miscalculation.
  2. Keep fallback but make it explicit: Add default_context_window_limit as a parameter to __init__ so users consciously opt in to the fallback assumption.
  3. If keeping as-is: Add a note in the class docstring explaining the 200k fallback behavior and why it was chosen over disabling.



class ConversationManager(ABC, HookProvider):
"""Abstract base class for managing conversation history.
Expand All @@ -24,6 +30,11 @@ class ConversationManager(ABC, HookProvider):
lifecycle events. Derived classes that override register_hooks must call the base implementation to ensure proper
hook registration.

Optionally, a manager can enable proactive compression by setting ``compression_threshold``
in the constructor. When set, the base class registers a ``BeforeModelCallEvent`` hook that
checks projected input tokens against the model's context window limit and calls
:meth:`reduce_on_threshold` when the threshold is exceeded.

Example:
```python
class MyConversationManager(ConversationManager):
Expand All @@ -33,34 +44,124 @@ def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
```
"""

def __init__(self, *, compression_threshold: float | None = None) -> None:
    """Initialize the ConversationManager.

    Args:
        compression_threshold: Ratio of context window usage that triggers proactive
            compression. Must lie in the half-open interval (0, 1]; for example, 0.7
            compresses once 70% of the context window is in use. When ``None``,
            proactive compression is disabled and only reactive overflow recovery
            is used.

    Raises:
        ValueError: If compression_threshold is not in the valid range (0, 1].

    Attributes:
        removed_message_count: Count of messages removed from the agent's messages
            array. These are messages provided by the user or LLM that were removed,
            not messages the conversation manager itself injected (e.g. a summary).
    """
    # Validate eagerly so a misconfigured threshold fails at construction time
    # rather than silently disabling the feature.
    if compression_threshold is not None and (compression_threshold <= 0 or compression_threshold > 1):
        raise ValueError(
            f"compression_threshold must be between 0 (exclusive) and 1 (inclusive), got {compression_threshold}"
        )

    self.removed_message_count = 0
    self._compression_threshold = compression_threshold
    # One-shot guard so the missing context_window_limit warning fires only once.
    self._context_window_limit_warned = False

def reduce_on_threshold(self, agent: "Agent", **kwargs: Any) -> bool:
    """Proactively reduce the conversation history before a model call.

    Invoked when projected input tokens exceed the configured
    ``compression_threshold`` of the model's context window limit. Subclasses
    implement this to shrink context ahead of the model call and avoid
    overflow errors.

    Exceptions raised here are caught by the base class and logged at debug
    level, so implementations are free to let errors propagate rather than
    defensively swallowing them. When an exception occurs, the return value
    is never observed by the caller.

    This default implementation does nothing and reports no reduction;
    subclasses that support proactive compression should override it.

    Args:
        agent: The agent whose conversation history will be reduced.
            The agent's messages list should be modified in-place.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        True if the history was reduced, False otherwise. Only observed on
        success; if the method raises, the base class catches the exception
        and the return value is ignored.
    """
    return False

def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
    """Register hooks for agent lifecycle events.

    When ``compression_threshold`` is configured and the subclass overrides
    ``reduce_on_threshold``, a ``BeforeModelCallEvent`` hook is registered for
    proactive compression; otherwise this method is a no-op.

    Derived classes that override this method must call the base implementation
    to keep the hook registration chain intact.

    Args:
        registry: The hook registry to register callbacks with.
        **kwargs: Additional keyword arguments for future extensibility.
    """
    # Proactive compression is opt-in via the constructor.
    if self._compression_threshold is None:
        return

    # The base reduce_on_threshold is a no-op, so registering the hook is only
    # useful when the subclass provides its own implementation.
    overridden = type(self).reduce_on_threshold is not ConversationManager.reduce_on_threshold
    if overridden:
        registry.add_callback(BeforeModelCallEvent, self._on_before_model_call_threshold)
        return

    logger.warning(
        "conversation_manager=<%s> | compression_threshold is configured but reduce_on_threshold is not"
        " implemented, proactive compression is disabled",
        type(self).__name__,
    )

def _on_before_model_call_threshold(self, event: BeforeModelCallEvent) -> None:
    """Handle BeforeModelCallEvent for proactive compression.

    Compares the projected input tokens against the configured threshold of the
    model's context window and, when exceeded, invokes ``reduce_on_threshold``
    on a best-effort basis. NOTE(review): when the model does not declare
    ``context_window_limit``, this falls back to ``DEFAULT_CONTEXT_WINDOW_LIMIT``
    (200k tokens), which may be far larger than the model's true window — a
    one-time warning is emitted in that case.

    Args:
        event: The before model call event.
    """
    limit = event.agent.model.context_window_limit
    if limit is None:
        limit = DEFAULT_CONTEXT_WINDOW_LIMIT
        # Warn only once per manager instance; repeated warnings add noise.
        if not self._context_window_limit_warned:
            self._context_window_limit_warned = True
            logger.warning(
                "context_window_limit=<None>, default=<%s>"
                " | context_window_limit is not set on the model, using default"
                " | set context_window_limit in your model config for accurate threshold checks",
                DEFAULT_CONTEXT_WINDOW_LIMIT,
            )

    projected = event.projected_input_tokens
    if projected is None:
        logger.debug("projected_input_tokens=<None> | skipping proactive compression")
        return

    usage_ratio = projected / limit
    if usage_ratio < self._compression_threshold:  # type: ignore[operator]
        return

    logger.debug(
        "projected_tokens=<%s>, limit=<%s>, ratio=<%.2f>, compression_threshold=<%s>"
        " | compression threshold exceeded, reducing context",
        projected,
        limit,
        usage_ratio,
        self._compression_threshold,
    )
    # Best-effort: a failed proactive reduction must not block the model call;
    # reactive overflow recovery remains the safety net.
    try:
        self.reduce_on_threshold(agent=event.agent)
    except Exception:
        logger.debug("proactive compression failed, will proceed with model call", exc_info=True)

def restore_from_session(self, state: dict[str, Any]) -> list[Message] | None:
"""Restore the Conversation Manager's state from a session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(
should_truncate_results: bool = True,
*,
per_turn: bool | int = False,
compression_threshold: float | None = None,
):
"""Initialize the sliding window conversation manager.

Expand All @@ -54,6 +55,8 @@ def __init__(
manage message history and prevent the agent loop from slowing down. Start with
per_turn=True and adjust to a specific frequency (e.g., per_turn=5) if needed
for performance tuning.
compression_threshold: Ratio of context window usage that triggers proactive compression.
See :class:`ConversationManager` for details.

Raises:
ValueError: If window_size is negative, or if per_turn is 0 or a negative integer.
Expand All @@ -63,7 +66,7 @@ def __init__(
if isinstance(per_turn, int) and not isinstance(per_turn, bool) and per_turn <= 0:
raise ValueError(f"per_turn must be a positive integer, True, or False, got {per_turn}")

super().__init__()
super().__init__(compression_threshold=compression_threshold)

self.window_size = window_size
self.should_truncate_results = should_truncate_results
Expand Down Expand Up @@ -155,6 +158,20 @@ def apply_management(self, agent: "Agent", **kwargs: Any) -> None:
return
self.reduce_context(agent)

def reduce_on_threshold(self, agent: "Agent", **kwargs: Any) -> bool:
    """Proactively reduce context by trimming oldest messages.

    Delegates to :meth:`reduce_context` and reports whether any messages were
    actually dropped from the agent's history.

    Args:
        agent: The agent whose conversation history will be reduced.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        True if the history was reduced, False otherwise.
    """
    before = len(agent.messages)
    self.reduce_context(agent)
    # reduce_context mutates agent.messages in-place; compare lengths to detect work.
    return len(agent.messages) < before

def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: Any) -> None:
"""Trim the oldest messages to reduce the conversation context size.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def __init__(
preserve_recent_messages: int = 10,
summarization_agent: Optional["Agent"] = None,
summarization_system_prompt: str | None = None,
*,
compression_threshold: float | None = None,
):
"""Initialize the summarizing conversation manager.

Expand All @@ -77,8 +79,10 @@ def __init__(
If provided, this agent can use tools as part of the summarization process.
summarization_system_prompt: Optional system prompt override for summarization.
If None, uses the default summarization prompt.
compression_threshold: Ratio of context window usage that triggers proactive compression.
See :class:`ConversationManager` for details.
"""
super().__init__()
super().__init__(compression_threshold=compression_threshold)
if summarization_agent is not None and summarization_system_prompt is not None:
raise ValueError(
"Cannot provide both summarization_agent and summarization_system_prompt. "
Expand Down Expand Up @@ -136,44 +140,67 @@ def reduce_context(self, agent: "Agent", e: Exception | None = None, **kwargs: A
ContextWindowOverflowException: If the context cannot be summarized.
"""
try:
# Calculate how many messages to summarize
messages_to_summarize_count = max(1, int(len(agent.messages) * self.summary_ratio))
self._summarize_oldest(agent)
except Exception as summarization_error:
logger.error("Summarization failed: %s", summarization_error)
raise summarization_error from e

# Ensure we don't summarize recent messages
messages_to_summarize_count = min(
messages_to_summarize_count, len(agent.messages) - self.preserve_recent_messages
)
def reduce_on_threshold(self, agent: "Agent", **kwargs: Any) -> bool:
    """Proactively reduce context by summarizing oldest messages.

    Delegates to ``_summarize_oldest``; any exception it raises propagates to
    the caller (the base class catches and logs it).

    Args:
        agent: The agent whose conversation history will be reduced.
        **kwargs: Additional keyword arguments for future extensibility.

    Returns:
        True if the history was reduced, False otherwise.
    """
    self._summarize_oldest(agent)
    return True

if messages_to_summarize_count <= 0:
raise ContextWindowOverflowException("Cannot summarize: insufficient messages for summarization")
def _summarize_oldest(self, agent: "Agent") -> None:
    """Summarize the oldest messages and replace them with a summary.

    Args:
        agent: The agent instance.

    Raises:
        ContextWindowOverflowException: If there are insufficient messages for summarization.
    """
    total = len(agent.messages)

    # Target roughly summary_ratio of the history, but always at least one
    # message, and never dip into the preserved most-recent window.
    split = max(1, int(total * self.summary_ratio))
    split = min(split, total - self.preserve_recent_messages)
    if split <= 0:
        raise ContextWindowOverflowException("Cannot summarize: insufficient messages for summarization")

    # Shift the split point so a ToolUse/ToolResult pair is never separated.
    split = self._adjust_split_point_for_tool_pairs(agent.messages, split)
    if split <= 0:
        raise ContextWindowOverflowException("Cannot summarize: insufficient messages for summarization")

    older = agent.messages[:split]
    recent = agent.messages[split:]

    # Track removals; a previously injected summary message is not a user/LLM
    # message, so it does not count toward removed_message_count.
    self.removed_message_count += len(older)
    if self._summary_message:
        self.removed_message_count -= 1

    self._summary_message = self._generate_summary(older, agent)

    # Replace the summarized prefix with the single summary message, in-place.
    agent.messages[:] = [self._summary_message] + recent

def _generate_summary(self, messages: list[Message], agent: "Agent") -> Message:
"""Generate a summary of the provided messages.
Expand Down
Loading
Loading