@@ -2,102 +2,97 @@ mod storage;
22mod network;
33mod tokenizer;
44
5- use actix_web:: { web, App , HttpServer } ;
5+ use actix_web:: { web, App , HttpServer , Responder } ;
66use once_cell:: sync:: OnceCell ;
77use reqwest;
8- use serde_json:: Value ;
8+ use serde_json:: json ;
99use std:: collections:: HashMap ;
1010use std:: env;
1111use std:: process;
12- use std:: sync:: Mutex ;
13-
12+ use std:: sync:: Arc ;
13+ use tokio :: sync :: RwLock ;
1414use storage:: engine:: {
1515 AppState , ClusterData , delete_value, get_value, get_table_store, join_cluster, put_value,
16+ get_all_keys, get_multiple_keys, NodeInfo , NodeStatus , current_timestamp,
17+ VersionedValue ,
1618} ;
1719use storage:: persistance:: { cold_save, load_all_tables} ;
18- use crate :: storage :: engine :: { get_all_keys , get_multiple_keys } ;
20+ use network :: broadcaster :: { membership_sync , heartbeat , get_membership , update_membership } ;
1921
2022/// Declare APP_STATE globally so that it’s available throughout the module.
2123static APP_STATE : OnceCell < web:: Data < AppState > > = OnceCell :: new ( ) ;
2224
2325/// Merge the remote state for a given table into the local state.
24- /// For each key (i.e. partition) from the remote state:
25- /// - If the key already exists, update its attributes.
26- /// - Otherwise, insert the new key with its attributes.
2726fn merge_table_state (
28- local : & mut HashMap < String , HashMap < String , Value > > ,
29- remote : HashMap < String , HashMap < String , Value > > ,
27+ local : & mut HashMap < String , VersionedValue > ,
28+ remote : HashMap < String , VersionedValue > ,
3029) {
31- for ( key, remote_partition ) in remote {
30+ for ( key, remote_val ) in remote {
3231 local
3332 . entry ( key)
34- . and_modify ( |local_partition | {
35- for ( attr , value ) in remote_partition . iter ( ) {
36- local_partition . insert ( attr . clone ( ) , value . clone ( ) ) ;
33+ . and_modify ( |local_val | {
34+ if remote_val . version > local_val . version {
35+ * local_val = remote_val . clone ( ) ;
3736 }
3837 } )
39- . or_insert ( remote_partition ) ;
38+ . or_insert ( remote_val ) ;
4039 }
4140}
4241
4342/// Merge the entire global remote store into the local store.
44- /// Both stores have the type:
45- /// HashMap<table_name, HashMap<partition_key, HashMap<attribute, Value>>>
4643fn merge_global_store (
47- local : & mut HashMap < String , HashMap < String , HashMap < String , Value > > > ,
48- remote : HashMap < String , HashMap < String , HashMap < String , Value > > > ,
44+ local : & mut HashMap < String , HashMap < String , VersionedValue > > ,
45+ remote : HashMap < String , HashMap < String , VersionedValue > > ,
4946) {
50- for ( table_name , remote_table) in remote {
47+ for ( table , remote_table) in remote {
5148 local
52- . entry ( table_name . clone ( ) )
49+ . entry ( table . clone ( ) )
5350 . and_modify ( |local_table| {
54- // Clone remote_table so we don't move it
5551 merge_table_state ( local_table, remote_table. clone ( ) ) ;
5652 } )
5753 . or_insert ( remote_table) ;
5854 }
5955}
6056
57+ async fn get_global_store ( state : web:: Data < AppState > ) -> impl Responder {
58+ let store = state. store . read ( ) . await ;
59+ web:: Json ( store. clone ( ) )
60+ }
61+
6162#[ actix_web:: main]
6263async fn main ( ) -> std:: io:: Result < ( ) > {
6364 // Usage: <program> <current_node_address> [join_node_address]
6465 let args: Vec < String > = env:: args ( ) . collect ( ) ;
6566 if args. len ( ) < 2 {
66- eprintln ! (
67- "Usage: {} <current_node_address> [join_node_address]" ,
68- args[ 0 ]
69- ) ;
67+ eprintln ! ( "Usage: {} <current_node_address> [join_node_address]" , args[ 0 ] ) ;
7068 process:: exit ( 1 ) ;
7169 }
7270 let current_node = args[ 1 ] . clone ( ) ;
7371 // Use the current node address for binding.
7472 let bind_addr = current_node. clone ( ) ;
7573
7674 // Initialize the local in‑memory multi‑table key–value store.
77- // Each table name maps to a key–value store:
78- // table_name -> (key -> attributes)
7975 let state = web:: Data :: new ( AppState {
80- // Specify a base directory for table folders.
8176 base_dir : "./data" ,
82- store : Mutex :: new ( HashMap :: new ( ) ) ,
77+ store : RwLock :: new ( HashMap :: new ( ) ) ,
78+ write_quorum : 0 ,
79+ read_quorum : 0 ,
8380 } ) ;
84- // Set the global APP_STATE pointer.
8581 let _ = APP_STATE . set ( state. clone ( ) ) ;
8682
87- // Load local cold storage from disk by enumerating each table folder .
88- match load_all_tables ( & state) {
83+ // Load cold storage (snapshot + WAL replay) from disk .
84+ match load_all_tables ( & state) . await {
8985 Ok ( _) => println ! ( "Cold storage loaded" ) ,
9086 Err ( e) => eprintln ! ( "Error loading cold storage: {}" , e) ,
9187 }
9288
9389 // (Optionally) Ensure the default table exists in memory.
9490 {
95- let default_table = "default" . to_string ( ) ;
96- let mut store = state. store . lock ( ) . unwrap ( ) ;
97- store. entry ( default_table) . or_insert ( HashMap :: new ( ) ) ;
91+ let mut store = state. store . write ( ) . await ;
92+ store. entry ( "default" . to_string ( ) ) . or_insert ( HashMap :: new ( ) ) ;
9893 }
9994
100- // If a join node is provided, join its cluster and pull its global state.
95+ // If a join node is provided, join its cluster and merge its global state.
10196 if args. len ( ) >= 3 {
10297 let join_node = args[ 2 ] . clone ( ) ;
10398
@@ -106,46 +101,38 @@ async fn main() -> std::io::Result<()> {
106101 let join_url = format ! ( "http://{}/join" , join_node) ;
107102 match client
108103 . post ( & join_url)
109- . json ( & serde_json :: json!( { "node" : current_node } ) )
104+ . json ( & json ! ( { "node" : current_node } ) )
110105 . send ( )
111106 . await
112107 {
113108 Ok ( response) => {
114109 if response. status ( ) . is_success ( ) {
115- if let Ok ( nodes) = response. json :: < Vec < String > > ( ) . await {
110+ if let Ok ( nodes) = response. json :: < HashMap < String , NodeInfo > > ( ) . await {
116111 println ! ( "Joined cluster: {:?}" , nodes) ;
117112 }
118113 } else {
119- println ! (
120- "Failed to join cluster (status = {})" ,
121- response. status( )
122- ) ;
114+ println ! ( "Failed to join cluster (status = {})" , response. status( ) ) ;
123115 }
124116 }
125117 Err ( e) => println ! ( "Error joining cluster: {}" , e) ,
126118 }
127119
128120 // Pull the remote state for all tables.
129- // This requires that the remote node exposes a global state endpoint at GET /store.
130121 let store_url = format ! ( "http://{}/store" , join_node) ;
131- let client = reqwest:: Client :: new ( ) ;
132122 match client. get ( & store_url) . send ( ) . await {
133123 Ok ( resp) => {
134124 if resp. status ( ) . is_success ( ) {
135- let remote_store: HashMap <
136- String ,
137- HashMap < String , HashMap < String , Value > > ,
138- > = resp . json ( ) . await . unwrap_or_else ( |e| {
139- eprintln ! ( "Failed to parse global cold storage: {}" , e) ;
140- HashMap :: new ( )
141- } ) ;
125+ let remote_store = resp
126+ . json :: < HashMap < String , HashMap < String , VersionedValue > > > ( )
127+ . await
128+ . unwrap_or_else ( |e| {
129+ eprintln ! ( "Failed to parse global cold storage: {}" , e) ;
130+ HashMap :: new ( )
131+ } ) ;
142132 {
143- let mut store = state. store . lock ( ) . unwrap ( ) ;
133+ let mut store = state. store . write ( ) . await ;
144134 merge_global_store ( & mut store, remote_store) ;
145- println ! (
146- "Merged global cold storage from {}: {:?}" ,
147- join_node, * store
148- ) ;
135+ println ! ( "Merged global cold storage: {:?}" , * store) ;
149136 }
150137 } else {
151138 eprintln ! (
@@ -158,33 +145,32 @@ async fn main() -> std::io::Result<()> {
158145 Err ( e) => {
159146 eprintln ! (
160147 "Error fetching global cold storage from {}: {}" ,
161- join_node, e,
148+ join_node, e
162149 ) ;
163150 }
164151 }
165152 }
166153
167154 // Spawn the periodic cold save task.
168- tokio:: spawn ( {
169- let state = state. clone ( ) ;
170- async move {
171- cold_save ( state, 30 ) . await ;
172- }
173- } ) ;
155+ tokio:: spawn ( cold_save ( state. clone ( ) , 30 ) ) ;
174156
175157 // Initialize cluster data with dynamic membership.
158+ let mut initial_nodes = HashMap :: new ( ) ;
159+ initial_nodes. insert (
160+ current_node. clone ( ) ,
161+ NodeInfo {
162+ status : NodeStatus :: Active ,
163+ last_heartbeat : current_timestamp ( ) ,
164+ } ,
165+ ) ;
176166 let cluster_data = web:: Data :: new ( ClusterData {
177- nodes : std :: sync :: Arc :: new ( Mutex :: new ( vec ! [ current_node . clone ( ) ] ) ) ,
167+ nodes : Arc :: new ( RwLock :: new ( initial_nodes ) ) ,
178168 } ) ;
179169
180- // Spawn the membership synchronization background task.
181- let cluster_data_clone = cluster_data. clone ( ) ;
182- let current_node_clone = current_node. clone ( ) ;
183- tokio:: spawn ( network:: broadcaster:: membership_sync (
184- cluster_data_clone,
185- current_node_clone,
186- 60 ,
187- ) ) ;
170+ // Spawn the membership synchronization (heartbeat) task.
171+ let cluster_clone = cluster_data. clone ( ) ;
172+ let current_clone = current_node. clone ( ) ;
173+ tokio:: spawn ( membership_sync ( cluster_clone, current_clone, 60 ) ) ;
188174
189175 println ! ( "Starting distributed DB engine at http://{}" , current_node) ;
190176
@@ -194,32 +180,22 @@ async fn main() -> std::io::Result<()> {
194180 . app_data ( state. clone ( ) )
195181 . app_data ( cluster_data. clone ( ) )
196182 . app_data ( web:: Data :: new ( current_node. clone ( ) ) )
197- // Endpoint to join the cluster .
183+ // Cluster management endpoints .
198184 . route ( "/join" , web:: post ( ) . to ( join_cluster) )
199- // Endpoints for updating/fetching membership.
200- . route (
201- "/membership" ,
202- web:: get ( ) . to ( network:: broadcaster:: get_membership) ,
203- )
204- . route (
205- "/update_membership" ,
206- web:: post ( ) . to ( network:: broadcaster:: update_membership) ,
207- )
208- // Key endpoints with multi‑table support: the table name is part of the URL.
185+ . route ( "/membership" , web:: get ( ) . to ( get_membership) )
186+ . route ( "/update_membership" , web:: post ( ) . to ( update_membership) )
187+ . route ( "/heartbeat" , web:: get ( ) . to ( heartbeat) )
188+ // Key–value endpoints with multi‑table support.
209189 . route ( "/{table}/key/{key}" , web:: get ( ) . to ( get_value) )
210190 . route ( "/{table}/key/{key}" , web:: put ( ) . to ( put_value) )
211191 . route ( "/{table}/key/{key}" , web:: delete ( ) . to ( delete_value) )
212192 // Endpoint to fetch a table’s entire in‑memory store.
213193 . route ( "/{table}/store" , web:: get ( ) . to ( get_table_store) )
214194 // Global endpoint returning the entire in‑memory store.
215- . route ( "/store" , web:: get ( ) . to ( |state : web:: Data < AppState > | async move {
216- web:: Json ( state. store . lock ( ) . unwrap ( ) . clone ( ) )
217- } ) )
218- // Endpoint to get all key names in a table.
195+ . route ( "/store" , web:: get ( ) . to ( get_global_store) )
196+ // Endpoints to get keys from a table.
219197 . route ( "/{table}/keys" , web:: get ( ) . to ( get_all_keys) )
220- // Endpoint to retrieve multiple keys from a table (using a JSON body).
221198 . route ( "/{table}/keys" , web:: post ( ) . to ( get_multiple_keys) )
222-
223199 } )
224200 . bind ( bind_addr. as_str ( ) ) ?
225201 . run ( )
0 commit comments