Skip to content

Commit 5022b4d

Browse files
authored
RabbitMQ connector + password change message handler (#4560)
2 parents 068f628 + 9ffc100 commit 5022b4d

21 files changed

Lines changed: 2209 additions & 1 deletion

cozy.example.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,3 +530,24 @@ contexts:
530530
- src: /logos/1_partner.png
531531
alt: Partner n°1
532532
type: secondary
533+
534+
rabbitmq:
535+
enabled: true
536+
url: amqp://guest:guest@localhost:5672/
537+
tls:
538+
# root_ca: /etc/ssl/certs/ca.pem
539+
insecure_skip_validation: false
540+
exchanges:
541+
- name: auth
542+
kind: topic
543+
durable: true
544+
declare_exchange: true
545+
dlx_name: auth.dlx
546+
queues:
547+
- name: user.password.updated
548+
declare: true
549+
prefetch: 8
550+
delivery_limit: 5
551+
bindings:
552+
- password.changed
553+

docs/rabbitmq.md

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
## RabbitMQ
2+
3+
Integration with RabbitMQ: configuration, topology, and handler semantics.
4+
5+
### Overview
6+
7+
Cozy Stack can consume messages from RabbitMQ and dispatch them to Go handlers. The consumer is managed by a background manager which:
8+
9+
- establishes and monitors the AMQP connection (with optional TLS),
10+
- declares exchanges and queues (if configured to do so),
11+
- binds queues to routing keys,
12+
- starts per-queue consumers with QoS and redelivery limits,
13+
- dispatches deliveries to queue-specific handlers.
14+
15+
### Configuration
16+
17+
RabbitMQ is configured in `cozy.yaml` under the `rabbitmq` key.
18+
19+
Key fields:
20+
21+
- `enabled`: Enable the consumer.
22+
- `url`: AMQP URL, e.g. `amqp://guest:guest@localhost:5672/`.
23+
- `tls`: Optional TLS settings (`root_ca`, `insecure_skip_validation`, `server_name`).
24+
- `exchanges[]`: List of exchanges the stack should consume from.
25+
- `name`: Exchange name.
26+
- `kind`: Exchange type (e.g. `topic`).
27+
- `durable`: Whether the exchange is durable.
28+
- `declare_exchange`: If true, the exchange is declared on startup.
29+
- `dlx_name`, `dlq_name`: Optional defaults for queues under this exchange.
30+
- `queues[]`: List of queues to consume.
31+
- `name`: Queue name.
32+
- `bindings[]`: Routing keys to bind to the exchange.
33+
- `declare`: If true, declare the queue on startup.
34+
- `prefetch`: Per-consumer QoS prefetch.
35+
- `delivery_limit`: x-delivery-limit for quorum queues.
36+
- `dlx_name`, `dlq_name`: Optional overrides per queue.
37+
38+
Example:
39+
40+
```yaml
41+
rabbitmq:
42+
enabled: true
43+
url: amqp://guest:guest@localhost:5672/
44+
tls:
45+
# root_ca: /etc/ssl/certs/ca.pem
46+
insecure_skip_validation: false
47+
# server_name: rabbit.internal
48+
exchanges:
49+
- name: auth
50+
kind: topic
51+
durable: true
52+
declare_exchange: true
53+
queues:
54+
- name: user.password.updated
55+
declare: true
56+
prefetch: 8
57+
delivery_limit: 5
58+
bindings:
59+
- password.changed
60+
- name: user.created
61+
declare: true
62+
prefetch: 8
63+
delivery_limit: 5
64+
bindings:
65+
- user.created
66+
```
67+
68+
### Handlers
69+
70+
Handlers implement a simple interface:
71+
72+
```go
73+
type Handler interface {
74+
Handle(ctx context.Context, d amqp.Delivery) error
75+
}
76+
```
77+
78+
Returning `nil` acknowledges the message. Returning a non-nil error causes the message to be requeued (subject to broker policies and delivery limits).
79+
80+
Queue names are mapped to handlers in the stack. For example:
81+
82+
- `user.password.updated` → updates an instance passphrase when a `password.changed` routing key is received.
83+
- `user.created` → validates and processes user creation events.
84+
85+
Message schemas are JSON and validated in the handler. Example payload for `user.password.updated`:
86+
87+
```json
88+
{
89+
"twakeId": "string",
90+
"iterations": 100000,
91+
"hash": "base64",
92+
"publicKey": "base64",
93+
"privateKey": "cipherString",
94+
"key": "cipherString",
95+
"timestamp": 1726040000,
96+
"domain": "example.cozy.cloud"
97+
}
98+
```
99+
100+
Example payload for `user.created`:
101+
102+
```json
103+
{
104+
"twakeId": "string",
105+
"mobile": "string",
106+
"internalEmail": "string",
107+
"iterations": 100000,
108+
"hash": "base64",
109+
"publicKey": "base64",
110+
"privateKey": "cipherString",
111+
"key": "cipherString",
112+
"timestamp": 1726040000
113+
}
114+
```
115+
116+
### Lifecycle
117+
118+
On startup, if `rabbitmq.enabled` is true:
119+
120+
1. The manager creates an AMQP connection (TLS if configured) and retries with exponential backoff.
121+
2. It declares configured exchanges and queues (if `declare_*` flags are set).
122+
3. It binds queues to their routing keys and starts consumers.
123+
4. It exposes a readiness channel internally so tests can wait until consumption is active.
124+
5. It monitors the connection and restarts consumers upon reconnection.
125+
126+
### Adding a new queue handler
127+
128+
Follow these steps to introduce a new queue and its handler.
129+
130+
1) Define the message schema and the handler
131+
132+
Create a handler type that implements the `Handle(ctx, d)` method and an accompanying message struct.
133+
134+
```go
135+
// Example message payload consumed from the queue
136+
type ExampleEvent struct {
137+
ID string `json:"id"`
138+
Action string `json:"action"`
139+
Timestamp int64 `json:"timestamp"`
140+
}
141+
142+
// ExampleHandler processes ExampleEvent messages
143+
type ExampleHandler struct{}
144+
145+
func NewExampleHandler() *ExampleHandler { return &ExampleHandler{} }
146+
147+
func (h *ExampleHandler) Handle(ctx context.Context, d amqp.Delivery) error {
148+
var msg ExampleEvent
149+
if err := json.Unmarshal(d.Body, &msg); err != nil {
150+
return fmt.Errorf("example: invalid payload: %w", err)
151+
}
152+
if msg.ID == "" {
153+
return fmt.Errorf("example: missing id")
154+
}
155+
// TODO: implement business logic
156+
return nil // ack
157+
}
158+
```
159+
160+
2) Register the handler for a queue name
161+
162+
Map the queue name to the handler so the manager knows which handler to use. This usually happens where queues are built from config.
163+
164+
```go
165+
switch configQueue.Name {
166+
case "example.queue":
167+
handler = NewExampleHandler()
168+
}
169+
```
170+
171+
3) Configure the exchange and queue in `cozy.yaml`
172+
173+
Add the queue under an exchange with the routing keys to bind.
174+
175+
```yaml
176+
rabbitmq:
177+
enabled: true
178+
url: amqp://guest:guest@localhost:5672/
179+
exchanges:
180+
- name: auth
181+
kind: topic
182+
durable: true
183+
declare_exchange: true
184+
queues:
185+
- name: example.queue
186+
declare: true
187+
prefetch: 8
188+
delivery_limit: 5
189+
bindings:
190+
- example.created
191+
- example.updated
192+
```
193+
194+
4) Publish a message (example)
195+
196+
```go
197+
ch.PublishWithContext(ctx,
198+
"auth", // exchange
199+
"example.created", // routing key
200+
false, false,
201+
amqp.Publishing{
202+
DeliveryMode: amqp.Persistent,
203+
ContentType: "application/json",
204+
Body: []byte(`{"id":"123","action":"created","timestamp":1726040000}`),
205+
},
206+
)
207+
```
208+
209+

docs/toc.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
- "/realtime - Realtime": ./realtime.md
5252
- "/remote - Proxy for remote data/API": ./remote.md
5353
- " /remote/nextcloud - NextCloud": ./nextcloud.md
54+
- "/rabbitmq - RabbitMQ integration": ./rabbitmq.md
5455
- "/settings - Settings": ./settings.md
5556
- " /settings - Terms of Services": ./user-action-required.md
5657
- "/sharings - Sharing": ./sharing.md

go.mod

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ require (
3636
github.com/oschwald/maxminddb-golang v1.13.1
3737
github.com/pquerna/otp v1.4.0
3838
github.com/prometheus/client_golang v1.18.0
39+
github.com/rabbitmq/amqp091-go v1.9.0
3940
github.com/redis/go-redis/v9 v9.4.0
4041
github.com/robfig/cron/v3 v3.0.1
4142
github.com/sideshow/apns2 v0.23.0
@@ -44,6 +45,7 @@ require (
4445
github.com/spf13/cobra v1.8.1
4546
github.com/spf13/viper v1.19.0
4647
github.com/stretchr/testify v1.9.0
48+
github.com/testcontainers/testcontainers-go v0.31.0
4749
github.com/ugorji/go/codec v1.2.12
4850
github.com/yuin/goldmark v1.7.4
4951
golang.org/x/crypto v0.31.0
@@ -65,22 +67,36 @@ require (
6567
cloud.google.com/go/iam v1.1.7 // indirect
6668
cloud.google.com/go/longrunning v0.5.6 // indirect
6769
cloud.google.com/go/storage v1.40.0 // indirect
70+
dario.cat/mergo v1.0.0 // indirect
71+
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
6872
github.com/MicahParks/keyfunc v1.9.0 // indirect
73+
github.com/Microsoft/go-winio v0.6.1 // indirect
74+
github.com/Microsoft/hcsshim v0.11.4 // indirect
6975
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2 // indirect
7076
github.com/ajg/form v1.5.1 // indirect
7177
github.com/beorn7/perks v1.0.1 // indirect
7278
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect
79+
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
7380
github.com/cespare/xxhash/v2 v2.2.0 // indirect
81+
github.com/containerd/containerd v1.7.15 // indirect
82+
github.com/containerd/log v0.1.0 // indirect
83+
github.com/cpuguy83/dockercfg v0.3.1 // indirect
7484
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
7585
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
7686
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
87+
github.com/distribution/reference v0.5.0 // indirect
88+
github.com/docker/docker v25.0.5+incompatible // indirect
89+
github.com/docker/go-connections v0.5.0 // indirect
90+
github.com/docker/go-units v0.5.0 // indirect
7791
github.com/fatih/color v1.15.0 // indirect
7892
github.com/fatih/structs v1.1.0 // indirect
7993
github.com/felixge/httpsnoop v1.0.4 // indirect
8094
github.com/fsnotify/fsnotify v1.7.0 // indirect
8195
github.com/go-logr/logr v1.4.1 // indirect
8296
github.com/go-logr/stdr v1.2.2 // indirect
97+
github.com/go-ole/go-ole v1.2.6 // indirect
8398
github.com/gobwas/glob v0.2.3 // indirect
99+
github.com/gogo/protobuf v1.3.2 // indirect
84100
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
85101
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
86102
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
@@ -97,16 +113,26 @@ require (
97113
github.com/jonas-p/go-shp v0.1.1 // indirect
98114
github.com/klauspost/compress v1.17.2 // indirect
99115
github.com/labstack/gommon v0.4.2 // indirect
116+
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
100117
github.com/magiconair/properties v1.8.7 // indirect
101118
github.com/mattn/go-colorable v0.1.13 // indirect
102119
github.com/mattn/go-isatty v0.0.20 // indirect
103120
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
104121
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
105122
github.com/mitchellh/mapstructure v1.5.0 // indirect
123+
github.com/moby/patternmatcher v0.6.0 // indirect
124+
github.com/moby/sys/sequential v0.5.0 // indirect
125+
github.com/moby/sys/user v0.1.0 // indirect
126+
github.com/moby/term v0.5.0 // indirect
127+
github.com/morikuni/aec v1.0.0 // indirect
106128
github.com/onsi/ginkgo v1.16.5 // indirect
107129
github.com/onsi/gomega v1.18.1 // indirect
130+
github.com/opencontainers/go-digest v1.0.0 // indirect
131+
github.com/opencontainers/image-spec v1.1.0 // indirect
108132
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
133+
github.com/pkg/errors v0.9.1 // indirect
109134
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
135+
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
110136
github.com/prometheus/client_model v0.5.0 // indirect
111137
github.com/prometheus/common v0.45.0 // indirect
112138
github.com/prometheus/procfs v0.12.0 // indirect
@@ -115,11 +141,15 @@ require (
115141
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
116142
github.com/sanity-io/litter v1.5.5 // indirect
117143
github.com/sergi/go-diff v1.3.1 // indirect
144+
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
145+
github.com/shoenig/go-m1cpu v0.1.6 // indirect
118146
github.com/sourcegraph/conc v0.3.0 // indirect
119147
github.com/spf13/cast v1.6.0 // indirect
120148
github.com/spf13/pflag v1.0.5 // indirect
121149
github.com/stretchr/objx v0.5.2 // indirect
122150
github.com/subosito/gotenv v1.6.0 // indirect
151+
github.com/tklauser/go-sysconf v0.3.12 // indirect
152+
github.com/tklauser/numcpus v0.6.1 // indirect
123153
github.com/valyala/bytebufferpool v1.0.0 // indirect
124154
github.com/valyala/fasthttp v1.44.0 // indirect
125155
github.com/valyala/fasttemplate v1.2.2 // indirect
@@ -129,6 +159,7 @@ require (
129159
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
130160
github.com/yudai/gojsondiff v1.0.0 // indirect
131161
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
162+
github.com/yusufpapurcu/wmi v1.2.3 // indirect
132163
go.opencensus.io v0.24.0 // indirect
133164
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
134165
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
@@ -138,8 +169,10 @@ require (
138169
go.uber.org/atomic v1.9.0 // indirect
139170
go.uber.org/multierr v1.9.0 // indirect
140171
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
172+
golang.org/x/mod v0.17.0 // indirect
141173
golang.org/x/sys v0.28.0 // indirect
142174
golang.org/x/time v0.5.0 // indirect
175+
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
143176
google.golang.org/appengine/v2 v2.0.2 // indirect
144177
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
145178
google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect

0 commit comments

Comments
 (0)