forked from dragonflydb/dragonfly
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserializer_base.h
More file actions
132 lines (100 loc) · 4.97 KB
/
serializer_base.h
File metadata and controls
132 lines (100 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2026, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <cstdint>
#include <ctime>
#include <map>
#include <memory>
#include <vector>

#include <absl/container/flat_hash_map.h>

#include "server/db_slice.h"
#include "server/journal/types.h"
#include "server/synchronization.h"
#include "server/table.h"
#include "server/tiered_storage.h"
namespace dfly {
class ExecutionState;
// Opaque identity for a physical DashTable bucket: its memory address.
// Unique across all databases/segments for the lifetime of a serialization.
// NOTE(review): an address may be reused once a bucket is freed, so an identity
// is presumably only meaningful while its bucket is alive — confirm.
using BucketIdentity = std::uintptr_t;
// Tracks serialization progress of offloaded (delayed) entries: entries that must
// wait for tiered-storage reads to complete before they can be serialized.
// Subclasses provide the serialization format via SerializeFetchedEntry().
struct DelayedEntryHandler {
  DelayedEntryHandler() = default;

  // Queues an offloaded entry belonging to `bucket` until its tiered-storage
  // read completes.
  void EnqueueOffloaded(BucketIdentity bucket, DbIndex db_index, PrimeKey pk, const PrimeValue& pv,
                        time_t expire_time, uint32_t mc_flags);

  // Must be called periodically to make progress on delayed entries; invokes
  // SerializeFetchedEntry for the entries it serializes.
  //  - If `force` is false, only entries whose futures are already resolved are serialized.
  //  - If `flush_bucket` is provided, all entries belonging to that bucket are flushed.
  // NOTE(review): how "not provided" is encoded for `flush_bucket` is not visible here — confirm.
  void ProcessDelayedEntries(bool force, BucketIdentity flush_bucket, ExecutionState* cntx);

  // Serializes a delayed entry whose value has been fetched, using the
  // serializer-specific implementation.
  virtual void SerializeFetchedEntry(const TieredDelayedEntry& tde, const PrimeValue& pv) = 0;

 protected:
  // This is a polymorphic base with a non-virtual destructor, so the destructor is
  // protected: deleting a subclass through a DelayedEntryHandler* would be UB and
  // now fails to compile instead. It is kept inline and non-virtual so it is only
  // instantiated where subclasses are destroyed (as before).
  ~DelayedEntryHandler() = default;

  // Declaring the destructor suppresses the implicit move operations; default
  // them explicitly to keep the previous behavior. Copying was never possible
  // because pending entries are owned through std::unique_ptr.
  DelayedEntryHandler(DelayedEntryHandler&&) = default;
  DelayedEntryHandler& operator=(DelayedEntryHandler&&) = default;
  DelayedEntryHandler(const DelayedEntryHandler&) = delete;
  DelayedEntryHandler& operator=(const DelayedEntryHandler&) = delete;

 private:
  // Entries that are waiting for tiered storage reads to complete before they
  // can be serialized, keyed by the bucket they belong to.
  std::multimap<BucketIdentity, std::unique_ptr<TieredDelayedEntry>> delayed_entries_;
};
// SerializerBase owns the DbSlice change-listener registration and a per-bucket
// state machine that tracks each bucket through:
//
//   NotVisited -> Serializing -> (DelayedPending ->) Covered
//
// NotVisited and Covered are implicit (the bucket is absent from the map).
// Only the transient states (Serializing, DelayedPending) are stored in the map.
//
// For now, state tracking is purely observational: it drives DCHECKs and stats
// but does not alter the serialization control flow.
class SerializerBase : public DelayedEntryHandler {
 public:
  struct Stats {
    uint64_t keys_serialized = 0;              // total number of keys serialized
    uint64_t buckets_serialized = 0;           // total number of buckets serialized
    uint64_t buckets_on_change = 0;            // buckets serialized by the OnChangeBlocking flow
    uint64_t buckets_skipped = 0;              // buckets already Covered when seen
    uint64_t change_during_serialization = 0;  // changes that hit an in-flight bucket
  };

  explicit SerializerBase(DbSlice* slice, ExecutionState* cntx);
  virtual ~SerializerBase();

  // Non-copyable: an instance owns a change-listener registration (change_cb_id_)
  // and per-bucket state that must not be duplicated.
  // NOTE(review): presumably the registered callback refers back to this
  // instance (it drives OnChangeBlocking) — confirm in the implementation.
  SerializerBase(const SerializerBase&) = delete;
  SerializerBase& operator=(const SerializerBase&) = delete;

  // Registers the DbSlice change listener and saves the associated snapshot state.
  // NOTE(review): the original comment was garbled ("save snapshot it");
  // presumably this stores snapshot_version_ / change_cb_id_ — confirm.
  void RegisterChangeListener();

  // Unregisters the change listener. Safe to call if already unregistered.
  void UnregisterChangeListener();

  // Counters accumulated by this serializer so far.
  const Stats& GetStats() const {
    return stats_;
  }

 protected:
  // Phase of an in-flight bucket (only stored while transient).
  enum class BucketPhase : uint8_t {
    kSerializing,     // bucket is being iterated by the main loop / OnChangeBlocking
    kDelayedPending,  // all entries serialized but tiered reads still in-flight
  };

  // Processes the bucket at `it` if needed.
  // NOTE(review): the return value presumably mirrors ProcessBucketInternal
  // (true if processed, false if skipped) — confirm.
  bool ProcessBucket(DbIndex db_index, PrimeTable::bucket_iterator it, bool on_update);

  // Serializes a single bucket and returns the number of entries serialized.
  // Must be implemented by classes extending this base class.
  // Currently runs with big_value_mu_ held.
  virtual unsigned SerializeBucketLocked(DbIndex db_index, PrimeTable::bucket_iterator it,
                                         bool on_update) = 0;

  // Called when an existing bucket is about to be mutated. Calls ProcessBucket.
  void OnChangeBlocking(DbIndex db_index, PrimeTable::bucket_iterator it);

  // Called when a new key is about to be inserted. Calls ProcessBucket for the
  // buckets in `set`.
  void OnChangeBlocking(DbIndex db_index, const PrimeTable::BucketSet& set);

  // --- Shared members (more state is expected to move here from subclasses) ---
  DbSlice* const db_slice_;
  ExecutionState* const base_cntx_;
  DbTableArray db_array_;
  uint64_t snapshot_version_ = 0;
  ThreadLocalMutex big_value_mu_;
  Stats stats_;

 private:
  friend class SerializerBaseTest;

  // Private: only reachable by the SerializerBaseTest friend. Leaves db_slice_
  // and base_cntx_ null.
  SerializerBase() : db_slice_(nullptr), base_cntx_(nullptr) {
  }

  // Returns true if the bucket at `it` should be processed.
  // Checks bucket validity, version and state.
  bool ShouldProcessBucket(PrimeTable::bucket_iterator it);

  // Processes a single bucket by calling SerializeBucketLocked.
  // Returns true if processed, false if skipped.
  bool ProcessBucketInternal(DbIndex db_index, PrimeTable::bucket_iterator it, bool on_update);

  // Transitions a bucket from NotVisited -> Serializing.
  void MarkBucketSerializing(BucketIdentity bid);

  // Transitions a bucket from Serializing -> Covered (no delayed entries) or
  // Serializing -> DelayedPending (delayed entries still pending).
  void FinishBucketIteration(BucketIdentity bid);

  // Phase of each bucket currently in a transient state, keyed by its identity.
  absl::flat_hash_map<BucketIdentity, BucketPhase> bucket_states_;

  // Change-listener registration id (see Register/UnregisterChangeListener).
  uint64_t change_cb_id_ = 0;
};
} // namespace dfly