diff --git a/internal/appgen/appgen_test.go b/internal/appgen/appgen_test.go
index 416bbed2..cacc4ba0 100644
--- a/internal/appgen/appgen_test.go
+++ b/internal/appgen/appgen_test.go
@@ -1153,8 +1153,10 @@ func TestGenerateWritesRealtimeFanoutForSubscriptions(t *testing.T) {
`var realtimeFanout gowdkrealtime.PresentationFanout = gowdkrealtime.NewSSE()`,
`func RegisterRealtimeFanout(fanout gowdkrealtime.PresentationFanout)`,
`"example.com/app/contracts/patients.PatientNotice": true`,
- `event.Category == gowdkcontracts.PresentationEvent`,
- `gowdkcontracts.PresentationFanoutCommandEventSink(realtimeSubscriptionFanout{inner: fanout})`,
+ `var realtimeSubscriptionBroadcasts map[string]bool = map[string]bool{"example.com/app/contracts/patients.PatientNotice": true}`,
+ `realtimeAudienceScopedEvents(event)...`,
+ `scopedFanout := realtimeSubscriptionFanout{inner: fanout}`,
+ `gowdkcontracts.PresentationFanoutCommandEventSink(scopedFanout)`,
`gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), fanoutSink)`,
} {
if !strings.Contains(source, expected) {
@@ -1210,8 +1212,11 @@ func TestGenerateWritesRealtimeQueryInvalidationFanout(t *testing.T) {
for _, expected := range []string{
`gowdkcontracts.QueryInvalidationPresentationEventType: true`,
`var realtimeQueryInvalidations []gowdkcontracts.QueryInvalidation = []gowdkcontracts.QueryInvalidation{gowdkcontracts.QueryInvalidation{EventCategory: gowdkcontracts.DomainEvent, EventType: "example.com/app/contracts/patients.PatientCreated", QueryType: "example.com/app/contracts/patients.GetPatientPage"}}`,
- `gowdkcontracts.QueryInvalidationCommandEventSink(fanout, realtimeQueryInvalidations)`,
- `gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), gowdkcontracts.QueryInvalidationCommandEventSink(fanout, realtimeQueryInvalidations), fanoutSink)`,
+ `scopedFanout := realtimeSubscriptionFanout{inner: fanout}`,
+ `gowdkcontracts.QueryInvalidationCommandEventSink(scopedFanout, realtimeQueryInvalidations)`,
+ `gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), gowdkcontracts.QueryInvalidationCommandEventSink(scopedFanout, realtimeQueryInvalidations), fanoutSink)`,
+ `var realtimeQueryBroadcasts map[string]bool = map[string]bool{"example.com/app/contracts/patients.GetPatientPage": true}`,
+ `unscopedQueries := []string{}`,
// Single-flight write path: the command adapter tells the submitting
// client which g:query regions to refresh via the X-GOWDK-Queries header.
`invalidatedQueries := gowdkcontracts.InvalidatedQueryTypes(realtimeQueryInvalidations, events)`,
@@ -1234,6 +1239,63 @@ func TestGenerateWritesRealtimeQueryInvalidationFanout(t *testing.T) {
}
}
+func TestRealtimeBroadcastsTrackMixedScopedAndUnscopedOwners(t *testing.T) {
+ options := Options{IR: &gwdkir.Program{
+ Pages: []gwdkir.Page{{ID: "patients", Route: "/patients"}},
+ RealtimeSubscriptions: []gwdkir.RealtimeSubscription{{
+ Query: "patients.GetPatientPage",
+ Event: "patients.PatientNotice",
+ EventImportPath: "example.com/app/contracts/patients",
+ EventType: "PatientNotice",
+ Status: gwdkir.ContractBindingBound,
+ OwnerKind: gwdkir.SourcePage,
+ OwnerID: "patients",
+ }, {
+ Query: "cards.GetPatientCard",
+ Event: "patients.PatientNotice",
+ EventImportPath: "example.com/app/contracts/patients",
+ EventType: "PatientNotice",
+ Status: gwdkir.ContractBindingBound,
+ OwnerKind: gwdkir.SourceComponent,
+ OwnerID: "PatientCard",
+ }},
+ QueryInvalidations: []gwdkir.QueryInvalidation{{
+ Query: "patients.GetPatientPage",
+ QueryType: "example.com/app/contracts/patients.GetPatientPage",
+ Event: "example.com/app/contracts/patients.PatientCreated",
+ EventType: "example.com/app/contracts/patients.PatientCreated",
+ EventCategory: "domain",
+ Status: gwdkir.ContractBindingBound,
+ OwnerKind: gwdkir.SourcePage,
+ OwnerID: "patients",
+ }, {
+ Query: "cards.GetPatientCard",
+ QueryType: "example.com/app/contracts/patients.GetPatientCard",
+ Event: "example.com/app/contracts/patients.PatientCreated",
+ EventType: "example.com/app/contracts/patients.PatientCreated",
+ EventCategory: "domain",
+ Status: gwdkir.ContractBindingBound,
+ OwnerKind: gwdkir.SourceComponent,
+ OwnerID: "PatientCard",
+ }},
+ }}
+
+ subscriptionAudiences := realtimeSubscriptionAudiences(options)
+ if got := subscriptionAudiences["example.com/app/contracts/patients.PatientNotice"]; len(got) != 1 || got[0] != "gowdk.route.0" {
+ t.Fatalf("subscription audiences = %#v, want route audience", subscriptionAudiences)
+ }
+ if !realtimeSubscriptionBroadcasts(options)["example.com/app/contracts/patients.PatientNotice"] {
+ t.Fatalf("expected mixed subscription event type to retain broadcast delivery")
+ }
+ queryAudiences := realtimeQueryAudiences(options)
+ if got := queryAudiences["example.com/app/contracts/patients.GetPatientPage"]; len(got) != 1 || got[0] != "gowdk.route.0" {
+ t.Fatalf("query audiences = %#v, want route audience", queryAudiences)
+ }
+ if !realtimeQueryBroadcasts(options)["example.com/app/contracts/patients.GetPatientCard"] {
+ t.Fatalf("expected component-owned query invalidation to retain broadcast delivery")
+ }
+}
+
func TestGenerateRegistersSingleFlightRegionRenderers(t *testing.T) {
root := t.TempDir()
outputDir := filepath.Join(root, "dist")
@@ -1551,11 +1613,16 @@ func TestGenerateGuardsRealtimeStreamForSubscribedPages(t *testing.T) {
for _, expected := range []string{
`neturl "net/url"`,
`gowdkroute "github.com/cssbruno/gowdk/runtime/route"`,
+ `gowdkrealtime.NewSSE(gowdkrealtime.WithSSEAudienceFromRequest(realtimeStreamAudience))`,
+ `var realtimeSubscriptionAudiences map[string][]string = map[string][]string{"example.com/app/contracts/patients.PatientNotice": []string{"gowdk.route.0"}}`,
+ `func realtimeStreamAudience(request *http.Request) []string`,
`func realtimeStreamGuards(request *http.Request) []string`,
- `request.URL.Query().Get("path")`,
+ `func realtimeStreamPath(request *http.Request) string`,
`referer := request.Referer()`,
`neturl.Parse(referer)`,
+ `return refererURL.Path`,
`gowdkroute.Match("/dashboard", requestPath)`,
+ `return []string{"gowdk.route.0"}`,
`return []string{"auth.required"}`,
`if !runGuards(response, request, realtimeStreamGuards(request))`,
`RegisterGuards(GOWDKGuardRegistry())`,
@@ -1564,6 +1631,9 @@ func TestGenerateGuardsRealtimeStreamForSubscribedPages(t *testing.T) {
t.Fatalf("expected generated guarded realtime source to contain %q:\n%s", expected, source)
}
}
+ if strings.Contains(source, `Query().Get("path")`) {
+ t.Fatalf("generated realtime stream must not authorize from client query path:\n%s", source)
+ }
}
func TestBoundActionFieldDecodePanicsOnUnsupportedFieldType(t *testing.T) {
@@ -7150,6 +7220,10 @@ func TestGeneratedBinaryCommandSetsInvalidatedQueriesHeader(t *testing.T) {
writeTestFile(t, filepath.Join(outputDir, "patients", "index.html"), "Patients page")
program := &gwdkir.Program{
+ Pages: []gwdkir.Page{
+ {ID: "dashboard", Route: "/dashboard"},
+ {ID: "patients", Route: "/patients"},
+ },
ContractRefs: []gwdkir.ContractReference{{
Kind: gwdkir.ContractCommand,
Name: "patients.CreatePatient",
@@ -7477,8 +7551,13 @@ func TestGeneratedBinaryRealtimeFanoutStreamsSubscribedPresentationEvents(t *tes
appDir := filepath.Join(root, "generated-app")
binaryPath := filepath.Join(root, "site")
writeTestFile(t, filepath.Join(outputDir, "patients", "index.html"), "Patients page")
+ writeTestFile(t, filepath.Join(outputDir, "dashboard", "index.html"), "Dashboard page")
program := &gwdkir.Program{
+ Pages: []gwdkir.Page{
+ {ID: "dashboard", Route: "/dashboard"},
+ {ID: "patients", Route: "/patients"},
+ },
ContractRefs: []gwdkir.ContractReference{{
Kind: gwdkir.ContractCommand,
Name: "patients.CreatePatient",
@@ -7504,6 +7583,14 @@ func TestGeneratedBinaryRealtimeFanoutStreamsSubscribedPresentationEvents(t *tes
Status: gwdkir.ContractBindingBound,
OwnerKind: gwdkir.SourcePage,
OwnerID: "patients",
+ }, {
+ Query: "patients.GetDashboard",
+ Event: "patients.OtherNotice",
+ EventImportPath: "gowdk-generated-app/patients",
+ EventType: "OtherNotice",
+ Status: gwdkir.ContractBindingBound,
+ OwnerKind: gwdkir.SourcePage,
+ OwnerID: "dashboard",
}},
}
if _, err := GenerateWithOptions(outputDir, appDir, Options{Config: csrfDisabledConfig(), IR: program}); err != nil {
@@ -7576,10 +7663,11 @@ func HandleCreatePatient(ctx context.Context, command CreatePatient) (CreatePati
streamCtx, cancelStream := context.WithCancel(context.Background())
defer cancelStream()
- streamRequest, err := http.NewRequestWithContext(streamCtx, http.MethodGet, "http://"+addr+"/_gowdk/realtime/events", nil)
+ streamRequest, err := http.NewRequestWithContext(streamCtx, http.MethodGet, "http://"+addr+"/_gowdk/realtime/events?path=/dashboard", nil)
if err != nil {
t.Fatal(err)
}
+ streamRequest.Header.Set("Referer", "http://"+addr+"/patients")
streamResponse, err := http.DefaultClient.Do(streamRequest)
if err != nil {
t.Fatal(err)
@@ -7641,6 +7729,19 @@ func HandleCreatePatient(ctx context.Context, command CreatePatient) (CreatePati
t.Fatalf("realtime stream included unsubscribed event %q in %s", unexpected, dataLine)
}
}
+ noLeakDeadline := time.After(300 * time.Millisecond)
+ for {
+ select {
+ case line := <-lines:
+ if strings.HasPrefix(line, "data: ") && strings.Contains(line, "OtherNotice") {
+ t.Fatalf("route-scoped realtime stream leaked dashboard event: %s", line)
+ }
+ case err := <-readErrs:
+ t.Fatalf("read realtime stream after first event: %v", err)
+ case <-noLeakDeadline:
+ return
+ }
+ }
}
func TestGeneratedBinaryRealtimeStreamGuardDenialClosesStream(t *testing.T) {
diff --git a/internal/appgen/source.go b/internal/appgen/source.go
index b3f44b85..ce8c1f6b 100644
--- a/internal/appgen/source.go
+++ b/internal/appgen/source.go
@@ -115,7 +115,7 @@ func runtimeImportMap(options Options) map[string]string {
imports["gowdkseo"] = "github.com/cssbruno/gowdk/runtime/seo"
imports["gowdkseositemap"] = strings.TrimSpace(options.Sitemap.Dynamic.ImportPath)
}
- if generatedRealtimeStreamUsesRouteMatching(options) {
+ if generatedRealtimeStreamUsesRouteMatching(options) || generatedRealtimeStreamUsesAudience(options) {
imports["gowdkroute"] = "github.com/cssbruno/gowdk/runtime/route"
imports["neturl"] = "net/url"
}
diff --git a/internal/appgen/source_contracts.go b/internal/appgen/source_contracts.go
index 2a1f1605..23b8ea60 100644
--- a/internal/appgen/source_contracts.go
+++ b/internal/appgen/source_contracts.go
@@ -554,15 +554,13 @@ func currentContractEventSinkDecl(realtime bool, queryInvalidations bool) ast.De
&ast.IfStmt{
Cond: &ast.BinaryExpr{X: id("fanout"), Op: token.NEQ, Y: id("nil")},
Body: block(
- define([]ast.Expr{id("fanoutSink")}, call(sel("gowdkcontracts", "PresentationFanoutCommandEventSink"), &ast.CompositeLit{
- Type: id("realtimeSubscriptionFanout"),
- Elts: []ast.Expr{keyValue("inner", id("fanout"))},
- })),
+ define([]ast.Expr{id("scopedFanout")}, realtimeSubscriptionFanoutExpr(id("fanout"))),
+ define([]ast.Expr{id("fanoutSink")}, call(sel("gowdkcontracts", "PresentationFanoutCommandEventSink"), id("scopedFanout"))),
&ast.IfStmt{
Cond: &ast.BinaryExpr{X: id("sink"), Op: token.NEQ, Y: id("nil")},
- Body: block(&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, id("sink"), id("fanoutSink"), id("fanout"))}}),
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, id("sink"), id("fanoutSink"), id("scopedFanout"))}}),
},
- &ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, call(sel("gowdkcontracts", "InProcessCommandEventSink")), id("fanoutSink"), id("fanout"))}},
+ &ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, call(sel("gowdkcontracts", "InProcessCommandEventSink")), id("fanoutSink"), id("scopedFanout"))}},
),
},
)
@@ -572,6 +570,13 @@ func currentContractEventSinkDecl(realtime bool, queryInvalidations bool) ast.De
}, stmts)
}
+func realtimeSubscriptionFanoutExpr(fanout ast.Expr) ast.Expr {
+ return &ast.CompositeLit{
+ Type: id("realtimeSubscriptionFanout"),
+ Elts: []ast.Expr{keyValue("inner", fanout)},
+ }
+}
+
func realtimeCompositeSinkExpr(queryInvalidations bool, base ast.Expr, fanoutSink ast.Expr, fanout ast.Expr) ast.Expr {
args := []ast.Expr{base}
if queryInvalidations {
diff --git a/internal/appgen/source_realtime.go b/internal/appgen/source_realtime.go
index c4411164..c0777347 100644
--- a/internal/appgen/source_realtime.go
+++ b/internal/appgen/source_realtime.go
@@ -1,6 +1,7 @@
package appgen
import (
+ "fmt"
"go/ast"
"go/token"
"sort"
@@ -84,23 +85,28 @@ func boundQueryInvalidations(options Options) []gwdkir.QueryInvalidation {
}
type realtimeStreamRoute struct {
- Route string
- Guards []string
+ Route string
+ Guards []string
+ Audience string
}
func generatedRealtimeStreamUsesGuards(options Options) bool {
return len(realtimeStreamFallbackGuards(options)) > 0
}
+func generatedRealtimeStreamUsesAudience(options Options) bool {
+ return len(realtimeStreamRoutes(options)) > 0
+}
+
func generatedRealtimeStreamUsesRouteMatching(options Options) bool {
return generatedRealtimeStreamUsesGuards(options) && len(realtimeStreamRoutes(options)) > 0
}
-func realtimeStreamRoutes(options Options) []realtimeStreamRoute {
+func realtimeRoutesByPage(options Options) map[string]string {
+ routesByPage := map[string]string{}
if options.IR == nil {
- return nil
+ return routesByPage
}
- routesByPage := map[string]string{}
for _, page := range options.IR.Pages {
route := strings.TrimSpace(page.Route)
if page.ID == "" || route == "" {
@@ -108,6 +114,11 @@ func realtimeStreamRoutes(options Options) []realtimeStreamRoute {
}
routesByPage[page.ID] = route
}
+ return routesByPage
+}
+
+func realtimeStreamRoutes(options Options) []realtimeStreamRoute {
+ routesByPage := realtimeRoutesByPage(options)
seen := map[string]bool{}
var routes []realtimeStreamRoute
for _, subscription := range boundRealtimeSubscriptions(options) {
@@ -148,9 +159,126 @@ func realtimeStreamRoutes(options Options) []realtimeStreamRoute {
}
return strings.Join(routes[i].Guards, "\x00") < strings.Join(routes[j].Guards, "\x00")
})
+ labels := map[string]string{}
+ for index := range routes {
+ label := labels[routes[index].Route]
+ if label == "" {
+ label = fmt.Sprintf("gowdk.route.%d", len(labels))
+ labels[routes[index].Route] = label
+ }
+ routes[index].Audience = label
+ }
return routes
}
+func realtimeRouteAudienceByRoute(options Options) map[string]string {
+ audiences := map[string]string{}
+ for _, route := range realtimeStreamRoutes(options) {
+ if route.Route == "" || route.Audience == "" {
+ continue
+ }
+ audiences[route.Route] = route.Audience
+ }
+ return audiences
+}
+
+func realtimeSubscriptionAudiences(options Options) map[string][]string {
+ routesByPage := realtimeRoutesByPage(options)
+ audiencesByRoute := realtimeRouteAudienceByRoute(options)
+ audiences := map[string][]string{}
+ for _, subscription := range boundRealtimeSubscriptions(options) {
+ if subscription.OwnerKind != gwdkir.SourcePage {
+ continue
+ }
+ eventType := realtimeEventEnvelopeType(subscription)
+ route := routesByPage[subscription.OwnerID]
+ audience := audiencesByRoute[route]
+ if eventType == "" || audience == "" {
+ continue
+ }
+ addRealtimeAudience(audiences, eventType, audience)
+ }
+ sortRealtimeAudienceMap(audiences)
+ return audiences
+}
+
+func realtimeSubscriptionBroadcasts(options Options) map[string]bool {
+ routesByPage := realtimeRoutesByPage(options)
+ audiencesByRoute := realtimeRouteAudienceByRoute(options)
+ broadcasts := map[string]bool{}
+ for _, subscription := range boundRealtimeSubscriptions(options) {
+ eventType := realtimeEventEnvelopeType(subscription)
+ if eventType == "" {
+ continue
+ }
+ if subscription.OwnerKind != gwdkir.SourcePage {
+ broadcasts[eventType] = true
+ continue
+ }
+ route := routesByPage[subscription.OwnerID]
+ if audiencesByRoute[route] == "" {
+ broadcasts[eventType] = true
+ }
+ }
+ return broadcasts
+}
+
+func realtimeQueryAudiences(options Options) map[string][]string {
+ routesByPage := realtimeRoutesByPage(options)
+ audiencesByRoute := realtimeRouteAudienceByRoute(options)
+ audiences := map[string][]string{}
+ for _, invalidation := range boundQueryInvalidations(options) {
+ if invalidation.OwnerKind != gwdkir.SourcePage {
+ continue
+ }
+ queryType := strings.TrimSpace(invalidation.QueryType)
+ route := routesByPage[invalidation.OwnerID]
+ audience := audiencesByRoute[route]
+ if queryType == "" || audience == "" {
+ continue
+ }
+ addRealtimeAudience(audiences, queryType, audience)
+ }
+ sortRealtimeAudienceMap(audiences)
+ return audiences
+}
+
+func realtimeQueryBroadcasts(options Options) map[string]bool {
+ routesByPage := realtimeRoutesByPage(options)
+ audiencesByRoute := realtimeRouteAudienceByRoute(options)
+ broadcasts := map[string]bool{}
+ for _, invalidation := range boundQueryInvalidations(options) {
+ queryType := strings.TrimSpace(invalidation.QueryType)
+ if queryType == "" {
+ continue
+ }
+ if invalidation.OwnerKind != gwdkir.SourcePage {
+ broadcasts[queryType] = true
+ continue
+ }
+ route := routesByPage[invalidation.OwnerID]
+ if audiencesByRoute[route] == "" {
+ broadcasts[queryType] = true
+ }
+ }
+ return broadcasts
+}
+
+func addRealtimeAudience(audiences map[string][]string, key string, audience string) {
+ for _, existing := range audiences[key] {
+ if existing == audience {
+ return
+ }
+ }
+ audiences[key] = append(audiences[key], audience)
+}
+
+func sortRealtimeAudienceMap(audiences map[string][]string) {
+ for key := range audiences {
+ sort.Strings(audiences[key])
+ }
+}
+
func realtimeStreamFallbackGuards(options Options) []string {
seen := map[string]bool{}
var guards []string
@@ -183,15 +311,28 @@ func realtimeDecls(options Options) []ast.Decl {
decls := []ast.Decl{
realtimeEventsPathDecl(),
realtimeFanoutMutexDecl(),
- realtimeFanoutVarDecl(),
+ realtimeFanoutVarDecl(options),
realtimeSubscriptionEventTypesDecl(options),
+ realtimeSubscriptionAudiencesDecl(options),
+ realtimeSubscriptionBroadcastsDecl(options),
+ realtimeQueryAudiencesDecl(options),
+ realtimeQueryBroadcastsDecl(options),
realtimeQueryInvalidationsDecl(options),
registerRealtimeFanoutDecl(),
currentRealtimeFanoutDecl(),
realtimeEventsHandlerDecl(options),
+ realtimeAudienceScopedEventsDecl(),
+ realtimeEventWithAudienceDecl(),
+ realtimeQueryInvalidationAudienceEventsDecl(),
realtimeSubscriptionFanoutTypeDecl(),
realtimeSubscriptionFanoutSendDecl(),
}
+ if generatedRealtimeStreamUsesRouteMatching(options) || generatedRealtimeStreamUsesAudience(options) {
+ decls = append(decls, realtimeStreamPathDecl())
+ }
+ if generatedRealtimeStreamUsesAudience(options) {
+ decls = append(decls, realtimeStreamAudienceDecl(options))
+ }
if generatedRealtimeStreamUsesGuards(options) {
decls = append(decls, realtimeStreamGuardsDecl(options))
}
@@ -219,11 +360,18 @@ func realtimeFanoutMutexDecl() ast.Decl {
}}}
}
-func realtimeFanoutVarDecl() ast.Decl {
+func realtimeFanoutVarDecl(options Options) ast.Decl {
+ newSSE := call(sel("gowdkrealtime", "NewSSE"))
+ if generatedRealtimeStreamUsesAudience(options) {
+ newSSE = call(
+ sel("gowdkrealtime", "NewSSE"),
+ call(sel("gowdkrealtime", "WithSSEAudienceFromRequest"), id("realtimeStreamAudience")),
+ )
+ }
return &ast.GenDecl{Tok: token.VAR, Specs: []ast.Spec{&ast.ValueSpec{
Names: []*ast.Ident{id("realtimeFanout")},
Type: sel("gowdkrealtime", "PresentationFanout"),
- Values: []ast.Expr{call(sel("gowdkrealtime", "NewSSE"))},
+ Values: []ast.Expr{newSSE},
}}}
}
@@ -256,6 +404,72 @@ func realtimeSubscriptionEventTypesDecl(options Options) ast.Decl {
}}}
}
+func realtimeSubscriptionAudiencesDecl(options Options) ast.Decl {
+ return stringSliceMapVarDecl("realtimeSubscriptionAudiences", realtimeSubscriptionAudiences(options))
+}
+
+func realtimeSubscriptionBroadcastsDecl(options Options) ast.Decl {
+ return stringBoolMapVarDecl("realtimeSubscriptionBroadcasts", realtimeSubscriptionBroadcasts(options))
+}
+
+func realtimeQueryAudiencesDecl(options Options) ast.Decl {
+ return stringSliceMapVarDecl("realtimeQueryAudiences", realtimeQueryAudiences(options))
+}
+
+func realtimeQueryBroadcastsDecl(options Options) ast.Decl {
+ return stringBoolMapVarDecl("realtimeQueryBroadcasts", realtimeQueryBroadcasts(options))
+}
+
+func stringSliceMapVarDecl(name string, values map[string][]string) ast.Decl {
+ return &ast.GenDecl{Tok: token.VAR, Specs: []ast.Spec{&ast.ValueSpec{
+ Names: []*ast.Ident{id(name)},
+ Type: &ast.MapType{Key: id("string"), Value: &ast.ArrayType{Elt: id("string")}},
+ Values: []ast.Expr{stringSliceMapExpr(values)},
+ }}}
+}
+
+func stringSliceMapExpr(values map[string][]string) ast.Expr {
+ keys := make([]string, 0, len(values))
+ for key := range values {
+ keys = append(keys, key)
+ }
+ sort.Strings(keys)
+ elts := make([]ast.Expr, 0, len(keys))
+ for _, key := range keys {
+ elts = append(elts, &ast.KeyValueExpr{Key: stringLit(key), Value: stringSliceExpr(values[key])})
+ }
+ return &ast.CompositeLit{
+ Type: &ast.MapType{Key: id("string"), Value: &ast.ArrayType{Elt: id("string")}},
+ Elts: elts,
+ }
+}
+
+func stringBoolMapVarDecl(name string, values map[string]bool) ast.Decl {
+ return &ast.GenDecl{Tok: token.VAR, Specs: []ast.Spec{&ast.ValueSpec{
+ Names: []*ast.Ident{id(name)},
+ Type: &ast.MapType{Key: id("string"), Value: id("bool")},
+ Values: []ast.Expr{stringBoolMapExpr(values)},
+ }}}
+}
+
+func stringBoolMapExpr(values map[string]bool) ast.Expr {
+ keys := make([]string, 0, len(values))
+ for key, value := range values {
+ if value {
+ keys = append(keys, key)
+ }
+ }
+ sort.Strings(keys)
+ elts := make([]ast.Expr, 0, len(keys))
+ for _, key := range keys {
+ elts = append(elts, &ast.KeyValueExpr{Key: stringLit(key), Value: id("true")})
+ }
+ return &ast.CompositeLit{
+ Type: &ast.MapType{Key: id("string"), Value: id("bool")},
+ Elts: elts,
+ }
+}
+
func realtimeQueryInvalidationsDecl(options Options) ast.Decl {
invalidations := boundQueryInvalidations(options)
elts := make([]ast.Expr, 0, len(invalidations))
@@ -358,13 +572,53 @@ func realtimeEventsHandlerDecl(options Options) ast.Decl {
})
}
+func realtimeStreamPathDecl() ast.Decl {
+ stmts := []ast.Stmt{
+ define([]ast.Expr{id("referer")}, call(selExpr(id("request"), "Referer"))),
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: id("referer"), Op: token.EQL, Y: stringLit("")},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{stringLit("")}}),
+ },
+ define([]ast.Expr{id("refererURL"), id("err")}, call(sel("neturl", "Parse"), id("referer"))),
+ &ast.IfStmt{
+ Cond: notNil("err"),
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{stringLit("")}}),
+ },
+ &ast.ReturnStmt{Results: []ast.Expr{selExpr(id("refererURL"), "Path")}},
+ }
+ return funcDecl("realtimeStreamPath", []*ast.Field{
+ {Names: []*ast.Ident{id("request")}, Type: &ast.StarExpr{X: sel("http", "Request")}},
+ }, []*ast.Field{{Type: id("string")}}, stmts)
+}
+
+func realtimeStreamAudienceDecl(options Options) ast.Decl {
+ stmts := []ast.Stmt{
+ define([]ast.Expr{id("requestPath")}, call(id("realtimeStreamPath"), id("request"))),
+ }
+ var routeStmts []ast.Stmt
+ for _, route := range realtimeStreamRoutes(options) {
+ routeStmts = append(routeStmts, &ast.IfStmt{
+ Init: define([]ast.Expr{id("_"), id("ok")}, call(sel("gowdkroute", "Match"), stringLit(route.Route), id("requestPath"))),
+ Cond: id("ok"),
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{stringSliceExpr([]string{route.Audience})}}),
+ })
+ }
+ stmts = append(stmts,
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: id("requestPath"), Op: token.NEQ, Y: stringLit("")},
+ Body: block(routeStmts...),
+ },
+ &ast.ReturnStmt{Results: []ast.Expr{id("nil")}},
+ )
+ return funcDecl("realtimeStreamAudience", []*ast.Field{
+ {Names: []*ast.Ident{id("request")}, Type: &ast.StarExpr{X: sel("http", "Request")}},
+ }, []*ast.Field{{Type: &ast.ArrayType{Elt: id("string")}}}, stmts)
+}
+
func realtimeStreamGuardsDecl(options Options) ast.Decl {
fallback := realtimeStreamFallbackGuards(options)
stmts := []ast.Stmt{
- define([]ast.Expr{id("requestPath")}, call(
- selExpr(call(selExpr(selExpr(id("request"), "URL"), "Query")), "Get"),
- stringLit("path"),
- )),
+ define([]ast.Expr{id("requestPath")}, call(id("realtimeStreamPath"), id("request"))),
}
var routeStmts []ast.Stmt
for _, route := range realtimeStreamRoutes(options) {
@@ -376,18 +630,6 @@ func realtimeStreamGuardsDecl(options Options) ast.Decl {
}
if len(routeStmts) > 0 {
stmts = append(stmts,
- &ast.IfStmt{
- Cond: &ast.BinaryExpr{X: id("requestPath"), Op: token.EQL, Y: stringLit("")},
- Body: block(&ast.IfStmt{
- Init: define([]ast.Expr{id("referer")}, call(selExpr(id("request"), "Referer"))),
- Cond: &ast.BinaryExpr{X: id("referer"), Op: token.NEQ, Y: stringLit("")},
- Body: block(&ast.IfStmt{
- Init: define([]ast.Expr{id("refererURL"), id("err")}, call(sel("neturl", "Parse"), id("referer"))),
- Cond: &ast.BinaryExpr{X: id("err"), Op: token.EQL, Y: id("nil")},
- Body: block(assign([]ast.Expr{id("requestPath")}, selExpr(id("refererURL"), "Path"))),
- }),
- }),
- },
&ast.IfStmt{
Cond: &ast.BinaryExpr{X: id("requestPath"), Op: token.NEQ, Y: stringLit("")},
Body: block(routeStmts...),
@@ -400,6 +642,156 @@ func realtimeStreamGuardsDecl(options Options) ast.Decl {
}, []*ast.Field{{Type: &ast.ArrayType{Elt: id("string")}}}, stmts)
}
+func eventEnvelopeSliceType() ast.Expr {
+ return &ast.ArrayType{Elt: sel("gowdkcontracts", "EventEnvelope")}
+}
+
+func eventEnvelopeSliceExpr(values ...ast.Expr) ast.Expr {
+ return &ast.CompositeLit{
+ Type: eventEnvelopeSliceType(),
+ Elts: values,
+ }
+}
+
+func realtimeAudienceScopedEventsDecl() ast.Decl {
+ return funcDecl("realtimeAudienceScopedEvents", []*ast.Field{
+ {Names: []*ast.Ident{id("event")}, Type: sel("gowdkcontracts", "EventEnvelope")},
+ }, []*ast.Field{{Type: eventEnvelopeSliceType()}}, []ast.Stmt{
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: selExpr(id("event"), "Category"), Op: token.NEQ, Y: sel("gowdkcontracts", "PresentationEvent")},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{id("nil")}}),
+ },
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: selExpr(id("event"), "Type"), Op: token.EQL, Y: sel("gowdkcontracts", "QueryInvalidationPresentationEventType")},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{call(id("realtimeQueryInvalidationAudienceEvents"), id("event"))}}),
+ },
+ &ast.IfStmt{
+ Cond: &ast.UnaryExpr{Op: token.NOT, X: &ast.IndexExpr{X: id("realtimeSubscriptionEventTypes"), Index: selExpr(id("event"), "Type")}},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{id("nil")}}),
+ },
+ &ast.IfStmt{
+ Cond: &ast.IndexExpr{X: id("realtimeSubscriptionBroadcasts"), Index: selExpr(id("event"), "Type")},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}),
+ },
+ define([]ast.Expr{id("audiences")}, &ast.IndexExpr{X: id("realtimeSubscriptionAudiences"), Index: selExpr(id("event"), "Type")}),
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: call(id("len"), id("audiences")), Op: token.EQL, Y: intLit(0)},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}),
+ },
+ define([]ast.Expr{id("scoped")}, call(id("make"), eventEnvelopeSliceType(), intLit(0), call(id("len"), id("audiences")))),
+ &ast.RangeStmt{
+ Key: id("_"),
+ Value: id("audience"),
+ Tok: token.DEFINE,
+ X: id("audiences"),
+ Body: block(assign([]ast.Expr{id("scoped")}, call(id("append"), id("scoped"), call(id("realtimeEventWithAudience"), id("event"), id("audience"))))),
+ },
+ &ast.ReturnStmt{Results: []ast.Expr{id("scoped")}},
+ })
+}
+
+func realtimeEventWithAudienceDecl() ast.Decl {
+ return funcDecl("realtimeEventWithAudience", []*ast.Field{
+ {Names: []*ast.Ident{id("event")}, Type: sel("gowdkcontracts", "EventEnvelope")},
+ {Names: []*ast.Ident{id("audience")}, Type: id("string")},
+ }, []*ast.Field{{Type: sel("gowdkcontracts", "EventEnvelope")}}, []ast.Stmt{
+ define([]ast.Expr{id("scoped")}, id("event")),
+ define([]ast.Expr{id("audiences")}, &ast.CallExpr{
+ Fun: id("append"),
+ Args: []ast.Expr{&ast.CompositeLit{Type: &ast.ArrayType{Elt: id("string")}}, selExpr(id("event"), "Audience")},
+ Ellipsis: token.Pos(1),
+ }),
+ assign([]ast.Expr{selExpr(id("scoped"), "Audience")}, call(id("append"), id("audiences"), id("audience"))),
+ &ast.ReturnStmt{Results: []ast.Expr{id("scoped")}},
+ })
+}
+
+func realtimeQueryInvalidationAudienceEventsDecl() ast.Decl {
+ return funcDecl("realtimeQueryInvalidationAudienceEvents", []*ast.Field{
+ {Names: []*ast.Ident{id("event")}, Type: sel("gowdkcontracts", "EventEnvelope")},
+ }, []*ast.Field{{Type: eventEnvelopeSliceType()}}, []ast.Stmt{
+ define([]ast.Expr{id("notice"), id("ok")}, &ast.TypeAssertExpr{X: selExpr(id("event"), "Value"), Type: sel("gowdkcontracts", "QueryInvalidationNotice")}),
+ &ast.IfStmt{
+ Cond: &ast.UnaryExpr{Op: token.NOT, X: id("ok")},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}),
+ },
+ define([]ast.Expr{id("queriesByAudience")}, &ast.CompositeLit{Type: &ast.MapType{Key: id("string"), Value: &ast.ArrayType{Elt: id("string")}}}),
+ define([]ast.Expr{id("unscopedQueries")}, &ast.CompositeLit{Type: &ast.ArrayType{Elt: id("string")}}),
+ define([]ast.Expr{id("audiences")}, &ast.CompositeLit{Type: &ast.ArrayType{Elt: id("string")}}),
+ define([]ast.Expr{id("seenAudience")}, &ast.CompositeLit{Type: &ast.MapType{Key: id("string"), Value: id("bool")}}),
+ &ast.RangeStmt{
+ Key: id("_"),
+ Value: id("query"),
+ Tok: token.DEFINE,
+ X: selExpr(id("notice"), "Queries"),
+ Body: block(
+ define([]ast.Expr{id("queryAudiences")}, &ast.IndexExpr{X: id("realtimeQueryAudiences"), Index: id("query")}),
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{
+ X: &ast.IndexExpr{X: id("realtimeQueryBroadcasts"), Index: id("query")},
+ Op: token.LOR,
+ Y: &ast.BinaryExpr{X: call(id("len"), id("queryAudiences")), Op: token.EQL, Y: intLit(0)},
+ },
+ Body: block(
+ assign([]ast.Expr{id("unscopedQueries")}, call(id("append"), id("unscopedQueries"), id("query"))),
+ &ast.BranchStmt{Tok: token.CONTINUE},
+ ),
+ },
+ &ast.RangeStmt{
+ Key: id("_"),
+ Value: id("audience"),
+ Tok: token.DEFINE,
+ X: id("queryAudiences"),
+ Body: block(
+ &ast.IfStmt{
+ Cond: &ast.UnaryExpr{Op: token.NOT, X: &ast.IndexExpr{X: id("seenAudience"), Index: id("audience")}},
+ Body: block(
+ assign([]ast.Expr{&ast.IndexExpr{X: id("seenAudience"), Index: id("audience")}}, id("true")),
+ assign([]ast.Expr{id("audiences")}, call(id("append"), id("audiences"), id("audience"))),
+ ),
+ },
+ assign([]ast.Expr{&ast.IndexExpr{X: id("queriesByAudience"), Index: id("audience")}}, call(id("append"), &ast.IndexExpr{X: id("queriesByAudience"), Index: id("audience")}, id("query"))),
+ ),
+ },
+ ),
+ },
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: call(id("len"), id("audiences")), Op: token.EQL, Y: intLit(0)},
+ Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}),
+ },
+ define([]ast.Expr{id("scopedCapacity")}, call(id("len"), id("audiences"))),
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: call(id("len"), id("unscopedQueries")), Op: token.GTR, Y: intLit(0)},
+ Body: block(assign([]ast.Expr{id("scopedCapacity")}, &ast.BinaryExpr{X: id("scopedCapacity"), Op: token.ADD, Y: intLit(1)})),
+ },
+ define([]ast.Expr{id("scoped")}, call(id("make"), eventEnvelopeSliceType(), intLit(0), id("scopedCapacity"))),
+ &ast.IfStmt{
+ Cond: &ast.BinaryExpr{X: call(id("len"), id("unscopedQueries")), Op: token.GTR, Y: intLit(0)},
+ Body: block(
+ define([]ast.Expr{id("unscopedNotice")}, id("notice")),
+ assign([]ast.Expr{selExpr(id("unscopedNotice"), "Queries")}, id("unscopedQueries")),
+ define([]ast.Expr{id("unscopedEvent")}, id("event")),
+ assign([]ast.Expr{selExpr(id("unscopedEvent"), "Value")}, id("unscopedNotice")),
+ assign([]ast.Expr{id("scoped")}, call(id("append"), id("scoped"), id("unscopedEvent"))),
+ ),
+ },
+ &ast.RangeStmt{
+ Key: id("_"),
+ Value: id("audience"),
+ Tok: token.DEFINE,
+ X: id("audiences"),
+ Body: block(
+ define([]ast.Expr{id("scopedNotice")}, id("notice")),
+ assign([]ast.Expr{selExpr(id("scopedNotice"), "Queries")}, &ast.IndexExpr{X: id("queriesByAudience"), Index: id("audience")}),
+ define([]ast.Expr{id("scopedEvent")}, call(id("realtimeEventWithAudience"), id("event"), id("audience"))),
+ assign([]ast.Expr{selExpr(id("scopedEvent"), "Value")}, id("scopedNotice")),
+ assign([]ast.Expr{id("scoped")}, call(id("append"), id("scoped"), id("scopedEvent"))),
+ ),
+ },
+ &ast.ReturnStmt{Results: []ast.Expr{id("scoped")}},
+ })
+}
+
func realtimeSubscriptionFanoutTypeDecl() ast.Decl {
return &ast.GenDecl{Tok: token.TYPE, Specs: []ast.Spec{&ast.TypeSpec{
Name: id("realtimeSubscriptionFanout"),
@@ -434,14 +826,11 @@ func realtimeSubscriptionFanoutSendDecl() ast.Decl {
Value: id("event"),
Tok: token.DEFINE,
X: id("events"),
- Body: block(&ast.IfStmt{
- Cond: &ast.BinaryExpr{
- X: &ast.IndexExpr{X: id("realtimeSubscriptionEventTypes"), Index: selExpr(id("event"), "Type")},
- Op: token.LAND,
- Y: &ast.BinaryExpr{X: selExpr(id("event"), "Category"), Op: token.EQL, Y: sel("gowdkcontracts", "PresentationEvent")},
- },
- Body: block(assign([]ast.Expr{id("filtered")}, call(id("append"), id("filtered"), id("event")))),
- }),
+ Body: block(assign([]ast.Expr{id("filtered")}, &ast.CallExpr{
+ Fun: id("append"),
+ Args: []ast.Expr{id("filtered"), call(id("realtimeAudienceScopedEvents"), id("event"))},
+ Ellipsis: token.Pos(1),
+ })),
},
&ast.IfStmt{
Cond: &ast.BinaryExpr{X: call(id("len"), id("filtered")), Op: token.EQL, Y: intLit(0)},