2525use Utopia \Validator \Nullable ;
2626use Utopia \Validator \WhiteList ;
2727use Swoole \Process ;
28- use Swoole \Http \Server ;
2928use Utopia \MockServer \Utopia \Model \Player ;
3029use Utopia \MockServer \Utopia \Validator \Player as PlayerValidator ;
30+ use Swoole \WebSocket \Frame ;
31+ use Swoole \WebSocket \Server as WebSocketServer ;
3132
3233const APP_AUTH_TYPE_SESSION = 'Session ' ;
3334const APP_AUTH_TYPE_JWT = 'JWT ' ;
3940const APP_PLATFORM_CONSOLE = 'console ' ;
4041const APP_STORAGE_CACHE = '/storage/cache ' ;
4142
42- $ http = new Server (
43+ $ http = new WebSocketServer (
4344 host: '0.0.0.0 ' ,
4445 port: App::getEnv ('PORT ' , 80 ),
4546 mode: SWOOLE_PROCESS
@@ -862,7 +863,7 @@ function ($utopia, $error, $request, $response) {
862863 ['utopia ' , 'error ' , 'request ' , 'response ' ]
863864 );
864865
865- $ http ->on (Constant::EVENT_START , function (Server $ http ) use ($ payloadSize ) {
866+ $ http ->on (Constant::EVENT_START , function (WebSocketServer $ http ) use ($ payloadSize ) {
866867 Console::success ('Server started successfully (max payload is ' . number_format ($ payloadSize ) . ' bytes) ' );
867868 Console::info ("Master pid {$ http ->master_pid }, manager pid {$ http ->manager_pid }" );
868869
@@ -885,4 +886,63 @@ function () use ($http) {
885886 $ app ->run ($ request , $ response );
886887});
887888
889+ $ http ->on ('open ' , function (WebSocketServer $ http , SwooleRequest $ request ) {
890+ $ http ->push ($ request ->fd , \json_encode ([
891+ 'type ' => 'connected ' ,
892+ 'data ' => [
893+ 'user ' => null ,
894+ 'subscriptions ' => (object ) [],
895+ ],
896+ ]));
897+ });
898+
899+ $ http ->on ('message ' , function (WebSocketServer $ http , Frame $ frame ) {
900+ $ message = \json_decode ($ frame ->data , true );
901+ if (!\is_array ($ message )) {
902+ return ;
903+ }
904+
905+ if (($ message ['type ' ] ?? '' ) === 'ping ' ) {
906+ $ http ->push ($ frame ->fd , \json_encode (['type ' => 'pong ' ]));
907+ return ;
908+ }
909+
910+ if (($ message ['type ' ] ?? '' ) !== 'subscribe ' ) {
911+ return ;
912+ }
913+
914+ foreach (($ message ['data ' ] ?? []) as $ subscription ) {
915+ $ subscriptionId = $ subscription ['subscriptionId ' ] ?? '' ;
916+ if (empty ($ subscriptionId )) {
917+ continue ;
918+ }
919+
920+ $ queries = $ subscription ['queries ' ] ?? [];
921+ $ shouldSendEvent = true ;
922+ foreach ($ queries as $ query ) {
923+ if (\is_string ($ query ) && \str_contains ($ query , '"failed" ' )) {
924+ $ shouldSendEvent = false ;
925+ break ;
926+ }
927+ }
928+
929+ if (!$ shouldSendEvent ) {
930+ continue ;
931+ }
932+
933+ $ http ->push ($ frame ->fd , \json_encode ([
934+ 'type ' => 'event ' ,
935+ 'data ' => [
936+ 'events ' => ['tests.* ' ],
937+ 'channels ' => ['tests ' ],
938+ 'subscriptions ' => [$ subscriptionId ],
939+ 'timestamp ' => \date (DATE_ATOM ),
940+ 'payload ' => [
941+ 'response ' => 'WS:/v1/realtime:passed ' ,
942+ ],
943+ ],
944+ ]));
945+ }
946+ });
947+
888948$ http ->start ();
0 commit comments