Skip to content

Commit 0329083

Browse files
committed
Enhance foreign key strategies with deterministic behavior using hash-based approaches
- Updated ForeignKeyEndToEndIntegrationTest to assert the expected null rows for foreign keys based on deterministic hash values.
- Modified CardinalityStrategy, DistributedSamplingStrategy, GenerationModeStrategy, and NullabilityStrategy to use hash-based methods for consistent results across different Spark environments.
- Introduced SimplePercentileCalculator for efficient percentile calculations in performance metrics, replacing the deprecated T-Digest.
- Added tests for the new deterministic behaviors in foreign key strategies and updated existing tests for consistency.
- Improved DataGenerator to support deterministic SQL generation with seed-based hash functions.
1 parent a6f8e4c commit 0329083

29 files changed

Lines changed: 1324 additions & 391 deletions

app/src/integrationTest/scala/io/github/datacatering/datacaterer/core/foreignkey/ForeignKeyEndToEndIntegrationTest.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -393,8 +393,14 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
393393

394394
// Count null FKs (violations)
395395
val nullCount = updatedReviewsDf.filter(updatedReviewsDf("product_id").isNull).count()
396-
397-
// With seed=1 and 25% ratio, expect exactly 4 nulls out of 12 records
396+
val nullRowIds = updatedReviewsDf.filter(updatedReviewsDf("product_id").isNull)
397+
.select("review_id").collect().map(_.getString(0)).sorted.toList
398+
399+
// With seed=1 and 25% nullability ratio on 12 records, we get exactly these null rows
400+
// This verifies the hash-based approach is deterministic across environments
401+
val expectedNullRows = List("REV004", "REV007", "REV008", "REV011")
402+
assert(nullRowIds == expectedNullRows,
403+
s"Expected exactly $expectedNullRows to be null with seed=1, but got $nullRowIds")
398404
assert(nullCount == 4, s"Expected exactly 4 nulls with seed=1, got $nullCount")
399405

400406
// Verify non-null FKs are valid
@@ -565,11 +571,15 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
565571

566572
// Count nulls
567573
val nullCount = updatedSalesDf.filter(updatedSalesDf("store_id").isNull).count()
568-
569-
// With seed=12349 and 20% ratio, we expect around 2 nulls out of 10 records
570-
// Exact count depends on seed randomness
571-
assert(nullCount >= 0 && nullCount <= 4,
572-
s"Expected 0-4 nulls with 20% ratio (seed variance), got $nullCount")
574+
val nullRowIds = updatedSalesDf.filter(updatedSalesDf("store_id").isNull)
575+
.select("sale_id").collect().map(_.getString(0)).sorted.toList
576+
577+
// With seed=12349 and 20% nullability ratio on 10 records, we get exactly these null rows
578+
// This verifies the hash-based approach is deterministic across environments
579+
val expectedNullRows = List("SALE001", "SALE004")
580+
assert(nullRowIds == expectedNullRows,
581+
s"Expected exactly $expectedNullRows to be null with seed=12349, but got $nullRowIds")
582+
assert(nullCount == 2, s"Expected exactly 2 nulls with seed=12349, got $nullCount")
573583

574584
// Non-null values should be valid store IDs
575585
val validStoreIds = storesDf.select("store_id").collect().map(_.getString(0)).toSet

app/src/main/scala/io/github/datacatering/datacaterer/core/foreignkey/strategy/CardinalityStrategy.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class CardinalityStrategy extends ForeignKeyStrategy {
2424

2525
private val LOGGER = Logger.getLogger(getClass.getName)
2626

27+
/** Minimum source count required for modulo operations to avoid division by zero */
28+
private val MIN_SOURCE_COUNT_FOR_MODULO = 1L
29+
2730
override def name: String = "CardinalityStrategy"
2831

2932
/**
@@ -102,6 +105,12 @@ class CardinalityStrategy extends ForeignKeyStrategy {
102105

103106
LOGGER.info(s"Source has $sourceCount distinct parent records")
104107

108+
// Guard against empty source DataFrame to prevent division by zero in modulo operations
109+
if (sourceCount == 0) {
110+
LOGGER.warn("Source DataFrame has no records - cannot apply cardinality. Returning target DataFrame unchanged.")
111+
return targetDf
112+
}
113+
105114
// Check if target has perField config that creates grouping structure
106115
// If so, use group-based approach which preserves the generated groups
107116
val hasMatchingPerFieldConfig = targetPerFieldCount.exists { pfc =>
@@ -127,8 +136,10 @@ class CardinalityStrategy extends ForeignKeyStrategy {
127136
1.0
128137
}
129138

130-
LOGGER.info(s"Using INDEX-BASED approach: assigning FKs by row position (${recordsPerParent} records per parent)")
131-
applyCardinalityWithIndex(sourceDf, targetDf, sourceFields, targetFields, sourceCount, recordsPerParent.toLong)
139+
// Use ceil to match calculateRequiredCount behavior and avoid generating fewer records than expected
140+
val recordsPerParentCeiled = math.ceil(recordsPerParent).toLong
141+
LOGGER.info(s"Using INDEX-BASED approach: assigning FKs by row position ($recordsPerParentCeiled records per parent)")
142+
applyCardinalityWithIndex(sourceDf, targetDf, sourceFields, targetFields, sourceCount, recordsPerParentCeiled)
132143
}
133144
}
134145

app/src/main/scala/io/github/datacatering/datacaterer/core/foreignkey/strategy/DistributedSamplingStrategy.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,17 @@ class DistributedSamplingStrategy extends ForeignKeyStrategy {
6868
.withColumn("_fk_idx", row_number().over(windowSpec) - 1)
6969

7070
// Assign random index to each target row (0 to sourceCount-1)
71-
val randExpr = config.seed.map(s => rand(s)).getOrElse(rand())
72-
val targetWithIndex = targetDf
73-
.withColumn("_fk_idx", floor(randExpr * sourceCount).cast(LongType))
71+
// Use hash-based approach when seed is provided for deterministic behavior across environments
72+
// (Spark's rand(seed) is partition-dependent and not truly deterministic)
73+
val targetWithIndex = config.seed match {
74+
case Some(s) =>
75+
val allCols = targetDf.columns.map(col)
76+
val hashExpr = xxhash64(allCols :+ lit(s): _*)
77+
// Use absolute hash value modulo sourceCount for uniform distribution
78+
targetDf.withColumn("_fk_idx", abs(hashExpr) % sourceCount)
79+
case None =>
80+
targetDf.withColumn("_fk_idx", floor(rand() * sourceCount).cast(LongType))
81+
}
7482

7583
// Rename source fields to avoid ambiguity
7684
val renamedSource = sourceFields.foldLeft(sourceWithIndex) { case (df, field) =>

app/src/main/scala/io/github/datacatering/datacaterer/core/foreignkey/strategy/GenerationModeStrategy.scala

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,34 @@ class GenerationModeStrategy(generationMode: String = "all-exist") extends Forei
125125
val shouldInvalidate = (col("_combination_id") % totalCombinations).bitwiseAND(1 << fieldIdx) === 0
126126

127127
// Generate random invalid values for this field
128+
// Use hash-based approach when seed is provided for deterministic behavior across environments
129+
// (Spark's rand(seed) is partition-dependent and not truly deterministic)
128130
val dataType = result.schema(targetField).dataType
129-
val randExpr = relation.config.seed.map(s => rand(s)).getOrElse(rand())
130131
val invalidValue = dataType match {
131132
case StringType =>
132133
// Use deterministic hash-based approach when seed is available
133134
relation.config.seed match {
134-
case Some(s) => concat(lit("INVALID_"), expr(s"MD5(CONCAT('$s', CAST(monotonically_increasing_id() AS STRING)))"))
135+
case Some(s) =>
136+
val allCols = result.columns.map(col)
137+
concat(lit("INVALID_"), substring(md5(concat(allCols :+ lit(s): _*)), 1, 8))
135138
case None => concat(lit("INVALID_"), expr("uuid()"))
136139
}
137-
case IntegerType => (randExpr * 999999999).cast(IntegerType)
138-
case LongType => (randExpr * 999999999999L).cast(LongType)
140+
case IntegerType =>
141+
relation.config.seed match {
142+
case Some(s) =>
143+
val allCols = result.columns.map(col)
144+
val hashExpr = xxhash64(allCols :+ lit(s) :+ lit(fieldIdx): _*)
145+
(abs(hashExpr) % 999999999).cast(IntegerType)
146+
case None => (rand() * 999999999).cast(IntegerType)
147+
}
148+
case LongType =>
149+
relation.config.seed match {
150+
case Some(s) =>
151+
val allCols = result.columns.map(col)
152+
val hashExpr = xxhash64(allCols :+ lit(s) :+ lit(fieldIdx): _*)
153+
abs(hashExpr) % 999999999999L
154+
case None => (rand() * 999999999999L).cast(LongType)
155+
}
139156
case _ => lit(null).cast(dataType)
140157
}
141158

app/src/main/scala/io/github/datacatering/datacaterer/core/foreignkey/strategy/NullabilityStrategy.scala

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,24 @@ class NullabilityStrategy extends PostProcessingStrategy {
9292
}
9393

9494
// Add a column to determine which records get null FKs
95+
// For deterministic behavior with seed, we use a hash-based approach instead of rand()
96+
// because Spark's rand(seed) is partition-dependent and not truly deterministic across environments
9597
val withNullFlag = strategy match {
9698
case "random" =>
97-
val randExpr = uniqueSeed.map(s => rand(s)).getOrElse(rand())
98-
targetDf.withColumn("_should_null_fk", randExpr < percentage)
99+
uniqueSeed match {
100+
case Some(s) =>
101+
// Use hash-based deterministic selection: hash all columns + seed, then check if < percentage
102+
// This ensures the same rows are selected regardless of partitioning
103+
val allCols = targetDf.columns.map(col)
104+
// Use xxhash64 for better distribution (returns Long), then normalize to [0, 1)
105+
val hashExpr = xxhash64(allCols :+ lit(s): _*)
106+
// Clear the sign bit by bitwise AND with Long.MaxValue to get a non-negative value, then normalize
107+
val normalizedHash = (hashExpr.bitwiseAND(lit(Long.MaxValue))).cast("double") / lit(Long.MaxValue.toDouble)
108+
targetDf.withColumn("_should_null_fk", normalizedHash < percentage)
109+
case None =>
110+
// No seed provided - use non-deterministic rand()
111+
targetDf.withColumn("_should_null_fk", rand() < percentage)
112+
}
99113

100114
case "head" =>
101115
// First N% of records get null FKs
@@ -117,8 +131,15 @@ class NullabilityStrategy extends PostProcessingStrategy {
117131

118132
case _ =>
119133
LOGGER.warn(s"Unknown nullability strategy: $strategy, using random")
120-
val randExpr = uniqueSeed.map(s => rand(s)).getOrElse(rand())
121-
targetDf.withColumn("_should_null_fk", randExpr < percentage)
134+
uniqueSeed match {
135+
case Some(s) =>
136+
val allCols = targetDf.columns.map(col)
137+
val hashExpr = xxhash64(allCols :+ lit(s): _*)
138+
val normalizedHash = (hashExpr.bitwiseAND(lit(Long.MaxValue))).cast("double") / lit(Long.MaxValue.toDouble)
139+
targetDf.withColumn("_should_null_fk", normalizedHash < percentage)
140+
case None =>
141+
targetDf.withColumn("_should_null_fk", rand() < percentage)
142+
}
122143
}
123144

124145
// Apply nulls to target fields

app/src/main/scala/io/github/datacatering/datacaterer/core/generator/execution/PatternBasedExecutionStrategy.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ class PatternBasedExecutionStrategy(
1717
) extends ExecutionStrategy {
1818

1919
private val LOGGER = Logger.getLogger(getClass.getName)
20+
21+
/**
22+
* Threshold for rate change detection. Only update the rate limiter when the rate
23+
* changes by more than this fraction (10%) to avoid excessive rate limiter recreation.
24+
*/
25+
private val RATE_CHANGE_THRESHOLD = 0.1
2026
private val metricsCollector = new PerformanceMetricsCollector()
2127

2228
// Extract pattern configuration from first step with pattern configured
@@ -85,9 +91,9 @@ class PatternBasedExecutionStrategy(
8591
val elapsedSeconds = durationTracker.getElapsedTimeMs / 1000.0
8692
val targetRate = loadPattern.getRateAt(elapsedSeconds, totalDurationSeconds)
8793

88-
// Only create a new rate limiter if the rate has changed significantly (>10% change or first time)
94+
// Only create a new rate limiter if the rate has changed significantly or this is the first time
8995
val shouldUpdate = currentRateLimiter.isEmpty ||
90-
math.abs(targetRate - currentRate).toDouble / currentRate > 0.1
96+
math.abs(targetRate - currentRate).toDouble / currentRate > RATE_CHANGE_THRESHOLD
9197

9298
if (shouldUpdate) {
9399
currentRate = targetRate

app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metrics/PerformanceMetrics.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ case class PerformanceMetrics(
7676

7777
/**
7878
* Calculate percentile using exact or approximate method based on dataset size.
79-
* For large datasets (>100k samples), uses T-Digest for memory efficiency.
79+
* For large datasets (>100k samples), uses SimplePercentileCalculator for memory efficiency.
8080
* For smaller datasets, uses exact sorting.
8181
* Phase 3 optimization.
8282
*/
@@ -85,10 +85,10 @@ case class PerformanceMetrics(
8585

8686
val latencies = batchMetrics.map(_.batchDurationMs.toDouble)
8787

88-
// Use T-Digest for large datasets (Phase 3 optimization)
89-
if (latencies.size > TDigest.LARGE_DATASET_THRESHOLD) {
90-
val digest = TDigest.fromValues(latencies)
91-
digest.quantile(percentile)
88+
// Use SimplePercentileCalculator for large datasets (Phase 3 optimization)
89+
if (latencies.size > SimplePercentileCalculator.LARGE_DATASET_THRESHOLD) {
90+
val calculator = SimplePercentileCalculator.fromValues(latencies)
91+
calculator.quantile(percentile)
9292
} else {
9393
// Exact calculation for smaller datasets
9494
val sorted = latencies.sorted

0 commit comments

Comments
 (0)