Commit fafa624

pabloinigoblasco authored and facontidavide committed
refactor(object_store): PayloadView end-to-end; cross-store flushTo; atomic target swap
A consolidated refactor of the ObjectStore / DataEngine surface that makes PayloadView the universal vocabulary for ownership of bytes across the SDK, adds zero-copy cross-instance transfer of entries between stores, and exposes an atomic target-swap primitive on the plugin data host, used to redirect writes between two stores at runtime.

ObjectStore + ObjectEntry

- ObjectEntry::payload is a std::variant holding either std::shared_ptr<const std::vector<uint8_t>> (eager owned bytes, counted against the retention budget) or std::function<sdk::PayloadView()> (lazy resolver returning Span + BufferAnchor; not counted, bytes owned upstream). resolveEntry dispatches via std::get_if; each branch handles its own concrete type. No static_pointer_cast on the lazy anchor: its type erasure (BufferAnchor = shared_ptr<const void>) is preserved end-to-end so producers can anchor on any shared_ptr<T>.
- ResolvedObjectEntry consolidates on PayloadView (Span + anchor). Removes the previous shared_ptr<const vector<uint8_t>> field, which locked consumers to a concrete anchor type and required a hidden static_pointer_cast in resolveEntry that would fail silently for non-vector anchors.
- ObjectStore::flushTo(ObjectStore& dst): two-phase, atomic, zero-copy bulk transfer of entries between two instances. Topics are matched by descriptor; monotonicity is enforced per series (the earliest moved timestamp must be greater than or equal to the destination's last timestamp); failure leaves both sides untouched. Each ObjectEntry is moved by value via std::move; the std::variant inside transfers its buffer intact.
- DataEngine::flushTo(DataEngine& dst): symmetric primitive for the columnar scalar store. Each TopicStorage's sealed-chunk deque is moved through to the destination; no chunk copy constructor is invoked.
- sdk::makePayloadView(std::vector<uint8_t>) replaces sdk::makeOwnedPayloadView. It wraps a vector into a shared_ptr that serves as both the backing storage for the bytes and the BufferAnchor.
- ObjectBytesBox removed from plugin_data_host. The C-ABI toolbox handle becomes a heap-allocated PayloadView directly.

plugin_data_host atomic target swap

- setTarget on the engine and parser write hosts atomically swaps which engine/store receives subsequent writes. Safe under concurrent ingest: in-flight writes complete on the previous target.

Version bump

- 0.4.0 -> 0.5.0. Source-incompatible: ResolvedObjectEntry::data is renamed to payload (PayloadView instead of shared_ptr<vector>). Consumers migrate from entry->data->{data,size,empty}() to entry->payload.bytes.{data,size,empty}(), and from entry->data to entry->payload.anchor for ownership retention.
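The consumer migration described in the version-bump note can be illustrated with a minimal sketch. The `Span`, `PayloadView`, `ResolvedObjectEntry`, and `makePayloadView` definitions below are hypothetical stand-ins written for this example, not the SDK's actual declarations; only the member names `payload.bytes` and `payload.anchor` are taken from the commit message.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the SDK types named in the commit message.
struct Span {
  const uint8_t* ptr = nullptr;
  std::size_t len = 0;
  const uint8_t* data() const { return ptr; }
  std::size_t size() const { return len; }
  bool empty() const { return len == 0; }
};
using BufferAnchor = std::shared_ptr<const void>;
struct PayloadView {
  Span bytes;           // non-owning view over the payload
  BufferAnchor anchor;  // type-erased owner that keeps the bytes alive
};
struct ResolvedObjectEntry {
  int64_t timestamp = 0;
  PayloadView payload;
};

// Assumed shape of makePayloadView: the vector moves into a shared_ptr
// that serves as both the byte storage and the anchor.
PayloadView makePayloadView(std::vector<uint8_t> bytes) {
  auto owner = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
  return PayloadView{Span{owner->data(), owner->size()}, owner};
}

// 0.5.0-style read: entry.payload.bytes.{data,size}() replaces
// the 0.4.0-style entry.data->{data,size}().
std::size_t byteSum(const ResolvedObjectEntry& entry) {
  std::size_t sum = 0;
  for (std::size_t i = 0; i < entry.payload.bytes.size(); ++i) {
    sum += entry.payload.bytes.data()[i];
  }
  return sum;
}

// 0.5.0-style retention: copy the anchor instead of the old shared_ptr<vector>.
BufferAnchor retain(const ResolvedObjectEntry& entry) {
  return entry.payload.anchor;
}
```

A consumer that needs the bytes to outlive the resolved entry keeps the value returned by `retain`; the span stays valid for as long as that anchor is alive.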
1 parent 2768118 commit fafa624

11 files changed

Lines changed: 799 additions & 52 deletions

pj_datastore/include/pj_datastore/engine.hpp

Lines changed: 18 additions & 1 deletion
@@ -79,9 +79,26 @@ class DataEngine {
   /// derived.onSourceCommitted(engine.commitChunks(writer.flushAll()));
   std::vector<PJ::TopicId> commitChunks(std::vector<std::pair<PJ::TopicId, TopicChunk>> chunks);

-  /// Evict old chunks outside retention window.
+  /// Evict old chunks outside the retention window.
   void enforceRetention(PJ::Timestamp retention_window_ns);

+  /// Move every committed chunk from this engine into `dst`, leaving this
+  /// engine's topic storages empty (datasets, topics, schemas, and time
+  /// domains remain registered). Topics are matched by descriptor
+  /// (`dataset_id` + `name`); both engines must have the matching topics
+  /// registered or the call fails without partial mutation. Monotonicity is
+  /// enforced strictly per topic: the source's earliest chunk timestamp must
+  /// be greater than or equal to the destination's current `time_max()`.
+  ///
+  /// Zero-copy on the chunk data: the destination's `std::deque<TopicChunk>`
+  /// receives the source's chunks via `std::move`, never via copy. The chunk
+  /// internals (column buffers, value arrays) are pointer/buffer moves.
+  ///
+  /// Schema compatibility is the caller's responsibility — typically the
+  /// destination is kept in lockstep with the source through parallel
+  /// `createDataset` / `createTopic` registration at startup.
+  PJ::Expected<void, std::string> flushTo(DataEngine& dst);
+
   // Writer/Reader factories
   /// Create a writer bound to this engine.
   [[nodiscard]] DataWriter createWriter();
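The validate-then-execute contract documented on `flushTo` can be sketched on simplified types. Everything here is an assumption made for illustration: `Chunk` and `Engine` are hypothetical stand-ins, topics are keyed by name only rather than by `dataset_id` + `name`, and an empty string stands in for a successful `Expected`.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins: a chunk with its time range, and an engine
// modelled as topic name -> sealed chunks.
struct Chunk {
  int64_t t_min;
  int64_t t_max;
  std::vector<double> values;
};
using Engine = std::map<std::string, std::deque<Chunk>>;

// Returns an empty string on success, an error message otherwise.
std::string flushTo(Engine& src, Engine& dst) {
  if (&src == &dst) {
    return "source and destination are the same engine";
  }
  struct Step {
    std::deque<Chunk>* src;
    std::deque<Chunk>* dst;
  };
  std::vector<Step> plan;

  // Phase 1: validate every topic; nothing is mutated yet.
  for (auto& [name, chunks] : src) {
    if (chunks.empty()) {
      continue;
    }
    auto it = dst.find(name);
    if (it == dst.end()) {
      return "destination has no topic '" + name + "'";
    }
    if (!it->second.empty() && chunks.front().t_min < it->second.back().t_max) {
      return "monotonicity violation for topic '" + name + "'";
    }
    plan.push_back({&chunks, &it->second});
  }

  // Phase 2: execute. Only reached when every topic validated.
  for (auto& step : plan) {
    for (auto& chunk : *step.src) {
      step.dst->push_back(std::move(chunk));
    }
    step.src->clear();
  }
  return {};
}
```

Because all checks run before the first move, a missing topic or a timestamp regression on any single topic leaves both engines exactly as they were.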

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,24 @@ using LazyCallback = std::function<sdk::PayloadView()>;

 struct ObjectEntry {
   Timestamp timestamp = 0;
+  // Holds either a SharedBuffer (eager owned payload, counted against the
+  // retention budget) or a LazyCallback (lazy resolver). resolveEntry
+  // discriminates via std::get_if; the variant is exhaustive over the two
+  // (and only two) payload kinds.
   std::variant<SharedBuffer, LazyCallback> payload;
 };

 struct ResolvedObjectEntry {
   Timestamp timestamp = 0;
+  // PayloadView lets the entry hold an opaque anchor (any shared_ptr<T>) plus
+  // a non-owning Span over the bytes. Consumers read `payload.bytes` for the
+  // data; they retain `payload.anchor` if they need the bytes to outlive the
+  // resolve call. Both `pushOwned` and `pushLazy` paths land here:
+  // - owned: payload.bytes spans the shared_ptr<vector<uint8_t>>; anchor IS that shared_ptr.
+  // - lazy: payload is whatever the closure returns; anchor can be any shared_ptr<T>
+  //   (e.g. a C-ABI payload anchor wrapped as shared_ptr<void>).
+  // resolveEntry never casts the anchor to a concrete type; the type erasure
+  // stays opaque all the way to the consumer.
   sdk::PayloadView payload;
 };

@@ -119,7 +132,9 @@ class ObjectStore {

   // Fetcher runs on every read. Producers anchor on whatever owns the bytes
   // (chunk cache, mmap, fresh allocation); the store never copies — it just
-  // retains the anchor through PayloadView.
+  // retains the anchor through PayloadView. When the producer already holds
+  // the bytes behind a shared_ptr (e.g. a streaming buffer handed off between
+  // stores), the closure captures it and returns a view backed by it.
   Status pushLazy(ObjectTopicId id, Timestamp timestamp, LazyCallback fetch);

   // --- Read ---
@@ -147,6 +162,29 @@ class ObjectStore {
   void evictBefore(ObjectTopicId id, Timestamp threshold);
   void evictAllBefore(Timestamp threshold);

+  // --- Cross-store flush ---
+
+  // Move every entry from this store into `dst`, leaving this store empty
+  // (topic registrations are preserved). Topics are matched by descriptor
+  // (dataset_id + topic_name); both stores must have registered the same
+  // descriptors or the call fails without partial mutation. For each series,
+  // monotonicity is enforced strictly: the earliest timestamp being moved
+  // must be greater than or equal to the destination's current last
+  // timestamp. On any validation failure the call returns an error and
+  // neither store is mutated.
+  //
+  // Zero-copy on the payload bytes. Each ObjectEntry is moved into the
+  // destination's series by value; the std::variant inside holds either a
+  // shared_ptr or a std::function, and moving it is a pointer/buffer move —
+  // bytes captured by the closure or owned by the shared_ptr are never
+  // copied or materialized during the flush. Lazy
+  // entries preserve their semantics in the destination: their closure is
+  // re-invoked only when the destination is read.
+  //
+  // After the move, the destination's retention budget is applied to each
+  // touched series in normal order.
+  Expected<void, std::string> flushTo(ObjectStore& dst);
+
   // --- Lifecycle ---

   void removeTopic(ObjectTopicId id);
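The comments in this header explain why the anchor stays type-erased. A minimal sketch of that idea follows, using a lazy producer that anchors on a `std::string` rather than a byte vector. All type definitions are simplified stand-ins written for this example (the `PayloadView` here flattens the span into a pointer and a size), and `lazyFromString` is a hypothetical helper, not part of the SDK.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <variant>
#include <vector>

// Simplified stand-ins for the types named in the header.
struct PayloadView {
  const uint8_t* data = nullptr;
  std::size_t size = 0;
  std::shared_ptr<const void> anchor;  // type-erased owner
};
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;
using LazyCallback = std::function<PayloadView()>;
struct ObjectEntry {
  std::variant<SharedBuffer, LazyCallback> payload;
};

// Hypothetical lazy producer whose anchor is a shared_ptr<const std::string>.
// The closure captures the owner, so the bytes live as long as the entry.
LazyCallback lazyFromString(std::shared_ptr<const std::string> text) {
  return [text]() {
    return PayloadView{reinterpret_cast<const uint8_t*>(text->data()), text->size(), text};
  };
}

// Dispatch on the variant; neither branch casts the anchor back to a
// concrete type.
PayloadView resolve(const ObjectEntry& entry) {
  if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
    if (*owned) {
      return PayloadView{(*owned)->data(), (*owned)->size(), *owned};
    }
    return {};
  }
  if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
    if (*lazy) {
      return (*lazy)();
    }
  }
  return {};
}
```

With a `static_pointer_cast` back to `shared_ptr<const vector<uint8_t>>` in the lazy branch, the string-backed producer above would not be expressible; keeping the anchor as `shared_ptr<const void>` is what permits it.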

pj_datastore/include/pj_datastore/plugin_data_host.hpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ class DatastoreSourceWriteHost {
   [[nodiscard]] PJ_source_write_host_t raw() noexcept;
   void flushPending();

+  // Atomically swap the destination DataEngine. Mirrors the parser write
+  // host's setTarget: the streaming two-engine flow routes source-level
+  // scalar pushes to a secondary DataEngine during pause and back to the
+  // primary on resume. Pending rows are flushed to the current engine
+  // before the switch. Does not take ownership.
+  void setTarget(DataEngine* target);
+
  private:
   std::unique_ptr<DatastoreSourceWriteHostState> state_;
 };
@@ -51,6 +58,13 @@ class DatastoreSourceObjectWriteHost {

   [[nodiscard]] PJ_object_write_host_t raw() noexcept;

+  // Atomically swap the destination store. Used by the streaming two-store
+  // flow to route pushes to a secondary ObjectStore during pause and back to
+  // the primary on resume. Must point to a live store; the host does not
+  // take ownership and the caller must keep the target alive for as long as
+  // the host can receive pushes against it.
+  void setTarget(ObjectStore* target) noexcept;
+
  private:
   std::unique_ptr<DatastoreSourceObjectWriteHostState> state_;
 };
@@ -68,6 +82,14 @@ class DatastoreParserWriteHost {
   [[nodiscard]] PJ_parser_write_host_t raw() noexcept;
   void flushPending();

+  // Atomically swap the destination DataEngine. Mirrors the object write
+  // host's setTarget: the streaming two-store flow routes scalar pushes to a
+  // secondary DataEngine during pause and back to the primary on resume.
+  // Pending rows are flushed to the current engine before the switch; the
+  // bound topic must exist in `target` with the same TopicId (the streaming
+  // manager registers it lockstep on both engines). Does not take ownership.
+  void setTarget(DataEngine* target);
+
  private:
   std::unique_ptr<DatastoreParserWriteHostState> state_;
 };
@@ -111,6 +133,13 @@ class DatastoreParserObjectWriteHost {

   [[nodiscard]] PJ_parser_object_write_host_t raw() noexcept;

+  // Atomically swap the destination store. Used by the streaming two-store
+  // flow to route pushes to a secondary ObjectStore during pause and back to
+  // the primary on resume. The bound topic id must exist in `target` (the
+  // streaming manager ensures this via lockstep registerTopic on both
+  // stores). The host does not take ownership of the target.
+  void setTarget(ObjectStore* target) noexcept;
+
  private:
   std::unique_ptr<DatastoreParserObjectWriteHostState> state_;
 };
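The diff shows only the `setTarget` declarations, not how the swap is implemented. One possible way to get the documented behaviour (subsequent pushes go to the new target, and the host never owns it) is an atomic pointer, sketched below. `Store` and `WriteHost` are hypothetical names for this example, and the sketch makes only the pointer swap atomic; the target store would still need its own synchronization for concurrent writes.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical stand-in for an ObjectStore or DataEngine.
struct Store {
  std::vector<int> rows;
};

// Sketch of a write host with a swappable, non-owning target.
class WriteHost {
 public:
  explicit WriteHost(Store* initial) : target_(initial) {}

  // Publish the new target; pushes that load the pointer afterwards see it.
  void setTarget(Store* target) noexcept {
    target_.store(target, std::memory_order_release);
  }

  // Each push reads the target once, so a push already in flight
  // finishes against the store it loaded.
  void push(int row) {
    Store* current = target_.load(std::memory_order_acquire);
    current->rows.push_back(row);
  }

 private:
  std::atomic<Store*> target_;  // non-owning; the caller keeps the store alive
};
```

In the pause/resume flow described in the comments, a manager would call `setTarget(&secondary)` on pause and `setTarget(&primary)` on resume, after flushing the secondary into the primary.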

pj_datastore/include/pj_datastore/topic_storage.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ struct TopicMetadata {
   uint32_t truncated_sample_count = 0;
 };

+class DataEngine;
+
 /// Storage container for committed chunks of one topic.
 class TopicStorage {
  public:
@@ -123,6 +125,12 @@ class TopicStorage {
   void setArrayExpansionCount(const std::string& field_path, uint32_t count);

  private:
+  // DataEngine::flushTo needs to move sealed_chunks_ between TopicStorage
+  // instances of different engines without copying. Friending it lets the
+  // transfer happen entirely inside DataEngine without exposing the move
+  // primitive on the public TopicStorage API.
+  friend class DataEngine;
+
   TopicId topic_id_;
   TopicDescriptor descriptor_;
   std::deque<TopicChunk> sealed_chunks_;

pj_datastore/src/engine.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,63 @@ void DataEngine::enforceRetention(Timestamp retention_window_ns) {
   }
 }

+Expected<void, std::string> DataEngine::flushTo(DataEngine& dst) {
+  if (&dst == this) {
+    return PJ::unexpected("flushTo: source and destination are the same engine");
+  }
+
+  // Phase 1: validate. Walk every src topic with sealed chunks and look up
+  // the matching dst topic by descriptor (dataset_id + name). Verify
+  // monotonicity against dst's current time_max. No mutation yet.
+  struct Step {
+    TopicStorage* src;
+    TopicStorage* dst;
+  };
+  std::vector<Step> plan;
+  plan.reserve(impl_->topics.size());
+
+  for (auto it = impl_->topics.begin(); it != impl_->topics.end(); ++it) {
+    auto& src_storage = it.value();
+    if (src_storage.empty()) {
+      continue;
+    }
+    TopicStorage* dst_storage = nullptr;
+    for (auto dst_it = dst.impl_->topics.begin(); dst_it != dst.impl_->topics.end(); ++dst_it) {
+      auto& candidate = dst_it.value();
+      if (candidate.descriptor().dataset_id == src_storage.descriptor().dataset_id &&
+          candidate.descriptor().name == src_storage.descriptor().name) {
+        dst_storage = &candidate;
+        break;
+      }
+    }
+    if (dst_storage == nullptr) {
+      return PJ::unexpected(
+          "flushTo: destination has no topic '" + src_storage.descriptor().name + "' for dataset " +
+          std::to_string(src_storage.descriptor().dataset_id));
+    }
+    if (!dst_storage->empty() && src_storage.time_min() < dst_storage->time_max()) {
+      return PJ::unexpected("flushTo: monotonicity violation for topic '" + src_storage.descriptor().name + "'");
+    }
+    plan.push_back({&src_storage, dst_storage});
+  }
+
+  // Phase 2: execute. friend access lets us move sealed_chunks_ directly
+  // between TopicStorage instances of different engines — the deque move
+  // transfers chunk ownership without copying any column data or value
+  // buffers. Each chunk's TopicChunkStats (t_min/t_max/row_count) rides
+  // along inside the chunk by value, so dst's time_min/time_max queries
+  // reflect the new state immediately after the move.
+  for (auto& step : plan) {
+    auto drained = std::move(step.src->sealed_chunks_);
+    step.src->sealed_chunks_.clear();  // post-move state: deque is valid but empty.
+    for (auto& chunk : drained) {
+      step.dst->sealed_chunks_.push_back(std::move(chunk));
+    }
+  }
+
+  return {};
+}
+
 // ---------------------------------------------------------------------------
 // Listing helpers
 // ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------

pj_datastore/src/object_store.cpp

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ Status ObjectStore::pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector
     return unexpected("timestamp not monotonically non-decreasing");
   }

-  size_t payload_size = payload.size();
+  const size_t payload_size = payload.size();
+
   auto shared_data = std::make_shared<const std::vector<uint8_t>>(std::move(payload));

   ObjectEntry entry;
@@ -264,6 +265,74 @@ void ObjectStore::evictAllBefore(Timestamp threshold) {
   }
 }

+// --- Cross-store flush ---
+
+Expected<void, std::string> ObjectStore::flushTo(ObjectStore& dst) {
+  if (&dst == this) {
+    return unexpected("flushTo: source and destination are the same store");
+  }
+
+  // Deterministic lock order by address to avoid deadlock with concurrent flushTo calls.
+  ObjectStore* first = this < &dst ? this : &dst;
+  ObjectStore* second = first == this ? &dst : this;
+  std::unique_lock first_lock(first->store_mutex_);
+  std::unique_lock second_lock(second->store_mutex_);
+
+  // Phase 1: validate every source series can be matched to a destination
+  // topic by descriptor and that the move respects monotonicity. No mutation.
+  struct Step {
+    ObjectSeries* src;
+    ObjectSeries* dst;
+  };
+  std::vector<Step> plan;
+  plan.reserve(topics_.size());
+
+  for (auto& [src_id, src_series] : topics_) {
+    if (src_series->entry_timestamps.empty()) {
+      continue;
+    }
+    ObjectSeries* dst_series = nullptr;
+    for (auto& [dst_id, dst_series_ptr] : dst.topics_) {
+      if (dst_series_ptr->descriptor.dataset_id == src_series->descriptor.dataset_id &&
+          dst_series_ptr->descriptor.topic_name == src_series->descriptor.topic_name) {
+        dst_series = dst_series_ptr.get();
+        break;
+      }
+    }
+    if (dst_series == nullptr) {
+      return unexpected(
+          "flushTo: destination has no topic '" + src_series->descriptor.topic_name + "' for dataset " +
+          std::to_string(src_series->descriptor.dataset_id));
+    }
+    if (!dst_series->entry_timestamps.empty() &&
+        src_series->entry_timestamps.front() < dst_series->entry_timestamps.back()) {
+      return unexpected("flushTo: monotonicity violation for topic '" + src_series->descriptor.topic_name + "'");
+    }
+    plan.push_back({src_series.get(), dst_series});
+  }
+
+  // Phase 2: execute the moves. Holding both store_mutex_ unique means no
+  // other reader or writer can observe an intermediate state; per-series
+  // mutexes are not needed because no concurrent access can occur.
+  for (auto& step : plan) {
+    for (auto& entry : step.src->entries) {
+      step.dst->entries.push_back(std::move(entry));
+    }
+    step.dst->entry_timestamps.insert(
+        step.dst->entry_timestamps.end(), step.src->entry_timestamps.begin(), step.src->entry_timestamps.end());
+    step.dst->memory_bytes += step.src->memory_bytes;
+
+    step.src->entries.clear();
+    step.src->entry_timestamps.clear();
+    step.src->memory_bytes = 0;
+
+    const Timestamp newest = step.dst->entry_timestamps.empty() ? 0 : step.dst->entry_timestamps.back();
+    applyRetention(*step.dst, newest);
+  }
+
+  return {};
+}
+
 // --- Lifecycle ---

 void ObjectStore::removeTopic(ObjectTopicId id) {
@@ -305,13 +374,22 @@ ResolvedObjectEntry ObjectStore::resolveEntry(const ObjectEntry& entry) {
   resolved.timestamp = entry.timestamp;

   if (const auto* owned = std::get_if<SharedBuffer>(&entry.payload)) {
+    // Owned branch: build a PayloadView whose Span covers the vector and
+    // whose anchor IS the same shared_ptr — refcount bump, no copy. A
+    // default-constructed entry holds a null SharedBuffer, so guard it.
     if (*owned) {
       resolved.payload = sdk::PayloadView{
           Span<const uint8_t>{(*owned)->data(), (*owned)->size()},
           sdk::BufferAnchor{*owned},
       };
     }
   } else if (const auto* lazy = std::get_if<LazyCallback>(&entry.payload)) {
+    // Lazy branch: forward whatever the closure returns. The anchor stays
+    // opaque (shared_ptr<const void>); consumers retain it without needing
+    // to know the concrete type. No static_pointer_cast here — that was the
+    // hidden contract that locked the slot to shared_ptr<vector<uint8_t>>;
+    // dropping it lets producers anchor on arrow::Buffer, mmap pools, or
+    // C-ABI payload anchors equally.
     resolved.payload = (*lazy)();
   }
