Skip to content

Commit 752a019

Browse files
committed
Avoid invoking writeFunction under lock in ChannelSendOperator
Signed-off-by: l2yuPa <jeungwon28@gmail.com>
1 parent 188cb9b commit 752a019

File tree

2 files changed

+72
-38
lines changed

2 files changed

+72
-38
lines changed

spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -176,23 +176,17 @@ public final void onNext(T item) {
176176
requiredWriteSubscriber().onNext(item);
177177
return;
178178
}
179-
// FIXME revisit in case of reentrant sync deadlock
179+
180+
boolean invokeWriteFunction = false;
181+
180182
synchronized (this) {
181183
if (this.state == State.READY_TO_WRITE) {
182184
requiredWriteSubscriber().onNext(item);
183185
}
184186
else if (this.state == State.NEW) {
185187
this.item = item;
186188
this.state = State.FIRST_SIGNAL_RECEIVED;
187-
Publisher<Void> result;
188-
try {
189-
result = writeFunction.apply(this);
190-
}
191-
catch (Throwable ex) {
192-
this.writeCompletionBarrier.onError(ex);
193-
return;
194-
}
195-
result.subscribe(this.writeCompletionBarrier);
189+
invokeWriteFunction = true;
196190
}
197191
else {
198192
if (this.subscription != null) {
@@ -201,6 +195,20 @@ else if (this.state == State.NEW) {
201195
this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item."));
202196
}
203197
}
198+
199+
if (!invokeWriteFunction) {
200+
return;
201+
}
202+
203+
Publisher<Void> result;
204+
try {
205+
result = writeFunction.apply(this);
206+
}
207+
catch (Throwable ex) {
208+
this.writeCompletionBarrier.onError(ex);
209+
return;
210+
}
211+
result.subscribe(this.writeCompletionBarrier);
204212
}
205213

206214
private Subscriber<? super T> requiredWriteSubscriber() {
@@ -234,27 +242,36 @@ public final void onComplete() {
234242
requiredWriteSubscriber().onComplete();
235243
return;
236244
}
245+
246+
boolean invokeWriteFunction = false;
247+
237248
synchronized (this) {
238249
if (this.state == State.READY_TO_WRITE) {
239250
requiredWriteSubscriber().onComplete();
240251
}
241252
else if (this.state == State.NEW) {
242253
this.completed = true;
243254
this.state = State.FIRST_SIGNAL_RECEIVED;
244-
Publisher<Void> result;
245-
try {
246-
result = writeFunction.apply(this);
247-
}
248-
catch (Throwable ex) {
249-
this.writeCompletionBarrier.onError(ex);
250-
return;
251-
}
252-
result.subscribe(this.writeCompletionBarrier);
255+
invokeWriteFunction = true;
253256
}
254257
else {
255258
this.completed = true;
256259
}
257260
}
261+
262+
if (!invokeWriteFunction) {
263+
return;
264+
}
265+
266+
Publisher<Void> result;
267+
try {
268+
result = writeFunction.apply(this);
269+
}
270+
catch (Throwable ex) {
271+
this.writeCompletionBarrier.onError(ex);
272+
return;
273+
}
274+
result.subscribe(this.writeCompletionBarrier);
258275
}
259276

260277
@Override

spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -168,23 +168,17 @@ public final void onNext(T item) {
168168
requiredWriteSubscriber().onNext(item);
169169
return;
170170
}
171-
// FIXME revisit in case of reentrant sync deadlock
171+
172+
boolean invokeWriteFunction = false;
173+
172174
synchronized (this) {
173175
if (this.state == State.READY_TO_WRITE) {
174176
requiredWriteSubscriber().onNext(item);
175177
}
176178
else if (this.state == State.NEW) {
177179
this.item = item;
178180
this.state = State.FIRST_SIGNAL_RECEIVED;
179-
Publisher<Void> result;
180-
try {
181-
result = writeFunction.apply(this);
182-
}
183-
catch (Throwable ex) {
184-
this.writeCompletionBarrier.onError(ex);
185-
return;
186-
}
187-
result.subscribe(this.writeCompletionBarrier);
181+
invokeWriteFunction = true;
188182
}
189183
else {
190184
if (this.subscription != null) {
@@ -193,6 +187,20 @@ else if (this.state == State.NEW) {
193187
this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item."));
194188
}
195189
}
190+
191+
if (!invokeWriteFunction) {
192+
return;
193+
}
194+
195+
Publisher<Void> result;
196+
try {
197+
result = writeFunction.apply(this);
198+
}
199+
catch (Throwable ex) {
200+
this.writeCompletionBarrier.onError(ex);
201+
return;
202+
}
203+
result.subscribe(this.writeCompletionBarrier);
196204
}
197205

198206
private Subscriber<? super T> requiredWriteSubscriber() {
@@ -226,27 +234,36 @@ public final void onComplete() {
226234
requiredWriteSubscriber().onComplete();
227235
return;
228236
}
237+
238+
boolean invokeWriteFunction = false;
239+
229240
synchronized (this) {
230241
if (this.state == State.READY_TO_WRITE) {
231242
requiredWriteSubscriber().onComplete();
232243
}
233244
else if (this.state == State.NEW) {
234245
this.completed = true;
235246
this.state = State.FIRST_SIGNAL_RECEIVED;
236-
Publisher<Void> result;
237-
try {
238-
result = writeFunction.apply(this);
239-
}
240-
catch (Throwable ex) {
241-
this.writeCompletionBarrier.onError(ex);
242-
return;
243-
}
244-
result.subscribe(this.writeCompletionBarrier);
247+
invokeWriteFunction = true;
245248
}
246249
else {
247250
this.completed = true;
248251
}
249252
}
253+
254+
if (!invokeWriteFunction) {
255+
return;
256+
}
257+
258+
Publisher<Void> result;
259+
try {
260+
result = writeFunction.apply(this);
261+
}
262+
catch (Throwable ex) {
263+
this.writeCompletionBarrier.onError(ex);
264+
return;
265+
}
266+
result.subscribe(this.writeCompletionBarrier);
250267
}
251268

252269
@Override

0 commit comments

Comments
 (0)