diff --git a/internal/appgen/appgen_test.go b/internal/appgen/appgen_test.go index 416bbed2..cacc4ba0 100644 --- a/internal/appgen/appgen_test.go +++ b/internal/appgen/appgen_test.go @@ -1153,8 +1153,10 @@ func TestGenerateWritesRealtimeFanoutForSubscriptions(t *testing.T) { `var realtimeFanout gowdkrealtime.PresentationFanout = gowdkrealtime.NewSSE()`, `func RegisterRealtimeFanout(fanout gowdkrealtime.PresentationFanout)`, `"example.com/app/contracts/patients.PatientNotice": true`, - `event.Category == gowdkcontracts.PresentationEvent`, - `gowdkcontracts.PresentationFanoutCommandEventSink(realtimeSubscriptionFanout{inner: fanout})`, + `var realtimeSubscriptionBroadcasts map[string]bool = map[string]bool{"example.com/app/contracts/patients.PatientNotice": true}`, + `realtimeAudienceScopedEvents(event)...`, + `scopedFanout := realtimeSubscriptionFanout{inner: fanout}`, + `gowdkcontracts.PresentationFanoutCommandEventSink(scopedFanout)`, `gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), fanoutSink)`, } { if !strings.Contains(source, expected) { @@ -1210,8 +1212,11 @@ func TestGenerateWritesRealtimeQueryInvalidationFanout(t *testing.T) { for _, expected := range []string{ `gowdkcontracts.QueryInvalidationPresentationEventType: true`, `var realtimeQueryInvalidations []gowdkcontracts.QueryInvalidation = []gowdkcontracts.QueryInvalidation{gowdkcontracts.QueryInvalidation{EventCategory: gowdkcontracts.DomainEvent, EventType: "example.com/app/contracts/patients.PatientCreated", QueryType: "example.com/app/contracts/patients.GetPatientPage"}}`, - `gowdkcontracts.QueryInvalidationCommandEventSink(fanout, realtimeQueryInvalidations)`, - `gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), gowdkcontracts.QueryInvalidationCommandEventSink(fanout, realtimeQueryInvalidations), fanoutSink)`, + `scopedFanout := realtimeSubscriptionFanout{inner: fanout}`, + `gowdkcontracts.QueryInvalidationCommandEventSink(scopedFanout, realtimeQueryInvalidations)`, + `gowdkcontracts.CompositeCommandEventSink(gowdkcontracts.InProcessCommandEventSink(), gowdkcontracts.QueryInvalidationCommandEventSink(scopedFanout, realtimeQueryInvalidations), fanoutSink)`, + `var realtimeQueryBroadcasts map[string]bool = map[string]bool{"example.com/app/contracts/patients.GetPatientPage": true}`, + `unscopedQueries := []string{}`, // Single-flight write path: the command adapter tells the submitting // client which g:query regions to refresh via the X-GOWDK-Queries header. `invalidatedQueries := gowdkcontracts.InvalidatedQueryTypes(realtimeQueryInvalidations, events)`, @@ -1234,6 +1239,63 @@ func TestGenerateWritesRealtimeQueryInvalidationFanout(t *testing.T) { } } +func TestRealtimeBroadcastsTrackMixedScopedAndUnscopedOwners(t *testing.T) { + options := Options{IR: &gwdkir.Program{ + Pages: []gwdkir.Page{{ID: "patients", Route: "/patients"}}, + RealtimeSubscriptions: []gwdkir.RealtimeSubscription{{ + Query: "patients.GetPatientPage", + Event: "patients.PatientNotice", + EventImportPath: "example.com/app/contracts/patients", + EventType: "PatientNotice", + Status: gwdkir.ContractBindingBound, + OwnerKind: gwdkir.SourcePage, + OwnerID: "patients", + }, { + Query: "cards.GetPatientCard", + Event: "patients.PatientNotice", + EventImportPath: "example.com/app/contracts/patients", + EventType: "PatientNotice", + Status: gwdkir.ContractBindingBound, + OwnerKind: gwdkir.SourceComponent, + OwnerID: "PatientCard", + }}, + QueryInvalidations: []gwdkir.QueryInvalidation{{ + Query: "patients.GetPatientPage", + QueryType: "example.com/app/contracts/patients.GetPatientPage", + Event: "example.com/app/contracts/patients.PatientCreated", + EventType: "example.com/app/contracts/patients.PatientCreated", + EventCategory: "domain", + Status: gwdkir.ContractBindingBound, + OwnerKind: gwdkir.SourcePage, + OwnerID: "patients", + }, { + Query: "cards.GetPatientCard", + QueryType: "example.com/app/contracts/patients.GetPatientCard", + Event: "example.com/app/contracts/patients.PatientCreated", + EventType: "example.com/app/contracts/patients.PatientCreated", + EventCategory: "domain", + Status: gwdkir.ContractBindingBound, + OwnerKind: gwdkir.SourceComponent, + OwnerID: "PatientCard", + }}, + }} + + subscriptionAudiences := realtimeSubscriptionAudiences(options) + if got := subscriptionAudiences["example.com/app/contracts/patients.PatientNotice"]; len(got) != 1 || got[0] != "gowdk.route.0" { + t.Fatalf("subscription audiences = %#v, want route audience", subscriptionAudiences) + } + if !realtimeSubscriptionBroadcasts(options)["example.com/app/contracts/patients.PatientNotice"] { + t.Fatalf("expected mixed subscription event type to retain broadcast delivery") + } + queryAudiences := realtimeQueryAudiences(options) + if got := queryAudiences["example.com/app/contracts/patients.GetPatientPage"]; len(got) != 1 || got[0] != "gowdk.route.0" { + t.Fatalf("query audiences = %#v, want route audience", queryAudiences) + } + if !realtimeQueryBroadcasts(options)["example.com/app/contracts/patients.GetPatientCard"] { + t.Fatalf("expected component-owned query invalidation to retain broadcast delivery") + } +} + func TestGenerateRegistersSingleFlightRegionRenderers(t *testing.T) { root := t.TempDir() outputDir := filepath.Join(root, "dist") @@ -1551,11 +1613,16 @@ func TestGenerateGuardsRealtimeStreamForSubscribedPages(t *testing.T) { for _, expected := range []string{ `neturl "net/url"`, `gowdkroute "github.com/cssbruno/gowdk/runtime/route"`, + `gowdkrealtime.NewSSE(gowdkrealtime.WithSSEAudienceFromRequest(realtimeStreamAudience))`, + `var realtimeSubscriptionAudiences map[string][]string = map[string][]string{"example.com/app/contracts/patients.PatientNotice": []string{"gowdk.route.0"}}`, + `func realtimeStreamAudience(request *http.Request) []string`, `func realtimeStreamGuards(request *http.Request) []string`, - `request.URL.Query().Get("path")`, + `func realtimeStreamPath(request *http.Request) string`, `referer := request.Referer()`, `neturl.Parse(referer)`, + `return refererURL.Path`, `gowdkroute.Match("/dashboard", requestPath)`, + `return []string{"gowdk.route.0"}`, `return []string{"auth.required"}`, `if !runGuards(response, request, realtimeStreamGuards(request))`, `RegisterGuards(GOWDKGuardRegistry())`, @@ -1564,6 +1631,9 @@ func TestGenerateGuardsRealtimeStreamForSubscribedPages(t *testing.T) { t.Fatalf("expected generated guarded realtime source to contain %q:\n%s", expected, source) } } + if strings.Contains(source, `Query().Get("path")`) { + t.Fatalf("generated realtime stream must not authorize from client query path:\n%s", source) + } } func TestBoundActionFieldDecodePanicsOnUnsupportedFieldType(t *testing.T) { @@ -7150,6 +7220,10 @@ func TestGeneratedBinaryCommandSetsInvalidatedQueriesHeader(t *testing.T) { writeTestFile(t, filepath.Join(outputDir, "patients", "index.html"), "
Patients page
") program := &gwdkir.Program{ + Pages: []gwdkir.Page{ + {ID: "dashboard", Route: "/dashboard"}, + {ID: "patients", Route: "/patients"}, + }, ContractRefs: []gwdkir.ContractReference{{ Kind: gwdkir.ContractCommand, Name: "patients.CreatePatient", @@ -7477,8 +7551,13 @@ func TestGeneratedBinaryRealtimeFanoutStreamsSubscribedPresentationEvents(t *tes appDir := filepath.Join(root, "generated-app") binaryPath := filepath.Join(root, "site") writeTestFile(t, filepath.Join(outputDir, "patients", "index.html"), "
Patients page
") + writeTestFile(t, filepath.Join(outputDir, "dashboard", "index.html"), "
Dashboard page
") program := &gwdkir.Program{ + Pages: []gwdkir.Page{ + {ID: "dashboard", Route: "/dashboard"}, + {ID: "patients", Route: "/patients"}, + }, ContractRefs: []gwdkir.ContractReference{{ Kind: gwdkir.ContractCommand, Name: "patients.CreatePatient", @@ -7504,6 +7583,14 @@ func TestGeneratedBinaryRealtimeFanoutStreamsSubscribedPresentationEvents(t *tes Status: gwdkir.ContractBindingBound, OwnerKind: gwdkir.SourcePage, OwnerID: "patients", + }, { + Query: "patients.GetDashboard", + Event: "patients.OtherNotice", + EventImportPath: "gowdk-generated-app/patients", + EventType: "OtherNotice", + Status: gwdkir.ContractBindingBound, + OwnerKind: gwdkir.SourcePage, + OwnerID: "dashboard", }}, } if _, err := GenerateWithOptions(outputDir, appDir, Options{Config: csrfDisabledConfig(), IR: program}); err != nil { @@ -7576,10 +7663,11 @@ func HandleCreatePatient(ctx context.Context, command CreatePatient) (CreatePati streamCtx, cancelStream := context.WithCancel(context.Background()) defer cancelStream() - streamRequest, err := http.NewRequestWithContext(streamCtx, http.MethodGet, "http://"+addr+"/_gowdk/realtime/events", nil) + streamRequest, err := http.NewRequestWithContext(streamCtx, http.MethodGet, "http://"+addr+"/_gowdk/realtime/events?path=/dashboard", nil) if err != nil { t.Fatal(err) } + streamRequest.Header.Set("Referer", "http://"+addr+"/patients") streamResponse, err := http.DefaultClient.Do(streamRequest) if err != nil { t.Fatal(err) @@ -7641,6 +7729,19 @@ func HandleCreatePatient(ctx context.Context, command CreatePatient) (CreatePati t.Fatalf("realtime stream included unsubscribed event %q in %s", unexpected, dataLine) } } + noLeakDeadline := time.After(300 * time.Millisecond) + for { + select { + case line := <-lines: + if strings.HasPrefix(line, "data: ") && strings.Contains(line, "OtherNotice") { + t.Fatalf("route-scoped realtime stream leaked dashboard event: %s", line) + } + case err := <-readErrs: + t.Fatalf("read realtime stream after first event: %v", err) + case <-noLeakDeadline: + return + } + } } func TestGeneratedBinaryRealtimeStreamGuardDenialClosesStream(t *testing.T) { diff --git a/internal/appgen/source.go b/internal/appgen/source.go index b3f44b85..ce8c1f6b 100644 --- a/internal/appgen/source.go +++ b/internal/appgen/source.go @@ -115,7 +115,7 @@ func runtimeImportMap(options Options) map[string]string { imports["gowdkseo"] = "github.com/cssbruno/gowdk/runtime/seo" imports["gowdkseositemap"] = strings.TrimSpace(options.Sitemap.Dynamic.ImportPath) } - if generatedRealtimeStreamUsesRouteMatching(options) { + if generatedRealtimeStreamUsesRouteMatching(options) || generatedRealtimeStreamUsesAudience(options) { imports["gowdkroute"] = "github.com/cssbruno/gowdk/runtime/route" imports["neturl"] = "net/url" } diff --git a/internal/appgen/source_contracts.go b/internal/appgen/source_contracts.go index 2a1f1605..23b8ea60 100644 --- a/internal/appgen/source_contracts.go +++ b/internal/appgen/source_contracts.go @@ -554,15 +554,13 @@ func currentContractEventSinkDecl(realtime bool, queryInvalidations bool) ast.De &ast.IfStmt{ Cond: &ast.BinaryExpr{X: id("fanout"), Op: token.NEQ, Y: id("nil")}, Body: block( - define([]ast.Expr{id("fanoutSink")}, call(sel("gowdkcontracts", "PresentationFanoutCommandEventSink"), &ast.CompositeLit{ - Type: id("realtimeSubscriptionFanout"), - Elts: []ast.Expr{keyValue("inner", id("fanout"))}, - })), + define([]ast.Expr{id("scopedFanout")}, realtimeSubscriptionFanoutExpr(id("fanout"))), + define([]ast.Expr{id("fanoutSink")}, call(sel("gowdkcontracts", "PresentationFanoutCommandEventSink"), id("scopedFanout"))), &ast.IfStmt{ Cond: &ast.BinaryExpr{X: id("sink"), Op: token.NEQ, Y: id("nil")}, - Body: block(&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, id("sink"), id("fanoutSink"), id("fanout"))}}), + Body: block(&ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, id("sink"), id("fanoutSink"), id("scopedFanout"))}}), }, - &ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, call(sel("gowdkcontracts", "InProcessCommandEventSink")), id("fanoutSink"), id("fanout"))}}, + &ast.ReturnStmt{Results: []ast.Expr{realtimeCompositeSinkExpr(queryInvalidations, call(sel("gowdkcontracts", "InProcessCommandEventSink")), id("fanoutSink"), id("scopedFanout"))}}, ), }, ) @@ -572,6 +570,13 @@ func currentContractEventSinkDecl(realtime bool, queryInvalidations bool) ast.De }, stmts) } +func realtimeSubscriptionFanoutExpr(fanout ast.Expr) ast.Expr { + return &ast.CompositeLit{ + Type: id("realtimeSubscriptionFanout"), + Elts: []ast.Expr{keyValue("inner", fanout)}, + } +} + func realtimeCompositeSinkExpr(queryInvalidations bool, base ast.Expr, fanoutSink ast.Expr, fanout ast.Expr) ast.Expr { args := []ast.Expr{base} if queryInvalidations { diff --git a/internal/appgen/source_realtime.go b/internal/appgen/source_realtime.go index c4411164..c0777347 100644 --- a/internal/appgen/source_realtime.go +++ b/internal/appgen/source_realtime.go @@ -1,6 +1,7 @@ package appgen import ( + "fmt" "go/ast" "go/token" "sort" @@ -84,23 +85,28 @@ func boundQueryInvalidations(options Options) []gwdkir.QueryInvalidation { } type realtimeStreamRoute struct { - Route string - Guards []string + Route string + Guards []string + Audience string } func generatedRealtimeStreamUsesGuards(options Options) bool { return len(realtimeStreamFallbackGuards(options)) > 0 } +func generatedRealtimeStreamUsesAudience(options Options) bool { + return len(realtimeStreamRoutes(options)) > 0 +} + func generatedRealtimeStreamUsesRouteMatching(options Options) bool { return generatedRealtimeStreamUsesGuards(options) && len(realtimeStreamRoutes(options)) > 0 } -func realtimeStreamRoutes(options Options) []realtimeStreamRoute { +func realtimeRoutesByPage(options Options) map[string]string { + routesByPage := map[string]string{} if options.IR == nil { - return nil + return routesByPage } - routesByPage := map[string]string{} for _, page := range options.IR.Pages { route := strings.TrimSpace(page.Route) if page.ID == "" || route == "" { @@ -108,6 +114,11 @@ func realtimeStreamRoutes(options Options) []realtimeStreamRoute { } routesByPage[page.ID] = route } + return routesByPage +} + +func realtimeStreamRoutes(options Options) []realtimeStreamRoute { + routesByPage := realtimeRoutesByPage(options) seen := map[string]bool{} var routes []realtimeStreamRoute for _, subscription := range boundRealtimeSubscriptions(options) { @@ -148,9 +159,126 @@ func realtimeStreamRoutes(options Options) []realtimeStreamRoute { } return strings.Join(routes[i].Guards, "\x00") < strings.Join(routes[j].Guards, "\x00") }) + labels := map[string]string{} + for index := range routes { + label := labels[routes[index].Route] + if label == "" { + label = fmt.Sprintf("gowdk.route.%d", len(labels)) + labels[routes[index].Route] = label + } + routes[index].Audience = label + } return routes } +func realtimeRouteAudienceByRoute(options Options) map[string]string { + audiences := map[string]string{} + for _, route := range realtimeStreamRoutes(options) { + if route.Route == "" || route.Audience == "" { + continue + } + audiences[route.Route] = route.Audience + } + return audiences +} + +func realtimeSubscriptionAudiences(options Options) map[string][]string { + routesByPage := realtimeRoutesByPage(options) + audiencesByRoute := realtimeRouteAudienceByRoute(options) + audiences := map[string][]string{} + for _, subscription := range boundRealtimeSubscriptions(options) { + if subscription.OwnerKind != gwdkir.SourcePage { + continue + } + eventType := realtimeEventEnvelopeType(subscription) + route := routesByPage[subscription.OwnerID] + audience := audiencesByRoute[route] + if eventType == "" || audience == "" { + continue + } + addRealtimeAudience(audiences, eventType, audience) + } + sortRealtimeAudienceMap(audiences) + return audiences +} + +func realtimeSubscriptionBroadcasts(options Options) map[string]bool { + routesByPage := realtimeRoutesByPage(options) + audiencesByRoute := realtimeRouteAudienceByRoute(options) + broadcasts := map[string]bool{} + for _, subscription := range boundRealtimeSubscriptions(options) { + eventType := realtimeEventEnvelopeType(subscription) + if eventType == "" { + continue + } + if subscription.OwnerKind != gwdkir.SourcePage { + broadcasts[eventType] = true + continue + } + route := routesByPage[subscription.OwnerID] + if audiencesByRoute[route] == "" { + broadcasts[eventType] = true + } + } + return broadcasts +} + +func realtimeQueryAudiences(options Options) map[string][]string { + routesByPage := realtimeRoutesByPage(options) + audiencesByRoute := realtimeRouteAudienceByRoute(options) + audiences := map[string][]string{} + for _, invalidation := range boundQueryInvalidations(options) { + if invalidation.OwnerKind != gwdkir.SourcePage { + continue + } + queryType := strings.TrimSpace(invalidation.QueryType) + route := routesByPage[invalidation.OwnerID] + audience := audiencesByRoute[route] + if queryType == "" || audience == "" { + continue + } + addRealtimeAudience(audiences, queryType, audience) + } + sortRealtimeAudienceMap(audiences) + return audiences +} + +func realtimeQueryBroadcasts(options Options) map[string]bool { + routesByPage := realtimeRoutesByPage(options) + audiencesByRoute := realtimeRouteAudienceByRoute(options) + broadcasts := map[string]bool{} + for _, invalidation := range boundQueryInvalidations(options) { + queryType := strings.TrimSpace(invalidation.QueryType) + if queryType == "" { + continue + } + if invalidation.OwnerKind != gwdkir.SourcePage { + broadcasts[queryType] = true + continue + } + route := routesByPage[invalidation.OwnerID] + if audiencesByRoute[route] == "" { + broadcasts[queryType] = true + } + } + return broadcasts +} + +func addRealtimeAudience(audiences map[string][]string, key string, audience string) { + for _, existing := range audiences[key] { + if existing == audience { + return + } + } + audiences[key] = append(audiences[key], audience) +} + +func sortRealtimeAudienceMap(audiences map[string][]string) { + for key := range audiences { + sort.Strings(audiences[key]) + } +} + func realtimeStreamFallbackGuards(options Options) []string { seen := map[string]bool{} var guards []string @@ -183,15 +311,28 @@ func realtimeDecls(options Options) []ast.Decl { decls := []ast.Decl{ realtimeEventsPathDecl(), realtimeFanoutMutexDecl(), - realtimeFanoutVarDecl(), + realtimeFanoutVarDecl(options), realtimeSubscriptionEventTypesDecl(options), + realtimeSubscriptionAudiencesDecl(options), + realtimeSubscriptionBroadcastsDecl(options), + realtimeQueryAudiencesDecl(options), + realtimeQueryBroadcastsDecl(options), realtimeQueryInvalidationsDecl(options), registerRealtimeFanoutDecl(), currentRealtimeFanoutDecl(), realtimeEventsHandlerDecl(options), + realtimeAudienceScopedEventsDecl(), + realtimeEventWithAudienceDecl(), + realtimeQueryInvalidationAudienceEventsDecl(), realtimeSubscriptionFanoutTypeDecl(), realtimeSubscriptionFanoutSendDecl(), } + if generatedRealtimeStreamUsesRouteMatching(options) || generatedRealtimeStreamUsesAudience(options) { + decls = append(decls, realtimeStreamPathDecl()) + } + if generatedRealtimeStreamUsesAudience(options) { + decls = append(decls, realtimeStreamAudienceDecl(options)) + } if generatedRealtimeStreamUsesGuards(options) { decls = append(decls, realtimeStreamGuardsDecl(options)) } @@ -219,11 +360,18 @@ func realtimeFanoutMutexDecl() ast.Decl { }}} } -func realtimeFanoutVarDecl() ast.Decl { +func realtimeFanoutVarDecl(options Options) ast.Decl { + newSSE := call(sel("gowdkrealtime", "NewSSE")) + if generatedRealtimeStreamUsesAudience(options) { + newSSE = call( + sel("gowdkrealtime", "NewSSE"), + call(sel("gowdkrealtime", "WithSSEAudienceFromRequest"), id("realtimeStreamAudience")), + ) + } return &ast.GenDecl{Tok: token.VAR, Specs: []ast.Spec{&ast.ValueSpec{ Names: []*ast.Ident{id("realtimeFanout")}, Type: sel("gowdkrealtime", "PresentationFanout"), - Values: []ast.Expr{call(sel("gowdkrealtime", "NewSSE"))}, + Values: []ast.Expr{newSSE}, }}} } @@ -256,6 +404,72 @@ func realtimeSubscriptionEventTypesDecl(options Options) ast.Decl { }}} } +func realtimeSubscriptionAudiencesDecl(options Options) ast.Decl { + return stringSliceMapVarDecl("realtimeSubscriptionAudiences", realtimeSubscriptionAudiences(options)) +} + +func realtimeSubscriptionBroadcastsDecl(options Options) ast.Decl { + return stringBoolMapVarDecl("realtimeSubscriptionBroadcasts", realtimeSubscriptionBroadcasts(options)) +} + +func realtimeQueryAudiencesDecl(options Options) ast.Decl { + return stringSliceMapVarDecl("realtimeQueryAudiences", realtimeQueryAudiences(options)) +} + +func realtimeQueryBroadcastsDecl(options Options) ast.Decl { + return stringBoolMapVarDecl("realtimeQueryBroadcasts", realtimeQueryBroadcasts(options)) +} + +func stringSliceMapVarDecl(name string, values map[string][]string) ast.Decl { + return &ast.GenDecl{Tok: token.VAR, Specs: []ast.Spec{&ast.ValueSpec{ + Names: []*ast.Ident{id(name)}, + Type: &ast.MapType{Key: id("string"), Value: &ast.ArrayType{Elt: id("string")}}, + Values: []ast.Expr{stringSliceMapExpr(values)}, + }}} +} + +func stringSliceMapExpr(values map[string][]string) ast.Expr { + keys := make([]string, 0, len(values)) + for key := range values { + keys = append(keys, key) + } + sort.Strings(keys) + elts := make([]ast.Expr, 0, len(keys)) + for _, key := range keys { + elts = append(elts, &ast.KeyValueExpr{Key: stringLit(key), Value: stringSliceExpr(values[key])}) + } + return &ast.CompositeLit{ + Type: &ast.MapType{Key: id("string"), Value: &ast.ArrayType{Elt: id("string")}}, + Elts: elts, + } +} + +func stringBoolMapVarDecl(name string, values map[string]bool) ast.Decl { + return &ast.GenDecl{Tok: token.VAR, Specs: []ast.Spec{&ast.ValueSpec{ + Names: []*ast.Ident{id(name)}, + Type: &ast.MapType{Key: id("string"), Value: id("bool")}, + Values: []ast.Expr{stringBoolMapExpr(values)}, + }}} +} + +func stringBoolMapExpr(values map[string]bool) ast.Expr { + keys := make([]string, 0, len(values)) + for key, value := range values { + if value { + keys = append(keys, key) + } + } + sort.Strings(keys) + elts := make([]ast.Expr, 0, len(keys)) + for _, key := range keys { + elts = append(elts, &ast.KeyValueExpr{Key: stringLit(key), Value: id("true")}) + } + return &ast.CompositeLit{ + Type: &ast.MapType{Key: id("string"), Value: id("bool")}, + Elts: elts, + } +} + func realtimeQueryInvalidationsDecl(options Options) ast.Decl { invalidations := boundQueryInvalidations(options) elts := make([]ast.Expr, 0, len(invalidations)) @@ -358,13 +572,53 @@ func realtimeEventsHandlerDecl(options Options) ast.Decl { }) } +func realtimeStreamPathDecl() ast.Decl { + stmts := []ast.Stmt{ + define([]ast.Expr{id("referer")}, call(selExpr(id("request"), "Referer"))), + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: id("referer"), Op: token.EQL, Y: stringLit("")}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{stringLit("")}}), + }, + define([]ast.Expr{id("refererURL"), id("err")}, call(sel("neturl", "Parse"), id("referer"))), + &ast.IfStmt{ + Cond: notNil("err"), + Body: block(&ast.ReturnStmt{Results: []ast.Expr{stringLit("")}}), + }, + &ast.ReturnStmt{Results: []ast.Expr{selExpr(id("refererURL"), "Path")}}, + } + return funcDecl("realtimeStreamPath", []*ast.Field{ + {Names: []*ast.Ident{id("request")}, Type: &ast.StarExpr{X: sel("http", "Request")}}, + }, []*ast.Field{{Type: id("string")}}, stmts) +} + +func realtimeStreamAudienceDecl(options Options) ast.Decl { + stmts := []ast.Stmt{ + define([]ast.Expr{id("requestPath")}, call(id("realtimeStreamPath"), id("request"))), + } + var routeStmts []ast.Stmt + for _, route := range realtimeStreamRoutes(options) { + routeStmts = append(routeStmts, &ast.IfStmt{ + Init: define([]ast.Expr{id("_"), id("ok")}, call(sel("gowdkroute", "Match"), stringLit(route.Route), id("requestPath"))), + Cond: id("ok"), + Body: block(&ast.ReturnStmt{Results: []ast.Expr{stringSliceExpr([]string{route.Audience})}}), + }) + } + stmts = append(stmts, + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: id("requestPath"), Op: token.NEQ, Y: stringLit("")}, + Body: block(routeStmts...), + }, + &ast.ReturnStmt{Results: []ast.Expr{id("nil")}}, + ) + return funcDecl("realtimeStreamAudience", []*ast.Field{ + {Names: []*ast.Ident{id("request")}, Type: &ast.StarExpr{X: sel("http", "Request")}}, + }, []*ast.Field{{Type: &ast.ArrayType{Elt: id("string")}}}, stmts) +} + func realtimeStreamGuardsDecl(options Options) ast.Decl { fallback := realtimeStreamFallbackGuards(options) stmts := []ast.Stmt{ - define([]ast.Expr{id("requestPath")}, call( - selExpr(call(selExpr(selExpr(id("request"), "URL"), "Query")), "Get"), - stringLit("path"), - )), + define([]ast.Expr{id("requestPath")}, call(id("realtimeStreamPath"), id("request"))), } var routeStmts []ast.Stmt for _, route := range realtimeStreamRoutes(options) { @@ -376,18 +630,6 @@ func realtimeStreamGuardsDecl(options Options) ast.Decl { } if len(routeStmts) > 0 { stmts = append(stmts, - &ast.IfStmt{ - Cond: &ast.BinaryExpr{X: id("requestPath"), Op: token.EQL, Y: stringLit("")}, - Body: block(&ast.IfStmt{ - Init: define([]ast.Expr{id("referer")}, call(selExpr(id("request"), "Referer"))), - Cond: &ast.BinaryExpr{X: id("referer"), Op: token.NEQ, Y: stringLit("")}, - Body: block(&ast.IfStmt{ - Init: define([]ast.Expr{id("refererURL"), id("err")}, call(sel("neturl", "Parse"), id("referer"))), - Cond: &ast.BinaryExpr{X: id("err"), Op: token.EQL, Y: id("nil")}, - Body: block(assign([]ast.Expr{id("requestPath")}, selExpr(id("refererURL"), "Path"))), - }), - }), - }, &ast.IfStmt{ Cond: &ast.BinaryExpr{X: id("requestPath"), Op: token.NEQ, Y: stringLit("")}, Body: block(routeStmts...), @@ -400,6 +642,156 @@ func realtimeStreamGuardsDecl(options Options) ast.Decl { }, []*ast.Field{{Type: &ast.ArrayType{Elt: id("string")}}}, stmts) } +func eventEnvelopeSliceType() ast.Expr { + return &ast.ArrayType{Elt: sel("gowdkcontracts", "EventEnvelope")} +} + +func eventEnvelopeSliceExpr(values ...ast.Expr) ast.Expr { + return &ast.CompositeLit{ + Type: eventEnvelopeSliceType(), + Elts: values, + } +} + +func realtimeAudienceScopedEventsDecl() ast.Decl { + return funcDecl("realtimeAudienceScopedEvents", []*ast.Field{ + {Names: []*ast.Ident{id("event")}, Type: sel("gowdkcontracts", "EventEnvelope")}, + }, []*ast.Field{{Type: eventEnvelopeSliceType()}}, []ast.Stmt{ + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: selExpr(id("event"), "Category"), Op: token.NEQ, Y: sel("gowdkcontracts", "PresentationEvent")}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{id("nil")}}), + }, + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: selExpr(id("event"), "Type"), Op: token.EQL, Y: sel("gowdkcontracts", "QueryInvalidationPresentationEventType")}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{call(id("realtimeQueryInvalidationAudienceEvents"), id("event"))}}), + }, + &ast.IfStmt{ + Cond: &ast.UnaryExpr{Op: token.NOT, X: &ast.IndexExpr{X: id("realtimeSubscriptionEventTypes"), Index: selExpr(id("event"), "Type")}}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{id("nil")}}), + }, + &ast.IfStmt{ + Cond: &ast.IndexExpr{X: id("realtimeSubscriptionBroadcasts"), Index: selExpr(id("event"), "Type")}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}), + }, + define([]ast.Expr{id("audiences")}, &ast.IndexExpr{X: id("realtimeSubscriptionAudiences"), Index: selExpr(id("event"), "Type")}), + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: call(id("len"), id("audiences")), Op: token.EQL, Y: intLit(0)}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}), + }, + define([]ast.Expr{id("scoped")}, call(id("make"), eventEnvelopeSliceType(), intLit(0), call(id("len"), id("audiences")))), + &ast.RangeStmt{ + Key: id("_"), + Value: id("audience"), + Tok: token.DEFINE, + X: id("audiences"), + Body: block(assign([]ast.Expr{id("scoped")}, call(id("append"), id("scoped"), call(id("realtimeEventWithAudience"), id("event"), id("audience"))))), + }, + &ast.ReturnStmt{Results: []ast.Expr{id("scoped")}}, + }) +} + +func realtimeEventWithAudienceDecl() ast.Decl { + return funcDecl("realtimeEventWithAudience", []*ast.Field{ + {Names: []*ast.Ident{id("event")}, Type: sel("gowdkcontracts", "EventEnvelope")}, + {Names: []*ast.Ident{id("audience")}, Type: id("string")}, + }, []*ast.Field{{Type: sel("gowdkcontracts", "EventEnvelope")}}, []ast.Stmt{ + define([]ast.Expr{id("scoped")}, id("event")), + define([]ast.Expr{id("audiences")}, &ast.CallExpr{ + Fun: id("append"), + Args: []ast.Expr{&ast.CompositeLit{Type: &ast.ArrayType{Elt: id("string")}}, selExpr(id("event"), "Audience")}, + Ellipsis: token.Pos(1), + }), + assign([]ast.Expr{selExpr(id("scoped"), "Audience")}, call(id("append"), id("audiences"), id("audience"))), + &ast.ReturnStmt{Results: []ast.Expr{id("scoped")}}, + }) +} + +func realtimeQueryInvalidationAudienceEventsDecl() ast.Decl { + return funcDecl("realtimeQueryInvalidationAudienceEvents", []*ast.Field{ + {Names: []*ast.Ident{id("event")}, Type: sel("gowdkcontracts", "EventEnvelope")}, + }, []*ast.Field{{Type: eventEnvelopeSliceType()}}, []ast.Stmt{ + define([]ast.Expr{id("notice"), id("ok")}, &ast.TypeAssertExpr{X: selExpr(id("event"), "Value"), Type: sel("gowdkcontracts", "QueryInvalidationNotice")}), + &ast.IfStmt{ + Cond: &ast.UnaryExpr{Op: token.NOT, X: id("ok")}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}), + }, + define([]ast.Expr{id("queriesByAudience")}, &ast.CompositeLit{Type: &ast.MapType{Key: id("string"), Value: &ast.ArrayType{Elt: id("string")}}}), + define([]ast.Expr{id("unscopedQueries")}, &ast.CompositeLit{Type: &ast.ArrayType{Elt: id("string")}}), + define([]ast.Expr{id("audiences")}, &ast.CompositeLit{Type: &ast.ArrayType{Elt: id("string")}}), + define([]ast.Expr{id("seenAudience")}, &ast.CompositeLit{Type: &ast.MapType{Key: id("string"), Value: id("bool")}}), + &ast.RangeStmt{ + Key: id("_"), + Value: id("query"), + Tok: token.DEFINE, + X: selExpr(id("notice"), "Queries"), + Body: block( + define([]ast.Expr{id("queryAudiences")}, &ast.IndexExpr{X: id("realtimeQueryAudiences"), Index: id("query")}), + &ast.IfStmt{ + Cond: &ast.BinaryExpr{ + X: &ast.IndexExpr{X: id("realtimeQueryBroadcasts"), Index: id("query")}, + Op: token.LOR, + Y: &ast.BinaryExpr{X: call(id("len"), id("queryAudiences")), Op: token.EQL, Y: intLit(0)}, + }, + Body: block( + assign([]ast.Expr{id("unscopedQueries")}, call(id("append"), id("unscopedQueries"), id("query"))), + &ast.BranchStmt{Tok: token.CONTINUE}, + ), + }, + &ast.RangeStmt{ + Key: id("_"), + Value: id("audience"), + Tok: token.DEFINE, + X: id("queryAudiences"), + Body: block( + &ast.IfStmt{ + Cond: &ast.UnaryExpr{Op: token.NOT, X: &ast.IndexExpr{X: id("seenAudience"), Index: id("audience")}}, + Body: block( + assign([]ast.Expr{&ast.IndexExpr{X: id("seenAudience"), Index: id("audience")}}, id("true")), + assign([]ast.Expr{id("audiences")}, call(id("append"), id("audiences"), id("audience"))), + ), + }, + assign([]ast.Expr{&ast.IndexExpr{X: id("queriesByAudience"), Index: id("audience")}}, call(id("append"), &ast.IndexExpr{X: id("queriesByAudience"), Index: id("audience")}, id("query"))), + ), + }, + ), + }, + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: call(id("len"), id("audiences")), Op: token.EQL, Y: intLit(0)}, + Body: block(&ast.ReturnStmt{Results: []ast.Expr{eventEnvelopeSliceExpr(id("event"))}}), + }, + define([]ast.Expr{id("scopedCapacity")}, call(id("len"), id("audiences"))), + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: call(id("len"), id("unscopedQueries")), Op: token.GTR, Y: intLit(0)}, + Body: block(assign([]ast.Expr{id("scopedCapacity")}, &ast.BinaryExpr{X: id("scopedCapacity"), Op: token.ADD, Y: intLit(1)})), + }, + define([]ast.Expr{id("scoped")}, call(id("make"), eventEnvelopeSliceType(), intLit(0), id("scopedCapacity"))), + &ast.IfStmt{ + Cond: &ast.BinaryExpr{X: call(id("len"), id("unscopedQueries")), Op: token.GTR, Y: intLit(0)}, + Body: block( + define([]ast.Expr{id("unscopedNotice")}, id("notice")), + assign([]ast.Expr{selExpr(id("unscopedNotice"), "Queries")}, id("unscopedQueries")), + define([]ast.Expr{id("unscopedEvent")}, id("event")), + assign([]ast.Expr{selExpr(id("unscopedEvent"), "Value")}, id("unscopedNotice")), + assign([]ast.Expr{id("scoped")}, call(id("append"), id("scoped"), id("unscopedEvent"))), + ), + }, + &ast.RangeStmt{ + Key: id("_"), + Value: id("audience"), + Tok: token.DEFINE, + X: id("audiences"), + Body: block( + define([]ast.Expr{id("scopedNotice")}, id("notice")), + assign([]ast.Expr{selExpr(id("scopedNotice"), "Queries")}, &ast.IndexExpr{X: id("queriesByAudience"), Index: id("audience")}), + define([]ast.Expr{id("scopedEvent")}, call(id("realtimeEventWithAudience"), id("event"), id("audience"))), + assign([]ast.Expr{selExpr(id("scopedEvent"), "Value")}, id("scopedNotice")), + assign([]ast.Expr{id("scoped")}, call(id("append"), id("scoped"), id("scopedEvent"))), + ), + }, + &ast.ReturnStmt{Results: []ast.Expr{id("scoped")}}, + }) +} + func realtimeSubscriptionFanoutTypeDecl() ast.Decl { return &ast.GenDecl{Tok: token.TYPE, Specs: []ast.Spec{&ast.TypeSpec{ Name: id("realtimeSubscriptionFanout"), @@ -434,14 +826,11 @@ func realtimeSubscriptionFanoutSendDecl() ast.Decl { Value: id("event"), Tok: token.DEFINE, X: id("events"), - Body: block(&ast.IfStmt{ - Cond: &ast.BinaryExpr{ - X: &ast.IndexExpr{X: id("realtimeSubscriptionEventTypes"), Index: selExpr(id("event"), "Type")}, - Op: token.LAND, - Y: &ast.BinaryExpr{X: selExpr(id("event"), "Category"), Op: token.EQL, Y: sel("gowdkcontracts", "PresentationEvent")}, - }, - Body: block(assign([]ast.Expr{id("filtered")}, call(id("append"), id("filtered"), id("event")))), - }), + Body: block(assign([]ast.Expr{id("filtered")}, &ast.CallExpr{ + Fun: id("append"), + Args: []ast.Expr{id("filtered"), call(id("realtimeAudienceScopedEvents"), id("event"))}, + Ellipsis: token.Pos(1), + })), }, &ast.IfStmt{ Cond: &ast.BinaryExpr{X: call(id("len"), id("filtered")), Op: token.EQL, Y: intLit(0)},