Skip to content

Commit 1dc2bf8

Browse files
committed
GUACAMOLE-2228: Write stream blobs asynchronously.
1 parent f21ba60 commit 1dc2bf8

2 files changed

Lines changed: 322 additions & 31 deletions

File tree

guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java

Lines changed: 78 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.io.IOException;
2424
import java.io.OutputStream;
2525
import java.util.List;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.ConcurrentMap;
2628
import org.apache.guacamole.GuacamoleException;
2729
import org.apache.guacamole.net.GuacamoleTunnel;
2830
import org.apache.guacamole.protocol.GuacamoleInstruction;
@@ -37,14 +39,21 @@
3739
* sent automatically.
3840
*/
3941
public class OutputStreamInterceptingFilter
40-
extends StreamInterceptingFilter<OutputStream> {
42+
extends StreamInterceptingFilter<OutputStream>
43+
implements OutputStreamWriter.ExecutionListener {
4144

4245
/**
4346
* Logger for this class.
4447
*/
4548
private static final Logger logger =
4649
LoggerFactory.getLogger(OutputStreamInterceptingFilter.class);
4750

51+
/**
52+
* File download stream writers which will send data asynchronosly.
53+
*/
54+
private final ConcurrentMap<String, OutputStreamWriter> streamWriters =
55+
new ConcurrentHashMap<>();
56+
4857
/**
4958
* Whether this OutputStreamInterceptingFilter should respond to received
5059
* blobs with "ack" messages on behalf of the client. If false, blobs will
@@ -95,6 +104,23 @@ private void sendAck(String index, String message, GuacamoleStatus status) {
95104

96105
}
97106

107+
@Override
108+
public void onBlobWritten(String streamIndex, boolean requiresAck) {
109+
if (requiresAck) {
110+
sendAck(streamIndex, "OK", GuacamoleStatus.SUCCESS);
111+
}
112+
}
113+
114+
@Override
115+
public void onWriteFailed(String streamIndex) {
116+
sendAck(streamIndex, "FAIL", GuacamoleStatus.SERVER_ERROR);
117+
}
118+
119+
@Override
120+
public void onStreamEnd(String streamIndex) {
121+
closeInterceptedStream(streamIndex);
122+
}
123+
98124
/**
99125
* Handles a single "blob" instruction, decoding its base64 data,
100126
* sending that data to the associated OutputStream, and ultimately
@@ -117,10 +143,12 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
117143
if (args.size() < 2)
118144
return instruction;
119145

120-
// Pull associated stream
121-
String index = args.get(0);
122-
InterceptedStream<OutputStream> stream = getInterceptedStream(index);
123-
if (stream == null)
146+
// Get the stream index
147+
String streamIndex = args.get(0);
148+
149+
// Process the blob asynchornously if there is a worker
150+
OutputStreamWriter streamWriter = streamWriters.get(streamIndex);
151+
if (streamWriter == null)
124152
return instruction;
125153

126154
// Decode blob
@@ -134,31 +162,25 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
134162
return null;
135163
}
136164

137-
try {
138-
139-
// Attempt to write data to stream
140-
stream.getStream().write(blob);
141-
142-
// Force client to respond with their own "ack" if we need to
143-
// confirm that they are not falling behind with respect to the
144-
// graphical session
145-
if (!acknowledgeBlobs) {
146-
acknowledgeBlobs = true;
147-
return new GuacamoleInstruction("blob", index, "");
148-
}
149-
150-
// Otherwise, acknowledge the blob on the client's behalf
151-
sendAck(index, "OK", GuacamoleStatus.SUCCESS);
152-
153-
}
154-
catch (IOException e) {
155-
sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR);
156-
logger.debug("Write failed for intercepted stream.", e);
165+
// Force client to respond with their own "ack" to confirm they are not
166+
// falling behind with respect to the graphical session, only if
167+
// - There are no blobs in the queue currently
168+
// - Previous blob required server side acknowledgement
169+
// This may lead to more than one blob in the writer queue temporarily,
170+
// but not more than two blobs anyways.
171+
if (!acknowledgeBlobs &&
172+
streamWriter.getQueuedMessageCount() == 0 &&
173+
streamWriter.didPrevBlobRequireAck()) {
174+
streamWriter.handleBlob(blob, false);
175+
acknowledgeBlobs = true;
176+
177+
// Send an empty blob to trigger client "ack"
178+
return new GuacamoleInstruction("blob", streamIndex, "");
157179
}
158180

159-
// Instruction was handled purely internally
181+
// Put the blob to the writer queue
182+
streamWriter.handleBlob(blob, true);
160183
return null;
161-
162184
}
163185

164186
/**
@@ -176,9 +198,13 @@ private void handleEnd(GuacamoleInstruction instruction) {
176198
if (args.size() < 1)
177199
return;
178200

179-
// Terminate stream
180-
closeInterceptedStream(args.get(0));
181-
201+
OutputStreamWriter streamWriter = streamWriters.get(args.get(0));
202+
if (streamWriter == null)
203+
return;
204+
205+
// Notify the writer that the end marker has been received.
206+
// it will terminate the stream once all blobs are written.
207+
streamWriter.handleEnd();
182208
}
183209

184210
/**
@@ -224,6 +250,27 @@ protected void handleInterceptedStream(InterceptedStream<OutputStream> stream) {
224250
// Acknowledge that the stream is ready to receive data
225251
sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS);
226252

227-
}
253+
// Create the stream writer
254+
OutputStreamWriter streamWriter = new OutputStreamWriter(stream, this);
255+
256+
// Put it into the container and check if there was another writer for the index
257+
OutputStreamWriter old = streamWriters.put(stream.getIndex(), streamWriter);
258+
if (old != null) {
259+
logger.debug("Found an older stream #{}; will close it", stream.getIndex());
260+
// Close the stream to be sure it does not get stuck on write
261+
closeInterceptedStream(old.getStream());
262+
// Stop it
263+
old.stop();
264+
}
228265

266+
// This will block the thread until the stream is closed by
267+
// disconnection, or the end instruction is received.
268+
streamWriter.run();
269+
270+
// Close the stream if not closed yet
271+
closeInterceptedStream(stream);
272+
273+
// Remove the stream from the container
274+
streamWriters.entrySet().removeIf(entry -> entry.getValue().equals(streamWriter));
275+
}
229276
}

0 commit comments

Comments
 (0)