3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -5,4 +5,5 @@ dist/
*.js.map
*.d.ts.map
!scripts/*.ts
.env
scripts/.env
!scripts/.env.example
23 changes: 15 additions & 8 deletions CLAUDE.md
@@ -11,7 +11,7 @@ npm run type-check # tsc --noEmit
# Run individual test scripts (no build step needed — tsx runs TS directly)
npx tsx scripts/generate-bot-token.ts
npx tsx scripts/discover-channels.ts
npx tsx scripts/test-client.ts
npx tsx scripts/chat-client.ts
npx tsx scripts/test-roundtrip.ts
npx tsx scripts/test-thread.ts

@@ -67,14 +67,20 @@ message.new (WebSocket)
→ buildEnvelope (wraps text with thread/reply context tags)
→ finalizeInboundContext
→ recordInboundSession
→ onRunStarted (pre-creates placeholder + THINKING indicator)
→ dispatchReplyWithBufferedBlockDispatcher
deliver(payload, info) called per block:
replyOptions.onPartialReply fires per streaming token (cumulative text):
delta = full.slice(lastPartialText.length) → onTextChunk (throttled partialUpdateMessage)
deliver(payload, info) called once per complete block:
info.kind === "tool" → onRunProgress (EXTERNAL_SOURCES indicator)
text chunk → onRunStarted (first) + onTextChunk
payload.isError → onRunError (error text + ERROR indicator)
text block → no-op (already handled token-by-token above)
after dispatcher returns:
→ onRunCompleted (final partialUpdateMessage + ai_indicator.clear)
```

**Why pre-create the placeholder:** `onPartialReply` is called fire-and-forget (`void`) by OpenClaw, so it cannot safely do async work (like `channel.sendMessage`). The placeholder must exist before the first token arrives.
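That ordering constraint can be sketched as follows. `createPlaceholder` and the message shape are illustrative assumptions, not the plugin's actual exports; `sendMessage` stands in for Stream Chat's `channel.sendMessage`:

```typescript
// Sketch: pre-create the empty placeholder before any token arrives, so the
// fire-and-forget onPartialReply path only ever needs an already-known message id.
type SentMessage = { id: string };

async function createPlaceholder(
  sendMessage: (msg: { text: string; ai_generated: boolean }) => Promise<SentMessage>
): Promise<string> {
  // Tagged ai_generated so the inbound handler ignores the bot's own message.
  const placeholder = await sendMessage({ text: "", ai_generated: true });
  return placeholder.id; // cached; onPartialReply never awaits message creation
}
```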

The `ai_generated: true` check is critical — without it the bot would trigger on its own empty placeholder message created by `onRunStarted`, causing an infinite loop.
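A minimal sketch of that guard — the field names follow Stream Chat's message shape, but `shouldHandle` itself is a hypothetical helper, not the plugin's real API:

```typescript
// Sketch: decide whether an inbound message.new event should trigger the agent.
interface InboundMessage {
  text?: string;
  ai_generated?: boolean;
  user?: { id: string };
}

function shouldHandle(msg: InboundMessage, botUserId: string): boolean {
  if (msg.ai_generated) return false;           // skip the bot's own placeholder
  if (msg.user?.id === botUserId) return false; // belt-and-braces: skip self
  return Boolean(msg.text?.trim());             // ignore empty/whitespace messages
}
```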

### Event mapping
@@ -84,11 +90,11 @@ How each signal from the OpenClaw pipeline translates into Stream Chat API calls
| Trigger | Stream Chat action | Notes |
|---|---|---|
| Inbound message received | `channel.sendReaction(msgId, { type: "eyes" })` | Ack reaction, fire-and-forget |
| First `deliver` text block | `channel.sendMessage({ text: "", ai_generated: true })` | Creates the bot's placeholder message |
| First `deliver` text block | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_THINKING" })` | Immediately followed by GENERATING below |
| First text chunk processed | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_GENERATING" })` | Transitions from THINKING on the very first `onTextChunk` call |
| Text chunk — throttled flush | `client.partialUpdateMessage(msgId, { set: { text, generating: true } })` | Odd chunks 1,3,5,7; then every N (default 15). Chained via `lastUpdatePromise` to avoid out-of-order updates |
| `deliver` with `info.kind === "tool"` | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_EXTERNAL_SOURCES" })` | Only emitted once per run (de-duplicated by `indicatorState`); only if streaming has already started |
| Pre-dispatch (before agent runs) | `channel.sendMessage({ text: "", ai_generated: true })` | Creates the bot's placeholder message |
| Pre-dispatch (before agent runs) | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_THINKING" })` | Sent immediately with placeholder |
| `onPartialReply` first token | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_GENERATING" })` | Transitions from THINKING on the very first token |
| `onPartialReply` per token — throttled | `client.partialUpdateMessage(msgId, { set: { text, generating: true } })` | Delta-computed from cumulative text. Odd chunks 1,3,5,7; then every N (default 15). Chained via `lastUpdatePromise` to avoid out-of-order updates |
| `deliver` with `info.kind === "tool"` | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_EXTERNAL_SOURCES" })` | Only emitted once per run (de-duplicated by `indicatorState`) |
| Dispatcher resolves (run complete) | `client.partialUpdateMessage(msgId, { set: { text, generating: false } })` | Final flush, waits for any in-flight partial updates first |
| Dispatcher resolves (run complete) | `channel.sendEvent({ type: "ai_indicator.clear" })` | Clears the indicator bubble |
| Dispatcher resolves (run complete) | `channel.deleteReaction(inboundMsgId, "eyes")` → `channel.sendReaction(inboundMsgId, { type: "white_check_mark" })` | Reaction swap on the original user message |
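
The throttle schedule in the table above ("odd chunks 1, 3, 5, 7; then every N") can be sketched as a pure predicate. `shouldFlush` is illustrative, assuming a 1-based chunk counter:

```typescript
// Sketch: flush partialUpdateMessage on chunks 1, 3, 5, 7 for fast perceived
// start, then settle into every Nth chunk (N defaults to 15).
function shouldFlush(chunkIndex: number, throttle = 15): boolean {
  if (chunkIndex <= 7) return chunkIndex % 2 === 1; // early odd chunks
  return chunkIndex % throttle === 0;               // then every Nth
}
```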
@@ -145,3 +151,4 @@ Config supports a flat default account or named sub-accounts:
- **Partial updates are chained via `lastUpdatePromise`.** Each `partialUpdateMessage` is `.then()`-chained onto the previous one to avoid out-of-order message text.
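
A minimal sketch of that chaining pattern (`queueUpdate` is a hypothetical name for the wrapper around the chained promise):

```typescript
// Sketch: serialize async partial updates so message text never lands
// out of order, even if the API resolves responses out of sequence.
let lastUpdatePromise: Promise<void> = Promise.resolve();

function queueUpdate(update: () => Promise<void>): Promise<void> {
  // Run `update` whether the previous one resolved or rejected,
  // so one failed flush does not stall the rest of the chain.
  lastUpdatePromise = lastUpdatePromise.then(update, update);
  return lastUpdatePromise;
}
```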
- **`safeSendEvent` swallows errors.** Indicator events are best-effort; a failed `ai_indicator` update must not abort message delivery. Retries: 5 attempts, exponential backoff starting at 100 ms, only on 429/5xx.
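
A sketch of that retry policy under stated assumptions — the callback signatures are invented for illustration; the real `safeSendEvent` wraps Stream Chat's `sendEvent`:

```typescript
// Sketch: best-effort event sender. Up to 5 attempts, exponential backoff
// starting at 100 ms, retrying only on retryable (429/5xx) errors, never throwing.
async function safeSendEvent(
  send: () => Promise<void>,
  isRetryable: (err: unknown) => boolean,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms))
): Promise<boolean> {
  for (let attempt = 0; attempt < 5; attempt++) {
    try {
      await send();
      return true;
    } catch (err) {
      if (!isRetryable(err) || attempt === 4) return false; // swallow the error
      await sleep(100 * 2 ** attempt); // 100, 200, 400, 800 ms
    }
  }
  return false;
}
```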
- **`seenThreads` is process-scoped.** The `Set<string>` tracking "first message in thread" lives at module level, so it persists across gateway reloads until the process restarts. This is intentional — it avoids re-sending parent context for active threads after a config reload.
- **`onTextChunk` receives deltas despite the wire protocol using full text.** `onPartialReply` provides cumulative text; `channel.ts` extracts the delta before calling `onTextChunk`. Inside `StreamingHandler`, `onTextChunk` re-accumulates deltas into `accumulatedText` and passes that full string to `partialUpdateMessage`. The round-trip is: cumulative → delta → cumulative. The delta extraction exists because `StreamingHandler` was designed around the "streaming chunks" mental model — it owns the accumulation and the throttle counter, making that API feel natural. The redundancy is intentional for architectural clarity, not a bug.
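
The cumulative → delta → cumulative round-trip can be sketched with illustrative names (neither function is the plugin's actual export):

```typescript
// channel.ts side: extract the delta from onPartialReply's cumulative text.
function extractDelta(full: string, lastPartialText: string): string {
  return full.slice(lastPartialText.length);
}

// StreamingHandler side: re-accumulate deltas and hand the full string
// to partialUpdateMessage.
class StreamingAccumulator {
  private accumulatedText = "";
  onTextChunk(delta: string): string {
    this.accumulatedText += delta;
    return this.accumulatedText;
  }
}
```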
77 changes: 34 additions & 43 deletions README.md
@@ -1,4 +1,4 @@
# @wunderchat/openclaw-channel-streamchat
# openclaw-channel-streamchat

OpenClaw channel plugin for [Stream Chat](https://getstream.io/chat/). Connects as a bot user via WebSocket, normalizes inbound messages into OpenClaw envelope format, and delivers agent responses using Stream Chat's AI streaming pattern (`partialUpdateMessage` + `ai_indicator` events).

@@ -17,57 +17,54 @@ cd openclaw-channel-streamchat
npm install
```

### 2. Generate a bot token
### 2. Provision the app

The plugin connects to Stream Chat as a regular user (the bot). You need a JWT for that user, generated from your API secret. The secret is only used here — it is **not** stored in the plugin config.
You have two options depending on your situation:

Create a `.env` file in the plugin root:
**Option A — Fresh app (recommended for first-time setup)**

```env
STREAM_API_KEY=your_api_key
STREAM_API_SECRET=your_api_secret
BOT_USER_ID=chatgpt
Use `setup-app.ts` if you are starting from a new Stream Chat app. It creates the bot and test users, generates their tokens, creates a test channel, and writes both `~/.openclaw/openclaw.json` and `scripts/.env` automatically:

```bash
STREAM_API_KEY=your_api_key STREAM_API_SECRET=your_api_secret npx tsx scripts/setup-app.ts
```

Then run:
After this, skip to step 4.

**Option B — Existing app (bot token only)**

Use `generate-bot-token.ts` if the app and channel already exist and you only need to mint or rotate the bot JWT. It prints the token to stdout — copy it into `~/.openclaw/openclaw.json` manually:

```bash
npx tsx scripts/generate-bot-token.ts
STREAM_API_KEY=your_api_key STREAM_API_SECRET=your_api_secret npx tsx scripts/generate-bot-token.ts
```

This prints the bot JWT. Copy it for the next step.
> **Note:** Pass the API secret inline as shown above. It is only needed by these two provisioning scripts and should not be stored in `scripts/.env`.

### 3. Configure OpenClaw
### 3. Configure OpenClaw (Option B only)

Add the channel config and plugin entry to your `~/.openclaw/openclaw.json`:
If you used Option B, add the channel config and plugin entry to `~/.openclaw/openclaw.json` manually:

```jsonc
{
// Add the channel configuration
"channels": {
"streamchat": {
"enabled": true,
"apiKey": "your_api_key",
"botUserId": "chatgpt",
"botUserToken": "<token from step 2>",
"botUserId": "openclaw-bot",
"botUserToken": "<token from generate-bot-token.ts>",
// Optional:
"ackReaction": "eyes", // reaction added when message is received (default: "eyes")
"ackReaction": "eyes", // reaction added when message is received (default: "eyes")
"doneReaction": "white_check_mark", // reaction swapped in when response is done (default: "white_check_mark")
"streamingThrottle": 15 // partial-update every Nth chunk (default: 15)
"streamingThrottle": 15 // partial-update every Nth chunk (default: 15)
}
},

// Register the plugin
"plugins": {
"load": {
"paths": [
"/absolute/path/to/openclaw-channel-streamchat"
]
"paths": ["/absolute/path/to/openclaw-channel-streamchat"]
},
"entries": {
"streamchat": {
"enabled": true
}
"streamchat": { "enabled": true }
}
}
}
@@ -83,35 +80,35 @@ The plugin will connect to Stream Chat, watch all channels where the bot is a me

## Testing

All test scripts live in `scripts/` and can be run with `npx tsx`. They load credentials from `.env` or use environment variables.
All test scripts live in `scripts/` and load credentials from `scripts/.env` (populated by `setup-app.ts`). The plugin itself reads only from `~/.openclaw/openclaw.json` — `scripts/.env` is not used at runtime.

### Discover channels

Lists all channels the test user belongs to:
See `scripts/.env.example` for the expected variables. You can also pass any variable inline to override the file:

```bash
npx tsx scripts/discover-channels.ts
STREAM_API_KEY=... TEST_USER_TOKEN=... npx tsx scripts/chat-client.ts
```

Override the defaults with environment variables:
### Discover channels

Lists all channels the test user belongs to:

```bash
STREAM_API_KEY=... USER_ID=myuser USER_TOKEN=... npx tsx scripts/discover-channels.ts
npx tsx scripts/discover-channels.ts
```

### Interactive test client
### Interactive chat client

Connects as a test user, watches a channel, and lets you send messages interactively while printing incoming bot responses and AI indicator events:

```bash
# Auto-discover channels and use the first one
npx tsx scripts/test-client.ts
npx tsx scripts/chat-client.ts

# Specify a channel
npx tsx scripts/test-client.ts myChannelId
npx tsx scripts/chat-client.ts myChannelId

# Send a single message
npx tsx scripts/test-client.ts myChannelId "Hello bot"
npx tsx scripts/chat-client.ts myChannelId "Hello bot"
```

Commands inside the interactive client:
@@ -122,12 +119,6 @@ Commands inside the interactive client:
| `/quote <messageId> <text>` | Send a quoted reply |
| `/quit` | Disconnect and exit |

Override the test user with environment variables:

```bash
STREAM_API_KEY=... TEST_USER_ID=myuser TEST_USER_TOKEN=... npx tsx scripts/test-client.ts
```

### Automated round-trip test

Sends a message and waits for the bot to respond, verifying the full streaming lifecycle (placeholder message, AI indicators, partial updates, final update):
2 changes: 1 addition & 1 deletion package-lock.json


1 change: 1 addition & 0 deletions package.json
@@ -18,6 +18,7 @@
},
"devDependencies": {
"@types/node": "^22.0.0",
"dotenv": "^17.3.1",
"typescript": "^5.3.0"
},
"peerDependencies": {
15 changes: 15 additions & 0 deletions scripts/.env.example
@@ -0,0 +1,15 @@
# Stream Chat app credentials
# Get these from https://dashboard.getstream.io
STREAM_API_KEY=your_stream_api_key

# Test client credentials
# Run `npx tsx scripts/setup-app.ts` to populate these automatically,
# or generate a token manually with:
# STREAM_API_SECRET=your_secret npx tsx scripts/generate-bot-token.ts
TEST_USER_ID=test-user
TEST_USER_TOKEN=your_test_user_jwt_token
TEST_CHANNEL_ID=ai-test-channel

# Note: STREAM_API_SECRET is only needed when running setup scripts.
# Pass it on the command line rather than storing it here:
# STREAM_API_SECRET=your_secret npx tsx scripts/setup-app.ts
55 changes: 23 additions & 32 deletions scripts/test-client.ts → scripts/chat-client.ts
@@ -14,36 +14,19 @@
 * npx tsx scripts/chat-client.ts myChannel "Hello" # Send a message and watch
*/

import { config } from "dotenv";
config({ path: new URL(".env", import.meta.url).pathname });
import { StreamChat } from "stream-chat";
import { createInterface } from "node:readline";
import { readFileSync } from "node:fs";
import { resolve } from "node:path";

function loadEnv(): void {
try {
const envPath = resolve(import.meta.dirname ?? ".", "../.env");
const content = readFileSync(envPath, "utf-8");
for (const line of content.split("\n")) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const eqIdx = trimmed.indexOf("=");
if (eqIdx === -1) continue;
const key = trimmed.slice(0, eqIdx).trim();
const value = trimmed.slice(eqIdx + 1).trim().replace(/^["']|["']$/g, "");
if (!process.env[key]) process.env[key] = value;
}
} catch {
// .env file not found
}
}

loadEnv();
const apiKey = process.env.STREAM_API_KEY;
const userId = process.env.TEST_USER_ID;
const userToken = process.env.TEST_USER_TOKEN;

const apiKey = process.env.STREAM_API_KEY || "b3haysfrr5yg";
const userId = process.env.TEST_USER_ID || "steookk";
const userToken =
process.env.TEST_USER_TOKEN ||
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoic3Rlb29rayJ9.9yO--MWVC9bYQAjdUR5vp_cKxiBXEzHrXXnPXesqakE";
if (!apiKey || !userId || !userToken) {
console.error("Error: STREAM_API_KEY, TEST_USER_ID, and TEST_USER_TOKEN must be set in scripts/.env");
process.exit(1);
}

const channelId = process.argv[2] || "";
const initialMessage = process.argv[3] || "";
@@ -97,6 +80,7 @@ client.on("message.new", (event) => {
const aiGenerated = event.message.ai_generated ? " [AI]" : "";
const generating = (event.message as Record<string, unknown>).generating ? " [generating...]" : "";
console.log(`\n[${from}]${aiGenerated}${generating}: ${text}`);
console.log(` id: ${event.message.id}`);
});

// Listen for AI indicators
@@ -112,21 +96,28 @@ for (const evType of ["ai_indicator.update", "ai_indicator.clear"]) {
}

// Listen for message updates (streaming)
const lastSeenText = new Map<string, string>();
client.on("message.updated", (event) => {
if (!event.message) return;
const msgId = event.message.id;
const text = event.message.text || "";
const generating = (event.message as Record<string, unknown>).generating;
const prev = lastSeenText.get(msgId) ?? "";
const delta = text.slice(prev.length);
if (generating) {
process.stdout.write(`\r [streaming] ${text.slice(-80)}`);
lastSeenText.set(msgId, text);
if (delta) process.stdout.write(delta);
} else {
console.log(`\n [final] ${text.slice(0, 200)}`);
lastSeenText.delete(msgId);
if (delta) process.stdout.write(delta);
process.stdout.write(`\n id: ${msgId}\n`);
}
});

// Send initial message if provided
if (initialMessage) {
console.log(`Sending: "${initialMessage}"`);
await channel.sendMessage({ text: initialMessage });
const { message: sent } = await channel.sendMessage({ text: initialMessage });
console.log(`Sent: "${initialMessage}" (id: ${sent.id})`);
}

// Interactive mode
@@ -176,8 +167,8 @@ rl.on("line", async (line) => {
return;
}

await channel.sendMessage({ text: trimmed });
console.log(` Sent: "${trimmed}"`);
const { message: sent } = await channel.sendMessage({ text: trimmed });
console.log(` Sent: "${trimmed}" (id: ${sent.id})`);
});

// Keep alive
39 changes: 12 additions & 27 deletions scripts/discover-channels.ts
@@ -3,38 +3,23 @@
* Discover channels that a user belongs to.
*
* Usage:
* STREAM_API_KEY=... USER_ID=steookk USER_TOKEN=... npx tsx scripts/discover-channels.ts
* npx tsx scripts/discover-channels.ts
*
* Requires a .env file at the project root (see .env.example).
*/

import { config } from "dotenv";
config({ path: new URL(".env", import.meta.url).pathname });
import { StreamChat } from "stream-chat";
import { readFileSync } from "node:fs";
import { resolve } from "node:path";

function loadEnv(): void {
try {
const envPath = resolve(import.meta.dirname ?? ".", "../.env");
const content = readFileSync(envPath, "utf-8");
for (const line of content.split("\n")) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const eqIdx = trimmed.indexOf("=");
if (eqIdx === -1) continue;
const key = trimmed.slice(0, eqIdx).trim();
const value = trimmed.slice(eqIdx + 1).trim().replace(/^["']|["']$/g, "");
if (!process.env[key]) process.env[key] = value;
}
} catch {
// .env file not found
}
}
const apiKey = process.env.STREAM_API_KEY;
const userId = process.env.TEST_USER_ID;
const userToken = process.env.TEST_USER_TOKEN;

loadEnv();

const apiKey = process.env.STREAM_API_KEY || "b3haysfrr5yg";
const userId = process.env.USER_ID || "steookk";
const userToken =
process.env.USER_TOKEN ||
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoic3Rlb29rayJ9.9yO--MWVC9bYQAjdUR5vp_cKxiBXEzHrXXnPXesqakE";
if (!apiKey || !userId || !userToken) {
console.error("Error: STREAM_API_KEY, TEST_USER_ID, and TEST_USER_TOKEN must be set in scripts/.env");
process.exit(1);
}

const client = new StreamChat(apiKey, { allowServerSideConnect: true });
