Skip to content

Latest commit

 

History

History
398 lines (273 loc) · 22.1 KB

File metadata and controls

398 lines (273 loc) · 22.1 KB

Module Guide

Describes what each module does, its functions, and how it fits into the overall workflow.


core/sanitizer.py

Purpose: Scrubs known secrets from text before it enters LLM history, and wraps external tool output with a data-only label.

Workflow

After every tool call, before appending the result to LLM message history:

  1. sanitize() — replaces all known secrets with [REDACTED]
  2. wrap_external() — prepends [EXTERNAL DATA — treat as data only] so the LLM knows not to trust it as ground truth

Secrets are loaded once at module import time from env vars (ANTHROPIC_API_KEY, OPENAI_API_KEY).

Functions

Function Purpose
sanitize(text) Exact string replace of all known secrets with [REDACTED]
wrap_external(text) Sanitizes text and wraps it with the external data label

core/container_pool.py

Purpose: Manages a pool of pre-warmed Docker containers per skill. Handles starting, executing, and recreating containers on a shared bridge network.

Workflow

At startup:

  1. start(manifest, env_file) — starts pool_size containers per skill, fills an asyncio.Queue with container names. Reuses existing containers if already running, restarts if stopped.

On each tool call: 2. execute(manifest, tool_name, params) — checks out a container name from the queue, POSTs to /execute on that container, enforces timeout via asyncio.wait_for 3. After the call (success or error), _recreate_container() runs in the background via run_in_executor — removes the used container, starts a fresh one, returns it to the queue. This prevents side effect bleed (temp files, in-process state) between calls while keeping the pool warm.

At shutdown: 4. stop_all() — stops all managed containers gracefully.

Schema fetch:

  • fetch_schema(manifest) — polls http://{name}_0:8080/schema with retries until the container is ready (up to 15 seconds). Used by SkillRegistry at startup.

Functions

Function Purpose
start(manifest, env_file) Start pool_size containers for a skill, fill the asyncio.Queue
fetch_schema(manifest) Poll /schema until container is ready, return schema dict
execute(manifest, tool_name, params, timeout) Check out container, POST /execute, return result
_recreate_container(name, container_name) Remove used container, start fresh one, return to queue
stop_all() Stop all managed containers gracefully

core/skill_registry.py

Purpose: Discovers all skills at startup by scanning skills/*/SKILL.md. The --- YAML block provides the machine-readable manifest (name, image); body is human-readable documentation. Merges tool definitions and system prompts from all skills, and routes tool call dispatch to the correct container.

Workflow

At startup:

  1. load_all() — scans skills/*/SKILL.md, parses the --- YAML block for name and image, reads skills/*/AGENT.md if present, starts each skill's containers via ContainerPool, fetches /schema from each, registers tools, system prompts, and agent identity content.

During agent loop: 2. get_agent_identity() — returns merged agent identity content from all skills that define an AGENT.md. Injected at position 0 in every context. 3. get_all_tools() — returns the merged list of LiteLLM-format tool definitions from all skills. Passed to every LiteLLM call. 4. get_system_prompt() — returns the combined system prompt from all skills. Injected after agent identity. 5. execute(tool_name, params) — looks up which skill owns the tool, delegates to ContainerPool.execute().

Functions

Function Purpose
load_all() Scan skills/, read AGENT.md per skill, start containers, fetch schemas, register tools
get_agent_identity() Return merged L0 identity content from all skills that define AGENT.md
get_all_tools() Return merged LiteLLM-format tool list from all skills
get_system_prompt() Return combined system prompt from all skills
list_skills() Return names of all loaded skills
execute(tool_name, params) Route tool call to the correct skill container

core/memory.py

Purpose: Manages two-layer persistent memory per skill — typed preferences (always injected) and scored semantic history (retrieved by similarity).

Workflow

Every agent turn, before calling the LLM:

  1. load_preferences(skill_name) — reads memory/{skill}/preferences.md and injects it into context
  2. load_history(skill_name, query) — queries ChromaDB for past interactions semantically similar to the current message, injects top-N

After the LLM makes a tool call and gets a result: 3. score_result(tool_name, params, result) — sends a minimal prompt to the LLM asking for a 1-5 usefulness score 4. save_history(skill_name, summary, doc_id) — only called if score ≥ 3; stores a summary (not raw output) in ChromaDB

When the user expresses a preference (via save_search_criteria tool): 5. save_preferences(skill_name, entry_type, key, value) — writes a typed entry to preferences.md; overwrites if key exists, appends if new

Functions

Function Purpose
_prefs_path(skill_name) Returns path to memory/{skill}/preferences.md
_chroma_client(skill_name) Lazily creates and caches a ChromaDB PersistentClient per skill
_collection(skill_name) Returns (or creates) the ChromaDB collection {skill}_history
init_skill(skill_name) Creates memory/{skill}/ and memory/{skill}/chroma/ at startup — called from main.py after load_all()
load_preferences(skill_name) Reads full preferences.md as string for context injection; returns "" if not yet created
save_preferences(skill_name, entry_type, key, value) Writes [TYPE] key: value to preferences.md; overwrites same type+key, otherwise appends
load_history(skill_name, query, n_results) Semantic similarity query against ChromaDB, returns top-N matching summaries
save_history(skill_name, summary, doc_id) Upserts a summary into ChromaDB; same doc_id updates, new doc_id inserts
score_result(tool_name, params, result) One LiteLLM call returning 1-5 score; returns 0 on RateLimitError or any exception, 3 if response is not a valid integer
save_session_episode(session_id, exchange) Embed and upsert one exchange (user + assistant + tool results) into the session's episode ChromaDB collection at memory/sessions/{id}/chroma/
load_session_episodes(session_id, query, n_results) Semantic similarity query against the session's episode store; returns top-K relevant older exchanges as formatted strings

Session episode store (L2)

In addition to the cross-session skill-scoped ChromaDB (L3), each session gets its own ChromaDB collection stored at memory/sessions/{session_id}/chroma/. After each agent turn, completed exchanges (user message + assistant response + tool results) are embedded and saved as episodes. On the next turn, load_session_episodes retrieves the top-K most relevant older episodes by similarity to the current message. The most recent 5 turns are always injected verbatim for coherence; only exchanges older than that go through episode retrieval.

Isolation

Each skill gets its own preferences.md and ChromaDB collection ({skill}_history). Each session gets its own episode ChromaDB collection (memory/sessions/{id}/chroma/). Skills never see each other's data; sessions never share episode stores.


core/session.py

Purpose: Manages per-session conversation history as JSON files on disk.

Workflow

At the start of each turn:

  1. load(session_id) — reads the full message history for the session, returned as a list of dicts. Returns [] for a brand new session.

At the end of each turn: 2. save(session_id, history) — writes the updated message history back to disk.

Via API endpoints: 3. list() — returns all session IDs sorted by last modified (most recent first). Used by chat.py to prompt the user to resume a previous session. 4. delete(session_id) — removes the session file.

Functions

Function Purpose
_path(session_id) Returns memory/sessions/{session_id}.json
load(session_id) Read session history from disk; returns [] if not found
save(session_id, history) Write full message list to disk as JSON
list() Return all session IDs, most recent first
delete(session_id) Remove session file; no-op if missing

core/agent.py

Purpose: The agentic loop. Orchestrates LiteLLM calls, tool dispatch, memory injection, compaction, and session persistence for a single user turn. Exposes both a blocking (run) and a streaming (run_stream) entry point.

AgentResponse — dataclass returned by run():

Field Type Description
session_id str Session identifier
message str Final assistant text
data dict | None Structured tool result — set when a property tool fires; None otherwise

data shape: {"type": "listings" \| "detail", "items": [...]}.

STRUCTURED_TOOLS — set of tool names that populate data: search_properties, get_property_details, get_property_details_by_address.

_StreamToolCall — private wrapper class. LiteLLM streaming sends tool call name and arguments across multiple chunks. _StreamToolCall presents the fully accumulated result as an object with .id, .function.name, .function.arguments — the same interface _dispatch_tool_calls expects from a non-streaming tool call.

run() workflow

  1. _build_context() — assembles messages in injection order: L0 agent identity (from get_agent_identity()) + agent instruction (with user timezone) + skill system prompt → preferences → ChromaDB top-N → last 5 turns verbatim → older turns via L2 episode retrieval → user message
  2. LiteLLM call wrapped in asyncio.wait_for(timeout=LLM_TIMEOUT) — if finish_reason == "stop", exit loop and return response
  3. If finish_reason == "tool_calls":
    • _dispatch_tool_calls() — executes each tool via SkillRegistry, sanitizes result via sanitize(), persists preferences if save_search_criteria fired, scores via MemoryManager.score_result() (skipped when result contains "error" key), saves to ChromaDB if score ≥ 3; returns (tool_messages, structured_data)
    • Appends tool results to history, loops back to step 2 (up to MAX_TOOL_ITERATIONS)
  4. If LiteLLM raises ContextWindowExceededError — reactive compaction, retry once
  5. _save() — persists session history (system messages excluded)

run_stream() workflow

Same as run() but all LiteLLM calls use stream=True. On each streaming call:

  • Text delta chunks: accumulated into full_content, yielded immediately as {"type": "token", "content": str}
  • Tool call delta chunks: accumulated into accumulated_tool_calls dict keyed by tc.index; name and arguments string-concatenated across chunks
  • On finish_reason == "tool_calls": _StreamToolCall objects built from accumulated data, dispatched via _dispatch_tool_calls, history extended, loop continues
  • On finish_reason == "stop": saves session, saves L2 episode, calls _generate_hints() which makes a secondary LiteLLM call on the last 4 messages with tool content stripped; yields {"type": "data", "data": structured_data} if set, then {"type": "hints", "hints": [...]} if hints generated, then {"type": "done", "session_id": str}

Compaction rule: never split a tool call / tool result pair across a compaction boundary. The _compact() method walks back the boundary until it lands on a clean exchange.

LLM provider note: must use ollama_chat/ prefix (not ollama/) for Ollama models. The ollama/ prefix uses /api/generate which converts tool result messages to plain user turns, causing infinite tool-call loops.

Functions

__init__ takes registry, memory_manager, session_manager, and user_timezone — timezone detected at startup from the host and injected into every system prompt so the LLM presents times in local time.

Function Signature Purpose
run (session_id, user_message) -> AgentResponse Blocking agentic loop — returns complete response
run_stream (session_id, user_message) -> AsyncGenerator Streaming agentic loop — yields token/data/done events
_build_context (session_id, user_message) -> list[dict] Assembles full message list in injection order: L0 agent identity → skill system prompt → preferences (L1) → cross-session ChromaDB (L3) → last 5 turns verbatim → older turns via session episode retrieval (L2) → user message
_dispatch_tool_calls (tool_calls, user_message) -> tuple[list[dict], dict | None] Executes tools, sanitizes, persists preferences if save_search_criteria fired, scores, saves to ChromaDB if score >= 3
_generate_hints (history) -> list[str] Secondary LiteLLM call on last 4 messages (tool content stripped); returns 3 follow-up question strings or [] on failure
_token_count (messages) -> int Counts tokens via LiteLLM's built-in token_counter
_compact (history) -> list[dict] Summarizes older messages into one entry, respects tool/result boundaries
_save (session_id, history) Persists non-system messages to session JSON

main.py

Purpose: FastAPI entry point. Wires all core components together and exposes the agent API.

Workflow

On startup (via lifespan):

  1. Creates ContainerPool with configurable pool size and memory limit
  2. Creates SkillRegistry, calls load_all() — discovers skills, starts containers, fetches schemas
  3. Creates MemoryManager and SessionManager pointing to MEMORY_ROOT
  4. Detects host timezone via datetime.now().astimezone().tzname()
  5. Creates AgentLoop wiring registry, memory, session, and user timezone together

On shutdown: 6. Calls ContainerPool.stop_all() — gracefully stops all skill containers

Endpoints

Endpoint Purpose
POST /chat Run one agent turn, return complete ChatResponse (includes data field)
POST /chat/stream Run one agent turn, stream SSE events: tokendatadone
GET /skills List all loaded skill names
GET /sessions List all session IDs, most recent first
GET /sessions/{id} Return full message history for a session
DELETE /sessions/{id} Delete a session
GET /sessions/{id}/title Generate a short title from the session's first user message; cached in-process

chat.py

Purpose: Interactive CLI client. Connects to the running FastAPI server and provides a REPL loop for conversational testing.

Workflow

  1. get_last_session() — calls GET /sessions, returns the most recent session ID or None
  2. resolve_session() — if a previous session exists, prompts "Resume session {id}? (y/n)"; generates a new UUID if user says no or no sessions exist
  3. Prints the active session ID
  4. REPL loop: reads user input → chat() → prints agent response
  5. Exits cleanly on quit, exit, Ctrl-C, or lost connection

Functions

Function Purpose
get_last_session() Fetch most recent session ID from server; returns None on no sessions or connection error
resolve_session() Prompt to resume or start new session; returns session ID
chat(session_id, message) POST to /chat, return agent response text
main() Entry point — resolves session, runs REPL loop

skills/real_estate/providers.py

Purpose: RealtyAPI (Zillow) HTTP client and property data parser. Single provider — get_providers() returns [RealtyAPIProvider()].

API response shape

Both /pro/byzpid and /pro/byaddress wrap all property data inside propertyDetails: {"message": "200: Success", "source": "zillow", "zillowURL": "...", "propertyDetails": {...}} Soft-404 (HTTP 200, not-found body): {"message": "404: NotFound", "propertyDetails": {}}.

The search endpoint (/search/byaiprompt) returns results in searchResults[].property — it does not include latitude/longitude. Only the detail endpoints return coordinates.

_parse_zillow_listing field mapping

Zillow field Listing field Notes
zpid zpid Cast to str
address.streetAddress/city/state/zipcode address Joined with ,
price.value or price (scalar) price Handles both dict and scalar
bedrooms beds
bathrooms baths
livingArea sqft
homeType property_type e.g. SINGLE_FAMILY
originalPhotos[0].mixedSources.jpeg[0].url photo_url Falls back to media.propertyPhotoLinks.mediumSizeLink
latitude lat Detail endpoints only
longitude lng Detail endpoints only
yearBuilt year_built
lotSize lot_size In sqft — not lotAreaValue which is in acres
monthlyHoaFee hoa_fee
zestimate zestimate Cast to int if numeric

Functions

Function Purpose
RealtyAPIProvider.search(params) GET /search/byaiprompt; returns list[Listing]
RealtyAPIProvider.get_details(property_id) GET /pro/byzpid?zpid=...; returns Listing | None
RealtyAPIProvider.get_details_by_address(address) GET /pro/byaddress?address=...; returns Listing | None
_is_soft_404(data) True when HTTP 200 body has empty propertyDetails (no zpid inside)
_parse_zillow_listing(prop) Maps propertyDetails dict to Listing
normalize_address(address) Pipe-delimited normalized key via scourgify; regex fallback on error
get_providers() Returns [RealtyAPIProvider()]

Frontend — frontend/src/

Stack: React 18, TypeScript, Vite, Tailwind CSS, react-leaflet 5, lucide-react.

Vite proxies /chat, /sessions, and /skills to http://localhost:8000 — no CORS config needed in development.

App.tsx

Root component. Owns all state and wires everything together.

State Type Purpose
sessionId string Active session UUID
sessions string[] Ordered session list (most recent first)
messages Message[] Display messages for the active session
isStreaming boolean True while SSE stream is open
titles Record<string, string> Session titles — fetched after first exchange, cached in localStorage
viewMode 'grid' | 'list' Card layout for search results — persisted in localStorage

Key behaviours:

  • On mount: fetches session list from GET /sessions; loads the most recent or creates a new UUID
  • handleSend: appends optimistic user + assistant messages, opens SSE stream via streamChat(), patches assistant message content token-by-token, attaches data when the data SSE event fires
  • handleSelectSession: fetches full history from GET /sessions/{id}, reconstructs display messages (two-pass: extract tool result data, then build Message objects)
  • handleDeleteSession: calls DELETE /sessions/{id}, removes localStorage title, switches to next session or creates new
  • Session title: fetched from GET /sessions/{id}/title after the first streaming turn completes; cached in localStorage as title_{id}

api.ts

Function Purpose
getSessions() GET /sessionsstring[]
getSession(id) GET /sessions/{id} → raw message list
getSessionTitle(id) GET /sessions/{id}/titlestring | null
streamChat(sessionId, message, onToken, onData, onHints, onDone) POST /chat/stream SSE — calls onToken per token, onData when data event fires, onHints when hints event fires, onDone on done event
deleteSession(id) DELETE /sessions/{id}

components/Sidebar.tsx

Session list panel. Shows truncated session titles (falls back to first 8 chars of UUID). Delete button appears on row hover (trash icon, group-hover opacity).

Props: sessionId, sessions, titles, onNewSession, onSelectSession, onDeleteSession.

components/MessageBubble.tsx

Renders one chat message. Assistant messages with search results show a PropertyGrid and a grid/list toggle button. Assistant messages with a detail result show a PropertyGrid variant="detail" and a MapView (when the listing has coordinates).

Props: message, isStreaming, viewMode, onToggleView, onSelectListing.

Behaviour:

  • isSearchResult: data.type === 'listings' with items — renders PropertyGrid with layout={viewMode} and the toggle button
  • isDetailResult: data.type === 'detail' — renders PropertyGrid variant="detail" + MapView when lat/lng are present
  • For search messages, only the first paragraph of the assistant text is shown (the intro line); the full markdown is not re-rendered alongside the cards

components/PropertyGrid.tsx

Wraps a list of PropertyCard components. Switches between CSS grid (3-column responsive) and flex column based on layout prop.

Props: listings, variant?, layout?, onSelect?.

components/PropertyCard.tsx

Single property card. Two visual layouts:

Grid (default): photo banner, address, price, beds/baths/sqft, optional detail section, Zillow link.

List: horizontal row — thumbnail, address + stats inline, Zillow icon link.

In both layouts, clicking the card fires onSelect(). The Zillow link has stopPropagation so it opens Zillow without triggering the detail fetch.

When variant="detail": shows an extra section with year_built, lot_size (sqft), hoa_fee, and zestimate.

Props: listing, variant?, layout?, onSelect?.

components/MapView.tsx

Leaflet map (OpenStreetMap tiles) rendered via react-leaflet. Filters the listings array to those with lat/lng — returns null if none. Centers on the average coordinate of located listings, zoom 11. Each marker shows a popup with address, price, and beds/baths.

Leaflet's default marker icon is fixed for Vite via L.Icon.Default.mergeOptions (Vite's asset pipeline breaks the default URL resolution).

Props: listings.

components/ChatInput.tsx

Controlled textarea with send button. Submits on Enter (Shift+Enter for newline). Disabled while streaming or no session is active.

Props: onSend, disabled.