Skip to content

Commit f127d48

Browse files
Copilotsamvaity
andcommitted
Implement inputStream.read() based SSE processing
Co-authored-by: samvaity <16845631+samvaity@users.noreply.github.com>
1 parent 62b4126 commit f127d48

1 file changed

Lines changed: 112 additions & 11 deletions

File tree

sdk/clientcore/core/src/main/java/io/clientcore/core/utils/ServerSentEventUtils.java

Lines changed: 112 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
import io.clientcore.core.implementation.utils.ServerSentEventHelper;
1212
import io.clientcore.core.models.ServerSentResult;
1313

14-
import java.io.BufferedReader;
1514
import java.io.IOException;
1615
import java.io.InputStream;
17-
import java.io.InputStreamReader;
1816
import java.nio.charset.StandardCharsets;
1917
import java.time.Duration;
2018
import java.util.ArrayList;
@@ -58,8 +56,8 @@ public static boolean isTextEventStreamContentType(String contentType) {
5856
*/
5957
public static ServerSentResult processTextEventStream(InputStream inputStream, ServerSentEventListener listener)
6058
throws IOException {
61-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
62-
return processBuffer(reader, listener);
59+
try (InputStream is = inputStream) {
60+
return processBuffer(is, listener);
6361
}
6462
}
6563

@@ -112,18 +110,106 @@ private static boolean isEndOfBlock(StringBuilder sb) {
112110
/**
113111
* Processes the SSE buffer and dispatches the event.
114112
*
115-
* @param reader The {@link BufferedReader} to read data from.
113+
* @param inputStream The {@link InputStream} to read data from.
116114
* @param listener The {@link ServerSentEventListener} object attached to the {@link HttpRequest}.
117115
*/
118-
private static ServerSentResult processBuffer(BufferedReader reader, ServerSentEventListener listener) {
116+
private static ServerSentResult processBuffer(InputStream inputStream, ServerSentEventListener listener) {
119117
ServerSentEvent event = null;
120118
List<String> allCollectedData = new ArrayList<>(); // List to store all collected data blocks
121119

122120
try {
123121
StringBuilder collectedData = new StringBuilder();
124-
String line;
125122

126-
while ((line = reader.readLine()) != null) {
123+
// Use a byte array to accumulate UTF-8 bytes and convert to string line by line
124+
List<Byte> lineBytes = new ArrayList<>();
125+
126+
int b;
127+
while ((b = inputStream.read()) != -1) {
128+
byte currentByte = (byte) b;
129+
130+
if (currentByte == '\n') {
131+
// End of line found, convert accumulated bytes to string
132+
String line = convertBytesToString(lineBytes);
133+
collectedData.append(line).append("\n");
134+
lineBytes.clear();
135+
136+
if (isEndOfBlock(collectedData)) {
137+
String temp = collectedData.toString();
138+
allCollectedData.add(temp.trim().replace("data: ", "")); // Add the collected data block to the list
139+
event = processLines(collectedData.toString().split("\n"));
140+
141+
if (!Objects.equals(event.getEvent(), DEFAULT_EVENT) || event.getData() != null) {
142+
listener.onEvent(event);
143+
}
144+
145+
collectedData = new StringBuilder(); // clear the collected data
146+
}
147+
} else if (currentByte == '\r') {
148+
// Handle \r\n or standalone \r - peek ahead to see if \n follows
149+
int next = inputStream.read();
150+
if (next == -1) {
151+
// End of stream, treat \r as line ending
152+
String line = convertBytesToString(lineBytes);
153+
collectedData.append(line).append("\n");
154+
lineBytes.clear();
155+
156+
if (isEndOfBlock(collectedData)) {
157+
String temp = collectedData.toString();
158+
allCollectedData.add(temp.trim().replace("data: ", "")); // Add the collected data block to the list
159+
event = processLines(collectedData.toString().split("\n"));
160+
161+
if (!Objects.equals(event.getEvent(), DEFAULT_EVENT) || event.getData() != null) {
162+
listener.onEvent(event);
163+
}
164+
165+
collectedData = new StringBuilder(); // clear the collected data
166+
}
167+
break;
168+
} else if (next == '\n') {
169+
// \r\n found, end of line
170+
String line = convertBytesToString(lineBytes);
171+
collectedData.append(line).append("\n");
172+
lineBytes.clear();
173+
174+
if (isEndOfBlock(collectedData)) {
175+
String temp = collectedData.toString();
176+
allCollectedData.add(temp.trim().replace("data: ", "")); // Add the collected data block to the list
177+
event = processLines(collectedData.toString().split("\n"));
178+
179+
if (!Objects.equals(event.getEvent(), DEFAULT_EVENT) || event.getData() != null) {
180+
listener.onEvent(event);
181+
}
182+
183+
collectedData = new StringBuilder(); // clear the collected data
184+
}
185+
} else {
186+
// Standalone \r, treat as line ending and add the next byte to line bytes
187+
String line = convertBytesToString(lineBytes);
188+
collectedData.append(line).append("\n");
189+
lineBytes.clear();
190+
lineBytes.add((byte) next);
191+
192+
if (isEndOfBlock(collectedData)) {
193+
String temp = collectedData.toString();
194+
allCollectedData.add(temp.trim().replace("data: ", "")); // Add the collected data block to the list
195+
event = processLines(collectedData.toString().split("\n"));
196+
197+
if (!Objects.equals(event.getEvent(), DEFAULT_EVENT) || event.getData() != null) {
198+
listener.onEvent(event);
199+
}
200+
201+
collectedData = new StringBuilder(); // clear the collected data
202+
}
203+
}
204+
} else {
205+
// Regular byte, add to current line
206+
lineBytes.add(currentByte);
207+
}
208+
}
209+
210+
// Handle any remaining data that didn't end with a newline
211+
if (!lineBytes.isEmpty()) {
212+
String line = convertBytesToString(lineBytes);
127213
collectedData.append(line).append("\n");
128214

129215
if (isEndOfBlock(collectedData)) {
@@ -134,8 +220,6 @@ private static ServerSentResult processBuffer(BufferedReader reader, ServerSentE
134220
if (!Objects.equals(event.getEvent(), DEFAULT_EVENT) || event.getData() != null) {
135221
listener.onEvent(event);
136222
}
137-
138-
collectedData = new StringBuilder(); // clear the collected data
139223
}
140224
}
141225

@@ -145,7 +229,24 @@ private static ServerSentResult processBuffer(BufferedReader reader, ServerSentE
145229
event != null ? ServerSentEventHelper.getRetryAfter(event) : null, null);
146230
}
147231

148-
return new ServerSentResult(null, event.getId(), ServerSentEventHelper.getRetryAfter(event), allCollectedData);
232+
return new ServerSentResult(null, event != null ? event.getId() : null,
233+
event != null ? ServerSentEventHelper.getRetryAfter(event) : null, allCollectedData);
234+
}
235+
236+
/**
237+
* Converts a list of bytes to a UTF-8 string.
238+
*/
239+
private static String convertBytesToString(List<Byte> bytes) {
240+
if (bytes.isEmpty()) {
241+
return "";
242+
}
243+
244+
byte[] byteArray = new byte[bytes.size()];
245+
for (int i = 0; i < bytes.size(); i++) {
246+
byteArray[i] = bytes.get(i);
247+
}
248+
249+
return new String(byteArray, StandardCharsets.UTF_8);
149250
}
150251

151252
private static ServerSentEvent processLines(String[] lines) {

0 commit comments

Comments
 (0)