Skip to content

Commit 353bd4e

Browse files
pabloinigoblascofacontidavide
authored andcommitted
chore: PayloadView constructors + docs + extra tests + bump 0.5.0
Adopt the additions Davide made on top of the consolidated refactor upstream (PR #105): - PayloadView gains explicit default + Span/anchor + shared_ptr<vector> constructors in buffer_anchor.hpp. - New section in OBJECT_STORE_DESIGN.md covering the PayloadView contract and the eager/lazy dispatch in resolveEntry. - Additional ObjectStore tests (+56 lines) covering edge cases not in the original PR. - service_registry_builder.hpp tweak. - conanfile.py + CMakeLists.txt bumped to 0.5.0 (matches the release tag cut upstream).
1 parent 67760dc commit 353bd4e

10 files changed

Lines changed: 143 additions & 56 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ endif()
196196
if(PJ_INSTALL_SDK)
197197
include(CMakePackageConfigHelpers)
198198

199-
set(PJ_PACKAGE_VERSION "0.4.0")
199+
set(PJ_PACKAGE_VERSION "0.5.0")
200200
set(PJ_PACKAGE_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/plotjuggler_core)
201201

202202
install(EXPORT plotjuggler_coreTargets

conanfile.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
plugin_sdk — umbrella for plugin authors (base + dialog SDK + parser SDK)
88
plugin_host — umbrella for host loaders (data_source/parser/toolbox/dialog)
99
10-
A consuming Conan recipe declares e.g. `plotjuggler_core/0.4.0` and then:
10+
A consuming Conan recipe declares e.g. `plotjuggler_core/0.5.0` and then:
1111
1212
find_package(plotjuggler_core REQUIRED COMPONENTS plugin_sdk)
1313
target_link_libraries(my_plugin PRIVATE plotjuggler_core::plugin_sdk)
@@ -27,7 +27,7 @@
2727

2828
class PlotjugglerCoreConan(ConanFile):
2929
name = "plotjuggler_core"
30-
version = "0.4.0"
30+
version = "0.5.0"
3131
# Apache-2.0 covers pj_base + pj_plugins (the plugin-facing SDK);
3232
# MPL-2.0 covers pj_datastore (the storage engine). See LICENSE.
3333
license = "Apache-2.0 AND MPL-2.0"

pj_base/include/pj_base/buffer_anchor.hpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,20 @@ using BufferAnchor = std::shared_ptr<const void>;
4141
struct PayloadView {
4242
Span<const uint8_t> bytes;
4343
BufferAnchor anchor;
44+
45+
PayloadView() = default;
46+
47+
PayloadView(Span<const uint8_t> bytes_, BufferAnchor anchor_) : bytes(bytes_), anchor(std::move(anchor_)) {}
48+
49+
PayloadView(std::shared_ptr<const std::vector<uint8_t>> buffer)
50+
: bytes(buffer ? Span<const uint8_t>(buffer->data(), buffer->size()) : Span<const uint8_t>()),
51+
anchor(std::move(buffer)) {}
4452
};
4553

46-
/// Convenience helper for producers whose only payload is a plain
47-
/// std::vector<uint8_t>. Wraps it in a shared_ptr (which becomes both the
48-
/// owner of the bytes and the type-erased anchor), and returns a PayloadView
49-
/// whose Span points at that vector's contents. Satisfies the contract
50-
/// expected by ObjectStore::resolveEntry on the lazy branch: the anchor is a
51-
/// shared_ptr<const std::vector<uint8_t>> recoverable via static_pointer_cast.
54+
/// Wrap a fresh vector as both anchor and Span. Use when the producer has
55+
/// no natural anchor (e.g. raw bytes from a C-ABI fetch); when an upstream
56+
/// allocation already exists, construct PayloadView directly to avoid this
57+
/// helper's copy.
5258
inline PayloadView makePayloadView(std::vector<uint8_t> bytes) {
5359
auto shared = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
5460
return PayloadView{

pj_datastore/docs/OBJECT_STORE_DESIGN.md

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@ struct ObjectTopicDescriptor {
3030
std::string metadata_json;
3131
};
3232

33+
// Eager payload: store-owned bytes, counted against the retention budget.
34+
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;
35+
// Lazy payload: idempotent fetcher returning a view + ownership anchor.
36+
using LazyCallback = std::function<sdk::PayloadView()>;
37+
3338
struct ObjectEntry {
3439
Timestamp timestamp;
35-
std::variant<
36-
std::shared_ptr<const std::vector<uint8_t>>,
37-
std::function<std::vector<uint8_t>()>> payload;
40+
std::variant<SharedBuffer, LazyCallback> payload;
3841
};
3942

4043
struct RetentionBudget {
@@ -55,9 +58,13 @@ order. Equal timestamps are allowed. Out-of-order writes fail.
5558
shared buffer owned by the store. Owned entries contribute to `memoryUsage()`.
5659
5760
`pushLazy(id, timestamp, fetch)` stores a callable instead of bytes. The callable
58-
is invoked on each read and returns a fresh byte vector. Lazy entries do not
59-
contribute to `memoryUsage()` because the store does not retain the bytes between
60-
reads.
61+
is invoked on each read and returns a `sdk::PayloadView` — a `Span<const uint8_t>`
62+
paired with a type-erased `BufferAnchor` that keeps those bytes alive for as long
63+
as the resolved view is held. The producer anchors on whatever already owns the
64+
bytes (a decompressed chunk, an mmap, or a fresh allocation via
65+
`sdk::makePayloadView`), so the store never copies on resolve. Lazy entries do not
66+
contribute to `memoryUsage()` because the store retains the callable, not the
67+
fetched bytes.
6168
6269
Both write paths apply the topic retention budget after the new entry is
6370
inserted.
@@ -80,12 +87,14 @@ Resolved entries contain:
8087
```cpp
8188
struct ResolvedObjectEntry {
8289
Timestamp timestamp;
83-
std::shared_ptr<const std::vector<uint8_t>> data;
90+
sdk::PayloadView payload; // { Span<const uint8_t> bytes; BufferAnchor anchor; }
8491
};
8592
```
8693

87-
The shared pointer keeps resolved bytes alive independently of later store
88-
mutation.
94+
`payload.bytes` is the resolved view; `payload.anchor` keeps those bytes alive
95+
independently of later store mutation (eviction, removal, or `clear()`). For an
96+
owned entry the anchor is the store's `SharedBuffer`; for a lazy entry it is
97+
whatever the fetcher anchored on. An empty `anchor` means "no bytes".
8998

9099
## Retention
91100

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Copyright 2026 Davide Faconti
33
// SPDX-License-Identifier: MPL-2.0
44

5-
#include <any>
65
#include <cstddef>
76
#include <cstdint>
87
#include <deque>
@@ -14,10 +13,12 @@
1413
#include <string>
1514
#include <string_view>
1615
#include <utility>
16+
#include <variant>
1717
#include <vector>
1818

1919
#include "pj_base/buffer_anchor.hpp"
2020
#include "pj_base/expected.hpp"
21+
#include "pj_base/span.hpp"
2122
#include "pj_base/types.hpp"
2223

2324
namespace PJ {
@@ -39,12 +40,20 @@ struct ObjectTopicDescriptor {
3940
std::string metadata_json;
4041
};
4142

43+
/// Eager payload: store-owned bytes, counted against the retention budget.
44+
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;
45+
46+
/// Lazy payload: idempotent, thread-safe fetcher returning bytes + anchor.
47+
/// Invoked on every read; bytes are not counted against the retention budget.
48+
using LazyCallback = std::function<sdk::PayloadView()>;
49+
4250
struct ObjectEntry {
4351
Timestamp timestamp = 0;
44-
// Holds either a shared_ptr<const std::vector<uint8_t>> (eager owned payload)
45-
// or a std::function<sdk::PayloadView()> (lazy resolver). resolveEntry
46-
// discriminates via std::any_cast.
47-
std::any payload;
52+
// Holds either a SharedBuffer (eager owned payload, counted against the
53+
// retention budget) or a LazyCallback (lazy resolver). resolveEntry
54+
// discriminates via std::get_if; the variant is exhaustive over the two
55+
// (and only two) payload kinds.
56+
std::variant<SharedBuffer, LazyCallback> payload;
4857
};
4958

5059
struct ResolvedObjectEntry {
@@ -119,16 +128,14 @@ class ObjectStore {
119128

120129
// --- Write ---
121130

122-
Expected<void, std::string> pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
131+
Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
123132

124-
// The fetch callable is invoked on every read. It returns a PayloadView
125-
// (Span + anchor). When the producer already holds the bytes in memory
126-
// behind a shared_ptr (e.g. a streaming buffer being handed off between
127-
// stores), the closure can capture that shared_ptr and return a view
128-
// backed by it — no copy. For producers that materialize bytes from disk
129-
// or other sources, the closure allocates a fresh buffer and uses it as
130-
// the anchor.
131-
Expected<void, std::string> pushLazy(ObjectTopicId id, Timestamp timestamp, std::function<sdk::PayloadView()> fetch);
133+
// Fetcher runs on every read. Producers anchor on whatever owns the bytes
134+
// (chunk cache, mmap, fresh allocation); the store never copies — it just
135+
// retains the anchor through PayloadView. When the producer already holds
136+
// the bytes behind a shared_ptr (e.g. a streaming buffer handed off between
137+
// stores), the closure captures it and returns a view backed by it.
138+
Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);
132139

133140
// --- Read ---
134141

@@ -167,10 +174,10 @@ class ObjectStore {
167174
// neither store is mutated.
168175
//
169176
// Zero-copy on the payload bytes. Each ObjectEntry is moved into the
170-
// destination's series by value; the std::variant inside (or, post-#184,
171-
// the std::any) holds either a shared_ptr or a std::function, and moving
172-
// it is a pointer/buffer move — bytes captured by the closure or owned by
173-
// the shared_ptr are never copied or materialized during the flush. Lazy
177+
// destination's series by value; the std::variant inside holds either a
178+
// shared_ptr or a std::function, and moving it is a pointer/buffer move —
179+
// bytes captured by the closure or owned by the shared_ptr are never
180+
// copied or materialized during the flush. Lazy
174181
// entries preserve their semantics in the destination: their closure is
175182
// re-invoked only when the destination is read.
176183
//

pj_datastore/src/object_store.cpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ std::vector<ObjectTopicId> ObjectStore::listTopics(DatasetId dataset_id) const {
6767

6868
// --- Write ---
6969

70-
Expected<void, std::string> ObjectStore::pushOwned(
71-
ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload) {
70+
Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload) {
7271
std::shared_lock store_lock(store_mutex_);
7372
auto* series = findSeries(id);
7473
if (series == nullptr) {
@@ -95,8 +94,7 @@ Expected<void, std::string> ObjectStore::pushOwned(
9594
return {};
9695
}
9796

98-
Expected<void, std::string> ObjectStore::pushLazy(
99-
ObjectTopicId id, Timestamp timestamp, std::function<sdk::PayloadView()> fetch) {
97+
Status ObjectStore::pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch) {
10098
std::shared_lock store_lock(store_mutex_);
10199
auto* series = findSeries(id);
102100
if (series == nullptr) {
@@ -375,14 +373,17 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
375373
ResolvedObjectEntry resolved;
376374
resolved.timestamp = entry.timestamp;
377375

378-
if (const auto* owned = std::any_cast<std::shared_ptr<const std::vector<uint8_t>>>(&entry.payload)) {
376+
if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
379377
// Owned branch: build a PayloadView whose Span covers the vector and
380-
// whose anchor IS the same shared_ptr — refcount bump, no copy.
381-
resolved.payload = sdk::PayloadView{
382-
Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
383-
sdk::BufferAnchor{*owned},
384-
};
385-
} else if (const auto* lazy = std::any_cast<std::function<sdk::PayloadView()>>(&entry.payload)) {
378+
// whose anchor IS the same shared_ptr — refcount bump, no copy. A
379+
// default-constructed entry holds a null SharedBuffer, so guard it.
380+
if (*owned) {
381+
resolved.payload = sdk::PayloadView{
382+
Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
383+
sdk::BufferAnchor{*owned},
384+
};
385+
}
386+
} else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
386387
// Lazy branch: forward whatever the closure returns. The anchor stays
387388
// opaque (shared_ptr<const void>); consumers retain it without needing
388389
// to know the concrete type. No static_pointer_cast here — that was the
@@ -401,7 +402,7 @@ void ObjectStore::evictFront(ObjectSeries& series) {
401402
}
402403

403404
const auto& front = series.entries.front();
404-
if (const auto* owned = std::any_cast<std::shared_ptr<const std::vector<uint8_t>>>(&front.payload)) {
405+
if (const auto* owned = std::get_if<SharedBuffer>(&front.payload); owned != nullptr && *owned) {
405406
series.memory_bytes -= (*owned)->size();
406407
}
407408

pj_datastore/tests/object_store_test.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
#include <gtest/gtest.h>
77

8+
#include <array>
89
#include <cstdint>
910
#include <functional>
11+
#include <memory>
1012
#include <string>
1113
#include <thread>
1214
#include <vector>
@@ -294,6 +296,60 @@ TEST(ObjectStoreTest, PushLazyResolves) {
294296
EXPECT_EQ(r->payload.bytes[0], 0xDE);
295297
}
296298

299+
// Regression: anchor type-erasure must survive resolveEntry. The anchor here
300+
// is a shared_ptr<TestBuffer> (not vector); a prior static_pointer_cast to
301+
// vector would UB.
302+
TEST(ObjectStoreTest, PushLazyPreservesAnchorType) {
303+
struct TestBuffer {
304+
std::array<uint8_t, 4> bytes{0x11, 0x22, 0x33, 0x44};
305+
};
306+
ObjectStore store;
307+
auto id = registerTestTopic(store);
308+
309+
auto buffer = std::make_shared<TestBuffer>();
310+
std::weak_ptr<TestBuffer> weak_buffer = buffer;
311+
312+
store.pushLazy(id, 100, [buffer]() -> sdk::PayloadView {
313+
return sdk::PayloadView{
314+
Span<const uint8_t>{buffer->bytes.data(), buffer->bytes.size()},
315+
sdk::BufferAnchor{buffer},
316+
};
317+
});
318+
319+
auto r = store.latestAt(id, 100);
320+
ASSERT_TRUE(r.has_value());
321+
ASSERT_EQ(r->payload.bytes.size(), 4u);
322+
EXPECT_EQ(r->payload.bytes[0], 0x11);
323+
EXPECT_EQ(r->payload.bytes[3], 0x44);
324+
EXPECT_FALSE(weak_buffer.expired()); // anchor still holds the buffer alive
325+
}
326+
327+
// Regression: the producer's Span is a sub-range of the anchor's storage.
328+
// resolveEntry must propagate it verbatim — not the anchor's full extent.
329+
TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) {
330+
ObjectStore store;
331+
auto id = registerTestTopic(store);
332+
333+
auto chunk = std::make_shared<std::vector<uint8_t>>(100);
334+
for (size_t i = 0; i < chunk->size(); ++i) {
335+
(*chunk)[i] = static_cast<uint8_t>(i);
336+
}
337+
338+
store.pushLazy(id, 100, [chunk]() -> sdk::PayloadView {
339+
return sdk::PayloadView{
340+
Span<const uint8_t>{chunk->data() + 20, 10}, // bytes [20, 30)
341+
sdk::BufferAnchor{chunk},
342+
};
343+
});
344+
345+
auto r = store.latestAt(id, 100);
346+
ASSERT_TRUE(r.has_value());
347+
EXPECT_EQ(r->payload.bytes.size(), 10u);
348+
EXPECT_EQ(r->payload.bytes.data(), chunk->data() + 20);
349+
EXPECT_EQ(r->payload.bytes[0], 20);
350+
EXPECT_EQ(r->payload.bytes[9], 29);
351+
}
352+
297353
// =========================================================================
298354
// Timestamp monotonicity
299355
// =========================================================================

pj_datastore/tests/plugin_data_host_object_test.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <gtest/gtest.h>
55

6+
#include <algorithm>
67
#include <atomic>
78
#include <cstdint>
89
#include <memory>
@@ -65,7 +66,8 @@ TEST(PluginDataHostObjectTest, PushOwnedStoresBytes) {
6566
ASSERT_TRUE(resolved.has_value());
6667
ASSERT_NE(resolved->payload.anchor, nullptr);
6768
EXPECT_EQ(resolved->payload.bytes.size(), payload.size());
68-
EXPECT_EQ(std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()), payload);
69+
EXPECT_TRUE(
70+
std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), payload.begin(), payload.end()));
6971
}
7072

7173
TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
@@ -94,7 +96,9 @@ TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
9496
auto first = f.store.latestAt(ObjectTopicId{topic.id}, 42);
9597
ASSERT_TRUE(first.has_value());
9698
ASSERT_NE(first->payload.anchor, nullptr);
97-
EXPECT_EQ(std::vector<uint8_t>(first->payload.bytes.begin(), first->payload.bytes.end()), shared->payload);
99+
EXPECT_TRUE(
100+
std::equal(
101+
first->payload.bytes.begin(), first->payload.bytes.end(), shared->payload.begin(), shared->payload.end()));
98102
EXPECT_GE(shared->fetch_calls.load(), 1);
99103

100104
auto second = f.store.latestAt(ObjectTopicId{topic.id}, 42);
@@ -149,7 +153,9 @@ TEST(PluginDataHostObjectTest, PushLazyDestroyCallbackRunsExactlyOnceOnEviction)
149153
// Fetch once — the callback runs but the ctx stays alive.
150154
auto resolved = f.store.latestAt(ObjectTopicId{topic.id}, 100);
151155
ASSERT_TRUE(resolved.has_value());
152-
EXPECT_EQ(std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()), ctx->payload);
156+
EXPECT_TRUE(
157+
std::equal(
158+
resolved->payload.bytes.begin(), resolved->payload.bytes.end(), ctx->payload.begin(), ctx->payload.end()));
153159
EXPECT_EQ(ctx->destroy_count.load(), 0);
154160

155161
// Evict — destroy_fn runs exactly once.

pj_datastore/tests/plugin_parser_object_write_test.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <gtest/gtest.h>
1111

12+
#include <algorithm>
1213
#include <cstdint>
1314
#include <memory>
1415
#include <string>
@@ -142,7 +143,8 @@ TEST(ParserObjectWriteHostTest, ParserWritesToBothHostsFromOneParse) {
142143
ASSERT_TRUE(resolved.has_value());
143144
ASSERT_NE(resolved->payload.anchor, nullptr);
144145
const std::vector<uint8_t> expected{0xAA, 0xBB, 0xCC};
145-
EXPECT_EQ(std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()), expected);
146+
EXPECT_TRUE(
147+
std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end()));
146148

147149
// (Scalar side requires flushing + a read path; Phase-3 scope is proving
148150
// both hosts were resolved and invoked. Scalar writes go into DataEngine
@@ -202,9 +204,9 @@ TEST(ParserObjectWriteHostTest, ObjectHostViewPushLazyThroughSdk) {
202204

203205
auto resolved = store.latestAt(ObjectTopicId{topic.id}, 10);
204206
ASSERT_TRUE(resolved.has_value());
205-
EXPECT_EQ(
206-
std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()),
207-
(std::vector<uint8_t>{0xAA, 0xBB}));
207+
const std::vector<uint8_t> expected{0xAA, 0xBB};
208+
EXPECT_TRUE(
209+
std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end()));
208210
EXPECT_GE(fetch_calls, 1);
209211
}
210212

pj_plugins/include/pj_plugins/host/service_registry_builder.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class ServiceRegistryBuilder {
4343
/// not take ownership of ctx/vtable.
4444
/// @return `Expected<void>`: ok on success, error string on duplicate or
4545
/// null fat-pointer field.
46-
[[nodiscard]] ::PJ::Expected<void, std::string> tryRegisterService(
46+
[[nodiscard]] ::PJ::Status tryRegisterService(
4747
std::string_view name, uint32_t protocol_version, PJ_service_t service) {
4848
const std::string service_name(name);
4949
if (service.ctx == nullptr || service.vtable == nullptr) {

0 commit comments

Comments
 (0)