Skip to content

Commit 4d39204

Browse files
refactor(data_source_protocol)!: consolidate the push API on a single push_message slot
Rename the runtime-host vtable tail slot push_message_v2 -> push_message and remove the now-redundant eager push_raw_message slot, leaving one definitive push entry point. The C++ SDK wrapper pushMessage is unchanged, so plugin sources that go through it are unaffected; only code that wires the vtable slot by name needs the new name. Dropping a slot reduces the vtable, so this is an ABI-breaking change and the layout sentinel asserts are updated accordingly. Contents: - data_source_protocol.h: rename push_message_v2 -> push_message; drop the push_raw_message slot - sdk/data_source_host_views.hpp: align the guard/call/strings with push_message; drop the raw-message path - sdk/data_source_patterns.hpp: update the slot reference - pj_base/CMakeLists.txt + rename push_message_v2_test.cpp -> push_message_test.cpp - abi_layout_sentinels_test.cpp: update the tail-slot asserts for the reduced vtable - docs (USER_GUIDE, ARCHITECTURE, data-source-guide, message-parser-guide), mock_data_source, and the affected library/file-source tests
1 parent 12805ba commit 4d39204

13 files changed

Lines changed: 61 additions & 107 deletions

pj_base/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ if(PJ_BUILD_TESTS)
8888
tests/image_annotations_codec_test.cpp
8989
tests/image_annotations_decoder_test.cpp
9090
tests/media_metadata_test.cpp
91-
tests/push_message_v2_test.cpp
91+
tests/push_message_test.cpp
9292
tests/image_codec_test.cpp
9393
tests/depth_image_codec_test.cpp
9494
tests/point_cloud_codec_test.cpp

pj_base/include/pj_base/data_source_protocol.h

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ enum {
133133
PJ_DATA_SOURCE_CAPABILITY_HAS_DIALOG = 1ull << 5, /**< Plugin provides a configuration dialog. */
134134
};
135135

136-
/** Opaque handle returned by ensure_parser_binding, used with push_raw_message. */
136+
/** Opaque handle returned by ensure_parser_binding, used with push_message. */
137137
typedef struct {
138138
uint32_t id;
139139
} PJ_parser_binding_handle_t;
@@ -260,22 +260,6 @@ typedef struct PJ_data_source_runtime_host_vtable_t {
260260
void* ctx, const PJ_parser_binding_request_t* request, PJ_parser_binding_handle_t* out_handle,
261261
PJ_error_t* out_error) PJ_NOEXCEPT;
262262

263-
/**
264-
* [stream-thread] Push a raw message payload for host-side parsing.
265-
* @p handle must have been obtained from ensure_parser_binding.
266-
* @p host_timestamp_ns is nanoseconds since the Unix epoch
267-
* (1970-01-01T00:00:00Z). Returns false + error on failure.
268-
*
269-
* Eager-only push: the host parses immediately and the bytes are not
270-
* retained for later replay. Plugins that need lazy materialization or
271-
* ObjectIngestPolicy dispatch should use push_message_v2 instead. This
272-
* slot remains for sources that fan-out raw bytes without an associated
273-
* FetchMessageData callable (streaming or eager-only consumers).
274-
*/
275-
bool (*push_raw_message)(
276-
void* ctx, PJ_parser_binding_handle_t handle, int64_t host_timestamp_ns, PJ_bytes_view_t payload,
277-
PJ_error_t* out_error) PJ_NOEXCEPT;
278-
279263
/**
280264
* [main-thread] Display a modal message box to the user and wait for
281265
* their response. BLOCKS until the user closes the dialog. The host is
@@ -340,7 +324,7 @@ typedef struct PJ_data_source_runtime_host_vtable_t {
340324
* ObjectStore push failed, etc.). On failure the host still calls
341325
* `release` so the plugin's ctx leaks no resources.
342326
*/
343-
bool (*push_message_v2)(
327+
bool (*push_message)(
344328
void* ctx, PJ_parser_binding_handle_t handle, int64_t host_timestamp_ns,
345329
PJ_message_data_fetcher_t fetch_message_data, PJ_error_t* out_error) PJ_NOEXCEPT;
346330
} PJ_data_source_runtime_host_vtable_t;

pj_base/include/pj_base/sdk/data_source_host_views.hpp

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -209,19 +209,6 @@ class DataSourceRuntimeHostView {
209209
return handle;
210210
}
211211

212-
/// Push a raw message for host-side parsing via a previously obtained binding handle.
213-
[[nodiscard]] Status pushRawMessage(
214-
ParserBindingHandle handle, Timestamp host_timestamp_ns, Span<const uint8_t> payload) const {
215-
if (!valid() || host_.vtable->push_raw_message == nullptr) {
216-
return unexpected("runtime host is not bound");
217-
}
218-
PJ_error_t err{};
219-
if (!host_.vtable->push_raw_message(host_.ctx, handle, host_timestamp_ns, sdk::toAbiBytes(payload), &err)) {
220-
return unexpected(errorToString(err));
221-
}
222-
return okStatus();
223-
}
224-
225212
/// Push a message via a deferred FetchMessageData callable. The DataSource
226213
/// hands the host a callable that produces the payload bytes when invoked.
227214
/// The host applies the active ObjectIngestPolicy (resolved via the
@@ -245,12 +232,11 @@ class DataSourceRuntimeHostView {
245232
/// heap-relocated and used as its own anchor; bytes survive across
246233
/// the C ABI boundary at the cost of one alloc-and-move.
247234
///
248-
/// The host MUST advertise the push_message_v2 tail slot. We wrap the
235+
/// The host MUST advertise the push_message tail slot. We wrap the
249236
/// closure into a PJ_message_data_fetcher_t and hand it over verbatim; the
250237
/// host applies ObjectIngestPolicy and decides when (and whether) to
251238
/// invoke it. There is no legacy fallback: a host that doesn't expose
252-
/// the slot returns an explicit error here rather than silently
253-
/// degrading to a kEager push_raw_message.
239+
/// the slot returns an explicit error here.
254240
template <typename FetchMessageData>
255241
[[nodiscard]] Status pushMessage(
256242
ParserBindingHandle handle, Timestamp host_timestamp_ns, FetchMessageData&& fetch_message_data) const {
@@ -264,8 +250,8 @@ class DataSourceRuntimeHostView {
264250
if (!valid()) {
265251
return unexpected(std::string("runtime host is not bound"));
266252
}
267-
if (!PJ_HAS_TAIL_SLOT(PJ_data_source_runtime_host_vtable_t, host_.vtable, push_message_v2)) {
268-
return unexpected(std::string("runtime host does not expose push_message_v2"));
253+
if (!PJ_HAS_TAIL_SLOT(PJ_data_source_runtime_host_vtable_t, host_.vtable, push_message)) {
254+
return unexpected(std::string("runtime host does not expose push_message"));
269255
}
270256

271257
auto* ctx = new FetchMessageDataT(std::forward<FetchMessageData>(fetch_message_data));
@@ -308,7 +294,7 @@ class DataSourceRuntimeHostView {
308294
};
309295

310296
PJ_error_t err{};
311-
if (!host_.vtable->push_message_v2(host_.ctx, handle, host_timestamp_ns, abi_fetch_message_data, &err)) {
297+
if (!host_.vtable->push_message(host_.ctx, handle, host_timestamp_ns, abi_fetch_message_data, &err)) {
312298
return unexpected(errorToString(err));
313299
}
314300
return okStatus();

pj_base/include/pj_base/sdk/data_source_patterns.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class StreamSourceBase : public DataSourcePluginBase {
136136
/// MUST NOT BLOCK — drain buffered data and return immediately.
137137
/// Do not call recv(), read(), or any syscall that may wait.
138138
/// If your source has a receive thread, swap-drain a buffer here.
139-
/// Host methods (appendRecord, pushRawMessage) may only be called from this method.
139+
/// Host methods (appendRecord, pushMessage) may only be called from this method.
140140
virtual Status onPoll() = 0;
141141

142142
/// Called from stop(). Close connections, free resources.

pj_base/tests/abi_layout_sentinels_test.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,11 @@ static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, protocol_version) =
147147
static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, struct_size) == 4, "v1 prefix pinned");
148148
static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, report_message) == 8, "v1 first slot pinned");
149149
static_assert(
150-
offsetof(PJ_data_source_runtime_host_vtable_t, push_raw_message) == 72, "v1 push_raw_message slot pinned");
150+
offsetof(PJ_data_source_runtime_host_vtable_t, list_available_encodings) == 80,
151+
"list_available_encodings slot pinned");
152+
static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, push_message) == 88, "push_message tail slot pinned");
151153
static_assert(
152-
offsetof(PJ_data_source_runtime_host_vtable_t, list_available_encodings) == 88,
153-
"v1 list_available_encodings slot pinned");
154-
static_assert(
155-
offsetof(PJ_data_source_runtime_host_vtable_t, push_message_v2) == 96, "v1 push_message_v2 tail slot pinned");
156-
static_assert(
157-
sizeof(PJ_data_source_runtime_host_vtable_t) == 104, "Runtime host vtable size (update deliberately on append)");
154+
sizeof(PJ_data_source_runtime_host_vtable_t) == 96, "Runtime host vtable size (update deliberately on append)");
158155

159156
// --- Write-host vtables (ABI-APPENDABLE within v4) --------------------------
160157
static_assert(offsetof(PJ_source_write_host_vtable_t, abi_version) == 0, "source write host prefix pinned");
Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
// Tests for the SDK template `DataSourceRuntimeHostView::pushMessage` and
5-
// its delegation to the C ABI slot `push_message_v2`. We exercise:
5+
// its delegation to the C ABI slot `push_message`. We exercise:
66
//
77
// 1. Vector closure → the captured FetchMessageData callable yields the
88
// same bytes when invoked from the host.
@@ -11,7 +11,7 @@
1111
// 3. Multiple invocations are idempotent (same bytes each time).
1212
// 4. The heap-held closure context is destroyed exactly once when the
1313
// host calls `release`.
14-
// 5. When the host does not expose push_message_v2 (struct_size short
14+
// 5. When the host does not expose push_message (struct_size short
1515
// or field NULL), pushMessage returns an explicit error rather than
1616
// degrading silently.
1717

@@ -28,32 +28,30 @@
2828

2929
namespace {
3030

31-
// Captured state from a push_message_v2 invocation.
31+
// Captured state from a push_message invocation.
3232
struct CapturedPush {
3333
PJ_parser_binding_handle_t handle{};
3434
int64_t timestamp_ns = 0;
3535
PJ_message_data_fetcher_t fetch_message_data{};
3636
bool received = false;
3737
};
3838

39-
// Mock runtime host — exposes a vtable that captures push_message_v2 calls
40-
// and, alternatively, push_raw_message calls (for the legacy fallback).
39+
// Mock runtime host — exposes a vtable that captures push_message calls.
4140
class MockHost {
4241
public:
4342
MockHost() {
4443
vtable_.protocol_version = PJ_DATA_SOURCE_PROTOCOL_VERSION;
4544
vtable_.struct_size = sizeof(PJ_data_source_runtime_host_vtable_t);
46-
vtable_.push_raw_message = &MockHost::pushRawMessageThunk;
47-
vtable_.push_message_v2 = &MockHost::pushMessageV2Thunk;
45+
vtable_.push_message = &MockHost::pushMessageThunk;
4846
host_.ctx = this;
4947
host_.vtable = &vtable_;
5048
}
5149

5250
// Drop the v2 slot — both clearing the field and shrinking struct_size,
5351
// matching the runtime scenario where the host predates the addition.
54-
void disablePushMessageV2() {
55-
vtable_.push_message_v2 = nullptr;
56-
vtable_.struct_size = offsetof(PJ_data_source_runtime_host_vtable_t, push_message_v2);
52+
void disablePushMessage() {
53+
vtable_.push_message = nullptr;
54+
vtable_.struct_size = offsetof(PJ_data_source_runtime_host_vtable_t, push_message);
5755
}
5856

5957
PJ::DataSourceRuntimeHostView view() const {
@@ -63,20 +61,9 @@ class MockHost {
6361
CapturedPush& captured() {
6462
return captured_;
6563
}
66-
std::vector<uint8_t>& receivedRawBytes() {
67-
return raw_bytes_;
68-
}
6964

7065
private:
71-
static bool pushRawMessageThunk(
72-
void* ctx, PJ_parser_binding_handle_t /*handle*/, int64_t /*ts*/, PJ_bytes_view_t payload,
73-
PJ_error_t* /*err*/) noexcept {
74-
auto* self = static_cast<MockHost*>(ctx);
75-
self->raw_bytes_.assign(payload.data, payload.data + payload.size);
76-
return true;
77-
}
78-
79-
static bool pushMessageV2Thunk(
66+
static bool pushMessageThunk(
8067
void* ctx, PJ_parser_binding_handle_t handle, int64_t ts, PJ_message_data_fetcher_t fetch_message_data,
8168
PJ_error_t* /*err*/) noexcept {
8269
auto* self = static_cast<MockHost*>(ctx);
@@ -90,7 +77,6 @@ class MockHost {
9077
PJ_data_source_runtime_host_vtable_t vtable_{};
9178
PJ_data_source_runtime_host_t host_{};
9279
CapturedPush captured_;
93-
std::vector<uint8_t> raw_bytes_;
9480
};
9581

9682
// Helper: invoke a captured FetchMessageData callable and assert the produced bytes match
@@ -108,9 +94,9 @@ void invokeFetchMessageDataAndExpect(
10894
}
10995
}
11096

111-
// ---------- Tests against the new push_message_v2 path ----------
97+
// ---------- Tests against the new push_message path ----------
11298

113-
TEST(PushMessageV2Test, VectorClosureFlowsThroughSlot) {
99+
TEST(PushMessageTest, VectorClosureFlowsThroughSlot) {
114100
MockHost host;
115101
std::vector<uint8_t> expected{1, 2, 3, 4, 5};
116102

@@ -124,7 +110,7 @@ TEST(PushMessageV2Test, VectorClosureFlowsThroughSlot) {
124110
host.captured().fetch_message_data.release(host.captured().fetch_message_data.ctx);
125111
}
126112

127-
TEST(PushMessageV2Test, PayloadViewClosureFlowsThroughSlot) {
113+
TEST(PushMessageTest, PayloadViewClosureFlowsThroughSlot) {
128114
MockHost host;
129115
std::vector<uint8_t> expected{10, 20, 30};
130116
auto owned = std::make_shared<std::vector<uint8_t>>(expected);
@@ -138,7 +124,7 @@ TEST(PushMessageV2Test, PayloadViewClosureFlowsThroughSlot) {
138124
host.captured().fetch_message_data.release(host.captured().fetch_message_data.ctx);
139125
}
140126

141-
TEST(PushMessageV2Test, FetchIsIdempotent) {
127+
TEST(PushMessageTest, FetchIsIdempotent) {
142128
MockHost host;
143129
std::vector<uint8_t> expected{0x42, 0x43};
144130

@@ -151,7 +137,7 @@ TEST(PushMessageV2Test, FetchIsIdempotent) {
151137
host.captured().fetch_message_data.release(host.captured().fetch_message_data.ctx);
152138
}
153139

154-
TEST(PushMessageV2Test, FetchMessageDataCtxReleasedAfterHostCalls) {
140+
TEST(PushMessageTest, FetchMessageDataCtxReleasedAfterHostCalls) {
155141
MockHost host;
156142
auto canary = std::make_shared<int>(42);
157143
std::weak_ptr<int> witness = canary;
@@ -169,7 +155,7 @@ TEST(PushMessageV2Test, FetchMessageDataCtxReleasedAfterHostCalls) {
169155
EXPECT_TRUE(witness.expired()) << "after release, the captured shared_ptr should have been the last reference";
170156
}
171157

172-
TEST(PushMessageV2Test, PayloadAnchorPropagates) {
158+
TEST(PushMessageTest, PayloadAnchorPropagates) {
173159
MockHost host;
174160
auto owned = std::make_shared<std::vector<uint8_t>>(std::vector<uint8_t>{0x99, 0x9A});
175161
std::weak_ptr<std::vector<uint8_t>> witness = owned;
@@ -204,17 +190,16 @@ TEST(PushMessageV2Test, PayloadAnchorPropagates) {
204190
EXPECT_TRUE(witness.expired());
205191
}
206192

207-
// ---------- Host without push_message_v2 returns explicit error ----------
193+
// ---------- Host without push_message returns explicit error ----------
208194

209-
TEST(PushMessageV2Test, ReturnsErrorWhenSlotMissing) {
195+
TEST(PushMessageTest, ReturnsErrorWhenSlotMissing) {
210196
MockHost host;
211-
host.disablePushMessageV2();
197+
host.disablePushMessage();
212198

213199
std::vector<uint8_t> expected{0xA, 0xB, 0xC};
214200
auto status = host.view().pushMessage(PJ::ParserBindingHandle{1}, 100, [bytes = expected]() { return bytes; });
215-
EXPECT_FALSE(status); // explicit failure — no silent fallback to push_raw_message
201+
EXPECT_FALSE(status); // explicit failure — no silent fallback
216202
EXPECT_FALSE(host.captured().received);
217-
EXPECT_TRUE(host.receivedRawBytes().empty());
218203
}
219204

220205
} // namespace

pj_datastore/docs/USER_GUIDE.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,9 @@ auto binding = runtimeHost().ensureParserBinding({
200200
.parser_config_json = config_json,
201201
});
202202

203-
// In onPoll(): push incoming messages
204-
runtimeHost().pushRawMessage(*binding, timestamp_ns, payload_span);
203+
// In onPoll(): push incoming messages (the host invokes the fetcher per policy)
204+
runtimeHost().pushMessage(
205+
*binding, timestamp_ns, [bytes = payload]() -> std::vector<uint8_t> { return bytes; });
205206
```
206207
207208
- `parser_encoding` is the wire format (e.g., `"cdr"`, `"json"`, `"protobuf"`), not the schema format
@@ -372,7 +373,8 @@ class MyStreamer : public PJ::StreamSourceBase {
372373
PJ::Status onPoll() override {
373374
// Drain buffered messages (must not block!)
374375
while (auto msg = dequeue()) {
375-
runtimeHost().pushRawMessage(binding_, msg->timestamp_ns, msg->payload);
376+
runtimeHost().pushMessage(
377+
binding_, msg->timestamp_ns, [bytes = msg->payload]() -> std::vector<uint8_t> { return bytes; });
376378
}
377379
return PJ::okStatus();
378380
}

pj_plugins/docs/ARCHITECTURE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ out_array, err)`:
528528

529529
## Builtin-object pipeline (PR #86)
530530

531-
The v4 DataSource runtime host adds a tail slot `push_message_v2`
531+
The v4 DataSource runtime host adds a tail slot `push_message`
532532
(offset 96 in `PJ_data_source_runtime_host_vtable_t`) that takes a
533533
deferred byte-fetch callable instead of bytes:
534534

@@ -539,7 +539,7 @@ typedef struct PJ_message_data_fetcher_t {
539539
void (*release)(void* ctx);
540540
} PJ_message_data_fetcher_t;
541541

542-
bool (*push_message_v2)(
542+
bool (*push_message)(
543543
void* ctx, PJ_parser_binding_handle_t handle, int64_t timestamp_ns,
544544
PJ_message_data_fetcher_t fetch_message_data,
545545
PJ_error_t* out_error) PJ_NOEXCEPT;

pj_plugins/docs/data-source-guide.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ class UdpReceiver : public PJ::StreamSourceBase {
313313

314314
// Bind a parser for the configured encoding. The host resolves the
315315
// parser library, creates a parser instance, binds the write host
316-
// and schema, and returns a handle for pushRawMessage().
316+
// and schema, and returns a handle for pushMessage().
317317
binding_ = runtimeHost().ensureParserBinding({
318318
.topic_name = "udp/data",
319319
.parser_encoding = encoding_,
@@ -345,9 +345,11 @@ class UdpReceiver : public PJ::StreamSourceBase {
345345
}
346346

347347
for (const auto& msg : batch) {
348-
auto status = runtimeHost().pushRawMessage(
348+
// pushMessage takes a deferred fetcher; the host invokes it according to
349+
// the active ObjectIngestPolicy (eager for plain curves).
350+
auto status = runtimeHost().pushMessage(
349351
*binding_, msg.timestamp,
350-
PJ::Span<const uint8_t>(msg.payload.data(), msg.payload.size()));
352+
[bytes = msg.payload]() -> std::vector<uint8_t> { return bytes; });
351353
if (!status) {
352354
return PJ::unexpected(status.error());
353355
}
@@ -415,7 +417,7 @@ changes, only a different config value.
415417
callback for flushing accumulated data. The host calls it periodically from
416418
its own thread. For sources with asynchronous I/O (sockets, hardware), the
417419
plugin must manage its own receive thread and use `onPoll()` as the sync
418-
point where buffered data is handed off. Host methods (`pushRawMessage`,
420+
point where buffered data is handed off. Host methods (`pushMessage`,
419421
`appendRecord`, etc.) must only be called from `onPoll()` / the host's
420422
thread, never from the background thread.
421423

@@ -425,7 +427,7 @@ Key traits of `StreamSourceBase`:
425427
- `onStart()` opens connections and creates topics or parser bindings.
426428
- `onPoll()` flushes buffered data into the host — drain what your plugin
427429
has accumulated and return immediately. Must not block. Host methods
428-
(`pushRawMessage`, `appendRecord`, etc.) may only be called from this
430+
(`pushMessage`, `appendRecord`, etc.) may only be called from this
429431
callback.
430432
- `onStop()` tears down connections. Must be idempotent.
431433
- `stop()`, `start()`, `poll()`, and `currentState()` are managed by the
@@ -465,7 +467,7 @@ Access via `runtimeHost()`. Use this for lifecycle coordination and diagnostics.
465467
| `requestStop(terminal_state, reason)` | Ask the host to stop you (self-terminate). |
466468
| `isStopRequested()` | Check if the host wants you to stop. |
467469
| `ensureParserBinding(request)` | Bind a parser for delegated ingest (see below). |
468-
| `pushRawMessage(handle, timestamp, payload)` | Push raw bytes through a parser binding. |
470+
| `pushMessage(handle, timestamp, fetch_message_data)` | Push a message through a parser binding via a deferred fetcher callable; the host invokes it per the active ObjectIngestPolicy (eager/lazy). |
469471

470472
## Optional Features
471473

@@ -543,8 +545,9 @@ auto binding = runtimeHost().ensureParserBinding({
543545
});
544546
if (!binding) { return PJ::unexpected(binding.error()); }
545547
546-
// 2. Push raw payloads — the host parses and stores them
547-
auto status = runtimeHost().pushRawMessage(*binding, timestamp_ns, payload);
548+
// 2. Push payloads — the host invokes the fetcher and parses/stores per policy
549+
auto status = runtimeHost().pushMessage(
550+
*binding, timestamp_ns, [bytes = payload]() -> std::vector<uint8_t> { return bytes; });
548551
```
549552

550553
The host manages parser instances, caches bindings, and handles schema

0 commit comments

Comments
 (0)