@@ -20,12 +20,12 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
2020 // ========================================================================================
2121
2222 test(" E2E: Ratio-based cardinality with uniform distribution - full flow" ) {
23- // Step 1: Define plan with cardinality
23+ // Step 1: Define plan with cardinality at target level
2424 val foreignKeys = List (ForeignKey (
2525 ForeignKeyRelation (" accounts" , " accounts_table" , List (" account_id" )),
26- List (ForeignKeyRelation (" transactions" , " transactions_table" , List (" account_id" ))) ,
27- List ( ),
28- cardinality = Some ( CardinalityConfig (ratio = Some ( 5.0 ), distribution = " uniform " ) )
26+ List (ForeignKeyRelation (" transactions" , " transactions_table" , List (" account_id" ),
27+ cardinality = Some ( CardinalityConfig (ratio = Some ( 5.0 ), distribution = " uniform " ))) ),
28+ List ( )
2929 ))
3030
3131 val sinkOptions = SinkOptions (Some (" 12345" ), None , foreignKeys)
@@ -58,13 +58,15 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
5858 val (adjustedPlan, adjustedTasks, _) = processor.apply(plan, tasks, validations)
5959
6060 // Verify count was adjusted: 3 accounts * 5 ratio = 15 transactions
61+ // With perField, records is set to SOURCE count (3), and perField count is set to ratio (5)
62+ // This generates 3 * 5 = 15 total records
6163 val adjustedTransactionStep = adjustedTasks
6264 .find(_.name == " transaction_task" )
6365 .flatMap(_.steps.headOption)
6466 .get
6567
66- assert(adjustedTransactionStep.count.records.contains(15 ),
67- s " Transaction count should be adjusted to 15 , got ${adjustedTransactionStep.count.records}" )
68+ assert(adjustedTransactionStep.count.records.contains(3 ),
69+ s " Transaction count should be set to source count (3) , got ${adjustedTransactionStep.count.records}" )
6870
6971 // Verify perField was set
7072 assert(adjustedTransactionStep.count.perField.isDefined,
@@ -115,7 +117,6 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
115117 val result = ForeignKeyUtil .getDataFramesWithForeignKeys(
116118 adjustedPlan,
117119 dfMap,
118- useV2 = true ,
119120 executableTasks = Some (executableTasks)
120121 )
121122
@@ -143,12 +144,12 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
143144 }
144145
145146 test(" E2E: Bounded cardinality (min/max) - full flow" ) {
146- // Step 1: Define plan with bounded cardinality
147+ // Step 1: Define plan with bounded cardinality at target level
147148 val foreignKeys = List (ForeignKey (
148149 ForeignKeyRelation (" authors" , " authors_table" , List (" author_id" )),
149- List (ForeignKeyRelation (" articles" , " articles_table" , List (" author_id" ))) ,
150- List ( ),
151- cardinality = Some ( CardinalityConfig (min = Some ( 2 ), max = Some ( 4 ), distribution = " uniform " ) )
150+ List (ForeignKeyRelation (" articles" , " articles_table" , List (" author_id" ),
151+ cardinality = Some ( CardinalityConfig (min = Some ( 2 ), max = Some ( 4 ), distribution = " uniform " ))) ),
152+ List ( )
152153 ))
153154
154155 val sinkOptions = SinkOptions (Some (" 12346" ), None , foreignKeys)
@@ -190,8 +191,8 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
190191 assert(adjustedArticleStep.count.perField.isDefined, " PerField should be set" )
191192 val perField = adjustedArticleStep.count.perField.get
192193 assert(perField.fieldNames.contains(" author_id" ), " PerField should include author_id" )
193- assert(perField.options.get(" min" ) == Some (2 ), " PerField min should be 2" )
194- assert(perField.options.get(" max" ) == Some (4 ), " PerField max should be 4" )
194+ assert(perField.options.get(" min" ).contains (2 ), " PerField min should be 2" )
195+ assert(perField.options.get(" max" ).contains (4 ), " PerField max should be 4" )
195196
196197 // Step 3: Simulate data generation with perField grouping (varying counts 2-4)
197198 val authorsDf = sparkSession.createDataFrame(Seq (
@@ -227,7 +228,6 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
227228 val result = ForeignKeyUtil .getDataFramesWithForeignKeys(
228229 adjustedPlan,
229230 dfMap,
230- useV2 = true ,
231231 executableTasks = Some (executableTasks)
232232 )
233233
@@ -257,10 +257,10 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
257257 test(" E2E: Cardinality with all-exist mode - all FKs valid, cardinality preserved" ) {
258258 val foreignKeys = List (ForeignKey (
259259 ForeignKeyRelation (" customers" , " customers_table" , List (" customer_id" )),
260- List (ForeignKeyRelation (" orders" , " orders_table" , List (" customer_id" ))) ,
261- List ( ),
262- cardinality = Some ( CardinalityConfig (ratio = Some (2.0 ), distribution = " uniform " )),
263- generationMode = Some ( " all-exist " )
260+ List (ForeignKeyRelation (" orders" , " orders_table" , List (" customer_id" ),
261+ cardinality = Some ( CardinalityConfig (ratio = Some ( 2.0 ), distribution = " uniform " ) ),
262+ generationMode = Some (" all-exist " ) )),
263+ List ( )
264264 ))
265265
266266 val sinkOptions = SinkOptions (Some (" 12347" ), None , foreignKeys)
@@ -313,7 +313,6 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
313313 val result = ForeignKeyUtil .getDataFramesWithForeignKeys(
314314 adjustedPlan,
315315 dfMap,
316- useV2 = true ,
317316 executableTasks = Some (executableTasks)
318317 )
319318
@@ -339,11 +338,11 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
339338 test(" E2E: Cardinality with partial mode - introduces violations while preserving cardinality" ) {
340339 val foreignKeys = List (ForeignKey (
341340 ForeignKeyRelation (" products" , " products_table" , List (" product_id" )),
342- List (ForeignKeyRelation (" reviews" , " reviews_table" , List (" product_id" ))) ,
343- List ( ),
344- cardinality = Some ( CardinalityConfig (ratio = Some (3.0 ), distribution = " uniform " )),
345- nullability = Some (NullabilityConfig ( 0.25 , " random " )),
346- generationMode = Some ( " partial " )
341+ List (ForeignKeyRelation (" reviews" , " reviews_table" , List (" product_id" ),
342+ cardinality = Some ( CardinalityConfig (ratio = Some ( 3.0 ), distribution = " uniform " ) ),
343+ nullability = Some (NullabilityConfig ( 0.25 , " random " )),
344+ generationMode = Some (" partial " ) )),
345+ List ( )
347346 ))
348347
349348 val sinkOptions = SinkOptions (Some (" 1" ), None , foreignKeys)
@@ -403,7 +402,6 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
403402 val result = ForeignKeyUtil .getDataFramesWithForeignKeys(
404403 adjustedPlan,
405404 dfMap,
406- useV2 = true ,
407405 executableTasks = Some (executableTasks)
408406 )
409407
@@ -441,9 +439,9 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
441439 test(" E2E: Composite key cardinality - full flow" ) {
442440 val foreignKeys = List (ForeignKey (
443441 ForeignKeyRelation (" locations" , " locations_table" , List (" country" , " state" )),
444- List (ForeignKeyRelation (" stores" , " stores_table" , List (" country" , " state" ))) ,
445- List ( ),
446- cardinality = Some ( CardinalityConfig (ratio = Some ( 3.0 ), distribution = " uniform " ) )
442+ List (ForeignKeyRelation (" stores" , " stores_table" , List (" country" , " state" ),
443+ cardinality = Some ( CardinalityConfig (ratio = Some ( 3.0 ), distribution = " uniform " ))) ),
444+ List ( )
447445 ))
448446
449447 val sinkOptions = SinkOptions (Some (" 12348" ), None , foreignKeys)
@@ -496,7 +494,6 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
496494 val result = ForeignKeyUtil .getDataFramesWithForeignKeys(
497495 adjustedPlan,
498496 dfMap,
499- useV2 = true ,
500497 executableTasks = Some (executableTasks)
501498 )
502499
@@ -525,9 +522,9 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
525522 test(" E2E: FK with nullability (no cardinality) - standard processing" ) {
526523 val foreignKeys = List (ForeignKey (
527524 ForeignKeyRelation (" stores" , " stores_table" , List (" store_id" )),
528- List (ForeignKeyRelation (" sales" , " sales_table" , List (" store_id" ))) ,
529- List ( ),
530- nullability = Some ( NullabilityConfig ( 0.2 , " random " ) )
525+ List (ForeignKeyRelation (" sales" , " sales_table" , List (" store_id" ),
526+ nullability = Some ( NullabilityConfig ( 0.2 , " random " ))) ),
527+ List ( )
531528 ))
532529
533530 val sinkOptions = SinkOptions (Some (" 12349" ), None , foreignKeys)
@@ -585,7 +582,6 @@ class ForeignKeyEndToEndIntegrationTest extends SparkSuite {
585582 val result = ForeignKeyUtil .getDataFramesWithForeignKeys(
586583 adjustedPlan,
587584 dfMap,
588- useV2 = true ,
589585 executableTasks = None // No perField
590586 )
591587
0 commit comments