| title | rigging.message |
|---|---|
{/* ::: rigging.message */}
This module covers core message objects and handling.
`Content = ContentText | ContentImageUrl | ContentAudioInput`

The types of content that can be included in a message.

`EPHERMAL_CACHE_CONTROL = {'type': 'ephemeral'}`

Cache control entry for ephemeral messages.

`Role = Literal['system', 'user', 'assistant', 'tool']`

The role of a message. Can be 'system', 'user', 'assistant', or 'tool'.
ContentAudioInput
-----------------
An audio content part of a message.

`cache_control: dict[str, str] | None = None`

Cache control entry for prompt caching.

`input_audio: Audio`

The audio content.

`transcript: str | None`

Returns the transcript of the audio data.

Returns:

- `str | None` – The transcript of the audio data.

`type: Literal['input_audio'] = 'input_audio'`

The type of content (always input_audio).

Fields of the nested `Audio` object:

`data: str`

The base64-encoded audio data.

`format: str`

The format of the audio data.

`transcript: str | None = None`

The transcript of the audio data (if available).
### from\_bytes
from_bytes(
data: bytes,
*,
format: ContentAudioFormat | None = None,
transcript: str | None = None,
) -> ContentAudioInput

Creates a ContentAudioInput object from raw bytes.

Parameters:

- `data` (`bytes`) – The raw bytes of the audio.
- `format` (`ContentAudioFormat | None`, default: `None`) – The format of the audio.

Returns:

- `ContentAudioInput` – The created ContentAudioInput
Args:
data: The raw bytes of the audio.
format: The format of the audio.
Returns:
The created ContentAudioInput
"""
format = format or identify_audio_format(data) or "unknown" # type: ignore [assignment] # noqa: A001
encoded = base64.b64encode(data).decode()
return cls(input_audio=cls.Audio(data=encoded, format=format, transcript=transcript))
</Accordion>
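The encoding step in `from_bytes` is plain base64, and `to_bytes` reverses it. A minimal standard-library sketch of that round trip follows; `encode_audio` and `decode_audio` are hypothetical helper names used for illustration and are not part of rigging.

```python
import base64


def encode_audio(data: bytes) -> str:
    """Encode raw bytes as base64 text, the form stored in the `data` field."""
    return base64.b64encode(data).decode()


def decode_audio(encoded: str) -> bytes:
    """Decode base64 text back into the original bytes."""
    return base64.b64decode(encoded)


raw = b"RIFF\x00\x00\x00\x00WAVE"  # stand-in bytes, not a valid audio file
encoded = encode_audio(raw)
restored = decode_audio(encoded)
print(restored == raw)
```

Because base64 is lossless, the decoded bytes always match the input exactly.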
### from\_file
```python
from_file(
    file: Path | str,
    *,
    format: ContentAudioFormat | None = None,
    transcript: str | None = None,
) -> ContentAudioInput
```

Creates a ContentAudioInput object from a file.

Parameters:

- `file` (`Path | str`) – The file to create the content from.
- `format` (`ContentAudioFormat | None`, default: `None`) – The format of the audio. If not provided, it will be guessed based on the file extension.
- `transcript` (`str | None`, default: `None`) – The transcript of the audio data (if available).

Returns:

- `ContentAudioInput` – The created ContentAudioInput object.
Args:
file: The file to create the content from.
format: The format of the audio. If not provided, it will be guessed based on the file extension.
transcript: The transcript of the audio data (if available).
Returns:
The created ContentAudioInput object.
"""
file = Path(file)
if not file.exists():
raise FileNotFoundError(f"File '{file}' does not exist")
if format is None:
mimetype = mimetypes.guess_type(file)[0]
if mimetype is None:
raise ValueError(
f"Could not determine format for file '{file}', please provide one",
)
format = t.cast("ContentAudioFormat", mimetype.split("/")[-1]) # noqa: A001
encoded = base64.b64encode(file.read_bytes()).decode()
return cls(input_audio=cls.Audio(data=encoded, format=format, transcript=transcript))
</Accordion>
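When no format is passed, the code above derives one from the guessed MIME type by keeping only the subtype. The sketch below reproduces that step with the standard library; `guess_audio_format` is a hypothetical helper name, not a rigging function.

```python
import mimetypes
from pathlib import Path
from typing import Union


def guess_audio_format(file: Union[Path, str]) -> str:
    """Derive a format string from the MIME type guessed for a file name."""
    mimetype = mimetypes.guess_type(str(file))[0]
    if mimetype is None:
        raise ValueError(f"Could not determine format for file '{file}', please provide one")
    # Keep only the subtype, e.g. "audio/mpeg" becomes "mpeg".
    return mimetype.split("/")[-1]


fmt = guess_audio_format("clip.mp3")
print(fmt)  # the exact value depends on the platform's MIME tables
```

The subtype is not always the file extension (`.mp3` commonly maps to `audio/mpeg`), so passing `format` explicitly is the safer choice when a provider expects a specific name.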
### save
```python
save(path: Path | str) -> None
```

Saves the audio data to a file.

Parameters:

- `path` (`Path | str`) – The path to save the audio to.
Args:
path: The path to save the audio to.
"""
data = self.to_bytes()
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(data)
</Accordion>
### to\_bytes
```python
to_bytes() -> bytes
```

Converts the audio data to bytes.

Returns:

- `bytes` – The decoded audio data.
Returns:
The decoded audio data.
"""
return base64.b64decode(self.input_audio.data)
</Accordion>
ContentImageUrl
---------------
An image URL content part of a message.
### cache\_control
```python
cache_control: dict[str, str] | None = None
```

Cache control entry for prompt caching.

`image_url: ImageUrl`

The image URL content.

`type: Literal['image_url'] = 'image_url'`

The type of content (always image_url).

Fields of the nested `ImageUrl` object:

`detail: Literal['auto', 'low', 'high'] = 'auto'`

The detail level of the image.

`url: str`

The URL of the image (supports base64-encoded).
### from\_bytes
from_bytes(
data: bytes,
mimetype: str,
*,
detail: Literal["auto", "low", "high"] = "auto",
) -> ContentImageUrl

Creates a ContentImageUrl object from raw bytes.

Parameters:

- `data` (`bytes`) – The raw bytes of the image.
- `mimetype` (`str`) – The mimetype of the image.
- `detail` (`Literal['auto', 'low', 'high']`, default: `'auto'`) – The detail level of the image.

Returns:

- `ContentImageUrl` – The created ContentImageUrl
Args:
data: The raw bytes of the image.
mimetype: The mimetype of the image.
detail: The detail level of the image.
Returns:
The created ContentImageUrl
"""
encoded = base64.b64encode(data).decode()
url = f"data:{mimetype};base64,{encoded}"
return cls(image_url=cls.ImageUrl(url=url, detail=detail))
</Accordion>
### from\_file
```python
from_file(
    file: Path | str,
    *,
    mimetype: str | None = None,
    detail: Literal["auto", "low", "high"] = "auto",
) -> ContentImageUrl
```

Creates a ContentImageUrl object from a file.

Parameters:

- `file` (`Path | str`) – The file to create the content from.
- `mimetype` (`str | None`, default: `None`) – The mimetype of the file. If not provided, it will be guessed.

Returns:

- `ContentImageUrl` – The created ContentImageUrl object.
Args:
file: The file to create the content from.
mimetype: The mimetype of the file. If not provided, it will be guessed.
Returns:
The created ContentImageUrl object.
"""
file = Path(file)
if not file.exists():
raise FileNotFoundError(f"File '{file}' does not exist")
if mimetype is None:
mimetype = mimetypes.guess_type(file)[0]
if mimetype is None:
raise ValueError(f"Could not determine mimetype for file '{file}'")
encoded = base64.b64encode(file.read_bytes()).decode()
url = f"data:{mimetype};base64,{encoded}"
return cls(image_url=cls.ImageUrl(url=url, detail=detail))
</Accordion>
### from\_url
```python
from_url(
    url: str,
    *,
    detail: Literal["auto", "low", "high"] = "auto",
) -> ContentImageUrl
```

Creates a ContentImageUrl object from a URL.

Parameters:

- `url` (`str`) – The URL of the image.
- `detail` (`Literal['auto', 'low', 'high']`, default: `'auto'`) – The detail level of the image.

Returns:

- `ContentImageUrl` – The created ContentImageUrl object.
Args:
url: The URL of the image.
detail: The detail level of the image.
Returns:
The created ContentImageUrl object.
"""
return cls(image_url=cls.ImageUrl(url=url, detail=detail))
</Accordion>
### save
```python
save(path: Path | str) -> None
```

Saves the data to a file.

Parameters:

- `path` (`Path | str`) – The path to save the image to.
Args:
path: The path to save the image to.
"""
data = self.to_bytes()
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(data)
</Accordion>
### to\_bytes
```python
to_bytes() -> bytes
```

Converts the data to bytes (if the URL is base64-encoded).

Returns:

- `bytes` – The decoded image data.
Returns:
The decoded image data.
"""
if not self.image_url.url.startswith("data:"):
raise ValueError("Image URL is not base64-encoded")
return base64.b64decode(self.image_url.url.split(",")[1])
</Accordion>
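Image bytes are carried as a `data:` URL, and `to_bytes` only works when the URL has that form. The standard-library sketch below mirrors the encode and decode logic shown above; `to_data_url` and `data_url_to_bytes` are hypothetical names chosen for illustration.

```python
import base64


def to_data_url(data: bytes, mimetype: str) -> str:
    """Build a base64 data URL in the same shape as from_bytes."""
    encoded = base64.b64encode(data).decode()
    return f"data:{mimetype};base64,{encoded}"


def data_url_to_bytes(url: str) -> bytes:
    """Decode a data URL, rejecting ordinary URLs as to_bytes does."""
    if not url.startswith("data:"):
        raise ValueError("Image URL is not base64-encoded")
    return base64.b64decode(url.split(",")[1])


png_magic = b"\x89PNG\r\n\x1a\n"  # only the PNG signature, not a full image
url = to_data_url(png_magic, "image/png")
decoded = data_url_to_bytes(url)
print(decoded == png_magic)
```

A plain `https://` URL raises `ValueError` here, which is why an image created with `from_url` generally cannot be saved without downloading it first.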
ContentText
-----------
A text content part of a message.
### cache\_control
```python
cache_control: dict[str, str] | None = None
```

Cache control entry for prompt caching.

`text: str`

The text content.

`type: Literal['text'] = 'text'`

The type of content (always text).
Message
-------
Message(
role: Role,
content: str | Sequence[str | Content] | None = None,
slices: Sequence[MessageSlice] | None = None,
tool_calls: Sequence[ToolCall]
| Sequence[dict[str, Any]]
| None = None,
tool_call_id: str | None = None,
cache_control: Literal["ephemeral"]
| dict[str, str]
| None = None,
**kwargs: Any,
)

Represents a message with role, content, and parsed message parts.

Historically, `content` was a string, but multi-modal LLMs require a more structured content representation.

For interface stability, `content` remains a property accessor for the text of a message, but the "real" content is available in `content_parts`. During serialization, `content_parts` is renamed to `content` for compatibility.
content = [content] if isinstance(content, str) else content
content_parts = [
ContentText(text=dedent(part)) if isinstance(part, str) else part for part in content
]
if cache_control is not None and content_parts:
content_parts[-1].cache_control = (
cache_control if isinstance(cache_control, dict) else EPHERMAL_CACHE_CONTROL
)
super().__init__(
role=role,
content_parts=content_parts,
slices=slices or [],
tool_calls=tool_calls,
tool_call_id=tool_call_id,
**kwargs,
)
for slice_ in self.slice_refs:
slice_._message = self # noqa: SLF001
</Accordion>
### all\_content
```python
all_content: str | list[Content]
```

Returns all content parts of the message or the single text content part as a string.

Deprecated: use `.content_parts` instead.

`compatibility_flags: set[CompatibilityFlag] = Field(default_factory=set, repr=False)`

Compatibility flags to be applied when conversions occur.

`content: str`

The content of the message as a string. If multiple text parts are present, they will be concatenated together with newlines in between.

This is considered the ground truth for slices of this message. In other words, slices do not take into account any structured content parts like images or audio.

If you need to access the structured content parts, use `.content_parts`.

`content_parts: list[Content] = Field([], repr=False)`

Interior str content or structured content parts.

`metadata: dict[str, Any] = Field(default_factory=dict, repr=False)`

Metadata associated with the message.

`models: list[Model]`

Returns a list of all models available in slices of the message.

`parts: list[Any]`

Deprecated: iterate through `.slices` instead.

`role: Role`

The role of the message.

`slices: list[MessageSlice]`

The slices of the message content.

`tool_call_id: str | None = Field(None)`

Associated call id if this message is a response to a tool call.

`tool_calls: list[ToolCall] | None = Field(None)`

The tool calls associated with the message.

`uuid: UUID = Field(default_factory=uuid4, repr=False)`

The unique identifier for the message.
### append\_slice
append_slice(
content: str | Model,
slice_type: SliceType | None = None,
*,
obj: SliceObj | None = None,
metadata: dict[str, Any] | None = None,
) -> MessageSlice

Add content to the end of the message (with newline separator) and create a slice tracking it.

Type defaults to 'model' for Model objects, 'other' for strings.

Parameters:

- `content` (`str | Model`) – The content to append. This can be a string or a Model instance.
- `slice_type` (`SliceType | None`, default: `None`) – The type of slice to create, inferred from content type if not provided.
- `obj` (`SliceObj | None`, default: `None`) – The object associated with the slice
- `metadata` (`dict[str, Any] | None`, default: `None`) – Additional metadata for the slice

Returns:

- `MessageSlice` – The created MessageSlice
Type defaults to 'model' for Model objects, 'other' for strings.
Args:
content: The content to append. This can be a string or a Model instance.
slice_type: The type of slice to create, inferred from content type if not provided.
obj: The object associated with the slice
metadata: Additional metadata for the slice
Returns:
The created MessageSlice
"""
if isinstance(content, Model):
content_str = content.to_pretty_xml()
slice_type = slice_type or "model"
obj = obj or content
else:
content_str = content
slice_type = slice_type or "other"
start_pos = len(self.content) + (
1 if self.content_parts else 0 # +1 for newline if not empty
)
self.content_parts.append(ContentText(text=content_str))
return self._add_slice(
MessageSlice(
type=slice_type,
obj=obj,
start=start_pos,
stop=len(self.content),
metadata=metadata or {},
),
)
</Accordion>
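The offset arithmetic above relies on text parts being joined with newlines when read back as `content`. The sketch below models that with a plain list of strings; `append_with_span` is a hypothetical helper, and the newline join is an assumption taken from the description of the `content` property.

```python
def append_with_span(parts: list, new_text: str) -> tuple:
    """Append a text part and return the (start, stop) range it occupies."""
    current = "\n".join(parts)
    # Existing parts force a newline separator before the new text.
    start = len(current) + (1 if parts else 0)
    parts.append(new_text)
    stop = len("\n".join(parts))
    return start, stop


parts = ["Hello"]
start, stop = append_with_span(parts, "world")
content = "\n".join(parts)
print(content[start:stop])  # prints "world"
```

The `+1` matters only when the message already has parts: without it, the range would begin on the separator rather than on the appended text.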
### apply
```python
apply(**kwargs: str) -> Message
```

Applies the given keyword arguments with string templating to the content of the message.

Uses `string.Template.safe_substitute` underneath.

This call produces a clone of the message, leaving the original message unchanged.

Parameters:

- `**kwargs` (`str`, default: `{}`) – Keyword arguments to substitute in the message content.
Uses [string.Template.safe_substitute](https://docs.python.org/3/library/string.html#string.Template.safe_substitute) underneath.
Note:
This call produces a clone of the message, leaving the original message unchanged.
Args:
**kwargs: Keyword arguments to substitute in the message content.
"""
new = self.clone()
template = string.Template(new.content)
new.content = template.safe_substitute(**kwargs)
return new
</Accordion>
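Since `apply` delegates to `string.Template.safe_substitute`, placeholders without a matching keyword are left in place rather than raising. A short standard-library illustration of that behavior, using a made-up template string:

```python
import string

template = string.Template("Hello $name, your task is $task.")

# Only `name` is supplied; `$task` has no value and is kept as-is.
result = template.safe_substitute(name="Ada")
print(result)  # Hello Ada, your task is $task.
```

This makes it possible to fill a message in several passes, supplying different keywords each time.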
### apply\_to\_list
```python
apply_to_list(
    messages: Sequence[Message], **kwargs: str
) -> list[Message]
```

Helper function to apply keyword arguments to a list of Message objects.

```python
@classmethod
def apply_to_list(cls, messages: t.Sequence["Message"], **kwargs: str) -> list["Message"]:
    """Helper function to apply keyword arguments to a list of Message objects."""
    return [message.apply(**kwargs) for message in messages]
```

### cache
```python
cache(
    cache_control: dict[str, str] | bool = True,
) -> Message
```

Update cache control settings for this message.

Parameters:

- `cache_control` (`dict[str, str] | bool`, default: `True`) – The cache control settings to apply to the message. If `False`, all cache control settings will be removed. If `True`, the default ephemeral cache control will be applied. If a dictionary, it will be applied as the cache control settings.

Returns:

- `Message` – The updated message.
Args:
cache_control: The cache control settings to
apply to the message. If `False`, all cache
control settings will be removed. If `True`,
the default ephemeral cache control will be applied.
If a dictionary, it will be applied as the cache control settings.
Returns:
The updated message.
"""
for part in self.content_parts:
part.cache_control = None
if cache_control is False:
return self
if cache_control is True:
cache_control = EPHERMAL_CACHE_CONTROL
if not isinstance(cache_control, dict):
raise TypeError(f"Invalid cache control: {cache_control}")
self.content_parts[-1].cache_control = cache_control
return self
</Accordion>
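The logic above clears every part first and then marks only the final part. The sketch below replays it on plain dictionaries; `apply_cache_control` and `EPHEMERAL` are hypothetical stand-ins, and like the code above it assumes at least one part is present.

```python
from typing import Union

EPHEMERAL = {"type": "ephemeral"}  # same shape as the module's default entry


def apply_cache_control(parts: list, cache_control: Union[dict, bool] = True) -> list:
    """Reset cache control on all parts, then set it on the last one."""
    for part in parts:
        part["cache_control"] = None
    if cache_control is False:
        return parts
    if cache_control is True:
        cache_control = EPHEMERAL
    if not isinstance(cache_control, dict):
        raise TypeError(f"Invalid cache control: {cache_control}")
    # Only the final part carries the marker.
    parts[-1]["cache_control"] = cache_control
    return parts


parts = [{"text": "system prompt"}, {"text": "long document"}]
apply_cache_control(parts)
print(parts[-1]["cache_control"])
```

Calling it again with `False` leaves every part with `cache_control` set to `None`.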
### clone
```python
clone() -> Message
```

Creates a copy of the message.

```python
def clone(self) -> "Message":
    """Creates a copy of the message."""
    return Message(
        role=self.role,
        content=copy.deepcopy(self.content_parts),
        slices=copy.deepcopy(self.slices),
        tool_calls=copy.deepcopy(self.tool_calls),
        tool_call_id=self.tool_call_id,
        metadata=copy.deepcopy(self.metadata),
        compatibility_flags=copy.deepcopy(self.compatibility_flags),
    )
```

### find\_slices
```python
find_slices(
    slice_type: SliceType | None = None,
    filter_fn: Callable[[MessageSlice], bool] | None = None,
    *,
    reverse: bool = False,
) -> list[MessageSlice]
```

Find slices with simple filtering.

Parameters:

- `slice_type` (`SliceType | None`, default: `None`) – Filter by slice type
- `filter_fn` (`Callable[[MessageSlice], bool] | None`, default: `None`) – Custom filter function called for each slice

Returns:

- `list[MessageSlice]` – List of matching slices
Args:
slice_type: Filter by slice type
filter_fn: Custom filter function called for each slice
Returns:
List of matching slices
"""
results = []
for slice_obj in self.iter_slices(slice_type=slice_type, reverse=reverse):
if filter_fn is not None and not filter_fn(slice_obj):
continue
results.append(slice_obj)
return results
</Accordion>
### fit
```python
fit(
    message: Union[Message, MessageDict, Content, str],
) -> Message
```

Helper function to convert various common types to a Message object.

```python
@classmethod
def fit(cls, message: t.Union["Message", MessageDict, Content, str]) -> "Message":
    """Helper function to convert various common types to a Message object."""
    if isinstance(message, (str, *ContentTypes)):
        return cls(role="user", content=[message])
    return (
        cls.model_validate(message)
        if isinstance(message, dict)
        else message.model_copy(deep=True)
    )
```

### fit\_as\_list
```python
fit_as_list(
    messages: Sequence[MessageDict]
    | Sequence[Message]
    | MessageDict
    | Message
    | Content
    | str,
) -> list[Message]
```

Helper function to convert various common types to a strict list of Message objects.

```python
@classmethod
def fit_as_list(
    cls,
    messages: "t.Sequence[MessageDict] | t.Sequence[Message] | MessageDict | Message | Content | str",
) -> list["Message"]:
    """Helper function to convert various common types to a strict list of Message objects."""
    if isinstance(messages, (Message, dict, str, *ContentTypes)):
        return [cls.fit(messages)]
    return [cls.fit(message) for message in messages]
```

### from\_model
```python
from_model(
    models: Model | Sequence[Model],
    role: Role = "user",
    suffix: str | None = None,
) -> Message
```

Create a Message object from one or more Model objects.

Parameters:

- `models` (`Model | Sequence[Model]`) – The Model object(s) to convert to a Message.
- `role` (`Role`, default: `'user'`) – The role of the Message.
- `suffix` (`str | None`, default: `None`) – A suffix to append to the content.

Returns:

- `Message` – The created Message object.
Args:
models: The Model object(s) to convert to a Message.
role: The role of the Message.
suffix: A suffix to append to the content.
Returns:
The created Message object.
"""
slices_: list[MessageSlice] = []
content: str = ""
for model in models if isinstance(models, list) else [models]:
model_xml = model.to_pretty_xml()
slice_ = slice(len(content), len(content) + len(model_xml))
content += f"{model_xml}\n"
slices_.append(
MessageSlice(
type="model",
obj=model,
start=slice_.start,
stop=slice_.stop,
metadata={"model_type": model.__class__.__name__},
),
)
if suffix is not None:
content += f"\n{suffix}"
return cls(role=role, content=content, slices=slices_)
</Accordion>
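The loop above records each model's range before adding the trailing newline, so a slice covers the XML text but not the separator. A small sketch of that bookkeeping with ordinary strings in place of serialized models; `layout_blocks` is a hypothetical name.

```python
def layout_blocks(blocks: list) -> tuple:
    """Concatenate blocks with newlines and record each block's (start, stop)."""
    content = ""
    spans = []
    for block in blocks:
        # Range is taken before the separator is appended.
        spans.append((len(content), len(content) + len(block)))
        content += f"{block}\n"
    return content, spans


content, spans = layout_blocks(["<a>1</a>", "<b>22</b>"])
for start, stop in spans:
    print(content[start:stop])
```

Each recorded range slices back to exactly the block it was created for, which is what lets a slice's `obj` stay aligned with its text.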
### get\_slice
```python
get_slice(
    slice_type: SliceType | None = None,
    *,
    select: Literal["first", "last"] = "first",
) -> MessageSlice | None
```

Get a single slice of the message, optionally filtering by type.

Parameters:

- `slice_type` (`SliceType | None`, default: `None`) – Optional type or string to filter slices by.
- `select` (`Literal['first', 'last']`, default: `'first'`) – Which slice to return: 'first' or 'last'.

Returns:

- `MessageSlice | None` – The requested MessageSlice or None if not found.
Args:
slice_type: Optional type or string to filter slices by.
select: Which slice to return - 'first' or 'last'.
Returns:
The requested MessageSlice or None if not found.
"""
return next(self.iter_slices(slice_type=slice_type, reverse=(select == "last")), None)
</Accordion>
### iter\_slices
```python
iter_slices(
    slice_type: SliceType
    | Iterable[SliceType]
    | None = None,
    *,
    reverse: bool = False,
) -> t.Iterator[MessageSlice]
```

Iterate over slices of the message, optionally filtering by type.

Parameters:

- `slice_type` (`SliceType | Iterable[SliceType] | None`, default: `None`) – Optional type or iterable of types to filter slices by.
- `reverse` (`bool`, default: `False`) – If True, iterate in reverse order.

Returns:

- `Iterator[MessageSlice]` – An iterator over MessageSlice objects.
Args:
slice_type: Optional type or iterable of types to filter slices by.
reverse: If True, iterate in reverse order.
Returns:
An iterator over MessageSlice objects.
"""
sorted_slices = sorted(self.slices, key=lambda s: s.start, reverse=reverse)
if slice_type is None:
return iter(sorted_slices)
slice_type = [slice_type] if isinstance(slice_type, str) else slice_type
if isinstance(slice_type, (list, tuple)):
return (s for s in sorted_slices if s.type in slice_type)
return (s for s in sorted_slices if s.type == slice_type)
</Accordion>
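Slices are ordered by start position and then filtered by type, and `get_slice` simply takes the first item of that iterator. The sketch below shows the same pattern with a hypothetical `Span` dataclass standing in for `MessageSlice`; as a simplification it accepts any iterable of types by converting it to a set.

```python
from dataclasses import dataclass


@dataclass
class Span:
    type: str
    start: int
    stop: int


def iter_spans(spans, span_type=None, *, reverse=False):
    """Yield spans ordered by start, optionally restricted to given types."""
    ordered = sorted(spans, key=lambda s: s.start, reverse=reverse)
    if span_type is None:
        return iter(ordered)
    wanted = {span_type} if isinstance(span_type, str) else set(span_type)
    return (s for s in ordered if s.type in wanted)


spans = [Span("other", 10, 15), Span("model", 0, 5), Span("model", 20, 30)]
first_model = next(iter_spans(spans, "model"), None)
last_model = next(iter_spans(spans, "model", reverse=True), None)
print(first_model, last_model)
```

Using `next(..., None)` gives the "return `None` when nothing matches" behavior without a separate emptiness check.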
### mark\_slice
```python
mark_slice(
    target: str
    | tuple[int, int]
    | Literal[-1]
    | Pattern[str]
    | type[Model],
    slice_type: SliceType | None = None,
    *,
    obj: SliceObj | None = None,
    metadata: dict[str, Any] | None = None,
    select: Literal["first", "last"] = "first",
    case_sensitive: bool = True,
) -> MessageSlice | None

mark_slice(
    target: str
    | tuple[int, int]
    | Literal[-1]
    | Pattern[str]
    | type[Model],
    slice_type: SliceType | None = None,
    *,
    obj: SliceObj | None = None,
    metadata: dict[str, Any] | None = None,
    select: Literal["all"],
    case_sensitive: bool = True,
) -> list[MessageSlice]

mark_slice(
    target: str
    | tuple[int, int]
    | Literal[-1]
    | Pattern[str]
    | type[Model],
    slice_type: SliceType | None = None,
    *,
    obj: SliceObj | None = None,
    metadata: dict[str, Any] | None = None,
    select: Literal["first", "last", "all"] = "first",
    case_sensitive: bool = True,
) -> MessageSlice | list[MessageSlice] | None
```

Mark existing content as slices without modifying content.

Parameters:

- `target` (`str | tuple[int, int] | Literal[-1] | Pattern[str] | type[Model]`) – What to mark as a slice:
    - `str`: Find this text in content
    - `tuple[int, int]`: Mark this exact range
    - `"*"` or `-1`: Mark entire message content
    - `re.Pattern`: Find matches of this pattern
    - `type[Model]`: Parse and mark instances of this model type
- `slice_type` (`SliceType | None`, default: `None`) – The type of slice to create
- `obj` (`SliceObj | None`, default: `None`) – The object associated with the slice
- `metadata` (`dict[str, Any] | None`, default: `None`) – Additional metadata for the slice
- `select` (`Literal['first', 'last', 'all']`, default: `'first'`) – Which matches to return: 'first', 'last', or 'all'
- `case_sensitive` (`bool`, default: `True`) – Whether string search should be case sensitive

Returns:

- `MessageSlice | list[MessageSlice] | None` – If `select` is 'first' or 'last': a MessageSlice, or None if there are no matches. If `select` is 'all': a list[MessageSlice] (empty if there are no matches).
Args:
target: What to mark as a slice:
- str: Find this text in content
- tuple[int, int]: Mark this exact range
- "*" or -1: Mark entire message content
- re.Pattern: Find matches of this pattern
- type[Model]: Parse and mark instances of this model type
slice_type: The type of slice to create
obj: The object associated with the slice
metadata: Additional metadata for the slice
select: Which matches to return - 'first', 'last', or 'all'
case_sensitive: Whether string search should be case sensitive
Returns:
If select='first'/'last': MessageSlice or None if no matches, otherwise if select='all': list[MessageSlice] (empty if no matches)
"""
matches: list[tuple[int, int]] = []
objects: list[SliceObj] = []
content = self.content
# Mark entire content
if content and (target in (-1, "*")):
matches = [(0, len(content))]
# Direct range specification - validate bounds
elif isinstance(target, tuple):
start, stop = target
if not (0 <= start < len(content) and start < stop <= len(content)):
warnings.warn(
f"Invalid range ({start}, {stop}) for content length {len(content)}",
MessageWarning,
stacklevel=2,
)
matches = []
else:
matches = [(start, stop)]
elif isinstance(target, str):
# Handle empty string case
if not target:
warnings.warn("Empty string target provided", MessageWarning, stacklevel=2)
matches = []
# Find all occurrences of the string (case sensitive by default)
else:
search_content = content.lower() if not case_sensitive else content
search_target = target.lower() if not case_sensitive else target
start = 0
while True:
pos = search_content.find(search_target, start)
if pos == -1:
break
matches.append((pos, pos + len(target)))
start = pos + 1
# Find all regex matches
elif isinstance(target, re.Pattern):
matches = [(match.start(), match.end()) for match in target.finditer(content)]
# Parse and mark instances of this model type from content
elif isinstance(target, type) and issubclass(target, Model):
try:
parsed_models = try_parse_many(content, target)
for model, slice_range in parsed_models:
matches.append((slice_range.start, slice_range.stop))
objects.append(model)
except Exception as e: # noqa: BLE001
warnings.warn(
f"Failed to parse {target.__name__} from content: {e}",
MessageWarning,
stacklevel=2,
)
matches = []
if not objects:
objects = [obj] * len(matches)
# Create base slices for storage
created_slices = []
for (start, stop), obj_ in zip(matches, objects, strict=True):
base_slice = MessageSlice(
type=slice_type
or ("model" if isinstance(target, type) and issubclass(target, Model) else "other"),
obj=obj_,
start=start,
stop=stop,
metadata=metadata or {},
)
created_slices.append(self._add_slice(base_slice))
if select == "first":
return created_slices[0] if created_slices else None
if select == "last":
return created_slices[-1] if created_slices else None
return created_slices
</Accordion>
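For string targets, the search above advances by one character after each hit, so overlapping occurrences are all reported. The sketch below isolates that loop; `find_all` is a hypothetical helper name, not part of the library.

```python
def find_all(content: str, target: str, *, case_sensitive: bool = True) -> list:
    """Return (start, stop) for every occurrence of target, overlaps included."""
    if not target:
        return []
    haystack = content if case_sensitive else content.lower()
    needle = target if case_sensitive else target.lower()
    matches = []
    start = 0
    while True:
        pos = haystack.find(needle, start)
        if pos == -1:
            break
        matches.append((pos, pos + len(target)))
        start = pos + 1  # step by one so overlapping matches are found
    return matches


print(find_all("aaa", "aa"))
print(find_all("Foo foo", "foo", case_sensitive=False))
```

With `select="first"` or `"last"` only one of these ranges is returned to the caller, but in the code above a slice is still created for every match.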
### meta
```python
meta(**kwargs: Any) -> Message
```

Updates the metadata of the message with the provided key-value pairs.

Parameters:

- `**kwargs` (`Any`, default: `{}`) – Key-value pairs representing the metadata to be updated.

Returns:

- `Message` – The updated message.
Args:
**kwargs: Key-value pairs representing the metadata to be updated.
Returns:
The updated message.
"""
self.metadata.update(kwargs)
return self
</Accordion>
### parse
```python
parse(model_type: type[ModelT]) -> ModelT
```

Parses a model from the message content.

Parameters:

- `model_type` (`type[ModelT]`) – The type of model to parse.

Returns:

- `ModelT` – The parsed model.

Raises:

- `ValueError` – If no models of the given type are found and `fail_on_missing` is set to `True`.
Args:
model_type: The type of model to parse.
Returns:
The parsed model.
Raises:
ValueError: If no models of the given type are found and `fail_on_missing` is set to `True`.
"""
return self.try_parse_many(model_type, fail_on_missing=True)[0]
</Accordion>
### parse\_many
```python
parse_many(*types: type[ModelT]) -> list[ModelT]
```

Parses multiple models of the specified non-identical types from the message content.

Parameters:

- `*types` (`type[ModelT]`, default: `()`) – The types of models to parse.

Returns:

- `list[ModelT]` – A list of parsed models.

Raises:

- `MissingModelError` – If any of the models are missing.
Args:
*types: The types of models to parse.
Returns:
A list of parsed models.
Raises:
MissingModelError: If any of the models are missing.
"""
return self.try_parse_many(*types, fail_on_missing=True)
</Accordion>
### parse\_set
```python
parse_set(
    model_type: type[ModelT], minimum: int | None = None
) -> list[ModelT]
```

Parses a set of models of the specified identical type from the message content.

Parameters:

- `model_type` (`type[ModelT]`) – The type of models to parse.
- `minimum` (`int | None`, default: `None`) – The minimum number of models required.

Returns:

- `list[ModelT]` – A list of parsed models.

Raises:

- `MissingModelError` – If the minimum number of models is not met.
Args:
model_type: The type of models to parse.
minimum: The minimum number of models required.
Returns:
A list of parsed models.
Raises:
MissingModelError: If the minimum number of models is not met.
"""
return self.try_parse_set(model_type, minimum=minimum, fail_on_missing=True)
</Accordion>
### remove\_slices
```python
remove_slices(
    *slices: MessageSlice | str | SliceType | type[Any],
) -> list[MessageSlice]
```

Removes and returns slices from the message that match the given object.

- If the object is a string, it will find slices that match the string content.
- If the object is a `SliceType`, it will find slices of that type.
- If the object is a type, it will find slices that have an `obj` of that type.
- If the object is a `MessageSlice`, it will remove that slice exactly.

Parameters:

- `*slices` (`MessageSlice | str | SliceType | type[Any]`, default: `()`) – The slices to remove. Can be a `MessageSlice`, a string, a `SliceType`, or a type.

Returns:

- `list[MessageSlice]` – The removed `MessageSliceRef` objects.
If the object is a string, it will find slices that match the string content.
If the object is a `SliceType`, it will find slices of that type.
If the object is a type, it will find slices that have an `obj` of that type.
If the object is a `MessageSlice`, it will remove that slice exactly.
Args:
*slices: The slices to remove. Can be a `MessageSlice`, a string, a `SliceType`, or a type.
Returns:
The removed `MessageSliceRef` objects.
"""
removed: list[MessageSlice] = []
matching_slices: list[MessageSlice] = []
for slice_ in slices:
for existing in self.slices:
if (
(
isinstance(slice_, str)
and slice_ in t.get_args(SliceType)
and existing.type == slice_
)
or (
isinstance(slice_, str)
and slice_ not in t.get_args(SliceType)
and self.content[existing.slice_].lower() == slice_.lower()
)
or (
isinstance(slice_, type)
and existing.obj
and isinstance(existing.obj, slice_)
)
or (isinstance(slice_, MessageSlice) and existing == slice_)
):
matching_slices.append(existing) # noqa: PERF401
removed = [
self._remove_slice(matched)
for matched in sorted(matching_slices, key=lambda s: s.start, reverse=True)
]
self.content = self.content.strip()
return removed
</Accordion>
### replace\_with\_slice
```python
replace_with_slice(
    content: str | Model,
    slice_type: SliceType | None = None,
    *,
    obj: SliceObj | None = None,
    metadata: dict[str, Any] | None = None,
) -> MessageSlice
```

Replace all message content and create a slice tracking the new content.

Type defaults to 'model' for Model objects, 'other' for strings.

Parameters:

- `content` (`str | Model`) – The content to replace with. This can be a string or a Model instance.
- `slice_type` (`SliceType | None`, default: `None`) – The type of slice to create, inferred from content type if not provided.
- `obj` (`SliceObj | None`, default: `None`) – The object associated with the slice
- `metadata` (`dict[str, Any] | None`, default: `None`) – Additional metadata for the slice

Returns:

- `MessageSlice` – The created MessageSlice
Type defaults to 'model' for Model objects, 'other' for strings.
Args:
content: The content to replace with. This can be a string or a Model instance.
slice_type: The type of slice to create, inferred from content type if not provided.
obj: The object associated with the slice
metadata: Additional metadata for the slice
Returns:
The created MessageSlice
"""
# Clear existing content and slices
self.content_parts = []
self.slices = []
return self.append_slice(
content,
slice_type=slice_type,
obj=obj,
metadata=metadata,
)
</Accordion>
### shorten
```python
shorten(max_length: int, sep: str = '...') -> Message
```

Shortens the message content to at most `max_length` characters by removing the middle of the string.

Parameters:

- `max_length` (`int`) – The maximum length of the message content.
- `sep` (`str`, default: `'...'`) – The separator to use when shortening the content.

Returns:

- `Message` – The shortened message.
Args:
max_length: The maximum length of the message content.
sep: The separator to use when shortening the content.
Returns:
The shortened message.
"""
new = self.clone()
new.content = shorten_string(new.content, max_length, sep=sep)
return new
</Accordion>
### strip
```python
strip(obj: SliceType | type[Any]) -> list[MessageSlice]
```

Removes and returns all slices of the specified type from the message.

This method is deprecated; use `remove_slices()` instead.

Parameters:

- `obj` (`SliceType | type[Any]`) – The type of slice to remove. Can be a `SliceType` or a model class. If a model class is provided, it will remove all slices that have a model of that type.

Returns:

- `list[MessageSlice]` – A list of removed slices.
This is a deprecated method, use `remove_slice()` instead.
Args:
obj: The type of slice to remove. Can be a `SliceType` or a model class.
If a model class is provided, it will remove all slices
that have a model of that type.
Returns:
A list of removed slices.
"""
warnings.warn(
".strip() is deprecated, use .remove_slice() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.remove_slices(obj)
</Accordion>
### to\_openai
```python
to_openai(
    *,
    compatibility_flags: set[CompatibilityFlag]
    | None = None,
) -> dict[str, t.Any]
```

Converts the message to the OpenAI-compatible JSON format. This should be the primary way to serialize a message for use with APIs.

Returns:

- `dict[str, Any]` – The serialized message.
Returns:
The serialized message.
"""
compatibility_flags = compatibility_flags or self.compatibility_flags
include_fields = {"role", "content_parts"}
if "skip_tools" not in compatibility_flags:
include_fields.add("tool_calls")
include_fields.add("tool_call_id")
obj = self.model_dump(
include=include_fields,
exclude_none=True,
)
# Some backwards compatibility for single text content
# which we'll load straight into the content value as opposed
# to a list of content parts.
if (
len(obj["content"]) == 1
and list(obj["content"][0].keys()) == ["type", "text"]
and obj["content"][0].get("type") == "text"
):
compatibility_flags.add("content_as_str")
# Walk content parts and add a `\n` to the end of any text parts
# which are followed by another text part (if not already present).
#
# This prevents model API behaviors from just concatenating the text
# parts together without any separation which confuses the model.
for i, current in enumerate(obj.get("content", [])):
if i == len(obj["content"]) - 1:
break
next_ = obj["content"][i + 1]
if (
isinstance(current, dict)
and current.get("type") == "text"
and next_.get("type") == "text"
and not str(current.get("text", "")).endswith("\n")
):
current["text"] += "\n"
# Strip any transcript parts from audio input
for part in obj.get("content", []):
if isinstance(part, dict) and part.get("type") == "input_audio":
part.get("input_audio", {}).pop("transcript", None)
# If enabled, we need to convert our content to a flat
# string for API compatibility. Groq is an example of an API
# which will complain for some roles if we send a list of content parts.
if "content_as_str" in compatibility_flags:
obj["content"] = "".join(
part["text"]
for part in obj["content"]
if isinstance(part, dict) and part.get("type") == "text"
)
return obj
</Accordion>
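The newline-separation and flattening steps in `to_openai` can be tried in isolation on plain dictionaries. The following is a minimal sketch, not rigging's implementation: `separate_text_parts` and `content_as_str` are hypothetical helper names, and the example URL is a placeholder.

```python
from typing import Any


def separate_text_parts(parts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Append a newline to any text part directly followed by another text part."""
    result = [dict(part) for part in parts]  # shallow copies; the input is left untouched
    for current, next_ in zip(result, result[1:]):
        if (
            current.get("type") == "text"
            and next_.get("type") == "text"
            and not str(current.get("text", "")).endswith("\n")
        ):
            current["text"] += "\n"
    return result


def content_as_str(parts: list[dict[str, Any]]) -> str:
    """Join the text parts into one flat string, dropping non-text parts."""
    return "".join(part["text"] for part in parts if part.get("type") == "text")


parts = [
    {"type": "text", "text": "Describe this image."},
    {"type": "text", "text": "Be brief."},
    {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},
]
separated = separate_text_parts(parts)
flat = content_as_str(separated)
```

Only the first text part gains a trailing newline here, because the second one is followed by an image part rather than another text part.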
### to\_openai\_spec
```python
to_openai_spec() -> dict[str, t.Any]
```

Converts the message to the OpenAI-compatible JSON format.

Deprecated: use `.to_openai()` instead.

```python
def to_openai_spec(self) -> dict[str, t.Any]:
    """
    Converts the message to the OpenAI-compatible JSON format. This should
    be the primary way to serialize a message for use with APIs.

    Deprecated - Use `.to_openai` instead
    """
    warnings.warn(
        ".to_openai_spec() is deprecated, use .to_openai() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.to_openai()
```
</Accordion>
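The deprecated-alias pattern used by `to_openai_spec` can be observed with the standard `warnings` module. This is a small sketch under stated assumptions: `Serializer` is a hypothetical stand-in class, not part of rigging, and its return value is made up for illustration.

```python
import warnings


class Serializer:
    """Hypothetical stand-in class; not part of rigging."""

    def to_openai(self) -> dict[str, str]:
        return {"role": "user", "content": "hello"}

    def to_openai_spec(self) -> dict[str, str]:
        warnings.warn(
            ".to_openai_spec() is deprecated, use .to_openai() instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this method
        )
        return self.to_openai()


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # DeprecationWarning is often hidden by default
    result = Serializer().to_openai_spec()
```

The alias still returns the same value as the new method; the only difference is the recorded `DeprecationWarning`.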
### truncate
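```python
truncate(
    max_length: int, suffix: str = "\n[truncated]"
) -> Message
```

Truncates the message content to a maximum length. The message itself is not modified; a truncated clone is returned.

Parameters:

- `max_length` (`int`) – The maximum length of the message content.
- `suffix` (`str`, default: `"\n[truncated]"`) – The suffix passed to `truncate_string` to mark truncated content.

Returns:

- `Message` – The truncated message.

```python
def truncate(self, max_length: int, suffix: str = "\n[truncated]") -> "Message":
    """
    Truncates the message content to a maximum length.

    Args:
        max_length: The maximum length of the message content.

    Returns:
        The truncated message.
    """
    new = self.clone()
    new.content = truncate_string(new.content, max_length, suf=suffix)
    return new
```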
</Accordion>
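The `truncate_string` helper is not shown on this page, so its exact behavior is unknown here. As a rough illustration only, the hypothetical `truncate_text` below shows one plausible way to cut a string and mark it with a suffix; rigging's helper may count the suffix differently.

```python
def truncate_text(text: str, max_length: int, suffix: str = "\n[truncated]") -> str:
    """Hypothetical helper: shorten text so that text plus suffix fits in max_length."""
    if len(text) <= max_length:
        return text  # short enough, returned unchanged
    # Reserve room for the suffix; if max_length is smaller than the suffix,
    # only the suffix is returned (and the result exceeds max_length).
    keep = max(max_length - len(suffix), 0)
    return text[:keep] + suffix


short = truncate_text("hello", 10)
long = truncate_text("a" * 30, 20)
```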
### try\_parse
```python
try_parse(model_type: type[ModelT]) -> ModelT | None
```

Tries to parse a model from the message content.

Parameters:

- `model_type` (`type[ModelT]`) – The type of model to search for.

Returns:

- `ModelT | None` – The first model that matches the given model type, or None if no match is found.

```python
def try_parse(self, model_type: type[ModelT]) -> ModelT | None:
    """
    Tries to parse a model from the message content.

    Args:
        model_type: The type of model to search for.

    Returns:
        The first model that matches the given model type, or None if no match is found.
    """
    return next(iter(self.try_parse_many(model_type)), None)
```
</Accordion>
### try\_parse\_many
```python
try_parse_many(
    *types: type[ModelT], fail_on_missing: bool = False
) -> list[ModelT]
```

Tries to parse multiple models from the content of the message.

Parameters:

- `*types` (`type[ModelT]`, default: `()`) – The types of models to parse.
- `fail_on_missing` (`bool`, default: `False`) – Whether to raise an exception if a model type is missing.

Returns:

- `list[ModelT]` – A list of parsed models.

Raises:

- `MissingModelError` – If a model type is missing and `fail_on_missing` is True.

```python
def try_parse_many(
    self,
    *types: type[ModelT],
    fail_on_missing: bool = False,
) -> list[ModelT]:
    """
    Tries to parse multiple models from the content of the message.

    Args:
        *types: The types of models to parse.
        fail_on_missing: Whether to raise an exception if a model type is missing.

    Returns:
        A list of parsed models.

    Raises:
        MissingModelError: If a model type is missing and `fail_on_missing` is True.
    """
    model: ModelT
    parsed: list[tuple[ModelT, slice]] = try_parse_many(
        self.content,
        *types,
        fail_on_missing=fail_on_missing,
    )
    for model, slice_ in parsed:
        self._add_slice(
            MessageSlice(
                type="model",
                obj=model,
                start=slice_.start,
                stop=slice_.stop,
                metadata={"model_type": model.__class__.__name__},
            ),
        )
    return [p[0] for p in parsed]
```
</Accordion>
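`try_parse_many` records where each parsed model sits in the content by storing its start and stop offsets. The sketch below illustrates that bookkeeping with a simple regular expression over tag pairs. It is an assumption-laden stand-in: `FoundTag` and `find_tags` are hypothetical names, and rigging's actual parser is not shown here.

```python
import re
from dataclasses import dataclass


@dataclass
class FoundTag:
    """Hypothetical record of one parsed tag; rigging stores a MessageSlice instead."""

    tag: str
    text: str
    start: int
    stop: int


def find_tags(content: str, *tags: str) -> list[FoundTag]:
    """Find every <tag>...</tag> pair and remember its character range."""
    found: list[FoundTag] = []
    for tag in tags:
        name = re.escape(tag)
        pattern = re.compile(rf"<{name}>(.*?)</{name}>", re.DOTALL)
        for match in pattern.finditer(content):
            found.append(FoundTag(tag, match.group(1), match.start(), match.end()))
    # Report matches in the order they appear in the content.
    return sorted(found, key=lambda item: item.start)


content = "Sure. <answer>42</answer> because <reason>math</reason>"
found = find_tags(content, "answer", "reason")
```

Slicing the content with a recorded range, for example `content[found[0].start:found[0].stop]`, recovers the full tagged span.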
### try\_parse\_set
```python
try_parse_set(
    model_type: type[ModelT],
    minimum: int | None = None,
    fail_on_missing: bool = False,
) -> list[ModelT]
```

Tries to parse a set of models from the message content.

Parameters:

- `model_type` (`type[ModelT]`) – The type of model to parse.
- `minimum` (`int | None`, default: `None`) – The minimum number of models expected.
- `fail_on_missing` (`bool`, default: `False`) – Whether to raise an exception if models are missing.

Returns:

- `list[ModelT]` – The parsed models.

Raises:

- `MissingModelError` – If the number of parsed models is less than the minimum required.

```python
def try_parse_set(
    self,
    model_type: type[ModelT],
    minimum: int | None = None,
    fail_on_missing: bool = False,
) -> list[ModelT]:
    """
    Tries to parse a set of models from the message content.

    Args:
        model_type: The type of model to parse.
        minimum: The minimum number of models expected.
        fail_on_missing: Whether to raise an exception if models are missing.

    Returns:
        The parsed models.

    Raises:
        MissingModelError: If the number of parsed models is less than the minimum required.
    """
    models = self.try_parse_many(model_type, fail_on_missing=fail_on_missing)
    if minimum is not None and len(models) < minimum:
        raise MissingModelError(f"Expected at least {minimum} {model_type.__name__} in message")
    return models
```
</Accordion>
MessageDict
-----------
Helper to represent a [rigging.message.Message][] as a dictionary.
### content
```python
content: str | list[Any]
```

The content of the message.

### role

```python
role: Role
```

The role of the message.
MessageSlice
------------

Represents a slice of content within a message.

This can be a tool call, tool response, or model output. You can associate metadata with the slice to add rich information like scores, confidence levels, or reward information.
### content

```python
content: str
```

Get the content text for this slice from the parent message.

### metadata

```python
metadata: dict[str, Any] = Field(default_factory=dict)
```

Metadata associated with the slice.

### obj

```python
obj: SerializeAsAny[SliceObj] | None = Field(
    default=None, repr=False
)
```

The model, tool call, or other object associated with the slice.

### slice\_

```python
slice_: slice
```

Returns the slice representing the range into the message content.

### start

```python
start: int
```

The start index of the slice.

### stop

```python
stop: int
```

The stop index of the slice.

### type

```python
type: SliceType
```

The type of the slice.

### \_\_len\_\_

```python
__len__() -> int
```

Returns the length of the slice.
```python
def __len__(self) -> int:
    """Returns the length of the slice."""
    return self.stop - self.start
```

### \_\_str\_\_

```python
__str__() -> str
```

Returns a string representation of the slice.

```python
def __str__(self) -> str:
    """Returns a string representation of the slice."""
    content = shorten_string(self.content if self._message else "[detached]", 50)
    obj = self.obj.__class__.__name__ if self.obj else None
    return f"MessageSlice(type='{self.type}', start={self.start}, stop={self.stop} obj={obj} content='{content}')"
```

### clone

```python
clone() -> MessageSlice
```

Creates a deep copy of the MessageSlice. Only `metadata` is deep-copied; `obj` is shared with the original, and the clone is left detached from any parent message.

Returns:

- `MessageSlice` – A new MessageSlice instance with the same properties.

```python
def clone(self) -> "MessageSlice":
    """
    Creates a deep copy of the MessageSlice.

    Returns:
        A new MessageSlice instance with the same properties.
    """
    cloned = MessageSlice(
        type=self.type,
        obj=self.obj,
        start=self.start,
        stop=self.stop,
        metadata=copy.deepcopy(self.metadata),
    )

    # Leaving this detached to align with tests
    # cloned._message = self._message

    return cloned  # noqa: RET504
```
</Accordion>
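The relationship between a slice's offsets, its length, and its cloned metadata can be illustrated on a plain string. This is a simplified sketch, assuming a hypothetical `TextSlice` dataclass that takes the parent text as an argument instead of holding a reference to a parent message the way `MessageSlice` does.

```python
import copy
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TextSlice:
    """Hypothetical, simplified analogue of MessageSlice over a plain string."""

    start: int
    stop: int
    metadata: dict[str, Any] = field(default_factory=dict)

    def __len__(self) -> int:
        return self.stop - self.start

    def content(self, parent: str) -> str:
        # Resolve the range against the text it was taken from.
        return parent[self.start : self.stop]

    def clone(self) -> "TextSlice":
        # Deep-copy metadata so edits to the clone do not leak into the original.
        return TextSlice(self.start, self.stop, copy.deepcopy(self.metadata))


text = "The answer is 42."
original = TextSlice(start=14, stop=16, metadata={"scores": [0.9]})
duplicate = original.clone()
duplicate.metadata["scores"].append(0.1)
```

Because the metadata is deep-copied, appending a score to the clone leaves the original's `scores` list untouched.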
inject\_system\_content
-----------------------
```python
inject_system_content(
    messages: list[Message], content: str
) -> list[Message]
```

Injects content into a list of messages as a system message.

If the message list is empty or the first message is not a system message, a new system message with the given content is inserted at the beginning of the list. If the first message is a system message, the content is appended to it, separated by a blank line, unless that content is already present. Blank content is ignored. The list is modified in place and also returned.

Parameters:

- `messages` (`list[Message]`) – The list of messages to modify.
- `content` (`str`) – The content to be injected.

Returns:

- `list[Message]` – The modified list of messages.

```python
def inject_system_content(messages: list[Message], content: str) -> list[Message]:
    """
    Injects content into a list of messages as a system message.

    Note:
        If the message list is empty or the first message is not a system message,
        a new system message with the given content is inserted at the beginning of the list.
        If the first message is a system message, the content is appended to it.

    Args:
        messages: The list of messages to modify.
        content: The content to be injected.

    Returns:
        The modified list of messages
    """
    if content.strip() == "":
        return messages

    if len(messages) == 0 or messages[0].role != "system":
        messages.insert(0, Message(role="system", content=content))
    elif messages[0].role == "system" and content not in messages[0].content:
        messages[0].content += "\n\n" + content

    return messages
```
</Accordion>
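The insert-or-append behavior described above can be reproduced with a tiny stand-in message type. This is a sketch only: `Msg` and `inject_system` are hypothetical names that mirror the logic of the listing without depending on rigging.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical minimal message; rigging's Message carries much more."""

    role: str
    content: str


def inject_system(messages: list[Msg], content: str) -> list[Msg]:
    if content.strip() == "":
        return messages  # blank content is ignored
    if not messages or messages[0].role != "system":
        messages.insert(0, Msg("system", content))
    elif content not in messages[0].content:
        messages[0].content += "\n\n" + content
    return messages


chat = [Msg("user", "Hi")]
inject_system(chat, "Be concise.")         # inserts a new system message
inject_system(chat, "Answer in English.")  # appends to the existing one
inject_system(chat, "Be concise.")         # already present, so nothing changes
```

After these three calls the list holds one system message followed by the user message, and the repeated instruction appears only once.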
strip\_system\_content
----------------------
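```python
strip_system_content(
    messages: list[Message], content: str
) -> list[Message]
```

Removes the given content from the leading system message in a list of messages.

If the first message is not a system message, or the content is blank, the list is returned unchanged. If removing the content leaves the system message empty, the system message is dropped from the returned list.

Parameters:

- `messages` (`list[Message]`) – The list of messages to modify.
- `content` (`str`) – The content to remove from the system message.

Returns:

- `list[Message]` – The modified list of messages, without the system message if it became empty.

```python
def strip_system_content(messages: list[Message], content: str) -> list[Message]:
    """
    Strips the system message from a list of messages.

    Args:
        messages: The list of messages to modify.

    Returns:
        The modified list of messages without the system message.
    """
    if content.strip() == "":
        return messages

    if not messages or messages[0].role != "system":
        return messages

    system_message = messages[0]
    system_message.content = system_message.content.replace(content, "").strip()

    if system_message.content == "":
        return messages[1:]

    return messages
```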
return messages
</Accordion>