@@ -458,34 +458,50 @@ task validatesRunner {
458458 ))
459459}
460460
461+ def validatesRunnerStreamingConfig = [
462+ pipelineOptions : legacyPipelineOptions + [' --streaming' ],
463+ excludedCategories : [
464+ ' org.apache.beam.sdk.testing.UsesCommittedMetrics' ,
465+ ' org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput' ,
466+ ],
467+ excludedTests : [
468+ // TODO(https://github.com/apache/beam/issues/21472)
469+ ' org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState' ,
470+ // GroupIntoBatches.withShardedKey not supported on streaming runner v1
471+ // https://github.com/apache/beam/issues/22592
472+ ' org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow' ,
473+
474+ // These tests use static state and don't work with remote execution.
475+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle' ,
476+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful' ,
477+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement' ,
478+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful' ,
479+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup' ,
480+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful' ,
481+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle' ,
482+ ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful' ,
483+ ]
484+ ]
485+
461486task validatesRunnerStreaming {
462487 group = " Verification"
463488 description " Validates Dataflow runner forcing streaming mode"
464- dependsOn(createLegacyWorkerValidatesRunnerTest(
489+ dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [
465490 name : ' validatesRunnerLegacyWorkerTestStreaming' ,
466- pipelineOptions : legacyPipelineOptions + [' --streaming' ],
467- excludedCategories : [
468- ' org.apache.beam.sdk.testing.UsesCommittedMetrics' ,
469- ' org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput' ,
470- ],
471- excludedTests : [
472- // TODO(https://github.com/apache/beam/issues/21472)
473- ' org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState' ,
474- // GroupIntoBatches.withShardedKey not supported on streaming runner v1
475- // https://github.com/apache/beam/issues/22592
476- ' org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow' ,
491+ ]))
492+ }
477493
478- // These tests use static state and don't work with remote execution.
479- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle ' ,
480- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful ' ,
481- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement ' ,
482- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful ' ,
483- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup ' ,
484- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful ' ,
485- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle ' ,
486- ' org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful ' ,
487- ]
488- ))
494+ task validatesRunnerStreamingTagEncodingV2 {
495+ group = " Verification "
496+ description " Validates Dataflow runner (legacy) with Tag Encoding V2 experiments "
497+ dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [
498+ name : ' validatesRunnerLegacyWorkerTestStreamingTagEncodingV2 ' ,
499+ pipelineOptions : validatesRunnerStreamingConfig . pipelineOptions + [
500+ ' --experiments=enable_streaming_engine ' ,
501+ ' --experiments=enable_streaming_engine_state_tag_encoding_v2 ' ,
502+ ' --experiments=streaming_engine_state_tag_encoding_v2_supported '
503+ ],
504+ ] ))
489505}
490506
491507def setupXVR = tasks. register(" setupXVR" ) {
0 commit comments