Skip to content

Commit 67760dc

Browse files
refactor(object_store): widen ResolvedObjectEntry::data to PayloadView; drop ObjectBytesBox
Builds on top of !184 to remove the last hidden contract: resolveEntry's lazy branch used static_pointer_cast<const vector<uint8_t>>(pv.anchor), which forced every lazy producer to anchor on shared_ptr<const vector<uint8_t>> and defeated zero-copy for any producer that already had a different concrete anchor (C-ABI ctx, arrow::Buffer, mmap pool, etc.). ResolvedObjectEntry::data (shared_ptr<const vector<uint8_t>>) is now ResolvedObjectEntry::payload (sdk::PayloadView). resolveEntry no longer makes any concrete-type assumption: the owned branch builds a PayloadView whose Span covers the stored vector and whose anchor is the same shared_ptr, and the lazy branch forwards the closure's PayloadView verbatim. ObjectBytesBox in plugin_data_host.cpp is removed: with PayloadView on the resolved entry, the box was a wrapper that added no value; toolboxObjectReadLatestAt now allocates a PayloadView directly, and the C-ABI handle is a pointer to it. All consumers are migrated to entry->payload.bytes / entry->payload.anchor. This is the §14.5 widening from the streaming-manager mejoras report — a precondition for end-to-end zero-copy ingest from the streaming runtime, which captures PJ_payload_t.anchor as shared_ptr<void>.
1 parent c7636be commit 67760dc

6 files changed

Lines changed: 65 additions & 52 deletions

File tree

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,16 @@ struct ObjectEntry {
4949

5050
struct ResolvedObjectEntry {
5151
Timestamp timestamp = 0;
52-
std::shared_ptr<const std::vector<uint8_t>> data;
52+
// PayloadView lets the entry hold an opaque anchor (any shared_ptr<T>) plus
53+
// a non-owning Span over the bytes. Consumers read `payload.bytes` for the
54+
// data; they retain `payload.anchor` if they need the bytes to outlive the
55+
// resolve call. Both `pushOwned` and `pushLazy` paths land here:
56+
// - owned: payload.bytes spans the shared_ptr<vector<uint8_t>>; anchor IS that shared_ptr.
57+
// - lazy: payload is whatever the closure returns; anchor can be any shared_ptr<T>
58+
// (e.g. a C-ABI payload anchor wrapped as shared_ptr<void>).
59+
// resolveEntry never casts the anchor to a concrete type; the type erasure
60+
// stays opaque all the way to the consumer.
61+
sdk::PayloadView payload;
5362
};
5463

5564
struct RetentionBudget {

pj_datastore/src/object_store.cpp

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -376,18 +376,20 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
376376
resolved.timestamp = entry.timestamp;
377377

378378
if (const auto* owned = std::any_cast<std::shared_ptr<const std::vector<uint8_t>>>(&entry.payload)) {
379-
resolved.data = *owned;
379+
// Owned branch: build a PayloadView whose Span covers the vector and
380+
// whose anchor IS the same shared_ptr — refcount bump, no copy.
381+
resolved.payload = sdk::PayloadView{
382+
Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
383+
sdk::BufferAnchor{*owned},
384+
};
380385
} else if (const auto* lazy = std::any_cast<std::function<sdk::PayloadView()>>(&entry.payload)) {
381-
// Recover the shared_ptr<const vector<uint8_t>> from the PayloadView's
382-
// type-erased anchor. Contract: callers of pushLazy must construct the
383-
// PayloadView with an anchor that is exactly a
384-
// shared_ptr<const std::vector<uint8_t>> — typically the same buffer the
385-
// closure's Span points at. The static_pointer_cast reinterprets the
386-
// shared_ptr<const void> back to that concrete type, sharing ownership
387-
// without copying bytes. Producers that hold their bytes in any other
388-
// container should use pushOwned instead.
389-
auto pv = (*lazy)();
390-
resolved.data = std::static_pointer_cast<const std::vector<uint8_t>>(pv.anchor);
386+
// Lazy branch: forward whatever the closure returns. The anchor stays
387+
// opaque (shared_ptr<const void>); consumers retain it without needing
388+
// to know the concrete type. No static_pointer_cast here — that was the
389+
// hidden contract that locked the slot to shared_ptr<vector<uint8_t>>;
390+
// dropping it lets producers anchor on arrow::Buffer, mmap pools, or
391+
// C-ABI payload anchors equally.
392+
resolved.payload = (*lazy)();
391393
}
392394

393395
return resolved;

pj_datastore/src/plugin_data_host.cpp

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -1456,12 +1456,12 @@ void sourceObjectSetRetentionBudget(
14561456
// Toolbox object read host trampolines
14571457
// ---------------------------------------------------------------------------
14581458

1459-
/// Box holding the shared_ptr that keeps ObjectStore bytes alive. One
1460-
/// allocated per successful read_latest_at; freed by release_bytes.
1461-
struct ObjectBytesBox {
1462-
std::shared_ptr<const std::vector<uint8_t>> bytes;
1463-
};
1464-
1459+
// The C-ABI exposes the bytes via a void-handle (PJ_object_bytes_handle_t)
1460+
// that the plugin must later release. We allocate a sdk::PayloadView on the
1461+
// heap and reinterpret_cast its pointer to the handle: the PayloadView's
1462+
// anchor is what keeps the underlying buffer alive until the plugin calls
1463+
// release_bytes. No wrapper struct needed — PayloadView already carries
1464+
// bytes (Span) + anchor (BufferAnchor) in one value.
14651465
PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept {
14661466
auto* impl = static_cast<DatastoreToolboxObjectReadHostState*>(ctx);
14671467
try {
@@ -1529,13 +1529,13 @@ bool toolboxObjectReadLatestAt(
15291529
*out_handle = nullptr;
15301530
try {
15311531
auto entry = impl->store.latestAt(ObjectTopicId{topic.id}, timestamp_ns);
1532-
if (!entry.has_value() || entry->data == nullptr) {
1532+
if (!entry.has_value() || entry->payload.anchor == nullptr) {
15331533
impl->setError("no entry at-or-before timestamp");
15341534
propagateError(out_error, impl->last_error.c_str());
15351535
return false;
15361536
}
1537-
auto* box = new ObjectBytesBox{std::move(entry->data)};
1538-
*out_handle = reinterpret_cast<PJ_object_bytes_handle_t>(box);
1537+
auto* payload_handle = new sdk::PayloadView(std::move(entry->payload));
1538+
*out_handle = reinterpret_cast<PJ_object_bytes_handle_t>(payload_handle);
15391539
if (out_timestamp != nullptr) {
15401540
*out_timestamp = entry->timestamp;
15411541
}
@@ -1562,23 +1562,23 @@ void toolboxObjectGetBytes(PJ_object_bytes_handle_t handle, const uint8_t** out_
15621562
if (handle == nullptr) {
15631563
return;
15641564
}
1565-
auto* box = reinterpret_cast<ObjectBytesBox*>(handle);
1566-
if (!box->bytes) {
1565+
const auto* payload = reinterpret_cast<const sdk::PayloadView*>(handle);
1566+
if (payload->anchor == nullptr) {
15671567
return;
15681568
}
15691569
if (out_data != nullptr) {
1570-
*out_data = box->bytes->data();
1570+
*out_data = payload->bytes.data();
15711571
}
15721572
if (out_size != nullptr) {
1573-
*out_size = box->bytes->size();
1573+
*out_size = payload->bytes.size();
15741574
}
15751575
}
15761576

15771577
void toolboxObjectReleaseBytes(PJ_object_bytes_handle_t handle) noexcept {
15781578
if (handle == nullptr) {
15791579
return;
15801580
}
1581-
delete reinterpret_cast<ObjectBytesBox*>(handle);
1581+
delete reinterpret_cast<sdk::PayloadView*>(handle);
15821582
}
15831583

15841584
std::size_t toolboxObjectEntryCount(void* ctx, PJ_object_topic_handle_t topic) noexcept {

pj_datastore/tests/object_store_test.cpp

Lines changed: 17 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -148,7 +148,7 @@ TEST(ObjectStoreTest, LatestAtExact) {
148148
auto r = store.latestAt(id, 200);
149149
ASSERT_TRUE(r.has_value());
150150
EXPECT_EQ(r->timestamp, 200);
151-
EXPECT_EQ((*r->data)[0], 0x02);
151+
EXPECT_EQ(r->payload.bytes[0], 0x02);
152152
}
153153

154154
TEST(ObjectStoreTest, LatestAtBetween) {
@@ -290,8 +290,8 @@ TEST(ObjectStoreTest, PushLazyResolves) {
290290
auto r = store.latestAt(id, 100);
291291
ASSERT_TRUE(r.has_value());
292292
EXPECT_EQ(call_count, 1);
293-
EXPECT_EQ(r->data->size(), 2u);
294-
EXPECT_EQ((*r->data)[0], 0xDE);
293+
EXPECT_EQ(r->payload.bytes.size(), 2u);
294+
EXPECT_EQ(r->payload.bytes[0], 0xDE);
295295
}
296296

297297
// =========================================================================
@@ -328,13 +328,13 @@ TEST(ObjectStoreTest, HandleSurvivesEviction) {
328328

329329
auto handle = store.latestAt(id, 100);
330330
ASSERT_TRUE(handle.has_value());
331-
EXPECT_EQ((*handle->data)[0], 0xAA);
331+
EXPECT_EQ(handle->payload.bytes[0], 0xAA);
332332

333333
store.evictBefore(id, 150);
334334
EXPECT_EQ(store.entryCount(id), 1u);
335335

336-
EXPECT_EQ(handle->data->size(), 4u);
337-
EXPECT_EQ((*handle->data)[0], 0xAA);
336+
EXPECT_EQ(handle->payload.bytes.size(), 4u);
337+
EXPECT_EQ(handle->payload.bytes[0], 0xAA);
338338
}
339339

340340
// =========================================================================
@@ -490,13 +490,13 @@ TEST(ObjectStoreFlushTest, BasicMoveOwnedEntries) {
490490
auto e0 = dst.at(dst_id, 0);
491491
ASSERT_TRUE(e0.has_value());
492492
EXPECT_EQ(e0->timestamp, 100);
493-
EXPECT_EQ(e0->data->size(), 8u);
494-
EXPECT_EQ((*e0->data)[0], 0xAA);
493+
EXPECT_EQ(e0->payload.bytes.size(), 8u);
494+
EXPECT_EQ(e0->payload.bytes[0], 0xAA);
495495

496496
auto e2 = dst.at(dst_id, 2);
497497
ASSERT_TRUE(e2.has_value());
498498
EXPECT_EQ(e2->timestamp, 300);
499-
EXPECT_EQ((*e2->data)[0], 0xCC);
499+
EXPECT_EQ(e2->payload.bytes[0], 0xCC);
500500
}
501501

502502
TEST(ObjectStoreFlushTest, PreservesTopicRegistrationOnSrc) {
@@ -605,8 +605,8 @@ TEST(ObjectStoreFlushTest, MultipleTopicsFlushed) {
605605

606606
EXPECT_EQ(dst.entryCount(dst_a), 2u);
607607
EXPECT_EQ(dst.entryCount(dst_b), 2u);
608-
EXPECT_EQ((*dst.at(dst_a, 0)->data)[0], 0xAA);
609-
EXPECT_EQ((*dst.at(dst_b, 1)->data)[0], 0xDD);
608+
EXPECT_EQ(dst.at(dst_a, 0)->payload.bytes[0], 0xAA);
609+
EXPECT_EQ(dst.at(dst_b, 1)->payload.bytes[0], 0xDD);
610610
}
611611

612612
TEST(ObjectStoreFlushTest, LazyEntriesPreserveSemantics) {
@@ -615,9 +615,9 @@ TEST(ObjectStoreFlushTest, LazyEntriesPreserveSemantics) {
615615
auto dst_id = registerSameDescriptor(dst);
616616

617617
int src_invocations = 0;
618-
src.pushLazy(src_id, 100, [&src_invocations]() -> std::vector<uint8_t> {
618+
src.pushLazy(src_id, 100, [&src_invocations]() -> sdk::PayloadView {
619619
++src_invocations;
620-
return {0xDE, 0xAD, 0xBE, 0xEF};
620+
return sdk::makePayloadView({0xDE, 0xAD, 0xBE, 0xEF});
621621
});
622622

623623
// Flush itself must NOT invoke the closure — it just moves the std::function.
@@ -632,8 +632,8 @@ TEST(ObjectStoreFlushTest, LazyEntriesPreserveSemantics) {
632632
auto resolved = dst.latestAt(dst_id, 100);
633633
ASSERT_TRUE(resolved.has_value());
634634
EXPECT_EQ(src_invocations, 1);
635-
ASSERT_EQ(resolved->data->size(), 4u);
636-
EXPECT_EQ((*resolved->data)[0], 0xDE);
635+
ASSERT_EQ(resolved->payload.bytes.size(), 4u);
636+
EXPECT_EQ(resolved->payload.bytes[0], 0xDE);
637637
}
638638

639639
TEST(ObjectStoreFlushTest, RetentionBudgetAppliedAfterFlush) {
@@ -668,13 +668,13 @@ TEST(ObjectStoreFlushTest, ZeroCopyOwnershipChainSurvives) {
668668
src.pushOwned(src_id, 100, makePayload(16, 0x77));
669669
auto pre_handle = src.at(src_id, 0);
670670
ASSERT_TRUE(pre_handle.has_value());
671-
const auto* pre_ptr = pre_handle->data.get();
671+
const auto* pre_ptr = pre_handle->payload.anchor.get();
672672

673673
ASSERT_TRUE(src.flushTo(dst).has_value());
674674

675675
auto post_handle = dst.at(dst_id, 0);
676676
ASSERT_TRUE(post_handle.has_value());
677-
EXPECT_EQ(post_handle->data.get(), pre_ptr) << "shared_ptr identity must survive the flush";
677+
EXPECT_EQ(post_handle->payload.anchor.get(), pre_ptr) << "shared_ptr identity must survive the flush";
678678
}
679679

680680
} // namespace

pj_datastore/tests/plugin_data_host_object_test.cpp

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -63,9 +63,9 @@ TEST(PluginDataHostObjectTest, PushOwnedStoresBytes) {
6363
EXPECT_EQ(f.store.entryCount(store_id), 2U);
6464
auto resolved = f.store.latestAt(store_id, 2000);
6565
ASSERT_TRUE(resolved.has_value());
66-
ASSERT_NE(resolved->data, nullptr);
67-
EXPECT_EQ(resolved->data->size(), payload.size());
68-
EXPECT_EQ(*resolved->data, payload);
66+
ASSERT_NE(resolved->payload.anchor, nullptr);
67+
EXPECT_EQ(resolved->payload.bytes.size(), payload.size());
68+
EXPECT_EQ(std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()), payload);
6969
}
7070

7171
TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
@@ -93,8 +93,8 @@ TEST(PluginDataHostObjectTest, PushLazyRetainsClosureUntilEviction) {
9393
// Each read invokes the fetch closure.
9494
auto first = f.store.latestAt(ObjectTopicId{topic.id}, 42);
9595
ASSERT_TRUE(first.has_value());
96-
ASSERT_NE(first->data, nullptr);
97-
EXPECT_EQ(*first->data, shared->payload);
96+
ASSERT_NE(first->payload.anchor, nullptr);
97+
EXPECT_EQ(std::vector<uint8_t>(first->payload.bytes.begin(), first->payload.bytes.end()), shared->payload);
9898
EXPECT_GE(shared->fetch_calls.load(), 1);
9999

100100
auto second = f.store.latestAt(ObjectTopicId{topic.id}, 42);
@@ -149,7 +149,7 @@ TEST(PluginDataHostObjectTest, PushLazyDestroyCallbackRunsExactlyOnceOnEviction)
149149
// Fetch once — the callback runs but the ctx stays alive.
150150
auto resolved = f.store.latestAt(ObjectTopicId{topic.id}, 100);
151151
ASSERT_TRUE(resolved.has_value());
152-
EXPECT_EQ(*resolved->data, ctx->payload);
152+
EXPECT_EQ(std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()), ctx->payload);
153153
EXPECT_EQ(ctx->destroy_count.load(), 0);
154154

155155
// Evict — destroy_fn runs exactly once.

pj_datastore/tests/plugin_parser_object_write_test.cpp

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -140,9 +140,9 @@ TEST(ParserObjectWriteHostTest, ParserWritesToBothHostsFromOneParse) {
140140
// Object-store side: bytes landed.
141141
auto resolved = store.latestAt(ObjectTopicId{obj_topic.id}, 100);
142142
ASSERT_TRUE(resolved.has_value());
143-
ASSERT_NE(resolved->data, nullptr);
143+
ASSERT_NE(resolved->payload.anchor, nullptr);
144144
const std::vector<uint8_t> expected{0xAA, 0xBB, 0xCC};
145-
EXPECT_EQ(*resolved->data, expected);
145+
EXPECT_EQ(std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()), expected);
146146

147147
// (Scalar side requires flushing + a read path; Phase-3 scope is proving
148148
// both hosts were resolved and invoked. Scalar writes go into DataEngine
@@ -202,7 +202,9 @@ TEST(ParserObjectWriteHostTest, ObjectHostViewPushLazyThroughSdk) {
202202

203203
auto resolved = store.latestAt(ObjectTopicId{topic.id}, 10);
204204
ASSERT_TRUE(resolved.has_value());
205-
EXPECT_EQ(*resolved->data, (std::vector<uint8_t>{0xAA, 0xBB}));
205+
EXPECT_EQ(
206+
std::vector<uint8_t>(resolved->payload.bytes.begin(), resolved->payload.bytes.end()),
207+
(std::vector<uint8_t>{0xAA, 0xBB}));
206208
EXPECT_GE(fetch_calls, 1);
207209
}
208210

0 commit comments

Comments
 (0)