|
| 1 | +--- |
| 2 | +title: Architecture |
| 3 | +description: How Atmosphere's transport and application layers compose |
| 4 | +sidebar: |
| 5 | + order: 2 |
| 6 | +--- |
| 7 | + |
| 8 | +Atmosphere has two layers. The **transport layer** moves bytes between server and clients. The **application layer** gives those bytes meaning — AI events, tool calls, conversation memory. Understanding how they compose is key to using the framework effectively. |
| 9 | + |
| 10 | +## The Two Layers |
| 11 | + |
| 12 | +``` |
| 13 | +Application layer StreamingSession, AiEvent, AgentFleet, JournalFormat |
| 14 | + | |
| 15 | + | emit(), stream(), complete() |
| 16 | + | |
| 17 | + v |
| 18 | +Transport layer Broadcaster, AtmosphereResource, BroadcastFilter |
| 19 | + | |
| 20 | + | broadcast(), suspend(), write() |
| 21 | + | |
| 22 | + v |
| 23 | +Wire protocols WebSocket, SSE, Long-Polling, gRPC |
| 24 | +``` |
| 25 | + |
| 26 | +## Broadcaster (Transport) |
| 27 | + |
| 28 | +A **Broadcaster** is a named pub/sub channel. Resources subscribe to it. When you call `broadcaster.broadcast(message)`, every subscribed resource receives it. |
| 29 | + |
| 30 | +```java |
| 31 | +@ManagedService(path = "/chat/{room}") |
| 32 | +public class Chat { |
| 33 | + |
| 34 | + @Message |
| 35 | + public String onMessage(String message) { |
| 36 | + return message; // broadcast to all in room |
| 37 | + } |
| 38 | +} |
| 39 | +``` |
| 40 | + |
| 41 | +Key properties: |
| 42 | +- **1:N** — one message fans out to all subscribers |
| 43 | +- **Untyped** — `broadcast(Object)` accepts anything |
| 44 | +- **Long-lived** — persists across connections, survives reconnects |
| 45 | +- **Transport-aware** — BroadcastFilters, BroadcasterCache, clustering (Redis, Kafka) |
| 46 | + |
| 47 | +## StreamingSession (Application) |
| 48 | + |
| 49 | +A **StreamingSession** represents one client's AI conversation. It produces typed events that flow through the Broadcaster to the client. |
| 50 | + |
| 51 | +```java |
| 52 | +@AiEndpoint(path = "/ai/chat") |
| 53 | +public class MyBot { |
| 54 | + |
| 55 | + @Prompt |
| 56 | + public void onPrompt(String message, StreamingSession session) { |
| 57 | + session.emit(new AiEvent.ToolStart("search", Map.of("q", message))); |
| 58 | + session.emit(new AiEvent.ToolResult("search", results)); |
| 59 | + session.stream(message); // invoke LLM, stream response tokens |
| 60 | + } |
| 61 | +} |
| 62 | +``` |
| 63 | + |
| 64 | +Key properties: |
| 65 | +- **1:1** — one session serves one client's conversation |
| 66 | +- **Typed** — `emit(AiEvent)` produces structured events (tool cards, progress, errors) |
| 67 | +- **Short-lived** — created per prompt, closed on completion |
| 68 | +- **Application-aware** — conversation memory, LLM runtime, tools, guardrails, metrics |
| 69 | + |
| 70 | +## How They Compose |
| 71 | + |
| 72 | +`StreamingSession` uses a `Broadcaster` internally. When you call `session.emit(event)`, the session serializes the event to JSON and calls `broadcaster.broadcast(new RawMessage(json))`. The Broadcaster delivers it through its filter chain to the client. |
| 73 | + |
| 74 | +``` |
| 75 | +session.emit(new AiEvent.ToolStart(...)) |
| 76 | + | |
| 77 | + v |
| 78 | +DefaultStreamingSession serializes to JSON |
| 79 | + | |
| 80 | + v |
| 81 | +broadcaster.broadcast(new RawMessage(json)) |
| 82 | + | |
| 83 | + v |
| 84 | +BroadcastFilter chain (PII redaction, cost metering, content safety) |
| 85 | + | |
| 86 | + v |
| 87 | +AtmosphereResource.write() -> WebSocket frame / SSE event / HTTP chunk |
| 88 | +``` |
| 89 | + |
| 90 | +This separation is why **AI filters work**. `PiiRedactionFilter` and `CostMeteringFilter` sit on the Broadcaster's filter chain but understand AI event types. They intercept between "session emits event" and "client receives bytes" — a hook point that only exists because the layers are separate. |
| 91 | + |
| 92 | +## API Comparison |
| 93 | + |
| 94 | +| | `Broadcaster` | `StreamingSession` | |
| 95 | +|---|---|---| |
| 96 | +| Verb | `broadcast()` | `emit()`, `stream()`, `send()` | |
| 97 | +| Cardinality | 1:N (topic to subscribers) | 1:1 (conversation to client) | |
| 98 | +| Lifetime | Long-lived (across connections) | Per-prompt | |
| 99 | +| Type safety | `Object` | `AiEvent` hierarchy | |
| 100 | +| State | Resources, filters, cache | Memory, runtime, tools, guardrails | |
| 101 | +| Processing hooks | `BroadcastFilter` chain | `AiInterceptor` chain | |
| 102 | + |
| 103 | +## When to Use Which |
| 104 | + |
| 105 | +**Use `Broadcaster` directly** when you're building classic real-time features: chat rooms, dashboards, notifications, collaboration. You're working with raw messages and fan-out. |
| 106 | + |
| 107 | +**Use `StreamingSession`** when you're building AI features: chatbots, agents, tool-calling, multi-agent orchestration. You're working with typed events and conversation state. |
| 108 | + |
| 109 | +**Use both** when you need AI features with custom transport behavior — for example, an AI chatbot in a room where other users see the bot's responses. The session emits events, the Broadcaster fans them out to the room. |
| 110 | + |
| 111 | +## AgentFleet (Orchestration) |
| 112 | + |
| 113 | +A third abstraction sits above `StreamingSession` for multi-agent coordinators: |
| 114 | + |
| 115 | +``` |
| 116 | +AgentFleet agent(), parallel(), pipeline(), journal() |
| 117 | + | |
| 118 | + v |
| 119 | +StreamingSession emit(), stream(), complete() |
| 120 | + | |
| 121 | + v |
| 122 | +Broadcaster broadcast() -> filters -> resources |
| 123 | +``` |
| 124 | + |
| 125 | +The `AgentFleet` dispatches work to agents and collects results. It uses the `StreamingSession` to stream progress and results back to the client. The fleet's `CoordinationJournal` records every dispatch and completion for observability. |
| 126 | + |
| 127 | +```java |
| 128 | +@Coordinator(name = "ceo", |
| 129 | + journalFormat = JournalFormat.Markdown.class) |
| 130 | +@Fleet({@AgentRef(type = ResearchAgent.class), |
| 131 | + @AgentRef(type = WriterAgent.class)}) |
| 132 | +public class CeoCoordinator { |
| 133 | + |
| 134 | + @Prompt |
| 135 | + public void onPrompt(String message, AgentFleet fleet, StreamingSession session) { |
| 136 | + var research = fleet.agent("research").call("search", Map.of("q", message)); |
| 137 | + session.stream("Summarize: " + research.text()); |
| 138 | + // journal auto-emitted as a tool card via journalFormat |
| 139 | + } |
| 140 | +} |
| 141 | +``` |
0 commit comments