Skip to content

Commit e0c3cad

Browse files
Merge branch 'revert-b77d16aa' into 'internal_main'
Revert "Merge branch 'refactor/push-message-slot-rename' into 'internal_main'" See merge request client-projets/p.2026-plotjuggler/plotjuggler_core!193
2 parents b77d16a + 2b64709 commit e0c3cad

13 files changed

Lines changed: 107 additions & 62 deletions

pj_base/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ if(PJ_BUILD_TESTS)
8888
tests/image_annotations_codec_test.cpp
8989
tests/image_annotations_decoder_test.cpp
9090
tests/media_metadata_test.cpp
91-
tests/push_message_test.cpp
91+
tests/push_message_v2_test.cpp
9292
tests/image_codec_test.cpp
9393
tests/depth_image_codec_test.cpp
9494
tests/point_cloud_codec_test.cpp

pj_base/include/pj_base/data_source_protocol.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ enum {
133133
PJ_DATA_SOURCE_CAPABILITY_HAS_DIALOG = 1ull << 5, /**< Plugin provides a configuration dialog. */
134134
};
135135

136-
/** Opaque handle returned by ensure_parser_binding, used with push_message. */
136+
/** Opaque handle returned by ensure_parser_binding, used with push_raw_message. */
137137
typedef struct {
138138
uint32_t id;
139139
} PJ_parser_binding_handle_t;
@@ -260,6 +260,22 @@ typedef struct PJ_data_source_runtime_host_vtable_t {
260260
void* ctx, const PJ_parser_binding_request_t* request, PJ_parser_binding_handle_t* out_handle,
261261
PJ_error_t* out_error) PJ_NOEXCEPT;
262262

263+
/**
264+
* [stream-thread] Push a raw message payload for host-side parsing.
265+
* @p handle must have been obtained from ensure_parser_binding.
266+
* @p host_timestamp_ns is nanoseconds since the Unix epoch
267+
* (1970-01-01T00:00:00Z). Returns false + error on failure.
268+
*
269+
* Eager-only push: the host parses immediately and the bytes are not
270+
* retained for later replay. Plugins that need lazy materialization or
271+
* ObjectIngestPolicy dispatch should use push_message_v2 instead. This
272+
* slot remains for sources that fan-out raw bytes without an associated
273+
* FetchMessageData callable (streaming or eager-only consumers).
274+
*/
275+
bool (*push_raw_message)(
276+
void* ctx, PJ_parser_binding_handle_t handle, int64_t host_timestamp_ns, PJ_bytes_view_t payload,
277+
PJ_error_t* out_error) PJ_NOEXCEPT;
278+
263279
/**
264280
* [main-thread] Display a modal message box to the user and wait for
265281
* their response. BLOCKS until the user closes the dialog. The host is
@@ -324,7 +340,7 @@ typedef struct PJ_data_source_runtime_host_vtable_t {
324340
* ObjectStore push failed, etc.). On failure the host still calls
325341
* `release` so the plugin's ctx leaks no resources.
326342
*/
327-
bool (*push_message)(
343+
bool (*push_message_v2)(
328344
void* ctx, PJ_parser_binding_handle_t handle, int64_t host_timestamp_ns,
329345
PJ_message_data_fetcher_t fetch_message_data, PJ_error_t* out_error) PJ_NOEXCEPT;
330346
} PJ_data_source_runtime_host_vtable_t;

pj_base/include/pj_base/sdk/data_source_host_views.hpp

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,19 @@ class DataSourceRuntimeHostView {
209209
return handle;
210210
}
211211

212+
/// Push a raw message for host-side parsing via a previously obtained binding handle.
213+
[[nodiscard]] Status pushRawMessage(
214+
ParserBindingHandle handle, Timestamp host_timestamp_ns, Span<const uint8_t> payload) const {
215+
if (!valid() || host_.vtable->push_raw_message == nullptr) {
216+
return unexpected("runtime host is not bound");
217+
}
218+
PJ_error_t err{};
219+
if (!host_.vtable->push_raw_message(host_.ctx, handle, host_timestamp_ns, sdk::toAbiBytes(payload), &err)) {
220+
return unexpected(errorToString(err));
221+
}
222+
return okStatus();
223+
}
224+
212225
/// Push a message via a deferred FetchMessageData callable. The DataSource
213226
/// hands the host a callable that produces the payload bytes when invoked.
214227
/// The host applies the active ObjectIngestPolicy (resolved via the
@@ -232,11 +245,12 @@ class DataSourceRuntimeHostView {
232245
/// heap-relocated and used as its own anchor; bytes survive across
233246
/// the C ABI boundary at the cost of one alloc-and-move.
234247
///
235-
/// The host MUST advertise the push_message tail slot. We wrap the
248+
/// The host MUST advertise the push_message_v2 tail slot. We wrap the
236249
/// closure into a PJ_message_data_fetcher_t and hand it over verbatim; the
237250
/// host applies ObjectIngestPolicy and decides when (and whether) to
238251
/// invoke it. There is no legacy fallback: a host that doesn't expose
239-
/// the slot returns an explicit error here.
252+
/// the slot returns an explicit error here rather than silently
253+
/// degrading to a kEager push_raw_message.
240254
template <typename FetchMessageData>
241255
[[nodiscard]] Status pushMessage(
242256
ParserBindingHandle handle, Timestamp host_timestamp_ns, FetchMessageData&& fetch_message_data) const {
@@ -250,8 +264,8 @@ class DataSourceRuntimeHostView {
250264
if (!valid()) {
251265
return unexpected(std::string("runtime host is not bound"));
252266
}
253-
if (!PJ_HAS_TAIL_SLOT(PJ_data_source_runtime_host_vtable_t, host_.vtable, push_message)) {
254-
return unexpected(std::string("runtime host does not expose push_message"));
267+
if (!PJ_HAS_TAIL_SLOT(PJ_data_source_runtime_host_vtable_t, host_.vtable, push_message_v2)) {
268+
return unexpected(std::string("runtime host does not expose push_message_v2"));
255269
}
256270

257271
auto* ctx = new FetchMessageDataT(std::forward<FetchMessageData>(fetch_message_data));
@@ -294,7 +308,7 @@ class DataSourceRuntimeHostView {
294308
};
295309

296310
PJ_error_t err{};
297-
if (!host_.vtable->push_message(host_.ctx, handle, host_timestamp_ns, abi_fetch_message_data, &err)) {
311+
if (!host_.vtable->push_message_v2(host_.ctx, handle, host_timestamp_ns, abi_fetch_message_data, &err)) {
298312
return unexpected(errorToString(err));
299313
}
300314
return okStatus();

pj_base/include/pj_base/sdk/data_source_patterns.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class StreamSourceBase : public DataSourcePluginBase {
136136
/// MUST NOT BLOCK — drain buffered data and return immediately.
137137
/// Do not call recv(), read(), or any syscall that may wait.
138138
/// If your source has a receive thread, swap-drain a buffer here.
139-
/// Host methods (appendRecord, pushMessage) may only be called from this method.
139+
/// Host methods (appendRecord, pushRawMessage) may only be called from this method.
140140
virtual Status onPoll() = 0;
141141

142142
/// Called from stop(). Close connections, free resources.

pj_base/tests/abi_layout_sentinels_test.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,14 @@ static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, protocol_version) =
147147
static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, struct_size) == 4, "v1 prefix pinned");
148148
static_assert(offsetof(PJ_data_source_runtime_host_vtable_t, report_message) == 8, "v1 first slot pinned");
149149
static_assert(
150-
offsetof(PJ_data_source_runtime_host_vtable_t, list_available_encodings) == 80,
151-
"list_available_encodings slot pinned");
150+
offsetof(PJ_data_source_runtime_host_vtable_t, push_raw_message) == 72, "v1 push_raw_message slot pinned");
152151
static_assert(
153-
offsetof(PJ_data_source_runtime_host_vtable_t, push_message) == 88, "push_message tail slot pinned");
152+
offsetof(PJ_data_source_runtime_host_vtable_t, list_available_encodings) == 88,
153+
"v1 list_available_encodings slot pinned");
154154
static_assert(
155-
sizeof(PJ_data_source_runtime_host_vtable_t) == 96, "Runtime host vtable size (update deliberately on append)");
155+
offsetof(PJ_data_source_runtime_host_vtable_t, push_message_v2) == 96, "v1 push_message_v2 tail slot pinned");
156+
static_assert(
157+
sizeof(PJ_data_source_runtime_host_vtable_t) == 104, "Runtime host vtable size (update deliberately on append)");
156158

157159
// --- Write-host vtables (ABI-APPENDABLE within v4) --------------------------
158160
static_assert(offsetof(PJ_source_write_host_vtable_t, abi_version) == 0, "source write host prefix pinned");
Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
// Tests for the SDK template `DataSourceRuntimeHostView::pushMessage` and
5-
// its delegation to the C ABI slot `push_message`. We exercise:
5+
// its delegation to the C ABI slot `push_message_v2`. We exercise:
66
//
77
// 1. Vector closure → the captured FetchMessageData callable yields the
88
// same bytes when invoked from the host.
@@ -11,7 +11,7 @@
1111
// 3. Multiple invocations are idempotent (same bytes each time).
1212
// 4. The heap-held closure context is destroyed exactly once when the
1313
// host calls `release`.
14-
// 5. When the host does not expose push_message (struct_size short
14+
// 5. When the host does not expose push_message_v2 (struct_size short
1515
// or field NULL), pushMessage returns an explicit error rather than
1616
// degrading silently.
1717

@@ -28,30 +28,32 @@
2828

2929
namespace {
3030

31-
// Captured state from a push_message invocation.
31+
// Captured state from a push_message_v2 invocation.
3232
struct CapturedPush {
3333
PJ_parser_binding_handle_t handle{};
3434
int64_t timestamp_ns = 0;
3535
PJ_message_data_fetcher_t fetch_message_data{};
3636
bool received = false;
3737
};
3838

39-
// Mock runtime host — exposes a vtable that captures push_message calls.
39+
// Mock runtime host — exposes a vtable that captures push_message_v2 calls
40+
// and, alternatively, push_raw_message calls (for the legacy fallback).
4041
class MockHost {
4142
public:
4243
MockHost() {
4344
vtable_.protocol_version = PJ_DATA_SOURCE_PROTOCOL_VERSION;
4445
vtable_.struct_size = sizeof(PJ_data_source_runtime_host_vtable_t);
45-
vtable_.push_message = &MockHost::pushMessageThunk;
46+
vtable_.push_raw_message = &MockHost::pushRawMessageThunk;
47+
vtable_.push_message_v2 = &MockHost::pushMessageV2Thunk;
4648
host_.ctx = this;
4749
host_.vtable = &vtable_;
4850
}
4951

5052
// Drop the v2 slot — both clearing the field and shrinking struct_size,
5153
// matching the runtime scenario where the host predates the addition.
52-
void disablePushMessage() {
53-
vtable_.push_message = nullptr;
54-
vtable_.struct_size = offsetof(PJ_data_source_runtime_host_vtable_t, push_message);
54+
void disablePushMessageV2() {
55+
vtable_.push_message_v2 = nullptr;
56+
vtable_.struct_size = offsetof(PJ_data_source_runtime_host_vtable_t, push_message_v2);
5557
}
5658

5759
PJ::DataSourceRuntimeHostView view() const {
@@ -61,9 +63,20 @@ class MockHost {
6163
CapturedPush& captured() {
6264
return captured_;
6365
}
66+
std::vector<uint8_t>& receivedRawBytes() {
67+
return raw_bytes_;
68+
}
6469

6570
private:
66-
static bool pushMessageThunk(
71+
static bool pushRawMessageThunk(
72+
void* ctx, PJ_parser_binding_handle_t /*handle*/, int64_t /*ts*/, PJ_bytes_view_t payload,
73+
PJ_error_t* /*err*/) noexcept {
74+
auto* self = static_cast<MockHost*>(ctx);
75+
self->raw_bytes_.assign(payload.data, payload.data + payload.size);
76+
return true;
77+
}
78+
79+
static bool pushMessageV2Thunk(
6780
void* ctx, PJ_parser_binding_handle_t handle, int64_t ts, PJ_message_data_fetcher_t fetch_message_data,
6881
PJ_error_t* /*err*/) noexcept {
6982
auto* self = static_cast<MockHost*>(ctx);
@@ -77,6 +90,7 @@ class MockHost {
7790
PJ_data_source_runtime_host_vtable_t vtable_{};
7891
PJ_data_source_runtime_host_t host_{};
7992
CapturedPush captured_;
93+
std::vector<uint8_t> raw_bytes_;
8094
};
8195

8296
// Helper: invoke a captured FetchMessageData callable and assert the produced bytes match
@@ -94,9 +108,9 @@ void invokeFetchMessageDataAndExpect(
94108
}
95109
}
96110

97-
// ---------- Tests against the new push_message path ----------
111+
// ---------- Tests against the new push_message_v2 path ----------
98112

99-
TEST(PushMessageTest, VectorClosureFlowsThroughSlot) {
113+
TEST(PushMessageV2Test, VectorClosureFlowsThroughSlot) {
100114
MockHost host;
101115
std::vector<uint8_t> expected{1, 2, 3, 4, 5};
102116

@@ -110,7 +124,7 @@ TEST(PushMessageTest, VectorClosureFlowsThroughSlot) {
110124
host.captured().fetch_message_data.release(host.captured().fetch_message_data.ctx);
111125
}
112126

113-
TEST(PushMessageTest, PayloadViewClosureFlowsThroughSlot) {
127+
TEST(PushMessageV2Test, PayloadViewClosureFlowsThroughSlot) {
114128
MockHost host;
115129
std::vector<uint8_t> expected{10, 20, 30};
116130
auto owned = std::make_shared<std::vector<uint8_t>>(expected);
@@ -124,7 +138,7 @@ TEST(PushMessageTest, PayloadViewClosureFlowsThroughSlot) {
124138
host.captured().fetch_message_data.release(host.captured().fetch_message_data.ctx);
125139
}
126140

127-
TEST(PushMessageTest, FetchIsIdempotent) {
141+
TEST(PushMessageV2Test, FetchIsIdempotent) {
128142
MockHost host;
129143
std::vector<uint8_t> expected{0x42, 0x43};
130144

@@ -137,7 +151,7 @@ TEST(PushMessageTest, FetchIsIdempotent) {
137151
host.captured().fetch_message_data.release(host.captured().fetch_message_data.ctx);
138152
}
139153

140-
TEST(PushMessageTest, FetchMessageDataCtxReleasedAfterHostCalls) {
154+
TEST(PushMessageV2Test, FetchMessageDataCtxReleasedAfterHostCalls) {
141155
MockHost host;
142156
auto canary = std::make_shared<int>(42);
143157
std::weak_ptr<int> witness = canary;
@@ -155,7 +169,7 @@ TEST(PushMessageTest, FetchMessageDataCtxReleasedAfterHostCalls) {
155169
EXPECT_TRUE(witness.expired()) << "after release, the captured shared_ptr should have been the last reference";
156170
}
157171

158-
TEST(PushMessageTest, PayloadAnchorPropagates) {
172+
TEST(PushMessageV2Test, PayloadAnchorPropagates) {
159173
MockHost host;
160174
auto owned = std::make_shared<std::vector<uint8_t>>(std::vector<uint8_t>{0x99, 0x9A});
161175
std::weak_ptr<std::vector<uint8_t>> witness = owned;
@@ -190,16 +204,17 @@ TEST(PushMessageTest, PayloadAnchorPropagates) {
190204
EXPECT_TRUE(witness.expired());
191205
}
192206

193-
// ---------- Host without push_message returns explicit error ----------
207+
// ---------- Host without push_message_v2 returns explicit error ----------
194208

195-
TEST(PushMessageTest, ReturnsErrorWhenSlotMissing) {
209+
TEST(PushMessageV2Test, ReturnsErrorWhenSlotMissing) {
196210
MockHost host;
197-
host.disablePushMessage();
211+
host.disablePushMessageV2();
198212

199213
std::vector<uint8_t> expected{0xA, 0xB, 0xC};
200214
auto status = host.view().pushMessage(PJ::ParserBindingHandle{1}, 100, [bytes = expected]() { return bytes; });
201-
EXPECT_FALSE(status); // explicit failure — no silent fallback
215+
EXPECT_FALSE(status); // explicit failure — no silent fallback to push_raw_message
202216
EXPECT_FALSE(host.captured().received);
217+
EXPECT_TRUE(host.receivedRawBytes().empty());
203218
}
204219

205220
} // namespace

pj_datastore/docs/USER_GUIDE.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,8 @@ auto binding = runtimeHost().ensureParserBinding({
200200
.parser_config_json = config_json,
201201
});
202202

203-
// In onPoll(): push incoming messages (the host invokes the fetcher per policy)
204-
runtimeHost().pushMessage(
205-
*binding, timestamp_ns, [bytes = payload]() -> std::vector<uint8_t> { return bytes; });
203+
// In onPoll(): push incoming messages
204+
runtimeHost().pushRawMessage(*binding, timestamp_ns, payload_span);
206205
```
207206
208207
- `parser_encoding` is the wire format (e.g., `"cdr"`, `"json"`, `"protobuf"`), not the schema format
@@ -373,8 +372,7 @@ class MyStreamer : public PJ::StreamSourceBase {
373372
PJ::Status onPoll() override {
374373
// Drain buffered messages (must not block!)
375374
while (auto msg = dequeue()) {
376-
runtimeHost().pushMessage(
377-
binding_, msg->timestamp_ns, [bytes = msg->payload]() -> std::vector<uint8_t> { return bytes; });
375+
runtimeHost().pushRawMessage(binding_, msg->timestamp_ns, msg->payload);
378376
}
379377
return PJ::okStatus();
380378
}

pj_plugins/docs/ARCHITECTURE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ out_array, err)`:
519519

520520
## Builtin-object pipeline (PR #86)
521521

522-
The v4 DataSource runtime host adds a tail slot `push_message`
522+
The v4 DataSource runtime host adds a tail slot `push_message_v2`
523523
(offset 96 in `PJ_data_source_runtime_host_vtable_t`) that takes a
524524
deferred byte-fetch callable instead of bytes:
525525

@@ -530,7 +530,7 @@ typedef struct PJ_message_data_fetcher_t {
530530
void (*release)(void* ctx);
531531
} PJ_message_data_fetcher_t;
532532

533-
bool (*push_message)(
533+
bool (*push_message_v2)(
534534
void* ctx, PJ_parser_binding_handle_t handle, int64_t timestamp_ns,
535535
PJ_message_data_fetcher_t fetch_message_data,
536536
PJ_error_t* out_error) PJ_NOEXCEPT;

pj_plugins/docs/data-source-guide.md

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ class UdpReceiver : public PJ::StreamSourceBase {
313313

314314
// Bind a parser for the configured encoding. The host resolves the
315315
// parser library, creates a parser instance, binds the write host
316-
// and schema, and returns a handle for pushMessage().
316+
// and schema, and returns a handle for pushRawMessage().
317317
binding_ = runtimeHost().ensureParserBinding({
318318
.topic_name = "udp/data",
319319
.parser_encoding = encoding_,
@@ -345,11 +345,9 @@ class UdpReceiver : public PJ::StreamSourceBase {
345345
}
346346

347347
for (const auto& msg : batch) {
348-
// pushMessage takes a deferred fetcher; the host invokes it according to
349-
// the active ObjectIngestPolicy (eager for plain curves).
350-
auto status = runtimeHost().pushMessage(
348+
auto status = runtimeHost().pushRawMessage(
351349
*binding_, msg.timestamp,
352-
[bytes = msg.payload]() -> std::vector<uint8_t> { return bytes; });
350+
PJ::Span<const uint8_t>(msg.payload.data(), msg.payload.size()));
353351
if (!status) {
354352
return PJ::unexpected(status.error());
355353
}
@@ -417,7 +415,7 @@ changes, only a different config value.
417415
callback for flushing accumulated data. The host calls it periodically from
418416
its own thread. For sources with asynchronous I/O (sockets, hardware), the
419417
plugin must manage its own receive thread and use `onPoll()` as the sync
420-
point where buffered data is handed off. Host methods (`pushMessage`,
418+
point where buffered data is handed off. Host methods (`pushRawMessage`,
421419
`appendRecord`, etc.) must only be called from `onPoll()` / the host's
422420
thread, never from the background thread.
423421

@@ -427,7 +425,7 @@ Key traits of `StreamSourceBase`:
427425
- `onStart()` opens connections and creates topics or parser bindings.
428426
- `onPoll()` flushes buffered data into the host — drain what your plugin
429427
has accumulated and return immediately. Must not block. Host methods
430-
(`pushMessage`, `appendRecord`, etc.) may only be called from this
428+
(`pushRawMessage`, `appendRecord`, etc.) may only be called from this
431429
callback.
432430
- `onStop()` tears down connections. Must be idempotent.
433431
- `stop()`, `start()`, `poll()`, and `currentState()` are managed by the
@@ -467,7 +465,7 @@ Access via `runtimeHost()`. Use this for lifecycle coordination and diagnostics.
467465
| `requestStop(terminal_state, reason)` | Ask the host to stop you (self-terminate). |
468466
| `isStopRequested()` | Check if the host wants you to stop. |
469467
| `ensureParserBinding(request)` | Bind a parser for delegated ingest (see below). |
470-
| `pushMessage(handle, timestamp, fetch_message_data)` | Push a message through a parser binding via a deferred fetcher callable; the host invokes it per the active ObjectIngestPolicy (eager/lazy). |
468+
| `pushRawMessage(handle, timestamp, payload)` | Push raw bytes through a parser binding. |
471469

472470
## Optional Features
473471

@@ -545,9 +543,8 @@ auto binding = runtimeHost().ensureParserBinding({
545543
});
546544
if (!binding) { return PJ::unexpected(binding.error()); }
547545
548-
// 2. Push payloads — the host invokes the fetcher and parses/stores per policy
549-
auto status = runtimeHost().pushMessage(
550-
*binding, timestamp_ns, [bytes = payload]() -> std::vector<uint8_t> { return bytes; });
546+
// 2. Push raw payloads — the host parses and stores them
547+
auto status = runtimeHost().pushRawMessage(*binding, timestamp_ns, payload);
551548
```
552549

553550
The host manages parser instances, caches bindings, and handles schema

0 commit comments

Comments
 (0)