From 8e648b0cacd0f841a95c2480806ef85eb8e36be3 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 16 Mar 2026 15:52:05 -0700 Subject: [PATCH] =?UTF-8?q?Revert=20"[Dataflow=20Streaming]=20Prevent=20co?= =?UTF-8?q?mmit=20threads=20from=20sharing=20commit=20strea=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 9292e1d8140a1710816b8026a08434a100049134. --- .../runners/dataflow/worker/StreamingDataflowWorker.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 9fb723e812f4..172ca2b550cb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -464,13 +464,10 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness( @SuppressWarnings("methodref.receiver.bound") WorkCommitter workCommitter = StreamingEngineWorkCommitter.builder() - // Use a separate stream pool for each committer. This ensures the commit - // threads are fully isolated. .setCommitWorkStreamFactory( - () -> - WindmillStreamPool.create( - 1, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) - .getCloseableStream()) + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) .setNumCommitSenders(numCommitThreads) .setOnCommitComplete(this::onCompleteCommit)