diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 9fb723e812f4..172ca2b550cb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -464,13 +464,10 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness( @SuppressWarnings("methodref.receiver.bound") WorkCommitter workCommitter = StreamingEngineWorkCommitter.builder() - // Use a separate stream pool for each committer. This ensures the commit - // threads are fully isolated. .setCommitWorkStreamFactory( - () -> - WindmillStreamPool.create( - 1, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) - .getCloseableStream()) + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) .setNumCommitSenders(numCommitThreads) .setOnCommitComplete(this::onCompleteCommit)