33import com .fasterxml .jackson .annotation .JsonProperty ;
44import com .fasterxml .jackson .core .JsonProcessingException ;
55import com .fasterxml .jackson .databind .ObjectMapper ;
6- import io .github .blackspherefollower .buttplug4j .client .ButtplugClient ;
7- import io .github .blackspherefollower .buttplug4j .client .IConnectedEvent ;
8- import io .github .blackspherefollower .buttplug4j .protocol .ButtplugConsts ;
9- import io .github .blackspherefollower .buttplug4j .protocol .ButtplugMessage ;
10- import io .github .blackspherefollower .buttplug4j .protocol .ButtplugProtocolException ;
11- import io .github .blackspherefollower .buttplug4j .protocol .messages .Error ;
12- import org .eclipse .jetty .util .component .LifeCycle ;
13- import org .eclipse .jetty .websocket .api .Session ;
14- import org .eclipse .jetty .websocket .api .annotations .*;
15- import org .eclipse .jetty .websocket .api .extensions .Frame ;
16- import org .eclipse .jetty .websocket .client .ClientUpgradeRequest ;
17- import org .eclipse .jetty .websocket .client .WebSocketClient ;
186
197import java .net .URI ;
8+ import java .net .http .HttpClient ;
9+ import java .net .http .WebSocket ;
2010import java .nio .ByteBuffer ;
11+ import java .nio .CharBuffer ;
2112import java .nio .charset .StandardCharsets ;
2213import java .util .concurrent .*;
2314
24- @ WebSocket (maxTextMessageSize = 64 * 1024 )
2515public class WSDMClient {
2616
27- private WebSocketClient client ;
28- private Session session ;
29- private WSDHeader header ;
17+ private WebSocket client ;
18+ private final WSDHeader header ;
3019 public ConcurrentLinkedQueue <String > messages = new ConcurrentLinkedQueue <>();
31- private CompletableFuture <Boolean > connected = new CompletableFuture <>();
20+ private final CompletableFuture <Boolean > connected = new CompletableFuture <>();
3221
3322 public int battery = 100 ;
3423
35- class WSDHeader {
24+ static class WSDHeader {
3625 @ JsonProperty ("identifier" )
3726 public String identifier ;
3827 @ JsonProperty ("address" )
@@ -45,64 +34,82 @@ public WSDMClient(final URI url, final String identifier, final String address)
4534 header = new WSDHeader ();
4635 header .identifier = identifier ;
4736 header .address = address ;
48- client = new WebSocketClient ();
4937
50- client .start ();
51- client .connect (this , url , new ClientUpgradeRequest ()).get (2 , TimeUnit .SECONDS );
52- connected .get (5 , TimeUnit .SECONDS );
38+ HttpClient
39+ .newHttpClient ()
40+ .newWebSocketBuilder ()
41+ .buildAsync (url , new WebSocketClient (this ))
42+ .join ();
43+ connected .get (10 , TimeUnit .SECONDS );
44+
5345 }
5446
55- protected void cleanup () {
47+ private static class WebSocketClient implements WebSocket .Listener {
48+ WSDMClient wsdmclient ;
49+ public WebSocketClient (WSDMClient wsdmclient ) {this .wsdmclient = wsdmclient ;}
5650
57- if (session != null ) {
58- session .close ();
59- }
51+ @ Override
52+ public void onOpen (WebSocket webSocket ) {
53+ System .out .println ("onOpen using subprotocol " + webSocket .getSubprotocol ());
54+ wsdmclient .onConnect (webSocket );
55+ WebSocket .Listener .super .onOpen (webSocket );
56+ }
6057
61- try {
62- LifeCycle . stop ( client );
63- } catch ( RuntimeException ignored ) {
64- }
65- client = null ;
58+ @ Override
59+ public CompletionStage <?> onText ( WebSocket webSocket , CharSequence data , boolean last ) {
60+ System . out . println ( "onText received " + data );
61+ wsdmclient . onMessage ( data . toString ());
62+ return WebSocket . Listener . super . onText ( webSocket , data , last ) ;
6663 }
6764
68- @ OnWebSocketClose
69- public void onClose (int statusCode , String reason ) {
70- this .session = null ;
71- cleanup ();
65+ @ Override
66+ public void onError (WebSocket webSocket , Throwable error ) {
67+ System .out .println ("Bad day! " + webSocket .toString ());
68+ error .printStackTrace ();
69+ WebSocket .Listener .super .onError (webSocket , error );
7270 }
7371
74- @ OnWebSocketConnect
75- public void onConnect (Session session ) {
76- this .session = session ;
72+ @ Override
73+ public CompletionStage <?> onClose (WebSocket webSocket , int statusCode , String reason ) {
74+ System .out .println ("onClose received " + statusCode + " " + reason );
75+ wsdmclient .onClose (statusCode , reason );
76+ return WebSocket .Listener .super .onClose (webSocket , statusCode , reason );
77+ }
78+
79+ @ Override
80+ public CompletionStage <?> onBinary (WebSocket webSocket , ByteBuffer message , boolean last ) {
81+ System .out .println ("onBinary received " + message );
82+ wsdmclient .onMessage (StandardCharsets .UTF_8 .decode (message ).toString ());
83+ return WebSocket .Listener .super .onBinary (webSocket , message , last );
84+ }
85+ }
86+
7787
88+ public void onClose (int statusCode , String reason ) {
89+ this .client = null ;
90+ }
91+
92+ public void onConnect (WebSocket client ) {
93+ this .client = client ;
7894 // Don't block the WS thread
7995 new Thread (() -> {
8096 try {
81- session . getRemote (). sendStringByFuture ( new ObjectMapper ().writeValueAsString (header )).get (1 , TimeUnit .SECONDS );
97+ client . sendText ( new ObjectMapper ().writeValueAsString (header ), true ).get (1 , TimeUnit .SECONDS );
8298 } catch (JsonProcessingException | ExecutionException | InterruptedException |TimeoutException e ) {
8399 System .out .println ("Failed to send header: " + e .getMessage ());
84100 }
85101 }).start ();
86102 }
87103
88- @ OnWebSocketFrame
89- public void onFrame (final Frame frame ) {
90- if ( frame .getType ().isData ()) {
91- System .out .println ("Got frame: " + frame );
92- byte [] data = new byte [frame .getPayloadLength ()];
93- frame .getPayload ().get (data );
94- onMessage (null , new String (data , StandardCharsets .UTF_8 ));
95- }
96- }
97-
98- @ OnWebSocketMessage
99- public void onMessage (final Session sess , final String message ) {
104+ public void onMessage (final String message ) {
100105 System .out .println ("Got message: " + message );
101106 if (message .startsWith ("DeviceType;" )) {
102107 new Thread (() -> {
103108 try {
104- session .getRemote ().sendBytesByFuture (ByteBuffer .wrap (("Z:10:" + header .address + ";" ).getBytes (StandardCharsets .UTF_8 ))).get (1 , TimeUnit .SECONDS );
105- connected .complete (true );
109+ sendMessage ("Z:10:" + header .address + ";" );
110+ if (!connected .isDone ()) {
111+ connected .complete (true );
112+ }
106113 } catch (InterruptedException | ExecutionException | TimeoutException e ) {
107114 throw new RuntimeException (e );
108115 }
@@ -113,7 +120,7 @@ public void onMessage(final Session sess, final String message) {
113120 if (message .startsWith ("Battery;" )) {
114121 new Thread (() -> {
115122 try {
116- session . getRemote (). sendBytesByFuture ( ByteBuffer . wrap (( battery + ";" ). getBytes ( StandardCharsets . UTF_8 ))). get ( 1 , TimeUnit . SECONDS );
123+ sendMessage ( battery + ";" );
117124 connected .complete (true );
118125 } catch (InterruptedException | ExecutionException | TimeoutException e ) {
119126 throw new RuntimeException (e );
@@ -125,12 +132,7 @@ public void onMessage(final Session sess, final String message) {
125132 messages .add (message );
126133 }
127134
128- @ OnWebSocketError
129- public void onWebSocketError (final Throwable cause ) {
130- System .out .println ("Got error: " + cause .getMessage ());
131- }
132-
133135 protected void sendMessage (final String msg ) throws ExecutionException , InterruptedException , TimeoutException {
134- session . getRemote (). sendStringByFuture (msg ).get (1 , TimeUnit .SECONDS );
136+ client . sendBinary ( ByteBuffer . wrap (msg . getBytes ( StandardCharsets . UTF_8 )), true ).get (1 , TimeUnit .SECONDS );
135137 }
136138 }
0 commit comments