This guide covers how ECS Agent selects, constructs, and configures LLM models.
The high-level rule is simple:

- Use `Model(...)` when you already know the endpoint settings.
- Use `get_model("provider/model", registry=...)` when you want registry-based routing.
- Drop down to `ProviderConfig` and concrete model classes only when you need lower-level control.
The recommended way to create any LLM model is the Model(...) factory. It automatically selects the correct implementation class (OpenAIModel, ClaudeModel, or LiteLLMModel) based on the api_format or model_type argument.
from ecs_agent.providers import Model
from ecs_agent.providers.config import ApiFormat
# OpenAI-compatible Chat Completions
model = Model(
"qwen3.5-flash",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)
# OpenAI Responses API
model = Model(
"qwen3.5-flash",
base_url="https://dashscope.aliyuncs.com/api/v2/apps/protocols/compatible-mode/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_RESPONSES,
)
# Anthropic Messages API
model = Model(
"claude-3-5-haiku-latest",
base_url="https://api.anthropic.com",
api_key="your-api-key",
api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
# Explicit model_type (skips api_format inference)
from ecs_agent.providers import ModelType
model = Model("gpt-4o", base_url="...", api_key="...", model_type=ModelType.OPENAI)
# api_format as string
model = Model("gpt-4o", base_url="...", api_key="...", api_format="openai_chat_completions")

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model_id` | `str` | (required, positional) | Model name, e.g. `"gpt-4o"`, `"claude-3-5-haiku-latest"` |
| `base_url` | `str` | (required) | Provider endpoint base URL |
| `api_key` | `str` | (required) | Bearer token / API key |
| `api_format` | `ApiFormat \| str \| None` | `None` | Wire protocol. Overrides the `model_type` default when both are given. Conflicts raise `ValueError`. |
| `model_type` | `str \| type \| None` | `None` | Implementation to use: `"openai"`, `"claude"`, `"litellm"`, or the class itself. Inferred from `api_format` when omitted. |
| `provider_id` | `str` | `""` | Label stored in `ProviderConfig` (not used at runtime) |
| `extra_headers` | `dict[str, str] \| None` | `None` | Additional HTTP headers |
| `timeout` | `float \| None` | `None` | Global request timeout (seconds) |
| `enable_store` | `bool` | `False` | Enable conversation storage (Responses API feature) |
| `**kwargs` | | | Forwarded to the underlying model constructor (e.g. `connect_timeout`, `max_tokens`) |
- `api_format` only: infers `model_type` (`OPENAI_CHAT_COMPLETIONS`/`OPENAI_RESPONSES` → `openai`; `ANTHROPIC_MESSAGES` → `claude`).
- `model_type` only: infers the default `api_format` (`openai` → `OPENAI_CHAT_COMPLETIONS`; `claude` → `ANTHROPIC_MESSAGES`).
- Both given: `api_format` overrides the `model_type` default. Incompatible combinations raise a `ValueError` containing "conflict".
- Neither given: raises `ValueError`.
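The resolution rules above can be sketched as a small pure function. This is an illustrative reconstruction, not the library's actual internals — the mapping tables and function name are assumptions based on the documented behavior.

```python
# Hypothetical sketch of the api_format/model_type inference rules.
_FORMAT_TO_TYPE = {
    "openai_chat_completions": "openai",
    "openai_responses": "openai",
    "anthropic_messages": "claude",
}
_TYPE_TO_DEFAULT_FORMAT = {
    "openai": "openai_chat_completions",
    "claude": "anthropic_messages",
}

def resolve(api_format=None, model_type=None):
    """Return (model_type, api_format) following the documented rules."""
    if api_format is None and model_type is None:
        raise ValueError("either api_format or model_type is required")
    if model_type is None:
        # api_format only: infer the implementation type.
        return _FORMAT_TO_TYPE[api_format], api_format
    if api_format is None:
        # model_type only: fall back to the type's default wire protocol.
        return model_type, _TYPE_TO_DEFAULT_FORMAT[model_type]
    # Both given: api_format wins, but incompatible pairs conflict.
    implied = _FORMAT_TO_TYPE.get(api_format)
    if implied is not None and implied != model_type:
        raise ValueError(f"api_format/model_type conflict: {api_format!r} vs {model_type!r}")
    return model_type, api_format
```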
The model stack is organized around three linked concepts: a canonical model ID (provider/model) that carries both routing provider and API model name, a ProviderConfig that defines endpoint/auth/protocol settings, and event-driven accounting that measures usage and cache behavior. The quick start below shows the full end-to-end flow.
import os
from ecs_agent.accounting.subscriber import AccountingSubscriber
from ecs_agent.core import World
from ecs_agent.providers.registry import ProviderRegistry, get_model
# 1) Load provider configs from TOML file
registry = ProviderRegistry.from_toml("providers.toml")
# or inline:
# registry = ProviderRegistry.from_dict({
# "aliyun": {
# "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
# "api_format": "openai_chat_completions",
# "api_key_env": "LLM_API_KEY",
# }
# })
# 2) One call: model ID → correct LLMModel instance (determined by api_format)
model = get_model("aliyun/qwen3.5-flash", registry=registry)
# 3) Attach accounting to the World's event bus
world = World()
subscriber = AccountingSubscriber()
subscriber.subscribe(world.event_bus)

ProviderRegistry maps provider IDs to endpoint/auth/protocol configs. get_model resolves a provider/model identifier and delegates final construction to the unified Model(...) entry point.
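Parsing the canonical `provider/model` identifier can be sketched roughly as follows; the function name is illustrative, and the real `get_model` may differ in detail. Splitting on the first slash keeps model names that themselves contain slashes (e.g. `ollama/llama3`) intact.

```python
# Illustrative sketch of canonical "provider/model" ID parsing.
def split_model_id(model_id: str) -> tuple[str, str]:
    """Split 'provider/model' into (provider_id, model_name) on the first slash."""
    provider, sep, model = model_id.partition("/")
    if not sep or not provider or not model:
        raise ValueError(f"expected 'provider/model', got {model_id!r}")
    return provider, model
```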
[providers.aliyun]
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_format = "openai_chat_completions"
api_key_env = "LLM_API_KEY"
[providers.aliyun-responses]
base_url = "https://dashscope.aliyuncs.com/api/v2/apps/protocols/compatible-mode/v1"
api_format = "openai_responses"
api_key_env = "LLM_API_KEY"
[providers.moonshot]
base_url = "https://dashscope.aliyuncs.com/apps/anthropic"
api_format = "anthropic_messages"
api_key_env = "LLM_API_KEY"
default_max_tokens = 8192

from ecs_agent.providers.registry import ProviderRegistry, get_model
# From TOML file
registry = ProviderRegistry.from_toml("providers.toml")
# From dict (useful in tests)
registry = ProviderRegistry.from_dict({
"aliyun": {
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"api_format": "openai_chat_completions",
"api_key_env": "LLM_API_KEY",
}
})

from ecs_agent.providers.registry import get_model
# Resolves provider ID → ProviderEntry → Model(...) → correct LLMModel instance
model = get_model("aliyun/qwen3.5-flash", registry=registry)
# Override API key at call time
model = get_model("aliyun/qwen3.5-flash", registry=registry, api_key="sk-...")

API key resolution order: explicit api_key argument → ProviderEntry.api_key → env var named by ProviderEntry.api_key_env.
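That resolution order can be sketched as a plain function. The parameter names here are illustrative stand-ins for the call-time argument and the `ProviderEntry` fields.

```python
import os

# Sketch of the documented key-resolution order:
# explicit argument -> entry's literal key -> env var named by the entry.
def resolve_api_key(explicit=None, entry_key=None, entry_key_env=None):
    if explicit:
        return explicit
    if entry_key:
        return entry_key
    if entry_key_env:
        value = os.environ.get(entry_key_env)
        if value:
            return value
    raise ValueError("no API key configured for this provider")
```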
Dispatch by api_format:
| `api_format` | Model type returned |
|---|---|
| `openai_chat_completions` | `OpenAIModel` |
| `openai_responses` | `OpenAIModel` (Responses API) |
| `anthropic_messages` | `ClaudeModel` |
| `openai_embeddings` | raises `ValueError` — use `get_embedding_provider` |
| `openai_files` | raises `ValueError` — use `get_file_service` |
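The dispatch table reads directly as code. This sketch returns class names as strings for illustration; the real `get_model` constructs instances of those classes.

```python
# Sketch of api_format dispatch as documented above.
def dispatch_model_class(api_format: str) -> str:
    if api_format == "openai_embeddings":
        raise ValueError("embeddings format: use get_embedding_provider instead")
    if api_format == "openai_files":
        raise ValueError("files format: use get_file_service instead")
    return {
        "openai_chat_completions": "OpenAIModel",
        "openai_responses": "OpenAIModel",
        "anthropic_messages": "ClaudeModel",
    }[api_format]
```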
ProviderEntry fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `base_url` | `str` | (required) | Endpoint base URL (trailing slash stripped) |
| `api_format` | `ApiFormat` | (required) | Wire protocol; parsed eagerly |
| `api_key` | `str \| None` | `None` | Literal API key (not shown in repr) |
| `api_key_env` | `str \| None` | `None` | Env var name to read the key from |
| `extra_headers` | `dict[str, str]` | `{}` | Extra HTTP headers |
| `timeout` | `float \| None` | `None` | Global timeout override (seconds) |
| `default_max_tokens` | `int` | `4096` | Used as `max_tokens` for `ClaudeModel` |
ProviderConfig holds all connection parameters for a provider endpoint. ApiFormat selects the wire protocol.
from ecs_agent.providers.config import ProviderConfig, ApiFormat
# OpenAI-compatible Chat Completions (most common)
chat_config = ProviderConfig(
provider_id="aliyun",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)
# OpenAI-compatible Responses API
responses_config = ProviderConfig(
provider_id="aliyun",
base_url="https://dashscope.aliyuncs.com/api/v2/apps/protocols/compatible-mode/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_RESPONSES,
)
# Anthropic-compatible Messages API
anthropic_config = ProviderConfig(
provider_id="moonshot",
base_url="https://dashscope.aliyuncs.com/apps/anthropic",
api_key="your-api-key",
api_format=ApiFormat.ANTHROPIC_MESSAGES,
)

Available ApiFormat values:
| Value | Wire protocol |
|---|---|
| `OPENAI_CHAT_COMPLETIONS` | `POST /chat/completions` (standard OpenAI-compat) |
| `OPENAI_RESPONSES` | `POST /responses` (OpenAI Responses API) |
| `OPENAI_EMBEDDINGS` | `POST /embeddings` |
| `OPENAI_FILES` | `POST /files` (file upload) |
| `ANTHROPIC_MESSAGES` | `POST /v1/messages` (Anthropic Messages) |
ProviderConfig fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `provider_id` | `str` | (required) | Logical provider name, e.g. `"aliyun"` |
| `base_url` | `str` | (required) | API base URL |
| `api_key` | `str` | (required) | Bearer token / API key |
| `api_format` | `ApiFormat` | (required) | Wire protocol selection |
| `extra_headers` | `dict[str, str]` | `{}` | Additional HTTP headers |
| `timeout` | `float \| None` | `None` | Global timeout override (seconds) |
The LLMModel protocol defines the interface for all language model implementations. It's located in ecs_agent.providers.protocol.
from typing import Any, Protocol, runtime_checkable
from collections.abc import AsyncIterator
from ecs_agent.types import Message, CompletionResult, StreamDelta, ToolSchema
@runtime_checkable
class LLMModel(Protocol):
    @property
    def model_id(self) -> str:
        """Canonical model identifier, e.g. 'qwen3.5-flash'."""
        ...

    async def complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        stream: bool = False,
        response_format: dict[str, Any] | None = None,
    ) -> CompletionResult | AsyncIterator[StreamDelta]:
        ...

The complete method returns a CompletionResult when stream=False and an AsyncIterator[StreamDelta] when stream=True.
from ecs_agent.components import LLMComponent
from ecs_agent.providers import Model
from ecs_agent.providers.config import ApiFormat
model = Model(
"qwen3.5-flash",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)
# LLMComponent takes the model object directly (not a string)
llm = LLMComponent(model=model, system_prompt="You are a helpful assistant.")

Set pending_model on LLMComponent to a new LLMModel object before the tick runs. The system will swap it in, update model, and clear pending_model after the call:
llm_component = world.get_component(entity, LLMComponent)
llm_component.pending_model = get_model("aliyun/qwen3.5-flash", registry=registry)

OpenAIModel is an OpenAI-compatible HTTP model using httpx.AsyncClient. It works with OpenAI's API as well as compatible alternatives like DashScope, vLLM, or Ollama. Internally it dispatches to explicit chat completions or responses adapters based on the api_format in the ProviderConfig.
from ecs_agent.providers import OpenAIModel
from ecs_agent.providers.config import ApiFormat, ProviderConfig
# Chat Completions
config = ProviderConfig(
provider_id="openai",
base_url="https://api.openai.com/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)
model = OpenAIModel(
config=config,
model="gpt-4o-mini",
connect_timeout=10.0,
read_timeout=120.0,
write_timeout=10.0,
pool_timeout=10.0,
)
# Responses API
responses_config = ProviderConfig(
provider_id="aliyun",
base_url="https://dashscope.aliyuncs.com/api/v2/apps/protocols/compatible-mode/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_RESPONSES,
)
model = OpenAIModel(config=responses_config, model="qwen3.5-flash")

The default adapter (ApiFormat.OPENAI_CHAT_COMPLETIONS) sends POST requests to /chat/completions. It handles:
- Multimodal messages: `ImageUrlPart` and `FileRefPart` are serialized to the OpenAI vision format. Text goes in `message.content`.
- Streaming: yields `StreamDelta` objects from SSE chunks; emits a single terminal `LLMInvocationEvent` with a normalized `UsageRecord`.
- Usage normalization: reads `cached_tokens` from the OpenAI usage response to populate `UsageRecord.cached_input_tokens`.
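The usage-normalization step can be sketched with plain dicts. This assumes OpenAI's usage layout, where `cached_tokens` lives under `prompt_tokens_details`; the framework's actual adapter and `UsageRecord` construction may differ.

```python
# Hedged sketch: map an OpenAI-style usage payload onto the canonical
# cached_input_tokens field described above.
def normalize_openai_usage(usage: dict) -> dict:
    details = usage.get("prompt_tokens_details") or {}
    return {
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "total_tokens": usage.get("total_tokens", 0),
        # OpenAI reports prompt-cache hits here; absent means zero.
        "cached_input_tokens": details.get("cached_tokens", 0),
    }
```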
Set api_format=ApiFormat.OPENAI_RESPONSES in the ProviderConfig to activate the Responses API adapter. It sends POST requests to /responses.
For multimodal vision input, build user messages with content= for the text prompt and Message.parts containing ImageUrlPart(url=...) entries. The adapter converts ImageUrlPart into Responses API input_image items automatically (see examples/vision_agent.py for a full runnable example).
Threading state (previous_response_id) is not stored on the provider instance — it lives on an ECS component:
from ecs_agent.components.definitions import ResponsesAPIStateComponent
world.add_component(agent, ResponsesAPIStateComponent(previous_response_id=None))

ReasoningSystem reads and writes ResponsesAPIStateComponent automatically on each tick.
from pydantic import BaseModel
from ecs_agent.providers.openai_provider import pydantic_to_response_format
class User(BaseModel):
    name: str
    age: int
response_format = pydantic_to_response_format(User)
# Result: {'type': 'json_schema', 'json_schema': {'name': 'User', 'schema': {...}, 'strict': True}}

ClaudeModel is an Anthropic-compatible model with full SSE streaming and cache-aware usage normalization. It speaks the Anthropic Messages API wire format over httpx.AsyncClient.
from ecs_agent.providers import ClaudeModel
from ecs_agent.providers.config import ApiFormat, ProviderConfig
# Direct Anthropic API
config = ProviderConfig(
provider_id="anthropic",
base_url="https://api.anthropic.com",
api_key="your-anthropic-api-key",
api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
model = ClaudeModel(config=config, model="claude-3-5-haiku-latest", max_tokens=4096)

For Anthropic-compatible endpoints (e.g. Aliyun Kimi):
from ecs_agent.providers import ClaudeModel
from ecs_agent.providers.config import ApiFormat, ProviderConfig
# Anthropic-compatible endpoint (e.g. Aliyun Kimi)
config = ProviderConfig(
provider_id="moonshot",
base_url="https://dashscope.aliyuncs.com/apps/anthropic",
api_key="your-api-key",
api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
model = ClaudeModel(config=config, model="kimi-k2.5")

| Parameter | Type | Default | Description |
|---|---|---|---|
| `config` | `ProviderConfig` | (required) | Endpoint/auth/protocol config |
| `model` | `str` | `"claude-3-5-haiku-latest"` | Model identifier |
| `max_tokens` | `int` | `4096` | Maximum tokens in the response |
| `connect_timeout` | `float` | `10.0` | Connection timeout in seconds |
| `read_timeout` | `float` | `120.0` | Read timeout in seconds |
| `write_timeout` | `float` | `10.0` | Write timeout in seconds |
| `pool_timeout` | `float` | `10.0` | Connection pool timeout in seconds |
| `supports_vision` | `bool` | `False` | Enable image input parts |
- Non-streaming: sends a POST request to `/v1/messages` with the Anthropic message format and returns a `CompletionResult`.
- Streaming: uses SSE streaming with `content_block_delta` events, accumulating text deltas and tool-use inputs and yielding `StreamDelta` objects.
- Tool use: supports Anthropic's native tool-use format, converting between the framework's `ToolSchema`/`ToolCall` format and Anthropic's `tool_use` blocks.
- Cache-aware usage: normalizes `cache_creation_input_tokens` and `cache_read_input_tokens` from Anthropic responses into the canonical `UsageRecord` fields.
- Error handling: `httpx.HTTPStatusError` and `httpx.RequestError` are logged and re-raised.
- Headers: sends `x-api-key` and `anthropic-version: 2023-06-01` headers.
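The cache-aware normalization can be sketched with plain dicts standing in for the typed `UsageRecord`. Summing the three Anthropic input-token fields into `prompt_tokens` follows the accounting formula given later in this guide; the adapter's exact behavior may differ.

```python
# Hedged sketch: fold Anthropic usage fields into the canonical record.
def normalize_anthropic_usage(usage: dict) -> dict:
    read = usage.get("cache_read_input_tokens", 0)
    write = usage.get("cache_creation_input_tokens", 0)
    uncached = usage.get("input_tokens", 0)  # Anthropic's uncached input count
    return {
        "prompt_tokens": uncached + write + read,
        "completion_tokens": usage.get("output_tokens", 0),
        "cache_creation_tokens": write,
        "cache_read_tokens": read,
    }
```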
from ecs_agent import RetryModel, RetryConfig
from ecs_agent.providers import ClaudeModel
from ecs_agent.providers.config import ApiFormat, ProviderConfig
config = ProviderConfig(
provider_id="anthropic",
base_url="https://api.anthropic.com",
api_key="your-api-key",
api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
model = RetryModel(
model=ClaudeModel(config=config, model="claude-sonnet-4-20250514"),
retry_config=RetryConfig(max_retries=3),
)

FakeModel is designed for deterministic testing. It returns a sequence of pre-configured responses.
from ecs_agent.providers import FakeModel
from ecs_agent.types import CompletionResult, Message
responses = [
CompletionResult(message=Message(role="assistant", content="Hello!")),
CompletionResult(message=Message(role="assistant", content="How can I help?"))
]
model = FakeModel(responses=responses, model_id="test-model")
# First call returns "Hello!"
# Second call returns "How can I help?"
# Third call raises IndexError

| Parameter | Type | Default | Description |
|---|---|---|---|
| `responses` | `list[CompletionResult]` | (required) | Sequence of responses to return |
| `model_id` | `str` | `"fake"` | Identifier returned by `model.model_id` |
- Sequential: returns responses in the order they were provided; if the index exceeds the list length, it raises `IndexError`.
- Streaming: when `stream=True`, it yields character-by-character `StreamDelta` objects. The final delta contains `finish_reason="stop"` and usage information.
- Verification: stores the `last_response_format` for use in test assertions.
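The character-by-character streaming behavior can be sketched as a generator, with plain dicts standing in for `StreamDelta`:

```python
# Sketch of FakeModel-style streaming: one delta per character, then a
# terminal delta carrying finish_reason="stop".
def fake_stream(text: str):
    for ch in text:
        yield {"content": ch, "finish_reason": None}
    yield {"content": "", "finish_reason": "stop"}
```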
RetryModel adds resilience to any LLMModel by wrapping it and implementing retry logic using tenacity.
from ecs_agent.providers import OpenAIModel
from ecs_agent import RetryModel
from ecs_agent.types import RetryConfig
from ecs_agent.providers.config import ApiFormat, ProviderConfig
config = ProviderConfig(
provider_id="openai",
base_url="https://api.openai.com/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)
base_model = OpenAIModel(config=config, model="gpt-4o-mini")
retry_config = RetryConfig(
max_attempts=3,
multiplier=1.0,
min_wait=4.0,
max_wait=60.0,
retry_status_codes=(429, 500, 502, 503, 504)
)
model = RetryModel(model=base_model, retry_config=retry_config)

- Non-streaming: automatically retries on `httpx.HTTPStatusError` (for the configured status codes) and `httpx.RequestError`, logging retry attempts at the `WARNING` level.
- Streaming: calls are passed through directly to the underlying provider; streaming calls are not retried.
- Default config: if `retry_config` is not provided, standard defaults are used (3 attempts, exponential backoff starting at 4 seconds).
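Exponential backoff with the config's bounds can be illustrated as a small function. This assumes a tenacity-style wait of roughly `multiplier * 2**attempt`, clamped to `[min_wait, max_wait]`; tenacity's exact formula and the library's wiring may differ.

```python
# Illustrative backoff schedule using the RetryConfig defaults shown above.
def backoff_wait(attempt: int, multiplier: float = 1.0,
                 min_wait: float = 4.0, max_wait: float = 60.0) -> float:
    """Seconds to wait before the given retry attempt (1-based)."""
    return max(min_wait, min(max_wait, multiplier * 2 ** attempt))
```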
LiteLLMModel enables access to 100+ LLM providers through a single unified interface via the litellm library. This is an optional dependency — install with pip install litellm.
from ecs_agent.providers import LiteLLMModel
# OpenAI
model = LiteLLMModel(model="gpt-4o", api_key="sk-...")
# Anthropic
model = LiteLLMModel(model="claude-sonnet-4-20250514", api_key="sk-ant-...")
# Any litellm-supported model
model = LiteLLMModel(model="ollama/llama3", base_url="http://localhost:11434")

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model` | `str` | (required) | litellm model identifier (e.g. `gpt-4o`, `claude-sonnet-4-20250514`, `ollama/llama3`) |
| `api_key` | `str \| None` | `None` | API key (can also be set via environment variables) |
| `base_url` | `str \| None` | `None` | Custom base URL for self-hosted models |
- Non-streaming: calls `litellm.acompletion()` and returns a `CompletionResult`.
- Streaming: calls `litellm.acompletion(stream=True)` and yields `StreamDelta` objects.
- Tool use: converts between the framework's `ToolSchema` format and litellm's tool format.
- Optional dependency: `litellm` is not a hard dependency; an `ImportError` with a helpful message is raised if it is not installed.
litellm supports 100+ providers including: OpenAI, Anthropic, Google Gemini, AWS Bedrock, Azure OpenAI, Ollama, vLLM, Together AI, Groq, Mistral, and many more.
The framework emits one canonical LLMInvocationEvent per LLM call. Use AccountingSubscriber to capture cost and cache hit-rate metrics.
from ecs_agent.accounting.subscriber import AccountingSubscriber
subscriber = AccountingSubscriber()
subscriber.subscribe(world.event_bus)
# ... run your agent ...
# Query per-provider/model aggregate cache hit-rate
stats = subscriber.get_aggregate_stats("aliyun", "qwen3.5-flash")
if stats is not None:
print(f"Cache hit rate: {stats.hit_rate}") # float 0.0–1.0, or None
print(f"Cache read tokens: {stats.cache_read_tokens}")
print(f"Total prompt tokens: {stats.total_prompt_tokens}")

AccountingSubscriber computes a token-weighted aggregate hit rate across all observed invocations for a given (provider_id, model) pair:
hit_rate = sum(cache_read_tokens) / sum(total_prompt_tokens)
where total_prompt_tokens = uncached_input_tokens + cache_write_tokens + cache_read_tokens. If the denominator is zero, hit_rate is None.
Pass a custom PricingCatalog to override built-in pricing:
from ecs_agent.accounting.catalog import PricingCatalog, ModelPricing
from ecs_agent.accounting.subscriber import AccountingSubscriber
catalog = PricingCatalog()
catalog.register("aliyun", "qwen3.5-flash", ModelPricing(
input_per_million=0.5,
output_per_million=1.5,
cached_input_per_million=0.1,
cache_write_per_million=None,
))
subscriber = AccountingSubscriber(pricing_catalog=catalog)

All provider adapters normalize usage into a canonical UsageRecord:
from ecs_agent.accounting.models import UsageRecord, StreamCompleteness
usage = UsageRecord(
prompt_tokens=1024,
completion_tokens=256,
total_tokens=1280,
cached_input_tokens=512, # OpenAI cached_tokens
cache_creation_tokens=None, # Anthropic cache_creation_input_tokens
cache_read_tokens=512, # Anthropic cache_read_input_tokens
stream_completeness=StreamCompleteness.COMPLETE,
provider_id="aliyun",
model="qwen3.5-flash",
)

StreamCompleteness values:

- `COMPLETE` — full usage data available
- `PARTIAL` — stream was interrupted; usage may be incomplete
- `UNKNOWN` — usage chunk was not received (e.g. the server dropped the final SSE event)
The EmbeddingProvider protocol defines the interface for converting text into numerical vectors. Located in ecs_agent.providers.embedding_protocol.
from typing import Protocol, runtime_checkable
@runtime_checkable
class EmbeddingProvider(Protocol):
    async def embed(self, texts: list[str]) -> list[list[float]]:
        ...

OpenAIEmbeddingProvider is an OpenAI-compatible provider for generating text embeddings, aligned with the new ProviderConfig model.
from ecs_agent.providers.embedding_provider import OpenAIEmbeddingProvider
provider = OpenAIEmbeddingProvider(
api_key="your-api-key",
model="text-embedding-3-small"
)

FakeEmbeddingProvider returns deterministic vectors based on the hash of the input text. Ideal for testing and development without API costs.
from ecs_agent.providers.fake_embedding_provider import FakeEmbeddingProvider
provider = FakeEmbeddingProvider(dimension=384)
vectors = await provider.embed(["hello", "world"])

OpenAIFilesService provides a typed file-upload service for OpenAI-compatible endpoints.
from ecs_agent.providers.openai_files import OpenAIFilesService
from ecs_agent.providers.config import ProviderConfig, ApiFormat
config = ProviderConfig(
provider_id="openai",
base_url="https://api.openai.com/v1",
api_key="your-api-key",
api_format=ApiFormat.OPENAI_FILES,
)
service = OpenAIFilesService(provider_config=config)
file_ref = await service.upload(path="/path/to/document.pdf", purpose="assistants")
# file_ref.file_id — stable reference usable in FileRefPart messages

Uploaded file references can be embedded in multimodal messages via FileRefPart:
from ecs_agent.types import Message, FileRefPart
msg = Message(
role="user",
content="Summarize this document.",
parts=[
FileRefPart(file_id=file_ref.file_id),
],
)
---
## Multimodal Messages
The framework supports multimodal content via typed `Message.parts` entries.
### Core Types
Import path:
```python
from ecs_agent.types import Message, ImageUrlPart, FileRefPart
```

Type definitions (from ecs_agent.types):
@dataclass(slots=True)
class ImageUrlPart:
    url: str
    detail: str | None = None

@dataclass(slots=True)
class FileRefPart:
    file_id: str
    filename: str | None = None

MessagePart = ImageUrlPart | FileRefPart

@dataclass(slots=True)
class Message:
    role: str
    content: str  # canonical text — always use this for text
    parts: list[MessagePart] | None = None  # non-text media only

Message.content is the canonical text field. Message.parts holds non-text media (ImageUrlPart, FileRefPart) only. Text always goes in content, never in parts.
from ecs_agent.types import Message, ImageUrlPart, FileRefPart
msg = Message(
role="user",
content="Please review the image and file. Focus on entities and relationships.",
parts=[
ImageUrlPart(url="https://example.com/diagram.png", detail="high"),
FileRefPart(file_id="file-abc123", filename="spec.pdf"),
],
)

Text lives in content; media-only parts go in parts. Adapters prepend content as a text block when parts is also set.

- `message.content` → `{"type": "text", "text": message.content}` (prepended when non-empty)
- `ImageUrlPart` → `{"type": "image_url", "image_url": {"url": part.url, "detail": part.detail}}` (`detail` omitted when `None`)
- `FileRefPart` → `{"type": "file", "file": {"file_id": part.file_id, "filename": part.filename}}` (`filename` omitted when `None`)
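The Chat Completions mapping rules can be sketched as a small serializer. Plain dicts with an assumed `kind` tag stand in for the typed parts; the framework's real adapter works on `ImageUrlPart`/`FileRefPart` instances.

```python
# Sketch of the Chat Completions content mapping described above.
def to_chat_content(text: str, parts: list[dict]) -> list[dict]:
    blocks: list[dict] = []
    if text:  # text block is prepended only when non-empty
        blocks.append({"type": "text", "text": text})
    for part in parts:
        if part["kind"] == "image_url":
            image = {"url": part["url"]}
            if part.get("detail") is not None:  # detail omitted when None
                image["detail"] = part["detail"]
            blocks.append({"type": "image_url", "image_url": image})
        elif part["kind"] == "file":
            file = {"file_id": part["file_id"]}
            if part.get("filename") is not None:  # filename omitted when None
                file["filename"] = part["filename"]
            blocks.append({"type": "file", "file": file})
    return blocks
```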
{
"role": "user",
"content": [
{"type": "text", "text": "Please review the image and file. Focus on entities and relationships."},
{"type": "image_url", "image_url": {"url": "https://example.com/diagram.png", "detail": "high"}},
{"type": "file", "file": {"file_id": "file-abc123", "filename": "spec.pdf"}}
]
}

- Text type is role-aware: `"input_text"` (user) or `"output_text"` (assistant)
- `ImageUrlPart` → `{"type": "input_image", "image_url": part.url, "detail": part.detail}` (`detail` omitted when `None`)
- `FileRefPart` → `{"type": "input_file", "file_id": part.file_id, "filename": part.filename}` (`filename` omitted when `None`)
{
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "Please review the image and file. Focus on entities and relationships."},
{"type": "input_image", "image_url": "https://example.com/diagram.png", "detail": "high"},
{"type": "input_file", "file_id": "file-abc123", "filename": "spec.pdf"}
]
}

- `message.content` → `{"type": "text", "text": message.content}` (prepended when non-empty)
- `ImageUrlPart` → `{"type": "image", "source": {"type": "url", "url": part.url}}`
- Vision requires the adapter config `supports_vision=True`; otherwise image parts raise `ValueError`.
- `FileRefPart` is not supported by the Anthropic adapter and always raises `ValueError`.
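The Anthropic mapping, including its two error paths, can be sketched the same way — plain dicts with an assumed `kind` tag stand in for the typed parts:

```python
# Sketch of the Anthropic content mapping: images gated on supports_vision,
# file references always rejected.
def to_anthropic_blocks(text: str, parts: list[dict], supports_vision: bool) -> list[dict]:
    blocks: list[dict] = []
    if text:
        blocks.append({"type": "text", "text": text})
    for part in parts:
        if part["kind"] == "image_url":
            if not supports_vision:
                raise ValueError("image parts require supports_vision=True")
            blocks.append({"type": "image", "source": {"type": "url", "url": part["url"]}})
        elif part["kind"] == "file":
            raise ValueError("FileRefPart is not supported by the Anthropic adapter")
    return blocks
```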
detail accepts "low", "high", or "auto"; set None to omit it from payloads.
Use FileRefPart with IDs returned by OpenAIFilesService (see File Upload above).
For live vision tests, set env vars such as LLM_API_KEY, LLM_BASE_URL, LLM_MODEL, and IMAGE_URL (never hardcode secrets).
The VectorStore protocol defines the interface for storing and searching vectors. Located in ecs_agent.providers.vector_store.
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class VectorStore(Protocol):
    async def add(self, id: str, vector: list[float], metadata: dict[str, Any] | None = None) -> None: ...
    async def search(self, query_vector: list[float], top_k: int = 5) -> list[tuple[str, float]]: ...
    async def delete(self, id: str) -> None: ...

InMemoryVectorStore provides a simple, dictionary-backed vector store with cosine similarity search. It optionally uses numpy for faster computation when available.
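The cosine-similarity ranking it is described as doing can be sketched in pure Python (no numpy), with a plain dict standing in for the store's internal mapping:

```python
import math

# Sketch of dictionary-backed cosine-similarity top-k search.
def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def search(store: dict[str, list[float]], query: list[float], top_k: int = 5) -> list[tuple[str, float]]:
    """Return the top_k (id, score) pairs, best match first."""
    scored = [(vid, cosine(query, vec)) for vid, vec in store.items()]
    return sorted(scored, key=lambda t: t[1], reverse=True)[:top_k]
```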
from ecs_agent.providers.vector_store import InMemoryVectorStore
store = InMemoryVectorStore(dimension=384)
await store.add("doc1", [0.1, 0.2, ...], metadata={"text": "content"})
results = await store.search([0.1, 0.2, ...], top_k=5)

- Production: use `OpenAIModel` for real API interaction. Wrap it in a `RetryModel` to handle transient network issues or rate limits.
- Testing: use `FakeModel` for unit tests where you need predictable, deterministic results without making real network requests.
- Resilience: always consider wrapping your primary model in a `RetryModel` for production environments.
- Claude-native: use `ClaudeModel` for direct Anthropic API access with native tool-use support and cache-aware accounting.
- Multi-provider: use `LiteLLMModel` when you need to switch between providers without changing code.
- Accounting: attach `AccountingSubscriber` to the `EventBus` to track cost and cache hit-rate metrics across invocations.