Skip to content

Commit 4f6d935

Browse files
facontidavideclaude
andcommitted
refactor(object_store): preserve BufferAnchor end-to-end; restore typed variant
Builds on the prior commit's PayloadView-returning pushLazy signature, but fixes two anchor-discarding bugs and restores a typed payload variant. Bugs in the prior implementation: 1. resolveEntry did static_pointer_cast<const vector<uint8_t>>(pv.anchor), forcing every producer's anchor to be exactly a shared_ptr<vector>. Any other anchor type (chunk cache, mmap, parquet column slice) was UB. This negated the entire point of BufferAnchor = shared_ptr<const void>. 2. resolveEntry discarded PayloadView::bytes (the Span) and returned the whole anchor's vector. Producers can no longer publish a sub-range of a larger backing buffer — which is the canonical zero-copy use case (one decompressed MCAP chunk anchored once, many message Spans into it). Both bugs traced to a single root cause: ResolvedObjectEntry::data was still shared_ptr<const vector<uint8_t>>, so resolution had to materialize a vector somehow. Fixed by making ResolvedObjectEntry carry {BufferAnchor anchor, Span<const uint8_t> view} — type-erased anchor + producer-published span. No static_pointer_cast anywhere. Variant + named aliases: ObjectEntry::payload returns to std::variant; the prior std::any traded compile-time exhaustiveness for nothing (still two alternatives, still typed access via the cast). Two named aliases capture the two payload shapes at the type level: using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>; using LazyCallback = std::function<sdk::PayloadView()>; Trampolines (plugin_data_host.cpp): ObjectBytesBox now holds {BufferAnchor, Span}; toolboxObjectGetBytes returns view.data()/view.size() — no shared_ptr<vector> in the read path. Misc consistency: - pushOwned/pushLazy now return Status (alias for Expected<void, string>); the one other Expected<void, string> use at service_registry_builder.hpp also switched. - PayloadView(shared_ptr<vector>) constructor takes shared_ptr<const vector> to match SharedBuffer and makePayloadView. Tests: - All resolved->data accessors migrated to resolved->view / resolved->anchor across object_store_test, plugin_data_host_object_test, plugin_parser_object_write_test. - Two regression tests added: PushLazyPreservesAnchorType — anchor is shared_ptr<TestBuffer> (not vector). Bug-1 regression: ASAN would catch the prior static cast. PushLazyHonorsSpanSubview — anchor is a 100-byte vector with Span [20,30). Bug-2 regression: verifies view.data()==chunk+20. ./build.sh --debug && ./test.sh → 62/62 passing under ASAN. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3ad2968 commit 4f6d935

8 files changed

Lines changed: 130 additions & 53 deletions

File tree

pj_base/include/pj_base/buffer_anchor.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,19 @@ struct PayloadView {
4646

4747
PayloadView(Span<const uint8_t> bytes_, BufferAnchor anchor_) : bytes(bytes_), anchor(std::move(anchor_)) {}
4848

49-
PayloadView(std::shared_ptr<std::vector<uint8_t>> buffer)
49+
PayloadView(std::shared_ptr<const std::vector<uint8_t>> buffer)
5050
: bytes(buffer ? Span<const uint8_t>(buffer->data(), buffer->size()) : Span<const uint8_t>()),
5151
anchor(std::move(buffer)) {}
5252
};
5353

5454
/// Convenience helper for producers whose only payload is a plain
5555
/// std::vector<uint8_t>. Wraps it in a shared_ptr (which becomes both the
5656
/// owner of the bytes and the type-erased anchor), and returns a PayloadView
57-
/// whose Span points at that vector's contents. Satisfies the contract
58-
/// expected by ObjectStore::resolveEntry on the lazy branch: the anchor is a
59-
/// shared_ptr<const std::vector<uint8_t>> recoverable via static_pointer_cast.
57+
/// whose Span points at that vector's contents. Use this when the producer
58+
/// materializes bytes from a source that has no natural anchor (e.g. a
59+
/// C-ABI fetch returning a raw byte buffer); when an upstream allocation
60+
/// already exists (a chunk cache, an mmap region), construct the PayloadView
61+
/// directly with that anchor to avoid the helper's copy.
6062
inline PayloadView makePayloadView(std::vector<uint8_t> bytes) {
6163
auto shared = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
6264
return PayloadView{

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Copyright 2026 Davide Faconti
33
// SPDX-License-Identifier: MPL-2.0
44

5-
#include <any>
65
#include <cstddef>
76
#include <cstdint>
87
#include <deque>
@@ -14,10 +13,12 @@
1413
#include <string>
1514
#include <string_view>
1615
#include <utility>
16+
#include <variant>
1717
#include <vector>
1818

1919
#include "pj_base/buffer_anchor.hpp"
2020
#include "pj_base/expected.hpp"
21+
#include "pj_base/span.hpp"
2122
#include "pj_base/types.hpp"
2223

2324
namespace PJ {
@@ -39,17 +40,30 @@ struct ObjectTopicDescriptor {
3940
std::string metadata_json;
4041
};
4142

43+
/// Eager payload alternative: shared ownership of an owned byte buffer.
44+
/// memoryUsage() counts the buffer's size against the retention budget.
45+
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;
46+
47+
/// Lazy payload alternative: idempotent, thread-safe callable returning a
48+
/// PayloadView (Span + BufferAnchor). The anchor may be any shared_ptr-backed
49+
/// storage (chunk cache, mmap, etc.); type erasure is preserved end-to-end
50+
/// through the store. Lazy entries are not counted against the retention
51+
/// budget — bytes are owned upstream, not by the store.
52+
using LazyCallback = std::function<sdk::PayloadView()>;
53+
4254
struct ObjectEntry {
4355
Timestamp timestamp = 0;
44-
// Holds either a shared_ptr<const std::vector<uint8_t>> (eager owned payload)
45-
// or a std::function<sdk::PayloadView()> (lazy resolver). resolveEntry
46-
// discriminates via std::any_cast.
47-
std::any payload;
56+
std::variant<SharedBuffer, LazyCallback> payload;
4857
};
4958

59+
/// Result of resolving an ObjectEntry. Holds the type-erased BufferAnchor
60+
/// keeping the bytes alive and a Span over the producer-published range.
61+
/// For eager entries the anchor wraps the SharedBuffer's underlying vector;
62+
/// for lazy entries it is whatever the producer's PayloadView carries.
5063
struct ResolvedObjectEntry {
5164
Timestamp timestamp = 0;
52-
std::shared_ptr<const std::vector<uint8_t>> data;
65+
sdk::BufferAnchor anchor;
66+
Span<const uint8_t> view;
5367
};
5468

5569
struct RetentionBudget {
@@ -110,7 +124,7 @@ class ObjectStore {
110124

111125
// --- Write ---
112126

113-
Expected<void, std::string> pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
127+
Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
114128

115129
// The fetch callable is invoked on every read. It returns a PayloadView
116130
// (Span + anchor). When the producer already holds the bytes in memory
@@ -119,7 +133,7 @@ class ObjectStore {
119133
// backed by it — no copy. For producers that materialize bytes from disk
120134
// or other sources, the closure allocates a fresh buffer and uses it as
121135
// the anchor.
122-
Expected<void, std::string> pushLazy(ObjectTopicId id, Timestamp timestamp, std::function<sdk::PayloadView()> fetch);
136+
Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);
123137

124138
// --- Read ---
125139

pj_datastore/src/object_store.cpp

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ std::vector<ObjectTopicId> ObjectStore::listTopics(DatasetId dataset_id) const {
6767

6868
// --- Write ---
6969

70-
Expected<void, std::string> ObjectStore::pushOwned(
71-
ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload) {
70+
Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload) {
7271
std::shared_lock store_lock(store_mutex_);
7372
auto* series = findSeries(id);
7473
if (series == nullptr) {
@@ -94,8 +93,7 @@ Expected<void, std::string> ObjectStore::pushOwned(
9493
return {};
9594
}
9695

97-
Expected<void, std::string> ObjectStore::pushLazy(
98-
ObjectTopicId id, Timestamp timestamp, std::function<sdk::PayloadView()> fetch) {
96+
Status ObjectStore::pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch) {
9997
std::shared_lock store_lock(store_mutex_);
10098
auto* series = findSeries(id);
10199
if (series == nullptr) {
@@ -306,19 +304,15 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
306304
ResolvedObjectEntry resolved;
307305
resolved.timestamp = entry.timestamp;
308306

309-
if (const auto* owned = std::any_cast<std::shared_ptr<const std::vector<uint8_t>>>(&entry.payload)) {
310-
resolved.data = *owned;
311-
} else if (const auto* lazy = std::any_cast<std::function<sdk::PayloadView()>>(&entry.payload)) {
312-
// Recover the shared_ptr<const vector<uint8_t>> from the PayloadView's
313-
// type-erased anchor. Contract: callers of pushLazy must construct the
314-
// PayloadView with an anchor that is exactly a
315-
// shared_ptr<const std::vector<uint8_t>> — typically the same buffer the
316-
// closure's Span points at. The static_pointer_cast reinterprets the
317-
// shared_ptr<const void> back to that concrete type, sharing ownership
318-
// without copying bytes. Producers that hold their bytes in any other
319-
// container should use pushOwned instead.
307+
if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
308+
if (*owned) {
309+
resolved.anchor = sdk::BufferAnchor{*owned};
310+
resolved.view = Span<const uint8_t>{(*owned)->data(), (*owned)->size()};
311+
}
312+
} else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
320313
auto pv = (*lazy)();
321-
resolved.data = std::static_pointer_cast<const std::vector<uint8_t>>(pv.anchor);
314+
resolved.anchor = std::move(pv.anchor);
315+
resolved.view = pv.bytes;
322316
}
323317

324318
return resolved;
@@ -330,7 +324,7 @@ void ObjectStore::evictFront(ObjectSeries& series) {
330324
}
331325

332326
const auto& front = series.entries.front();
333-
if (const auto* owned = std::any_cast<std::shared_ptr<const std::vector<uint8_t>>>(&front.payload)) {
327+
if (const auto* owned = std::get_if<SharedBuffer>(&front.payload); owned != nullptr && *owned) {
334328
series.memory_bytes -= (*owned)->size();
335329
}
336330

pj_datastore/src/plugin_data_host.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,10 +1435,12 @@ void sourceObjectSetRetentionBudget(
14351435
// Toolbox object read host trampolines
14361436
// ---------------------------------------------------------------------------
14371437

1438-
/// Box holding the shared_ptr that keeps ObjectStore bytes alive. One
1439-
/// allocated per successful read_latest_at; freed by release_bytes.
1438+
/// Box holding the type-erased anchor that keeps ObjectStore bytes alive,
1439+
/// plus the Span over the producer-published range. One allocated per
1440+
/// successful read_latest_at; freed by release_bytes.
14401441
struct ObjectBytesBox {
1441-
std::shared_ptr<const std::vector<uint8_t>> bytes;
1442+
sdk::BufferAnchor anchor;
1443+
Span<const uint8_t> view;
14421444
};
14431445

14441446
PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept {
@@ -1508,12 +1510,12 @@ bool toolboxObjectReadLatestAt(
15081510
*out_handle = nullptr;
15091511
try {
15101512
auto entry = impl->store.latestAt(ObjectTopicId{topic.id}, timestamp_ns);
1511-
if (!entry.has_value() || entry->data == nullptr) {
1513+
if (!entry.has_value() || entry->anchor == nullptr) {
15121514
impl->setError("no entry at-or-before timestamp");
15131515
propagateError(out_error, impl->last_error.c_str());
15141516
return false;
15151517
}
1516-
auto* box = new ObjectBytesBox{std::move(entry->data)};
1518+
auto* box = new ObjectBytesBox{std::move(entry->anchor), entry->view};
15171519
*out_handle = reinterpret_cast<PJ_object_bytes_handle_t>(box);
15181520
if (out_timestamp != nullptr) {
15191521
*out_timestamp = entry->timestamp;
@@ -1542,14 +1544,14 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_
15421544
return;
15431545
}
15441546
auto* box = reinterpret_cast<ObjectBytesBox*>(handle);
1545-
if (!box->bytes) {
1547+
if (box->view.empty()) {
15461548
return;
15471549
}
15481550
if (out_data != nullptr) {
1549-
*out_data = box->bytes->data();
1551+
*out_data = box->view.data();
15501552
}
15511553
if (out_size != nullptr) {
1552-
*out_size = box->bytes->size();
1554+
*out_size = box->view.size();
15531555
}
15541556
}
15551557

pj_datastore/tests/object_store_test.cpp

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
#include <gtest/gtest.h>
77

8+
#include <array>
89
#include <cstdint>
910
#include <functional>
11+
#include <memory>
1012
#include <string>
1113
#include <thread>
1214
#include <vector>
@@ -148,7 +150,7 @@ TEST(ObjectStoreTest, LatestAtExact) {
148150
auto r = store.latestAt(id, 200);
149151
ASSERT_TRUE(r.has_value());
150152
EXPECT_EQ(r->timestamp, 200);
151-
EXPECT_EQ((*r->data)[0], 0x02);
153+
EXPECT_EQ(r->view[0], 0x02);
152154
}
153155

154156
TEST(ObjectStoreTest, LatestAtBetween) {
@@ -290,8 +292,68 @@ TEST(ObjectStoreTest, PushLazyResolves) {
290292
auto r = store.latestAt(id, 100);
291293
ASSERT_TRUE(r.has_value());
292294
EXPECT_EQ(call_count, 1);
293-
EXPECT_EQ(r->data->size(), 2u);
294-
EXPECT_EQ((*r->data)[0], 0xDE);
295+
EXPECT_EQ(r->view.size(), 2u);
296+
EXPECT_EQ(r->view[0], 0xDE);
297+
}
298+
299+
// Regression: the lazy path must preserve the BufferAnchor's concrete type.
300+
// Anchor here is a shared_ptr<TestBuffer> — NOT a shared_ptr<vector>. If
301+
// resolveEntry static_pointer_cast'd to vector (the prior implementation),
302+
// view would point at garbage and ASAN would flag it. We assert that
303+
// (a) the bytes the producer published survive resolution unchanged, and
304+
// (b) the anchor's refcount stays > 0 through the resolve (the store does
305+
// not silently swap it for a different anchor type).
306+
TEST(ObjectStoreTest, PushLazyPreservesAnchorType) {
307+
struct TestBuffer {
308+
std::array<uint8_t, 4> bytes{0x11, 0x22, 0x33, 0x44};
309+
};
310+
ObjectStore store;
311+
auto id = registerTestTopic(store);
312+
313+
auto buffer = std::make_shared<TestBuffer>();
314+
std::weak_ptr<TestBuffer> weak_buffer = buffer;
315+
316+
store.pushLazy(id, 100, [buffer]() -> sdk::PayloadView {
317+
return sdk::PayloadView{
318+
Span<const uint8_t>{buffer->bytes.data(), buffer->bytes.size()},
319+
sdk::BufferAnchor{buffer},
320+
};
321+
});
322+
323+
auto r = store.latestAt(id, 100);
324+
ASSERT_TRUE(r.has_value());
325+
ASSERT_EQ(r->view.size(), 4u);
326+
EXPECT_EQ(r->view[0], 0x11);
327+
EXPECT_EQ(r->view[3], 0x44);
328+
EXPECT_FALSE(weak_buffer.expired()); // anchor still holds the buffer alive
329+
}
330+
331+
// Regression: PayloadView::bytes is the *producer-chosen sub-range* of the
332+
// anchor's storage. resolveEntry must propagate that Span verbatim — not the
333+
// anchor's full extent. The prior implementation ignored the Span and
334+
// returned the anchor's whole vector.
335+
TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) {
336+
ObjectStore store;
337+
auto id = registerTestTopic(store);
338+
339+
auto chunk = std::make_shared<std::vector<uint8_t>>(100);
340+
for (size_t i = 0; i < chunk->size(); ++i) {
341+
(*chunk)[i] = static_cast<uint8_t>(i);
342+
}
343+
344+
store.pushLazy(id, 100, [chunk]() -> sdk::PayloadView {
345+
return sdk::PayloadView{
346+
Span<const uint8_t>{chunk->data() + 20, 10}, // bytes [20, 30)
347+
sdk::BufferAnchor{chunk},
348+
};
349+
});
350+
351+
auto r = store.latestAt(id, 100);
352+
ASSERT_TRUE(r.has_value());
353+
EXPECT_EQ(r->view.size(), 10u);
354+
EXPECT_EQ(r->view.data(), chunk->data() + 20);
355+
EXPECT_EQ(r->view[0], 20);
356+
EXPECT_EQ(r->view[9], 29);
295357
}
296358

297359
// =========================================================================
@@ -328,13 +390,13 @@ TEST(ObjectStoreTest, HandleSurvivesEviction) {
328390

329391
auto handle = store.latestAt(id, 100);
330392
ASSERT_TRUE(handle.has_value());
331-
EXPECT_EQ((*handle->data)[0], 0xAA);
393+
EXPECT_EQ(handle->view[0], 0xAA);
332394

333395
store.evictBefore(id, 150);
334396
EXPECT_EQ(store.entryCount(id), 1u);
335397

336-
EXPECT_EQ(handle->data->size(), 4u);
337-
EXPECT_EQ((*handle->data)[0], 0xAA);
398+
EXPECT_EQ(handle->view.size(), 4u);
399+
EXPECT_EQ(handle->view[0], 0xAA);
338400
}
339401

340402
// =========================================================================

pj_datastore/tests/plugin_data_host_object_test.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <gtest/gtest.h>
55

6+
#include <algorithm>
67
#include <atomic>
78
#include <cstdint>
89
#include <memory>
@@ -63,9 +64,9 @@ TEST(PluginDataHostObjectTest, PushOwnedStoresBytes) {
6364
EXPECT_EQ(f.store.entryCount(store_id), 2U);
6465
auto resolved = f.store.latestAt(store_id, 2000);
6566
ASSERT_TRUE(resolved.has_value());
66-
ASSERT_NE(resolved->data, nullptr);
67-
EXPECT_EQ(resolved->data->size(), payload.size());
68-
EXPECT_EQ(*resolved->data, payload);
67+
ASSERT_NE(resolved->anchor, nullptr);
68+
EXPECT_EQ(resolved->view.size(), payload.size());
69+
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), payload.begin(), payload.end()));
6970
}
7071

7172
TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
@@ -93,8 +94,8 @@ TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
9394
// Each read invokes the fetch closure.
9495
auto first = f.store.latestAt(ObjectTopicId{topic.id}, 42);
9596
ASSERT_TRUE(first.has_value());
96-
ASSERT_NE(first->data, nullptr);
97-
EXPECT_EQ(*first->data, shared->payload);
97+
ASSERT_NE(first->anchor, nullptr);
98+
EXPECT_TRUE(std::equal(first->view.begin(), first->view.end(), shared->payload.begin(), shared->payload.end()));
9899
EXPECT_GE(shared->fetch_calls.load(), 1);
99100

100101
auto second = f.store.latestAt(ObjectTopicId{topic.id}, 42);
@@ -149,7 +150,7 @@ TEST(PluginDataHostObjectTest, PushLazyDestroyCallbackRunsExactlyOnceOnEviction)
149150
// Fetch once — the callback runs but the ctx stays alive.
150151
auto resolved = f.store.latestAt(ObjectTopicId{topic.id}, 100);
151152
ASSERT_TRUE(resolved.has_value());
152-
EXPECT_EQ(*resolved->data, ctx->payload);
153+
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), ctx->payload.begin(), ctx->payload.end()));
153154
EXPECT_EQ(ctx->destroy_count.load(), 0);
154155

155156
// Evict — destroy_fn runs exactly once.

pj_datastore/tests/plugin_parser_object_write_test.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <gtest/gtest.h>
1111

12+
#include <algorithm>
1213
#include <cstdint>
1314
#include <memory>
1415
#include <string>
@@ -140,9 +141,9 @@ TEST(ParserObjectWriteHostTest, ParserWritesToBothHostsFromOneParse) {
140141
// Object-store side: bytes landed.
141142
auto resolved = store.latestAt(ObjectTopicId{obj_topic.id}, 100);
142143
ASSERT_TRUE(resolved.has_value());
143-
ASSERT_NE(resolved->data, nullptr);
144+
ASSERT_NE(resolved->anchor, nullptr);
144145
const std::vector<uint8_t> expected{0xAA, 0xBB, 0xCC};
145-
EXPECT_EQ(*resolved->data, expected);
146+
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end()));
146147

147148
// (Scalar side requires flushing + a read path; Phase-3 scope is proving
148149
// both hosts were resolved and invoked. Scalar writes go into DataEngine
@@ -202,7 +203,8 @@ TEST(ParserObjectWriteHostTest, ObjectHostViewPushLazyThroughSdk) {
202203

203204
auto resolved = store.latestAt(ObjectTopicId{topic.id}, 10);
204205
ASSERT_TRUE(resolved.has_value());
205-
EXPECT_EQ(*resolved->data, (std::vector<uint8_t>{0xAA, 0xBB}));
206+
const std::vector<uint8_t> expected{0xAA, 0xBB};
207+
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end()));
206208
EXPECT_GE(fetch_calls, 1);
207209
}
208210

pj_plugins/include/pj_plugins/host/service_registry_builder.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class ServiceRegistryBuilder {
4343
/// not take ownership of ctx/vtable.
4444
/// @return `Expected<void>`: ok on success, error string on duplicate or
4545
/// null fat-pointer field.
46-
[[nodiscard]] ::PJ::Expected<void, std::string> tryRegisterService(
46+
[[nodiscard]] ::PJ::Status tryRegisterService(
4747
std::string_view name, uint32_t protocol_version, PJ_service_t service) {
4848
const std::string service_name(name);
4949
if (service.ctx == nullptr || service.vtable == nullptr) {

0 commit comments

Comments
 (0)