File tree Expand file tree Collapse file tree 1 file changed +11
-13
lines changed
rsocket-core/src/main/java/io/rsocket/core Expand file tree Collapse file tree 1 file changed +11
-13
lines changed Original file line number Diff line number Diff line change @@ -128,18 +128,7 @@ class RSocketResponder implements RSocket {
128128 }
129129
130130 private void handleSendProcessorError (Throwable t ) {
131- sendingSubscriptions
132- .values ()
133- .forEach (
134- subscription -> {
135- try {
136- subscription .cancel ();
137- } catch (Throwable e ) {
138- if (LOGGER .isDebugEnabled ()) {
139- LOGGER .debug ("Dropped exception" , t );
140- }
141- }
142- });
131+ cleanUpSendingSubscriptions ();
143132
144133 channelProcessors
145134 .values ()
@@ -275,7 +264,16 @@ private void cleanup(Throwable e) {
275264 }
276265
277266 private synchronized void cleanUpSendingSubscriptions () {
278- sendingSubscriptions .values ().forEach (Subscription ::cancel );
267+ // Iterate explicitly to handle collisions with concurrent removals
268+ for (IntObjectMap .PrimitiveEntry <Subscription > entry : sendingSubscriptions .entries ()) {
269+ try {
270+ entry .value ().cancel ();
271+ } catch (Throwable ex ) {
272+ if (LOGGER .isDebugEnabled ()) {
273+ LOGGER .debug ("Dropped exception" , ex );
274+ }
275+ }
276+ }
279277 sendingSubscriptions .clear ();
280278 }
281279
You can’t perform that action at this time.
0 commit comments