Commit 0d0a763

refactor(object_store): PayloadView end-to-end; cross-store flushTo; atomic target swap
A consolidated refactor of the ObjectStore / DataEngine surface that makes PayloadView the universal vocabulary for ownership of bytes across the SDK, adds zero-copy cross-instance transfer of entries between stores, and exposes an atomic target-swap primitive on the plugin data host used to redirect writes between two stores at runtime.

ObjectStore + ObjectEntry
- ObjectEntry::payload is std::any holding either std::shared_ptr<const std::vector<uint8_t>> (eager owned bytes, counted against the retention budget) or std::function<sdk::PayloadView()> (lazy resolver returning Span + BufferAnchor; not counted, bytes owned upstream). resolveEntry dispatches via std::any_cast; each branch handles its own concrete type. No static_pointer_cast on the lazy anchor: its type erasure (BufferAnchor = shared_ptr<const void>) is preserved end-to-end so producers can anchor on any shared_ptr<T>.
- ResolvedObjectEntry consolidates on PayloadView (Span + anchor). Removes the previous shared_ptr<const vector<uint8_t>> field, which locked consumers to a concrete anchor type and required a hidden static_pointer_cast in resolveEntry that would fail silently for non-vector anchors.
- ObjectStore::flushTo(ObjectStore& dst): two-phase, atomic, zero-copy bulk transfer of entries between two instances. Topics matched by descriptor; monotonicity enforced strictly per series; failure leaves both sides untouched. Each ObjectEntry is moved by value via std::move; the std::any inside transfers its buffer intact.
- DataEngine::flushTo(DataEngine& dst): symmetric primitive for the columnar scalar store. Each TopicStorage's sealed-chunk deque is moved through to the destination; no chunk constructor invoked.
- sdk::makePayloadView(std::vector<uint8_t>) replaces sdk::makeOwnedPayloadView. Wraps a vector into a shared_ptr that serves as both the bytes backing and the BufferAnchor.
- ObjectBytesBox removed from plugin_data_host. The C-ABI toolbox handle becomes a heap-allocated PayloadView directly.

plugin_data_host atomic target swap
- setTarget on engine and parser write hosts atomically swap which engine/store receives subsequent writes. Safe under concurrent ingest: in-flight writes complete on the previous target.

Version bump
- 0.4.0 -> 0.5.0. Source-incompatible: ResolvedObjectEntry::data renamed to payload (PayloadView instead of shared_ptr<vector>). Consumers migrate from entry->data->{data,size,empty}() to entry->payload.bytes.{data,size,empty}(); from entry->data to entry->payload.anchor for ownership retention.
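The std::any dispatch described in the commit message can be pictured with a small standalone sketch. Everything below is an assumption made for illustration: `Span`, `BufferAnchor`, `PayloadView`, and `resolve` are simplified stand-ins, not the SDK's actual definitions, and the real resolveEntry is not shown in this diff.

```cpp
#include <any>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

// Hypothetical stand-ins for the SDK types named in the commit message.
struct Span {
  const uint8_t* ptr = nullptr;
  std::size_t len = 0;
  const uint8_t* data() const { return ptr; }
  std::size_t size() const { return len; }
  bool empty() const { return len == 0; }
};
using BufferAnchor = std::shared_ptr<const void>;
struct PayloadView {
  Span bytes;
  BufferAnchor anchor;
};

using EagerPayload = std::shared_ptr<const std::vector<uint8_t>>;
using LazyPayload = std::function<PayloadView()>;

// Dispatch on the concrete type stored in the std::any. The pointer form of
// std::any_cast returns nullptr on a type mismatch instead of throwing.
std::optional<PayloadView> resolve(const std::any& payload) {
  if (const auto* eager = std::any_cast<EagerPayload>(&payload)) {
    if (*eager == nullptr) return std::nullopt;
    // Eager branch: the shared_ptr is both the byte owner and the anchor.
    return PayloadView{Span{(*eager)->data(), (*eager)->size()}, *eager};
  }
  if (const auto* lazy = std::any_cast<LazyPayload>(&payload)) {
    if (!*lazy) return std::nullopt;
    // Lazy branch: forward whatever view the closure returns. The anchor is
    // never cast back to a concrete type.
    return (*lazy)();
  }
  return std::nullopt;  // Payload holds neither of the two expected types.
}
```

With a view shaped like this, the migration in the version-bump note reads as `view.bytes.data()` / `view.bytes.size()` for access and a copy of `view.anchor` for retention; the anchor can wrap any `shared_ptr<T>`, not only a vector.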
1 parent a8d117d commit 0d0a763

13 files changed

Lines changed: 858 additions & 84 deletions

pj_base/include/pj_base/buffer_anchor.hpp

Lines changed: 16 additions & 0 deletions
@@ -15,6 +15,8 @@

 #include <cstdint>
 #include <memory>
+#include <utility>
+#include <vector>

 #include "pj_base/span.hpp"

@@ -49,5 +51,19 @@ struct PayloadView {
       anchor(std::move(buffer)) {}
 };

+/// Convenience helper for producers whose only payload is a plain
+/// std::vector<uint8_t>. Wraps it in a shared_ptr (which becomes both the
+/// owner of the bytes and the type-erased anchor), and returns a PayloadView
+/// whose Span points at that vector's contents. Satisfies the contract
+/// expected by ObjectStore::resolveEntry on the lazy branch: the anchor is a
+/// shared_ptr<const std::vector<uint8_t>> recoverable via static_pointer_cast.
+inline PayloadView makePayloadView(std::vector<uint8_t> bytes) {
+  auto shared = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
+  return PayloadView{
+      Span<const uint8_t>{shared->data(), shared->size()},
+      BufferAnchor{shared},
+  };
+}
+
 } // namespace sdk
 } // namespace PJ
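To see why wrapping the vector in a shared_ptr makes the view safe to keep, here is a minimal sketch of the same pattern with stand-in types. `Span`, `PayloadView`, `makePayloadViewSketch`, and `sumAfterProducerScopeEnds` are hypothetical names that mirror the helper in the diff; they are not the SDK's definitions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Simplified stand-ins for the SDK types (assumptions for illustration).
struct Span {
  const uint8_t* ptr = nullptr;
  std::size_t len = 0;
  const uint8_t* data() const { return ptr; }
  std::size_t size() const { return len; }
};
using BufferAnchor = std::shared_ptr<const void>;
struct PayloadView {
  Span bytes;
  BufferAnchor anchor;
};

// Same shape as the helper in the diff: the shared vector owns the bytes and
// doubles as the type-erased anchor.
PayloadView makePayloadViewSketch(std::vector<uint8_t> bytes) {
  auto shared = std::make_shared<const std::vector<uint8_t>>(std::move(bytes));
  return PayloadView{Span{shared->data(), shared->size()}, BufferAnchor{shared}};
}

// A consumer that keeps only the view after the producer's vector is gone.
unsigned sumAfterProducerScopeEnds() {
  PayloadView kept;
  {
    std::vector<uint8_t> local = {1, 2, 3, 4};
    kept = makePayloadViewSketch(std::move(local));
  }  // `local` is destroyed here; the anchored copy of the buffer is not.
  unsigned sum = 0;
  for (std::size_t i = 0; i < kept.bytes.size(); ++i) {
    sum += kept.bytes.data()[i];
  }
  return sum;
}
```

The span alone would dangle once the producer's scope ends; it is the anchor held alongside it that keeps the bytes alive.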

pj_datastore/include/pj_datastore/engine.hpp

Lines changed: 18 additions & 1 deletion
@@ -79,9 +79,26 @@ class DataEngine {
   /// derived.onSourceCommitted(engine.commitChunks(writer.flushAll()));
   std::vector<PJ::TopicId> commitChunks(std::vector<std::pair<PJ::TopicId, TopicChunk>> chunks);

-  /// Evict old chunks outside retention window.
+  /// Evict old chunks outside the retention window.
   void enforceRetention(PJ::Timestamp retention_window_ns);

+  /// Move every committed chunk from this engine into `dst`, leaving this
+  /// engine's topic storages empty (datasets, topics, schemas, and time
+  /// domains remain registered). Topics are matched by descriptor
+  /// (`dataset_id` + `name`); both engines must have the matching topics
+  /// registered or the call fails without partial mutation. Monotonicity is
+  /// enforced strictly per topic: the source's earliest chunk timestamp must
+  /// be greater than or equal to the destination's current `time_max()`.
+  ///
+  /// Zero-copy on the chunk data: the destination's `std::deque<TopicChunk>`
+  /// receives the source's chunks via `std::move`, never via copy. The chunk
+  /// internals (column buffers, value arrays) are pointer/buffer moves.
+  ///
+  /// Schema compatibility is the caller's responsibility — typically the
+  /// destination is kept in lockstep with the source through parallel
+  /// `createDataset` / `createTopic` registration at startup.
+  PJ::Expected<void, std::string> flushTo(DataEngine& dst);
+
   // Writer/Reader factories
   /// Create a writer bound to this engine.
   [[nodiscard]] DataWriter createWriter();

pj_datastore/include/pj_datastore/object_store.hpp

Lines changed: 47 additions & 5 deletions
@@ -2,6 +2,7 @@
 // Copyright 2026 Davide Faconti
 // SPDX-License-Identifier: MPL-2.0

+#include <any>
 #include <cstddef>
 #include <cstdint>
 #include <deque>
@@ -13,9 +14,9 @@
 #include <string>
 #include <string_view>
 #include <utility>
-#include <variant>
 #include <vector>

+#include "pj_base/buffer_anchor.hpp"
 #include "pj_base/expected.hpp"
 #include "pj_base/types.hpp"

@@ -40,12 +41,24 @@ struct ObjectTopicDescriptor {

 struct ObjectEntry {
   Timestamp timestamp = 0;
-  std::variant<std::shared_ptr<const std::vector<uint8_t>>, std::function<std::vector<uint8_t>()>> payload;
+  // Holds either a shared_ptr<const std::vector<uint8_t>> (eager owned payload)
+  // or a std::function<sdk::PayloadView()> (lazy resolver). resolveEntry
+  // discriminates via std::any_cast.
+  std::any payload;
 };

 struct ResolvedObjectEntry {
   Timestamp timestamp = 0;
-  std::shared_ptr<const std::vector<uint8_t>> data;
+  // PayloadView lets the entry hold an opaque anchor (any shared_ptr<T>) plus
+  // a non-owning Span over the bytes. Consumers read `payload.bytes` for the
+  // data; they retain `payload.anchor` if they need the bytes to outlive the
+  // resolve call. Both `pushOwned` and `pushLazy` paths land here:
+  //   - owned: payload.bytes spans the shared_ptr<vector<uint8_t>>; anchor IS that shared_ptr.
+  //   - lazy: payload is whatever the closure returns; anchor can be any shared_ptr<T>
+  //     (e.g. a C-ABI payload anchor wrapped as shared_ptr<void>).
+  // resolveEntry never casts the anchor to a concrete type; the type erasure
+  // stays opaque all the way to the consumer.
+  sdk::PayloadView payload;
 };

 struct RetentionBudget {
@@ -108,8 +121,14 @@ class ObjectStore {

   Expected<void, std::string> pushOwned(ObjectTopicId id, Timestamp timestamp, std::vector<uint8_t> payload);

-  Expected<void, std::string> pushLazy(
-      ObjectTopicId id, Timestamp timestamp, std::function<std::vector<uint8_t>()> fetch);
+  // The fetch callable is invoked on every read. It returns a PayloadView
+  // (Span + anchor). When the producer already holds the bytes in memory
+  // behind a shared_ptr (e.g. a streaming buffer being handed off between
+  // stores), the closure can capture that shared_ptr and return a view
+  // backed by it — no copy. For producers that materialize bytes from disk
+  // or other sources, the closure allocates a fresh buffer and uses it as
+  // the anchor.
+  Expected<void, std::string> pushLazy(ObjectTopicId id, Timestamp timestamp, std::function<sdk::PayloadView()> fetch);

   // --- Read ---

@@ -136,6 +155,29 @@ class ObjectStore {
   void evictBefore(ObjectTopicId id, Timestamp threshold);
   void evictAllBefore(Timestamp threshold);

+  // --- Cross-store flush ---
+
+  // Move every entry from this store into `dst`, leaving this store empty
+  // (topic registrations are preserved). Topics are matched by descriptor
+  // (dataset_id + topic_name); both stores must have registered the same
+  // descriptors or the call fails without partial mutation. For each series,
+  // monotonicity is enforced strictly: the earliest timestamp being moved
+  // must be greater than or equal to the destination's current last
+  // timestamp. On any validation failure the call returns an error and
+  // neither store is mutated.
+  //
+  // Zero-copy on the payload bytes. Each ObjectEntry is moved into the
+  // destination's series by value; the std::variant inside (or, post-#184,
+  // the std::any) holds either a shared_ptr or a std::function, and moving
+  // it is a pointer/buffer move — bytes captured by the closure or owned by
+  // the shared_ptr are never copied or materialized during the flush. Lazy
+  // entries preserve their semantics in the destination: their closure is
+  // re-invoked only when the destination is read.
+  //
+  // After the move, the destination's retention budget is applied to each
+  // touched series in normal order.
+  Expected<void, std::string> flushTo(ObjectStore& dst);
+
   // --- Lifecycle ---

   void removeTopic(ObjectTopicId id);
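The claim that lazy entries are moved without being materialized can be checked in isolation. The sketch below uses hypothetical stand-ins (`PayloadView`, `Entry`, `makeCountingLazyEntry`, `moveAll`, `read`) rather than the real ObjectStore, and counts closure invocations to show that moving the entry does not trigger one.

```cpp
#include <any>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Simplified stand-ins (assumptions, not the SDK definitions).
struct PayloadView {
  const uint8_t* data = nullptr;
  std::size_t size = 0;
  std::shared_ptr<const void> anchor;
};
using LazyPayload = std::function<PayloadView()>;
struct Entry {
  int64_t timestamp = 0;
  std::any payload;
};

// Build a lazy entry whose closure counts how often it is invoked.
Entry makeCountingLazyEntry(int64_t ts, std::shared_ptr<int> calls) {
  auto bytes = std::make_shared<const std::vector<uint8_t>>(std::vector<uint8_t>{1, 2, 3});
  LazyPayload fetch = [bytes, calls] {
    ++*calls;
    return PayloadView{bytes->data(), bytes->size(), bytes};
  };
  return Entry{ts, std::any(std::move(fetch))};
}

// Move every entry from src to dst; the closures are transferred, not called.
void moveAll(std::deque<Entry>& src, std::deque<Entry>& dst) {
  for (auto& entry : src) {
    dst.push_back(std::move(entry));
  }
  src.clear();
}

// Read path: only here is the closure invoked.
PayloadView read(const Entry& entry) {
  const auto* fetch = std::any_cast<LazyPayload>(&entry.payload);
  return fetch != nullptr ? (*fetch)() : PayloadView{};
}
```

After `moveAll`, the invocation counter is still zero; it increments once per `read` on the destination, consistent with the "invoked on every read" note on pushLazy.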

pj_datastore/include/pj_datastore/plugin_data_host.hpp

Lines changed: 29 additions & 0 deletions
@@ -31,6 +31,13 @@ class DatastoreSourceWriteHost {
   [[nodiscard]] PJ_source_write_host_t raw() noexcept;
   void flushPending();

+  // Atomically swap the destination DataEngine. Mirrors the parser write
+  // host's setTarget: the streaming two-engine flow routes source-level
+  // scalar pushes to a secondary DataEngine during pause and back to the
+  // primary on resume. Pending rows are flushed to the current engine
+  // before the switch. Does not take ownership.
+  void setTarget(DataEngine* target);
+
  private:
   std::unique_ptr<DatastoreSourceWriteHostState> state_;
 };
@@ -51,6 +58,13 @@ class DatastoreSourceObjectWriteHost {

   [[nodiscard]] PJ_object_write_host_t raw() noexcept;

+  // Atomically swap the destination store. Used by the streaming two-store
+  // flow to route pushes to a secondary ObjectStore during pause and back to
+  // the primary on resume. Must point to a live store; the host does not
+  // take ownership and the caller must keep the target alive for as long as
+  // the host can receive pushes against it.
+  void setTarget(ObjectStore* target) noexcept;
+
  private:
   std::unique_ptr<DatastoreSourceObjectWriteHostState> state_;
 };
@@ -68,6 +82,14 @@ class DatastoreParserWriteHost {
   [[nodiscard]] PJ_parser_write_host_t raw() noexcept;
   void flushPending();

+  // Atomically swap the destination DataEngine. Mirrors the object write
+  // host's setTarget: the streaming two-store flow routes scalar pushes to a
+  // secondary DataEngine during pause and back to the primary on resume.
+  // Pending rows are flushed to the current engine before the switch; the
+  // bound topic must exist in `target` with the same TopicId (the streaming
+  // manager registers it lockstep on both engines). Does not take ownership.
+  void setTarget(DataEngine* target);
+
  private:
   std::unique_ptr<DatastoreParserWriteHostState> state_;
 };
@@ -111,6 +133,13 @@ class DatastoreParserObjectWriteHost {

   [[nodiscard]] PJ_parser_object_write_host_t raw() noexcept;

+  // Atomically swap the destination store. Used by the streaming two-store
+  // flow to route pushes to a secondary ObjectStore during pause and back to
+  // the primary on resume. The bound topic id must exist in `target` (the
+  // streaming manager ensures this via lockstep registerTopic on both
+  // stores). The host does not take ownership of the target.
+  void setTarget(ObjectStore* target) noexcept;
+
  private:
   std::unique_ptr<DatastoreParserObjectWriteHostState> state_;
 };
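The implementation behind setTarget is not part of this header, so the following is only one plausible shape: a non-owning target pointer held in a `std::atomic`, loaded once per push. `Sink` and `WriteHostSketch` are hypothetical names. The sketch covers the pointer swap only; it does not make the sink itself thread-safe, and it does not model the pending-row flush that the engine hosts perform before switching.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical destination; stands in for an ObjectStore or DataEngine.
struct Sink {
  std::vector<int> rows;
};

class WriteHostSketch {
 public:
  explicit WriteHostSketch(Sink* initial) : target_(initial) {}

  // Non-owning swap: the caller keeps both sinks alive.
  void setTarget(Sink* next) noexcept {
    target_.store(next, std::memory_order_release);
  }

  // Each push reads the target exactly once, so a push that has already
  // loaded the old pointer completes against the old sink.
  void push(int value) {
    Sink* current = target_.load(std::memory_order_acquire);
    current->rows.push_back(value);
  }

 private:
  std::atomic<Sink*> target_;
};
```

In a pause/resume flow, the caller would point the host at a secondary sink on pause and back at the primary on resume. An atomic pointer by itself does not tell the caller when in-flight pushes against the old target have finished, so a real implementation may need additional synchronization before the old target is drained or destroyed.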

pj_datastore/include/pj_datastore/topic_storage.hpp

Lines changed: 8 additions & 0 deletions
@@ -57,6 +57,8 @@ struct TopicMetadata {
   uint32_t truncated_sample_count = 0;
 };

+class DataEngine;
+
 /// Storage container for committed chunks of one topic.
 class TopicStorage {
  public:
@@ -123,6 +125,12 @@ class TopicStorage {
   void setArrayExpansionCount(const std::string& field_path, uint32_t count);

  private:
+  // DataEngine::flushTo needs to move sealed_chunks_ between TopicStorage
+  // instances of different engines without copying. Friending it lets the
+  // transfer happen entirely inside DataEngine without exposing the move
+  // primitive on the public TopicStorage API.
+  friend class DataEngine;
+
   TopicId topic_id_;
   TopicDescriptor descriptor_;
   std::deque<TopicChunk> sealed_chunks_;

pj_datastore/src/engine.cpp

Lines changed: 57 additions & 0 deletions
@@ -182,6 +182,63 @@ void DataEngine::enforceRetention(Timestamp retention_window_ns) {
   }
 }

+Expected<void, std::string> DataEngine::flushTo(DataEngine& dst) {
+  if (&dst == this) {
+    return PJ::unexpected("flushTo: source and destination are the same engine");
+  }
+
+  // Phase 1: validate. Walk every src topic with sealed chunks and look up
+  // the matching dst topic by descriptor (dataset_id + name). Verify
+  // monotonicity against dst's current time_max. No mutation yet.
+  struct Step {
+    TopicStorage* src;
+    TopicStorage* dst;
+  };
+  std::vector<Step> plan;
+  plan.reserve(impl_->topics.size());
+
+  for (auto it = impl_->topics.begin(); it != impl_->topics.end(); ++it) {
+    auto& src_storage = it.value();
+    if (src_storage.empty()) {
+      continue;
+    }
+    TopicStorage* dst_storage = nullptr;
+    for (auto dst_it = dst.impl_->topics.begin(); dst_it != dst.impl_->topics.end(); ++dst_it) {
+      auto& candidate = dst_it.value();
+      if (candidate.descriptor().dataset_id == src_storage.descriptor().dataset_id &&
+          candidate.descriptor().name == src_storage.descriptor().name) {
+        dst_storage = &candidate;
+        break;
+      }
+    }
+    if (dst_storage == nullptr) {
+      return PJ::unexpected(
+          "flushTo: destination has no topic '" + src_storage.descriptor().name + "' for dataset " +
+          std::to_string(src_storage.descriptor().dataset_id));
+    }
+    if (!dst_storage->empty() && src_storage.time_min() < dst_storage->time_max()) {
+      return PJ::unexpected("flushTo: monotonicity violation for topic '" + src_storage.descriptor().name + "'");
+    }
+    plan.push_back({&src_storage, dst_storage});
+  }
+
+  // Phase 2: execute. friend access lets us move sealed_chunks_ directly
+  // between TopicStorage instances of different engines — the deque move
+  // transfers chunk ownership without copying any column data or value
+  // buffers. Each chunk's TopicChunkStats (t_min/t_max/row_count) rides
+  // along inside the chunk by value, so dst's time_min/time_max queries
+  // reflect the new state immediately after the move.
+  for (auto& step : plan) {
+    auto drained = std::move(step.src->sealed_chunks_);
+    step.src->sealed_chunks_.clear();  // post-move state: deque is valid but empty.
+    for (auto& chunk : drained) {
+      step.dst->sealed_chunks_.push_back(std::move(chunk));
+    }
+  }
+
+  return {};
+}
+
 // ---------------------------------------------------------------------------
 // Listing helpers
 // ---------------------------------------------------------------------------
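The validate-then-move structure of flushTo can be reproduced on plain standard containers. The sketch below is a simplified model, not the engine's code: `Chunk`, `Topics`, and `flushToSketch` are hypothetical, topics are keyed by name only, and errors are returned as an optional string instead of `PJ::Expected`.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical chunk: time bounds plus one value buffer.
struct Chunk {
  int64_t t_min;
  int64_t t_max;
  std::vector<double> values;
};
using Topics = std::map<std::string, std::deque<Chunk>>;

// Returns std::nullopt on success, or an error message if validation fails.
// Nothing is mutated unless every topic passes validation.
std::optional<std::string> flushToSketch(Topics& src, Topics& dst) {
  if (&src == &dst) return "source and destination are the same";

  // Phase 1: validate and record a plan. No mutation happens here.
  struct Step {
    std::deque<Chunk>* from;
    std::deque<Chunk>* to;
  };
  std::vector<Step> plan;
  for (auto& [name, chunks] : src) {
    if (chunks.empty()) continue;
    auto it = dst.find(name);
    if (it == dst.end()) return "destination has no topic '" + name + "'";
    if (!it->second.empty() && chunks.front().t_min < it->second.back().t_max) {
      return "monotonicity violation for topic '" + name + "'";
    }
    plan.push_back({&chunks, &it->second});
  }

  // Phase 2: execute. Moving a Chunk moves its vector, so the value buffer
  // changes owner without being copied.
  for (auto& step : plan) {
    for (auto& chunk : *step.from) {
      step.to->push_back(std::move(chunk));
    }
    step.from->clear();
  }
  return std::nullopt;
}
```

Because every early return sits in phase 1, a missing topic or a timestamp regression on any one topic leaves both containers exactly as they were, which is the "fails without partial mutation" guarantee described in the header comment.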
