Skip to content

Commit b8fa280

Browse files
committed
feat: real time notify client sdk
1 parent c8cb6c3 commit b8fa280

5 files changed

Lines changed: 157 additions & 68 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ pub mod http;
33
#[cfg(feature = "realtime")]
44
pub mod realtime;
55
pub mod repo;
6+
pub mod secrets;
67

78
pub use base::{FPServerError, ServerConfig};

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod http;
2323
#[cfg(feature = "realtime")]
2424
mod realtime;
2525
mod repo;
26+
mod secrets;
2627

2728
#[tokio::main]
2829
async fn main() -> Result<()> {

src/realtime.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,27 @@ impl RealtimeSocket {
3333
Self { server, port }
3434
}
3535

36-
pub async fn notify_sdk(&self, sdk_key: String, event: &str, data: serde_json::Value) {
37-
trace!("notify_sdk {} {} {:?}", sdk_key, event, data);
38-
self.server.emit_to("/", vec![&sdk_key], event, data).await
36+
pub async fn notify_sdk(
37+
&self,
38+
server_sdk_key: String,
39+
client_sdk_key: Option<String>,
40+
event: &str,
41+
data: serde_json::Value,
42+
) {
43+
trace!(
44+
"notify_sdk {} {:?} {} {:?}",
45+
server_sdk_key,
46+
client_sdk_key,
47+
event,
48+
data
49+
);
50+
51+
let mut keys: Vec<&str> = vec![&server_sdk_key];
52+
if let Some(client_sdk_key) = &client_sdk_key {
53+
keys.push(client_sdk_key);
54+
}
55+
56+
self.server.emit_to("/", keys, event, data).await
3957
}
4058

4159
fn register(payload: Option<Payload>, socket: ServerSocket) -> SocketCallback {

src/repo.rs

Lines changed: 52 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1-
use crate::base::ServerConfig;
21
#[cfg(feature = "realtime")]
32
use crate::realtime::RealtimeSocket;
43
use crate::FPServerError;
4+
use crate::{base::ServerConfig, secrets::SecretMapping};
55
use feature_probe_server_sdk::{EvalDetail, FPConfig, FPUser, FeatureProbe as FPClient, Url};
66
#[cfg(feature = "unstable")]
77
use feature_probe_server_sdk::{Segment, Toggle};
88
use parking_lot::RwLock;
99
use reqwest::Method;
10-
use serde::Deserialize;
1110
use serde_json::Value;
1211
use std::{collections::HashMap, sync::Arc};
1312
use tracing::{debug, error, info};
@@ -17,12 +16,6 @@ pub struct SdkRepository {
1716
inner: Arc<Inner>,
1817
}
1918

20-
#[derive(Deserialize, Debug, Default, Clone)]
21-
struct SecretMapping {
22-
pub version: u128,
23-
pub mapping: HashMap<String, String>,
24-
}
25-
2619
#[derive(Debug)]
2720
struct Inner {
2821
server_config: ServerConfig,
@@ -71,16 +64,13 @@ impl SdkRepository {
7164

7265
pub fn secret_keys(&self) -> HashMap<String, String> {
7366
let secret_mapping = self.inner.secret_mapping.read();
74-
secret_mapping.mapping.clone()
67+
secret_mapping.mapping_clone()
7568
}
7669

7770
pub fn sync(&self, client_sdk_key: String, server_sdk_key: String, version: u128) {
7871
self.inner.sync(&server_sdk_key);
7972
let mut secret_mapping = self.inner.secret_mapping.write();
80-
secret_mapping.version = version;
81-
(*secret_mapping)
82-
.mapping
83-
.insert(client_sdk_key, server_sdk_key);
73+
secret_mapping.insert(client_sdk_key, server_sdk_key, version);
8474
}
8575

8676
pub fn sync_with(&self, keys_url: Url) {
@@ -114,10 +104,7 @@ impl SdkRepository {
114104
Ok(body) => match serde_json::from_str::<SecretMapping>(&body) {
115105
Err(e) => error!("sync_secret_keys json error: {}", e),
116106
Ok(r) => {
117-
debug!(
118-
"sync_secret_keys success. version: {:?}, mapping: {:?}",
119-
r.version, r.mapping
120-
);
107+
debug!("sync_secret_keys success. version: {:?}", r.version(),);
121108
inner.update_mapping(r);
122109
}
123110
},
@@ -130,15 +117,10 @@ impl SdkRepository {
130117

131118
pub fn server_sdk_repo_string(&self, server_sdk_key: &str) -> Result<String, FPServerError> {
132119
let secret_mapping = self.inner.secret_mapping.read();
133-
if secret_mapping.version == 0 {
120+
if secret_mapping.version() == 0 {
134121
return Err(FPServerError::NotReady(server_sdk_key.to_string()));
135122
}
136-
let server_sdk_keys: &[String] = &secret_mapping
137-
.mapping
138-
.clone()
139-
.into_values()
140-
.collect::<Vec<String>>();
141-
if !server_sdk_keys.contains(&server_sdk_key.to_string()) {
123+
if !secret_mapping.contains_server_sdk_key(server_sdk_key) {
142124
return Err(FPServerError::NotFound(server_sdk_key.to_string()));
143125
}
144126
match self.inner.repo_string(server_sdk_key) {
@@ -153,10 +135,10 @@ impl SdkRepository {
153135
user: &FPUser,
154136
) -> Result<String, FPServerError> {
155137
let secret_mapping = self.inner.secret_mapping.read();
156-
if secret_mapping.version == 0 {
138+
if secret_mapping.version() == 0 {
157139
return Err(FPServerError::NotReady(client_sdk_key.to_string()));
158140
}
159-
let server_sdk_key = match secret_mapping.mapping.get(client_sdk_key) {
141+
let server_sdk_key = match secret_mapping.server_sdk_key(client_sdk_key) {
160142
Some(sdk_key) => sdk_key,
161143
None => return Err(FPServerError::NotFound(client_sdk_key.to_string())),
162144
};
@@ -177,28 +159,31 @@ impl Inner {
177159
let sdks = self.sdk_clients.read();
178160
!sdks.contains_key(server_sdk_key)
179161
};
180-
if should_sync {
181-
let mut mut_sdks = self.sdk_clients.write();
182-
let config = FPConfig {
183-
server_sdk_key: server_sdk_key.to_owned(),
184-
remote_url: Url::parse("http://nouse.com").unwrap(),
185-
toggles_url: Some(self.server_config.toggles_url.clone()),
186-
refresh_interval: self.server_config.refresh_interval,
187-
http_client: Some(self.http_client.clone()),
188-
..Default::default()
189-
};
190-
info!("{:?} added", server_sdk_key);
191162

192-
#[cfg(feature = "realtime")]
193-
{
194-
let mut client = FPClient::new(config);
195-
self.setup_notify(server_sdk_key, &mut client);
196-
let _ = &mut_sdks.insert(server_sdk_key.to_owned(), client);
197-
}
163+
if !should_sync {
164+
return;
165+
}
166+
167+
let mut mut_sdks = self.sdk_clients.write();
168+
let config = FPConfig {
169+
server_sdk_key: server_sdk_key.to_owned(),
170+
remote_url: Url::parse("http://nouse.com").unwrap(),
171+
toggles_url: Some(self.server_config.toggles_url.clone()),
172+
refresh_interval: self.server_config.refresh_interval,
173+
http_client: Some(self.http_client.clone()),
174+
..Default::default()
175+
};
176+
info!("{:?} added", server_sdk_key);
198177

199-
#[cfg(not(feature = "realtime"))]
200-
let _ = &mut_sdks.insert(server_sdk_key.to_owned(), FPClient::new(config));
178+
#[cfg(feature = "realtime")]
179+
{
180+
let mut client = FPClient::new(config);
181+
self.setup_notify(server_sdk_key, &mut client);
182+
let _ = &mut_sdks.insert(server_sdk_key.to_owned(), client);
201183
}
184+
185+
#[cfg(not(feature = "realtime"))]
186+
let _ = &mut_sdks.insert(server_sdk_key.to_owned(), FPClient::new(config));
202187
}
203188

204189
pub fn remove_client(&self, server_sdk_key: &str) {
@@ -209,14 +194,14 @@ impl Inner {
209194
pub fn update_clients(&self) {
210195
let secret_mapping = self.secret_mapping.read();
211196
let clients = self.sdk_clients.read().clone();
212-
if secret_mapping.version > 0 {
213-
let mut keys = vec![];
214-
for server_sdk_key in secret_mapping.mapping.values() {
197+
if secret_mapping.version() > 0 {
198+
let server_sdk_keys = secret_mapping.server_sdk_keys();
199+
for server_sdk_key in &server_sdk_keys {
215200
self.sync(server_sdk_key);
216-
keys.push(server_sdk_key.to_string());
217201
}
202+
218203
for server_sdk_key in clients.keys() {
219-
if !keys.contains(server_sdk_key) {
204+
if !server_sdk_keys.contains(&server_sdk_key) {
220205
info!("{:?} removed.", server_sdk_key);
221206
self.remove_client(server_sdk_key);
222207
}
@@ -225,25 +210,29 @@ impl Inner {
225210
}
226211

227212
pub fn update_mapping(&self, new: SecretMapping) {
228-
let version = self.secret_mapping.read().version;
229-
if new.version > version {
213+
let version = self.secret_mapping.read().version();
214+
if new.version() > version {
230215
let mut secret_mapping = self.secret_mapping.write();
231-
secret_mapping.version = new.version;
232-
secret_mapping.mapping = new.mapping;
216+
secret_mapping.update_mapping(new)
233217
}
234218
}
235219

236220
#[cfg(feature = "realtime")]
237221
fn setup_notify(&self, server_sdk_key: &str, client: &mut FPClient) {
238222
let sdk_key = server_sdk_key.to_owned();
239223
let realtime_socket = self.realtime_socket.clone();
224+
let client_sdk_key = {
225+
let mapping = self.secret_mapping.read();
226+
mapping.client_sdk_key(server_sdk_key).cloned()
227+
};
240228

241229
client.set_update_callback(Box::new(move |_old, _new| {
242-
let key = sdk_key.clone();
230+
let server_key = sdk_key.clone();
231+
let client_key = client_sdk_key.clone();
243232
let socket = realtime_socket.clone();
244233
tokio::spawn(async move {
245234
socket
246-
.notify_sdk(key, "update", serde_json::json!(""))
235+
.notify_sdk(server_key, client_key, "update", serde_json::json!(""))
247236
.await;
248237
});
249238
}));
@@ -317,18 +306,16 @@ mod tests {
317306
assert_eq!(secret_keys.get(&client_sdk_key), Some(&server_sdk_key));
318307

319308
// test mapping sync
320-
let mut new = SecretMapping {
321-
version: 2,
322-
..Default::default()
323-
};
324-
new.mapping
325-
.insert(client_sdk_key2.to_string(), server_sdk_key2.to_string());
309+
310+
let mut mapping = HashMap::new();
311+
mapping.insert(client_sdk_key2.to_string(), server_sdk_key2.to_string());
312+
let new = SecretMapping::new(2, mapping);
326313
let clients = { (repository.inner.sdk_clients.read()).clone() };
327314
assert!(clients.contains_key(&server_sdk_key));
328315
repository.inner.update_mapping(new);
329316
let secret_mapping = { (repository.inner.secret_mapping.read()).clone() };
330-
let secret = &secret_mapping.mapping.get(&client_sdk_key2);
331-
assert_eq!(secret_mapping.version, 2);
317+
let secret = &secret_mapping.server_sdk_key(&client_sdk_key2);
318+
assert_eq!(secret_mapping.version(), 2);
332319
assert_eq!(secret.unwrap(), &server_sdk_key2.to_string());
333320

334321
// test clients sync
@@ -354,7 +341,7 @@ mod tests {
354341
let r = serde_json::from_str::<Repository>(&repo_string.unwrap()).unwrap();
355342
assert!(r == repo_from_test_file());
356343
let secret_keys = repository.secret_keys();
357-
let secret_keys_version = repository.inner.secret_mapping.read().version;
344+
let secret_keys_version = repository.inner.secret_mapping.read().version();
358345
assert!(secret_keys_version == 1);
359346
assert!(secret_keys.len() == 1);
360347
assert!(secret_keys.get(&client_sdk_key) == Some(&server_sdk_key));

src/secrets.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::{collections::HashMap, ops::Deref};
2+
3+
use serde::Deserialize;
4+
5+
type ClientSdkKey = String;
6+
type ServerSdkKey = String;
7+
8+
#[derive(Deserialize, Debug, Default, Clone)]
9+
pub struct SecretMapping {
10+
version: u128,
11+
mapping: HashMap<ClientSdkKey, ServerSdkKey>,
12+
#[serde(skip)]
13+
reverse: HashMap<ServerSdkKey, ClientSdkKey>,
14+
}
15+
16+
impl SecretMapping {
17+
#[cfg(test)]
18+
pub fn new(version: u128, mapping: HashMap<ClientSdkKey, ServerSdkKey>) -> Self {
19+
let mut reverse = HashMap::new();
20+
for (k, v) in &mapping {
21+
reverse.insert(v.clone(), k.clone());
22+
}
23+
Self {
24+
version,
25+
mapping,
26+
reverse,
27+
}
28+
}
29+
30+
pub fn version(&self) -> u128 {
31+
self.version
32+
}
33+
34+
pub fn update_mapping(&mut self, new: SecretMapping) {
35+
if new.version > self.version {
36+
self.version = new.version;
37+
self.mapping = new.mapping;
38+
39+
let mut reverse = HashMap::new();
40+
for (k, v) in &self.mapping {
41+
reverse.insert(v.clone(), k.clone());
42+
}
43+
44+
self.reverse = reverse;
45+
}
46+
}
47+
48+
pub fn client_sdk_key(&self, server_sdk_key: &str) -> Option<&String> {
49+
self.reverse.get(server_sdk_key)
50+
}
51+
52+
pub fn server_sdk_key(&self, client_sdk_key: &str) -> Option<&String> {
53+
self.mapping.get(client_sdk_key)
54+
}
55+
56+
pub fn server_sdk_keys(&self) -> Vec<&String> {
57+
self.reverse.keys().into_iter().collect()
58+
}
59+
60+
pub fn mapping_clone(&self) -> HashMap<String, String> {
61+
self.mapping.clone()
62+
}
63+
64+
pub fn insert(&mut self, client_sdk_key: String, server_sdk_key: String, version: u128) {
65+
self.version = version;
66+
self.mapping
67+
.insert(client_sdk_key.clone(), server_sdk_key.clone());
68+
self.reverse.insert(server_sdk_key, client_sdk_key);
69+
}
70+
71+
pub fn contains_server_sdk_key(&self, server_sdk_key: &str) -> bool {
72+
self.reverse.contains_key(server_sdk_key)
73+
}
74+
}
75+
76+
impl Deref for SecretMapping {
77+
type Target = HashMap<ClientSdkKey, ServerSdkKey>;
78+
79+
fn deref(&self) -> &Self::Target {
80+
&self.mapping
81+
}
82+
}

0 commit comments

Comments
 (0)