Skip to content

Commit 1f4132f

Browse files
committed
Fix Openstream message type
1 parent 330dd0d commit 1f4132f

6 files changed

Lines changed: 36 additions & 67 deletions

File tree

internal/dev_server/api/events/events_stream.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ import (
1616
type sdkEventObserver struct {
1717
ctx context.Context
1818
debugSessionKey string
19-
updateChan chan<- sdk.Message
19+
updateChan chan<- []byte
2020
}
2121

22-
func newSdkEventObserver(updateChan chan<- sdk.Message, ctx context.Context) sdkEventObserver {
22+
func newSdkEventObserver(updateChan chan<- []byte, ctx context.Context) sdkEventObserver {
2323
debugSessionKey := uuid.New().String()
2424
db := model.EventStoreFromContext(ctx)
2525
err := db.CreateDebugSession(ctx, debugSessionKey)
@@ -54,14 +54,14 @@ func (o sdkEventObserver) Handle(message interface{}) {
5454
return
5555
}
5656

57-
o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str}
57+
o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str}.ToPayload()
5858
}
5959

6060
func SdkEventsTeeHandler(writer http.ResponseWriter, request *http.Request) {
6161
updateChan, errChan := sdk.OpenStream(
6262
writer,
6363
request.Context().Done(),
64-
sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}},
64+
sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}}.ToPayload(),
6565
)
6666
defer close(updateChan)
6767
observers := model.GetObserversFromContext(request.Context())

internal/dev_server/sdk/fdv2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func makePutObjectEvent(version int, key string, flagState model.FlagState) (sub
123123
return subsystems.RawEvent{Name: subsystems.EventPutObject, Data: data}, nil
124124
}
125125

126-
// buildFlagChangeEvents builds the three-event sequence for a single flag update pushed over a stream:
126+
// buildFlagChangeEvents builds the events sequence for a single flag update pushed over a stream:
127127
// server-intent(xfer-changes) + put-object(changed flag) + payload-transferred.
128128
func buildFlagChangeEvents(payloadID string, version int, flagKey string, flagState model.FlagState) ([]subsystems.RawEvent, error) {
129129
intentEvent, err := makeServerIntentEvent(payloadID, version, subsystems.IntentTransferChanges, fdv2ReasonUpdate)

internal/dev_server/sdk/stream_client_flags.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) {
2525
updateChan, doneChan := OpenStream(
2626
w,
2727
r.Context().Done(),
28-
Message{Event: TYPE_PUT, Data: jsonBody},
28+
Message{Event: TYPE_PUT, Data: jsonBody}.ToPayload(),
2929
)
3030
defer close(updateChan)
3131
projectKey := GetProjectKeyFromContext(ctx)
@@ -46,7 +46,7 @@ func StreamClientFlags(w http.ResponseWriter, r *http.Request) {
4646
}
4747

4848
type clientFlagsObserver struct {
49-
updateChan chan<- Message
49+
updateChan chan<- []byte
5050
projectKey string
5151
}
5252

internal/dev_server/sdk/stream_server_fdv2.go

Lines changed: 18 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"log"
66
"net/http"
7-
"time"
87

98
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
109
"github.com/launchdarkly/ldcli/internal/dev_server/model"
@@ -28,72 +27,41 @@ func StreamV2(w http.ResponseWriter, r *http.Request) {
2827
return
2928
}
3029

31-
flusher, ok := w.(http.Flusher)
32-
if !ok {
33-
WriteError(ctx, w, errors.New("streaming not supported"))
34-
return
35-
}
36-
3730
initialPayload, err := buildFullTransferResponse(projectKey, project.PayloadVersion, allFlags, fdv2ReasonPayloadMissing)
3831
if err != nil {
3932
WriteError(ctx, w, errors.Wrap(err, "failed to build initial payload"))
4033
return
4134
}
4235

43-
// Register observer before writing to the client so that any changes arriving
44-
// during the initial write are queued and delivered immediately after.
45-
updateChan := make(chan []subsystems.RawEvent, 10)
46-
observerID := model.GetObserversFromContext(ctx).RegisterObserver(fdv2StreamObserver{
47-
updateChan: updateChan,
48-
projectKey: projectKey,
49-
})
36+
updateChan, doneChan := OpenStream(w, r.Context().Done(), fdv2SSEPayload(initialPayload.Events))
37+
defer close(updateChan)
38+
39+
observer := fdv2StreamObserver{updateChan: updateChan, projectKey: projectKey}
40+
observerID := model.GetObserversFromContext(ctx).RegisterObserver(observer)
5041
defer func() {
5142
if ok := model.GetObserversFromContext(ctx).DeregisterObserver(observerID); !ok {
5243
log.Printf("unable to deregister fdv2 stream observer")
5344
}
5445
}()
5546

56-
w.Header().Set("Content-Type", "text/event-stream")
57-
w.Header().Set("Cache-Control", "no-cache")
58-
59-
if err := writeFDv2SSEEvents(w, flusher, initialPayload.Events); err != nil {
60-
return
61-
}
62-
63-
ticker := time.NewTicker(time.Minute)
64-
defer ticker.Stop()
65-
66-
for {
67-
select {
68-
case events := <-updateChan:
69-
if err := writeFDv2SSEEvents(w, flusher, events); err != nil {
70-
return
71-
}
72-
case <-ticker.C:
73-
// SSE comment line as a keepalive.
74-
if _, err := w.Write([]byte(":\n\n")); err != nil {
75-
return
76-
}
77-
flusher.Flush()
78-
case <-ctx.Done():
79-
return
80-
}
47+
err = <-doneChan
48+
if err != nil {
49+
WriteError(ctx, w, errors.Wrap(err, "stream failure"))
8150
}
8251
}
8352

84-
// writeFDv2SSEEvents writes a batch of FDv2 events to the response as individual SSE events.
85-
func writeFDv2SSEEvents(w http.ResponseWriter, flusher http.Flusher, events []subsystems.RawEvent) error {
86-
for _, event := range events {
87-
if _, err := fmt.Fprintf(w, "event:%s\ndata:%s\n\n", event.Name, event.Data); err != nil {
88-
return err
89-
}
53+
// fdv2SSEPayload formats a slice of FDv2 events as raw SSE bytes.
54+
// Each event becomes an individual SSE event in the output.
55+
func fdv2SSEPayload(events []subsystems.RawEvent) []byte {
56+
var buf []byte
57+
for _, e := range events {
58+
buf = append(buf, fmt.Sprintf("event:%s\ndata:%s\n\n", e.Name, e.Data)...)
9059
}
91-
flusher.Flush()
92-
return nil
60+
return buf
9361
}
9462

9563
type fdv2StreamObserver struct {
96-
updateChan chan<- []subsystems.RawEvent
64+
updateChan chan<- []byte
9765
projectKey string
9866
}
9967

@@ -107,7 +75,7 @@ func (o fdv2StreamObserver) Handle(event interface{}) {
10775
if err != nil {
10876
panic(errors.Wrap(err, "failed to build flag change events in fdv2 stream observer"))
10977
}
110-
o.updateChan <- events
78+
o.updateChan <- fdv2SSEPayload(events)
11179
case model.SyncEvent:
11280
if event.ProjectKey != o.projectKey {
11381
return
@@ -116,6 +84,6 @@ func (o fdv2StreamObserver) Handle(event interface{}) {
11684
if err != nil {
11785
panic(errors.Wrap(err, "failed to build full transfer in fdv2 stream observer"))
11886
}
119-
o.updateChan <- payload.Events
87+
o.updateChan <- fdv2SSEPayload(payload.Events)
12088
}
12189
}

internal/dev_server/sdk/stream_server_flags.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func StreamServerAllPayload(w http.ResponseWriter, r *http.Request) {
2727
updateChan, doneChan := OpenStream(
2828
w,
2929
r.Context().Done(),
30-
Message{Event: TYPE_PUT, Data: jsonBody},
30+
Message{Event: TYPE_PUT, Data: jsonBody}.ToPayload(),
3131
)
3232
defer close(updateChan)
3333
observer := serverFlagsObserver{updateChan, projectKey}
@@ -47,7 +47,7 @@ func StreamServerAllPayload(w http.ResponseWriter, r *http.Request) {
4747
}
4848

4949
type serverFlagsObserver struct {
50-
updateChan chan<- Message
50+
updateChan chan<- []byte
5151
projectKey string
5252
}
5353

internal/dev_server/sdk/streaming.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ func (m Message) ToPayload() []byte {
2828
return payload
2929
}
3030

31-
// OpenStream sends data to a response using the initial payload and subsequently via the returned write only channel
32-
func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Message) (chan<- Message, <-chan error) {
31+
// OpenStream sets SSE headers, writes initialPayload, and starts the SSE loop.
32+
// Each []byte sent to the returned channel is written verbatim to the response.
33+
func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialPayload []byte) (chan<- []byte, <-chan error) {
3334
errChan := make(chan error)
34-
updateChan := make(chan Message, 10)
35+
updateChan := make(chan []byte, 10)
3536
go func() {
3637
var err error
3738
defer func() {
@@ -45,7 +46,7 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess
4546
}
4647

4748
w.Header().Set("Content-Type", "text/event-stream")
48-
_, err = w.Write(initialMessage.ToPayload())
49+
_, err = w.Write(initialPayload)
4950
if err != nil {
5051
return errors.Wrap(err, "unable to write response")
5152
}
@@ -60,8 +61,8 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess
6061
return errors.Wrap(err, "unable to write response")
6162
}
6263
flusher.Flush()
63-
case msg := <-updateChan:
64-
_, err = w.Write(msg.ToPayload())
64+
case payload := <-updateChan:
65+
_, err = w.Write(payload)
6566
if err != nil {
6667
return errors.Wrap(err, "unable to write response")
6768
}
@@ -77,7 +78,7 @@ func OpenStream(w http.ResponseWriter, done <-chan struct{}, initialMessage Mess
7778
}
7879

7980
func SendMessage(
80-
updateChan chan<- Message,
81+
updateChan chan<- []byte,
8182
msgType MessageType,
8283
data interface{},
8384
) error {
@@ -89,7 +90,7 @@ func SendMessage(
8990
updateChan <- Message{
9091
Event: msgType,
9192
Data: payload,
92-
}
93+
}.ToPayload()
9394

9495
return nil
9596
}

0 commit comments

Comments
 (0)