Skip to content

Commit 0a7904d

Browse files
authored
Refactor streaming code and remove versioning (#65)
* Updated streaming code * Removed version * Updated client streadming with example * Updatting stream * Updated bitwarden * Fixes * Updated README
1 parent 5c08758 commit 0a7904d

16 files changed

Lines changed: 1002 additions & 523 deletions

File tree

README.md

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ func main() {
402402
}
403403

404404
err = c.Do(
405-
client.NewRequestEx(http.MethodGet, client.ContentTypeJsonStream),
405+
client.NewRequestEx(http.MethodGet, "application/ndjson"),
406406
nil,
407407
client.OptPath("events"),
408408
client.OptNoTimeout(),
@@ -429,19 +429,26 @@ immediately and return success.
429429

430430
### Bi-Directional JSON Streams
431431

432-
Use `Client.Stream(ctx, opts...)` to open a bi-directional JSON stream. The returned `JSONStream`
433-
lets you send newline-delimited JSON request frames with `Send`, receive response frames with `Recv`,
434-
close the outbound side with `CloseSend`, and tear down both sides with `Close`.
432+
Use `Client.Stream(ctx, callback, opts...)` to open a bi-directional NDJSON stream.
433+
The callback receives a `JSONStream`, which lets you send newline-delimited JSON
434+
request frames with `Send` and receive response frames from the channel returned
435+
by `Recv`.
435436

436-
Blank response lines are treated as keep-alive heartbeats and `Recv` returns `nil, nil` for them.
437+
Returning from the callback closes the stream. Canceling the context passed to
438+
`Client.Stream` also closes the stream. Blank response lines are treated as
439+
keep-alive heartbeats and are delivered as `nil` frames on the receive channel.
440+
441+
The receive side starts immediately when the stream opens. Callbacks should keep
442+
draining `Recv()` while the stream is active; if responses are not consumed and
443+
the internal receive buffer fills, the stream is canceled to avoid a full-duplex
444+
deadlock.
437445

438446
```go
439447
package main
440448

441449
import (
442450
"context"
443451
"encoding/json"
444-
"io"
445452
"log"
446453
"time"
447454

@@ -462,52 +469,37 @@ func main() {
462469
log.Fatal(err)
463470
}
464471

465-
// Create a bi-directional JSON stream to a server
466-
stream, err := c.Stream(ctx,client.OptPath("session", "1234", "channel"),client.OptNoTimeout())
467-
if err != nil {
468-
log.Fatal(err)
469-
}
470-
defer stream.Close()
471-
472-
// Background goroutine to send JSON frames every 5 seconds until the context is cancelled
473-
go func() {
472+
// Run the stream until the callback returns or the context is cancelled.
473+
if err := c.Stream(ctx, func(ctx context.Context, stream client.JSONStream) error {
474474
ticker := time.NewTicker(5 * time.Second)
475475
defer ticker.Stop()
476476

477477
for {
478478
select {
479-
case <-ctx.Done():
480-
_ = stream.CloseSend()
481-
return
482479
case <-ticker.C:
483480
frame := json.RawMessage(`{"text":"status"}`)
484481
if err := stream.Send(frame); err != nil {
485-
log.Printf("send error: %v", err)
486-
cancel()
487-
return
482+
return err
483+
}
484+
case frame, ok := <-stream.Recv():
485+
if !ok {
486+
return nil
487+
}
488+
if frame == nil {
489+
continue // keep-alive heartbeat
488490
}
489-
}
490-
}
491-
}()
492-
493-
// Foreground loop to receive JSON frames until the stream is closed or an error occurs
494-
for {
495-
frame, err := stream.Recv()
496-
if err == io.EOF {
497-
break
498-
}
499-
if err != nil {
500-
log.Fatal(err)
501-
}
502-
if len(frame) == 0 {
503-
continue // keep-alive heartbeat
504-
}
505491

506-
var reply Reply
507-
if err := json.Unmarshal(frame, &reply); err != nil {
508-
log.Fatal(err)
492+
var reply Reply
493+
if err := json.Unmarshal(frame, &reply); err != nil {
494+
return err
495+
}
496+
log.Printf("reply=%s", reply.Echo)
497+
case <-ctx.Done():
498+
return ctx.Err()
499+
}
509500
}
510-
log.Printf("reply=%s", reply.Echo)
501+
}, client.OptPath("session", "1234", "channel"), client.OptNoTimeout()); err != nil {
502+
log.Fatal(err)
511503
}
512504
}
513505
```
@@ -522,8 +514,8 @@ follows the same constructor pattern `New*(... , parent http.RoundTripper)` and
522514
### Logging Transport
523515

524516
`transport.NewLogging` logs every request and response to an `io.Writer`. When `verbose` is true
525-
the request and response bodies are also printed; `text/event-stream` bodies are printed
526-
line-by-line as events arrive rather than buffered:
517+
the request and response bodies are also printed; `text/event-stream` and NDJSON streaming bodies
518+
are printed line-by-line as events arrive rather than buffered:
527519

528520
```go
529521
import (

client.go

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,9 @@ type JsonStreamCallback func(json.RawMessage) error
5757
// GLOBALS
5858

5959
const (
60-
DefaultTimeout = time.Second * 30
61-
DefaultUserAgent = "github.com/mutablelogic/go-client"
62-
PathSeparator = "/"
63-
ContentTypeAny = types.ContentTypeAny
64-
ContentTypeJson = types.ContentTypeJSON
65-
ContentTypeJsonStream = "application/x-ndjson"
66-
ContentTypeTextXml = types.ContentTypeTextXml
67-
ContentTypeApplicationXml = types.ContentTypeXML
68-
ContentTypeTextPlain = types.ContentTypeTextPlain
69-
ContentTypeTextHTML = "text/html"
70-
ContentTypeBinary = types.ContentTypeBinary
60+
DefaultTimeout = time.Second * 30
61+
DefaultUserAgent = "github.com/mutablelogic/go-client"
62+
PathSeparator = "/"
7163
)
7264

7365
///////////////////////////////////////////////////////////////////////////////
@@ -200,19 +192,19 @@ func (client *Client) request(ctx context.Context, method, accept, mimetype stri
200192
// Set the credentials and user agent
201193
if body != nil {
202194
if mimetype == "" {
203-
mimetype = ContentTypeJson
195+
mimetype = types.ContentTypeJSON
204196
}
205197
r.Header.Set("Content-Type", mimetype)
206198
}
207199
if accept != "" {
208200
r.Header.Set("Accept", accept)
209201
} else {
210-
r.Header.Set("Accept", ContentTypeAny)
202+
r.Header.Set("Accept", types.ContentTypeAny)
211203
}
212204
// For SSE or NDJSON streams, disable caching and Nginx proxy buffering so
213205
// events are delivered immediately rather than held in intermediate buffers.
214206
// Accept may be a comma-separated list so use Contains rather than ==.
215-
if strings.Contains(accept, ContentTypeTextStream) || strings.Contains(accept, ContentTypeJsonStream) || strings.Contains(accept, types.ContentTypeJSONStream) {
207+
if strings.Contains(accept, ContentTypeTextStream) || strings.Contains(accept, types.ContentTypeJSONStream) {
216208
r.Header.Set("Cache-Control", "no-cache")
217209
r.Header.Set("X-Accel-Buffering", "no")
218210
}
@@ -361,7 +353,7 @@ func do(client *http.Client, req *http.Request, accept string, strict bool, out
361353
// When in strict mode, check content type returned is as expected.
362354
// Use 406 Not Acceptable since this is client-side validation that the
363355
// server's response doesn't match our Accept header expectations.
364-
if strict && (accept != "" && accept != ContentTypeAny) {
356+
if strict && (accept != "" && accept != types.ContentTypeAny) {
365357
if mimetype != accept {
366358
return httpresponse.Err(http.StatusNotAcceptable).Withf("strict mode: expected %q, got %q", accept, mimetype)
367359
}
@@ -389,7 +381,7 @@ func do(client *http.Client, req *http.Request, accept string, strict bool, out
389381
}
390382

391383
switch {
392-
case mimetype == ContentTypeJson || isJSONStreamContentType(mimetype):
384+
case mimetype == types.ContentTypeJSON || mimetype == types.ContentTypeJSONStream:
393385
dec := json.NewDecoder(response.Body)
394386
if reqopts.jsonStreamCallback != nil {
395387
for {
@@ -416,11 +408,11 @@ func do(client *http.Client, req *http.Request, accept string, strict bool, out
416408
return err
417409
}
418410
}
419-
case mimetype == ContentTypeTextXml || mimetype == ContentTypeApplicationXml:
411+
case mimetype == types.ContentTypeTextXml || mimetype == types.ContentTypeXML:
420412
if err := xml.NewDecoder(response.Body).Decode(out); err != nil {
421413
return err
422414
}
423-
case mimetype == ContentTypeTextPlain:
415+
case mimetype == types.ContentTypeTextPlain:
424416
data, err := io.ReadAll(response.Body)
425417
if err != nil {
426418
return err
@@ -455,7 +447,7 @@ func do(client *http.Client, req *http.Request, accept string, strict bool, out
455447
func respContentType(resp *http.Response) (string, error) {
456448
contenttype := resp.Header.Get("Content-Type")
457449
if contenttype == "" {
458-
return ContentTypeBinary, nil
450+
return types.ContentTypeBinary, nil
459451
}
460452
if mimetype, err := types.ParseContentType(contenttype); err != nil {
461453
return contenttype, httpresponse.Err(http.StatusUnsupportedMediaType).With(contenttype)

example/streaming/client.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
7+
"time"
8+
9+
// Packages
10+
client "github.com/mutablelogic/go-client"
11+
)
12+
13+
func runClient(ctx context.Context, listenAddr string) error {
14+
// Create a client
15+
c, err := client.New(client.OptEndpoint("http://" + listenAddr))
16+
if err != nil {
17+
return err
18+
}
19+
20+
// Create a bi-directional stream
21+
if err := c.Stream(ctx, func(ctx context.Context, stream client.JSONStream) error {
22+
ticker := time.NewTimer(time.Second)
23+
defer ticker.Stop()
24+
var seq int
25+
for {
26+
select {
27+
case <-ticker.C:
28+
if err := stream.Send(Event{Message: fmt.Sprintf("client seq %d", seq)}.JSON()); err != nil {
29+
return err
30+
}
31+
seq++
32+
ticker.Reset(time.Second * time.Duration(rand.Int31n(8)))
33+
case evt, ok := <-stream.Recv():
34+
if !ok {
35+
return nil
36+
}
37+
if e, err := NewEvent(evt); err != nil {
38+
return err
39+
} else {
40+
fmt.Println("received event:", e)
41+
}
42+
case <-ctx.Done():
43+
return ctx.Err()
44+
}
45+
}
46+
}); err != nil {
47+
return err
48+
}
49+
50+
// Create a ticker
51+
return nil
52+
}

example/streaming/event.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
6+
// Packages
7+
"github.com/mutablelogic/go-server/pkg/types"
8+
)
9+
10+
///////////////////////////////////////////////////////////////////////////////
11+
// TYPES
12+
13+
type Event struct {
14+
Message string `json:"message"`
15+
}
16+
17+
///////////////////////////////////////////////////////////////////////////////
18+
// STRINGIFY
19+
20+
func (e Event) String() string {
21+
return types.Stringify(e)
22+
}
23+
24+
///////////////////////////////////////////////////////////////////////////////
25+
// PUBLIC METHODS
26+
27+
func NewEvent(data json.RawMessage) (Event, error) {
28+
var evt Event
29+
if err := json.Unmarshal(data, &evt); err != nil {
30+
return Event{}, err
31+
}
32+
return evt, nil
33+
}
34+
35+
func (e Event) JSON() json.RawMessage {
36+
if data, err := json.Marshal(e); err != nil {
37+
return nil
38+
} else {
39+
return data
40+
}
41+
}

example/streaming/main.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"path/filepath"
9+
"syscall"
10+
)
11+
12+
///////////////////////////////////////////////////////////////////////////////
13+
// GLOBALS
14+
15+
const (
16+
ListenAdress = "localhost:8080"
17+
)
18+
19+
///////////////////////////////////////////////////////////////////////////////
20+
// MAIN
21+
22+
func main() {
23+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
24+
defer cancel()
25+
if client, err := isClient(); err != nil {
26+
fmt.Fprintln(os.Stderr, err)
27+
os.Exit(1)
28+
} else if client {
29+
fmt.Fprintln(os.Stdout, "running client")
30+
if err := runClient(ctx, ListenAdress); err != nil {
31+
fmt.Fprintln(os.Stderr, err)
32+
os.Exit(1)
33+
}
34+
} else {
35+
fmt.Fprintln(os.Stdout, "running server")
36+
if err := runServer(ctx, ListenAdress); err != nil {
37+
fmt.Fprintln(os.Stderr, err)
38+
os.Exit(1)
39+
}
40+
}
41+
}
42+
43+
///////////////////////////////////////////////////////////////////////////////
44+
// PRIVATE METHODS
45+
46+
func isClient() (bool, error) {
47+
if len(os.Args) < 2 {
48+
return false, fmt.Errorf("usage: %s [client|server]", filepath.Base(os.Args[0]))
49+
}
50+
switch os.Args[1] {
51+
case "client":
52+
return true, nil
53+
case "server":
54+
return false, nil
55+
default:
56+
return false, fmt.Errorf("invalid argument: %q", os.Args[1])
57+
}
58+
}

0 commit comments

Comments
 (0)