Commit ac0d369

Merge branch 'main' into jme/LFXV2-1371
2 parents 6e9b26a + 81369d9 commit ac0d369

13 files changed

Lines changed: 452 additions & 124 deletions

README.md

Lines changed: 104 additions & 45 deletions
@@ -4,50 +4,13 @@ This repository contains tools and services for synchronizing data between LFX v
 
 ## Overview
 
-Most data entities are synced from v1 into native LFX One entities. Bidirectional sync is implemented for committees and committee members.
+This repository serves two distinct purposes:
 
-However, due to the size, complexity, and number of external interactions the LFX Meetings stack has, v1 and v2 meetings will be kept separate, though v1 meetings will be made avaliable as read-only, natively-permissioned entities within LFX One via the query service.
+1. **Real-time streaming replication.** PostgreSQL WAL events (via `wal-listener`) and DynamoDB Streams are replicated in real time—alongside periodic Meltano backfills—into a `v1-objects` NATS KV bucket. LFX One wrapper services subscribe to this bucket to drive indexing pipelines (OpenSearch via the indexer service) and access-control pipelines (OpenFGA via fga-sync), without needing to integrate directly with ITX eventing.
 
-```mermaid
-flowchart TD
-V1[LFX v1 Meetings] --> Sync[Data Sync Process]
-Projects --> Sync2[Data Backfill]
-Committees --> Sync3[Data Backfill]
-Sync --> ShadowV1[**v1 Meetings**<br/>- Synced from v1<br/>- Read-only in LFX One<br/>- Separate from native v2]
-Sync2 --> ProjectsV2
-Sync3 --> CommitteesV2
-
-NativeV2[**Native v2 Meetings**<br/>- Created directly in v2<br/>- Full CRUD operations]
-ProjectsV2[Native v2 Projects]
-CommitteesV2[Native v2 Committees]
-
-ShadowV1 --> LFXOne[LFX One UI]
-NativeV2 --> LFXOne
-ProjectsV2 & CommitteesV2 --> LFXOne
-
-LFXOne --> Search[Search & Query<br/>Services]
-LFXOne --> FGA[OpenFGA<br/>Access Control]
-LFXOne --> JoinFlow[Meeting Join Flow]
-
-subgraph "LFX One Platform"
-Search
-FGA
-JoinFlow
-end
-
-subgraph "v1 Data"
-V1
-Projects
-Committees
-end
+2. **Bidirectional sync for "core" resources.** Projects and committees are fully synced in both directions between LFX v1 and LFX One. This gives LFX One a self-contained stack for these entity types, which simplifies developer environment stand-up by removing the dependency on the highly-interconnected LFX/Salesforce/ITX stack.
 
-subgraph "v2 Data"
-ShadowV1
-NativeV2
-ProjectsV2
-CommitteesV2
-end
-```
+ITX-hosted resources such as Meetings are handled by v2 "wrapper" services that sit in front of the ITX APIs and rely on the NATS KV replication above for eventing; they do **not** get their own native v2 entity storage. See the [ITX wrappers component diagram](#itx-wrappers-component-diagram) in the Architecture Diagrams section for how this fits together.
 
 ## Prerequisites
 
@@ -63,14 +26,23 @@ Please see each component for further setup instructions.
 This repository contains three main components:
 
 ### [Meltano](./meltano/README.md)
+
 Data extraction and loading pipeline that extracts data from LFX v1 sources (DynamoDB for meetings, PostgreSQL for projects/committees) and loads it into NATS KV stores for processing by the v2 platform.
 
 ### [v1-sync-helper](./cmd/lfx-v1-sync-helper/README.md)
+
 Go service that monitors NATS KV stores for replicated v1 data and synchronizes it with the LFX v2 platform APIs, handling data transformation and conflict resolution.
 
 ### [Helm charts](./charts/lfx-v1-sync-helper/README.md)
+
 Kubernetes deployment manifests for the custom app service and WAL listener component, providing scalable deployment options for production environments.
 
+## Research & guides
+
+- [Adding a new DynamoDB table](./research/adding-dynamodb-table.md) — step-by-step checklist for onboarding a new DynamoDB table into the Meltano pipeline and stream consumer, with a worked example.
+- [Updating the Meltano catalog ConfigMap](./research/updating-meltano-catalog.md) — how to regenerate and apply the schema cache when tables or columns change.
+- [Meetings v1 vs v2](./research/meetings-v1-vs-v2.md) — comparison of the v1 and v2 meetings data models.
+
 ## NATS API
 
 The v1-sync-helper service provides a NATS request/reply function for querying v1-v2 ID mappings.
@@ -86,12 +58,14 @@ The v1-sync-helper service provides a NATS request/reply function for querying v
 Send a NATS request to `lfx.lookup_v1_mapping` with the mapping key as the payload. The service will respond with the corresponding mapping value or an error.
 
 **Request Format:**
+
 ```
 Subject: lfx.lookup_v1_mapping
 Payload: <mapping_key>
 ```
 
 **Response Format:**
+
 - **Success**: The mapped value as a string
 - **Not Found**: Empty string (`""`)
 - **Error**: String prefixed with `"error: "` (e.g., `"error: connection timeout"`)
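
The three reply shapes documented in the README hunk above (mapped value, empty string, `"error: "` prefix) can be decoded on the client side with a small helper. The following is a minimal sketch using only the standard library; `parseLookupReply` is a hypothetical name that does not appear in the repository, and the NATS request itself is left out so the example stays self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errorPrefix is the prefix the README documents for error replies.
const errorPrefix = "error: "

// parseLookupReply (hypothetical helper) interprets a reply payload from the
// lfx.lookup_v1_mapping subject. It returns the mapped value, whether a
// mapping was found, and any error reported by the service.
func parseLookupReply(payload string) (value string, found bool, err error) {
	switch {
	case payload == "":
		// Documented "Not Found" reply: an empty string.
		return "", false, nil
	case strings.HasPrefix(payload, errorPrefix):
		// Documented "Error" reply: strip the prefix and surface the message.
		return "", false, errors.New(strings.TrimPrefix(payload, errorPrefix))
	default:
		// Anything else is treated as the mapped value.
		return payload, true, nil
	}
}

func main() {
	for _, reply := range []string{"abc-123", "", "error: connection timeout"} {
		value, found, err := parseLookupReply(reply)
		fmt.Printf("value=%q found=%t err=%v\n", value, found, err)
	}
}
```

Because errors and values share one string channel, a mapped value that itself began with `error: ` would be misread as a failure; the sketch assumes mapping values never take that form.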
@@ -116,11 +90,78 @@ The following table shows the supported mapping key patterns and their expected
 
 ## Architecture Diagrams
 
-Regarding the following diagrams:
+Regarding the following sequence diagrams:
+
+- "Projects API" is representative of the core resources that have bidirectional sync (projects, committees). ITX-hosted resources such as Meetings are handled by wrapper services that subscribe to the NATS KV bucket instead—see the component diagram below.
 
-- The DynamoDB source (incremental or realtime) is not currently included in the diagrams.
-- The planned bidirectional sync (LFX One changes back to v1) is included in the diagrams.
-- "Projects API" is representative of most data entities. However, v1 Meetings push straight to OpenSearch and OpenFGA (via platform services)—this is not shown.
+### ITX wrappers component diagram
+
+This diagram shows how the LFX One platform, the v1-sync-helper replication pipeline, and ITX-hosted services fit together at the component level.
+
+```mermaid
+flowchart TD
+%%{init: {'flowchart': {'defaultRenderer': 'elk' }}}%%
+
+user[User]
+
+subgraph lfxv2["LFX Platform (k8s)"]
+traefik[Traefik]
+heimdall[Heimdall]
+subgraph fga-sync
+fga-sync-update-access[update-access]
+fga-sync-access-check[access-check]
+end
+indexer
+query-svc[Query Service]
+opensearch[OpenSearch]
+openfga[OpenFGA]
+
+xyz-wrapper@{ shape: processes, label: "Entity services (wrappers)" }
+
+traefik -.->|calls authz middleware| heimdall
+traefik --->|"proxies all list (search) requests to"| query-svc
+heimdall -.->|checks relations via| openfga
+query-svc -->|queries from| opensearch
+query-svc -.->|checks access via NATS| fga-sync-access-check
+indexer -.->|stores to| opensearch
+fga-sync-update-access -.->|syncs relations to| openfga
+fga-sync-access-check -.->|checks access via| openfga
+
+traefik -->|proxies authorized resource create/get/put requests to| xyz-wrapper
+
+xyz-wrapper -.->|upsert via NATS| indexer
+xyz-wrapper -.->|push relations via NATS| fga-sync-update-access
+
+%%wal-listener
+v1-sync-helper
+v1-objects[(v1 replica<br />KV bucket)]
+%%wal-listener -.->|NATS stream| v1-sync-helper
+v1-sync-helper -.->|NATS KV operations| v1-objects
+
+v1-objects -.->|subscribes to bucket events via NATS| xyz-wrapper
+end
+
+subgraph itx-aws[ITX AWS]
+itx-api-gw[API Gateway]
+itx-svc-authz[Authorizer Lambda]
+itx-service-xyz@{ shape: processes, label: "ITX services (Lambdas)"}
+dynamodb[(DynamoDB)]
+
+itx-api-gw -.-> itx-svc-authz
+itx-api-gw --> itx-service-xyz
+itx-service-xyz --> dynamodb
+end
+
+third-party-svcs@{ shape: processes, label: "Third-party services (Zoom, etc)"}
+itx-service-xyz --> third-party-svcs
+
+xyz-wrapper -->|authorized<br />create/get/put| itx-api-gw
+
+dynamodb -.->|consumed by streams| v1-sync-helper
+
+user -->|old| PIS[PIS or User Service] -->|authorized create/get/put/list| itx-api-gw
+user -->|new| traefik
+```
 
 ### Data extraction/replication sequence diagram
 
@@ -129,6 +170,8 @@ sequenceDiagram
 participant lfx_v1 as LFX v1 API
 participant postgres as Platform Database<br/>(PostgreSQL)
 participant wal-listener
+participant dynamodb as DynamoDB
+participant dynamo-stream as dynamodb-stream-consumer
 participant meltano as Meltano<br/>(custom NATS<br/>exporter)
 participant v1_kv as "v1" NATS KV bucket
 participant v1-sync-helper
@@ -140,13 +183,20 @@ sequenceDiagram
 wal-listener-)+v1-sync-helper: notification on "wal-listener" subject
 deactivate wal-listener
 v1-sync-helper-)-v1_kv: store record (or soft-deletion) by v1 ID
+lfx_v1 ->> dynamodb: create/update/delete (via ITX API)
+dynamodb-)+dynamo-stream: DynamoDB Streams event
+dynamo-stream-)+v1-sync-helper: notification on "dynamodb_streams" subject
+deactivate dynamo-stream
+v1-sync-helper-)-v1_kv: store record (or soft-deletion) by v1 ID
 
 Note over lfx_v1,v1_kv: Data backfill (full sync & incremental gap-fill)
 meltano->>meltano: scheduled task invoke (weekly/monthly)
 activate meltano
 meltano->>meltano: load state from S3<br/>(incremental state bookmark)
 meltano->>+postgres: query records >= LAST_SYNC<br/>(full re-sync also supported)
 postgres--)-meltano: results
+meltano->>+dynamodb: Scan tables >= LAST_MONTH<br/>(full re-scan also supported)
+dynamodb--)-meltano: results
 loop for each record
 meltano->>+v1_kv: fetch KV item by v1 ID
 v1_kv--)-meltano: KV item, soft-deletion, or empty
@@ -274,6 +324,8 @@ sequenceDiagram
 participant lfx_v1 as LFX v1 API
 participant postgres as Platform Database<br/>(PostgreSQL)
 participant wal-listener
+participant dynamodb as DynamoDB
+participant dynamo-stream as dynamodb-stream-consumer
 participant meltano as Meltano<br/>(custom NATS<br/>exporter)
 participant v1_kv as "v1" NATS KV bucket
 participant v1-sync-helper
@@ -290,13 +342,20 @@ sequenceDiagram
 wal-listener-)+v1-sync-helper: notification on "wal-listener" subject
 deactivate wal-listener
 v1-sync-helper-)-v1_kv: store record (or soft-deletion) by v1 ID
+lfx_v1 ->> dynamodb: create/update/delete (via ITX API)
+dynamodb-)+dynamo-stream: DynamoDB Streams event
+dynamo-stream-)+v1-sync-helper: notification on "dynamodb_streams" subject
+deactivate dynamo-stream
+v1-sync-helper-)-v1_kv: store record (or soft-deletion) by v1 ID
 
 Note over lfx_v1,v1_kv: Data backfill (full sync & incremental gap-fill)
 meltano->>meltano: scheduled task invoke (weekly/monthly)
 activate meltano
 meltano->>meltano: load state from S3<br/>(incremental state bookmark)
 meltano->>+postgres: query records >= LAST_SYNC<br/>(full re-sync also supported)
 postgres--)-meltano: results
+meltano->>+dynamodb: Scan tables >= LAST_MONTH<br/>(full re-scan also supported)
+dynamodb--)-meltano: results
 loop for each record
 meltano->>+v1_kv: fetch KV item by v1 ID
 v1_kv--)-meltano: KV item, soft-deletion, or empty

charts/lfx-v1-sync-helper/values.yaml

Lines changed: 1 addition & 1 deletion
@@ -252,7 +252,7 @@ dynamodbStreamConsumer:
 # DYNAMODB_TABLES is a comma-separated list of DynamoDB table names to consume.
 # Defaults to the full set of tables used by the tap-dynamodb Meltano extractor.
 DYNAMODB_TABLES:
-  value: "itx-poll,itx-poll-vote,itx-surveys,itx-survey-responses,itx-zoom-meetings-mappings-v2,itx-zoom-meetings-v2,itx-zoom-past-meetings-mappings,itx-zoom-past-meetings,itx-zoom-past-meetings-attendees,itx-zoom-past-meetings-invitees,itx-zoom-past-meetings-recordings,itx-zoom-past-meetings-summaries,itx-zoom-meetings-registrants-v2,itx-zoom-meetings-invite-responses-v2,itx-zoom-meetings-attachments-v2,itx-zoom-past-meetings-attachments,itx-groupsio-v2-service,itx-groupsio-v2-subgroup,itx-groupsio-v2-member"
+  value: "itx-poll,itx-poll-vote,itx-surveys,itx-survey-responses,surveymonkey-surveys,itx-zoom-meetings-mappings-v2,itx-zoom-meetings-v2,itx-zoom-past-meetings-mappings,itx-zoom-past-meetings,itx-zoom-past-meetings-attendees,itx-zoom-past-meetings-invitees,itx-zoom-past-meetings-recordings,itx-zoom-past-meetings-summaries,itx-zoom-meetings-registrants-v2,itx-zoom-meetings-invite-responses-v2,itx-zoom-meetings-attachments-v2,itx-zoom-past-meetings-attachments,itx-groupsio-v2-service,itx-groupsio-v2-subgroup,itx-groupsio-v2-member"
 # START_FROM_LATEST controls the iterator start position for new shards with no checkpoint.
 # Set to "true" to only receive new records; "false" (default) replays all available records.
 START_FROM_LATEST:
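
For reference, a comma-separated value such as `DYNAMODB_TABLES` can be turned into a list of table names along the following lines. This is a sketch only: the consumer's actual parsing code is not part of this diff, `parseTableList` is a hypothetical name, and the whitespace trimming and de-duplication are assumptions rather than documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTableList (hypothetical helper) splits a comma-separated
// DYNAMODB_TABLES value into table names. It trims surrounding whitespace and
// skips empty or repeated entries; both behaviours are assumptions.
func parseTableList(raw string) []string {
	seen := make(map[string]bool)
	var tables []string
	for _, name := range strings.Split(raw, ",") {
		name = strings.TrimSpace(name)
		if name == "" || seen[name] {
			continue
		}
		seen[name] = true
		tables = append(tables, name)
	}
	return tables
}

func main() {
	// A shortened value using table names from the chart default.
	raw := "itx-surveys,itx-survey-responses,surveymonkey-surveys"
	for _, table := range parseTableList(raw) {
		fmt.Println(table)
	}
}
```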

cmd/lfx-v1-sync-helper/handlers.go

Lines changed: 2 additions & 2 deletions
@@ -107,7 +107,7 @@ func handleKVPut(ctx context.Context, entry jetstream.KeyValueEntry) bool {
 		// Voting records are handled by lfx-v2-voting-service.
 		logger.With("key", key).DebugContext(ctx, "voting record, handled by lfx-v2-voting-service")
 		return false
-	case "itx-surveys", "itx-survey-responses":
+	case "itx-surveys", "itx-survey-responses", "surveymonkey-surveys":
 		// Survey records are handled by lfx-v2-survey-service.
 		logger.With("key", key).DebugContext(ctx, "survey record, handled by lfx-v2-survey-service")
 		return false
@@ -254,7 +254,7 @@ func handleResourceDelete(ctx context.Context, key string, v1Principal string, v
 		// Voting records are handled by lfx-v2-voting-service.
 		logger.With("key", key).DebugContext(ctx, "voting record deleted, handled by lfx-v2-voting-service")
 		return false
-	case "itx-surveys", "itx-survey-responses":
+	case "itx-surveys", "itx-survey-responses", "surveymonkey-surveys":
 		// Survey records are handled by lfx-v2-survey-service.
 		logger.With("key", key).DebugContext(ctx, "survey record deleted, handled by lfx-v2-survey-service")
 		return false
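
Both hunks above add `surveymonkey-surveys` to the set of tables that v1-sync-helper skips because another service owns them. The routing idea can be sketched as follows, assuming the table name is the part of the KV key before the first `.` (consistent with `dynamodbKVKey` in `ingest_dynamodb.go`, which returns `tableName + "." + ...`). `tableFromKey` and `ownerService` are hypothetical names, and only the survey case confirmed by the diff is included.

```go
package main

import (
	"fmt"
	"strings"
)

// tableFromKey (hypothetical helper) returns the portion of a KV key before
// the first ".", which is assumed to be the source table name.
func tableFromKey(key string) string {
	table, _, _ := strings.Cut(key, ".")
	return table
}

// ownerService (hypothetical helper) names the v2 service that handles
// records from the key's table, or "" when no other owner is known and
// v1-sync-helper would process the record itself.
func ownerService(key string) string {
	switch tableFromKey(key) {
	case "itx-surveys", "itx-survey-responses", "surveymonkey-surveys":
		// Survey tables listed in the diff's case clause.
		return "lfx-v2-survey-service"
	default:
		return ""
	}
}

func main() {
	// The key suffixes here are made up for illustration.
	for _, key := range []string{"surveymonkey-surveys.123", "itx-zoom-meetings-v2.456"} {
		fmt.Printf("%s -> %q\n", key, ownerService(key))
	}
}
```

Keeping the table list in one place, as in this sketch, would avoid having to update the put and delete handlers separately when a table is added; whether that fits the real handlers is a design question the diff does not answer.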

cmd/lfx-v1-sync-helper/handlers_meetings.go

Lines changed: 6 additions & 6 deletions
@@ -1285,12 +1285,6 @@ func handleZoomPastMeetingUpdate(ctx context.Context, key string, v1Data map[str
 		indexerAction = MessageActionUpdated
 	}
 
-	tags := getPastMeetingTags(pastMeeting)
-	if err := sendIndexerMessage(ctx, IndexV1PastMeetingSubject, indexerAction, pastMeeting, tags); err != nil {
-		funcLogger.With(errKey, err).ErrorContext(ctx, "failed to send past meeting indexer message")
-		return
-	}
-
 	// Try to get committee mappings from the index first
 	var committees []string
 	pastMeeting.Committees = []Committee{}
@@ -1339,6 +1333,12 @@ func handleZoomPastMeetingUpdate(ctx context.Context, key string, v1Data map[str
 		}
 	}
 
+	tags := getPastMeetingTags(pastMeeting)
+	if err := sendIndexerMessage(ctx, IndexV1PastMeetingSubject, indexerAction, pastMeeting, tags); err != nil {
+		funcLogger.With(errKey, err).ErrorContext(ctx, "failed to send past meeting indexer message")
+		return
+	}
+
 	accessMsg := PastMeetingAccessMessage{
 		UID: uid,
 		MeetingUID: pastMeeting.MeetingID,

cmd/lfx-v1-sync-helper/ingest_dynamodb.go

Lines changed: 21 additions & 13 deletions
@@ -265,20 +265,17 @@ func dynamodbKVKey(tableName string, keys map[string]interface{}) string {
 	return tableName + "." + strings.Join(parts, "#")
 }
 
-// shouldDynamoDBUpdate returns true when the incoming new image should overwrite
-// the existing KV entry. It compares modified_at timestamps when both are present,
-// falling back to last_modified_at for tables that use that field instead (e.g.
-// Groups.io tables). If either timestamp is missing or unparseable the write
-// proceeds (stream events are treated as authoritative).
+// knownTimestampFields is the ordered list of timestamp field names checked when
+// comparing records. Add new field names here when onboarding tables that use a
+// different timestamp field.
+var knownTimestampFields = []string{"modified_at", "last_modified_at", "date_modified"}
+
+// shouldDynamoDBUpdate returns true when the new image should overwrite the
+// existing KV entry. If either record has no recognisable timestamp the write
+// proceeds; stream events are treated as authoritative.
 func shouldDynamoDBUpdate(ctx context.Context, newData, existingData map[string]interface{}, key string) bool {
-	newModifiedAt := getTimestampString(newData, "modified_at")
-	if newModifiedAt == "" {
-		newModifiedAt = getTimestampString(newData, "last_modified_at")
-	}
-	existingModifiedAt := getTimestampString(existingData, "modified_at")
-	if existingModifiedAt == "" {
-		existingModifiedAt = getTimestampString(existingData, "last_modified_at")
-	}
+	newModifiedAt := firstTimestamp(newData, knownTimestampFields...)
+	existingModifiedAt := firstTimestamp(existingData, knownTimestampFields...)
 
 	if newModifiedAt == "" || existingModifiedAt == "" {
 		return true
@@ -295,3 +292,14 @@ func shouldDynamoDBUpdate(ctx context.Context, newData, existingData map[string]
 
 	return newTime.After(existingTime)
 }
+
+// firstTimestamp returns the first non-empty timestamp string found in data
+// for the given field names, or "" if none are present.
+func firstTimestamp(data map[string]interface{}, fields ...string) string {
+	for _, f := range fields {
+		if v := getTimestampString(data, f); v != "" {
+			return v
+		}
+	}
+	return ""
+}
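
The refactored comparison above can be exercised in isolation with the sketch below. It is not the repository's implementation: `getTimestampString` is replaced by a stand-in that only reads string values (the real helper's body is not shown in the diff), `shouldUpdate` is a hypothetical simplification of `shouldDynamoDBUpdate` without context or logging, and RFC 3339 parsing with a "proceed on parse failure" fallback is an assumption based on the removed comment.

```go
package main

import (
	"fmt"
	"time"
)

// knownTimestampFields mirrors the ordered field list added in the diff.
var knownTimestampFields = []string{"modified_at", "last_modified_at", "date_modified"}

// getTimestampString is a stand-in for the repository helper of the same
// name; this version only handles string values (assumption).
func getTimestampString(data map[string]interface{}, field string) string {
	s, _ := data[field].(string)
	return s
}

// firstTimestamp returns the first non-empty timestamp string found in data
// for the given field names, or "" if none are present (as in the diff).
func firstTimestamp(data map[string]interface{}, fields ...string) string {
	for _, f := range fields {
		if v := getTimestampString(data, f); v != "" {
			return v
		}
	}
	return ""
}

// shouldUpdate (hypothetical simplification) allows the write when either
// timestamp is missing or unparseable, and otherwise only when the new record
// is strictly newer than the existing one.
func shouldUpdate(newData, existingData map[string]interface{}) bool {
	newTS := firstTimestamp(newData, knownTimestampFields...)
	oldTS := firstTimestamp(existingData, knownTimestampFields...)
	if newTS == "" || oldTS == "" {
		return true
	}
	newTime, newErr := time.Parse(time.RFC3339, newTS)
	oldTime, oldErr := time.Parse(time.RFC3339, oldTS)
	if newErr != nil || oldErr != nil {
		return true // assumed fallback: treat the stream event as authoritative
	}
	return newTime.After(oldTime)
}

func main() {
	existing := map[string]interface{}{"modified_at": "2024-01-01T00:00:00Z"}
	incoming := map[string]interface{}{"date_modified": "2024-06-01T00:00:00Z"}
	fmt.Println(shouldUpdate(incoming, existing)) // newer incoming record
	fmt.Println(shouldUpdate(existing, incoming)) // older incoming record
}
```

Note that the lookup is per record, so a new image carrying `date_modified` is compared against an existing entry carrying `modified_at`; the sketch assumes those fields are comparable when they appear on the same key.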
