File tree Expand file tree Collapse file tree
beam/core/src/main/java/cz/o2/proxima/beam/io Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -177,12 +177,12 @@ public void processElement(@Element StreamElement element) {
177177 boolean isTransactional =
178178 element .getAttributeDescriptor ().getTransactionMode () != TransactionMode .NONE ;
179179 int weight = isTransactional ? transactionalWriteWeight : 1 ;
180- synchronized ( pendingWrites ) {
181- while ( inflightWriteWeights . get () >= maxPendingWrites ) {
180+ while ( inflightWriteWeights . get () + weight >= maxPendingWrites ) {
181+ synchronized ( pendingWrites ) {
182182 ExceptionUtils .unchecked (() -> pendingWrites .wait (100 ));
183183 }
184- inflightWriteWeights .addAndGet (weight );
185184 }
185+ inflightWriteWeights .addAndGet (weight );
186186 writeRunnableRef .set (
187187 () -> {
188188 CompletableFuture <Pair <Boolean , Throwable >> writeResult = new CompletableFuture <>();
You can’t perform that action at this time.
0 commit comments