Skip to content

Commit df5fbf0

Browse files
committed
Keep one flush method in FrameHandler
1 parent d07986d commit df5fbf0

6 files changed

Lines changed: 13 additions & 27 deletions

File tree

src/main/java/com/rabbitmq/client/impl/DefaultHeartbeatSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void run() {
137137

138138
if (now > (lastActivityTime + this.heartbeatNanos)) {
139139
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
140-
frameHandler.flush();
140+
frameHandler.flush(null);
141141
}
142142
} catch (IOException e) {
143143
// ignore

src/main/java/com/rabbitmq/client/impl/FrameHandler.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,24 +74,13 @@ default void finishConnectionNegotiation() {
7474
*/
7575
void writeFrame(Frame frame) throws IOException;
7676

77-
/**
78-
* Flush the underlying data connection.
79-
* @throws IOException if there is a problem accessing the connection
80-
*/
81-
void flush() throws IOException;
82-
8377
/**
8478
* Flush the underlying data connection and notify the listener when the write completes.
8579
* The default implementation flushes synchronously and calls the listener immediately.
8680
* @param listener called when the flush completes, may be {@code null}
8781
* @throws IOException if there is a problem accessing the connection
8882
*/
89-
default void flush(WriteListener listener) throws IOException {
90-
flush();
91-
if (listener != null) {
92-
listener.done(true, null);
93-
}
94-
}
83+
void flush(WriteListener listener) throws IOException;
9584

9685
/** Close the underlying data connection (complaint not permitted). */
9786
void close();

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -403,11 +403,6 @@ private void doWriteFrame(Frame frame) throws IOException {
403403
}
404404
}
405405

406-
@Override
407-
public void flush() {
408-
this.channel.flush();
409-
}
410-
411406
@Override
412407
public void flush(WriteListener listener) {
413408
if (listener == null) {

src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client.impl;
1717

1818
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.WriteListener;
1920
import org.slf4j.Logger;
2021
import org.slf4j.LoggerFactory;
2122

@@ -213,21 +214,21 @@ public void writeFrame(Frame frame) throws IOException {
213214
}
214215

215216
@Override
216-
public void flush() throws IOException {
217+
public void flush(WriteListener listener) throws IOException {
217218
_outputStream.flush();
219+
if (listener != null) {
220+
listener.done(true, null);
221+
}
218222
}
219223

220224
@Override
221225
public void close() {
222226
try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _e) {}
223227
// async flush if possible
224228
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
225-
Callable<Void> flushCallable = new Callable<Void>() {
226-
@Override
227-
public Void call() throws Exception {
228-
flush();
229-
return null;
230-
}
229+
Callable<Void> flushCallable = () -> {
230+
flush(null);
231+
return null;
231232
};
232233
Future<Void> flushTask = null;
233234
try {

src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public int getPort() {
264264
return -1;
265265
}
266266

267-
public void flush() throws IOException {
267+
public void flush(WriteListener listener) throws IOException {
268268
// no need to implement this: don't bother writing the frame
269269
}
270270

src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.AMQP;
2020
import com.rabbitmq.client.ConnectionFactory;
2121
import com.rabbitmq.client.UnexpectedFrameError;
22+
import com.rabbitmq.client.WriteListener;
2223
import com.rabbitmq.client.impl.AMQConnection;
2324
import com.rabbitmq.client.impl.AMQImpl.Basic.Publish;
2425
import com.rabbitmq.client.impl.Frame;
@@ -152,7 +153,7 @@ public int getPort() {
152153
return -1;
153154
}
154155

155-
public void flush() throws IOException {
156+
public void flush(WriteListener listener) throws IOException {
156157
// no need to implement this: don't bother writing the frame
157158
}
158159

0 commit comments

Comments
 (0)