Skip to content

Latest commit

 

History

History
1038 lines (793 loc) · 25.8 KB

File metadata and controls

1038 lines (793 loc) · 25.8 KB
title rigging.model

{/* ::: rigging.model */}

Models are the core datatypes for structured parsing.

Answer

Quick model for answers.

CommaDelimitedAnswer

Comma delimited answer (,)

DelimitedAnswer

Mixed support delimited answer (- | / ,) selected based on most-matches

items

items: list[str]

Parsed items from the content.

Description

Quick model for descriptions.

ErrorModel

from_exception

from_exception(exception: Exception) -> te.Self

Create an ErrorModel instance from an exception.

Parameters:

  • exception (Exception) –The exception to convert.

Returns:

  • Self –An instance of ErrorModel with the exception content.
```python @classmethod def from_exception(cls, exception: Exception) -> te.Self: """ Create an ErrorModel instance from an exception.
Args:
    exception: The exception to convert.

Returns:
    An instance of ErrorModel with the exception content.
"""
return cls(content=str(exception), type=type(exception).__name__)


</Accordion>

Instructions
------------

Quick model for instructions.

Model
-----

### from\_text

```python
from_text(
    content: str, *, return_errors: Literal[False] = False
) -> list[tuple[te.Self, slice]]
from_text(
    content: str, *, return_errors: Literal[True]
) -> list[tuple[te.Self | Exception, slice]]
from_text(
    content: str, *, return_errors: bool = False
) -> (
    list[tuple[te.Self, slice]]
    | list[tuple[te.Self | Exception, slice]]
)

The core parsing method which attempts to extract and parse as many valid instances of a model from semi-structured text.

Parameters:

  • content (str) –The text content to parse.

Returns:

  • list[tuple[Self, slice]] | list[tuple[Self | Exception, slice]] –A list of tuples containing the extracted models and their corresponding slices.

Raises:

  • MissingModelError –If the specified model tags are not found in the message.
  • ValidationError –If an error occurs while parsing the content.
```python @classmethod def from_text( cls, content: str, *, return_errors: bool = False, ) -> list[tuple[te.Self, slice]] | list[tuple[te.Self | Exception, slice]]: """ The core parsing method which attempts to extract and parse as many valid instances of a model from semi-structured text.
Args:
    content: The text content to parse.

Returns:
    A list of tuples containing the extracted models and their corresponding slices.

Raises:
    MissingModelError: If the specified model tags are not found in the message.
    ValidationError: If an error occurs while parsing the content.
"""
cls.ensure_valid()

tag_name = re.escape(cls.__xml_tag__ or "unknown")
pattern = f"((<({tag_name}).*?>)((.*?)</{tag_name}>))"

matches = [
    m
    for m in re.finditer(pattern, content, flags=re.DOTALL)
    if m.group(3) == cls.__xml_tag__
]

if not matches:
    raise MissingModelError(f"Failed to find '{cls.xml_tags()}' in message")

# Sort matches_with_tag based on the length of the interior text,
# longest first. This should help us avoid matching the model
# supplying hollow tags before the actual data.

sorted_matches = sorted(matches, key=lambda m: len(m.group(5)), reverse=True)

extracted: list[tuple[te.Self | Exception, slice]] = []
for match in sorted_matches:
    full_text, start_tag, _, inner_with_end_tag, inner = match.groups()

    # The model might trip up regex by including partial tags
    # in passing before actually using them. We'll continually try
    # to parse the inner text until we can't extract our model anymore.
    #
    # Example: "Sure I'll use <answer> tags: <answer>hello</answer>"
    #
    # TODO: The opposite could be true, and we could greedily parse
    # backwards if we get failures. This is a simple solution for now.

    inner_match: re.Match[str] | None = match
    slice_ = slice(match.start(), match.end())

    while inner_match is not None:
        inner_matches = re.finditer(
            pattern,
            inner_with_end_tag,
            flags=re.DOTALL,
        )
        inner_match = next(
            (m for m in inner_matches if m.group(3) == cls.__xml_tag__),
            None,
        )
        if inner_match is not None:
            slice_ = slice(
                slice_.start + len(start_tag) + inner_match.start(),
                slice_.start + len(start_tag) + inner_match.end(),
            )
            full_text, start_tag, _, inner_with_end_tag, inner = inner_match.groups()

    try:
        model = (
            cls(
                **{
                    next(iter(cls.model_fields)): unescape_xml(
                        textwrap.dedent(inner).strip()
                    )
                }
            )
            if cls.is_simple()
            else cls.from_xml(
                cls.preprocess_with_cdata(full_text),
            )
        )

        # If our model is relatively simple (only attributes and a single non-element field)
        # we should go back and update our non-element field with the extracted content.

        if not cls.is_simple() and cls.is_simple_with_attrs():
            name, field = next(
                (name, field)
                for name, field in cls.model_fields.items()
                if not isinstance(field, XmlEntityInfo)
            )
            if field.annotation in BASIC_TYPES:
                model.__dict__[name] = field.annotation(
                    unescape_xml(inner).strip(),
                )

        # Walk through any fields which are strings, and dedent them

        for field_name, field_info in cls.model_fields.items():
            if isinstance(field_info, XmlEntityInfo) and field_info.annotation == str:  # noqa: E721
                model.__dict__[field_name] = textwrap.dedent(
                    model.__dict__[field_name]
                ).strip()

        extracted.append((model, slice_))
    except Exception as e:  # noqa: BLE001
        extracted.append((e, slice_))
        continue

# sort back to original order
extracted.sort(key=lambda x: x[1].start)

if not return_errors and (
    first_error := next((m for m, _ in extracted if isinstance(m, Exception)), None)
):
    raise first_error

return (
    extracted
    if return_errors
    else [(m, s) for m, s in extracted if not isinstance(m, Exception)]
)


</Accordion>

### is\_simple

```python
is_simple() -> bool

Check if the model is "simple", meaning it has a single field with a basic datatype.

Until we refactor our XML parsing, this helps make the parsing more consistent for models which can support it.

Returns:

  • bool –True if the model is simple, False otherwise.
```python @classmethod def is_simple(cls) -> bool: """ Check if the model is "simple", meaning it has a single field with a basic datatype.
Until we refactor our XML parsing, this helps make the parsing more consistent for models
which can support it.

Returns:
    True if the model is simple, False otherwise.
"""
field_values = list(cls.model_fields.values())
if len(field_values) != 1:
    return False

# If the field is a pydantic-xml wrapper like element()
# or attr(), we'll assume the user knows what they're doing
if isinstance(field_values[0], XmlEntityInfo):
    return False

annotation = field_values[0].annotation
if t.get_origin(annotation) == t.Annotated:
    annotation = t.get_args(annotation)[0]

return annotation in BASIC_TYPES


</Accordion>

### is\_simple\_with\_attrs

```python
is_simple_with_attrs() -> bool

Check if the model would otherwise be marked as "simple", but has other fields which are all attributes. If so, we can do some parsing magic below and make sure our non-element field is updated with the extracted content properly, while pydantic-xml takes care of the attributes.

Returns:

  • bool –True if the model is simple with attrs, False otherwise.
```python @classmethod def is_simple_with_attrs(cls) -> bool: """ Check if the model would otherwise be marked as "simple", but has other fields which are all attributes. If so, we can do some parsing magic below and make sure our non-element field is updated with the extracted content properly, while pydantic-xml takes care of the attributes.
Returns:
    True if the model is simple with attrs, False otherwise.
"""
field_values = list(cls.model_fields.values())

none_entity_fields = [f for f in field_values if not isinstance(f, XmlEntityInfo)]
entity_fields = [f for f in field_values if isinstance(f, XmlEntityInfo)]
attr_fields = [f for f in entity_fields if f.location == EntityLocation.ATTRIBUTE]

if len(none_entity_fields) != 1 or len(attr_fields) != len(entity_fields):
    return False

annotation = field_values[0].annotation
if t.get_origin(annotation) == t.Annotated:
    annotation = t.get_args(annotation)[0]

return annotation in BASIC_TYPES


</Accordion>

### one\_from\_text

```python
one_from_text(
    content: str, *, fail_on_many: bool = False
) -> tuple[te.Self, slice]

Finds and returns a single match from the given text content.

Parameters:

  • content (str) –The text content to search for matches.
  • fail_on_many (bool, default: False ) –If True, raises a ValidationError if multiple matches are found.

Returns:

  • tuple[Self, slice] –A tuple containing the matched model and the slice indicating the match location.

Raises:

  • ValidationError –If multiple matches are found and fail_on_many is True.
```python @classmethod def one_from_text( cls, content: str, *, fail_on_many: bool = False, ) -> tuple[te.Self, slice]: """ Finds and returns a single match from the given text content.
Args:
    content: The text content to search for matches.
    fail_on_many: If True, raises a ValidationError if multiple matches are found.

Returns:
    A tuple containing the matched model and the slice indicating the match location.

Raises:
    ValidationError: If multiple matches are found and fail_on_many is True.
"""
matches = cls.from_text(content)
if fail_on_many and len(matches) > 1:
    raise ValidationError("Multiple matches found with 'fail_on_many=True'")
return max(matches, key=lambda x: x[1].stop - x[1].start)


</Accordion>

### preprocess\_with\_cdata

```python
preprocess_with_cdata(content: str) -> str

Process the content and attempt to auto-wrap interior field content in CDATA tags if they contain unescaped XML entities.

Parameters:

  • content (str) –The XML content to preprocess.

Returns:

  • str –The processed XML content with CDATA tags added where necessary.
```python @classmethod def preprocess_with_cdata(cls, content: str) -> str: """ Process the content and attempt to auto-wrap interior field content in CDATA tags if they contain unescaped XML entities.
Args:
    content: The XML content to preprocess.

Returns:
    The processed XML content with CDATA tags added where necessary.
"""

# This means our CDATA tags should wrap the entire content
# as the model is simple enough to not have nested elements.

if cls.is_simple() or cls.is_simple_with_attrs():
    field_map = {
        (cls.__xml_tag__ or "unknown"): next(
            info
            for info in cls.model_fields.values()
            if not isinstance(info, XmlEntityInfo)
        ),
    }
else:
    field_map = {
        (
            field_info.path
            if isinstance(field_info, XmlEntityInfo) and field_info.path
            else field_name
        ): field_info
        for field_name, field_info in cls.model_fields.items()
        if isinstance(field_info, XmlEntityInfo)
        and field_info.location == EntityLocation.ELEMENT
    }

def wrap_with_cdata(match: re.Match[str]) -> str:
    field_name = match.group(1)
    tag_attrs = match.group(2) or ""  # Handle attributes if present
    content = match.group(3)

    if field_name not in field_map:
        return match.group(0)

    field_info = field_map[field_name]
    field_type = field_info.annotation
    if t.get_origin(field_type) == t.Annotated:
        field_type = t.get_args(field_type)[0]

    is_basic_field = field_type in BASIC_TYPES
    is_already_cdata = content.strip().startswith("<![CDATA[")
    needs_escaping = escape_xml(unescape_xml(content)) != content

    if is_basic_field and not is_already_cdata and needs_escaping:
        content = f"<![CDATA[{textwrap.dedent(content).strip()}]]>"

    return f"<{field_name}{tag_attrs}>{content}</{field_name}>"

fields_pattern = "|".join(re.escape(field_name) for field_name in field_map)
pattern = f"<({fields_pattern})((?:\\s[^>]*?)?)>(.*?)</\\1>"

updated = re.sub(pattern, wrap_with_cdata, content, flags=re.DOTALL)

# If our updates created invalid XML, discard them
try:
    ET.fromstring(updated)  # noqa: S314 # nosec
except ET.ParseError:
    return content

return updated


</Accordion>

### to\_pretty\_xml

```python
to_pretty_xml(
    *,
    skip_empty: bool = False,
    exclude_none: bool = False,
    exclude_unset: bool = False,
    **_: Any,
) -> str

Converts the model to a pretty XML string with indents and newlines.

Returns:

  • str –The pretty XML representation of the model.
```python def to_pretty_xml( self, *, skip_empty: bool = False, exclude_none: bool = False, exclude_unset: bool = False, **_: t.Any, ) -> str: """ Converts the model to a pretty XML string with indents and newlines.
Returns:
    The pretty XML representation of the model.
"""
tree = self.to_xml_tree(
    skip_empty=skip_empty,
    exclude_none=exclude_none,
    exclude_unset=exclude_unset,
)
return self._serialize_tree_prettily(tree)


</Accordion>

### to\_xml

```python
to_xml(
    *,
    skip_empty: bool = False,
    exclude_none: bool = False,
    exclude_unset: bool = False,
    **kwargs: Any,
) -> str

Serializes the object to an xml string.

Parameters:

  • skip_empty (bool, default: False ) –skip empty elements (elements without sub-elements, attributes and text, Nones)
  • exclude_none (bool, default: False ) –exclude None values
  • exclude_unset (bool, default: False ) –exclude values that haven't been explicitly set
  • kwargs (Any, default: {} ) –additional xml serialization arguments

Returns:

  • str –object xml representation
```python def to_xml( self, *, skip_empty: bool = False, exclude_none: bool = False, exclude_unset: bool = False, **kwargs: t.Any, ) -> str: """ Serializes the object to an xml string.
Args:
    skip_empty: skip empty elements (elements without sub-elements, attributes and text, Nones)
    exclude_none: exclude `None` values
    exclude_unset: exclude values that haven't been explicitly set
    kwargs: additional xml serialization arguments

Returns:
    object xml representation
"""

tree = self.to_xml_tree(
    skip_empty=skip_empty,
    exclude_none=exclude_none,
    exclude_unset=exclude_unset,
)
tree = self._postprocess_with_cdata(tree)
xml = ET.tostring(tree, short_empty_elements=False, encoding="utf-8", **kwargs).decode()

# Now we can go back and safely unescape the XML
# that we observe between any CDATA tags

return unescape_cdata_tags(xml)


</Accordion>

### xml\_end\_tag

```python
xml_end_tag() -> str

Helper method which wrapped the class tag in XML braces with a leading slash.

```python @classmethod def xml_end_tag(cls) -> str: """Helper method which wrapped the class tag in XML braces with a leading slash.""" return f"" ```

xml_example

xml_example() -> str

Returns an example XML representation of the given class.

This method generates a pretty-printed XML string that includes:

  • Example values for each field, taken from the example argument in a field constructor.
  • Field descriptions as XML comments, derived from the field's docstring or the description argument.

Note: This implementation is designed for models with flat structures and does not recursively generate examples for nested models.

Returns:

  • str –A string containing the pretty-printed XML example.
```python @classmethod def xml_example(cls) -> str: """ Returns an example XML representation of the given class.
This method generates a pretty-printed XML string that includes:
- Example values for each field, taken from the `example` argument
  in a field constructor.
- Field descriptions as XML comments, derived from the field's
  docstring or the `description` argument.

Note: This implementation is designed for models with flat structures
and does not recursively generate examples for nested models.

Returns:
    A string containing the pretty-printed XML example.
"""
if cls.is_simple():
    field_info = next(iter(cls.model_fields.values()))
    example = str(next(iter(field_info.examples or []), ""))
    return f"<{cls.__xml_tag__}>{escape_xml(example)}</{cls.__xml_tag__}>"

lines = []
attribute_parts = []
element_fields = {}

for field_name, field_info in cls.model_fields.items():
    if (
        isinstance(field_info, XmlEntityInfo)
        and field_info.location == EntityLocation.ATTRIBUTE
    ):
        path = field_info.path or field_name
        example = str(next(iter(field_info.examples or []), "")).replace('"', "&quot;")
        attribute_parts.append(f'{path}="{example}"')
    else:
        element_fields[field_name] = field_info

attr_string = (" " + " ".join(attribute_parts)) if attribute_parts else ""
lines.append(f"<{cls.__xml_tag__}{attr_string}>")

for field_name, field_info in element_fields.items():
    path = (isinstance(field_info, XmlEntityInfo) and field_info.path) or field_name
    description = field_info.description
    example = str(next(iter(field_info.examples or []), ""))

    if description:
        lines.append(f"  <!-- {escape_xml(description.strip())} -->")
    if example:
        lines.append(f"  <{path}>{escape_xml(example)}</{path}>")
    else:
        lines.append(f"  <{path}/>")

lines.append(f"</{cls.__xml_tag__}>")
return "\n".join(lines)


</Accordion>

### xml\_start\_tag

```python
xml_start_tag() -> str

Helper method which wrapped the class tag in XML braces.

```python @classmethod def xml_start_tag(cls) -> str: """Helper method which wrapped the class tag in XML braces.""" return f"<{cls.__xml_tag__}>" ```

xml_tags

xml_tags() -> str

Helper method which returns the full XML tags for the class.

```python @classmethod def xml_tags(cls) -> str: """Helper method which returns the full XML tags for the class.""" return cls.xml_start_tag() + cls.xml_end_tag() ```

NewlineDelimitedAnswer

Newline delimited answer ( )

Question

Quick model for questions.

QuestionAnswer

Quick model for question-answer pairs.

answer

answer: Answer = element()

The answer

question

question: Question = element()

The question

Thinking

Quick model for thinking messages.

YesNoAnswer

Yes/No answer answer with coercion

boolean

boolean: bool

The boolean value of the answer.

make_from_schema

make_from_schema(
    schema: dict[str, Any],
    name: str | None = None,
    *,
    allow_primitive: bool = False,
) -> type[Model]

Helper to build a Rigging model dynamically from a JSON schema.

There are plenty of edge cases this doesn't handle, consider this very experimental and only suitable for simple schemas.

Parameters:

  • schema (dict[str, Any]) –The JSON schema to build the model from.
  • name (str | None, default: None ) –The name of the model (otherwise inferred from the schema).
  • allow_primitive (bool, default: False ) –If True, allows the model to be a simple primitive

Returns:

  • type[Model] –The Pydantic model class.
```python def make_from_schema( schema: dict[str, t.Any], name: str | None = None, *, allow_primitive: bool = False, ) -> type[Model]: """ Helper to build a Rigging model dynamically from a JSON schema.
Note:
    There are plenty of edge cases this doesn't handle, consider this
    very experimental and only suitable for simple schemas.

Args:
    schema: The JSON schema to build the model from.
    name: The name of the model (otherwise inferred from the schema).
    allow_primitive: If True, allows the model to be a simple primitive

Returns:
    The Pydantic model class.
"""
properties = schema.get("properties", {})
required = set(schema.get("required", []))
fields: dict[str, tuple[FieldType, FieldInfo]] = {}

# If we only have one property and the caller allows it,
# make the model "simple/primitive" by not using the element() wrapper
field_cls = Field if len(properties) == 1 and allow_primitive else element

for field_name, field_schema in properties.items():
    field_type, field_info = _process_field(field_name, field_schema)

    fields[field_name] = (
        field_type,
        field_cls(
            default=... if field_name in required else None,
            description=field_schema.get("description", ""),
            title=field_schema.get("title", ""),
            **field_info,
        )
        if isinstance(field_info, dict)
        else field_info,
    )

name = name or schema.get("title", "SchemaModel")
return create_pydantic_xml_model(name, __base__=Model, **fields)  # type: ignore [arg-type]


</Accordion>

make\_primitive
---------------

```python
make_primitive(
    name: str,
    type_: type[PrimitiveT] = str,
    *,
    tag: str | None = None,
    doc: str | None = None,
    validator: Callable[[str], str | None] | None = None,
    strip_content: bool = True,
) -> type[Primitive[PrimitiveT]]

Helper to create a simple primitive model with an optional content validator.

This API is experimental and may change in the future.

Parameters:

  • name (str) –The name of the model.
  • tag (str | None, default: None ) –The XML tag for the model.
  • doc (str | None, default: None ) –The documentation for the model.
  • validator (Callable[[str], str | None] | None, default: None ) –An optional content validator for the model.
  • strip_content (bool, default: True ) –Whether to strip the content string before pydantic validation.

Returns:

  • type[Primitive[PrimitiveT]] –The primitive model class.
```python def make_primitive( name: str, type_: type[PrimitiveT] = str, # type: ignore [assignment] *, tag: str | None = None, doc: str | None = None, validator: t.Callable[[str], str | None] | None = None, strip_content: bool = True, ) -> type[Primitive[PrimitiveT]]: """ Helper to create a simple primitive model with an optional content validator.
Note:
    This API is experimental and may change in the future.

Args:
    name: The name of the model.
    tag: The XML tag for the model.
    doc: The documentation for the model.
    validator: An optional content validator for the model.
    strip_content: Whether to strip the content string before pydantic validation.

Returns:
    The primitive model class.
"""

def _validate(value: str) -> str:
    if validator is not None:
        return validator(value) or value
    return value

if strip_content:
    type_ = t.Annotated[
        type_,
        BeforeValidator(lambda x: x.strip() if isinstance(x, str) else x),
    ]  # type: ignore [assignment]

return create_pydantic_model(
    name,
    __base__=Primitive[type_],  # type: ignore [valid-type, arg-type]
    __doc__=doc,
    __cls_kwargs__={"tag": tag},
    content=(type_, ...),
    __validators__={"content_validator": field_validator("content")(_validate)}
    if validator
    else {},
)


</Accordion>