Skip to content

Commit a53ee81

Browse files
authored
feat(event_producer): wire up main application and infrastructure (#2130)
Connects all the adapters and logic components in the `event_producer` main entry point (`cmd/job/main.go`).

This enables the worker to:
1. Subscribe to ingestion events (search refresh requests).
2. Subscribe to batch update triggers (fan-out requests).
3. Publish notifications to the downstream topic.
4. Interact with Spanner (metadata) and GCS (blobs).

Infrastructure updates:
- Updated `.dev/pubsub/run.sh` to create Dead Letter Queues (DLQs) for ingestion, notification, and delivery topics, providing robust error handling.
- Updated `DEVELOPMENT.md` with the correct local Pub/Sub port.
- Updated `pod.yaml` with necessary environment variables for the new subscriptions.
1 parent b124730 commit a53ee81

7 files changed

Lines changed: 107 additions & 12 deletions

File tree

.dev/pubsub/run.sh

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ create_topic() {
3232
}
3333

3434
# Function to create a Subscription (Pull). gcloud does not support subscription creation in the emulator, so we use curl.
35+
# Usage: create_subscription <topic_name> <sub_name> [dlq_topic_name]
3536
create_subscription() {
3637
local topic_name=$1
3738
local sub_name=$2
39+
local dlq_topic_name=$3
3840

3941
if [[ -z "$topic_name" || -z "$sub_name" ]]; then
4042
echo "Error: Topic name and Subscription name required."
@@ -46,25 +48,60 @@ create_subscription() {
4648
# The emulator requires the full path to the topic in the JSON body
4749
local topic_path="projects/${PROJECT_ID}/topics/${topic_name}"
4850

51+
# Base JSON payload
52+
local json_payload="{\"topic\": \"$topic_path\""
53+
54+
# If a DLQ topic is provided, add the deadLetterPolicy
55+
if [[ -n "$dlq_topic_name" ]]; then
56+
local dlq_topic_path="projects/${PROJECT_ID}/topics/${dlq_topic_name}"
57+
# MaxDeliveryAttempts is set to 5 as a standard default
58+
json_payload="${json_payload}, \"deadLetterPolicy\": {\"deadLetterTopic\": \"${dlq_topic_path}\", \"maxDeliveryAttempts\": 5}"
59+
echo " -> With Dead Letter Queue: ${dlq_topic_name}"
60+
fi
61+
62+
# Close the JSON object
63+
json_payload="${json_payload}}"
64+
4965
curl -s -X PUT "http://0.0.0.0:${PORT}/v1/projects/${PROJECT_ID}/subscriptions/${sub_name}" \
5066
-H "Content-Type: application/json" \
51-
-d "{\"topic\": \"$topic_path\"}"
67+
-d "$json_payload"
5268

5369
echo -e "\nSubscription ${sub_name} for topic: ${topic_name} created."
5470
}
5571

5672
gcloud beta emulators pubsub start --project="$PROJECT_ID" --host-port="0.0.0.0:$PORT" &
57-
while ! curl -s -o /dev/null "localhost:$PORT"; do
73+
while ! curl -s -f "http://0.0.0.0:${PORT}/v1/projects/${PROJECT_ID}/topics"; do
5874
sleep 1 # Wait 1 second before checking again
5975
echo "waiting until pubsub emulator responds before finishing setup"
6076
done
6177

78+
# --- 1. Dead Letter Queues (Create these first so main subs can reference them) ---
79+
80+
# Ingestion DLQ
81+
create_topic "ingestion-jobs-dead-letter-topic-id"
82+
create_subscription "ingestion-jobs-dead-letter-topic-id" "ingestion-jobs-dead-letter-sub-id"
83+
84+
# Notification/Fan-out DLQ
85+
create_topic "notification-events-dead-letter-topic-id"
86+
create_subscription "notification-events-dead-letter-topic-id" "notification-events-dead-letter-sub-id"
87+
88+
# Delivery DLQ
89+
create_topic "delivery-dead-letter-topic-id"
90+
create_subscription "delivery-dead-letter-topic-id" "delivery-dead-letter-sub-id"
91+
92+
93+
# --- 2. Main Topics and Subscriptions ---
94+
create_topic "batch-updates-topic-id"
95+
create_subscription "batch-updates-topic-id" "batch-updates-sub-id" "ingestion-jobs-dead-letter-topic-id"
96+
6297
create_topic "ingestion-jobs-topic-id"
63-
create_subscription "ingestion-jobs-topic-id" "ingestion-jobs-sub-id"
98+
create_subscription "ingestion-jobs-topic-id" "ingestion-jobs-sub-id" "ingestion-jobs-dead-letter-topic-id"
99+
64100
create_topic "notification-events-topic-id"
65-
create_subscription "notification-events-topic-id" "notification-events-sub-id"
101+
create_subscription "notification-events-topic-id" "notification-events-sub-id" "notification-events-dead-letter-topic-id"
102+
66103
create_topic "chime-delivery-topic-id"
67-
create_subscription "chime-delivery-topic-id" "chime-delivery-sub-id"
104+
create_subscription "chime-delivery-topic-id" "chime-delivery-sub-id" "delivery-dead-letter-topic-id"
68105

69106
echo "Pub/Sub setup for webstatus.dev finished"
70107

DEVELOPMENT.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ The above skaffold command deploys multiple resources:
9393
| valkey | Valkey | N/A | valkey:6379 |
9494
| auth | Auth Emulator | http://localhost:9099<br />http://localhost:9100/auth (ui) | http://auth:9099<br />http://auth:9100/auth (ui) |
9595
| wiremock | Wiremock | http://localhost:8087 | http://api-github-mock.default.svc.cluster.local:8080 (GitHub Mock) |
96-
| pubsub | Pub/Sub Emulator | N/A | http://pubsub:8086 |
96+
| pubsub | Pub/Sub Emulator | N/A | http://pubsub:8060 |
9797
| gcs | GCS Emulator | N/A | http://gcs:4443 |
9898

9999
_In the event the servers are not responsive, make a temporary change to a file_

lib/gcppubsub/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (c *Client) Subscribe(ctx context.Context, subID string,
7373
msg.Ack()
7474
} else if errors.Is(workErr, event.ErrTransientFailure) {
7575
// NACK: Retry later
76+
slog.WarnContext(ctx, "transient failure, will retry", "error", workErr)
7677
msg.Nack()
7778
} else {
7879
// ACK: Permanent failure or unknown error, do not retry

workers/event_producer/cmd/job/main.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@ import (
2020
"os"
2121

2222
"github.com/GoogleChrome/webstatus.dev/lib/gcpgcs"
23+
"github.com/GoogleChrome/webstatus.dev/lib/gcpgcs/gcpgcsadapters"
2324
"github.com/GoogleChrome/webstatus.dev/lib/gcppubsub"
25+
"github.com/GoogleChrome/webstatus.dev/lib/gcppubsub/gcppubsubadapters"
2426
"github.com/GoogleChrome/webstatus.dev/lib/gcpspanner"
27+
"github.com/GoogleChrome/webstatus.dev/lib/gcpspanner/spanneradapters"
28+
"github.com/GoogleChrome/webstatus.dev/workers/event_producer/pkg/producer"
2529
)
2630

2731
func main() {
@@ -56,6 +60,20 @@ func main() {
5660
os.Exit(1)
5761
}
5862

63+
// For publishing ingestion events during batch fan-out
64+
ingestionTopicID := os.Getenv("INGESTION_TOPIC_ID")
65+
if ingestionTopicID == "" {
66+
slog.ErrorContext(ctx, "INGESTION_TOPIC_ID is not set. exiting...")
67+
os.Exit(1)
68+
}
69+
70+
// For subscribing to batch events
71+
batchSubID := os.Getenv("BATCH_UPDATE_SUBSCRIPTION_ID")
72+
if batchSubID == "" {
73+
slog.ErrorContext(ctx, "BATCH_UPDATE_SUBSCRIPTION_ID is not set. exiting...")
74+
os.Exit(1)
75+
}
76+
5977
// For publishing to notification events
6078
notificationTopicID := os.Getenv("NOTIFICATION_TOPIC_ID")
6179
if notificationTopicID == "" {
@@ -75,17 +93,30 @@ func main() {
7593
os.Exit(1)
7694
}
7795

78-
_, err = gcpgcs.NewClient(ctx, stateBlobBucket)
96+
blobClient, err := gcpgcs.NewClient(ctx, stateBlobBucket)
7997
if err != nil {
8098
slog.ErrorContext(ctx, "unable to create gcs client", "error", err)
8199
os.Exit(1)
82100
}
101+
blobAdapter := gcpgcsadapters.NewEventProducer(blobClient, stateBlobBucket)
83102

84-
// TODO: https://github.com/GoogleChrome/webstatus.dev/issues/1848
85-
// Nil handler for now. Will fix later
86-
err = queueClient.Subscribe(ctx, ingestionSubID, nil)
103+
p := producer.NewEventProducer(
104+
producer.NewDiffer(spanneradapters.NewEventProducerDiffer(spanneradapters.NewBackend(spannerClient))),
105+
blobAdapter, spanneradapters.NewEventProducer(spannerClient),
106+
gcppubsubadapters.NewEventProducerPublisherAdapter(queueClient, notificationTopicID))
107+
batch := producer.NewBatchUpdateHandler(spanneradapters.NewBatchEventProducer(spannerClient),
108+
gcppubsubadapters.NewBatchFanOutPublisherAdapter(queueClient, ingestionTopicID))
109+
listener := gcppubsubadapters.NewEventProducerSubscriberAdapter(
110+
p, batch, queueClient, gcppubsubadapters.SubscriberConfig{
111+
SearchSubscriptionID: ingestionSubID,
112+
BatchUpdateSubscriptionID: batchSubID,
113+
})
114+
// Start listening to ingestion events
115+
slog.InfoContext(ctx, "starting event producer subscriber for ingestion events")
116+
err = listener.Subscribe(ctx)
87117
if err != nil {
88-
slog.ErrorContext(ctx, "unable to connect to subscription", "error", err)
118+
slog.ErrorContext(ctx, "unable to start subscriber", "error", err)
89119
os.Exit(1)
90120
}
121+
91122
}

workers/event_producer/go.mod

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@ require (
1919
cloud.google.com/go v0.123.0 // indirect
2020
cloud.google.com/go/auth v0.17.0 // indirect
2121
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
22+
cloud.google.com/go/cloudtasks v1.13.7 // indirect
2223
cloud.google.com/go/compute/metadata v0.9.0 // indirect
24+
cloud.google.com/go/datastore v1.21.0 // indirect
2325
cloud.google.com/go/iam v1.5.3 // indirect
26+
cloud.google.com/go/logging v1.13.1 // indirect
27+
cloud.google.com/go/longrunning v0.7.0 // indirect
2428
cloud.google.com/go/monitoring v1.24.3 // indirect
2529
cloud.google.com/go/pubsub/v2 v2.0.0 // indirect
30+
cloud.google.com/go/secretmanager v1.16.0 // indirect
2631
cloud.google.com/go/spanner v1.86.1 // indirect
2732
cloud.google.com/go/storage v1.57.2 // indirect
2833
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.3 // indirect
@@ -33,6 +38,7 @@ require (
3338
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
3439
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3540
github.com/cncf/xds/go v0.0.0-20251110193048-8bfbf64dc13e // indirect
41+
github.com/deckarep/golang-set v1.8.0 // indirect
3642
github.com/envoyproxy/go-control-plane/envoy v1.36.0 // indirect
3743
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
3844
github.com/felixge/httpsnoop v1.0.4 // indirect
@@ -43,17 +49,25 @@ require (
4349
github.com/go-openapi/jsonpointer v0.22.3 // indirect
4450
github.com/go-openapi/swag/jsonname v0.25.3 // indirect
4551
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
52+
github.com/gomodule/redigo v1.9.3 // indirect
53+
github.com/google/go-github/v77 v77.0.0 // indirect
54+
github.com/google/go-querystring v1.1.0 // indirect
4655
github.com/google/s2a-go v0.1.9 // indirect
4756
github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect
4857
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
58+
github.com/gorilla/handlers v1.5.2 // indirect
59+
github.com/gorilla/mux v1.8.1 // indirect
60+
github.com/gorilla/securecookie v1.1.2 // indirect
4961
github.com/josharian/intern v1.0.0 // indirect
5062
github.com/mailru/easyjson v0.9.1 // indirect
5163
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
5264
github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect
5365
github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect
5466
github.com/perimeterx/marshmallow v1.1.5 // indirect
5567
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
68+
github.com/sirupsen/logrus v1.9.3 // indirect
5669
github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect
70+
github.com/web-platform-tests/wpt.fyi v0.0.0-20251118162843-54f805c8a632 // indirect
5771
github.com/woodsbury/decimal128 v1.4.0 // indirect
5872
go.opencensus.io v0.24.0 // indirect
5973
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
@@ -79,4 +93,5 @@ require (
7993
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect
8094
google.golang.org/grpc v1.77.0 // indirect
8195
google.golang.org/protobuf v1.36.10 // indirect
96+
gopkg.in/yaml.v3 v3.0.1 // indirect
8297
)

workers/event_producer/go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,8 @@ github.com/google/go-github/v77 v77.0.0 h1:9DsKKbZqil5y/4Z9mNpZDQnpli6PJbqipSuuN
852852
github.com/google/go-github/v77 v77.0.0/go.mod h1:c8VmGXRUmaZUqbctUcGEDWYnMrtzZfJhDSylEf1wfmA=
853853
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
854854
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
855+
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
856+
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
855857
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
856858
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
857859
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -986,6 +988,8 @@ github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJw
986988
github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M=
987989
github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s=
988990
github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw=
991+
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI=
992+
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
989993
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
990994
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
991995
github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
@@ -1098,6 +1102,8 @@ go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42s
10981102
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
10991103
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
11001104
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
1105+
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
1106+
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
11011107
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
11021108
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
11031109
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -1343,6 +1349,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
13431349
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
13441350
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
13451351
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
1352+
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
13461353
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
13471354
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
13481355
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

workers/event_producer/manifests/pod.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ spec:
3333
- name: SPANNER_EMULATOR_HOST
3434
value: 'spanner:9010'
3535
- name: PUBSUB_EMULATOR_HOST
36-
value: 'http://pubsub:8086'
36+
value: 'pubsub:8060'
3737
- name: INGESTION_SUBSCRIPTION_ID
3838
value: 'ingestion-jobs-sub-id'
3939
- name: NOTIFICATION_TOPIC_ID
@@ -42,6 +42,10 @@ spec:
4242
value: 'state-bucket'
4343
- name: STORAGE_EMULATOR_HOST
4444
value: 'http://gcs:4443'
45+
- name: BATCH_UPDATE_SUBSCRIPTION_ID
46+
value: 'batch-updates-sub-id'
47+
- name: INGESTION_TOPIC_ID
48+
value: 'ingestion-jobs-topic-id'
4549
resources:
4650
limits:
4751
cpu: 250m

0 commit comments

Comments
 (0)