
Commit 1447d4f

refactor(object_store): PayloadView end-to-end; cross-store flushTo; atomic target swap (#105)
* refactor(object_store): PayloadView end-to-end; cross-store flushTo; atomic target swap

A consolidated refactor of the ObjectStore / DataEngine surface that
makes PayloadView the universal vocabulary for ownership of bytes
across the SDK, adds zero-copy cross-instance transfer of entries
between stores, and exposes an atomic target-swap primitive on the
plugin data host used to redirect writes between two stores at runtime.

ObjectStore + ObjectEntry

- ObjectEntry::payload is std::any holding either
  std::shared_ptr<const std::vector<uint8_t>> (eager owned bytes,
  counted against the retention budget) or
  std::function<sdk::PayloadView()> (lazy resolver returning
  Span + BufferAnchor; not counted, bytes owned upstream).
  resolveEntry dispatches via std::any_cast; each branch handles its
  own concrete type. No static_pointer_cast on the lazy anchor: its
  type erasure (BufferAnchor = shared_ptr<const void>) is preserved
  end-to-end so producers can anchor on any shared_ptr<T>.
- ResolvedObjectEntry consolidates on PayloadView (Span + anchor).
  Removes the previous shared_ptr<const vector<uint8_t>> field, which
  locked consumers to a concrete anchor type and required a hidden
  static_pointer_cast in resolveEntry that would fail silently for
  non-vector anchors.
- ObjectStore::flushTo(ObjectStore& dst): two-phase, atomic, zero-copy
  bulk transfer of entries between two instances. Topics matched by
  descriptor; monotonicity enforced strictly per series; failure
  leaves both sides untouched. Each ObjectEntry is moved by value via
  std::move; the std::any inside transfers its buffer intact.
- DataEngine::flushTo(DataEngine& dst): symmetric primitive for the
  columnar scalar store. Each TopicStorage's sealed-chunk deque is
  moved through to the destination; no chunk constructor invoked.
- sdk::makePayloadView(std::vector<uint8_t>) replaces
  sdk::makeOwnedPayloadView. Wraps a vector into a shared_ptr that
  serves as both the bytes backing and the BufferAnchor.
- ObjectBytesBox removed from plugin_data_host. The C-ABI toolbox
  handle becomes a heap-allocated PayloadView directly.

plugin_data_host atomic target swap

- setTarget on engine and parser write hosts atomically swap which
  engine/store receives subsequent writes. Safe under concurrent
  ingest: in-flight writes complete on the previous target.

Version bump

- 0.4.0 -> 0.5.0. Source-incompatible: ResolvedObjectEntry::data
  renamed to payload (PayloadView instead of shared_ptr<vector>).
  Consumers migrate from entry->data->{data,size,empty}() to
  entry->payload.bytes.{data,size,empty}(); from entry->data to
  entry->payload.anchor for ownership retention.

* minor changes

---------

Co-authored-by: Davide Faconti <davide.faconti@gmail.com>
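
The consumer migration described under "Version bump" can be sketched roughly as follows. This is only an illustration under assumptions: `Span`, `BufferAnchor`, `PayloadView`, `makePayloadView`, and `ResolvedObjectEntry` are simplified stand-ins modeled on the names in the message above, not the SDK's real headers, and `checksum` is a hypothetical consumer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Stand-in types (assumptions); the SDK's real definitions may differ.
struct Span {
  const uint8_t* ptr = nullptr;
  std::size_t len = 0;
  const uint8_t* data() const { return ptr; }
  std::size_t size() const { return len; }
  bool empty() const { return len == 0; }
};
using BufferAnchor = std::shared_ptr<const void>;  // type-erased owner

struct PayloadView {
  Span bytes;           // non-owning view
  BufferAnchor anchor;  // keeps the viewed bytes alive
};

// Wraps a vector in a shared_ptr that is both the byte storage and the anchor.
PayloadView makePayloadView(std::vector<uint8_t> v) {
  auto owned = std::make_shared<const std::vector<uint8_t>>(std::move(v));
  return PayloadView{Span{owned->data(), owned->size()}, owned};
}

struct ResolvedObjectEntry {
  int64_t timestamp = 0;
  PayloadView payload;
};

// Hypothetical consumer. Before 0.5.0 it would have read
// entry.data->data() and entry.data->size() instead.
uint32_t checksum(const ResolvedObjectEntry& entry) {
  uint32_t sum = 0;
  for (std::size_t i = 0; i < entry.payload.bytes.size(); ++i) {
    sum += entry.payload.bytes.data()[i];
  }
  return sum;
}
```

A caller that needs the bytes to outlive the resolved entry copies `entry.payload.anchor`; holding that anchor is what keeps the span valid.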
1 parent 2768118 commit 1447d4f

11 files changed

Lines changed: 769 additions & 54 deletions

pj_datastore/include/pj_datastore/engine.hpp

Lines changed: 14 additions & 1 deletion
@@ -79,9 +79,22 @@ class DataEngine {
   /// derived.onSourceCommitted(engine.commitChunks(writer.flushAll()));
   std::vector<PJ::TopicId> commitChunks(std::vector<std::pair<PJ::TopicId, TopicChunk>> chunks);
 
-  /// Evict old chunks outside retention window.
+  /// Evict old chunks outside the retention window.
   void enforceRetention(PJ::Timestamp retention_window_ns);
 
+  /// Move every committed chunk into `dst`, leaving this engine's storages
+  /// empty (datasets, topics, schemas, time domains stay registered). Topics
+  /// are matched by descriptor (`dataset_id` + `name`); both engines must have
+  /// them registered. Monotonicity is enforced per topic: the source's
+  /// earliest chunk timestamp must be >= the destination's `time_max()`. Any
+  /// failure mutates neither engine.
+  ///
+  /// Zero-copy: dst's `std::deque<TopicChunk>` receives the chunks via
+  /// `std::move` (column buffers/value arrays are pointer moves). Schema
+  /// compatibility is the caller's responsibility — typically dst is kept in
+  /// lockstep with the source via parallel registration at startup.
+  PJ::Status flushTo(DataEngine& dst);
+
   // Writer/Reader factories
   /// Create a writer bound to this engine.
   [[nodiscard]] DataWriter createWriter();

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 21 additions & 3 deletions
@@ -49,11 +49,15 @@ using LazyCallback = std::function<sdk::PayloadView()>;
 
 struct ObjectEntry {
   Timestamp timestamp = 0;
+  // Eager owned bytes or a lazy resolver; resolveEntry discriminates via std::get_if.
   std::variant<SharedBuffer, LazyCallback> payload;
 };
 
 struct ResolvedObjectEntry {
   Timestamp timestamp = 0;
+  // Non-owning Span over the bytes plus an opaque anchor (any shared_ptr<T>).
+  // Consumers read `payload.bytes`; retain `payload.anchor` to keep the bytes
+  // alive past the resolve call. resolveEntry never casts the anchor.
   sdk::PayloadView payload;
 };
 
@@ -117,9 +121,9 @@ class ObjectStore {
 
   Status pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);
 
-  // Fetcher runs on every read. Producers anchor on whatever owns the bytes
-  // (chunk cache, mmap, fresh allocation); the store never copies — it just
-  // retains the anchor through PayloadView.
+  // Fetcher runs on every read; the store retains the anchor via PayloadView
+  // and never copies. The closure can return a view over bytes the producer
+  // already owns (chunk cache, mmap, hand-off between stores).
   Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);
 
   // --- Read ---
@@ -147,6 +151,20 @@ class ObjectStore {
   void evictBefore(ObjectTopicId id, Timestamp threshold);
   void evictAllBefore(Timestamp threshold);
 
+  // --- Cross-store flush ---
+
+  // Move every entry into `dst`, leaving this store empty (registrations kept).
+  // Topics are matched by descriptor (dataset_id + topic_name); both stores
+  // must share descriptors. Monotonicity is enforced per series: the earliest
+  // moved timestamp must be >= the destination's last. Any validation failure
+  // returns an error and mutates neither store.
+  //
+  // Zero-copy: each ObjectEntry is moved by value, so the variant's shared_ptr
+  // or closure transfers as a pointer move — bytes are never copied. Lazy
+  // entries keep their semantics; their closure re-runs only on a dst read.
+  // Afterward, dst's retention budget is applied to each touched series.
+  Status flushTo(ObjectStore& dst);
+
   // --- Lifecycle ---
 
   void removeTopic(ObjectTopicId id);

pj_datastore/include/pj_datastore/plugin_data_host.hpp

Lines changed: 29 additions & 0 deletions
@@ -31,6 +31,13 @@ class DatastoreSourceWriteHost {
   [[nodiscard]] PJ_source_write_host_t raw() noexcept;
   void flushPending();
 
+  // Atomically swap the destination DataEngine. Mirrors the parser write
+  // host's setTarget: the streaming two-engine flow routes source-level
+  // scalar pushes to a secondary DataEngine during pause and back to the
+  // primary on resume. Pending rows are flushed to the current engine
+  // before the switch. Does not take ownership.
+  void setTarget(DataEngine* target);
+
 private:
   std::unique_ptr<DatastoreSourceWriteHostState> state_;
 };
@@ -51,6 +58,13 @@ class DatastoreSourceObjectWriteHost {
 
   [[nodiscard]] PJ_object_write_host_t raw() noexcept;
 
+  // Atomically swap the destination store. Used by the streaming two-store
+  // flow to route pushes to a secondary ObjectStore during pause and back to
+  // the primary on resume. Must point to a live store; the host does not
+  // take ownership and the caller must keep the target alive for as long as
+  // the host can receive pushes against it.
+  void setTarget(ObjectStore* target) noexcept;
+
 private:
   std::unique_ptr<DatastoreSourceObjectWriteHostState> state_;
 };
@@ -68,6 +82,14 @@ class DatastoreParserWriteHost {
   [[nodiscard]] PJ_parser_write_host_t raw() noexcept;
   void flushPending();
 
+  // Atomically swap the destination DataEngine. Mirrors the object write
+  // host's setTarget: the streaming two-store flow routes scalar pushes to a
+  // secondary DataEngine during pause and back to the primary on resume.
+  // Pending rows are flushed to the current engine before the switch; the
+  // bound topic must exist in `target` with the same TopicId (the streaming
+  // manager registers it lockstep on both engines). Does not take ownership.
+  void setTarget(DataEngine* target);
+
 private:
   std::unique_ptr<DatastoreParserWriteHostState> state_;
 };
@@ -111,6 +133,13 @@ class DatastoreParserObjectWriteHost {
 
   [[nodiscard]] PJ_parser_object_write_host_t raw() noexcept;
 
+  // Atomically swap the destination store. Used by the streaming two-store
+  // flow to route pushes to a secondary ObjectStore during pause and back to
+  // the primary on resume. The bound topic id must exist in `target` (the
+  // streaming manager ensures this via lockstep registerTopic on both
+  // stores). The host does not take ownership of the target.
+  void setTarget(ObjectStore* target) noexcept;
+
 private:
   std::unique_ptr<DatastoreParserObjectWriteHostState> state_;
 };
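
The header above only declares `setTarget`; the implementation is not part of the hunks shown here. One way such a swap could work is sketched below, assuming the host keeps the target in a `std::atomic` pointer. `ToyStore` and `ToyWriteHost` are hypothetical names, and the sketch models neither the pending-row flush nor the store's own thread safety.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical stand-in for an ObjectStore or DataEngine.
struct ToyStore {
  std::vector<int> values;
  void push(int v) { values.push_back(v); }
};

// Hypothetical write host. It borrows the target; the caller keeps it alive.
class ToyWriteHost {
 public:
  explicit ToyWriteHost(ToyStore* target) : target_(target) {}

  // Swap the destination with a single atomic store.
  void setTarget(ToyStore* target) noexcept {
    target_.store(target, std::memory_order_release);
  }

  // Load the pointer once per push, so a push that started before a swap
  // finishes on the store it started with.
  void push(int v) {
    ToyStore* store = target_.load(std::memory_order_acquire);
    store->push(v);
  }

 private:
  std::atomic<ToyStore*> target_;
};
```

In the pause/resume flow described in the comments, a caller would point the host at the secondary store on pause and back at the primary on resume; pushes made in between land only in the secondary.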

pj_datastore/include/pj_datastore/topic_storage.hpp

Lines changed: 8 additions & 0 deletions
@@ -57,6 +57,8 @@ struct TopicMetadata {
   uint32_t truncated_sample_count = 0;
 };
 
+class DataEngine;
+
 /// Storage container for committed chunks of one topic.
 class TopicStorage {
 public:
@@ -123,6 +125,12 @@ class TopicStorage {
   void setArrayExpansionCount(const std::string& field_path, uint32_t count);
 
 private:
+  // DataEngine::flushTo needs to move sealed_chunks_ between TopicStorage
+  // instances of different engines without copying. Friending it lets the
+  // transfer happen entirely inside DataEngine without exposing the move
+  // primitive on the public TopicStorage API.
+  friend class DataEngine;
+
   TopicId topic_id_;
   TopicDescriptor descriptor_;
   std::deque<TopicChunk> sealed_chunks_;

pj_datastore/src/engine.cpp

Lines changed: 57 additions & 0 deletions
@@ -182,6 +182,63 @@ void DataEngine::enforceRetention(Timestamp retention_window_ns) {
   }
 }
 
+Status DataEngine::flushTo(DataEngine& dst) {
+  if (&dst == this) {
+    return PJ::unexpected("flushTo: source and destination are the same engine");
+  }
+
+  // Phase 1: validate. Walk every src topic with sealed chunks and look up
+  // the matching dst topic by descriptor (dataset_id + name). Verify
+  // monotonicity against dst's current time_max. No mutation yet.
+  struct Step {
+    TopicStorage* src;
+    TopicStorage* dst;
+  };
+  std::vector<Step> plan;
+  plan.reserve(impl_->topics.size());
+
+  for (auto it = impl_->topics.begin(); it != impl_->topics.end(); ++it) {
+    auto& src_storage = it.value();
+    if (src_storage.empty()) {
+      continue;
+    }
+    TopicStorage* dst_storage = nullptr;
+    for (auto dst_it = dst.impl_->topics.begin(); dst_it != dst.impl_->topics.end(); ++dst_it) {
+      auto& candidate = dst_it.value();
+      if (candidate.descriptor().dataset_id == src_storage.descriptor().dataset_id &&
+          candidate.descriptor().name == src_storage.descriptor().name) {
+        dst_storage = &candidate;
+        break;
+      }
+    }
+    if (dst_storage == nullptr) {
+      return PJ::unexpected(
+          "flushTo: destination has no topic '" + src_storage.descriptor().name + "' for dataset " +
+          std::to_string(src_storage.descriptor().dataset_id));
+    }
+    if (!dst_storage->empty() && src_storage.time_min() < dst_storage->time_max()) {
+      return PJ::unexpected("flushTo: monotonicity violation for topic '" + src_storage.descriptor().name + "'");
+    }
+    plan.push_back({&src_storage, dst_storage});
+  }
+
+  // Phase 2: execute. friend access lets us move sealed_chunks_ directly
+  // between TopicStorage instances of different engines — the deque move
+  // transfers chunk ownership without copying any column data or value
+  // buffers. Each chunk's TopicChunkStats (t_min/t_max/row_count) rides
+  // along inside the chunk by value, so dst's time_min/time_max queries
+  // reflect the new state immediately after the move.
+  for (auto& step : plan) {
+    auto drained = std::move(step.src->sealed_chunks_);
+    step.src->sealed_chunks_.clear();  // post-move state: deque is valid but empty.
+    for (auto& chunk : drained) {
+      step.dst->sealed_chunks_.push_back(std::move(chunk));
+    }
+  }
+
+  return {};
+}
+
 // ---------------------------------------------------------------------------
 // Listing helpers
 // ---------------------------------------------------------------------------
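
The validate-then-move structure of `DataEngine::flushTo` can be tried in isolation with a toy model. The sketch below is an assumption-laden simplification: `ToyChunk` and `ToyEngine` are hypothetical stand-ins, topics are keyed by name in a `std::map` rather than matched by descriptor, and a `bool` replaces `PJ::Status`.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in: a chunk only carries its time range here.
struct ToyChunk {
  int64_t t_min;
  int64_t t_max;
};
using ToyEngine = std::map<std::string, std::deque<ToyChunk>>;  // topic name -> chunks

// Returns false and leaves both engines untouched if any topic fails validation.
bool flushTo(ToyEngine& src, ToyEngine& dst) {
  if (&src == &dst) return false;

  // Phase 1: validate every non-empty topic and record the planned moves.
  std::vector<std::pair<std::deque<ToyChunk>*, std::deque<ToyChunk>*>> plan;
  for (auto& [name, chunks] : src) {
    if (chunks.empty()) continue;
    auto it = dst.find(name);
    if (it == dst.end()) return false;  // topic not registered in dst
    if (!it->second.empty() && chunks.front().t_min < it->second.back().t_max) {
      return false;  // would break monotonicity
    }
    plan.emplace_back(&chunks, &it->second);
  }

  // Phase 2: validation passed for every topic, so mutate.
  for (auto& [from, to] : plan) {
    for (auto& chunk : *from) to->push_back(std::move(chunk));
    from->clear();
  }
  return true;
}
```

Because every check runs before the first move, a violation on a later topic cannot leave an earlier topic half-transferred.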

pj_datastore/src/object_store.cpp

Lines changed: 74 additions & 1 deletion
@@ -79,7 +79,8 @@ Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector
     return unexpected("timestamp not monotonically non-decreasing");
   }
 
-  size_t payload_size = payload.size();
+  const size_t payload_size = payload.size();
+
   auto shared_data = std::make_shared<const std::vector<uint8_t>>(std::move(payload));
 
   ObjectEntry entry;
@@ -264,6 +265,74 @@ void ObjectStore::evictAllBefore(Timestamp threshold) {
   }
 }
 
+// --- Cross-store flush ---
+
+Status ObjectStore::flushTo(ObjectStore& dst) {
+  if (&dst == this) {
+    return unexpected("flushTo: source and destination are the same store");
+  }
+
+  // Deterministic lock order by address to avoid deadlock with concurrent flushTo calls.
+  ObjectStore* first = this < &dst ? this : &dst;
+  ObjectStore* second = first == this ? &dst : this;
+  std::unique_lock first_lock(first->store_mutex_);
+  std::unique_lock second_lock(second->store_mutex_);
+
+  // Phase 1: validate every source series can be matched to a destination
+  // topic by descriptor and that the move respects monotonicity. No mutation.
+  struct Step {
+    ObjectSeries* src;
+    ObjectSeries* dst;
+  };
+  std::vector<Step> plan;
+  plan.reserve(topics_.size());
+
+  for (auto& [src_id, src_series] : topics_) {
+    if (src_series->entry_timestamps.empty()) {
+      continue;
+    }
+    ObjectSeries* dst_series = nullptr;
+    for (auto& [dst_id, dst_series_ptr] : dst.topics_) {
+      if (dst_series_ptr->descriptor.dataset_id == src_series->descriptor.dataset_id &&
+          dst_series_ptr->descriptor.topic_name == src_series->descriptor.topic_name) {
+        dst_series = dst_series_ptr.get();
+        break;
+      }
+    }
+    if (dst_series == nullptr) {
+      return unexpected(
+          "flushTo: destination has no topic '" + src_series->descriptor.topic_name + "' for dataset " +
+          std::to_string(src_series->descriptor.dataset_id));
+    }
+    if (!dst_series->entry_timestamps.empty() &&
+        src_series->entry_timestamps.front() < dst_series->entry_timestamps.back()) {
+      return unexpected("flushTo: monotonicity violation for topic '" + src_series->descriptor.topic_name + "'");
+    }
+    plan.push_back({src_series.get(), dst_series});
+  }
+
+  // Phase 2: execute the moves. Holding both store_mutex_ unique means no
+  // other reader or writer can observe an intermediate state; per-series
+  // mutexes are not needed because no concurrent access can occur.
+  for (auto& step : plan) {
+    for (auto& entry : step.src->entries) {
+      step.dst->entries.push_back(std::move(entry));
+    }
+    step.dst->entry_timestamps.insert(
+        step.dst->entry_timestamps.end(), step.src->entry_timestamps.begin(), step.src->entry_timestamps.end());
+    step.dst->memory_bytes += step.src->memory_bytes;
+
+    step.src->entries.clear();
+    step.src->entry_timestamps.clear();
+    step.src->memory_bytes = 0;
+
+    const Timestamp newest = step.dst->entry_timestamps.empty() ? 0 : step.dst->entry_timestamps.back();
+    applyRetention(*step.dst, newest);
+  }
+
+  return {};
+}
+
 // --- Lifecycle ---
 
 void ObjectStore::removeTopic(ObjectTopicId id) {
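
A note on the lock ordering in `flushTo`: the built-in `<` on pointers to two unrelated objects has an unspecified result in standard C++, whereas `std::less` guarantees a total order. An alternative that avoids manual ordering altogether is `std::scoped_lock`, which acquires several mutexes with a deadlock-avoidance algorithm. The sketch below illustrates that alternative on a hypothetical `ToyStore`; it uses `std::mutex` for simplicity, and the type of the real `store_mutex_` is not visible in this diff.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for a store guarded by one mutex.
struct ToyStore {
  std::mutex store_mutex_;
  int entries = 0;
};

// Moves the entry count from src to dst while holding both mutexes.
// std::scoped_lock locks them without risking deadlock against a concurrent
// call in the opposite direction, so no address comparison is needed.
bool flushTo(ToyStore& src, ToyStore& dst) {
  if (&src == &dst) return false;  // locking the same mutex twice would be an error
  std::scoped_lock lock(src.store_mutex_, dst.store_mutex_);
  dst.entries += src.entries;
  src.entries = 0;
  return true;
}
```

Whether this is preferable to explicit ordering is a design choice; explicit ordering keeps the two `std::unique_lock` objects separately addressable, which the original code may rely on elsewhere.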
@@ -305,13 +374,17 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
   resolved.timestamp = entry.timestamp;
 
   if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
+    // Span the vector, anchor on the same shared_ptr — refcount bump, no copy.
+    // A default-constructed entry holds a null SharedBuffer, so guard it.
     if (*owned) {
       resolved.payload = sdk::PayloadView{
           Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
           sdk::BufferAnchor{*owned},
       };
     }
   } else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
+    // Forward the closure's PayloadView verbatim. The anchor stays opaque (no
+    // cast), so producers can back it with arrow::Buffer, mmap, or a C-ABI anchor.
     resolved.payload = (*lazy)();
   }
