Skip to content

Commit 2479422

Browse files
Added logic for dag creation and weighted table selection (#3643)
* Added logic for dag creation and weighted table selection * minor fixes * addressing comments
1 parent be7ffcb commit 2479422

8 files changed

Lines changed: 853 additions & 0 deletions

File tree

v2/cdc-data-generator/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
<artifactId>spanner-common</artifactId>
3939
<version>${project.version}</version>
4040
</dependency>
41+
<dependency>
42+
<groupId>org.apache.commons</groupId>
43+
<artifactId>commons-math3</artifactId>
44+
<version>3.6.1</version>
45+
</dependency>
4146
<dependency>
4247
<groupId>org.apache.beam</groupId>
4348
<artifactId>beam-sdks-java-core</artifactId>
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.dofn;
17+
18+
import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema;
19+
import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable;
20+
import com.google.common.annotations.VisibleForTesting;
21+
import java.util.List;
22+
import java.util.stream.Collectors;
23+
import org.apache.beam.sdk.transforms.DoFn;
24+
import org.apache.beam.sdk.values.PCollectionView;
25+
import org.apache.commons.math3.distribution.EnumeratedDistribution;
26+
import org.apache.commons.math3.util.Pair;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class SelectTableFn extends DoFn<Long, DataGeneratorTable> {
31+
private static final Logger LOG = LoggerFactory.getLogger(SelectTableFn.class);
32+
private final PCollectionView<DataGeneratorSchema> schemaView;
33+
34+
// Cache for table distribution
35+
private transient EnumeratedDistribution<DataGeneratorTable> tableDistribution;
36+
37+
public SelectTableFn(PCollectionView<DataGeneratorSchema> schemaView) {
38+
this.schemaView = schemaView;
39+
}
40+
41+
@ProcessElement
42+
public void processElement(ProcessContext c) {
43+
DataGeneratorSchema schema = c.sideInput(schemaView);
44+
45+
// Lazily initialize distribution. Schema is assumed to never change.
46+
if (tableDistribution == null) {
47+
tableDistribution = buildDistribution(schema);
48+
}
49+
50+
if (tableDistribution == null) {
51+
LOG.warn("No tables available for selection (total weight may be 0).");
52+
return;
53+
}
54+
55+
c.output(tableDistribution.sample());
56+
}
57+
58+
@VisibleForTesting
59+
public static EnumeratedDistribution<DataGeneratorTable> buildDistribution(
60+
DataGeneratorSchema schema) {
61+
List<Pair<DataGeneratorTable, Double>> pmf =
62+
schema.tables().values().stream()
63+
.filter(DataGeneratorTable::isRoot)
64+
.map(table -> new Pair<>(table, calculateTotalQps(table, schema)))
65+
.collect(Collectors.toList());
66+
67+
if (pmf.isEmpty()) {
68+
return null;
69+
}
70+
71+
double totalWeight = pmf.stream().mapToDouble(Pair::getValue).sum();
72+
if (totalWeight <= 0) {
73+
return null;
74+
}
75+
76+
LOG.info(
77+
"Initialized Table Distribution with {} entries. Total Weight (Root + Children QPS): {}",
78+
pmf.size(),
79+
totalWeight);
80+
81+
return new EnumeratedDistribution<>(pmf);
82+
}
83+
84+
private static double calculateTotalQps(DataGeneratorTable table, DataGeneratorSchema schema) {
85+
double totalQps = table.insertQps();
86+
if (table.childTables() != null) {
87+
for (String childName : table.childTables()) {
88+
DataGeneratorTable childTable = schema.tables().get(childName);
89+
if (childTable != null) {
90+
totalQps += calculateTotalQps(childTable, schema);
91+
}
92+
}
93+
}
94+
return totalQps;
95+
}
96+
}

v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/DataGeneratorTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public abstract class DataGeneratorTable implements Serializable {
6666
@Nullable
6767
public abstract ImmutableList<String> childTables();
6868

69+
public abstract Builder toBuilder();
70+
6971
public static Builder builder() {
7072
return new AutoValue_DataGeneratorTable.Builder();
7173
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.transforms;
17+
18+
import com.google.cloud.teleport.v2.templates.dofn.SelectTableFn;
19+
import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema;
20+
import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable;
21+
import org.apache.beam.sdk.transforms.PTransform;
22+
import org.apache.beam.sdk.transforms.ParDo;
23+
import org.apache.beam.sdk.values.PCollection;
24+
import org.apache.beam.sdk.values.PCollectionView;
25+
26+
/**
27+
* A {@link PTransform} that selects a table for each tick based on weighted probability. The weight
28+
* for a table is (table insertQPS + insertQPS of its descendants). The denominator is the total
29+
* insertQPS across all tables.
30+
*/
31+
public class SelectTable extends PTransform<PCollection<Long>, PCollection<DataGeneratorTable>> {
32+
33+
private final PCollectionView<DataGeneratorSchema> schemaView;
34+
35+
public SelectTable(PCollectionView<DataGeneratorSchema> schemaView) {
36+
this.schemaView = schemaView;
37+
}
38+
39+
@Override
40+
public PCollection<DataGeneratorTable> expand(PCollection<Long> input) {
41+
return input.apply(
42+
"SelectTableFn", ParDo.of(new SelectTableFn(schemaView)).withSideInputs(schemaView));
43+
}
44+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.utils;
17+
18+
import com.google.cloud.teleport.v2.templates.model.DataGeneratorForeignKey;
19+
import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema;
20+
import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable;
21+
import com.google.common.collect.ImmutableList;
22+
import com.google.common.collect.ImmutableMap;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
30+
/** Utilities for manipulating {@link DataGeneratorSchema}. */
31+
public class SchemaUtils {
32+
33+
/**
34+
* Constructs a Directed Acyclic Graph (DAG) of tables in the schema. Identifies parent-child
35+
* relationships based on Foreign Keys and Interleaving. Handles multiple parents by selecting the
36+
* one with the *least* QPS. Populates the `children` list for each table and sets `isRoot`
37+
* accordingly.
38+
*
39+
* <p>Note: Circular dependencies are not supported right now.
40+
*
41+
* @param schema The input schema.
42+
* @return A new schema with DAG information populated.
43+
*/
44+
public static DataGeneratorSchema setSchemaDAG(DataGeneratorSchema schema) {
45+
Map<String, DataGeneratorTable> tableMap = schema.tables();
46+
Map<String, List<String>> parentToSequenceChild = new HashMap<>();
47+
Set<String> hasSequenceParent = new HashSet<>();
48+
49+
// 1. Build Dependency Chains for Each Table
50+
for (DataGeneratorTable childTable : tableMap.values()) {
51+
String childName = childTable.name();
52+
53+
// Collect Parents (Interleaved and FK)
54+
List<DataGeneratorTable> parents = new ArrayList<>();
55+
56+
if (childTable.interleavedInTable() != null) {
57+
String parentName = childTable.interleavedInTable();
58+
DataGeneratorTable parentTable = tableMap.get(parentName);
59+
if (parentTable != null) {
60+
parents.add(parentTable);
61+
}
62+
}
63+
64+
for (DataGeneratorForeignKey fk : childTable.foreignKeys()) {
65+
DataGeneratorTable parentTable = tableMap.get(fk.referencedTable());
66+
if (parentTable != null && !parents.contains(parentTable)) {
67+
parents.add(parentTable);
68+
}
69+
}
70+
71+
if (parents.isEmpty()) {
72+
continue; // No parents for this table
73+
}
74+
75+
// Sort Parents by QPS
76+
parents.sort(java.util.Comparator.comparingInt(DataGeneratorTable::insertQps));
77+
78+
// Chain the Parents: P1 -> P2 -> ... -> Pn -> Child
79+
for (int i = 0; i < parents.size() - 1; i++) {
80+
String currentParentName = parents.get(i).name();
81+
String nextParentName = parents.get(i + 1).name();
82+
// Avoid adding duplicate dependencies if a table is part of multiple chains
83+
List<String> currentChildren =
84+
parentToSequenceChild.computeIfAbsent(currentParentName, k -> new ArrayList<>());
85+
if (!currentChildren.contains(nextParentName)) {
86+
currentChildren.add(nextParentName);
87+
}
88+
hasSequenceParent.add(nextParentName);
89+
}
90+
91+
// Link the last parent in the chain to the child table
92+
String lastParentName = parents.get(parents.size() - 1).name();
93+
List<String> lastParentChildren =
94+
parentToSequenceChild.computeIfAbsent(lastParentName, k -> new ArrayList<>());
95+
if (!lastParentChildren.contains(childName)) {
96+
lastParentChildren.add(childName);
97+
}
98+
hasSequenceParent.add(childName);
99+
}
100+
101+
// 2. Update Tables with Sequence Children and isRoot
102+
ImmutableMap.Builder<String, DataGeneratorTable> newTablesBuilder = ImmutableMap.builder();
103+
for (DataGeneratorTable table : tableMap.values()) {
104+
String tableName = table.name();
105+
List<String> sequenceChildren =
106+
parentToSequenceChild.getOrDefault(tableName, ImmutableList.of());
107+
boolean isRoot = !hasSequenceParent.contains(tableName);
108+
109+
newTablesBuilder.put(
110+
tableName,
111+
table.toBuilder()
112+
.childTables(
113+
ImmutableList.copyOf(
114+
sequenceChildren)) // These are tables to generate AFTER this one
115+
.isRoot(isRoot)
116+
.build());
117+
}
118+
119+
return DataGeneratorSchema.builder().tables(newTablesBuilder.buildOrThrow()).build();
120+
}
121+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (C) 2026 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
/** Utility classes for Data Generator template. */
18+
package com.google.cloud.teleport.v2.templates.utils;

0 commit comments

Comments
 (0)