Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/claude_agent_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
SubagentStartHookSpecificOutput,
SubagentStopHookInput,
SystemMessage,
TaskNotificationMessage,
TaskNotificationStatus,
TaskProgressMessage,
TaskStartedMessage,
TaskUsage,
TextBlock,
ThinkingBlock,
ThinkingConfig,
Expand Down Expand Up @@ -333,6 +338,11 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"UserMessage",
"AssistantMessage",
"SystemMessage",
"TaskStartedMessage",
"TaskProgressMessage",
"TaskNotificationMessage",
"TaskNotificationStatus",
"TaskUsage",
"ResultMessage",
"Message",
"ClaudeAgentOptions",
Expand Down
50 changes: 46 additions & 4 deletions src/claude_agent_sdk/_internal/message_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
ResultMessage,
StreamEvent,
SystemMessage,
TaskNotificationMessage,
TaskProgressMessage,
TaskStartedMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
Expand Down Expand Up @@ -135,10 +138,49 @@ def parse_message(data: dict[str, Any]) -> Message | None:

case "system":
try:
return SystemMessage(
subtype=data["subtype"],
data=data,
)
subtype = data["subtype"]
match subtype:
case "task_started":
return TaskStartedMessage(
subtype=subtype,
data=data,
task_id=data["task_id"],
description=data["description"],
uuid=data["uuid"],
session_id=data["session_id"],
tool_use_id=data.get("tool_use_id"),
task_type=data.get("task_type"),
)
case "task_progress":
return TaskProgressMessage(
subtype=subtype,
data=data,
task_id=data["task_id"],
description=data["description"],
usage=data["usage"],
uuid=data["uuid"],
session_id=data["session_id"],
tool_use_id=data.get("tool_use_id"),
last_tool_name=data.get("last_tool_name"),
)
case "task_notification":
return TaskNotificationMessage(
subtype=subtype,
data=data,
task_id=data["task_id"],
status=data["status"],
output_file=data["output_file"],
summary=data["summary"],
uuid=data["uuid"],
session_id=data["session_id"],
tool_use_id=data.get("tool_use_id"),
usage=data.get("usage"),
)
case _:
return SystemMessage(
subtype=subtype,
data=data,
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in system message: {e}", data
Expand Down
66 changes: 66 additions & 0 deletions src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,72 @@ class SystemMessage:
data: dict[str, Any]


class TaskUsage(TypedDict):
"""Usage statistics reported in task_progress and task_notification messages."""

total_tokens: int
tool_uses: int
duration_ms: int


# Possible status values for a task_notification message.
TaskNotificationStatus = Literal["completed", "failed", "stopped"]


@dataclass
class TaskStartedMessage(SystemMessage):
"""System message emitted when a task starts.

Subclass of SystemMessage: existing ``isinstance(msg, SystemMessage)`` and
``case SystemMessage()`` checks continue to match. The base ``subtype``
and ``data`` fields remain populated with the raw payload.
"""

task_id: str
description: str
uuid: str
session_id: str
tool_use_id: str | None = None
task_type: str | None = None


@dataclass
class TaskProgressMessage(SystemMessage):
"""System message emitted while a task is in progress.

Subclass of SystemMessage: existing ``isinstance(msg, SystemMessage)`` and
``case SystemMessage()`` checks continue to match. The base ``subtype``
and ``data`` fields remain populated with the raw payload.
"""

task_id: str
description: str
usage: TaskUsage
uuid: str
session_id: str
tool_use_id: str | None = None
last_tool_name: str | None = None


@dataclass
class TaskNotificationMessage(SystemMessage):
"""System message emitted when a task completes, fails, or is stopped.

Subclass of SystemMessage: existing ``isinstance(msg, SystemMessage)`` and
``case SystemMessage()`` checks continue to match. The base ``subtype``
and ``data`` fields remain populated with the raw payload.
"""

task_id: str
status: TaskNotificationStatus
output_file: str
summary: str
uuid: str
session_id: str
tool_use_id: str | None = None
usage: TaskUsage | None = None


@dataclass
class ResultMessage:
"""Result message with cost and usage information."""
Expand Down
194 changes: 194 additions & 0 deletions tests/test_message_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
AssistantMessage,
ResultMessage,
SystemMessage,
TaskNotificationMessage,
TaskProgressMessage,
TaskStartedMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
Expand Down Expand Up @@ -277,6 +280,197 @@ def test_parse_valid_system_message(self):
assert isinstance(message, SystemMessage)
assert message.subtype == "start"

def test_parse_task_started_message(self):
"""Test parsing a task_started system message yields a TaskStartedMessage."""
data = {
"type": "system",
"subtype": "task_started",
"task_id": "task-abc",
"tool_use_id": "toolu_01",
"description": "Reticulating splines",
"task_type": "background",
"uuid": "uuid-1",
"session_id": "session-1",
}
message = parse_message(data)
assert isinstance(message, TaskStartedMessage)
assert message.task_id == "task-abc"
assert message.description == "Reticulating splines"
assert message.uuid == "uuid-1"
assert message.session_id == "session-1"
assert message.tool_use_id == "toolu_01"
assert message.task_type == "background"

def test_parse_task_started_message_optional_fields_absent(self):
"""task_started with no optional fields should still parse, optionals set to None."""
data = {
"type": "system",
"subtype": "task_started",
"task_id": "task-abc",
"description": "Working",
"uuid": "uuid-1",
"session_id": "session-1",
}
message = parse_message(data)
assert isinstance(message, TaskStartedMessage)
assert message.tool_use_id is None
assert message.task_type is None

def test_parse_task_progress_message(self):
"""Test parsing a task_progress system message yields a TaskProgressMessage."""
data = {
"type": "system",
"subtype": "task_progress",
"task_id": "task-abc",
"tool_use_id": "toolu_01",
"description": "Halfway there",
"usage": {
"total_tokens": 1234,
"tool_uses": 5,
"duration_ms": 9876,
},
"last_tool_name": "Read",
"uuid": "uuid-2",
"session_id": "session-1",
}
message = parse_message(data)
assert isinstance(message, TaskProgressMessage)
assert message.task_id == "task-abc"
assert message.description == "Halfway there"
assert message.usage == {
"total_tokens": 1234,
"tool_uses": 5,
"duration_ms": 9876,
}
assert message.last_tool_name == "Read"
assert message.tool_use_id == "toolu_01"
assert message.uuid == "uuid-2"
assert message.session_id == "session-1"

def test_parse_task_notification_message(self):
"""Test parsing a task_notification system message yields a TaskNotificationMessage."""
data = {
"type": "system",
"subtype": "task_notification",
"task_id": "task-abc",
"tool_use_id": "toolu_01",
"status": "completed",
"output_file": "/tmp/out.md",
"summary": "All done",
"usage": {
"total_tokens": 2000,
"tool_uses": 7,
"duration_ms": 12345,
},
"uuid": "uuid-3",
"session_id": "session-1",
}
message = parse_message(data)
assert isinstance(message, TaskNotificationMessage)
assert message.task_id == "task-abc"
assert message.status == "completed"
assert message.output_file == "/tmp/out.md"
assert message.summary == "All done"
assert message.usage == {
"total_tokens": 2000,
"tool_uses": 7,
"duration_ms": 12345,
}
assert message.tool_use_id == "toolu_01"
assert message.uuid == "uuid-3"
assert message.session_id == "session-1"

def test_parse_task_notification_message_optional_fields_absent(self):
"""task_notification with no optional fields (usage, tool_use_id) still parses."""
data = {
"type": "system",
"subtype": "task_notification",
"task_id": "task-abc",
"status": "failed",
"output_file": "/tmp/out.md",
"summary": "Boom",
"uuid": "uuid-3",
"session_id": "session-1",
}
message = parse_message(data)
assert isinstance(message, TaskNotificationMessage)
assert message.status == "failed"
assert message.usage is None
assert message.tool_use_id is None

def test_task_message_backward_compat_isinstance(self):
"""Backward-compat: typed task messages are still SystemMessage instances."""
started_data = {
"type": "system",
"subtype": "task_started",
"task_id": "t1",
"description": "desc",
"uuid": "u1",
"session_id": "s1",
}
progress_data = {
"type": "system",
"subtype": "task_progress",
"task_id": "t1",
"description": "desc",
"usage": {"total_tokens": 1, "tool_uses": 0, "duration_ms": 10},
"uuid": "u2",
"session_id": "s1",
}
notif_data = {
"type": "system",
"subtype": "task_notification",
"task_id": "t1",
"status": "stopped",
"output_file": "/o",
"summary": "s",
"uuid": "u3",
"session_id": "s1",
}
started = parse_message(started_data)
progress = parse_message(progress_data)
notif = parse_message(notif_data)
# isinstance checks against the base class still work
assert isinstance(started, SystemMessage)
assert isinstance(progress, SystemMessage)
assert isinstance(notif, SystemMessage)
# match-case against SystemMessage still works
matched = False
match started:
case SystemMessage():
matched = True
assert matched

def test_task_message_backward_compat_base_fields(self):
"""Backward-compat: subtype and data fields on typed task messages are populated."""
data = {
"type": "system",
"subtype": "task_started",
"task_id": "t1",
"description": "desc",
"uuid": "u1",
"session_id": "s1",
}
message = parse_message(data)
assert isinstance(message, TaskStartedMessage)
# Base class fields still populated for legacy code paths
assert message.subtype == "task_started"
assert message.data == data
assert message.data["task_id"] == "t1"

def test_unknown_system_subtype_yields_generic(self):
"""Unknown system subtypes fall through to generic SystemMessage (not a subclass)."""
data = {"type": "system", "subtype": "some_future_subtype", "foo": "bar"}
message = parse_message(data)
assert isinstance(message, SystemMessage)
# Ensure it's exactly SystemMessage, not one of the typed subclasses
assert type(message) is SystemMessage
assert not isinstance(message, TaskStartedMessage)
assert not isinstance(message, TaskProgressMessage)
assert not isinstance(message, TaskNotificationMessage)
assert message.subtype == "some_future_subtype"
assert message.data == data

def test_parse_assistant_message_inside_subagent(self):
"""Test parsing a valid assistant message."""
data = {
Expand Down