Skip to content

Commit a4127b0

Browse files
facontidavideclaude
andcommitted
refactor(object_store): consolidate ResolvedObjectEntry/ObjectBytesBox on PayloadView
ResolvedObjectEntry and ObjectBytesBox both held an inline {anchor, view} pair — the same shape as sdk::PayloadView. Collapse both onto a single PayloadView field; one named point of truth for "bytes + their lifetime anchor" across producer, store, and consumer-handle layers. Also tightens the doc comments touched in the prior commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4f6d935 commit a4127b0

7 files changed

Lines changed: 58 additions & 76 deletions

File tree

pj_base/include/pj_base/buffer_anchor.hpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,10 @@ struct PayloadView {
5151
anchor(std::move(buffer)) {}
5252
};
5353

54-
/// Convenience helper for producers whose only payload is a plain
55-
/// std::vector<uint8_t>. Wraps it in a shared_ptr (which becomes both the
56-
/// owner of the bytes and the type-erased anchor), and returns a PayloadView
57-
/// whose Span points at that vector's contents. Use this when the producer
58-
/// materializes bytes from a source that has no natural anchor (e.g. a
59-
/// C-ABI fetch returning a raw byte buffer); when an upstream allocation
60-
/// already exists (a chunk cache, an mmap region), construct the PayloadView
61-
/// directly with that anchor to avoid the helper's copy.
54+
/// Wrap a fresh vector as both anchor and Span. Use when the producer has
55+
/// no natural anchor (e.g. raw bytes from a C-ABI fetch); when an upstream
56+
/// allocation already exists, construct PayloadView directly to avoid this
57+
/// helper's copy.
6258
inline PayloadView makePayloadView(std::vector<uint8_t> bytes) {
6359
auto shared = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
6460
return PayloadView{

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,30 +40,21 @@ struct ObjectTopicDescriptor {
4040
std::string metadata_json;
4141
};
4242

43-
/// Eager payload alternative: shared ownership of an owned byte buffer.
44-
/// memoryUsage() counts the buffer's size against the retention budget.
43+
/// Eager payload: store-owned bytes, counted against the retention budget.
4544
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;
4645

47-
/// Lazy payload alternative: idempotent, thread-safe callable returning a
48-
/// PayloadView (Span + BufferAnchor). The anchor may be any shared_ptr-backed
49-
/// storage (chunk cache, mmap, etc.); type erasure is preserved end-to-end
50-
/// through the store. Lazy entries are not counted against the retention
51-
/// budget — bytes are owned upstream, not by the store.
46+
/// Lazy payload: idempotent, thread-safe fetcher returning bytes + anchor.
47+
/// Invoked on every read; bytes are not counted against the retention budget.
5248
using LazyCallback = std::function<sdk::PayloadView()>;
5349

5450
struct ObjectEntry {
5551
Timestamp timestamp = 0;
5652
std::variant<SharedBuffer, LazyCallback> payload;
5753
};
5854

59-
/// Result of resolving an ObjectEntry. Holds the type-erased BufferAnchor
60-
/// keeping the bytes alive and a Span over the producer-published range.
61-
/// For eager entries the anchor wraps the SharedBuffer's underlying vector;
62-
/// for lazy entries it is whatever the producer's PayloadView carries.
6355
struct ResolvedObjectEntry {
6456
Timestamp timestamp = 0;
65-
sdk::BufferAnchor anchor;
66-
Span<const uint8_t> view;
57+
sdk::PayloadView payload;
6758
};
6859

6960
struct RetentionBudget {
@@ -126,13 +117,9 @@ class ObjectStore {
126117

127118
Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
128119

129-
// The fetch callable is invoked on every read. It returns a PayloadView
130-
// (Span + anchor). When the producer already holds the bytes in memory
131-
// behind a shared_ptr (e.g. a streaming buffer being handed off between
132-
// stores), the closure can capture that shared_ptr and return a view
133-
// backed by it — no copy. For producers that materialize bytes from disk
134-
// or other sources, the closure allocates a fresh buffer and uses it as
135-
// the anchor.
120+
// Fetcher runs on every read. Producers anchor on whatever owns the bytes
121+
// (chunk cache, mmap, fresh allocation); the store never copies — it just
122+
// retains the anchor through PayloadView.
136123
Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);
137124

138125
// --- Read ---

pj_datastore/src/object_store.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -306,13 +306,13 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
306306

307307
if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
308308
if (*owned) {
309-
resolved.anchor = sdk::BufferAnchor{*owned};
310-
resolved.view = Span<const uint8_t>{(*owned)->data(), (*owned)->size()};
309+
resolved.payload = sdk::PayloadView{
310+
Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
311+
sdk::BufferAnchor{*owned},
312+
};
311313
}
312314
} else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
313-
auto pv = (*lazy)();
314-
resolved.anchor = std::move(pv.anchor);
315-
resolved.view = pv.bytes;
315+
resolved.payload = (*lazy)();
316316
}
317317

318318
return resolved;

pj_datastore/src/plugin_data_host.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,12 +1435,10 @@ void sourceObjectSetRetentionBudget(
14351435
// Toolbox object read host trampolines
14361436
// ---------------------------------------------------------------------------
14371437

1438-
/// Box holding the type-erased anchor that keeps ObjectStore bytes alive,
1439-
/// plus the Span over the producer-published range. One allocated per
1440-
/// successful read_latest_at; freed by release_bytes.
1438+
/// Heap holder for the PayloadView backing PJ_object_bytes_handle_t.
1439+
/// Allocated by read_latest_at; freed by release_bytes.
14411440
struct ObjectBytesBox {
1442-
sdk::BufferAnchor anchor;
1443-
Span<const uint8_t> view;
1441+
sdk::PayloadView payload;
14441442
};
14451443

14461444
PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept {
@@ -1510,12 +1508,12 @@ bool toolboxObjectReadLatestAt(
15101508
*out_handle = nullptr;
15111509
try {
15121510
auto entry = impl->store.latestAt(ObjectTopicId{topic.id}, timestamp_ns);
1513-
if (!entry.has_value() || entry->anchor == nullptr) {
1511+
if (!entry.has_value() || entry->payload.anchor == nullptr) {
15141512
impl->setError("no entry at-or-before timestamp");
15151513
propagateError(out_error, impl->last_error.c_str());
15161514
return false;
15171515
}
1518-
auto* box = new ObjectBytesBox{std::move(entry->anchor), entry->view};
1516+
auto* box = new ObjectBytesBox{std::move(entry->payload)};
15191517
*out_handle = reinterpret_cast<PJ_object_bytes_handle_t>(box);
15201518
if (out_timestamp != nullptr) {
15211519
*out_timestamp = entry->timestamp;
@@ -1544,14 +1542,14 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_
15441542
return;
15451543
}
15461544
auto* box = reinterpret_cast<ObjectBytesBox*>(handle);
1547-
if (box->view.empty()) {
1545+
if (box->payload.bytes.empty()) {
15481546
return;
15491547
}
15501548
if (out_data != nullptr) {
1551-
*out_data = box->view.data();
1549+
*out_data = box->payload.bytes.data();
15521550
}
15531551
if (out_size != nullptr) {
1554-
*out_size = box->view.size();
1552+
*out_size = box->payload.bytes.size();
15551553
}
15561554
}
15571555

pj_datastore/tests/object_store_test.cpp

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ TEST(ObjectStoreTest, LatestAtExact) {
150150
auto r = store.latestAt(id, 200);
151151
ASSERT_TRUE(r.has_value());
152152
EXPECT_EQ(r->timestamp, 200);
153-
EXPECT_EQ(r->view[0], 0x02);
153+
EXPECT_EQ(r->payload.bytes[0], 0x02);
154154
}
155155

156156
TEST(ObjectStoreTest, LatestAtBetween) {
@@ -292,17 +292,13 @@ TEST(ObjectStoreTest, PushLazyResolves) {
292292
auto r = store.latestAt(id, 100);
293293
ASSERT_TRUE(r.has_value());
294294
EXPECT_EQ(call_count, 1);
295-
EXPECT_EQ(r->view.size(), 2u);
296-
EXPECT_EQ(r->view[0], 0xDE);
295+
EXPECT_EQ(r->payload.bytes.size(), 2u);
296+
EXPECT_EQ(r->payload.bytes[0], 0xDE);
297297
}
298298

299-
// Regression: the lazy path must preserve the BufferAnchor's concrete type.
300-
// Anchor here is a shared_ptr<TestBuffer> — NOT a shared_ptr<vector>. If
301-
// resolveEntry static_pointer_cast'd to vector (the prior implementation),
302-
// view would point at garbage and ASAN would flag it. We assert that
303-
// (a) the bytes the producer published survive resolution unchanged, and
304-
// (b) the anchor's refcount stays > 0 through the resolve (the store does
305-
// not silently swap it for a different anchor type).
299+
// Regression: anchor type-erasure must survive resolveEntry. The anchor here
300+
// is a shared_ptr<TestBuffer> (not vector); a prior static_pointer_cast to
301+
// vector would UB.
306302
TEST(ObjectStoreTest, PushLazyPreservesAnchorType) {
307303
struct TestBuffer {
308304
std::array<uint8_t, 4> bytes{0x11, 0x22, 0x33, 0x44};
@@ -322,16 +318,14 @@ TEST(ObjectStoreTest, PushLazyPreservesAnchorType) {
322318

323319
auto r = store.latestAt(id, 100);
324320
ASSERT_TRUE(r.has_value());
325-
ASSERT_EQ(r->view.size(), 4u);
326-
EXPECT_EQ(r->view[0], 0x11);
327-
EXPECT_EQ(r->view[3], 0x44);
321+
ASSERT_EQ(r->payload.bytes.size(), 4u);
322+
EXPECT_EQ(r->payload.bytes[0], 0x11);
323+
EXPECT_EQ(r->payload.bytes[3], 0x44);
328324
EXPECT_FALSE(weak_buffer.expired()); // anchor still holds the buffer alive
329325
}
330326

331-
// Regression: PayloadView::bytes is the *producer-chosen sub-range* of the
332-
// anchor's storage. resolveEntry must propagate that Span verbatim — not the
333-
// anchor's full extent. The prior implementation ignored the Span and
334-
// returned the anchor's whole vector.
327+
// Regression: the producer's Span is a sub-range of the anchor's storage.
328+
// resolveEntry must propagate it verbatim — not the anchor's full extent.
335329
TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) {
336330
ObjectStore store;
337331
auto id = registerTestTopic(store);
@@ -350,10 +344,10 @@ TEST(ObjectStoreTest, PushLazyHonorsSpanSubview) {
350344

351345
auto r = store.latestAt(id, 100);
352346
ASSERT_TRUE(r.has_value());
353-
EXPECT_EQ(r->view.size(), 10u);
354-
EXPECT_EQ(r->view.data(), chunk->data() + 20);
355-
EXPECT_EQ(r->view[0], 20);
356-
EXPECT_EQ(r->view[9], 29);
347+
EXPECT_EQ(r->payload.bytes.size(), 10u);
348+
EXPECT_EQ(r->payload.bytes.data(), chunk->data() + 20);
349+
EXPECT_EQ(r->payload.bytes[0], 20);
350+
EXPECT_EQ(r->payload.bytes[9], 29);
357351
}
358352

359353
// =========================================================================
@@ -390,13 +384,13 @@ TEST(ObjectStoreTest, HandleSurvivesEviction) {
390384

391385
auto handle = store.latestAt(id, 100);
392386
ASSERT_TRUE(handle.has_value());
393-
EXPECT_EQ(handle->view[0], 0xAA);
387+
EXPECT_EQ(handle->payload.bytes[0], 0xAA);
394388

395389
store.evictBefore(id, 150);
396390
EXPECT_EQ(store.entryCount(id), 1u);
397391

398-
EXPECT_EQ(handle->view.size(), 4u);
399-
EXPECT_EQ(handle->view[0], 0xAA);
392+
EXPECT_EQ(handle->payload.bytes.size(), 4u);
393+
EXPECT_EQ(handle->payload.bytes[0], 0xAA);
400394
}
401395

402396
// =========================================================================

pj_datastore/tests/plugin_data_host_object_test.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ TEST(PluginDataHostObjectTest, PushOwnedStoresBytes) {
6464
EXPECT_EQ(f.store.entryCount(store_id), 2U);
6565
auto resolved = f.store.latestAt(store_id, 2000);
6666
ASSERT_TRUE(resolved.has_value());
67-
ASSERT_NE(resolved->anchor, nullptr);
68-
EXPECT_EQ(resolved->view.size(), payload.size());
69-
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), payload.begin(), payload.end()));
67+
ASSERT_NE(resolved->payload.anchor, nullptr);
68+
EXPECT_EQ(resolved->payload.bytes.size(), payload.size());
69+
EXPECT_TRUE(
70+
std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), payload.begin(), payload.end()));
7071
}
7172

7273
TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
@@ -94,8 +95,10 @@ TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
9495
// Each read invokes the fetch closure.
9596
auto first = f.store.latestAt(ObjectTopicId{topic.id}, 42);
9697
ASSERT_TRUE(first.has_value());
97-
ASSERT_NE(first->anchor, nullptr);
98-
EXPECT_TRUE(std::equal(first->view.begin(), first->view.end(), shared->payload.begin(), shared->payload.end()));
98+
ASSERT_NE(first->payload.anchor, nullptr);
99+
EXPECT_TRUE(
100+
std::equal(
101+
first->payload.bytes.begin(), first->payload.bytes.end(), shared->payload.begin(), shared->payload.end()));
99102
EXPECT_GE(shared->fetch_calls.load(), 1);
100103

101104
auto second = f.store.latestAt(ObjectTopicId{topic.id}, 42);
@@ -150,7 +153,9 @@ TEST(PluginDataHostObjectTest, PushLazyDestroyCallbackRunsExactlyOnceOnEviction)
150153
// Fetch once — the callback runs but the ctx stays alive.
151154
auto resolved = f.store.latestAt(ObjectTopicId{topic.id}, 100);
152155
ASSERT_TRUE(resolved.has_value());
153-
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), ctx->payload.begin(), ctx->payload.end()));
156+
EXPECT_TRUE(
157+
std::equal(
158+
resolved->payload.bytes.begin(), resolved->payload.bytes.end(), ctx->payload.begin(), ctx->payload.end()));
154159
EXPECT_EQ(ctx->destroy_count.load(), 0);
155160

156161
// Evict — destroy_fn runs exactly once.

pj_datastore/tests/plugin_parser_object_write_test.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,10 @@ TEST(ParserObjectWriteHostTest, ParserWritesToBothHostsFromOneParse) {
141141
// Object-store side: bytes landed.
142142
auto resolved = store.latestAt(ObjectTopicId{obj_topic.id}, 100);
143143
ASSERT_TRUE(resolved.has_value());
144-
ASSERT_NE(resolved->anchor, nullptr);
144+
ASSERT_NE(resolved->payload.anchor, nullptr);
145145
const std::vector<uint8_t> expected{0xAA, 0xBB, 0xCC};
146-
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end()));
146+
EXPECT_TRUE(
147+
std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end()));
147148

148149
// (Scalar side requires flushing + a read path; Phase-3 scope is proving
149150
// both hosts were resolved and invoked. Scalar writes go into DataEngine
@@ -204,7 +205,8 @@ TEST(ParserObjectWriteHostTest, ObjectHostViewPushLazyThroughSdk) {
204205
auto resolved = store.latestAt(ObjectTopicId{topic.id}, 10);
205206
ASSERT_TRUE(resolved.has_value());
206207
const std::vector<uint8_t> expected{0xAA, 0xBB};
207-
EXPECT_TRUE(std::equal(resolved->view.begin(), resolved->view.end(), expected.begin(), expected.end()));
208+
EXPECT_TRUE(
209+
std::equal(resolved->payload.bytes.begin(), resolved->payload.bytes.end(), expected.begin(), expected.end()));
208210
EXPECT_GE(fetch_calls, 1);
209211
}
210212

0 commit comments

Comments
 (0)