Skip to content

Commit d4255c3

Browse files
committed
Refactor YAML plan and task handling, enhance foreign key processing
- Removed obsolete integration test steps from GitHub Actions workflow.
- Improved logging in StepDataCoordinator for better debugging during record generation.
- Updated CardinalityCountAdjustmentProcessor to ensure only foreign key target steps are modified, preventing unintended changes.
- Added new YAML plan and task files for account balances and transactions, including validation of foreign key relationships.
- Introduced integration tests for YAML plan execution to verify record counts and foreign key integrity.
1 parent 0329083 commit d4255c3

9 files changed

Lines changed: 437 additions & 104 deletions

File tree

.github/workflows/check.yml

Lines changed: 0 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -23,17 +23,3 @@ jobs:
2323
- name: Run gradle integration tests
2424
run: |
2525
./gradlew :app:integrationTest --info
26-
- name: Run intsa-integration tests
27-
id: tests
28-
uses: data-catering/insta-integration@v4
29-
- name: Print results
30-
run: |
31-
echo "Records generated: ${{ steps.tests.outputs.num_records_generated }}"
32-
echo "Successful validations: ${{ steps.tests.outputs.num_success_validations }}"
33-
echo "Failed validations: ${{ steps.tests.outputs.num_failed_validations }}"
34-
echo "Number of validations: ${{ steps.tests.outputs.num_validations }}"
35-
echo "Validation success rate: ${{ steps.tests.outputs.validation_success_rate }}"
36-
37-
if [ "${{ steps.tests.outputs.num_failed_validations }}" -gt 0 ]; then
38-
exit 1
39-
fi

app/src/main/scala/io/github/datacatering/datacaterer/core/generator/StepDataCoordinator.scala

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -211,12 +211,14 @@ class StepDataCoordinator(
211211
targetNumRecords: Long,
212212
retries: Int
213213
): (DataFrame, Long) = {
214-
LOGGER.debug(s"Record count does not reach expected num records for batch, generating more records until reached, " +
215-
s"target-num-records=$targetNumRecords, actual-num-records=$currentRecordCount, num-retries=$retries, max-retries=$maxRetries")
216214

217215
if (targetNumRecords == currentRecordCount || retries >= maxRetries) {
216+
LOGGER.debug(s"Record count reaches expected num records for batch or reached max retries, stopping generation, " +
217+
s"target-num-records=$targetNumRecords, actual-num-records=$currentRecordCount, num-retries=$retries, max-retries=$maxRetries")
218218
(currentDf, currentRecordCount)
219219
} else {
220+
LOGGER.debug(s"Record count does not reach expected num records for batch, generating more records until reached, " +
221+
s"target-num-records=$targetNumRecords, actual-num-records=$currentRecordCount, num-retries=$retries, max-retries=$maxRetries")
220222
val (newDf, newRecordCount, newBaseRecordCount) = generateAdditionalRecords(
221223
batch, step, task, dataSourceStepName, stepRecords, currentDf, currentRecordCount, currentBaseRecordCount, targetNumRecords
222224
)

app/src/main/scala/io/github/datacatering/datacaterer/core/plan/CardinalityCountAdjustmentProcessor.scala

Lines changed: 97 additions & 87 deletions
Original file line number | Diff line number | Diff line change
@@ -65,10 +65,14 @@ class CardinalityCountAdjustmentProcessor(val dataCatererConfiguration: DataCate
6565

6666
dataSourceNameOpt match {
6767
case Some(dataSourceName) =>
68-
// Check if this task is a target in any FK relationship with cardinality
68+
// Get all step names in this task to check if any are FK targets
69+
val taskStepNames = task.steps.map(_.name).toSet
70+
71+
// Check if any step in this task is a target in any FK relationship with cardinality
72+
// Must match BOTH dataSource AND step name to avoid incorrect matching
6973
val targetRelationOpt = enhancedForeignKeys
7074
.flatMap(_.generate)
71-
.find(target => target.dataSource == dataSourceName && target.cardinality.isDefined)
75+
.find(target => target.dataSource == dataSourceName && target.cardinality.isDefined && taskStepNames.contains(target.step))
7276

7377
targetRelationOpt match {
7478
case Some(targetRelation) =>
@@ -85,103 +89,109 @@ class CardinalityCountAdjustmentProcessor(val dataCatererConfiguration: DataCate
8589
LOGGER.debug(s"Adjusting task count due to cardinality: data-source=$dataSourceName, " +
8690
s"task=${task.name}, original-count=$originalCount, adjusted-count=$requiredCount")
8791

88-
// Update the count for all steps in this task
89-
// Also set up perField configuration to match the cardinality grouping
92+
// Update only steps that are FK targets with cardinality config
93+
// DO NOT modify steps that are not FK targets (like the source step in the same task)
9094
val updatedSteps = task.steps.map { step =>
9195
// Get the target relation for this step from the foreign key config
9296
val targetRelationOpt = enhancedForeignKeys
9397
.flatMap(_.generate)
9498
.find(target => target.dataSource == dataSourceName && target.step == step.name)
9599

96-
val fkFieldNames = targetRelationOpt.map(_.fields).getOrElse(List()).distinct
97-
98-
// Get the cardinality configuration from the target relation
99-
val cardinalityConfigOpt = targetRelationOpt.flatMap(_.cardinality)
100-
101-
// Get the source FK for this step
102-
val fkOpt = enhancedForeignKeys
103-
.find(fk => fk.generate.exists(g => g.dataSource == dataSourceName && g.step == step.name))
104-
105-
val sourceCount = fkOpt
106-
.map { fk =>
107-
tasksByDataSource.get(fk.source.dataSource)
108-
.flatMap(_.steps.headOption)
109-
.flatMap(_.count.records)
100+
// Only process steps that are actual FK targets
101+
targetRelationOpt match {
102+
case None =>
103+
// This step is NOT a FK target - leave it unchanged
104+
LOGGER.debug(s"Step ${step.name} is not a FK target, leaving count unchanged: ${step.count.records}")
105+
step
106+
107+
case Some(targetRel) =>
108+
val fkFieldNames = targetRel.fields.distinct
109+
val cardinalityConfigOpt = targetRel.cardinality
110+
111+
// Get the source FK for this step
112+
val fkOpt = enhancedForeignKeys
113+
.find(fk => fk.generate.exists(g => g.dataSource == dataSourceName && g.step == step.name))
114+
115+
val sourceCount = fkOpt
116+
.map { fk =>
117+
tasksByDataSource.get(fk.source.dataSource)
118+
.flatMap(_.steps.find(_.name == fk.source.step))
119+
.flatMap(_.count.records)
120+
.getOrElse(1L)
121+
}
110122
.getOrElse(1L)
111-
}
112-
.getOrElse(1L)
113-
114-
// Check if step originally had perField config on FK fields (before our processing)
115-
val hadOriginalPerField = step.count.perField.exists { pfc =>
116-
fkFieldNames.exists(pfc.fieldNames.contains)
117-
}
118123

119-
// Determine if we should set perField configuration
120-
// - If step HAD perField on FK fields: DON'T set it (causes double-grouping with random values)
121-
// - If step DIDN'T have perField: SET it (enables proper grouping during generation)
122-
val updatedCount = if (fkFieldNames.nonEmpty && cardinalityConfigOpt.isDefined && !hadOriginalPerField) {
123-
val cardinalityConfig = cardinalityConfigOpt.get
124-
125-
cardinalityConfig match {
126-
case config if config.min.isDefined && config.max.isDefined =>
127-
// Bounded: set perField with min/max options
128-
LOGGER.debug(s"Setting perField config for step ${step.name}: fields=${fkFieldNames.mkString(",")}, " +
129-
s"records=$sourceCount, min=${config.min.get}, max=${config.max.get}, distribution=${config.distribution}")
130-
step.count.copy(
131-
records = Some(sourceCount), // Use source count for bounded
132-
perField = Some(io.github.datacatering.datacaterer.api.model.PerFieldCount(
133-
fieldNames = fkFieldNames,
134-
count = None,
135-
options = Map(
136-
"min" -> config.min.get,
137-
"max" -> config.max.get,
138-
"distribution" -> config.distribution
124+
// Check if step originally had perField config on FK fields (before our processing)
125+
val hadOriginalPerField = step.count.perField.exists { pfc =>
126+
fkFieldNames.exists(pfc.fieldNames.contains)
127+
}
128+
129+
// Determine if we should set perField configuration
130+
// - If step HAD perField on FK fields: DON'T set it (causes double-grouping with random values)
131+
// - If step DIDN'T have perField: SET it (enables proper grouping during generation)
132+
val updatedCount = if (fkFieldNames.nonEmpty && cardinalityConfigOpt.isDefined && !hadOriginalPerField) {
133+
val cardinalityConfig = cardinalityConfigOpt.get
134+
135+
cardinalityConfig match {
136+
case config if config.min.isDefined && config.max.isDefined =>
137+
// Bounded: set perField with min/max options
138+
LOGGER.debug(s"Setting perField config for step ${step.name}: fields=${fkFieldNames.mkString(",")}, " +
139+
s"records=$sourceCount, min=${config.min.get}, max=${config.max.get}, distribution=${config.distribution}")
140+
step.count.copy(
141+
records = Some(sourceCount), // Use source count for bounded
142+
perField = Some(io.github.datacatering.datacaterer.api.model.PerFieldCount(
143+
fieldNames = fkFieldNames,
144+
count = None,
145+
options = Map(
146+
"min" -> config.min.get,
147+
"max" -> config.max.get,
148+
"distribution" -> config.distribution
149+
)
150+
))
139151
)
140-
))
141-
)
142-
143-
case config if config.ratio.isDefined =>
144-
// Ratio: set perField with fixed count
145-
// Use requiredCount for total records, perField count for records per parent
146-
val recordsPerParent = config.ratio.get.toInt
147-
LOGGER.debug(s"Setting perField config for step ${step.name}: fields=${fkFieldNames.mkString(",")}, " +
148-
s"records=$sourceCount, count=$recordsPerParent, distribution=${config.distribution}")
149-
150-
if (config.distribution == "uniform") {
151-
step.count.copy(
152-
records = Some(sourceCount),
153-
perField = Some(io.github.datacatering.datacaterer.api.model.PerFieldCount(
154-
fieldNames = fkFieldNames,
155-
count = Some(recordsPerParent.toLong)
156-
))
157-
)
158-
} else {
159-
step.count.copy(
160-
records = Some(sourceCount),
161-
perField = Some(io.github.datacatering.datacaterer.api.model.PerFieldCount(
162-
fieldNames = fkFieldNames,
163-
count = None,
164-
options = Map(
165-
"min" -> recordsPerParent,
166-
"max" -> recordsPerParent,
167-
"distribution" -> config.distribution
152+
153+
case config if config.ratio.isDefined =>
154+
// Ratio: set perField with fixed count
155+
val recordsPerParent = config.ratio.get.toInt
156+
LOGGER.debug(s"Setting perField config for step ${step.name}: fields=${fkFieldNames.mkString(",")}, " +
157+
s"records=$sourceCount, count=$recordsPerParent, distribution=${config.distribution}")
158+
159+
if (config.distribution == "uniform") {
160+
step.count.copy(
161+
records = Some(sourceCount),
162+
perField = Some(io.github.datacatering.datacaterer.api.model.PerFieldCount(
163+
fieldNames = fkFieldNames,
164+
count = Some(recordsPerParent.toLong)
165+
))
168166
)
169-
))
170-
)
171-
}
167+
} else {
168+
step.count.copy(
169+
records = Some(sourceCount),
170+
perField = Some(io.github.datacatering.datacaterer.api.model.PerFieldCount(
171+
fieldNames = fkFieldNames,
172+
count = None,
173+
options = Map(
174+
"min" -> recordsPerParent,
175+
"max" -> recordsPerParent,
176+
"distribution" -> config.distribution
177+
)
178+
))
179+
)
180+
}
172181

173-
case _ =>
174-
step.count.copy(records = Some(sourceCount), perField = None)
175-
}
176-
} else if (hadOriginalPerField) {
177-
// Step had original perField on FK fields - remove it to avoid double-grouping
178-
LOGGER.debug(s"Removing original perField config from step ${step.name} to avoid double-grouping (FK fields: ${fkFieldNames.mkString(",")})")
179-
step.count.copy(records = Some(requiredCount), perField = None)
180-
} else {
181-
step.count.copy(records = Some(requiredCount))
182+
case _ =>
183+
step.count.copy(records = Some(sourceCount), perField = None)
184+
}
185+
} else if (hadOriginalPerField) {
186+
// Step had original perField on FK fields - remove it to avoid double-grouping
187+
LOGGER.debug(s"Removing original perField config from step ${step.name} to avoid double-grouping (FK fields: ${fkFieldNames.mkString(",")})")
188+
step.count.copy(records = Some(requiredCount), perField = None)
189+
} else {
190+
step.count.copy(records = Some(requiredCount))
191+
}
192+
193+
step.copy(count = updatedCount)
182194
}
183-
184-
step.copy(count = updatedCount)
185195
}
186196
task.copy(steps = updatedSteps)
187197
} else {
Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,19 @@
1+
---
2+
name: "account_balance_and_transactions_create_plan"
3+
description: "Create balances and transactions in Parquet files"
4+
tasks: []
5+
sinkOptions:
6+
foreignKeys:
7+
- source:
8+
dataSource: "parquet_ds"
9+
step: "balances"
10+
fields:
11+
- "account_number"
12+
generate:
13+
- dataSource: "parquet_ds"
14+
step: "transactions"
15+
fields:
16+
- "account_number"
17+
delete: []
18+
validations: []
19+
runId: "92f4fb44-c6cc-41db-9a42-3988c08c1254"
Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,16 @@
1+
name: "parquet_balance_and_transactions_create_plan"
2+
description: "Create balances and transactions in Parquet files"
3+
tasks:
4+
- name: "parquet_balance_and_transactions"
5+
dataSourceName: "parquet"
6+
7+
sinkOptions:
8+
foreignKeys:
9+
- source:
10+
dataSource: "parquet"
11+
step: "balances"
12+
fields: [ "account_number" ]
13+
generate:
14+
- dataSource: "parquet"
15+
step: "transactions"
16+
fields: [ "account_number" ]
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
name: "test_plan_500"
3+
description: "Test with 500 balances"
4+
tasks: []
5+
sinkOptions:
6+
foreignKeys:
7+
- source:
8+
dataSource: "parquet_ds"
9+
step: "balances"
10+
fields:
11+
- "account_number"
12+
generate:
13+
- dataSource: "parquet_ds"
14+
step: "transactions"
15+
fields:
16+
- "account_number"
17+
delete: []
18+
validations: []
19+
runId: "068a8494-0dd4-4ac3-8022-c23fe04867c8"
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
name: "parquet_balance_and_transactions"
2+
steps:
3+
- name: "balances"
4+
type: "parquet"
5+
count:
6+
records: 1000
7+
options:
8+
path: "/tmp/data-caterer-parquet-fk-test/balances"
9+
fields:
10+
- name: "account_number"
11+
options:
12+
regex: "ACC1[0-9]{5,10}"
13+
isUnique: true
14+
- name: "create_time"
15+
type: "timestamp"
16+
- name: "account_status"
17+
type: "string"
18+
options:
19+
oneOf:
20+
- "open"
21+
- "closed"
22+
- "suspended"
23+
- name: "balance"
24+
type: "double"
25+
- name: "transactions"
26+
type: "parquet"
27+
count:
28+
perField:
29+
fieldNames:
30+
- "account_number"
31+
count: 5
32+
options:
33+
path: "/tmp/data-caterer-parquet-fk-test/transactions"
34+
fields:
35+
- name: "account_number"
36+
- name: "create_time"
37+
type: "timestamp"
38+
- name: "transaction_id"
39+
options:
40+
regex: "txn-[0-9]{10}"
41+
- name: "amount"
42+
type: "double"

app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ steps:
99
fields:
1010
- name: "account_number"
1111
options:
12-
regex: "ACC1[0-9]{5,10}"
12+
regex: "ACC1[0-9]{10}"
1313
- name: "create_time"
1414
type: "timestamp"
1515
- name: "account_status"

0 commit comments

Comments
 (0)