Commit e70237e

Support denormalized document joins for RDS source (opensearch-project#6762)
* Add join configuration model and metadata enricher for RDS source

Add JoinConfig, JoinRelation configuration classes and JoinMetadataEnricher that enriches CDC events with join metadata (_table, _fields, _is_delete, _primary_key) to enable denormalization on write in the OpenSearch sink.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Wire join metadata enrichment into RDS source event pipeline

Add joins config to RdsSourceConfig. Update RecordConverter.convert() to accept columnNames and call JoinMetadataEnricher when table participates in a join. Wire enricher creation in BinlogEventListener and LogicalReplicationEventProcessor.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add rds-joins template with auto-configured denormalization script

Add rds-joins-rule.yaml that matches when both source.rds and source.rds.joins are present. Add rds-joins-template.yaml that configures the OpenSearch sink with upsert action and a painless script that selectively merges/removes fields based on join metadata. Update RuleEvaluator to sort rules by specificity (most apply_when conditions first) so the more specific rds-joins rule matches before the generic rds rule.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add 1:N join support with nested array denormalization

Update JoinRelation to include child_primary_key for array element identification. Update JoinMetadataEnricher to set _is_parent, _child_table_name, _child_pk_name, _child_pk_value metadata and exclude join key columns from _fields. Update painless script in rds-joins template to handle parent flat merge, child nested array insert/update/delete, and parent delete as full document delete.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Override S3 partition key for join tables to use parent key

For child tables in a join, override the S3 partition key to use the join primary key (parent key value) instead of the child table's own primary key. This ensures related parent and child events hash to the same S3 folder so they are processed together by the s3 source pipeline.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Remove external versioning from joins template

Multiple tables write to the same document in join mode. Events from different tables in the same transaction can share the same timestamp, causing version conflicts with external versioning since it requires strictly greater versions. The script itself is idempotent so versioning is not needed for correctness.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add username/password passthrough to rds-joins template

Pass through username and password from customer's OpenSearch sink config to support basic auth in addition to AWS IAM auth.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add per-row versioning, join_type, max_child_records, and monotonic versioning for RDS join

Changes:
- Painless script: per-table versioning for parent/1:1, per-row versioning for 1:N children
- Configurable version_field (default __versions) to avoid field name collisions
- join_type: one_to_one (flat merge) and one_to_many (array with max_child_records cap)
- Monotonic version counter in StreamRecordConverter using AtomicLong (timestamp_millis * 1000 + sequence) for unique versions within same second
- retryOnConflict(3) on scripted upsert bulk operations
- Export path: wire JoinMetadataEnricher into DataFileScheduler, pass column names
- Set default empty string for _child_pk_value on parent events to prevent NPE

Tested: parent/child CRUD, 1:1 join, 1:N with max cap, child-before-parent, concurrent writes, rapid updates, delete+re-insert, NULL values, special chars, bulk UPDATE, REPLACE INTO, load tests (300+ events verified against MySQL).

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add composite FK support and FK change detection for RDS join

Composite FK (multi-column join key):
- JoinRelation: parent_key, child_key, child_primary_key changed from String to List<String> with ACCEPT_SINGLE_VALUE_AS_ARRAY for backward compatibility
- JoinMetadataEnricher: composite key values joined with | for document ID and child PK
- Painless script: pkNames split by |, composite matching in removeIf and trim cleanup

FK change detection (before-image):
- BinlogEventListener.handleUpdateEvent: detects when child FK columns change between before-image and after-image, emits DELETE for old parent doc
- Supports composite FK: checks all key columns, triggers on any column change
- RecordConverter: added getJoinMetadataEnricher() getter
- JoinMetadataEnricher: added getChildKeyColumns() returning List<String>

Requires binlog_row_image=FULL (Aurora MySQL default) for before-image availability.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add overlay directive for template transformer and update joins template

Template transformer:
- Added <<overlay path>> directive support in DynamicConfigTransformer
- Processes after placeholder resolution, before model conversion
- Supports [*] wildcard to apply overlay to all matching array elements
- Deep merge semantics: overlay fields override target, nested objects merged
- Example: <<overlay sink[*].opensearch>> merges join script into all OS sinks

Joins template:
- Changed from hardcoded OpenSearch sink to full customer sink passthrough
- sink: <<$.<<pipeline-name>>.sink>> preserves all customer sink config (hosts, aws, index, dlq, routes, etc.)
- <<overlay sink[*].opensearch>> injects action, document_id, script, scripted_upsert, retry_on_conflict into every OpenSearch sink entry
- Non-OpenSearch sinks are left untouched

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Address PR review comments from @oeyh

- Use Objects.equals() for FK comparison to handle NULL values (BinlogEventListener)
- Create JoinType enum (ONE_TO_ONE, ONE_TO_MANY) and use in JoinRelation
- Add proper ArrayList import in RuleEvaluator

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add unit tests for join metadata enricher, version counter, rule evaluator, and overlay directive

- JoinMetadataEnricherTest: parent/child enrichment, composite keys, 1:1 join type, delete events, isJoinTable, getChildKeyColumns
- StreamRecordConverterTest: monotonic version counter (same millis increments, new millis resets, always > export version)
- RuleEvaluatorTest: more specific rule (2 conditions) matches before generic rule (1 condition) regardless of load order
- DynamicConfigTransformerTest: overlay directive merges into opensearch sinks, leaves non-opensearch sinks untouched, removes overlay key

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

---------

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent 569ae3a commit e70237e
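
The commit message describes a monotonic version counter built on AtomicLong and computed as timestamp_millis * 1000 + sequence. StreamRecordConverter is not among the files shown on this page, so the following is only a minimal sketch of how such a counter could behave. The class name MonotonicVersionSketch and the method next are hypothetical, and the actual implementation may differ.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch; not the StreamRecordConverter code from this commit. */
final class MonotonicVersionSketch {
    // Last version handed out; starts below any timestamp-based value.
    private final AtomicLong lastVersion = new AtomicLong(0L);

    /**
     * Returns timestampMillis * 1000 when that value is ahead of the last
     * version, and lastVersion + 1 otherwise, so results strictly increase.
     */
    long next(final long timestampMillis) {
        final long base = timestampMillis * 1000L;
        return lastVersion.updateAndGet(prev -> Math.max(prev + 1, base));
    }
}
```

In this sketch, two events with the same timestamp get consecutive versions, a later timestamp jumps ahead to its own base value, and a late-arriving older timestamp still receives a version above everything issued so far. More than 1000 events at one timestamp would spill into the next millisecond's range, which the sketch tolerates but does not prevent.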

27 files changed

Lines changed: 1176 additions & 17 deletions

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java

Lines changed: 27 additions & 7 deletions
@@ -24,6 +24,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -91,10 +92,23 @@ private RuleFileEvaluation evaluate(String pipelinesJson) {
         try {
             Collection<RuleStream> ruleStreams = transformersFactory.loadRules();

-            //walk through all rules and return first valid
+            // Pre-parse all rules and sort by specificity (most conditions first)
+            // so more specific rules like rds-joins match before generic rds
+            List<ParsedRule> parsedRules = new ArrayList<>();
             for (RuleStream ruleStream : ruleStreams) {
                 try {
-                    rulesModel = yamlMapper.readValue(ruleStream.getRuleStream(), RuleTransformerModel.class);
+                    RuleTransformerModel model = yamlMapper.readValue(ruleStream.getRuleStream(), RuleTransformerModel.class);
+                    parsedRules.add(new ParsedRule(model, ruleStream.getName()));
+                } finally {
+                    ruleStream.close();
+                }
+            }
+            parsedRules.sort((a, b) -> Integer.compare(
+                    b.model.getApplyWhen().size(), a.model.getApplyWhen().size()));
+
+            //walk through all rules and return first valid
+            for (ParsedRule parsedRule : parsedRules) {
+                rulesModel = parsedRule.model;
                     List<String> rules = rulesModel.getApplyWhen();
                     String pluginName = rulesModel.getPluginName();
                     boolean allRulesValid = true;
@@ -107,7 +121,7 @@ private RuleFileEvaluation evaluate(String pipelinesJson) {
                                 break;
                             }
                         } catch (PathNotFoundException e) {
-                            LOG.debug("Json Path not found for {}", ruleStream.getName());
+                            LOG.debug("Json Path not found for {}", parsedRule.fileName);
                             allRulesValid = false;
                             break;
                         }
@@ -116,13 +130,10 @@ private RuleFileEvaluation evaluate(String pipelinesJson) {
                     if (allRulesValid) {
                         return RuleFileEvaluation.builder()
                                 .withPluginName(pluginName)
-                                .withRuleFileName(ruleStream.getName())
+                                .withRuleFileName(parsedRule.fileName)
                                 .withResult(true)
                                 .build();
                     }
-                } finally {
-                    ruleStream.close();
-                }
             }

         } catch (FileNotFoundException e) {
@@ -143,4 +154,13 @@ private RuleFileEvaluation evaluate(String pipelinesJson) {
                 .build();
     }

+    private static class ParsedRule {
+        final RuleTransformerModel model;
+        final String fileName;
+
+        ParsedRule(RuleTransformerModel model, String fileName) {
+            this.model = model;
+            this.fileName = fileName;
+        }
+    }
 }
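
The ordering that RuleEvaluator now applies (rules with more apply_when conditions are tried first) can be illustrated in isolation. The following is a minimal sketch rather than the project's code: RuleSketch and SpecificitySortSketch are hypothetical stand-ins for ParsedRule and the sort call, and the condition strings used with them are illustrative only, since the rule YAML files are not shown on this page.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a parsed rule: a file name plus its apply_when conditions. */
final class RuleSketch {
    final String fileName;
    final List<String> applyWhen;

    RuleSketch(final String fileName, final List<String> applyWhen) {
        this.fileName = fileName;
        this.applyWhen = applyWhen;
    }
}

final class SpecificitySortSketch {
    /** Returns a copy ordered by descending condition count; ties keep load order. */
    static List<RuleSketch> bySpecificity(final List<RuleSketch> loaded) {
        final List<RuleSketch> sorted = new ArrayList<>(loaded);
        // List.sort is stable, so equally specific rules stay in their original order.
        sorted.sort(Comparator.comparingInt((RuleSketch r) -> r.applyWhen.size()).reversed());
        return sorted;
    }
}
```

Because the sort is stable, two rules with the same number of conditions are still evaluated in the order they were loaded, so specificity only breaks ties between rules of different sizes.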

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme
8282

8383
private static final String S3_BUFFER_PREFIX = "/buffer";
8484

85+
/**
86+
* Pattern to match overlay directives like "<<overlay sink[*].opensearch>>"
87+
* The captured group is the target path (e.g., "sink[*].opensearch")
88+
*/
89+
private static final Pattern OVERLAY_PATTERN = Pattern.compile("^<<overlay\\s+(.+?)>>$");
90+
8591
Configuration parseConfigWithJsonNode = Configuration.builder()
8692
.jsonProvider(new JacksonJsonNodeJsonProvider())
8793
.mappingProvider(new JacksonMappingProvider())
@@ -166,6 +172,9 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
166172
}
167173
});
168174

175+
// Process <<overlay>> directives — merge overlay fields into resolved config
176+
processOverlayDirectives(templateRootNode, pipelineJson);
177+
169178
PipelinesDataFlowModel transformedPipelinesDataFlowModel = getTransformedPipelinesDataFlowModel(pipelineNameThatNeedsTransformation,
170179
preTransformedPipelinesDataFlowModel,
171180
templateRootNode,
@@ -286,6 +295,10 @@ private void populateMapWithPlaceholderPaths(JsonNode currentNode, String curren
286295
Iterator<Map.Entry<String, JsonNode>> fields = currentNode.fields();
287296
while (fields.hasNext()) {
288297
Map.Entry<String, JsonNode> entry = fields.next();
298+
// Skip overlay directive keys — they are processed separately
299+
if (OVERLAY_PATTERN.matcher(entry.getKey()).matches()) {
300+
continue;
301+
}
289302
String path = currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
290303
populateMapWithPlaceholderPaths(entry.getValue(), path, placeholdersWithPaths);
291304
}
@@ -489,6 +502,156 @@ public Object invokeMethod(String methodName, Class<?> parameterType, Object arg
489502
}
490503

491504

505+
/**
506+
* Processes <<overlay path>> directives in the template.
507+
* For each overlay directive found as a key in any object node, merges the overlay
508+
* value into the target path. Supports [*] wildcard to apply to all array elements
509+
* matching a specific key.
510+
*
511+
* Example: <<overlay sink[*].opensearch>> with value {action: "upsert", script: {...}}
512+
* will merge those fields into every opensearch sink entry.
513+
*/
514+
private void processOverlayDirectives(JsonNode rootNode, String pipelineJson) {
515+
processOverlayDirectivesRecursive(rootNode, pipelineJson);
516+
}
517+
518+
private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJson) {
519+
if (node.isObject()) {
520+
ObjectNode objectNode = (ObjectNode) node;
521+
List<String> overlayKeys = new ArrayList<>();
522+
523+
// First pass: collect overlay keys
524+
Iterator<String> fieldNames = objectNode.fieldNames();
525+
while (fieldNames.hasNext()) {
526+
String fieldName = fieldNames.next();
527+
Matcher matcher = OVERLAY_PATTERN.matcher(fieldName);
528+
if (matcher.matches()) {
529+
overlayKeys.add(fieldName);
530+
}
531+
}
532+
533+
// Second pass: apply overlays
534+
for (String overlayKey : overlayKeys) {
535+
Matcher matcher = OVERLAY_PATTERN.matcher(overlayKey);
536+
if (matcher.matches()) {
537+
String targetPath = matcher.group(1);
538+
JsonNode overlayValue = objectNode.get(overlayKey);
539+
// Resolve any <<...>> placeholders inside overlay values
540+
resolveOverlayPlaceholders(overlayValue, pipelineJson);
541+
applyOverlay(objectNode, targetPath, overlayValue);
542+
objectNode.remove(overlayKey);
543+
}
544+
}
545+
546+
// Recurse into remaining children
547+
Iterator<Map.Entry<String, JsonNode>> fields = objectNode.fields();
548+
while (fields.hasNext()) {
549+
processOverlayDirectivesRecursive(fields.next().getValue(), pipelineJson);
550+
}
551+
} else if (node.isArray()) {
552+
for (JsonNode element : node) {
553+
processOverlayDirectivesRecursive(element, pipelineJson);
554+
}
555+
}
556+
}
557+
558+
/**
559+
* Resolves <<...>> placeholders in overlay value nodes using the pipeline config.
560+
*/
561+
private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) {
562+
if (node.isObject()) {
563+
ObjectNode objectNode = (ObjectNode) node;
564+
List<Map.Entry<String, JsonNode>> entries = new ArrayList<>();
565+
objectNode.fields().forEachRemaining(entries::add);
566+
for (Map.Entry<String, JsonNode> entry : entries) {
567+
if (entry.getValue().isValueNode()) {
568+
String text = entry.getValue().asText();
569+
Matcher matcher = PLACEHOLDER_PATTERN.matcher(text);
570+
if (matcher.find()) {
571+
String placeholder = getValueFromPlaceHolder(text);
572+
String resolved = executeFunctionPlaceholder(placeholder, pipelineJson);
573+
if (isJsonPath(resolved)) {
574+
JsonNode resolvedNode = JsonPath.using(parseConfigWithJsonNode)
575+
.parse(pipelineJson).read(resolved);
576+
if (resolvedNode != null && resolvedNode.isArray() && resolvedNode.size() == 1
577+
&& resolved.contains(JSON_PATH_ARRAY_DISAMBIGUATOR_PATTERN)) {
578+
resolvedNode = resolvedNode.get(0);
579+
}
580+
objectNode.set(entry.getKey(), resolvedNode);
581+
} else if (resolved != null) {
582+
objectNode.set(entry.getKey(), objectMapper.valueToTree(resolved));
583+
}
584+
}
585+
} else {
586+
resolveOverlayPlaceholders(entry.getValue(), pipelineJson);
587+
}
588+
}
589+
} else if (node.isArray()) {
590+
for (JsonNode element : node) {
591+
resolveOverlayPlaceholders(element, pipelineJson);
592+
}
593+
}
594+
}
595+
596+
/**
597+
* Applies overlay to the target path within the parent node.
598+
* Supports paths like "sink[*].opensearch" where [*] means all array elements.
599+
*/
600+
private void applyOverlay(ObjectNode parentNode, String targetPath, JsonNode overlayValue) {
601+
// Split path into segments: e.g., "sink[*].opensearch" -> ["sink[*]", "opensearch"]
602+
String[] segments = targetPath.split("\\.");
603+
applyOverlayAtPath(parentNode, segments, 0, overlayValue);
604+
}
605+
606+
private void applyOverlayAtPath(JsonNode currentNode, String[] segments, int index, JsonNode overlayValue) {
607+
if (index >= segments.length) {
608+
// Reached the target — deep merge overlay into current node
609+
if (currentNode.isObject() && overlayValue.isObject()) {
610+
deepMerge((ObjectNode) currentNode, (ObjectNode) overlayValue);
611+
}
612+
return;
613+
}
614+
615+
String segment = segments[index];
616+
617+
if (segment.endsWith("[*]")) {
618+
// Wildcard array segment: apply to all matching elements
619+
String fieldName = segment.substring(0, segment.length() - 3);
620+
JsonNode arrayNode = currentNode.get(fieldName);
621+
if (arrayNode != null && arrayNode.isArray()) {
622+
String nextSegment = (index + 1 < segments.length) ? segments[index + 1] : null;
623+
for (JsonNode element : arrayNode) {
624+
if (nextSegment != null && element.isObject() && element.has(nextSegment)) {
625+
// Apply to the nested object (e.g., the "opensearch" object within each sink entry)
626+
applyOverlayAtPath(element.get(nextSegment), segments, index + 2, overlayValue);
627+
} else if (nextSegment == null) {
628+
applyOverlayAtPath(element, segments, index + 1, overlayValue);
629+
}
630+
}
631+
}
632+
} else {
633+
// Regular field traversal
634+
JsonNode childNode = currentNode.get(segment);
635+
if (childNode != null) {
636+
applyOverlayAtPath(childNode, segments, index + 1, overlayValue);
637+
}
638+
}
639+
}
640+
641+
/**
642+
* Deep merges source into target. Source values override target values.
643+
*/
644+
private void deepMerge(ObjectNode target, ObjectNode source) {
645+
Iterator<Map.Entry<String, JsonNode>> fields = source.fields();
646+
while (fields.hasNext()) {
647+
Map.Entry<String, JsonNode> entry = fields.next();
648+
String fieldName = entry.getKey();
649+
JsonNode sourceValue = entry.getValue();
650+
651+
target.set(fieldName, sourceValue);
652+
}
653+
}
654+
492655
/**
493656
* Replaces template node in the jsonPath with the node from
494657
* original json.
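
Two pieces of the overlay handling can be illustrated on their own: extracting the target path from a directive key, and merging an overlay into a target object. In the diff, deepMerge copies each field of the overlay onto the target with target.set, so a nested object in the overlay replaces the target's nested object as a whole. The sketch below shows a recursive variant, which is one way to obtain the "nested objects merged" behaviour the commit message describes. It is an assumption-laden illustration, not the project's code: it uses plain Maps instead of Jackson nodes, and OverlaySketch, targetPath, and this deepMerge are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch using plain Maps instead of Jackson nodes. */
final class OverlaySketch {
    // Same regular expression as OVERLAY_PATTERN in DynamicConfigTransformer.
    private static final Pattern OVERLAY = Pattern.compile("^<<overlay\\s+(.+?)>>$");

    /** Returns the target path of an overlay key, or null if the key is not a directive. */
    static String targetPath(final String key) {
        final Matcher matcher = OVERLAY.matcher(key);
        return matcher.matches() ? matcher.group(1) : null;
    }

    /** Recursively merges source into target; non-map source values replace target values. */
    @SuppressWarnings("unchecked")
    static void deepMerge(final Map<String, Object> target, final Map<String, Object> source) {
        for (final Map.Entry<String, Object> entry : source.entrySet()) {
            final Object existing = target.get(entry.getKey());
            final Object incoming = entry.getValue();
            if (existing instanceof Map && incoming instanceof Map) {
                // Both sides are objects: merge field by field instead of replacing.
                deepMerge((Map<String, Object>) existing, (Map<String, Object>) incoming);
            } else {
                target.put(entry.getKey(), incoming);
            }
        }
    }

    /** Convenience helper for building small mutable maps in examples. */
    static Map<String, Object> mapOf(final String key, final Object value) {
        final Map<String, Object> map = new LinkedHashMap<>();
        map.put(key, value);
        return map;
    }
}
```

With the recursive variant, an overlay that sets only one field of a nested object leaves that object's other fields in place, whereas a field-by-field set at the top level would drop them. Which behaviour is wanted for sink settings is a design choice for the template author.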

data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,47 @@ void test_isTransformationNeeded_ForOtherSource_ShouldReturn_False() throws IOEx
9494
assertFalse(result.isEvaluatedResult());
9595
}
9696

97+
@Test
98+
void test_moreSpecificRule_matchesFirst_whenMultipleRulesApply() throws IOException {
99+
// rds-joins rule has 2 conditions, rds rule has 1 condition.
100+
// Both match an RDS pipeline with joins config.
101+
// The more specific rule (rds-joins) should win.
102+
String rdsRulePath = "src/test/resources/transformation/rules/rds-rule.yaml";
103+
String rdsJoinsRulePath = "src/test/resources/transformation/rules/rds-joins-rule.yaml";
104+
String templatePath = "src/test/resources/transformation/templates/testSource/rds-joins-template.yaml";
105+
106+
String pipelineName = "test-pipeline";
107+
Map<String, Object> joinsConfig = new HashMap<>();
108+
joinsConfig.put("version_field", "__versions");
109+
Map<String, Object> sourceOptions = new HashMap<>();
110+
sourceOptions.put("joins", joinsConfig);
111+
final PluginModel source = new PluginModel("rds", sourceOptions);
112+
final List<SinkModel> sinks = Collections.singletonList(
113+
new SinkModel("testSink", Collections.emptyList(), null,
114+
Collections.emptyList(), Collections.emptyList(), null));
115+
final PipelineModel pipelineModel = new PipelineModel(
116+
source, null, Collections.emptyList(), null, sinks, 8, 50);
117+
final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(
118+
(PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel));
119+
120+
TransformersFactory transformersFactory = mock(TransformersFactory.class);
121+
122+
// Load both rules — rds rule first (less specific), rds-joins second (more specific)
123+
RuleStream rdsRule = new RuleStream("rds-rule.yaml", new FileInputStream(rdsRulePath));
124+
RuleStream rdsJoinsRule = new RuleStream("rds-joins-rule.yaml", new FileInputStream(rdsJoinsRulePath));
125+
when(transformersFactory.loadRules()).thenReturn(List.of(rdsRule, rdsJoinsRule));
126+
when(transformersFactory.getPluginTemplateFileStream("rds-joins"))
127+
.thenReturn(new FileInputStream(templatePath));
128+
129+
RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory);
130+
RuleEvaluatorResult result = ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel);
131+
132+
assertTrue(result.isEvaluatedResult());
133+
assertEquals(pipelineName, result.getPipelineName());
134+
// The rds-joins template should be selected, not the generic rds template
135+
assertTrue(result.getPipelineTemplateModel() != null);
136+
}
137+
97138
@Test
98139
void testThrowsExceptionOnFileError() {
99140
TransformersFactory transformersFactory = mock(TransformersFactory.class);
