Skip to content

Commit 419ed3f

Browse files
authored
fix(realtime): scope sse streams by authorized route (#786)
* fix(realtime): scope sse streams by authorized route * fix(realtime): preserve scoped and broadcast streams
1 parent 7adb9ef commit 419ed3f

4 files changed

Lines changed: 540 additions & 45 deletions

File tree

internal/appgen/appgen_test.go

Lines changed: 107 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,8 +1153,10 @@ func TestGenerateWritesRealtimeFanoutForSubscriptions(t *testing.T) {
11531153
`var realtimeFanout gowdkrealtime.PresentationFanout = gowdkrealtime.NewSSE()`,
11541154
`func RegisterRealtimeFanout(fanout gowdkrealtime.PresentationFanout)`,
11551155
`"example.com/app/contracts/patients.PatientNotice": true`,
1156-
`event.Category == gowdkcontracts.PresentationEvent`,
1157-
`gowdkcontracts.PresentationFanoutCommandEventSink(realtimeSubscriptionFanout{inner: fanout})`,
1156+
`var realtimeSubscriptionBroadcasts map[string]bool = map[string]bool{"example.com/app/contracts/patients.PatientNotice": true}`,
1157+
`realtimeAudienceScopedEvents(event)...`,
1158+
`scopedFanout := realtimeSubscriptionFanout{inner: fanout}`,
1159+
`gowdkcontracts.PresentationFanoutCommandEventSink(scopedFanout)`,
11581160
`gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), fanoutSink)`,
11591161
} {
11601162
if !strings.Contains(source, expected) {
@@ -1210,8 +1212,11 @@ func TestGenerateWritesRealtimeQueryInvalidationFanout(t *testing.T) {
12101212
for _, expected := range []string{
12111213
`gowdkcontracts.QueryInvalidationPresentationEventType: true`,
12121214
`var realtimeQueryInvalidations []gowdkcontracts.QueryInvalidation = []gowdkcontracts.QueryInvalidation{gowdkcontracts.QueryInvalidation{EventCategory: gowdkcontracts.DomainEvent, EventType: "example.com/app/contracts/patients.PatientCreated", QueryType: "example.com/app/contracts/patients.GetPatientPage"}}`,
1213-
`gowdkcontracts.QueryInvalidationCommandEventSink(fanout, realtimeQueryInvalidations)`,
1214-
`gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), gowdkcontracts.QueryInvalidationCommandEventSink(fanout, realtimeQueryInvalidations), fanoutSink)`,
1215+
`scopedFanout := realtimeSubscriptionFanout{inner: fanout}`,
1216+
`gowdkcontracts.QueryInvalidationCommandEventSink(scopedFanout, realtimeQueryInvalidations)`,
1217+
`gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), gowdkcontracts.QueryInvalidationCommandEventSink(scopedFanout, realtimeQueryInvalidations), fanoutSink)`,
1218+
`var realtimeQueryBroadcasts map[string]bool = map[string]bool{"example.com/app/contracts/patients.GetPatientPage": true}`,
1219+
`unscopedQueries := []string{}`,
12151220
// Single-flight write path: the command adapter tells the submitting
12161221
// client which g:query regions to refresh via the X-GOWDK-Queries header.
12171222
`invalidatedQueries := gowdkcontracts.InvalidatedQueryTypes(realtimeQueryInvalidations, events)`,
@@ -1234,6 +1239,63 @@ func TestGenerateWritesRealtimeQueryInvalidationFanout(t *testing.T) {
12341239
}
12351240
}
12361241

1242+
func TestRealtimeBroadcastsTrackMixedScopedAndUnscopedOwners(t *testing.T) {
1243+
options := Options{IR: &gwdkir.Program{
1244+
Pages: []gwdkir.Page{{ID: "patients", Route: "/patients"}},
1245+
RealtimeSubscriptions: []gwdkir.RealtimeSubscription{{
1246+
Query: "patients.GetPatientPage",
1247+
Event: "patients.PatientNotice",
1248+
EventImportPath: "example.com/app/contracts/patients",
1249+
EventType: "PatientNotice",
1250+
Status: gwdkir.ContractBindingBound,
1251+
OwnerKind: gwdkir.SourcePage,
1252+
OwnerID: "patients",
1253+
}, {
1254+
Query: "cards.GetPatientCard",
1255+
Event: "patients.PatientNotice",
1256+
EventImportPath: "example.com/app/contracts/patients",
1257+
EventType: "PatientNotice",
1258+
Status: gwdkir.ContractBindingBound,
1259+
OwnerKind: gwdkir.SourceComponent,
1260+
OwnerID: "PatientCard",
1261+
}},
1262+
QueryInvalidations: []gwdkir.QueryInvalidation{{
1263+
Query: "patients.GetPatientPage",
1264+
QueryType: "example.com/app/contracts/patients.GetPatientPage",
1265+
Event: "example.com/app/contracts/patients.PatientCreated",
1266+
EventType: "example.com/app/contracts/patients.PatientCreated",
1267+
EventCategory: "domain",
1268+
Status: gwdkir.ContractBindingBound,
1269+
OwnerKind: gwdkir.SourcePage,
1270+
OwnerID: "patients",
1271+
}, {
1272+
Query: "cards.GetPatientCard",
1273+
QueryType: "example.com/app/contracts/patients.GetPatientCard",
1274+
Event: "example.com/app/contracts/patients.PatientCreated",
1275+
EventType: "example.com/app/contracts/patients.PatientCreated",
1276+
EventCategory: "domain",
1277+
Status: gwdkir.ContractBindingBound,
1278+
OwnerKind: gwdkir.SourceComponent,
1279+
OwnerID: "PatientCard",
1280+
}},
1281+
}}
1282+
1283+
subscriptionAudiences := realtimeSubscriptionAudiences(options)
1284+
if got := subscriptionAudiences["example.com/app/contracts/patients.PatientNotice"]; len(got) != 1 || got[0] != "gowdk.route.0" {
1285+
t.Fatalf("subscription audiences = %#v, want route audience", subscriptionAudiences)
1286+
}
1287+
if !realtimeSubscriptionBroadcasts(options)["example.com/app/contracts/patients.PatientNotice"] {
1288+
t.Fatalf("expected mixed subscription event type to retain broadcast delivery")
1289+
}
1290+
queryAudiences := realtimeQueryAudiences(options)
1291+
if got := queryAudiences["example.com/app/contracts/patients.GetPatientPage"]; len(got) != 1 || got[0] != "gowdk.route.0" {
1292+
t.Fatalf("query audiences = %#v, want route audience", queryAudiences)
1293+
}
1294+
if !realtimeQueryBroadcasts(options)["example.com/app/contracts/patients.GetPatientCard"] {
1295+
t.Fatalf("expected component-owned query invalidation to retain broadcast delivery")
1296+
}
1297+
}
1298+
12371299
func TestGenerateRegistersSingleFlightRegionRenderers(t *testing.T) {
12381300
root := t.TempDir()
12391301
outputDir := filepath.Join(root, "dist")
@@ -1553,11 +1615,16 @@ func TestGenerateGuardsRealtimeStreamForSubscribedPages(t *testing.T) {
15531615
for _, expected := range []string{
15541616
`neturl "net/url"`,
15551617
`gowdkroute "github.com/cssbruno/gowdk/runtime/route"`,
1618+
`gowdkrealtime.NewSSE(gowdkrealtime.WithSSEAudienceFromRequest(realtimeStreamAudience))`,
1619+
`var realtimeSubscriptionAudiences map[string][]string = map[string][]string{"example.com/app/contracts/patients.PatientNotice": []string{"gowdk.route.0"}}`,
1620+
`func realtimeStreamAudience(request *http.Request) []string`,
15561621
`func realtimeStreamGuards(request *http.Request) []string`,
1557-
`request.URL.Query().Get("path")`,
1622+
`func realtimeStreamPath(request *http.Request) string`,
15581623
`referer := request.Referer()`,
15591624
`neturl.Parse(referer)`,
1625+
`return refererURL.Path`,
15601626
`gowdkroute.Match("/dashboard", requestPath)`,
1627+
`return []string{"gowdk.route.0"}`,
15611628
`return []string{"auth.required"}`,
15621629
`if !runGuards(response, request, realtimeStreamGuards(request))`,
15631630
`RegisterGuards(GOWDKGuardRegistry())`,
@@ -1566,6 +1633,9 @@ func TestGenerateGuardsRealtimeStreamForSubscribedPages(t *testing.T) {
15661633
t.Fatalf("expected generated guarded realtime source to contain %q:\n%s", expected, source)
15671634
}
15681635
}
1636+
if strings.Contains(source, `Query().Get("path")`) {
1637+
t.Fatalf("generated realtime stream must not authorize from client query path:\n%s", source)
1638+
}
15691639
}
15701640

15711641
func TestBoundActionFieldDecodePanicsOnUnsupportedFieldType(t *testing.T) {
@@ -7152,6 +7222,10 @@ func TestGeneratedBinaryCommandSetsInvalidatedQueriesHeader(t *testing.T) {
71527222
writeTestFile(t, filepath.Join(outputDir, "patients", "index.html"), "<main>Patients page</main>")
71537223

71547224
program := &gwdkir.Program{
7225+
Pages: []gwdkir.Page{
7226+
{ID: "dashboard", Route: "/dashboard"},
7227+
{ID: "patients", Route: "/patients"},
7228+
},
71557229
ContractRefs: []gwdkir.ContractReference{{
71567230
Kind: gwdkir.ContractCommand,
71577231
Name: "patients.CreatePatient",
@@ -7479,8 +7553,13 @@ func TestGeneratedBinaryRealtimeFanoutStreamsSubscribedPresentationEvents(t *tes
74797553
appDir := filepath.Join(root, "generated-app")
74807554
binaryPath := filepath.Join(root, "site")
74817555
writeTestFile(t, filepath.Join(outputDir, "patients", "index.html"), "<main>Patients page</main>")
7556+
writeTestFile(t, filepath.Join(outputDir, "dashboard", "index.html"), "<main>Dashboard page</main>")
74827557

74837558
program := &gwdkir.Program{
7559+
Pages: []gwdkir.Page{
7560+
{ID: "dashboard", Route: "/dashboard"},
7561+
{ID: "patients", Route: "/patients"},
7562+
},
74847563
ContractRefs: []gwdkir.ContractReference{{
74857564
Kind: gwdkir.ContractCommand,
74867565
Name: "patients.CreatePatient",
@@ -7506,6 +7585,14 @@ func TestGeneratedBinaryRealtimeFanoutStreamsSubscribedPresentationEvents(t *tes
75067585
Status: gwdkir.ContractBindingBound,
75077586
OwnerKind: gwdkir.SourcePage,
75087587
OwnerID: "patients",
7588+
}, {
7589+
Query: "patients.GetDashboard",
7590+
Event: "patients.OtherNotice",
7591+
EventImportPath: "gowdk-generated-app/patients",
7592+
EventType: "OtherNotice",
7593+
Status: gwdkir.ContractBindingBound,
7594+
OwnerKind: gwdkir.SourcePage,
7595+
OwnerID: "dashboard",
75097596
}},
75107597
}
75117598
if _, err := GenerateWithOptions(outputDir, appDir, Options{Config: csrfDisabledConfig(), IR: program}); err != nil {
@@ -7578,10 +7665,11 @@ func HandleCreatePatient(ctx context.Context, command CreatePatient) (CreatePati
75787665

75797666
streamCtx, cancelStream := context.WithCancel(context.Background())
75807667
defer cancelStream()
7581-
streamRequest, err := http.NewRequestWithContext(streamCtx, http.MethodGet, "http://"+addr+"/_gowdk/realtime/events", nil)
7668+
streamRequest, err := http.NewRequestWithContext(streamCtx, http.MethodGet, "http://"+addr+"/_gowdk/realtime/events?path=/dashboard", nil)
75827669
if err != nil {
75837670
t.Fatal(err)
75847671
}
7672+
streamRequest.Header.Set("Referer", "http://"+addr+"/patients")
75857673
streamResponse, err := http.DefaultClient.Do(streamRequest)
75867674
if err != nil {
75877675
t.Fatal(err)
@@ -7643,6 +7731,19 @@ func HandleCreatePatient(ctx context.Context, command CreatePatient) (CreatePati
76437731
t.Fatalf("realtime stream included unsubscribed event %q in %s", unexpected, dataLine)
76447732
}
76457733
}
7734+
noLeakDeadline := time.After(300 * time.Millisecond)
7735+
for {
7736+
select {
7737+
case line := <-lines:
7738+
if strings.HasPrefix(line, "data: ") && strings.Contains(line, "OtherNotice") {
7739+
t.Fatalf("route-scoped realtime stream leaked dashboard event: %s", line)
7740+
}
7741+
case err := <-readErrs:
7742+
t.Fatalf("read realtime stream after first event: %v", err)
7743+
case <-noLeakDeadline:
7744+
return
7745+
}
7746+
}
76467747
}
76477748

76487749
func TestGeneratedBinaryRealtimeStreamGuardDenialClosesStream(t *testing.T) {

internal/appgen/source.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func runtimeImportMap(options Options) map[string]string {
115115
imports["gowdkseo"] = "github.com/cssbruno/gowdk/runtime/seo"
116116
imports["gowdkseositemap"] = strings.TrimSpace(options.Sitemap.Dynamic.ImportPath)
117117
}
118-
if generatedRealtimeStreamUsesRouteMatching(options) {
118+
if generatedRealtimeStreamUsesRouteMatching(options) || generatedRealtimeStreamUsesAudience(options) {
119119
imports["gowdkroute"] = "github.com/cssbruno/gowdk/runtime/route"
120120
imports["neturl"] = "net/url"
121121
}

internal/appgen/source_contracts.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -554,15 +554,13 @@ func currentContractEventSinkDecl(realtime bool, queryInvalidations bool) ast.De
554554
&ast.IfStmt{
555555
Cond: &ast.BinaryExpr{X: id("fanout"), Op: token.NEQ, Y: id("nil")},
556556
Body: block(
557-
define([]ast.Expr{id("fanoutSink")}, call(sel("gowdkcontracts", "PresentationFanoutCommandEventSink"), &ast.CompositeLit{
558-
Type: id("realtimeSubscriptionFanout"),
559-
Elts: []ast.Expr{keyValue("inner", id("fanout"))},
560-
})),
557+
define([]ast.Expr{id("scopedFanout")}, realtimeSubscriptionFanoutExpr(id("fanout"))),
558+
define([]ast.Expr{id("fanoutSink")}, call(sel("gowdkcontracts", "PresentationFanoutCommandEventSink"), id("scopedFanout"))),
561559
&ast.IfStmt{
562560
Cond: &ast.BinaryExpr{X: id("sink"), Op: token.NEQ, Y: id("nil")},
563-
Body: block(&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, id("sink"), id("fanoutSink"), id("fanout"))}}),
561+
Body: block(&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, id("sink"), id("fanoutSink"), id("scopedFanout"))}}),
564562
},
565-
&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, call(sel("gowdkcontracts", "InProcessCommandEventSink")), id("fanoutSink"), id("fanout"))}},
563+
&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, call(sel("gowdkcontracts", "InProcessCommandEventSink")), id("fanoutSink"), id("scopedFanout"))}},
566564
),
567565
},
568566
)
@@ -572,6 +570,13 @@ func currentContractEventSinkDecl(realtime bool, queryInvalidations bool) ast.De
572570
}, stmts)
573571
}
574572

573+
func realtimeSubscriptionFanoutExpr(fanout ast.Expr) ast.Expr {
574+
return &ast.CompositeLit{
575+
Type: id("realtimeSubscriptionFanout"),
576+
Elts: []ast.Expr{keyValue("inner", fanout)},
577+
}
578+
}
579+
575580
func realtimeCompositeSinkExpr(queryInvalidations bool, base ast.Expr, fanoutSink ast.Expr, fanout ast.Expr) ast.Expr {
576581
args := []ast.Expr{base}
577582
if queryInvalidations {

0 commit comments

Comments
 (0)