Skip to content

Commit fc16f80

Browse files
committed
chore: condense doc comments and return Status from flushTo
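Adopt the follow-up tweaks from upstream PR #105 in engine.hpp / object_store.hpp / object_store.cpp / engine.cpp / plugin_data_host.cpp (net -32 lines across the five files). The change condenses doc comments and switches the declared return type of `DataEngine::flushTo` and `ObjectStore::flushTo` from `Expected<void, std::string>` to `Status`; no other declarations and no includes are touched. No behavior change is intended — NOTE(review): the signature change is API-neutral only if `Status` is an alias of `Expected<void, std::string>`; confirm before relying on it.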
1 parent 353bd4e commit fc16f80

5 files changed

Lines changed: 37 additions & 69 deletions

File tree

pj_datastore/include/pj_datastore/engine.hpp

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,18 @@ class DataEngine {
8282
/// Evict old chunks outside the retention window.
8383
void enforceRetention(PJ::Timestamp retention_window_ns);
8484

85-
/// Move every committed chunk from this engine into `dst`, leaving this
86-
/// engine's topic storages empty (datasets, topics, schemas, and time
87-
/// domains remain registered). Topics are matched by descriptor
88-
/// (`dataset_id` + `name`); both engines must have the matching topics
89-
/// registered or the call fails without partial mutation. Monotonicity is
90-
/// enforced strictly per topic: the source's earliest chunk timestamp must
91-
/// be greater than or equal to the destination's current `time_max()`.
85+
/// Move every committed chunk into `dst`, leaving this engine's storages
86+
/// empty (datasets, topics, schemas, time domains stay registered). Topics
87+
/// are matched by descriptor (`dataset_id` + `name`); both engines must have
88+
/// them registered. Monotonicity is enforced per topic: the source's
89+
/// earliest chunk timestamp must be >= the destination's `time_max()`. Any
90+
/// failure mutates neither engine.
9291
///
93-
/// Zero-copy on the chunk data: the destination's `std::deque<TopicChunk>`
94-
/// receives the source's chunks via `std::move`, never via copy. The chunk
95-
/// internals (column buffers, value arrays) are pointer/buffer moves.
96-
///
97-
/// Schema compatibility is the caller's responsibility — typically the
98-
/// destination is kept in lockstep with the source through parallel
99-
/// `createDataset` / `createTopic` registration at startup.
100-
PJ::Expected<void, std::string> flushTo(DataEngine& dst);
92+
/// Zero-copy: dst's `std::deque<TopicChunk>` receives the chunks via
93+
/// `std::move` (column buffers/value arrays are pointer moves). Schema
94+
/// compatibility is the caller's responsibility — typically dst is kept in
95+
/// lockstep with the source via parallel registration at startup.
96+
PJ::Status flushTo(DataEngine& dst);
10197

10298
// Writer/Reader factories
10399
/// Create a writer bound to this engine.

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,15 @@ using LazyCallback = std::function<sdk::PayloadView()>;
4949

5050
struct ObjectEntry {
5151
Timestamp timestamp = 0;
52-
// Holds either a SharedBuffer (eager owned payload, counted against the
53-
// retention budget) or a LazyCallback (lazy resolver). resolveEntry
54-
// discriminates via std::get_if; the variant is exhaustive over the two
55-
// (and only two) payload kinds.
52+
// Eager owned bytes or a lazy resolver; resolveEntry discriminates via std::get_if.
5653
std::variant<SharedBuffer, LazyCallback> payload;
5754
};
5855

5956
struct ResolvedObjectEntry {
6057
Timestamp timestamp = 0;
61-
// PayloadView lets the entry hold an opaque anchor (any shared_ptr<T>) plus
62-
// a non-owning Span over the bytes. Consumers read `payload.bytes` for the
63-
// data; they retain `payload.anchor` if they need the bytes to outlive the
64-
// resolve call. Both `pushOwned` and `pushLazy` paths land here:
65-
// - owned: payload.bytes spans the shared_ptr<vector<uint8_t>>; anchor IS that shared_ptr.
66-
// - lazy: payload is whatever the closure returns; anchor can be any shared_ptr<T>
67-
// (e.g. a C-ABI payload anchor wrapped as shared_ptr<void>).
68-
// resolveEntry never casts the anchor to a concrete type; the type erasure
69-
// stays opaque all the way to the consumer.
58+
// Non-owning Span over the bytes plus an opaque anchor (any shared_ptr<T>).
59+
// Consumers read `payload.bytes`; retain `payload.anchor` to keep the bytes
60+
// alive past the resolve call. resolveEntry never casts the anchor.
7061
sdk::PayloadView payload;
7162
};
7263

@@ -130,11 +121,9 @@ class ObjectStore {
130121

131122
Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
132123

133-
// Fetcher runs on every read. Producers anchor on whatever owns the bytes
134-
// (chunk cache, mmap, fresh allocation); the store never copies — it just
135-
// retains the anchor through PayloadView. When the producer already holds
136-
// the bytes behind a shared_ptr (e.g. a streaming buffer handed off between
137-
// stores), the closure captures it and returns a view backed by it.
124+
// Fetcher runs on every read; the store retains the anchor via PayloadView
125+
// and never copies. The closure can return a view over bytes the producer
126+
// already owns (chunk cache, mmap, hand-off between stores).
138127
Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);
139128

140129
// --- Read ---
@@ -164,26 +153,17 @@ class ObjectStore {
164153

165154
// --- Cross-store flush ---
166155

167-
// Move every entry from this store into `dst`, leaving this store empty
168-
// (topic registrations are preserved). Topics are matched by descriptor
169-
// (dataset_id + topic_name); both stores must have registered the same
170-
// descriptors or the call fails without partial mutation. For each series,
171-
// monotonicity is enforced strictly: the earliest timestamp being moved
172-
// must be greater than or equal to the destination's current last
173-
// timestamp. On any validation failure the call returns an error and
174-
// neither store is mutated.
156+
// Move every entry into `dst`, leaving this store empty (registrations kept).
157+
// Topics are matched by descriptor (dataset_id + topic_name); both stores
158+
// must share descriptors. Monotonicity is enforced per series: the earliest
159+
// moved timestamp must be >= the destination's last. Any validation failure
160+
// returns an error and mutates neither store.
175161
//
176-
// Zero-copy on the payload bytes. Each ObjectEntry is moved into the
177-
// destination's series by value; the std::variant inside holds either a
178-
// shared_ptr or a std::function, and moving it is a pointer/buffer move —
179-
// bytes captured by the closure or owned by the shared_ptr are never
180-
// copied or materialized during the flush. Lazy
181-
// entries preserve their semantics in the destination: their closure is
182-
// re-invoked only when the destination is read.
183-
//
184-
// After the move, the destination's retention budget is applied to each
185-
// touched series in normal order.
186-
Expected<void, std::string> flushTo(ObjectStore& dst);
162+
// Zero-copy: each ObjectEntry is moved by value, so the variant's shared_ptr
163+
// or closure transfers as a pointer move — bytes are never copied. Lazy
164+
// entries keep their semantics; their closure re-runs only on a dst read.
165+
// Afterward, dst's retention budget is applied to each touched series.
166+
Status flushTo(ObjectStore& dst);
187167

188168
// --- Lifecycle ---
189169

pj_datastore/src/engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ void DataEngine::enforceRetention(Timestamp retention_window_ns) {
182182
}
183183
}
184184

185-
Expected<void, std::string> DataEngine::flushTo(DataEngine& dst) {
185+
Status DataEngine::flushTo(DataEngine& dst) {
186186
if (&dst == this) {
187187
return PJ::unexpected("flushTo: source and destination are the same engine");
188188
}

pj_datastore/src/object_store.cpp

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ void ObjectStore::evictAllBefore(Timestamp threshold) {
267267

268268
// --- Cross-store flush ---
269269

270-
Expected<void, std::string> ObjectStore::flushTo(ObjectStore& dst) {
270+
Status ObjectStore::flushTo(ObjectStore& dst) {
271271
if (&dst == this) {
272272
return unexpected("flushTo: source and destination are the same store");
273273
}
@@ -374,22 +374,17 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
374374
resolved.timestamp = entry.timestamp;
375375

376376
if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
377-
// Owned branch: build a PayloadView whose Span covers the vector and
378-
// whose anchor IS the same shared_ptr — refcount bump, no copy. A
379-
// default-constructed entry holds a null SharedBuffer, so guard it.
377+
// Span the vector, anchor on the same shared_ptr — refcount bump, no copy.
378+
// A default-constructed entry holds a null SharedBuffer, so guard it.
380379
if (*owned) {
381380
resolved.payload = sdk::PayloadView{
382381
Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
383382
sdk::BufferAnchor{*owned},
384383
};
385384
}
386385
} else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
387-
// Lazy branch: forward whatever the closure returns. The anchor stays
388-
// opaque (shared_ptr<const void>); consumers retain it without needing
389-
// to know the concrete type. No static_pointer_cast here — that was the
390-
// hidden contract that locked the slot to shared_ptr<vector<uint8_t>>;
391-
// dropping it lets producers anchor on arrow::Buffer, mmap pools, or
392-
// C-ABI payload anchors equally.
386+
// Forward the closure's PayloadView verbatim. The anchor stays opaque (no
387+
// cast), so producers can back it with arrow::Buffer, mmap, or a C-ABI anchor.
393388
resolved.payload = (*lazy)();
394389
}
395390

pj_datastore/src/plugin_data_host.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,12 +1456,9 @@ void sourceObjectSetRetentionBudget(
14561456
// Toolbox object read host trampolines
14571457
// ---------------------------------------------------------------------------
14581458

1459-
// The C-ABI exposes the bytes via a void-handle (PJ_object_bytes_handle_t)
1460-
// that the plugin must later release. We allocate a sdk::PayloadView on the
1461-
// heap and reinterpret_cast its pointer to the handle: the PayloadView's
1462-
// anchor is what keeps the underlying buffer alive until the plugin calls
1463-
// release_bytes. No wrapper struct needed — PayloadView already carries
1464-
// bytes (Span) + anchor (BufferAnchor) in one value.
1459+
// PJ_object_bytes_handle_t is a heap-allocated sdk::PayloadView: its anchor
1460+
// keeps the buffer alive until the plugin calls release_bytes. No wrapper
1461+
// struct needed — PayloadView already carries the Span + anchor.
14651462
PJ_object_topic_handle_t toolboxObjectLookupTopic(void* ctx, PJ_string_view_t topic_name) noexcept {
14661463
auto* impl = static_cast<DatastoreToolboxObjectReadHostState*>(ctx);
14671464
try {

0 commit comments

Comments
 (0)