@@ -24,36 +24,49 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
2424 metadataConfig : MetadataConfig , flagsConfig : FlagsConfig , generationConfig : GenerationConfig , streamingConfig : StreamingConfig = StreamingConfig ())(implicit sparkSession : SparkSession ) {
2525
2626 private val LOGGER = Logger .getLogger(getClass.getName)
27- private lazy val sinkFactory = new SinkFactory (flagsConfig, metadataConfig, foldersConfig, streamingConfig)
// Cached sink factory. Kept in a mutable Option (rather than a `lazy val`) so that
// shutdownSinkFactory() can discard it and a later access can build a new instance.
private var sinkFactoryOpt: Option[SinkFactory] = None

/** Returns the cached SinkFactory, creating and caching one when none is currently held. */
private def sinkFactory: SinkFactory =
  sinkFactoryOpt.getOrElse {
    val fresh = new SinkFactory(flagsConfig, metadataConfig, foldersConfig, streamingConfig)
    sinkFactoryOpt = Some(fresh)
    fresh
  }
// Router consulted (via determineSinkStrategy) to pick the sink strategy for each step being saved.
2837 private lazy val sinkRouter = new SinkRouter ()
// Record tracker constructed with the record-tracking folder path; invoked only when flagsConfig.enableRecordTracking is set.
2938 private lazy val recordTrackingProcessor = new RecordTrackingProcessor (foldersConfig.recordTrackingFolderPath)
// Record tracker constructed with the validation record-tracking folder path; invoked for data sources referenced by validations.
3039 private lazy val validationRecordTrackingProcessor = new RecordTrackingProcessor (foldersConfig.recordTrackingForValidationFolderPath)
3140
/**
 * Builds the data-generation collaborators for the plan and runs the execution path that
 * matches the generation mode of the strategy created for it.
 *
 * The sink factory is shut down when this method exits, whether it returns normally or throws.
 *
 * @param plan            plan being executed
 * @param executableTasks task summaries paired with their tasks
 * @param optValidations  optional validation configurations, forwarded to the execution path
 * @return the data source results and optional performance metrics produced by the chosen path
 */
def splitAndProcess(plan: Plan, executableTasks: List[(TaskSummary, Task)], optValidations: Option[List[ValidationConfiguration]])
                   (implicit sparkSession: SparkSession): (List[DataSourceResult], Option[PerformanceMetrics]) = {
  try {
    val generatorFactory = new DataGeneratorFactory(getDataFaker(plan), flagsConfig.enableFastGeneration)
    val uniqueFields = new UniqueFieldsUtil(plan, executableTasks, flagsConfig.enableUniqueCheckOnlyInBatch, generationConfig)

    // Coordinator that drives per-step data generation
    val coordinator = new StepDataCoordinator(generatorFactory, uniqueFields, connectionConfigsByName, flagsConfig)

    // Strategy whose generation mode decides the execution path below
    val strategy = ExecutionStrategyFactory.create(plan, executableTasks, generationConfig)

    // Batched execution is shared by the Batched mode and the Progressive fallback
    def runBatched() =
      executeBatchedGeneration(plan, executableTasks, optValidations, coordinator, strategy)

    strategy.getGenerationMode match {
      case GenerationMode.AllUpfront =>
        LOGGER.info(" Using all-upfront generation mode for streaming execution")
        executeAllUpfrontGeneration(plan, executableTasks, optValidations, coordinator, strategy)

      case GenerationMode.Batched =>
        LOGGER.info(" Using batched generation mode")
        runBatched()

      case GenerationMode.Progressive =>
        LOGGER.warn(" Progressive generation mode not yet implemented, falling back to batched mode")
        runBatched()
    }
  } finally {
    // Always release sink resources, even when generation fails
    shutdownSinkFactory()
  }
}
5972
@@ -274,73 +287,77 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
274287 val dataSourcesUsedInValidation = getDataSourcesUsedInValidation(optValidations)
275288 val pekkoStreamingWriter = new PekkoStreamingSinkWriter (foldersConfig, streamingConfig)
276289
277- generatedData.flatMap { case (dataSourceStepName, df) =>
278- val dataSourceName = dataSourceStepName.split(" \\ ." ).head
279- val (step, task) = stepAndTaskByDataSourceName(dataSourceStepName)
280- val dataSourceConfig = connectionConfigsByName.getOrElse(dataSourceName, Map ())
281-
282- // Skip reference mode steps - they should not be saved to sinks
283- val isReferenceMode = step.options.get(ENABLE_REFERENCE_MODE ).map(_.toBoolean).getOrElse(DEFAULT_ENABLE_REFERENCE_MODE )
284- if (isReferenceMode) {
285- LOGGER .debug(s " Skipping save for reference data source, data-source= $dataSourceName, step-name= ${step.name}" )
286- None
287- } else {
288- val stepWithConfig = step.copy(options = dataSourceConfig ++ step.options)
289- val format = stepWithConfig.options.getOrElse(" format" , throw new IllegalArgumentException (s " No format specified for $dataSourceName" ))
290-
291- // Add rate information to options for router decision
292- val optionsWithRate = executionStrategy match {
293- case dbs : DurationBasedExecutionStrategy if dbs.getTargetRate.isDefined =>
294- stepWithConfig.options + (" hasRateControl" -> " true" )
295- case _ : PatternBasedExecutionStrategy =>
296- stepWithConfig.options + (" hasRateControl" -> " true" )
297- case _ => stepWithConfig.options
298- }
299-
300- // Determine sink strategy using router
301- val sinkStrategy = sinkRouter.determineSinkStrategy(format, executionStrategy.getGenerationMode, optionsWithRate)
302-
303- LOGGER .info(s " Routing to sink, data-source= $dataSourceName, format= $format, strategy= $sinkStrategy" )
304-
305- // Apply record tracking if enabled
306- if (flagsConfig.enableRecordTracking) {
307- recordTrackingProcessor.trackRecords(df, dataSourceName, plan.name, stepWithConfig)
308- }
309- if (dataSourcesUsedInValidation.contains(dataSourceName)) {
310- validationRecordTrackingProcessor.trackRecords(df, dataSourceName, plan.name, stepWithConfig)
311- }
312-
313- sinkStrategy match {
314- case SinkStrategy .BatchSink =>
315- // Use standard batch writer
316- val sinkResult = sinkFactory.pushToSink(df, dataSourceName, stepWithConfig, startTime, isMultiBatch = false , isLastBatch = true )
317- Some (DataSourceResult (dataSourceName, task, stepWithConfig, sinkResult, 1 ))
318-
319- case SinkStrategy .StreamingSink =>
320- // Use Pekko streaming writer with rate control
321- val (rate, rateFunction, totalDuration) = executionStrategy match {
322- case dbs : DurationBasedExecutionStrategy =>
323- (dbs.getTargetRate.getOrElse(1 ), None , None )
324- case pbs : PatternBasedExecutionStrategy =>
325- // For pattern-based, pass rate function for dynamic control
326- val avgRate = pbs.getAverageRate
327- val rateFn = pbs.getRateFunction
328- val duration = pbs.getDurationSeconds
329- LOGGER .info(s " Using pattern-based streaming with dynamic rate control, data-source= $dataSourceName, average-rate= $avgRate/sec, duration= ${duration}s " )
330- (avgRate, Some (rateFn), Some (duration))
331- case _ => (1 , None , None )
332- }
333- if (rateFunction.isEmpty) {
334- LOGGER .info(s " Using streaming sink with rate control, data-source= $dataSourceName, rate= $rate/sec " )
335- }
336- val (sinkResult, metrics) = pekkoStreamingWriter.saveWithRateControl(
337- dataSourceName, df, format, dataSourceConfig, stepWithConfig, rate, startTime, rateFunction, totalDuration
338- )
339-
340- // Create result with performance metrics
341- Some (DataSourceResult (dataSourceName, task, stepWithConfig, sinkResult, 1 , metrics))
290+ try {
291+ generatedData.flatMap { case (dataSourceStepName, df) =>
292+ val dataSourceName = dataSourceStepName.split(" \\ ." ).head
293+ val (step, task) = stepAndTaskByDataSourceName(dataSourceStepName)
294+ val dataSourceConfig = connectionConfigsByName.getOrElse(dataSourceName, Map ())
295+
296+ // Skip reference mode steps - they should not be saved to sinks
297+ val isReferenceMode = step.options.get(ENABLE_REFERENCE_MODE ).map(_.toBoolean).getOrElse(DEFAULT_ENABLE_REFERENCE_MODE )
298+ if (isReferenceMode) {
299+ LOGGER .debug(s " Skipping save for reference data source, data-source= $dataSourceName, step-name= ${step.name}" )
300+ None
301+ } else {
302+ val stepWithConfig = step.copy(options = dataSourceConfig ++ step.options)
303+ val format = stepWithConfig.options.getOrElse(" format" , throw new IllegalArgumentException (s " No format specified for $dataSourceName" ))
304+
305+ // Add rate information to options for router decision
306+ val optionsWithRate = executionStrategy match {
307+ case dbs : DurationBasedExecutionStrategy if dbs.getTargetRate.isDefined =>
308+ stepWithConfig.options + (" hasRateControl" -> " true" )
309+ case _ : PatternBasedExecutionStrategy =>
310+ stepWithConfig.options + (" hasRateControl" -> " true" )
311+ case _ => stepWithConfig.options
312+ }
313+
314+ // Determine sink strategy using router
315+ val sinkStrategy = sinkRouter.determineSinkStrategy(format, executionStrategy.getGenerationMode, optionsWithRate)
316+
317+ LOGGER .info(s " Routing to sink, data-source= $dataSourceName, format= $format, strategy= $sinkStrategy" )
318+
319+ // Apply record tracking if enabled
320+ if (flagsConfig.enableRecordTracking) {
321+ recordTrackingProcessor.trackRecords(df, dataSourceName, plan.name, stepWithConfig)
322+ }
323+ if (dataSourcesUsedInValidation.contains(dataSourceName)) {
324+ validationRecordTrackingProcessor.trackRecords(df, dataSourceName, plan.name, stepWithConfig)
325+ }
326+
327+ sinkStrategy match {
328+ case SinkStrategy .BatchSink =>
329+ // Use standard batch writer
330+ val sinkResult = sinkFactory.pushToSink(df, dataSourceName, stepWithConfig, startTime, isMultiBatch = false , isLastBatch = true )
331+ Some (DataSourceResult (dataSourceName, task, stepWithConfig, sinkResult, 1 ))
332+
333+ case SinkStrategy .StreamingSink =>
334+ // Use Pekko streaming writer with rate control
335+ val (rate, rateFunction, totalDuration) = executionStrategy match {
336+ case dbs : DurationBasedExecutionStrategy =>
337+ (dbs.getTargetRate.getOrElse(1 ), None , None )
338+ case pbs : PatternBasedExecutionStrategy =>
339+ // For pattern-based, pass rate function for dynamic control
340+ val avgRate = pbs.getAverageRate
341+ val rateFn = pbs.getRateFunction
342+ val duration = pbs.getDurationSeconds
343+ LOGGER .info(s " Using pattern-based streaming with dynamic rate control, data-source= $dataSourceName, average-rate= $avgRate/sec, duration= ${duration}s " )
344+ (avgRate, Some (rateFn), Some (duration))
345+ case _ => (1 , None , None )
346+ }
347+ if (rateFunction.isEmpty) {
348+ LOGGER .info(s " Using streaming sink with rate control, data-source= $dataSourceName, rate= $rate/sec " )
349+ }
350+ val (sinkResult, metrics) = pekkoStreamingWriter.saveWithRateControl(
351+ dataSourceName, df, format, dataSourceConfig, stepWithConfig, rate, startTime, rateFunction, totalDuration
352+ )
353+
354+ // Create result with performance metrics
355+ Some (DataSourceResult (dataSourceName, task, stepWithConfig, sinkResult, 1 , metrics))
356+ }
342357 }
343358 }
359+ } finally {
360+ pekkoStreamingWriter.shutdown()
344361 }
345362 }
346363
@@ -446,5 +463,10 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
446463 case _ => new Faker () with Serializable
447464 }
448465 }
466+
/**
 * Shuts down the cached SinkFactory, if one was created, and clears the cache so that the
 * next `sinkFactory` access builds a fresh instance.
 *
 * The cached reference is cleared before `shutdown()` is invoked, so a failing shutdown
 * cannot leave a partially shut-down factory cached for reuse. Any exception thrown by
 * `shutdown()` still propagates to the caller.
 */
private def shutdownSinkFactory(): Unit = {
  val toClose = sinkFactoryOpt
  // Reset first: the cache must end up empty even if shutdown() throws
  sinkFactoryOpt = None
  toClose.foreach(_.shutdown())
}
449471}
450472
0 commit comments