Skip to content

Commit e9167ea

Browse files
artparclaude
andcommitted
Complete WebSocket commands per issue #23: subscribe, ping, listen, permission
Split listen (all events) from subscribe (topic-based, multi-topic), add ws ping for liveness checks, rename destroy→delete, combine get/set permission into single command with --set flag. Closes #23 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5fcc51f commit e9167ea

2 files changed

Lines changed: 175 additions & 54 deletions

File tree

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,34 @@ Use `%` wildcards with `like` for partial matching. Raw JSON is also accepted:
253253
--debug Enable debug output
254254
```
255255

256+
## WebSocket
257+
258+
Real-time pub/sub via Daptin's `/live` endpoint.
259+
260+
```bash
261+
# Stream all events
262+
daptin-cli ws listen
263+
264+
# Subscribe to topics
265+
daptin-cli ws subscribe user_account
266+
daptin-cli ws subscribe user_account document order
267+
268+
# Publish a message
269+
daptin-cli ws publish chat-room-1 '{"text":"hello"}'
270+
271+
# Ping
272+
daptin-cli ws ping
273+
274+
# Topic management
275+
daptin-cli ws topic create chat-room-1
276+
daptin-cli ws topic delete chat-room-1
277+
daptin-cli ws topic permission chat-room-1
278+
daptin-cli ws topic permission chat-room-1 --set 2097151
279+
280+
# Cross-node verification
281+
daptin-cli ws verify --endpoints http://node1:6336,http://node2:6336
282+
```
283+
256284
## Environment Variables
257285

258286
```

cmd/ws.go

Lines changed: 147 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"os"
77
"os/signal"
8-
"strconv"
98
"strings"
109
"time"
1110

@@ -19,7 +18,9 @@ func wsCommand(appCtx *AppContext) *cli.Command {
1918
Usage: "WebSocket pub/sub commands",
2019
Subcommands: []*cli.Command{
2120
wsListenCommand(appCtx),
21+
wsSubscribeCommand(appCtx),
2222
wsPublishCommand(appCtx),
23+
wsPingCommand(appCtx),
2324
wsTopicCommand(appCtx),
2425
wsVerifyCommand(appCtx),
2526
},
@@ -28,20 +29,62 @@ func wsCommand(appCtx *AppContext) *cli.Command {
2829

2930
func wsListenCommand(appCtx *AppContext) *cli.Command {
3031
return &cli.Command{
31-
Name: "listen",
32-
Usage: "Subscribe to a topic and stream events",
33-
ArgsUsage: "<topic>",
32+
Name: "listen",
33+
Usage: "Open a WebSocket connection and print all received events",
34+
Action: func(c *cli.Context) error {
35+
ws, err := client.DialWebSocket(appCtx.Client.Endpoint, appCtx.Client.AuthToken)
36+
if err != nil {
37+
return err
38+
}
39+
defer ws.Close()
40+
41+
if !appCtx.Quiet {
42+
fmt.Fprintf(os.Stderr, "Connected (session open)\n")
43+
}
44+
45+
sigCh := make(chan os.Signal, 1)
46+
signal.Notify(sigCh, os.Interrupt)
47+
48+
go func() {
49+
<-sigCh
50+
ws.Close()
51+
os.Exit(0)
52+
}()
53+
54+
for {
55+
msg, err := ws.ReadMessage()
56+
if err != nil {
57+
return nil
58+
}
59+
if msg["type"] == "pong" {
60+
continue
61+
}
62+
line, err := client.EventToJSONLine(msg)
63+
if err != nil {
64+
continue
65+
}
66+
fmt.Println(line)
67+
}
68+
},
69+
}
70+
}
71+
72+
func wsSubscribeCommand(appCtx *AppContext) *cli.Command {
73+
return &cli.Command{
74+
Name: "subscribe",
75+
Usage: "Subscribe to one or more topics and stream events",
76+
ArgsUsage: "<topic> [topic2 ...]",
3477
Flags: []cli.Flag{
3578
&cli.StringSliceFlag{
3679
Name: "filter",
3780
Usage: "Filter expression key=value (repeatable)",
3881
},
3982
},
4083
Action: func(c *cli.Context) error {
41-
topic := c.Args().Get(0)
42-
if topic == "" {
43-
return fmt.Errorf("topic name required")
84+
if c.NArg() == 0 {
85+
return fmt.Errorf("at least one topic name required")
4486
}
87+
topics := c.Args().Slice()
4588

4689
ws, err := client.DialWebSocket(appCtx.Client.Endpoint, appCtx.Client.AuthToken)
4790
if err != nil {
@@ -53,52 +96,54 @@ func wsListenCommand(appCtx *AppContext) *cli.Command {
5396
fmt.Fprintf(os.Stderr, "Connected (session open)\n")
5497
}
5598

56-
// Build subscribe attributes
57-
attrs := map[string]interface{}{
58-
"topicName": topic,
59-
}
99+
var filterMap map[string]interface{}
60100
if filters := c.StringSlice("filter"); len(filters) > 0 {
61-
filterMap := make(map[string]interface{}, len(filters))
101+
filterMap = make(map[string]interface{}, len(filters))
62102
for _, f := range filters {
63103
parts := strings.SplitN(f, "=", 2)
64104
if len(parts) != 2 {
65105
return fmt.Errorf("invalid filter %q, expected key=value", f)
66106
}
67107
filterMap[parts[0]] = parts[1]
68108
}
69-
attrs["filters"] = filterMap
70-
}
71-
72-
id, err := ws.Send("subscribe", attrs)
73-
if err != nil {
74-
return err
75-
}
76-
77-
_, err = ws.WaitResponse(id, nil)
78-
if err != nil {
79-
return fmt.Errorf("subscribe failed: %w", err)
80109
}
81110

82-
if !appCtx.Quiet {
83-
fmt.Fprintf(os.Stderr, "Subscribed to %s\n", topic)
111+
for _, topic := range topics {
112+
attrs := map[string]interface{}{
113+
"topicName": topic,
114+
}
115+
if filterMap != nil {
116+
attrs["filters"] = filterMap
117+
}
118+
id, err := ws.Send("subscribe", attrs)
119+
if err != nil {
120+
return err
121+
}
122+
_, err = ws.WaitResponse(id, nil)
123+
if err != nil {
124+
return fmt.Errorf("subscribe to %s failed: %w", topic, err)
125+
}
126+
if !appCtx.Quiet {
127+
fmt.Fprintf(os.Stderr, "Subscribed to %s\n", topic)
128+
}
84129
}
85130

86-
// Handle SIGINT for clean shutdown
87131
sigCh := make(chan os.Signal, 1)
88132
signal.Notify(sigCh, os.Interrupt)
89133

90134
go func() {
91135
<-sigCh
92-
ws.Send("unsubscribe", map[string]interface{}{"topicName": topic})
136+
for _, topic := range topics {
137+
ws.Send("unsubscribe", map[string]interface{}{"topicName": topic})
138+
}
93139
ws.Close()
94140
os.Exit(0)
95141
}()
96142

97-
// Stream events
98143
for {
99144
msg, err := ws.ReadMessage()
100145
if err != nil {
101-
return nil // connection closed
146+
return nil
102147
}
103148
if msg["type"] == "pong" {
104149
continue
@@ -113,6 +158,37 @@ func wsListenCommand(appCtx *AppContext) *cli.Command {
113158
}
114159
}
115160

161+
func wsPingCommand(appCtx *AppContext) *cli.Command {
162+
return &cli.Command{
163+
Name: "ping",
164+
Usage: "Check WebSocket liveness",
165+
Action: func(c *cli.Context) error {
166+
ws, err := client.DialWebSocket(appCtx.Client.Endpoint, appCtx.Client.AuthToken)
167+
if err != nil {
168+
return err
169+
}
170+
defer ws.Close()
171+
172+
start := time.Now()
173+
if err := ws.SendPing(); err != nil {
174+
return err
175+
}
176+
177+
msg, err := ws.ReadMessageTimeout(5 * time.Second)
178+
if err != nil {
179+
return fmt.Errorf("ping failed: %w", err)
180+
}
181+
if msg == nil {
182+
return fmt.Errorf("ping timeout (5s)")
183+
}
184+
185+
d := time.Since(start)
186+
fmt.Fprintf(os.Stderr, "Pong received (%dms)\n", d.Milliseconds())
187+
return nil
188+
},
189+
}
190+
}
191+
116192
func wsPublishCommand(appCtx *AppContext) *cli.Command {
117193
return &cli.Command{
118194
Name: "publish",
@@ -198,8 +274,8 @@ func wsTopicCommand(appCtx *AppContext) *cli.Command {
198274
},
199275
},
200276
{
201-
Name: "destroy",
202-
Usage: "Destroy a user topic",
277+
Name: "delete",
278+
Usage: "Delete a user topic",
203279
ArgsUsage: "<name>",
204280
Action: func(c *cli.Context) error {
205281
name := c.Args().Get(0)
@@ -222,29 +298,29 @@ func wsTopicCommand(appCtx *AppContext) *cli.Command {
222298

223299
_, err = ws.WaitResponse(id, nil)
224300
if err != nil {
225-
return fmt.Errorf("destroy topic failed: %w", err)
301+
return fmt.Errorf("delete topic failed: %w", err)
226302
}
227303

228304
if !appCtx.Quiet {
229-
fmt.Fprintf(os.Stderr, "Destroyed topic %s\n", name)
305+
fmt.Fprintf(os.Stderr, "Deleted topic %s\n", name)
230306
}
231307
return nil
232308
},
233309
},
234310
{
235-
Name: "set-permission",
236-
Usage: "Set permission on a user topic",
237-
ArgsUsage: "<name> <permission>",
311+
Name: "permission",
312+
Usage: "Get or set permission on a user topic",
313+
ArgsUsage: "<name>",
314+
Flags: []cli.Flag{
315+
&cli.Int64Flag{
316+
Name: "set",
317+
Usage: "Permission bitmask to set",
318+
},
319+
},
238320
Action: func(c *cli.Context) error {
239321
name := c.Args().Get(0)
240-
permStr := c.Args().Get(1)
241-
if name == "" || permStr == "" {
242-
return fmt.Errorf("usage: ws topic set-permission <name> <permission>")
243-
}
244-
245-
perm, err := strconv.ParseInt(permStr, 10, 64)
246-
if err != nil {
247-
return fmt.Errorf("invalid permission value: %w", err)
322+
if name == "" {
323+
return fmt.Errorf("topic name required")
248324
}
249325

250326
ws, err := client.DialWebSocket(appCtx.Client.Endpoint, appCtx.Client.AuthToken)
@@ -253,22 +329,39 @@ func wsTopicCommand(appCtx *AppContext) *cli.Command {
253329
}
254330
defer ws.Close()
255331

256-
id, err := ws.Send("set-topic-permission", map[string]interface{}{
257-
"topicName": name,
258-
"permission": perm,
332+
if c.IsSet("set") {
333+
perm := c.Int64("set")
334+
id, err := ws.Send("set-topic-permission", map[string]interface{}{
335+
"topicName": name,
336+
"permission": perm,
337+
})
338+
if err != nil {
339+
return err
340+
}
341+
_, err = ws.WaitResponse(id, nil)
342+
if err != nil {
343+
return fmt.Errorf("set permission failed: %w", err)
344+
}
345+
if !appCtx.Quiet {
346+
fmt.Fprintf(os.Stderr, "Set permission %d on topic %s\n", perm, name)
347+
}
348+
return nil
349+
}
350+
351+
// Get permission
352+
id, err := ws.Send("get-topic-permission", map[string]interface{}{
353+
"topicName": name,
259354
})
260355
if err != nil {
261356
return err
262357
}
263-
264-
_, err = ws.WaitResponse(id, nil)
358+
resp, err := ws.WaitResponse(id, nil)
265359
if err != nil {
266-
return fmt.Errorf("set-permission failed: %w", err)
267-
}
268-
269-
if !appCtx.Quiet {
270-
fmt.Fprintf(os.Stderr, "Set permission %d on topic %s\n", perm, name)
360+
return fmt.Errorf("get permission failed: %w", err)
271361
}
362+
attrs, _ := resp["attributes"].(map[string]interface{})
363+
perm, _ := attrs["permission"]
364+
fmt.Println(perm)
272365
return nil
273366
},
274367
},

0 commit comments

Comments
 (0)