Skip to content

Commit ccb246c

Browse files
committed
Avoid invoking writeFunction under lock in ChannelSendOperator
Signed-off-by: l2yuPa <jeungwon28@gmail.com>
1 parent 188cb9b commit ccb246c

2 files changed

Lines changed: 72 additions & 58 deletions

File tree

spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -176,31 +176,34 @@ public final void onNext(T item) {
176176
requiredWriteSubscriber().onNext(item);
177177
return;
178178
}
179-
// FIXME revisit in case of reentrant sync deadlock
179+
180180
synchronized (this) {
181181
if (this.state == State.READY_TO_WRITE) {
182182
requiredWriteSubscriber().onNext(item);
183+
return;
183184
}
184-
else if (this.state == State.NEW) {
185-
this.item = item;
186-
this.state = State.FIRST_SIGNAL_RECEIVED;
187-
Publisher<Void> result;
188-
try {
189-
result = writeFunction.apply(this);
190-
}
191-
catch (Throwable ex) {
192-
this.writeCompletionBarrier.onError(ex);
193-
return;
194-
}
195-
result.subscribe(this.writeCompletionBarrier);
196-
}
197-
else {
185+
186+
if (this.state != State.NEW) {
198187
if (this.subscription != null) {
199188
this.subscription.cancel();
200189
}
201190
this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item."));
191+
return;
202192
}
193+
194+
this.item = item;
195+
this.state = State.FIRST_SIGNAL_RECEIVED;
203196
}
197+
198+
Publisher<Void> result;
199+
try {
200+
result = writeFunction.apply(this);
201+
}
202+
catch (Throwable ex) {
203+
this.writeCompletionBarrier.onError(ex);
204+
return;
205+
}
206+
result.subscribe(this.writeCompletionBarrier);
204207
}
205208

206209
private Subscriber<? super T> requiredWriteSubscriber() {
@@ -234,27 +237,31 @@ public final void onComplete() {
234237
requiredWriteSubscriber().onComplete();
235238
return;
236239
}
240+
237241
synchronized (this) {
238242
if (this.state == State.READY_TO_WRITE) {
239243
requiredWriteSubscriber().onComplete();
244+
return;
240245
}
241-
else if (this.state == State.NEW) {
242-
this.completed = true;
243-
this.state = State.FIRST_SIGNAL_RECEIVED;
244-
Publisher<Void> result;
245-
try {
246-
result = writeFunction.apply(this);
247-
}
248-
catch (Throwable ex) {
249-
this.writeCompletionBarrier.onError(ex);
250-
return;
251-
}
252-
result.subscribe(this.writeCompletionBarrier);
253-
}
254-
else {
246+
247+
if (this.state != State.NEW) {
255248
this.completed = true;
249+
return;
256250
}
251+
252+
this.completed = true;
253+
this.state = State.FIRST_SIGNAL_RECEIVED;
254+
}
255+
256+
Publisher<Void> result;
257+
try {
258+
result = writeFunction.apply(this);
259+
}
260+
catch (Throwable ex) {
261+
this.writeCompletionBarrier.onError(ex);
262+
return;
257263
}
264+
result.subscribe(this.writeCompletionBarrier);
258265
}
259266

260267
@Override

spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -168,31 +168,34 @@ public final void onNext(T item) {
168168
requiredWriteSubscriber().onNext(item);
169169
return;
170170
}
171-
// FIXME revisit in case of reentrant sync deadlock
171+
172172
synchronized (this) {
173173
if (this.state == State.READY_TO_WRITE) {
174174
requiredWriteSubscriber().onNext(item);
175+
return;
175176
}
176-
else if (this.state == State.NEW) {
177-
this.item = item;
178-
this.state = State.FIRST_SIGNAL_RECEIVED;
179-
Publisher<Void> result;
180-
try {
181-
result = writeFunction.apply(this);
182-
}
183-
catch (Throwable ex) {
184-
this.writeCompletionBarrier.onError(ex);
185-
return;
186-
}
187-
result.subscribe(this.writeCompletionBarrier);
188-
}
189-
else {
177+
178+
if (this.state != State.NEW) {
190179
if (this.subscription != null) {
191180
this.subscription.cancel();
192181
}
193182
this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item."));
183+
return;
194184
}
185+
186+
this.item = item;
187+
this.state = State.FIRST_SIGNAL_RECEIVED;
195188
}
189+
190+
Publisher<Void> result;
191+
try {
192+
result = writeFunction.apply(this);
193+
}
194+
catch (Throwable ex) {
195+
this.writeCompletionBarrier.onError(ex);
196+
return;
197+
}
198+
result.subscribe(this.writeCompletionBarrier);
196199
}
197200

198201
private Subscriber<? super T> requiredWriteSubscriber() {
@@ -226,27 +229,31 @@ public final void onComplete() {
226229
requiredWriteSubscriber().onComplete();
227230
return;
228231
}
232+
229233
synchronized (this) {
230234
if (this.state == State.READY_TO_WRITE) {
231235
requiredWriteSubscriber().onComplete();
236+
return;
232237
}
233-
else if (this.state == State.NEW) {
234-
this.completed = true;
235-
this.state = State.FIRST_SIGNAL_RECEIVED;
236-
Publisher<Void> result;
237-
try {
238-
result = writeFunction.apply(this);
239-
}
240-
catch (Throwable ex) {
241-
this.writeCompletionBarrier.onError(ex);
242-
return;
243-
}
244-
result.subscribe(this.writeCompletionBarrier);
245-
}
246-
else {
238+
239+
if (this.state != State.NEW) {
247240
this.completed = true;
241+
return;
248242
}
243+
244+
this.completed = true;
245+
this.state = State.FIRST_SIGNAL_RECEIVED;
246+
}
247+
248+
Publisher<Void> result;
249+
try {
250+
result = writeFunction.apply(this);
251+
}
252+
catch (Throwable ex) {
253+
this.writeCompletionBarrier.onError(ex);
254+
return;
249255
}
256+
result.subscribe(this.writeCompletionBarrier);
250257
}
251258

252259
@Override

0 commit comments

Comments
 (0)