diff --git a/docs/api.rst b/docs/api.rst index 3ff2b7e75cc..32464b64d78 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -18,6 +18,14 @@ API Wrappers ^^^^^^^^^^^^^ - pyzm is a python wrapper for the ZoneMinder APIs. It supports both the legacy and new token based API, as well as ZM logs/ZM shared memory support. See `its project site `__ for more details. Documentation is `here `__. +Additional API documentation +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. toctree:: + :maxdepth: 1 + + api_monitor_websocket + API evolution ^^^^^^^^^^^^^^^ @@ -713,4 +721,3 @@ There are several details that haven't yet been documented. Till they are, here * If you still can't find an answer, post your question in the `forums `__ (not the github repo). - diff --git a/docs/api_monitor_websocket.rst b/docs/api_monitor_websocket.rst new file mode 100644 index 00000000000..26f27aabddf --- /dev/null +++ b/docs/api_monitor_websocket.rst @@ -0,0 +1,298 @@ +Monitor Websocket API +===================== + +ZoneMinder can expose live monitor data directly from ``zmc`` over a websocket +connection. + +Overview +^^^^^^^^ + +Each monitor listens on: + +:: + + MIN_WEBSOCKET_PORT + MonitorId + +For example, if ``MIN_WEBSOCKET_PORT`` is ``31000`` and the monitor id is +``5``, the websocket endpoint is: + +:: + + ws://your-server:31005/ + +This requires ``Options -> Network -> MIN_WEBSOCKET_PORT`` to be configured and +the web server or reverse proxy to allow those ports. + +.. warning:: + + The monitor websocket endpoint can expose live camera data to any client + that can reach the monitor's websocket port. Native TLS is not provided by + ``zmc`` itself, so do not expose these ports directly to untrusted + networks. Restrict access with firewall rules and/or place the endpoint + behind a reverse proxy that terminates TLS. + +Connection model +^^^^^^^^^^^^^^^^ + +The websocket server is created by ``zmc`` and runs independently per monitor. 
+ +Clients may: + +* request one response +* subscribe to repeated updates +* unsubscribe later + +Text frames carry JSON control and metadata messages. Binary frames carry the +requested image or stream payload bytes. + +Authentication +^^^^^^^^^^^^^^ + +If ``OPT_USE_AUTH`` is disabled, websocket clients may connect without +credentials. + +If ``OPT_USE_AUTH`` is enabled, the websocket handshake is authenticated before +the connection is upgraded. The authenticated user must have live stream view +permission and monitor access for the target monitor. + +Supported authentication inputs mirror the existing ZoneMinder streaming paths: + +* ``?token=<token>`` or ``?jwt_token=<token>`` in the websocket URL +* ``Authorization: Bearer <token>`` in the HTTP upgrade request +* ``?auth=<hash>&username=<username>`` when auth-hash relay is in use +* ``?username=<username>&password=<password>`` when direct credentials are allowed +* ``?username=<username>`` when ``AUTH_RELAY`` is ``none`` + +Examples: + +:: + + ws://your-server:31005/?token=<token> + +or: + +:: + + ws://your-server:31005/?auth=<hash>&username=alice + +Commands +^^^^^^^^ + +One-shot status request: + +:: + + {"command":"status","request_id":"optional-id"} + +One-shot image request: + +:: + + {"command":"image","format":"jpeg","request_id":"optional-id"} + +Supported image formats are: + +* ``jpeg`` +* ``rgba`` +* ``yuv420p`` + +One-shot stream packet request: + +:: + + {"command":"stream","codec":"mjpeg","request_id":"optional-id"} + +Status subscription: + +:: + + {"command":"subscribe","topic":"status","interval_ms":1000} + +Event subscription: + +:: + + {"command":"subscribe","topic":"events"} + +Stream subscription: + +:: + + {"command":"subscribe","topic":"stream","codec":"mjpeg","interval_ms":1000} + +Unsubscribe: + +:: + + {"command":"unsubscribe","topic":"status"} + +or: + +:: + + {"command":"unsubscribe","topic":"events"} + +or: + +:: + + {"command":"unsubscribe","topic":"stream"} + +Status messages +^^^^^^^^^^^^^^^ + +Status replies are JSON text frames with fields such as: + +* 
``monitor_id`` +* ``monitor_name`` +* ``connected`` +* ``shm_valid`` +* ``state`` / ``state_id`` +* ``capture_fps`` +* ``analysis_fps`` +* ``capture_bandwidth`` +* ``image_count`` +* ``signal`` +* ``last_event_id`` + +Event messages +^^^^^^^^^^^^^^ + +Event subscriptions receive JSON text frames with: + +* ``type = "event"`` +* ``monitor_id`` +* ``event`` +* ``message`` + +These are queue-based notifications generated from capture-side failures and +recovery transitions so the capture loop does not block on websocket clients. + +Payload metadata and binary payloads +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Every image or stream payload is sent as two websocket frames: + +1. A JSON text metadata frame +2. A binary frame containing the payload bytes + +The metadata frame includes: + +* ``type = "image"`` or ``"stream"`` +* ``request_id`` +* ``format`` +* ``content_type`` +* ``monitor_id`` +* ``width`` +* ``height`` +* ``line_size`` +* ``colours`` +* ``subpixel_order`` +* ``image_count`` +* ``sequence`` +* ``keyframe`` +* ``payload_bytes`` + +Image behavior +^^^^^^^^^^^^^^ + +Image requests use the latest decoded video frame available in the monitor +packet queue. + +``jpeg`` returns a compressed still image. + +``rgba`` returns an aligned raw RGBA buffer. Because the line size may include +padding, clients must use the reported ``line_size`` value rather than assuming +``width * 4`` bytes per row. + +``yuv420p`` returns a tightly packed planar I420 buffer. The reported +``line_size`` is the luma (Y) stride. The buffer layout is fully described by +the reported ``width``, ``height``, and ``line_size``: + +* the Y plane is ``height`` rows of ``line_size`` bytes +* the U plane follows, ``(height + 1) / 2`` rows of ``(line_size + 1) / 2`` bytes +* the V plane follows, ``(height + 1) / 2`` rows of ``(line_size + 1) / 2`` bytes + +There is no padding between planes or rows, so clients should not assume any +additional alignment. 
+ +Stream behavior +^^^^^^^^^^^^^^^ + +Stream requests and subscriptions use explicit codec names instead of treating +encoded video packets as images. + +``mjpeg`` returns a stream of JPEG frames. For subscription mode, +``interval_ms`` controls how often the server checks for and sends a newer +frame. + +Passthrough codec streams currently support: + +* ``h264`` +* ``h265`` +* ``av1`` + +Passthrough stream payloads use the monitor packet queue and are only available +when the monitor is already producing that codec. + +One-shot passthrough stream requests return a decodable packet snapshot: + +* the payload starts at the latest available queued keyframe for that codec +* codec extradata is prepended before the keyframe packet bytes + +Passthrough subscriptions stream queued packets in order starting from the +latest available keyframe in the queue. This gives new subscribers a decodable +start point and avoids dropping interdependent packets. + +For passthrough codec subscriptions: + +* packets are pushed in queue order +* ``interval_ms`` is ignored +* ``sequence`` tracks the packet queue order +* ``keyframe`` indicates whether the payload begins a new decodable segment + +Implementation notes +^^^^^^^^^^^^^^^^^^^^ + +This transport currently uses a small in-tree websocket implementation rather +than adding a new dependency such as ``websocketpp`` to ``zmc``. + +Tradeoffs of the in-tree implementation versus a mature library such as +``websocketpp``: + +* **Smaller dependency surface.** ``zmc`` is the capture daemon and runs on a + wide range of platforms and distributions. A header-only or linked websocket + library adds packaging and build-matrix work across every supported target. +* **Direct packet queue integration.** The server thread reads frames and + encoded packets straight from the monitor packet queue without an extra + abstraction layer, which keeps the capture thread non-blocking. 
+* **Limited scope.** The implementation only needs RFC 6455 server framing for + one upgrade path. It does not implement permessage-deflate, extensions, + client mode, or subprotocol negotiation. A general-purpose library provides + these but they are not required here. +* **Maintenance cost.** The cost of the in-tree approach is that protocol + correctness (framing, control frames, close handshake, masking) must be + maintained and tested in this tree rather than relying on an upstream + project. The unit tests in ``tests/zm_websocket.cpp`` cover the framing and + handshake paths for this reason. + +TLS is intentionally left to the deployment boundary instead of being +implemented inside this in-tree websocket server. This mirrors how the existing +``zms`` streaming paths are deployed: TLS is terminated by a reverse proxy or +load balancer. In practice, production deployments should terminate TLS and may +additionally enforce authentication in a reverse proxy, load balancer, or +similar front-end before exposing this transport to clients. Authentication is +still enforced inside ``zmc`` (see `Authentication`_) whenever ``OPT_USE_AUTH`` +is enabled, independently of any proxy. + +Errors +^^^^^^ + +Protocol errors are returned as JSON text frames: + +:: + + {"type":"error","message":"..."} + +Unsupported image formats, unsupported stream codecs, unavailable monitor data, +or malformed commands return an error frame instead of a binary payload. diff --git a/docs/userguide/options/options_network.rst b/docs/userguide/options/options_network.rst index 6f0aaf93cfc..5809345f0c7 100644 --- a/docs/userguide/options/options_network.rst +++ b/docs/userguide/options/options_network.rst @@ -9,8 +9,10 @@ HTTP_UA - When ZoneMinder communicates with remote cameras it will identify itse HTTP_TIMEOUT - When retrieving remote images ZoneMinder will wait for this length of time before deciding that an image is not going to arrive and taking steps to retry. 
This timeout is in milliseconds (1000 per second) and will apply to each part of an image if it is not sent in one whole chunk. -MIN_STREAMING_PORT - ZoneMinder supports a concept called multi-port streaming. The core concept is that modern browsers like Chrome limit the number of simultaneous connections allowed from a specific domain (host name+port). In the case of Chrome this value is 6, which means you can't see more than 6 simultaneous streams from your server at one time. However, if the streams originated from different ports (or sub domains), this limitation would not apply. When you enable this option with a value (in this case, ``30000``), the streams from the monitors will originate from ``30000`` plus the monitor ID, effectively overcoming this limitation. **Note that this also needs additional setup your webserver configuration before this can start to work**. Please refer to `this article `__ on how to setup multi port streaming on Apache. +MIN_STREAMING_PORT - ZoneMinder supports a concept called multi-port streaming. The core concept is that modern browsers like Chrome limit the number of simultaneous connections allowed from a specific domain (host name+port). In the case of Chrome this value is 6, which means you can't see more than 6 simultaneous streams from your server at one time. However, if the streams originated from different ports (or sub domains), this limitation would not apply. When you enable this option with a value (in this case, ``30000``), the streams from the monitors will originate from ``30000`` plus the monitor ID, effectively overcoming this limitation. **Note that this also needs additional setup in your webserver configuration before this can start to work**. Please refer to `this article `__ on how to set up multi-port streaming on Apache. + +MIN_WEBSOCKET_PORT - ZoneMinder can also expose a per-monitor websocket transport directly from ``zmc``. This setting specifies the base port for that listener range. 
Each monitor websocket listens on ``MIN_WEBSOCKET_PORT + MonitorId``. Use a dedicated port range rather than ``MIN_STREAMING_PORT`` so websocket transport does not conflict with multi-port ZMS streaming or web server listeners. The websocket daemon does not terminate TLS itself, and it only authenticates clients when ``OPT_USE_AUTH`` is enabled, so if these ports are reachable beyond a trusted network they should be protected by firewall rules and ideally placed behind a reverse proxy that handles TLS and authentication. MIN_RTP_PORT - When ZoneMinder communicates with MPEG4 capable cameras using RTP with the unicast method it must open ports for the camera to connect back to for control and streaming purposes. This setting specifies the minimum port number that ZoneMinder will use. Ordinarily two adjacent ports are used for each camera, one for control packets and one for data packets. This port should be set to an even number, you may also need to open up a hole in your firewall to allow cameras to connect back if you wish to use unicasting. -MAX_RTP_PORT - When ZoneMinder communicates with MPEG4 capable cameras using RTP with the unicast method it must open ports for the camera to connect back to for control and streaming purposes. This setting specifies the maximum port number that ZoneMinder will use. Ordinarily two adjacent ports are used for each camera, one for control packets and one for data packets. This port should be set to an even number, you may also need to open up a hole in your firewall to allow cameras to connect back if you wish to use unicasting. You should also ensure that you have opened up at least two ports for each monitor that will be connecting to unicasting network cameras. \ No newline at end of file +MAX_RTP_PORT - When ZoneMinder communicates with MPEG4 capable cameras using RTP with the unicast method it must open ports for the camera to connect back to for control and streaming purposes. This setting specifies the maximum port number that ZoneMinder will use. 
Ordinarily two adjacent ports are used for each camera, one for control packets and one for data packets. This port should be set to an even number, you may also need to open up a hole in your firewall to allow cameras to connect back if you wish to use unicasting. You should also ensure that you have opened up at least two ports for each monitor that will be connecting to unicasting network cameras. diff --git a/scripts/ZoneMinder/lib/ZoneMinder/ConfigData.pm.in b/scripts/ZoneMinder/lib/ZoneMinder/ConfigData.pm.in index 492601db9d2..7ef72b13bc4 100644 --- a/scripts/ZoneMinder/lib/ZoneMinder/ConfigData.pm.in +++ b/scripts/ZoneMinder/lib/ZoneMinder/ConfigData.pm.in @@ -1046,6 +1046,22 @@ our @options = ( type => $types{integer}, category => 'network', }, + { + name => 'ZM_MIN_WEBSOCKET_PORT', + default => '', + description => 'Alternate port range to contact for monitor websocket transport.', + help => q` + This setting specifies the beginning of a dedicated websocket port + range for monitor websocket transport from zmc. Each monitor will + listen on this value plus the Monitor Id. For example, a value of + 31000 causes monitor 1 to listen on port 31001. Use a dedicated + range rather than ZM_MIN_STREAMING_PORT so websocket transport does + not conflict with multi-port ZMS streaming. 
If you expose these + ports outside a trusted network, terminate TLS and enforce + authentication in a reverse proxy or firewall policy.`, + type => $types{integer}, + category => 'network', + }, { name => 'ZM_MIN_RTP_PORT', default => '40200', diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f144b3f7789..45b6aa7fdfc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -79,6 +79,7 @@ set(ZM_BIN_SRC_FILES zm_uri.cpp zm_user.cpp zm_utils.cpp + zm_websocket.cpp zm_videostore.cpp zm_zone.cpp zm_storage.cpp) diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index 89cc72b645e..c3a6ab3cc40 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -34,6 +34,7 @@ #include "zm_time.h" #include "zm_uri.h" #include "zm_utils.h" +#include "zm_websocket.h" #include "zm_zone.h" #if ZM_HAS_V4L2 @@ -51,9 +52,9 @@ #include #include #include -#include -#include #include +#include +#include #include #if ZM_MEM_MAPPED @@ -64,6 +65,195 @@ #include #endif // ZM_MEM_MAPPED +namespace { + +bool encodeJpegImage(const Image &image, std::string *output) { + static thread_local std::vector buffer(ZM_MAX_IMAGE_SIZE); + size_t encoded_size = 0; + if (!image.EncodeJpeg(buffer.data(), &encoded_size)) { + return false; + } + output->assign(reinterpret_cast(buffer.data()), encoded_size); + return true; +} + +bool encodeRgbaImage(const Image &image, std::string *output, uint32_t *line_size) { + if (!output || !line_size || !image.Buffer() || !image.Width() || !image.Height()) { + return false; + } + + *line_size = FFALIGN(av_image_get_linesize(AV_PIX_FMT_RGBA, image.Width(), 0), 32); + output->assign(static_cast(*line_size) * image.Height(), '\0'); + + const uint8_t *src = image.Buffer(); + for (unsigned int y = 0; y < image.Height(); ++y) { + const uint8_t *src_row = src + (static_cast(y) * image.LineSize()); + uint8_t *dst_row = reinterpret_cast(&(*output)[static_cast(y) * (*line_size)]); + + for (unsigned int x = 0; x < image.Width(); ++x) { + uint8_t *dst_pixel = dst_row + 
(x * 4); + uint8_t red = 0; + uint8_t green = 0; + uint8_t blue = 0; + + switch (image.Colours()) { + case ZM_COLOUR_GRAY8: { + const uint8_t gray = src_row[x]; + red = gray; + green = gray; + blue = gray; + break; + } + case ZM_COLOUR_RGB24: { + const uint8_t *src_pixel = src_row + (x * 3); + if (image.SubpixelOrder() == ZM_SUBPIX_ORDER_BGR) { + blue = src_pixel[0]; + green = src_pixel[1]; + red = src_pixel[2]; + } else { + red = src_pixel[0]; + green = src_pixel[1]; + blue = src_pixel[2]; + } + break; + } + case ZM_COLOUR_RGB32: { + const uint8_t *src_pixel = src_row + (x * 4); + switch (image.SubpixelOrder()) { + case ZM_SUBPIX_ORDER_BGRA: + blue = src_pixel[0]; + green = src_pixel[1]; + red = src_pixel[2]; + break; + case ZM_SUBPIX_ORDER_ARGB: + red = src_pixel[1]; + green = src_pixel[2]; + blue = src_pixel[3]; + break; + case ZM_SUBPIX_ORDER_ABGR: + red = src_pixel[3]; + green = src_pixel[2]; + blue = src_pixel[1]; + break; + case ZM_SUBPIX_ORDER_RGBA: + default: + red = src_pixel[0]; + green = src_pixel[1]; + blue = src_pixel[2]; + break; + } + break; + } + default: + Error("Unsupported image colours %u for websocket rgba payload", image.Colours()); + return false; + } + + dst_pixel[0] = red; + dst_pixel[1] = green; + dst_pixel[2] = blue; + dst_pixel[3] = 0xff; + } + } + + return true; +} + +bool encodeYuv420pImage(const Image &image, std::string *output, uint32_t *line_size) { + if (!output || !line_size || !image.Buffer() || !image.Width() || !image.Height()) { + return false; + } + + const unsigned int width = image.Width(); + const unsigned int height = image.Height(); + + // ZoneMinder images are packed single-plane buffers, so reference plane 0 + // directly using the image's real (possibly padded) line size. 
+ uint8_t *src_data[4] = {const_cast(image.Buffer()), nullptr, nullptr, nullptr}; + int src_linesize[4] = {static_cast(image.LineSize()), 0, 0, 0}; + + AVFrame *dst_frame = av_frame_alloc(); + if (!dst_frame) { + return false; + } + dst_frame->format = AV_PIX_FMT_YUV420P; + dst_frame->width = width; + dst_frame->height = height; + if (av_frame_get_buffer(dst_frame, 32) < 0) { + av_frame_free(&dst_frame); + Error("Failed to allocate yuv420p frame for websocket payload"); + return false; + } + + SwsContext *sws_ctx = sws_getContext( + width, height, image.AVPixFormat(), + width, height, AV_PIX_FMT_YUV420P, + SWS_FAST_BILINEAR, nullptr, nullptr, nullptr); + if (!sws_ctx) { + av_frame_free(&dst_frame); + Error("Failed to get swscale context for websocket yuv420p payload"); + return false; + } + + const int scaled = sws_scale(sws_ctx, src_data, src_linesize, 0, height, dst_frame->data, dst_frame->linesize); + sws_freeContext(sws_ctx); + if (scaled <= 0) { + av_frame_free(&dst_frame); + Error("swscale failed for websocket yuv420p payload"); + return false; + } + + // Copy out as a tightly packed I420 buffer (alignment 1) so the layout is + // fully described by the reported width/height: the Y plane is height rows of + // line_size bytes, followed by U then V planes of (height + 1) / 2 rows of + // (line_size + 1) / 2 bytes each. 
+ const int packed_size = av_image_get_buffer_size(AV_PIX_FMT_YUV420P, width, height, 1); + if (packed_size <= 0) { + av_frame_free(&dst_frame); + return false; + } + output->assign(static_cast(packed_size), '\0'); + const int copied = av_image_copy_to_buffer( + reinterpret_cast(&(*output)[0]), packed_size, + dst_frame->data, dst_frame->linesize, + AV_PIX_FMT_YUV420P, width, height, 1); + av_frame_free(&dst_frame); + if (copied < 0) { + output->clear(); + return false; + } + + *line_size = width; + return true; +} + +bool requestedWebSocketVideoCodec(const std::string &codec, AVCodecID *codec_id, std::string *content_type = nullptr) { + if (codec == "h264") { + *codec_id = AV_CODEC_ID_H264; + if (content_type) { + *content_type = "video/h264"; + } + return true; + } + if (codec == "h265") { + *codec_id = AV_CODEC_ID_HEVC; + if (content_type) { + *content_type = "video/h265"; + } + return true; + } + if (codec == "av1") { + *codec_id = AV_CODEC_ID_AV1; + if (content_type) { + *content_type = "video/av1"; + } + return true; + } + return false; +} + +} // namespace + // SOLARIS - we don't have MAP_LOCKED on openSolaris/illumos #ifndef MAP_LOCKED #define MAP_LOCKED 0 @@ -327,6 +517,8 @@ Monitor::Monitor() : Janus_Manager(nullptr), Amcrest_Manager(nullptr), onvif(nullptr), + websocket_server(nullptr), + websocket_capture_bandwidth(0), red_val(0), green_val(0), blue_val(0), @@ -1231,6 +1423,7 @@ bool Monitor::connect() { last_analysis_fps_time = std::chrono::system_clock::now(); last_capture_image_count = 0; + RefreshWebSocketStatus(); Debug(3, "Success connecting"); return true; } // Monitor::connect @@ -1287,15 +1480,338 @@ bool Monitor::disconnect() { image_buffer[i] = nullptr; } + RefreshWebSocketStatus(); return true; } // end bool Monitor::disconnect() +bool Monitor::StartWebSocketServer() { + if (!config.min_websocket_port) { + return false; + } + + if (websocket_server) { + return true; + } + + const unsigned int websocket_port = 
zm::websocket::MonitorWebSocketPort(config.min_websocket_port, id); + if (!websocket_port) { + Warning("Unable to compute websocket port for monitor %u using base port %d", id, config.min_websocket_port); + return false; + } + + websocket_server = zm::make_unique(this); + if (!websocket_server->Start(static_cast(websocket_port))) { + websocket_server.reset(); + return false; + } + + return true; +} + +void Monitor::StopWebSocketServer() { + if (!websocket_server) { + return; + } + + websocket_server->Stop(); + websocket_server.reset(); +} + +void Monitor::RefreshWebSocketStatus() { + const bool connected = isConnected(); + const bool shm_valid = shared_data && shared_data->valid; + const double capture_fps = shared_data ? shared_data->capture_fps : 0.0; + const double analysis_fps = shared_data ? shared_data->analysis_fps : 0.0; + const uint32_t image_count = shared_data ? shared_data->image_count : 0; + const uint32_t last_frame_score = shared_data ? shared_data->last_frame_score : 0; + const uint64_t last_event_id = shared_data ? shared_data->last_event_id : 0; + const uint32_t analysing_state = shared_data ? shared_data->analysing : static_cast(ANALYSING_NONE); + const uint32_t signal_state = shared_data ? shared_data->signal : 0; + const uint32_t capture_state = shared_data ? shared_data->capturing : static_cast(capturing); + const uint32_t recording_state = shared_data ? shared_data->recording : static_cast(recording); + const uint32_t current_state = shared_data ? shared_data->state : static_cast(UNKNOWN); + const uint32_t capture_bandwidth = websocket_capture_bandwidth.load(); + const time_t heartbeat_time = shared_data ? shared_data->heartbeat_time : 0; + const time_t startup_time = shared_data ? 
shared_data->startup_time : 0; + + const char *state_name = "Unknown"; + if (current_state < (sizeof(State_Strings) / sizeof(State_Strings[0]))) { + state_name = State_Strings[current_state].c_str(); + } + + std::lock_guard status_lock(websocket_status_mutex); + websocket_status_json = stringtf( + "{\"type\":\"status\",\"monitor_id\":%u,\"monitor_name\":\"%s\"," + "\"connected\":%s,\"shm_valid\":%s,\"state_id\":%u,\"state\":\"%s\"," + "\"capture_fps\":%.2f,\"analysis_fps\":%.2f,\"capture_bandwidth\":%u," + "\"image_count\":%u,\"analysing\":%u,\"capturing\":%u,\"recording\":%u," + "\"signal\":%u,\"last_frame_score\":%u,\"last_event_id\":%" PRIu64 "," + "\"heartbeat_time\":%" PRIu64 ",\"startup_time\":%" PRIu64 "}", + id, + escape_json_string(name).c_str(), + connected ? "true" : "false", + shm_valid ? "true" : "false", + current_state, + state_name, + capture_fps, + analysis_fps, + capture_bandwidth, + image_count, + analysing_state, + capture_state, + recording_state, + signal_state, + last_frame_score, + last_event_id, + static_cast(heartbeat_time), + static_cast(startup_time)); +} + +std::string Monitor::GetWebSocketStatusJson() const { + std::lock_guard status_lock(websocket_status_mutex); + return websocket_status_json.empty() ? 
stringtf("{\"type\":\"status\",\"monitor_id\":%u}", id) : websocket_status_json; +} + +void Monitor::QueueWebSocketEvent(const std::string &event_type, const std::string &message) { + std::lock_guard message_lock(websocket_message_mutex); + if (websocket_messages.size() >= 64) { + websocket_messages.erase(websocket_messages.begin()); + } + websocket_messages.push_back(stringtf( + "{\"type\":\"event\",\"monitor_id\":%u,\"event\":\"%s\",\"message\":\"%s\"}", + id, + escape_json_string(event_type).c_str(), + escape_json_string(message).c_str())); +} + +std::vector Monitor::DrainWebSocketMessages() { + std::lock_guard message_lock(websocket_message_mutex); + std::vector drained; + drained.swap(websocket_messages); + return drained; +} + +bool Monitor::GetWebSocketImagePayload(const std::string &format, WebSocketPayload *payload) { + if (!payload) { + return false; + } + + if ((format != "jpeg") && (format != "rgba") && (format != "yuv420p")) { + return false; + } + + packetqueue_iterator *it = packetqueue.get_video_it(false); + if (!it) { + return false; + } + + packetqueue_iterator *scan_it = packetqueue.get_video_it(false); + if (!scan_it) { + packetqueue.free_it(it); + return false; + } + + bool have_image = false; + while (true) { + if (*scan_it == packetqueue.end()) { + break; + } + + ZMPacketLock packet_lock = packetqueue.get_packet_no_wait(scan_it); + if (!packet_lock.packet_) { + if (!packetqueue.increment_it(scan_it, video_stream_id)) { + break; + } + continue; + } + + if (packet_lock.packet_->packet && + packet_lock.packet_->packet->stream_index == video_stream_id && + packet_lock.packet_->image) { + *it = *scan_it; + have_image = true; + } + + if (!packetqueue.increment_it(scan_it, video_stream_id)) { + break; + } + } + + packetqueue.free_it(scan_it); + + if (!have_image) { + packetqueue.free_it(it); + return false; + } + + ZMPacketLock packet_lock = packetqueue.get_packet_no_wait(it); + packetqueue.free_it(it); + if (!packet_lock.packet_ || 
!packet_lock.packet_->image) { + return false; + } + + Image *image = packet_lock.packet_->image; + payload->type = "image"; + payload->format = format; + payload->width = image->Width(); + payload->height = image->Height(); + payload->image_count = shared_data ? shared_data->image_count : 0; + payload->sequence = packet_lock.packet_->queue_index; + payload->keyframe = false; + + if (format == "jpeg") { + payload->content_type = "image/jpeg"; + payload->line_size = 0; + payload->colours = image->Colours(); + payload->subpixel_order = image->SubpixelOrder(); + Image image_copy; + image_copy.Assign(*image); + return encodeJpegImage(image_copy, &payload->payload); + } + + if (format == "rgba") { + payload->content_type = "application/octet-stream"; + payload->colours = ZM_COLOUR_RGB32; + payload->subpixel_order = ZM_SUBPIX_ORDER_RGBA; + return encodeRgbaImage(*image, &payload->payload, &payload->line_size); + } + + if (format == "yuv420p") { + payload->content_type = "application/octet-stream"; + payload->colours = 0; + payload->subpixel_order = 0; + return encodeYuv420pImage(*image, &payload->payload, &payload->line_size); + } + + return false; +} + +packetqueue_iterator *Monitor::CreateWebSocketVideoIterator(const std::string &codec) { + if (!camera || !camera->getVideoStream()) { + return nullptr; + } + + AVCodecID requested_codec = AV_CODEC_ID_NONE; + if (!requestedWebSocketVideoCodec(codec, &requested_codec)) { + return nullptr; + } + + if (camera->getVideoStream()->codecpar->codec_id != requested_codec) { + return nullptr; + } + + packetqueue_iterator *it = packetqueue.get_video_it(false); + if (!it) { + return nullptr; + } + + packetqueue_iterator *scan_it = packetqueue.get_video_it(false); + if (!scan_it) { + packetqueue.free_it(it); + return nullptr; + } + + bool have_keyframe = false; + + while (true) { + if (*scan_it == packetqueue.end()) { + break; + } + + ZMPacketLock packet_lock = packetqueue.get_packet_no_wait(scan_it); + if (!packet_lock.packet_) { + 
if (!packetqueue.increment_it(scan_it, video_stream_id)) { + break; + } + continue; + } + + if (packet_lock.packet_->packet && + packet_lock.packet_->packet->stream_index == video_stream_id && + packet_lock.packet_->keyframe) { + *it = *scan_it; + have_keyframe = true; + } + + if (!packetqueue.increment_it(scan_it, video_stream_id)) { + break; + } + } + + packetqueue.free_it(scan_it); + + if (!have_keyframe) { + packetqueue.free_it(it); + return nullptr; + } + + return it; +} + +bool Monitor::GetNextWebSocketVideoPayload(packetqueue_iterator *it, const std::string &codec, WebSocketPayload *payload) { + if (!it || !payload || !camera || !camera->getVideoStream()) { + return false; + } + + AVCodecID requested_codec = AV_CODEC_ID_NONE; + std::string content_type; + if (!requestedWebSocketVideoCodec(codec, &requested_codec, &content_type)) { + return false; + } + + ZMPacketLock packet_lock = packetqueue.get_packet_no_wait(it); + if (!packet_lock.packet_ || !packet_lock.packet_->packet || !packet_lock.packet_->stream) { + return false; + } + + const AVCodecID codec_id = packet_lock.packet_->stream->codecpar->codec_id; + if (codec_id != requested_codec) { + packetqueue.increment_it(it, video_stream_id); + return false; + } + + payload->type = "stream"; + payload->format = codec; + payload->content_type = content_type; + payload->width = packet_lock.packet_->stream->codecpar->width ? packet_lock.packet_->stream->codecpar->width : Width(); + payload->height = packet_lock.packet_->stream->codecpar->height ? packet_lock.packet_->stream->codecpar->height : Height(); + payload->line_size = 0; + payload->colours = 0; + payload->subpixel_order = 0; + payload->image_count = shared_data ? 
shared_data->image_count : 0; + payload->sequence = packet_lock.packet_->queue_index; + payload->keyframe = packet_lock.packet_->keyframe; + payload->payload.clear(); + + if (packet_lock.packet_->keyframe) { + AVStream *stream = camera->getVideoStream(); + if (stream->codecpar->extradata && stream->codecpar->extradata_size > 0) { + payload->payload.append( + reinterpret_cast(stream->codecpar->extradata), + stream->codecpar->extradata_size); + } + } + + payload->payload.append( + reinterpret_cast(packet_lock.packet_->packet->data), + packet_lock.packet_->packet->size); + packetqueue.increment_it(it, video_stream_id); + return !payload->payload.empty(); +} + +void Monitor::FreeWebSocketIterator(packetqueue_iterator *it) { + if (it) { + packetqueue.free_it(it); + } +} + Monitor::~Monitor() { #if MOSQUITTOPP_FOUND if (mqtt) { mqtt->send("offline"); } #endif + StopWebSocketServer(); Close(); if (mem_ptr != nullptr) { @@ -1384,6 +1900,9 @@ int Monitor::GetImage(int32_t index, int scale) { } std::shared_ptr Monitor::getSnapshot(int index) const { + if (!shared_data || !shared_timestamps) { + return nullptr; + } if ((index < 0) || (index >= image_buffer_count)) { index = shared_data->last_write_index; } @@ -1884,9 +2403,11 @@ void Monitor::UpdateFPS() { shared_data->capture_fps = new_capture_fps; last_capture_image_count = shared_data->image_count; shared_data->analysis_fps = new_analysis_fps; + websocket_capture_bandwidth = new_capture_bandwidth; last_motion_frame_count = motion_frame_count; last_camera_bytes = new_camera_bytes; last_fps_time = now; + RefreshWebSocketStatus(); FPSeconds db_elapsed = now - last_status_time; if (db_elapsed > Seconds(10)) { @@ -3925,4 +4446,3 @@ StringVector Monitor::GroupNames() { } return groupnames; } // end Monitor::GroupNames() - diff --git a/src/zm_monitor.h b/src/zm_monitor.h index 4778d25e938..b37345aa328 100644 --- a/src/zm_monitor.h +++ b/src/zm_monitor.h @@ -46,6 +46,9 @@ class Group; class MonitorLinkExpression; +namespace zm { 
+class MonitorWebSocketServer; +} #define SIGNAL_CAUSE "Signal" #define MOTION_CAUSE "Motion" @@ -62,6 +65,21 @@ class Monitor : public std::enable_shared_from_this { friend class ONVIF; public: + struct WebSocketPayload { + std::string type; + std::string format; + std::string content_type; + std::string payload; + uint32_t width = 0; + uint32_t height = 0; + uint32_t line_size = 0; + uint32_t colours = 0; + uint32_t subpixel_order = 0; + uint32_t image_count = 0; + uint64_t sequence = 0; + bool keyframe = false; + }; + typedef enum { QUERY=0, CAPTURE, @@ -713,6 +731,12 @@ class Monitor : public std::enable_shared_from_this { JanusManager *Janus_Manager; AmcrestAPI *Amcrest_Manager; ONVIF *onvif; + std::unique_ptr websocket_server; + mutable std::mutex websocket_status_mutex; + std::string websocket_status_json; + std::mutex websocket_message_mutex; + std::vector websocket_messages; + std::atomic websocket_capture_bandwidth; // Used in check signal uint8_t red_val; @@ -934,6 +958,16 @@ class Monitor : public std::enable_shared_from_this { Image *GetAlarmImage(); int GetImage(int32_t index=-1, int scale=100); std::shared_ptr getSnapshot( int index=-1 ) const; + bool StartWebSocketServer(); + void StopWebSocketServer(); + void RefreshWebSocketStatus(); + std::string GetWebSocketStatusJson() const; + void QueueWebSocketEvent(const std::string &event_type, const std::string &message); + std::vector DrainWebSocketMessages(); + bool GetWebSocketImagePayload(const std::string &format, WebSocketPayload *payload); + packetqueue_iterator *CreateWebSocketVideoIterator(const std::string &codec); + bool GetNextWebSocketVideoPayload(packetqueue_iterator *it, const std::string &codec, WebSocketPayload *payload); + void FreeWebSocketIterator(packetqueue_iterator *it); SystemTimePoint GetTimestamp(int index = -1) const; void UpdateAdaptiveSkip(); useconds_t GetAnalysisRate(); diff --git a/src/zm_utils.cpp b/src/zm_utils.cpp index ea2f4dce469..436c7c41087 100644 --- 
a/src/zm_utils.cpp +++ b/src/zm_utils.cpp @@ -109,6 +109,120 @@ std::pair PairSplit(const std::string &str, char delim return std::make_pair(str.substr(0, pos), str.substr(pos + 1, std::string::npos)); } +bool JsonExtractQuotedField(const std::string &json, const std::string &field, std::string *value) { + std::string needle = "\"" + field + "\""; + size_t key_pos = json.find(needle); + if (key_pos == std::string::npos) { + return false; + } + + size_t colon_pos = json.find(':', key_pos + needle.size()); + if (colon_pos == std::string::npos) { + return false; + } + + size_t quote_start = json.find('"', colon_pos + 1); + if (quote_start == std::string::npos) { + return false; + } + + std::string parsed_value; + bool escaping = false; + for (size_t i = quote_start + 1; i < json.size(); ++i) { + char c = json[i]; + if (escaping) { + parsed_value.push_back(c); + escaping = false; + continue; + } + if (c == '\\') { + escaping = true; + continue; + } + if (c == '"') { + *value = parsed_value; + return true; + } + parsed_value.push_back(c); + } + + return false; +} + +bool JsonExtractIntegerField(const std::string &json, const std::string &field, int *value) { + std::string needle = "\"" + field + "\""; + size_t key_pos = json.find(needle); + if (key_pos == std::string::npos) { + return false; + } + + size_t colon_pos = json.find(':', key_pos + needle.size()); + if (colon_pos == std::string::npos) { + return false; + } + + size_t value_pos = json.find_first_of("-0123456789", colon_pos + 1); + if (value_pos == std::string::npos) { + return false; + } + + size_t end_pos = value_pos; + while (end_pos < json.size() && ((json[end_pos] >= '0' && json[end_pos] <= '9') || json[end_pos] == '-')) { + ++end_pos; + } + + try { + *value = std::stoi(json.substr(value_pos, end_pos - value_pos)); + } catch (...) 
{ + return false; + } + + return true; +} + +bool ExtractHeaderValue(const std::string &request, const std::string &header_name, std::string *value) { + size_t line_start = 0; + while (line_start < request.size()) { + size_t line_end = request.find('\n', line_start); + if (line_end == std::string::npos) { + line_end = request.size(); + } + + std::string line = request.substr(line_start, line_end - line_start); + if (!line.empty() && line.back() == '\r') { + line.pop_back(); + } + + size_t colon_pos = line.find(':'); + if (colon_pos != std::string::npos) { + const std::string name = StringToLower(Trim(line.substr(0, colon_pos), " \t")); + if (name == header_name) { + *value = Trim(line.substr(colon_pos + 1), " \t"); + return !value->empty(); + } + } + + line_start = line_end + 1; + } + + return false; +} + +bool HeaderContainsToken(const std::string &header_value, const std::string &token) { + size_t start = 0; + while (start < header_value.size()) { + size_t end = header_value.find(',', start); + if (end == std::string::npos) { + end = header_value.size(); + } + if (StringToLower(Trim(header_value.substr(start, end - start), " \t")) == token) { + return true; + } + start = end + 1; + } + return false; +} + std::string Join(const StringVector &values, const std::string &delim) { std::stringstream ss; @@ -158,7 +272,7 @@ std::string ByteArrayToHexString(nonstd::span bytes) { return buf; } -std::string Base64Encode(const std::string &str) { +std::string Base64Encode(nonstd::span bytes) { static char base64_table[64] = {'\0'}; if (!base64_table[0]) { @@ -174,23 +288,25 @@ std::string Base64Encode(const std::string &str) { } std::string outString; - outString.reserve(2 * str.size()); + outString.reserve(2 * bytes.size()); - const char *inPtr = str.c_str(); - while (*inPtr) { - unsigned char selection = *inPtr >> 2; - unsigned char remainder = (*inPtr++ & 0x03) << 4; + for (size_t i = 0; i < bytes.size(); ) { + const unsigned char octet_a = bytes[i++]; + unsigned char 
selection = octet_a >> 2; + unsigned char remainder = (octet_a & 0x03) << 4; outString += base64_table[selection]; - if (*inPtr) { - selection = remainder | (*inPtr >> 4); - remainder = (*inPtr++ & 0x0f) << 2; + if (i < bytes.size()) { + const unsigned char octet_b = bytes[i++]; + selection = remainder | (octet_b >> 4); + remainder = (octet_b & 0x0f) << 2; outString += base64_table[selection]; - if (*inPtr) { - selection = remainder | (*inPtr >> 6); + if (i < bytes.size()) { + const unsigned char octet_c = bytes[i++]; + selection = remainder | (octet_c >> 6); outString += base64_table[selection]; - selection = (*inPtr++ & 0x3f); + selection = (octet_c & 0x3f); outString += base64_table[selection]; } else { outString += base64_table[remainder]; @@ -205,6 +321,12 @@ std::string Base64Encode(const std::string &str) { return outString; } +std::string Base64Encode(const std::string &str) { + return Base64Encode(nonstd::span( + reinterpret_cast(str.data()), + str.size())); +} + std::string TimevalToString(timeval tv) { tm now = {}; std::array tm_buf = {}; diff --git a/src/zm_utils.h b/src/zm_utils.h index c5fd1411863..87773b669b9 100644 --- a/src/zm_utils.h +++ b/src/zm_utils.h @@ -71,6 +71,10 @@ inline std::string StringToLower(std::string str) { StringVector Split(const std::string &str, char delim); StringVector Split(const std::string &str, const std::string &delim, size_t limit = 0); std::pair PairSplit(const std::string &str, char delim); +bool JsonExtractQuotedField(const std::string &json, const std::string &field, std::string *value); +bool JsonExtractIntegerField(const std::string &json, const std::string &field, int *value); +bool ExtractHeaderValue(const std::string &request, const std::string &header_name, std::string *value); +bool HeaderContainsToken(const std::string &header_value, const std::string &token); std::string Join(const StringVector &values, const std::string &delim = ","); @@ -83,6 +87,7 @@ std::string stringtf(const char* format, ...); 
std::string ByteArrayToHexString(nonstd::span bytes); +std::string Base64Encode(nonstd::span bytes); std::string Base64Encode(const std::string &str); std::string TimevalToString(timeval tv); diff --git a/src/zm_websocket.cpp b/src/zm_websocket.cpp new file mode 100644 index 00000000000..bef54a5dc15 --- /dev/null +++ b/src/zm_websocket.cpp @@ -0,0 +1,992 @@ +#include "zm_websocket.h" + +#include "zm_crypt.h" +#include "zm_monitor.h" +#include "zm_user.h" +#include "zm_utils.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +static constexpr const char *kWebSocketMagic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; +static constexpr size_t kMaxHandshakeSize = 16384; +static constexpr size_t kMaxMessageSize = 1024 * 1024; +static constexpr size_t kMaxQueuedBytesPerClient = 8 * 1024 * 1024; +static constexpr size_t kMaxClientsPerMonitor = 32; +static constexpr int kPollTimeoutMs = 100; + +bool setNonBlocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + Error("fcntl(F_GETFL) failed for websocket socket %d: %s", fd, strerror(errno)); + return false; + } + + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + Error("fcntl(F_SETFL) failed for websocket socket %d: %s", fd, strerror(errno)); + return false; + } + + return true; +} +bool writeFully(int fd, const std::string &payload) { + size_t offset = 0; + while (offset < payload.size()) { + const ssize_t bytes_sent = ::send(fd, payload.data() + offset, payload.size() - offset, MSG_NOSIGNAL); + if (bytes_sent < 0) { + if (errno == EINTR) { + continue; + } + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + // Socket is non-blocking; wait briefly for it to drain. This path is + // only used for short error/close responses on a connection that is + // about to be closed, so a bounded wait is acceptable. 
+ struct pollfd pfd = {fd, POLLOUT, 0}; + if (poll(&pfd, 1, 200) <= 0) { + return false; + } + continue; + } + return false; + } + if (bytes_sent == 0) { + return false; + } + offset += static_cast(bytes_sent); + } + return true; +} + +std::string statusAckJson(const std::string &topic, int interval_ms) { + return stringtf( + "{\"type\":\"ack\",\"topic\":\"%s\",\"interval_ms\":%d}", + escape_json_string(topic).c_str(), + interval_ms); +} + +std::string metadataJson(unsigned int monitor_id, const Monitor::WebSocketPayload &payload, const std::string &request_id) { + return stringtf( + "{\"type\":\"%s\",\"request_id\":\"%s\",\"format\":\"%s\",\"content_type\":\"%s\"," + "\"monitor_id\":%u,\"width\":%u,\"height\":%u,\"line_size\":%u,\"colours\":%u,\"subpixel_order\":%u," + "\"image_count\":%u,\"sequence\":%" PRIu64 ",\"keyframe\":%s,\"payload_bytes\":%zu}", + escape_json_string(payload.type).c_str(), + escape_json_string(request_id).c_str(), + escape_json_string(payload.format).c_str(), + escape_json_string(payload.content_type).c_str(), + monitor_id, + payload.width, + payload.height, + payload.line_size, + payload.colours, + payload.subpixel_order, + payload.image_count, + payload.sequence, + payload.keyframe ? 
"true" : "false", + payload.payload.size()); +} + +std::string errorJson(const std::string &message) { + return stringtf( + "{\"type\":\"error\",\"message\":\"%s\"}", + escape_json_string(message).c_str()); +} + +std::string httpResponse(const char *status_line, const char *body = nullptr) { + if (!body || !*body) { + return std::string(status_line) + "\r\n\r\n"; + } + + return stringtf( + "%s\r\nContent-Type: text/plain\r\nContent-Length: %zu\r\n\r\n%s", + status_line, + strlen(body), + body); +} + +bool validateStreamAccess(User *user, unsigned int monitor_id) { + if (user->getStream() < User::PERM_VIEW) { + Warning( + "Insufficient websocket privileges for user %d %s on monitor %u: stream permission required", + user->Id(), + user->getUsername(), + monitor_id); + return false; + } + + if (!user->canAccess(monitor_id)) { + Warning( + "Insufficient websocket privileges for user %d %s on monitor %u: monitor access denied", + user->Id(), + user->getUsername(), + monitor_id); + return false; + } + + return true; +} + +User *authenticateWebSocketRequest(const std::string &request, unsigned int monitor_id, int *http_status) { + *http_status = 401; + if (!config.opt_use_auth) { + return nullptr; + } + + std::string target; + if (!zm::websocket::ExtractHandshakeRequestTarget(request, &target)) { + *http_status = 400; + return nullptr; + } + + User *user = nullptr; + std::string token; + if (zm::websocket::ExtractAuthorizationBearerToken(request, &token)) { + user = zmLoadTokenUser(token, false); + } else { + const size_t query_pos = target.find('?'); + std::string query_string; + if (query_pos != std::string::npos) { + query_string = target.substr(query_pos + 1); + } + + std::istringstream request_stream(query_string); + QueryString query(request_stream); + + if (query.has("jwt_token")) { + user = zmLoadTokenUser(query.get("jwt_token")->firstValue(), false); + } else if (query.has("token")) { + user = zmLoadTokenUser(query.get("token")->firstValue(), false); + } else if 
(strcmp(config.auth_relay, "none") == 0) { + if (query.has("username")) { + const std::string username = query.get("username")->firstValue(); + if (checkUser(username)) { + user = zmLoadUser(username); + } + } + } else { + if (query.has("auth")) { + const std::string auth_hash = query.get("auth")->firstValue(); + const std::string username = query.has("username") ? query.get("username")->firstValue() : ""; + if (!auth_hash.empty()) { + user = zmLoadAuthUser(auth_hash, username, config.auth_hash_ips); + } + } + + if ((!user) && query.has("username") && query.has("password")) { + user = zmLoadUser(query.get("username")->firstValue(), query.get("password")->firstValue()); + } + } + } + + if (!user) { + return nullptr; + } + + if (!validateStreamAccess(user, monitor_id)) { + delete user; + *http_status = 403; + return nullptr; + } + + *http_status = 101; + return user; +} + +} // namespace + +namespace zm { +namespace websocket { + +std::string ComputeAcceptKey(const std::string &client_key) { + const std::string input = client_key + kWebSocketMagic; + const zm::crypto::SHA1::Digest digest = zm::crypto::SHA1::GetDigestOf(input); + return Base64Encode(nonstd::span(digest.data(), digest.size())); +} + +bool ExtractHandshakeKey(const std::string &request, std::string *client_key) { + std::string upgrade_value; + std::string connection_value; + std::string version_value; + if (!ExtractHeaderValue(request, "sec-websocket-key", client_key)) { + return false; + } + if (!ExtractHeaderValue(request, "upgrade", &upgrade_value) || + (StringToLower(upgrade_value) != "websocket")) { + return false; + } + if (!ExtractHeaderValue(request, "connection", &connection_value) || + !HeaderContainsToken(connection_value, "upgrade")) { + return false; + } + if (!ExtractHeaderValue(request, "sec-websocket-version", &version_value) || + (version_value != "13")) { + return false; + } + return !client_key->empty(); +} + +bool ExtractHandshakeRequestTarget(const std::string &request, std::string 
*target) { + const size_t line_end = request.find("\r\n"); + const std::string request_line = request.substr(0, line_end); + std::istringstream line_stream(request_line); + std::string method; + std::string version; + if (!(line_stream >> method >> *target >> version)) { + return false; + } + + return (method == "GET") && StartsWith(version, "HTTP/"); +} + +bool ExtractAuthorizationBearerToken(const std::string &request, std::string *token) { + std::string authorization; + if (!ExtractHeaderValue(request, "authorization", &authorization)) { + return false; + } + + const std::string prefix = "bearer "; + std::string lower = StringToLower(authorization); + if (!StartsWith(lower, prefix)) { + return false; + } + + *token = Trim(authorization.substr(prefix.length()), " \t"); + return !token->empty(); +} + +std::string BuildHandshakeResponse(const std::string &client_key) { + return + "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: " + ComputeAcceptKey(client_key) + "\r\n" + "\r\n"; +} + +std::string EncodeFrame(Opcode opcode, const std::string &payload, bool fin) { + std::string frame; + frame.reserve(payload.size() + 16); + frame.push_back(static_cast((fin ? 
0x80 : 0x00) | static_cast(opcode))); + + const uint64_t payload_size = payload.size(); + if (payload_size < 126) { + frame.push_back(static_cast(payload_size)); + } else if (payload_size <= 0xffff) { + frame.push_back(126); + frame.push_back(static_cast((payload_size >> 8) & 0xff)); + frame.push_back(static_cast(payload_size & 0xff)); + } else { + frame.push_back(127); + for (int shift = 56; shift >= 0; shift -= 8) { + frame.push_back(static_cast((payload_size >> shift) & 0xff)); + } + } + + frame.append(payload); + return frame; +} + +DecodeResult DecodeFrame(const std::string &buffer, Frame *frame, size_t *consumed) { + if (buffer.size() < 2) { + return DecodeResult::INCOMPLETE; + } + + const uint8_t first = static_cast(buffer[0]); + const uint8_t second = static_cast(buffer[1]); + const uint8_t opcode = first & 0x0f; + const bool masked = (second & 0x80) != 0; + uint64_t payload_len = (second & 0x7f); + size_t pos = 2; + + if (payload_len == 126) { + if (buffer.size() < pos + 2) { + return DecodeResult::INCOMPLETE; + } + payload_len = (static_cast(buffer[pos]) << 8) | static_cast(buffer[pos + 1]); + pos += 2; + } else if (payload_len == 127) { + if (buffer.size() < pos + 8) { + return DecodeResult::INCOMPLETE; + } + payload_len = 0; + for (size_t i = 0; i < 8; ++i) { + payload_len = (payload_len << 8) | static_cast(buffer[pos + i]); + } + pos += 8; + } + + if (payload_len > kMaxMessageSize) { + Error("Websocket payload too large: %" PRIu64, payload_len); + return DecodeResult::ERROR; + } + + if ((opcode & 0x08) != 0) { + if ((first & 0x80) == 0) { + Warning("Rejecting fragmented websocket control frame"); + return DecodeResult::ERROR; + } + if (payload_len > 125) { + Warning("Rejecting oversized websocket control frame payload: %" PRIu64, payload_len); + return DecodeResult::ERROR; + } + } + + if (!masked) { + Warning("Rejecting unmasked websocket client frame"); + return DecodeResult::ERROR; + } + + std::array mask = {0, 0, 0, 0}; + if (buffer.size() < pos + 
mask.size()) { + return DecodeResult::INCOMPLETE; + } + for (size_t i = 0; i < mask.size(); ++i) { + mask[i] = static_cast(buffer[pos + i]); + } + pos += mask.size(); + + if (buffer.size() < pos + payload_len) { + return DecodeResult::INCOMPLETE; + } + + frame->fin = (first & 0x80) != 0; + frame->masked = masked; + frame->opcode = static_cast(opcode); + frame->payload.assign(buffer.data() + pos, buffer.data() + pos + payload_len); + + if (masked) { + for (size_t i = 0; i < frame->payload.size(); ++i) { + frame->payload[i] ^= mask[i % mask.size()]; + } + } + + *consumed = pos + payload_len; + return DecodeResult::OK; +} + +unsigned int MonitorWebSocketPort(int base_port, unsigned int monitor_id) { + if (base_port <= 0) { + return 0; + } + if (monitor_id > static_cast(INT_MAX - base_port)) { + return 0; + } + return static_cast(base_port) + monitor_id; +} + +} // namespace websocket + +MonitorWebSocketServer::MonitorWebSocketServer(Monitor *p_monitor) : + monitor(p_monitor), + port(0), + running(false) { +} + +MonitorWebSocketServer::~MonitorWebSocketServer() { + Stop(); +} + +bool MonitorWebSocketServer::Start(int p_port) { + if (running) { + return true; + } + + port = p_port; + if (!server.bind(port) || !server.listen() || !server.setBlocking(false)) { + Error("Unable to start websocket server for monitor %u on port %d", monitor->Id(), port); + server.close(); + return false; + } + + running = true; + server_thread = std::thread(&MonitorWebSocketServer::run, this); + Info("Started websocket server for monitor %u on port %d", monitor->Id(), port); + return true; +} + +void MonitorWebSocketServer::Stop() { + if (!running) { + return; + } + + running = false; + server.close(); + + if (server_thread.joinable()) { + server_thread.join(); + } +} + +void MonitorWebSocketServer::run() { + std::vector clients; + clients.reserve(8); + + while (running) { + std::vector pollfds; + pollfds.reserve(clients.size() + 1); + pollfds.push_back({server.getReadDesc(), POLLIN, 0}); + + 
for (const Client &client : clients) { + short events = POLLIN; + if (!client.send_queue.empty()) { + events |= POLLOUT; + } + pollfds.push_back({client.fd, events, 0}); + } + + int poll_result = poll(pollfds.data(), pollfds.size(), kPollTimeoutMs); + if (poll_result < 0) { + if (errno == EINTR) { + continue; + } + if (!running) { + break; + } + Error("poll() failed in websocket server for monitor %u: %s", monitor->Id(), strerror(errno)); + break; + } + + if (running && !pollfds.empty() && (pollfds[0].revents & POLLIN)) { + acceptClients(&clients); + } + + const size_t polled_client_count = pollfds.size() - 1; + for (size_t i = 0; i < polled_client_count; ++i) { + const short revents = pollfds[i + 1].revents; + if (revents & (POLLERR | POLLHUP | POLLNVAL)) { + closeClient(&clients[i]); + continue; + } + if ((revents & POLLIN) && !handleRead(&clients[i])) { + closeClient(&clients[i]); + continue; + } + if ((revents & POLLOUT) && !flushWrites(&clients[i])) { + closeClient(&clients[i]); + } + } + + const TimePoint now = std::chrono::steady_clock::now(); + broadcastStatus(&clients, now); + broadcastStreams(&clients, now); + broadcastEvents(&clients); + + for (Client &client : clients) { + if (client.fd >= 0 && !client.send_queue.empty()) { + flushWrites(&client); + } + } + + removeClosedClients(&clients); + } + + for (Client &client : clients) { + if (client.fd >= 0) { + closeClient(&client); + } + } +} + +bool MonitorWebSocketServer::acceptClients(std::vector *clients) { + while (running) { + sockaddr_storage addr = {}; + socklen_t addr_len = sizeof(addr); + const int fd = ::accept(server.getReadDesc(), reinterpret_cast(&addr), &addr_len); + if (fd < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + return true; + } + Error("accept() failed in websocket server for monitor %u: %s", monitor->Id(), strerror(errno)); + return false; + } + + if (clients->size() >= kMaxClientsPerMonitor) { + Warning( + "Rejecting websocket connection for monitor %u: client limit 
%zu reached", + monitor->Id(), + kMaxClientsPerMonitor); + writeFully(fd, httpResponse("HTTP/1.1 503 Service Unavailable", "Too many websocket clients")); + ::close(fd); + continue; + } + + if (!setNonBlocking(fd)) { + ::close(fd); + continue; + } + + clients->emplace_back(fd); + } + + return true; +} + +bool MonitorWebSocketServer::handleRead(Client *client) { + char buffer[4096]; + while (true) { + const ssize_t bytes_read = ::recv(client->fd, buffer, sizeof(buffer), 0); + if (bytes_read == 0) { + return false; + } + if (bytes_read < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + break; + } + Error("recv() failed in websocket server for monitor %u: %s", monitor->Id(), strerror(errno)); + return false; + } + + client->recv_buffer.append(buffer, bytes_read); + if (client->recv_buffer.size() > kMaxMessageSize) { + Warning("Closing websocket client for monitor %u due to oversized message", monitor->Id()); + return false; + } + } + + if (!client->handshake_complete) { + return handleHandshake(client); + } + + while (!client->recv_buffer.empty()) { + websocket::Frame frame; + size_t consumed = 0; + const websocket::DecodeResult result = websocket::DecodeFrame(client->recv_buffer, &frame, &consumed); + if (result == websocket::DecodeResult::INCOMPLETE) { + break; + } + if (result == websocket::DecodeResult::ERROR) { + return false; + } + + client->recv_buffer.erase(0, consumed); + if (!handleFrame(client, frame)) { + return false; + } + } + + return true; +} + +bool MonitorWebSocketServer::handleHandshake(Client *client) { + if (client->recv_buffer.size() > kMaxHandshakeSize) { + return false; + } + + const size_t request_end = client->recv_buffer.find("\r\n\r\n"); + if (request_end == std::string::npos) { + return true; + } + + const std::string request = client->recv_buffer.substr(0, request_end + 4); + client->recv_buffer.erase(0, request_end + 4); + + std::string client_key; + if (!websocket::ExtractHandshakeKey(request, &client_key)) { + 
writeFully(client->fd, httpResponse("HTTP/1.1 400 Bad Request", "Bad websocket handshake")); + return false; + } + + if (config.opt_use_auth) { + int http_status = 401; + User *user = authenticateWebSocketRequest(request, monitor->Id(), &http_status); + if (!user) { + if (http_status == 403) { + writeFully(client->fd, httpResponse("HTTP/1.1 403 Forbidden", "Forbidden")); + } else if (http_status == 400) { + writeFully(client->fd, httpResponse("HTTP/1.1 400 Bad Request", "Malformed HTTP request line")); + } else { + writeFully(client->fd, httpResponse("HTTP/1.1 401 Unauthorized", "Authentication required")); + } + return false; + } + delete user; + } + + queueRaw(client, websocket::BuildHandshakeResponse(client_key)); + client->handshake_complete = true; + client->next_status_at = std::chrono::steady_clock::now(); + client->next_stream_at = std::chrono::steady_clock::now(); + return true; +} + +bool MonitorWebSocketServer::handleFrame(Client *client, const websocket::Frame &frame) { + if (!frame.fin || (frame.opcode == websocket::Opcode::CONTINUATION)) { + Warning("Rejecting fragmented websocket frame for monitor %u", monitor->Id()); + return false; + } + + switch (frame.opcode) { + case websocket::Opcode::TEXT: { + std::string command; + std::string request_id; + JsonExtractQuotedField(frame.payload, "request_id", &request_id); + if (!JsonExtractQuotedField(frame.payload, "command", &command)) { + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Missing command"))) { + return false; + } + return true; + } + + if (command == "status") { + if (!queueFrame(client, websocket::Opcode::TEXT, monitor->GetWebSocketStatusJson())) { + return false; + } + return true; + } + + if (command == "image") { + std::string format; + if (!JsonExtractQuotedField(frame.payload, "format", &format)) { + format = "jpeg"; + } + monitor->setLastViewed(); + Monitor::WebSocketPayload payload; + if (!monitor->GetWebSocketImagePayload(format, &payload)) { + if (!queueFrame(client, 
websocket::Opcode::TEXT, errorJson("Unable to fetch image payload"))) { + return false; + } + return true; + } + if (!sendImagePayload(client, payload, request_id)) { + return false; + } + return true; + } + + if (command == "stream") { + std::string codec; + if (!JsonExtractQuotedField(frame.payload, "codec", &codec)) { + codec = "mjpeg"; + } + monitor->setLastViewed(); + + Monitor::WebSocketPayload payload; + if (codec == "mjpeg") { + if (!monitor->GetWebSocketImagePayload("jpeg", &payload)) { + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Unable to fetch mjpeg stream payload"))) { + return false; + } + return true; + } + payload.type = "stream"; + payload.format = "mjpeg"; + } else { + packetqueue_iterator *it = monitor->CreateWebSocketVideoIterator(codec); + if (!it) { + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Unsupported stream codec or payload unavailable"))) { + return false; + } + return true; + } + const bool ok = monitor->GetNextWebSocketVideoPayload(it, codec, &payload); + monitor->FreeWebSocketIterator(it); + if (!ok) { + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Unsupported stream codec or payload unavailable"))) { + return false; + } + return true; + } + } + + if (!sendImagePayload(client, payload, request_id)) { + return false; + } + return true; + } + + std::string topic; + if (!JsonExtractQuotedField(frame.payload, "topic", &topic)) { + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Missing topic"))) { + return false; + } + return true; + } + + if (command == "subscribe") { + if (topic == "status") { + int interval_ms = 1000; + if (JsonExtractIntegerField(frame.payload, "interval_ms", &interval_ms)) { + interval_ms = std::max(100, std::min(interval_ms, 60000)); + client->status_interval = Milliseconds(interval_ms); + } + client->subscribe_status = true; + client->next_status_at = std::chrono::steady_clock::now(); + if (!queueFrame(client, websocket::Opcode::TEXT, statusAckJson(topic, 
client->status_interval.count())) || + !queueFrame(client, websocket::Opcode::TEXT, monitor->GetWebSocketStatusJson())) { + return false; + } + } else if (topic == "stream") { + int interval_ms = 1000; + if (JsonExtractIntegerField(frame.payload, "interval_ms", &interval_ms)) { + interval_ms = std::max(100, std::min(interval_ms, 60000)); + } + if (!JsonExtractQuotedField(frame.payload, "codec", &client->stream_codec)) { + client->stream_codec = "mjpeg"; + } + freeClientResources(client); + client->subscribe_stream = true; + monitor->setLastViewed(); + client->next_stream_at = std::chrono::steady_clock::now(); + if (client->stream_codec == "mjpeg") { + client->stream_interval = Milliseconds(interval_ms); + client->last_stream_sequence = 0; + if (!queueFrame(client, websocket::Opcode::TEXT, statusAckJson(topic, client->stream_interval.count()))) { + return false; + } + + Monitor::WebSocketPayload payload; + if (monitor->GetWebSocketImagePayload("jpeg", &payload)) { + payload.type = "stream"; + payload.format = "mjpeg"; + client->last_stream_sequence = payload.sequence; + if (!sendImagePayload(client, payload, request_id)) { + return false; + } + } + client->next_stream_at = std::chrono::steady_clock::now() + client->stream_interval; + } else { + client->stream_interval = Milliseconds(0); + client->stream_it = monitor->CreateWebSocketVideoIterator(client->stream_codec); + if (!client->stream_it) { + client->subscribe_stream = false; + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Unsupported stream codec or payload unavailable"))) { + return false; + } + return true; + } + client->last_stream_sequence = 0; + if (!queueFrame(client, websocket::Opcode::TEXT, statusAckJson(topic, 0))) { + return false; + } + } + } else if (topic == "events") { + client->subscribe_events = true; + if (!queueFrame(client, websocket::Opcode::TEXT, statusAckJson(topic, 0))) { + return false; + } + } else { + if (!queueFrame(client, websocket::Opcode::TEXT, 
errorJson("Unsupported topic"))) { + return false; + } + } + return true; + } + + if (command == "unsubscribe") { + if (topic == "status") { + client->subscribe_status = false; + } else if (topic == "stream") { + client->subscribe_stream = false; + client->last_stream_sequence = 0; + freeClientResources(client); + } else if (topic == "events") { + client->subscribe_events = false; + } else { + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Unsupported topic"))) { + return false; + } + return true; + } + if (!queueFrame(client, websocket::Opcode::TEXT, statusAckJson(topic, 0))) { + return false; + } + return true; + } + + if (!queueFrame(client, websocket::Opcode::TEXT, errorJson("Unsupported command"))) { + return false; + } + return true; + } + case websocket::Opcode::PING: + return queueFrame(client, websocket::Opcode::PONG, frame.payload); + case websocket::Opcode::CLOSE: + writeFully(client->fd, websocket::EncodeFrame(websocket::Opcode::CLOSE, frame.payload)); + return false; + default: + return true; + } +} + +bool MonitorWebSocketServer::flushWrites(Client *client) { + while (!client->send_queue.empty()) { + PendingBuffer &pending = client->send_queue.front(); + const ssize_t bytes_sent = + ::send(client->fd, pending.data.data() + pending.offset, pending.data.size() - pending.offset, MSG_NOSIGNAL); + if (bytes_sent < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + return true; + } + Error("send() failed in websocket server for monitor %u: %s", monitor->Id(), strerror(errno)); + return false; + } + + pending.offset += bytes_sent; + if (pending.offset >= pending.data.size()) { + client->queued_bytes -= pending.data.size(); + client->send_queue.pop_front(); + } + } + + return true; +} + +void MonitorWebSocketServer::closeClient(Client *client) { + freeClientResources(client); + client->send_queue.clear(); + client->queued_bytes = 0; + if (client->fd >= 0) { + ::close(client->fd); + } + client->fd = -1; +} + +bool 
MonitorWebSocketServer::sendImagePayload( + Client *client, + const Monitor::WebSocketPayload &payload, + const std::string &request_id) { + return + queueFrame(client, websocket::Opcode::TEXT, metadataJson(monitor->Id(), payload, request_id)) && + queueFrame(client, websocket::Opcode::BINARY, payload.payload); +} + +void MonitorWebSocketServer::freeClientResources(Client *client) { + if (client->stream_it) { + monitor->FreeWebSocketIterator(client->stream_it); + client->stream_it = nullptr; + } +} + +bool MonitorWebSocketServer::queueRaw(Client *client, std::string payload) { + if ((client->queued_bytes + payload.size()) > kMaxQueuedBytesPerClient) { + Warning( + "Closing websocket client for monitor %u after queue exceeded %zu bytes", + monitor->Id(), + kMaxQueuedBytesPerClient); + return false; + } + client->queued_bytes += payload.size(); + client->send_queue.push_back({std::move(payload), 0}); + return true; +} + +bool MonitorWebSocketServer::queueFrame(Client *client, websocket::Opcode opcode, const std::string &payload) { + return queueRaw(client, websocket::EncodeFrame(opcode, payload)); // moved: temporary binds to by-value param +} + +void MonitorWebSocketServer::broadcastStatus(std::vector *clients, TimePoint now) { + const std::string status = monitor->GetWebSocketStatusJson(); + for (Client &client : *clients) { + if ((client.fd < 0) || !client.handshake_complete || !client.subscribe_status) { + continue; + } + if ((client.next_status_at.time_since_epoch().count() == 0) || (now >= client.next_status_at)) { + if (!queueFrame(&client, websocket::Opcode::TEXT, status)) { + closeClient(&client); + continue; + } + client.next_status_at = now + client.status_interval; + } + } +} + +void MonitorWebSocketServer::broadcastStreams(std::vector *clients, TimePoint now) { + for (Client &client : *clients) { + if ((client.fd < 0) || !client.handshake_complete || !client.subscribe_stream) { + continue; + } + monitor->setLastViewed(); + if (client.stream_codec == 
"mjpeg") { + if ((client.next_stream_at.time_since_epoch().count() != 0) && (now < client.next_stream_at)) { + continue; + } + + Monitor::WebSocketPayload payload; + if (monitor->GetWebSocketImagePayload("jpeg", &payload) && (payload.sequence != client.last_stream_sequence)) { + payload.type = "stream"; + payload.format = "mjpeg"; + if (!sendImagePayload(&client, payload, "")) { + closeClient(&client); + continue; + } + client.last_stream_sequence = payload.sequence; + } + client.next_stream_at = now + client.stream_interval; + continue; + } + + if (!client.stream_it) { + client.stream_it = monitor->CreateWebSocketVideoIterator(client.stream_codec); + if (!client.stream_it) { + client.subscribe_stream = false; + continue; + } + } + + int packets_sent = 0; + while (packets_sent < 64) { + Monitor::WebSocketPayload payload; + if (!monitor->GetNextWebSocketVideoPayload(client.stream_it, client.stream_codec, &payload)) { + break; + } + if (!sendImagePayload(&client, payload, "")) { + closeClient(&client); + break; + } + client.last_stream_sequence = payload.sequence; + packets_sent++; + } + } +} + +void MonitorWebSocketServer::broadcastEvents(std::vector *clients) { + bool have_subscribers = false; + for (const Client &client : *clients) { + if ((client.fd >= 0) && client.handshake_complete && client.subscribe_events) { + have_subscribers = true; + break; + } + } + if (!have_subscribers) { + return; + } + + const std::vector events = monitor->DrainWebSocketMessages(); + if (events.empty()) { + return; + } + + for (Client &client : *clients) { + if ((client.fd < 0) || !client.handshake_complete || !client.subscribe_events) { + continue; + } + for (const std::string &event : events) { + if (!queueFrame(&client, websocket::Opcode::TEXT, event)) { + closeClient(&client); + break; + } + } + } +} + +void MonitorWebSocketServer::removeClosedClients(std::vector *clients) { + clients->erase( + std::remove_if( + clients->begin(), + clients->end(), + [](const Client &client) { 
return client.fd < 0; }), + clients->end()); +} + +} // namespace zm diff --git a/src/zm_websocket.h b/src/zm_websocket.h new file mode 100644 index 00000000000..a23825d31e9 --- /dev/null +++ b/src/zm_websocket.h @@ -0,0 +1,111 @@ +#ifndef ZM_WEBSOCKET_H +#define ZM_WEBSOCKET_H + +#include "zm_comms.h" +#include "zm_monitor.h" +#include "zm_time.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace zm { +namespace websocket { + +enum class Opcode : uint8_t { + CONTINUATION = 0x0, + TEXT = 0x1, + BINARY = 0x2, + CLOSE = 0x8, + PING = 0x9, + PONG = 0xA +}; + +struct Frame { + bool fin = true; + bool masked = false; + Opcode opcode = Opcode::TEXT; + std::string payload; +}; + +enum class DecodeResult { + OK, + INCOMPLETE, + ERROR +}; + +std::string ComputeAcceptKey(const std::string &client_key); +bool ExtractHandshakeKey(const std::string &request, std::string *client_key); +bool ExtractHandshakeRequestTarget(const std::string &request, std::string *target); +bool ExtractAuthorizationBearerToken(const std::string &request, std::string *token); +std::string BuildHandshakeResponse(const std::string &client_key); +std::string EncodeFrame(Opcode opcode, const std::string &payload, bool fin = true); +DecodeResult DecodeFrame(const std::string &buffer, Frame *frame, size_t *consumed); +unsigned int MonitorWebSocketPort(int base_port, unsigned int monitor_id); + +} // namespace websocket + +class MonitorWebSocketServer { + public: + explicit MonitorWebSocketServer(Monitor *monitor); + ~MonitorWebSocketServer(); + + bool Start(int port); + void Stop(); + + private: + struct PendingBuffer { + std::string data; + size_t offset = 0; + }; + + struct Client { + explicit Client(int p_fd) : fd(p_fd) {} + + int fd; + bool handshake_complete = false; + bool subscribe_status = false; + bool subscribe_events = false; + bool subscribe_stream = false; + Milliseconds status_interval = Milliseconds(1000); + Milliseconds stream_interval = 
Milliseconds(1000); + TimePoint next_status_at = {}; + TimePoint next_stream_at = {}; + std::string stream_codec = "mjpeg"; + uint64_t last_stream_sequence = 0; + packetqueue_iterator *stream_it = nullptr; + std::string recv_buffer; + std::deque send_queue; + size_t queued_bytes = 0; + }; + + Monitor *monitor; + int port; + std::atomic running; + TcpInetServer server; + std::thread server_thread; + + void run(); + bool acceptClients(std::vector *clients); + bool handleRead(Client *client); + bool handleHandshake(Client *client); + bool handleFrame(Client *client, const websocket::Frame &frame); + bool flushWrites(Client *client); + void closeClient(Client *client); + bool sendImagePayload(Client *client, const Monitor::WebSocketPayload &payload, const std::string &request_id); + void freeClientResources(Client *client); + bool queueRaw(Client *client, std::string payload); + bool queueFrame(Client *client, websocket::Opcode opcode, const std::string &payload); + void broadcastStatus(std::vector *clients, TimePoint now); + void broadcastStreams(std::vector *clients, TimePoint now); + void broadcastEvents(std::vector *clients); + void removeClosedClients(std::vector *clients); +}; + +} // namespace zm + +#endif // ZM_WEBSOCKET_H diff --git a/src/zmc.cpp b/src/zmc.cpp index 9b778e98ba2..52180d5cca7 100644 --- a/src/zmc.cpp +++ b/src/zmc.cpp @@ -66,6 +66,7 @@ possible, this should run at more or less constant speed. 
#include #include #include +#include void Usage() { fprintf(stderr, "zmc -d or -r -H -P -p or -f or -m \n"); @@ -235,11 +236,13 @@ int main(int argc, char *argv[]) { int result = 0; int prime_capture_log_count = 0; + std::vector monitor_faulted(monitors.size(), false); while (!zm_terminate) { result = 0; - for (const std::shared_ptr &monitor : monitors) { + for (size_t monitor_index = 0; monitor_index < monitors.size(); ++monitor_index) { + const std::shared_ptr &monitor = monitors[monitor_index]; std::string sql = stringtf( "INSERT INTO Monitor_Status (MonitorId,Status,CaptureFPS,AnalysisFPS,CaptureBandwidth)" " VALUES (%u, 'Running',0,0,0) ON DUPLICATE KEY UPDATE Status='Running',CaptureFPS=0,AnalysisFPS=0,CaptureBandwidth=0", @@ -247,13 +250,25 @@ int main(int argc, char *argv[]) { zmDbDo(sql); monitor->LoadCamera(); + monitor->StartWebSocketServer(); + monitor->RefreshWebSocketStatus(); + bool connection_failed = false; while (!monitor->connect() and !zm_terminate) { Warning("Couldn't connect to monitor %d", monitor->Id()); + if (!connection_failed) { + monitor->QueueWebSocketEvent("connection_failed", "Unable to connect to the capture source"); + monitor->RefreshWebSocketStatus(); + connection_failed = true; + monitor_faulted[monitor_index] = true; + } monitor->SetHeartbeatTime(std::chrono::system_clock::now()); sleep(1); } if (zm_terminate) break; + if (connection_failed) { + monitor->QueueWebSocketEvent("connection_restored", "Capture source connection restored"); + } SystemTimePoint now = std::chrono::system_clock::now(); monitor->SetStartupTime(now); @@ -264,6 +279,7 @@ int main(int argc, char *argv[]) { } Seconds sleep_time = Seconds(0); + bool priming_failed = false; while ((monitor->PrimeCapture() <= 0) and !zm_terminate) { if (prime_capture_log_count % 60) { logPrintf(Logger::ERROR + monitor->Importance(), "Failed to prime capture of initial monitor"); @@ -271,6 +287,13 @@ int main(int argc, char *argv[]) { Debug(1, "Failed to prime capture of initial 
monitor"); } + if (!priming_failed) { + monitor->QueueWebSocketEvent("prime_capture_failed", "Unable to prime the capture source"); + monitor->RefreshWebSocketStatus(); + priming_failed = true; + monitor_faulted[monitor_index] = true; + } + prime_capture_log_count++; if (sleep_time < Seconds(ZM_WATCH_MAX_DELAY)) { sleep_time++; @@ -280,6 +303,14 @@ int main(int argc, char *argv[]) { monitor->SetHeartbeatTime(std::chrono::system_clock::now()); } if (zm_terminate) break; + if (priming_failed) { + monitor->QueueWebSocketEvent("prime_capture_restored", "Capture priming restored"); + } + if (monitor_faulted[monitor_index]) { + monitor->QueueWebSocketEvent("capture_resumed", "Capture pipeline resumed"); + monitor_faulted[monitor_index] = false; + } + monitor->RefreshWebSocketStatus(); sql = stringtf( "INSERT INTO Monitor_Status (MonitorId,Status) VALUES (%u, 'Connected') ON DUPLICATE KEY UPDATE Status='Connected'", @@ -325,6 +356,9 @@ int main(int argc, char *argv[]) { if (monitors[i]->PreCapture() < 0) { Error("Failed to pre-capture monitor %d %s (%zu/%zu)", monitors[i]->Id(), monitors[i]->Name(), i + 1, monitors.size()); + monitors[i]->QueueWebSocketEvent("capture_failed", "Pre-capture failed"); + monitors[i]->RefreshWebSocketStatus(); + monitor_faulted[i] = true; result = -1; break; } @@ -332,12 +366,18 @@ int main(int argc, char *argv[]) { if (!zm_terminate) logPrintf(Logger::ERROR + monitors[i]->Importance(), "Failed to capture image from monitor %d %s (%zu/%zu)", monitors[i]->Id(), monitors[i]->Name(), i + 1, monitors.size()); + monitors[i]->QueueWebSocketEvent("capture_failed", "Capture failed"); + monitors[i]->RefreshWebSocketStatus(); + monitor_faulted[i] = true; result = -1; break; } if (monitors[i]->PostCapture() < 0) { Error("Failed to post-capture monitor %d %s (%zu/%zu)", monitors[i]->Id(), monitors[i]->Name(), i + 1, monitors.size()); + monitors[i]->QueueWebSocketEvent("capture_failed", "Post-capture failed"); + monitors[i]->RefreshWebSocketStatus(); + 
monitor_faulted[i] = true; result = -1; break; } @@ -387,6 +427,11 @@ int main(int argc, char *argv[]) { } } // end while ! zm_terminate and connected + // Note: the websocket server is intentionally not stopped here. It must + // outlive a single capture/reconnect cycle so that queued capture failure + // and recovery events are delivered to subscribers and clients are not + // dropped every time capture reconnects. It is stopped once on shutdown + // after the outer loop. for (std::shared_ptr & monitor : monitors) { monitor->SetHeartbeatTime(std::chrono::system_clock::now()); monitor->Close(); @@ -407,6 +452,10 @@ int main(int argc, char *argv[]) { } // end if zm_reload } // end while ! zm_terminate outer connection loop + for (std::shared_ptr &monitor : monitors) { + monitor->StopWebSocketServer(); + } + for (std::shared_ptr &monitor : monitors) { std::string sql = stringtf( "INSERT INTO Monitor_Status (MonitorId,Status) VALUES (%u, 'NotRunning') ON DUPLICATE KEY UPDATE Status='NotRunning',CaptureFPS=0,AnalysisFPS=0,CaptureBandwidth=0", diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1f2067dd8ab..15a4b69ce87 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -20,6 +20,7 @@ set(TEST_SOURCES zm_onvif_renewal.cpp zm_poly.cpp zm_utils.cpp + zm_websocket.cpp zm_vector2.cpp zm_zone.cpp) diff --git a/tests/zm_utils.cpp b/tests/zm_utils.cpp index ffe677f73fe..b80b9a6a453 100644 --- a/tests/zm_utils.cpp +++ b/tests/zm_utils.cpp @@ -18,6 +18,8 @@ #include "zm_catch2.h" #include "zm_utils.h" + +#include #include TEST_CASE("Trim") { @@ -160,6 +162,42 @@ TEST_CASE("Base64Encode") { REQUIRE(Base64Encode("foob") == "Zm9vYg=="); REQUIRE(Base64Encode("fooba") == "Zm9vYmE="); REQUIRE(Base64Encode("foobar") == "Zm9vYmFy"); + + const std::array binary = {{0x00, 0xff, 0x10}}; + REQUIRE(Base64Encode(nonstd::span(binary.data(), binary.size())) == "AP8Q"); +} + +TEST_CASE("JsonExtractQuotedField") { + std::string value; + 
REQUIRE(JsonExtractQuotedField("{\"command\":\"stream\",\"codec\":\"h264\"}", "command", &value)); + REQUIRE(value == "stream"); + REQUIRE(JsonExtractQuotedField("{\"message\":\"hello \\\"zm\\\"\"}", "message", &value)); + REQUIRE(value == "hello \"zm\""); + REQUIRE_FALSE(JsonExtractQuotedField("{\"command\":123}", "command", &value)); +} + +TEST_CASE("JsonExtractIntegerField") { + int value = 0; + REQUIRE(JsonExtractIntegerField("{\"interval_ms\":1000}", "interval_ms", &value)); + REQUIRE(value == 1000); + REQUIRE(JsonExtractIntegerField("{\"interval_ms\":-25}", "interval_ms", &value)); + REQUIRE(value == -25); + REQUIRE_FALSE(JsonExtractIntegerField("{\"interval_ms\":\"fast\"}", "interval_ms", &value)); +} + +TEST_CASE("ExtractHeaderValue and HeaderContainsToken") { + const std::string request = + "GET / HTTP/1.1\r\n" + "Upgrade: websocket\r\n" + "Connection: keep-alive, Upgrade\r\n" + "Sec-WebSocket-Key: abc\r\n\r\n"; + + std::string value; + REQUIRE(ExtractHeaderValue(request, "upgrade", &value)); + REQUIRE(value == "websocket"); + REQUIRE(ExtractHeaderValue(request, "connection", &value)); + REQUIRE(HeaderContainsToken(value, "upgrade")); + REQUIRE_FALSE(HeaderContainsToken(value, "close")); } TEST_CASE("ZM::clamp") { diff --git a/tests/zm_websocket.cpp b/tests/zm_websocket.cpp new file mode 100644 index 00000000000..c412f0504cc --- /dev/null +++ b/tests/zm_websocket.cpp @@ -0,0 +1,208 @@ +/* + * This file is part of the ZoneMinder Project. See AUTHORS file for Copyright information + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ + +#include "zm_catch2.h" + +#include "zm_websocket.h" + +TEST_CASE("Websocket accept key uses RFC6455 example") { + REQUIRE( + zm::websocket::ComputeAcceptKey("dGhlIHNhbXBsZSBub25jZQ==") == + "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="); +} + +TEST_CASE("Websocket handshake extracts client key") { + const std::string request = + "GET / HTTP/1.1\r\n" + "Host: localhost:30001\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + "Sec-WebSocket-Version: 13\r\n\r\n"; + + std::string client_key; + REQUIRE(zm::websocket::ExtractHandshakeKey(request, &client_key)); + REQUIRE(client_key == "dGhlIHNhbXBsZSBub25jZQ=="); +} + +TEST_CASE("Websocket handshake extracts request target") { + const std::string request = + "GET /socket?token=abc123 HTTP/1.1\r\n" + "Host: localhost:30001\r\n\r\n"; + + std::string target; + REQUIRE(zm::websocket::ExtractHandshakeRequestTarget(request, &target)); + REQUIRE(target == "/socket?token=abc123"); +} + +TEST_CASE("Websocket handshake rejects non-GET request lines") { + std::string target; + REQUIRE_FALSE( + zm::websocket::ExtractHandshakeRequestTarget( + "POST /socket HTTP/1.1\r\nHost: localhost\r\n\r\n", + &target)); + REQUIRE_FALSE( + zm::websocket::ExtractHandshakeRequestTarget( + "GET /socket\r\nHost: localhost\r\n\r\n", + &target)); +} + +TEST_CASE("Websocket handshake accepts case-insensitive header names") { + const std::string request = + "GET / HTTP/1.1\r\n" + "Host: localhost:30001\r\n" + "upgrade: websocket\r\n" + "connection: Upgrade\r\n" + "sec-websocket-key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + "sec-websocket-version: 13\r\n\r\n"; + + std::string client_key; + REQUIRE(zm::websocket::ExtractHandshakeKey(request, &client_key)); + REQUIRE(client_key == "dGhlIHNhbXBsZSBub25jZQ=="); +} + +TEST_CASE("Websocket 
handshake extracts bearer token from authorization header") { + const std::string request = + "GET /socket HTTP/1.1\r\n" + "Host: localhost:30001\r\n" + "Authorization: Bearer abc.def.ghi\r\n\r\n"; + + std::string token; + REQUIRE(zm::websocket::ExtractAuthorizationBearerToken(request, &token)); + REQUIRE(token == "abc.def.ghi"); +} + +TEST_CASE("Websocket encodes server text frames") { + const std::string frame = zm::websocket::EncodeFrame(zm::websocket::Opcode::TEXT, "hello"); + + REQUIRE(frame.size() == 7); + REQUIRE(static_cast(frame[0]) == 0x81); + REQUIRE(static_cast(frame[1]) == 0x05); + REQUIRE(frame.substr(2) == "hello"); +} + +TEST_CASE("Websocket encodes extended 16-bit payload length") { + const std::string payload(126, 'a'); + const std::string frame = zm::websocket::EncodeFrame(zm::websocket::Opcode::TEXT, payload); + + REQUIRE(frame.size() == payload.size() + 4); + REQUIRE(static_cast(frame[0]) == 0x81); + REQUIRE(static_cast(frame[1]) == 126); + REQUIRE(static_cast(frame[2]) == 0x00); + REQUIRE(static_cast(frame[3]) == 126); + REQUIRE(frame.substr(4) == payload); +} + +TEST_CASE("Websocket encodes extended 64-bit payload length") { + const std::string payload(65536, 'b'); + const std::string frame = zm::websocket::EncodeFrame(zm::websocket::Opcode::TEXT, payload); + + REQUIRE(frame.size() == payload.size() + 10); + REQUIRE(static_cast(frame[0]) == 0x81); + REQUIRE(static_cast(frame[1]) == 127); + for (int i = 2; i < 7; ++i) { + REQUIRE(static_cast(frame[i]) == 0x00); + } + REQUIRE(static_cast(frame[7]) == 0x01); + REQUIRE(static_cast(frame[8]) == 0x00); + REQUIRE(static_cast(frame[9]) == 0x00); + REQUIRE(frame.substr(10) == payload); +} + +TEST_CASE("Websocket decodes masked client text frames") { + const std::string frame( + "\x81\x82\x37\xfa\x21\x3d\x7f\x93", + 8); + + zm::websocket::Frame decoded; + size_t consumed = 0; + REQUIRE(zm::websocket::DecodeFrame(frame, &decoded, &consumed) == zm::websocket::DecodeResult::OK); + REQUIRE(consumed == 
frame.size()); + REQUIRE(decoded.opcode == zm::websocket::Opcode::TEXT); + REQUIRE(decoded.masked == true); + REQUIRE(decoded.payload == "Hi"); +} + +TEST_CASE("Websocket decodes 16-bit masked client text frames") { + std::string frame; + const std::string payload(126, 'x'); + frame.push_back(static_cast(0x81)); + frame.push_back(static_cast(0x80 | 126)); + frame.push_back(static_cast(0x00)); + frame.push_back(static_cast(126)); + frame.append("\x01\x02\x03\x04", 4); + for (size_t i = 0; i < payload.size(); ++i) { + frame.push_back(static_cast(payload[i] ^ "\x01\x02\x03\x04"[i % 4])); + } + + zm::websocket::Frame decoded; + size_t consumed = 0; + REQUIRE(zm::websocket::DecodeFrame(frame, &decoded, &consumed) == zm::websocket::DecodeResult::OK); + REQUIRE(consumed == frame.size()); + REQUIRE(decoded.payload == payload); +} + +TEST_CASE("Websocket decode reports bytes consumed for concatenated frames") { + const std::string frame_a( + "\x81\x82\x37\xfa\x21\x3d\x7f\x93", + 8); + const std::string frame_b( + "\x81\x82\x37\xfa\x21\x3d\x7e\x92", + 8); + const std::string combined = frame_a + frame_b; + + zm::websocket::Frame decoded; + size_t consumed = 0; + REQUIRE(zm::websocket::DecodeFrame(combined, &decoded, &consumed) == zm::websocket::DecodeResult::OK); + REQUIRE(consumed == frame_a.size()); + REQUIRE(decoded.payload == "Hi"); +} + +TEST_CASE("Websocket rejects unmasked client frames") { + zm::websocket::Frame decoded; + size_t consumed = 0; + REQUIRE( + zm::websocket::DecodeFrame("\x81\x02Hi", &decoded, &consumed) == + zm::websocket::DecodeResult::ERROR); +} + +TEST_CASE("Websocket rejects oversized control frames") { + std::string frame; + frame.push_back(static_cast(0x89)); + frame.push_back(static_cast(0x80 | 126)); + frame.push_back(static_cast(0x00)); + frame.push_back(static_cast(126)); + frame.append("\x01\x02\x03\x04", 4); + frame.append(126, 'p'); + + zm::websocket::Frame decoded; + size_t consumed = 0; + REQUIRE(zm::websocket::DecodeFrame(frame, 
&decoded, &consumed) == zm::websocket::DecodeResult::ERROR); +} + +TEST_CASE("Websocket decoder reports incomplete frames") { + zm::websocket::Frame decoded; + size_t consumed = 0; + REQUIRE( + zm::websocket::DecodeFrame("\x81", &decoded, &consumed) == + zm::websocket::DecodeResult::INCOMPLETE); +} + +TEST_CASE("Websocket monitor streaming port uses configured base port") { + REQUIRE(zm::websocket::MonitorWebSocketPort(30000, 5) == 30005); + REQUIRE(zm::websocket::MonitorWebSocketPort(0, 5) == 0); +}