Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pj_base/include/pj_base/builtin/video_frame_codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <string_view>
#include <vector>

#include "pj_base/buffer_anchor.hpp"
#include "pj_base/builtin/video_frame.hpp"
#include "pj_base/expected.hpp"

Expand All @@ -22,7 +23,17 @@ inline constexpr std::string_view kSchemaVideoFrame = "PJ.VideoFrame";
[[nodiscard]] std::vector<uint8_t> serializeVideoFrame(const sdk::VideoFrame& frame);

/// Decodes canonical PJ.VideoFrame wire bytes into sdk::VideoFrame. The
/// returned frame owns its bytes via `anchor`.
/// returned frame owns its bytes via `anchor` (a fresh copy of the `data`
/// field). Use this when the wire buffer does not outlive the call.
[[nodiscard]] Expected<sdk::VideoFrame> deserializeVideoFrame(const uint8_t* data, size_t size);

/// Decodes canonical PJ.VideoFrame / foxglove.CompressedVideo wire bytes into
/// sdk::VideoFrame without copying the compressed bitstream. The two schemas
/// are wire-identical (timestamp=1, frame_id=2, data=3, format=4), so this one
/// decoder serves both.
///
/// Lifetime contract: the returned frame's `data` span ALIASES the input
/// buffer `[data, data + size)`; no bytes are copied. When the `data` field is
/// decoded, the frame's `anchor` is set to a copy of the supplied `anchor`, so
/// the aliased bytes stay valid for as long as that anchor keeps the wire
/// buffer alive.
/// NOTE(review): the codec does not check that `anchor` actually owns the
/// buffer `data` points into — that is the caller's responsibility. If the
/// message carries no `data` field, the frame's `anchor` is left as
/// default-constructed.
///
/// Returns an error when `data` is null, `size` is zero, or the wire bytes
/// fail to decode.
[[nodiscard]] Expected<sdk::VideoFrame> deserializeVideoFrameView(
const uint8_t* data, size_t size, sdk::BufferAnchor anchor);

} // namespace PJ
2 changes: 1 addition & 1 deletion pj_base/proto/pj/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ rationale.
- `OccupancyGridUpdate`
- **`Log.proto`** — a single textual log message (severity level + text + originating name) for a log/console panel; mirrors the core of Foxglove's `Log` (file/line omitted).
- `Log`
- **`VideoFrame.proto`** — one frame of an inter-frame-coded video stream (`h264`, `h265`, `vp9`, `av1`) when per-frame `Image` messages would be wasteful.
- **`VideoFrame.proto`** — one frame of an inter-frame-coded video stream (`h264`, `h265`, `vp9`, `av1`) when per-frame `Image` messages would be wasteful. Field layout is wire-identical to `foxglove.CompressedVideo` (timestamp=1, frame_id=2, data=3, format=4), so one decoder parses both.
- `VideoFrame`
- **`AssetVideo.proto`** — reference to a file-backed video plus typed playback metadata (path, MIME type, dimensions, frame rate) so consumers can size playback windows without opening the file.
- `AssetVideo`
Expand Down
11 changes: 7 additions & 4 deletions pj_base/proto/pj/VideoFrame.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ package PJ;
// On the SDK side, `data` is exposed as `Span<const uint8_t>` plus a `BufferAnchor` that keeps the underlying
// allocation alive (same byte-backed view pattern as Image, DepthImage, and PointCloud). The anchor is a C++ lifetime
// concept with no wire-format equivalent.
//
// The field layout is wire-identical to `foxglove.CompressedVideo` (timestamp=1, frame_id=2, data=3, format=4), so a
// single decoder parses both this canonical schema and the Foxglove one.
message VideoFrame {
// Timestamp of the frame
google.protobuf.Timestamp timestamp = 1;
Expand All @@ -38,9 +41,9 @@ message VideoFrame {
// the camera (into the scene).
string frame_id = 2;

// Codec identifier, lowercase. Recognized values: "h264", "h265", "vp9", "av1".
string format = 3;

// Compressed bitstream containing exactly one frame given prior stream state.
bytes data = 4;
bytes data = 3;

// Codec identifier, lowercase. Recognized values: "h264", "h265", "vp9", "av1".
string format = 4;
}
91 changes: 67 additions & 24 deletions pj_base/src/builtin/video_frame_codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ using builtin_wire::Reader;
using builtin_wire::Tag;
using builtin_wire::WireType;
using builtin_wire::Writer;
using sdk::BufferAnchor;
using sdk::VideoFrame;

bool readBytesIntoFrame(Reader& reader, VideoFrame& out) {
// Reads the length-delimited `data` field (field 3) into an owning copy. The
// returned frame's `anchor` owns a fresh vector, so `data` stays valid past the
// lifetime of the wire buffer.
bool readBytesOwning(Reader& reader, VideoFrame& out) {
const uint8_t* data = nullptr;
size_t size = 0;
if (!reader.readBytes(data, size)) {
Expand All @@ -34,29 +38,26 @@ bool readBytesIntoFrame(Reader& reader, VideoFrame& out) {
return true;
}

} // namespace

std::vector<uint8_t> serializeVideoFrame(const VideoFrame& frame) {
std::vector<uint8_t> out;
Writer writer(out);

writer.message(1, [&](Writer& nested) { builtin_wire::writeTimestamp(nested, frame.timestamp_ns); });
writer.string(2, frame.frame_id);
writer.string(3, frame.format);
writer.bytes(4, frame.data.data(), frame.data.size());

return out;
}

Expected<sdk::VideoFrame> deserializeVideoFrame(const uint8_t* data, size_t size) {
if (data == nullptr || size == 0) {
return unexpected(std::string("VideoFrame wire: empty buffer"));
// Zero-copy counterpart of the owning reader for the length-delimited `data`
// field (field 3). On success `out.data` is a span pointing straight into the
// wire buffer the reader is walking, and `out.anchor` receives a copy of the
// caller-supplied `anchor` so the frame itself carries the handle that keeps
// those bytes alive. Returns false, leaving `out` untouched, when the reader
// cannot produce the bytes.
bool readBytesView(Reader& reader, const BufferAnchor& anchor, VideoFrame& out) {
  const uint8_t* payload = nullptr;
  size_t payload_size = 0;

  const bool got_bytes = reader.readBytes(payload, payload_size);
  if (got_bytes) {
    // Alias the wire bytes; nothing is copied here.
    out.data = Span<const uint8_t>(payload, payload_size);
    out.anchor = anchor;
  }
  return got_bytes;
}

Reader reader(data, size);
sdk::VideoFrame frame;

const bool ok = parseFields(reader, [&](Tag tag, Reader& r) {
// Drives the shared field dispatch. `read_data` consumes the `data` field
// (field 3); the two deserialize entry points differ only in whether that
// callback copies or aliases the wire bytes. All other fields are identical.
template <typename ReadData>
bool parseVideoFrame(Reader& reader, VideoFrame& frame, ReadData&& read_data) {
return parseFields(reader, [&](Tag tag, Reader& r) {
switch (tag.field) {
case 1:
if (tag.type != WireType::kLengthDelimited) {
Expand All @@ -72,16 +73,58 @@ Expected<sdk::VideoFrame> deserializeVideoFrame(const uint8_t* data, size_t size
if (tag.type != WireType::kLengthDelimited) {
return false;
}
return r.readString(frame.format);
return read_data(r, frame);
case 4:
if (tag.type != WireType::kLengthDelimited) {
return false;
}
return readBytesIntoFrame(r, frame);
return r.readString(frame.format);
default:
return false;
}
});
}

} // namespace

std::vector<uint8_t> serializeVideoFrame(const VideoFrame& frame) {
std::vector<uint8_t> out;
Writer writer(out);

writer.message(1, [&](Writer& nested) { builtin_wire::writeTimestamp(nested, frame.timestamp_ns); });
writer.string(2, frame.frame_id);
writer.bytes(3, frame.data.data(), frame.data.size());
writer.string(4, frame.format);

return out;
}

// Owning decode: parses the wire bytes into a VideoFrame whose `data` field is
// read through readBytesOwning, so the result does not depend on the input
// buffer after the call returns. Fails with a descriptive message when the
// input is null/empty or when field parsing reports an error.
Expected<sdk::VideoFrame> deserializeVideoFrame(const uint8_t* data, size_t size) {
  const bool has_input = (data != nullptr) && (size != 0);
  if (!has_input) {
    return unexpected(std::string("VideoFrame wire: empty buffer"));
  }

  sdk::VideoFrame decoded;
  Reader wire(data, size);

  // The `data` field is handled by the copying reader; every other field goes
  // through the shared dispatch in parseVideoFrame.
  const auto copy_payload = [](Reader& r, VideoFrame& f) { return readBytesOwning(r, f); };
  if (!parseVideoFrame(wire, decoded, copy_payload)) {
    return unexpected(std::string("VideoFrame wire: decode failed"));
  }

  return decoded;
}

Expected<sdk::VideoFrame> deserializeVideoFrameView(const uint8_t* data, size_t size, sdk::BufferAnchor anchor) {
if (data == nullptr || size == 0) {
return unexpected(std::string("VideoFrame wire: empty buffer"));
}

Reader reader(data, size);
sdk::VideoFrame frame;

const bool ok = parseVideoFrame(reader, frame, [&](Reader& r, VideoFrame& f) { return readBytesView(r, anchor, f); });

if (!ok) {
return unexpected(std::string("VideoFrame wire: decode failed"));
Expand Down
69 changes: 69 additions & 0 deletions pj_base/tests/video_frame_codec_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

#include "protobuf_wire_test_helpers.hpp"
Expand Down Expand Up @@ -43,5 +44,73 @@ TEST(VideoFrameCodecTest, RoundTripRealisticPayload) {
EXPECT_EQ(std::memcmp(out->data.data(), payload.data(), payload.size()), 0);
}

// Golden-bytes guard for the wire layout shared with foxglove.CompressedVideo:
// timestamp=1, frame_id=2, data=3 (bytes), format=4 (string). The reference
// encoding is assembled with the test-side protobuf helpers rather than the
// codec, so renumbering a field in the codec makes this test fail.
TEST(VideoFrameCodecTest, WireLayoutMatchesFoxglove) {
  const std::vector<uint8_t> bitstream = {0xDE, 0xAD, 0xBE, 0xEF};

  VideoFrame frame;
  frame.timestamp_ns = 1'700'000'000'500'000'000LL;
  frame.frame_id = "cam0";
  frame.format = "h265";
  frame.data = Span<const uint8_t>(bitstream.data(), bitstream.size());

  // Every field uses wire type 2 (length-delimited).
  std::vector<uint8_t> golden;
  // Field 1: timestamp, nested message.
  pb::appendTag(golden, 1, 2);
  pb::appendLenDelim(golden, pb::encodeTimestamp(frame.timestamp_ns));
  // Field 2: frame_id, string.
  pb::appendTag(golden, 2, 2);
  pb::appendString(golden, frame.frame_id);
  // Field 3: data, bytes.
  pb::appendTag(golden, 3, 2);
  pb::appendBytes(golden, bitstream.data(), bitstream.size());
  // Field 4: format, string.
  pb::appendTag(golden, 4, 2);
  pb::appendString(golden, frame.format);

  const auto encoded = serializeVideoFrame(frame);
  EXPECT_EQ(encoded, golden);
}

// Zero-copy contract of deserializeVideoFrameView: the decoded `data` span must
// lie inside the wire buffer that was passed in (no fresh copy), and the frame
// must carry the anchor it was given so the aliased bytes stay alive.
TEST(VideoFrameCodecTest, ViewAliasesInputBuffer) {
  const std::vector<uint8_t> bitstream = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06};

  VideoFrame source;
  source.timestamp_ns = 42;
  source.frame_id = "cam";
  source.format = "av1";
  source.data = Span<const uint8_t>(bitstream.data(), bitstream.size());

  // The wire bytes live behind a shared_ptr so the same object can serve as
  // the anchor handed to the decoder.
  auto wire = std::make_shared<std::vector<uint8_t>>(serializeVideoFrame(source));
  sdk::BufferAnchor anchor = wire;

  auto decoded = deserializeVideoFrameView(wire->data(), wire->size(), anchor);
  ASSERT_TRUE(decoded.has_value());

  // Scalar fields survive the round trip.
  EXPECT_EQ(decoded->timestamp_ns, source.timestamp_ns);
  EXPECT_EQ(decoded->frame_id, source.frame_id);
  EXPECT_EQ(decoded->format, source.format);

  // Payload contents survive the round trip.
  ASSERT_EQ(decoded->data.size(), bitstream.size());
  EXPECT_EQ(std::memcmp(decoded->data.data(), bitstream.data(), bitstream.size()), 0);

  // Aliasing: the span's byte range lies within the wire buffer's range.
  const uint8_t* wire_first = wire->data();
  const uint8_t* wire_last = wire->data() + wire->size();
  EXPECT_GE(decoded->data.data(), wire_first);
  EXPECT_LE(decoded->data.data() + decoded->data.size(), wire_last);

  // The frame holds the very anchor we supplied.
  EXPECT_EQ(decoded->anchor, anchor);
}

// deserializeVideoFrameView rejects its input when `data` is null OR `size` is
// zero. Each half of that guard is exercised on its own, in addition to the
// case where both hold, so dropping either condition is caught.
TEST(VideoFrameCodecTest, ViewEmptyBufferProducesError) {
  sdk::BufferAnchor anchor;

  // Null pointer and zero size together.
  EXPECT_FALSE(deserializeVideoFrameView(nullptr, 0, anchor).has_value());

  // Null pointer with a non-zero size: must be rejected before any read.
  EXPECT_FALSE(deserializeVideoFrameView(nullptr, 4, anchor).has_value());

  // Valid pointer with zero size: an empty buffer is still an error.
  const uint8_t byte = 0;
  EXPECT_FALSE(deserializeVideoFrameView(&byte, 0, anchor).has_value());
}

} // namespace
} // namespace PJ
Loading