Skip to content

Commit 0d6c94a

Browse files
authored
Fix PubsubIOLT (#35372)
* Fix PubsubIOLT * Fix formatting
1 parent b74c496 commit 0d6c94a

1 file changed

Lines changed: 7 additions & 2 deletions

File tree

  • it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub

it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubIOLT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
*/
7575
public class PubsubIOLT extends IOLoadTestBase {
7676

77+
private static final double TOLERANCE_FRACTION = 0.005;
7778
private static final int NUMBER_OF_BUNDLES_FOR_LOCAL = 10;
7879
private static final int NUMBER_OF_BUNDLES_FOR_MEDIUM_AND_LARGE = 20;
7980
private static final String READ_ELEMENT_METRIC_NAME = "read_count";
@@ -239,10 +240,12 @@ public void testWriteAndRead() throws IOException {
239240
readLaunchInfo.jobId(),
240241
getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
241242

242-
// Assert that actual data equals or greater than expected data number since there might be
243+
// Assert that actual data is within tolerance of expected data number since there might be
243244
// duplicates when testing big amount of data
244245
long expectedDataNum = configuration.numRecords - configuration.forceNumInitialBundles;
245-
assertTrue(numRecords >= expectedDataNum);
246+
long allowedTolerance = (long) (configuration.numRecords * TOLERANCE_FRACTION);
247+
double delta = Math.abs(numRecords - expectedDataNum);
248+
assertTrue(delta <= allowedTolerance);
246249

247250
// export metrics
248251
MetricsConfiguration writeMetricsConfig =
@@ -305,6 +308,7 @@ private PipelineLauncher.LaunchInfo testWrite(WriteAndReadFormat format) throws
305308
.addParameter("runner", configuration.runner)
306309
.addParameter("streaming", "true")
307310
.addParameter("numWorkers", String.valueOf(configuration.numWorkers))
311+
.addParameter("experiments", "use_runner_v2")
308312
.build();
309313

310314
return pipelineLauncher.launch(project, region, writeOptions);
@@ -339,6 +343,7 @@ private PipelineLauncher.LaunchInfo testRead(WriteAndReadFormat format) throws I
339343
.addParameter("runner", configuration.runner)
340344
.addParameter("streaming", "true")
341345
.addParameter("numWorkers", String.valueOf(configuration.numWorkers))
346+
.addParameter("experiments", "use_runner_v2")
342347
.build();
343348

344349
return pipelineLauncher.launch(project, region, readOptions);

0 commit comments

Comments
 (0)