33import com .fasterxml .jackson .annotation .JsonProperty ;
44import com .fasterxml .jackson .core .JsonProcessingException ;
55import com .fasterxml .jackson .databind .ObjectMapper ;
6+ import io .github .blackspherefollower .buttplug4j .client .ButtplugClient ;
7+ import io .github .blackspherefollower .buttplug4j .client .IConnectedEvent ;
8+ import io .github .blackspherefollower .buttplug4j .protocol .ButtplugConsts ;
9+ import io .github .blackspherefollower .buttplug4j .protocol .ButtplugMessage ;
10+ import io .github .blackspherefollower .buttplug4j .protocol .ButtplugProtocolException ;
11+ import io .github .blackspherefollower .buttplug4j .protocol .messages .Error ;
12+ import org .eclipse .jetty .util .component .LifeCycle ;
13+ import org .eclipse .jetty .websocket .api .Session ;
14+ import org .eclipse .jetty .websocket .api .annotations .*;
15+ import org .eclipse .jetty .websocket .api .extensions .Frame ;
16+ import org .eclipse .jetty .websocket .client .ClientUpgradeRequest ;
17+ import org .eclipse .jetty .websocket .client .WebSocketClient ;
618
719import java .net .URI ;
8- import java .net .http .HttpClient ;
9- import java .net .http .WebSocket ;
1020import java .nio .ByteBuffer ;
11- import java .nio .CharBuffer ;
1221import java .nio .charset .StandardCharsets ;
1322import java .util .concurrent .*;
1423
24+ @ WebSocket (maxTextMessageSize = 64 * 1024 )
1525public class WSDMClient {
1626
17- private WebSocket client ;
18- private final WSDHeader header ;
27+ private WebSocketClient client ;
28+ private Session session ;
29+ private WSDHeader header ;
1930 public ConcurrentLinkedQueue <String > messages = new ConcurrentLinkedQueue <>();
20- private final CompletableFuture <Boolean > connected = new CompletableFuture <>();
31+ private CompletableFuture <Boolean > connected = new CompletableFuture <>();
2132
2233 public int battery = 100 ;
2334
24- static class WSDHeader {
35+ class WSDHeader {
2536 @ JsonProperty ("identifier" )
2637 public String identifier ;
2738 @ JsonProperty ("address" )
@@ -34,82 +45,64 @@ public WSDMClient(final URI url, final String identifier, final String address)
3445 header = new WSDHeader ();
3546 header .identifier = identifier ;
3647 header .address = address ;
48+ client = new WebSocketClient ();
3749
38- HttpClient
39- .newHttpClient ()
40- .newWebSocketBuilder ()
41- .buildAsync (url , new WebSocketClient (this ))
42- .join ();
43- connected .get (10 , TimeUnit .SECONDS );
44-
50+ client .start ();
51+ client .connect (this , url , new ClientUpgradeRequest ()).get (2 , TimeUnit .SECONDS );
52+ connected .get (5 , TimeUnit .SECONDS );
4553 }
4654
47- private static class WebSocketClient implements WebSocket .Listener {
48- WSDMClient wsdmclient ;
49- public WebSocketClient (WSDMClient wsdmclient ) {this .wsdmclient = wsdmclient ;}
50-
51- @ Override
52- public void onOpen (WebSocket webSocket ) {
53- System .out .println ("onOpen using subprotocol " + webSocket .getSubprotocol ());
54- wsdmclient .onConnect (webSocket );
55- WebSocket .Listener .super .onOpen (webSocket );
56- }
57-
58- @ Override
59- public CompletionStage <?> onText (WebSocket webSocket , CharSequence data , boolean last ) {
60- System .out .println ("onText received " + data );
61- wsdmclient .onMessage (data .toString ());
62- return WebSocket .Listener .super .onText (webSocket , data , last );
63- }
55+ protected void cleanup () {
6456
65- @ Override
66- public void onError (WebSocket webSocket , Throwable error ) {
67- System .out .println ("Bad day! " + webSocket .toString ());
68- error .printStackTrace ();
69- WebSocket .Listener .super .onError (webSocket , error );
70- }
57+ if (session != null ) {
58+ session .close ();
59+ }
7160
72- @ Override
73- public CompletionStage <?> onClose ( WebSocket webSocket , int statusCode , String reason ) {
74- System . out . println ( "onClose received " + statusCode + " " + reason );
75- wsdmclient . onClose ( statusCode , reason );
76- return WebSocket . Listener . super . onClose ( webSocket , statusCode , reason ) ;
61+ try {
62+ LifeCycle . stop ( client );
63+ } catch ( RuntimeException ignored ) {
64+ }
65+ client = null ;
7766 }
7867
79- @ Override
80- public CompletionStage <?> onBinary (WebSocket webSocket , ByteBuffer message , boolean last ) {
81- System .out .println ("onBinary received " + message );
82- wsdmclient .onMessage (StandardCharsets .UTF_8 .decode (message ).toString ());
83- return WebSocket .Listener .super .onBinary (webSocket , message , last );
84- }
85- }
86-
87-
68+ @ OnWebSocketClose
8869 public void onClose (int statusCode , String reason ) {
89- this .client = null ;
70+ this .session = null ;
71+ cleanup ();
9072 }
9173
92- public void onConnect (WebSocket client ) {
93- this .client = client ;
74+ @ OnWebSocketConnect
75+ public void onConnect (Session session ) {
76+ this .session = session ;
77+
9478 // Don't block the WS thread
9579 new Thread (() -> {
9680 try {
97- client . sendText ( new ObjectMapper ().writeValueAsString (header ), true ).get (1 , TimeUnit .SECONDS );
81+ session . getRemote (). sendStringByFuture ( new ObjectMapper ().writeValueAsString (header )).get (1 , TimeUnit .SECONDS );
9882 } catch (JsonProcessingException | ExecutionException | InterruptedException |TimeoutException e ) {
9983 System .out .println ("Failed to send header: " + e .getMessage ());
10084 }
10185 }).start ();
10286 }
10387
104- public void onMessage (final String message ) {
88+ @ OnWebSocketFrame
89+ public void onFrame (final Frame frame ) {
90+ if ( frame .getType ().isData ()) {
91+ System .out .println ("Got frame: " + frame );
92+ byte [] data = new byte [frame .getPayloadLength ()];
93+ frame .getPayload ().get (data );
94+ onMessage (null , new String (data , StandardCharsets .UTF_8 ));
95+ }
96+ }
97+
98+ @ OnWebSocketMessage
99+ public void onMessage (final Session sess , final String message ) {
105100 System .out .println ("Got message: " + message );
106101 if (message .startsWith ("DeviceType;" )) {
107102 new Thread (() -> {
108103 try {
109- sendMessage ("Z:10:" + header .address + ";" );
110- if (!connected .isDone ()) {
111- connected .complete (true );
112- }
104+ session .getRemote ().sendBytesByFuture (ByteBuffer .wrap (("Z:10:" + header .address + ";" ).getBytes (StandardCharsets .UTF_8 ))).get (1 , TimeUnit .SECONDS );
105+ connected .complete (true );
113106 } catch (InterruptedException | ExecutionException | TimeoutException e ) {
114107 throw new RuntimeException (e );
115108 }
@@ -120,7 +113,7 @@ public void onMessage(final String message) {
120113 if (message .startsWith ("Battery;" )) {
121114 new Thread (() -> {
122115 try {
123- sendMessage ( battery + ";" );
116+ session . getRemote (). sendBytesByFuture ( ByteBuffer . wrap (( battery + ";" ). getBytes ( StandardCharsets . UTF_8 ))). get ( 1 , TimeUnit . SECONDS );
124117 connected .complete (true );
125118 } catch (InterruptedException | ExecutionException | TimeoutException e ) {
126119 throw new RuntimeException (e );
@@ -132,7 +125,12 @@ public void onMessage(final String message) {
132125 messages .add (message );
133126 }
134127
128+ @ OnWebSocketError
129+ public void onWebSocketError (final Throwable cause ) {
130+ System .out .println ("Got error: " + cause .getMessage ());
131+ }
132+
135133 protected void sendMessage (final String msg ) throws ExecutionException , InterruptedException , TimeoutException {
136- client . sendBinary ( ByteBuffer . wrap (msg . getBytes ( StandardCharsets . UTF_8 )), true ).get (1 , TimeUnit .SECONDS );
134+ session . getRemote (). sendStringByFuture (msg ).get (1 , TimeUnit .SECONDS );
137135 }
138136 }
0 commit comments