-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathJetty12WebSocketHandler.java
More file actions
143 lines (126 loc) · 4.56 KB
/
Jetty12WebSocketHandler.java
File metadata and controls
143 lines (126 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.socket;
import io.socket.engineio.server.EngineIoServer;
import io.socket.engineio.server.EngineIoWebSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
/**
 * Jetty 12 compatible WebSocket handler for Engine.IO. This replaces the JettyWebSocketHandler from
 * engine.io-server-jetty which is not compatible with Jetty 12's new WebSocket API.
 *
 * <p>Jetty invokes the annotated {@code on*} methods; each one either records connection state or
 * forwards the event to Engine.IO through {@code emit}. The {@code write} and {@code close}
 * overrides push data in the opposite direction, onto the Jetty {@link Session}.
 */
@Slf4j
@WebSocket(autoDemand = true)
public class Jetty12WebSocketHandler extends EngineIoWebSocket {

  /** Send callback that reports failed writes at debug level instead of discarding them. */
  private static final Callback LOG_WRITE_FAILURE =
      new Callback() {
        @Override
        public void fail(Throwable cause) {
          LOG.debug("WebSocket write failed: {}", cause.toString());
        }
      };

  private final EngineIoServer server;

  // Set in onOpen and cleared in onClose. Readers take a local snapshot before using it.
  // NOTE(review): volatile assumes write()/close() may run on a different thread than the Jetty
  // callbacks - confirm against the Engine.IO threading model.
  private volatile Session session;

  // Both maps stay null until onOpen has run.
  private Map<String, String> query;
  private Map<String, List<String>> headers;

  /**
   * Creates a handler bound to the given Engine.IO server.
   *
   * @param server the server that is handed this socket once the connection opens
   */
  public Jetty12WebSocketHandler(EngineIoServer server) {
    this.server = server;
  }

  /**
   * Returns the query parameters captured from the upgrade request.
   *
   * @return parameter name to first value; {@code null} before the connection has opened
   */
  @Override
  public Map<String, String> getQuery() {
    return query;
  }

  /**
   * Returns the headers captured from the upgrade request.
   *
   * @return header name to values; {@code null} before the connection has opened
   */
  @Override
  public Map<String, List<String>> getConnectionHeaders() {
    return headers;
  }

  /**
   * Sends a text frame if the session is currently open; otherwise does nothing.
   *
   * @param message the text payload to send
   */
  @Override
  public void write(String message) {
    // Snapshot the field so that onClose clearing it cannot cause a NullPointerException
    // between the null check and the send.
    Session current = session;
    if (current != null && current.isOpen()) {
      current.sendText(message, LOG_WRITE_FAILURE);
    }
  }

  /**
   * Sends a binary frame if the session is currently open; otherwise does nothing.
   *
   * @param message the bytes to send; the array is wrapped, not copied
   */
  @Override
  public void write(byte[] message) {
    Session current = session;
    if (current != null && current.isOpen()) {
      current.sendBinary(ByteBuffer.wrap(message), LOG_WRITE_FAILURE);
    }
  }

  /** Closes the underlying session if it is currently open; otherwise does nothing. */
  @Override
  public void close() {
    Session current = session;
    if (current != null && current.isOpen()) {
      current.close();
    }
  }

  /**
   * Records the session, captures query parameters and headers from the upgrade request, then
   * registers this socket with the Engine.IO server.
   *
   * @param session the newly opened session
   */
  @OnWebSocketOpen
  public void onOpen(Session session) {
    this.session = session;
    LOG.info("WebSocket connection opened");

    // Build the query map locally and assign it once complete, so the field never exposes a
    // partially populated map.
    Map<String, String> parsedQuery = new HashMap<>();
    Map<String, List<String>> requestHeaders;
    var upgradeRequest = session.getUpgradeRequest();
    if (upgradeRequest != null) {
      LOG.debug("UpgradeRequest URI: {}", upgradeRequest.getRequestURI());
      var parameterMap = upgradeRequest.getParameterMap();
      if (parameterMap != null) {
        LOG.debug("Query parameters: {}", parameterMap);
        for (Map.Entry<String, List<String>> entry : parameterMap.entrySet()) {
          List<String> values = entry.getValue();
          // Only the first value of a parameter is kept; parameters without values are skipped.
          if (values != null && !values.isEmpty()) {
            parsedQuery.put(entry.getKey(), values.get(0));
          }
        }
      }
      requestHeaders = upgradeRequest.getHeaders();
    } else {
      LOG.warn("UpgradeRequest is null - query parameters will be empty");
      requestHeaders = new HashMap<>();
    }
    this.query = parsedQuery;
    this.headers = requestHeaders;

    LOG.info("Parsed query params for Engine.IO: {}", parsedQuery);
    // Query and headers are assigned before the hand-off so the getters are populated by then.
    server.handleWebSocket(this);
  }

  /**
   * Forwards an incoming text frame as a {@code "message"} event.
   *
   * @param session the session the frame arrived on (unused)
   * @param message the text payload
   */
  @OnWebSocketMessage
  public void onTextMessage(Session session, String message) {
    emit("message", message);
  }

  /**
   * Copies an incoming binary frame into a byte array, forwards it as a {@code "message"} event
   * and then completes the callback.
   *
   * @param session the session the frame arrived on (unused)
   * @param buffer the frame payload; its remaining bytes are consumed
   * @param callback completed successfully after the payload has been emitted
   */
  @OnWebSocketMessage
  public void onBinaryMessage(Session session, ByteBuffer buffer, Callback callback) {
    byte[] message = new byte[buffer.remaining()];
    buffer.get(message);
    emit("message", message);
    callback.succeed();
  }

  /**
   * Emits a {@code "close"} event and drops the session reference.
   *
   * @param statusCode the close status code
   * @param reason the close reason
   */
  @OnWebSocketClose
  public void onClose(int statusCode, String reason) {
    LOG.info("WebSocket closed: statusCode={}, reason={}", statusCode, reason);
    try {
      emit("close");
    } finally {
      // Drop the reference even if a "close" listener throws, so later writes become no-ops.
      this.session = null;
    }
  }

  /**
   * Logs a WebSocket error and forwards it as an {@code "error"} event. A {@link
   * ClosedChannelException} is only logged at debug level and is not forwarded.
   *
   * @param error the failure reported for this connection
   */
  @OnWebSocketError
  public void onError(Throwable error) {
    if (error instanceof ClosedChannelException) {
      LOG.debug("WebSocket channel closed by peer (likely abnormal disconnect)");
      return;
    }
    try {
      LOG.error(
          "WebSocket error: {} - {}", error.getClass().getSimpleName(), error.getMessage(), error);
      emit("error", "websocket error", error.getMessage());
    } catch (Exception e) {
      // A failing "error" listener must not propagate out of the error callback.
      LOG.error("Failed to handle WebSocket error gracefully: {}", e.getMessage(), e);
    }
  }
}