Skip to content

Commit e1db443

Browse files
committed
feat: add exception details
1 parent 486c6cd commit e1db443

File tree

16 files changed

+171
-80
lines changed

16 files changed

+171
-80
lines changed

agent/src/dispatcher/base_dispatcher.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -302,7 +302,7 @@ impl BaseDispatcher {
302302
}
303303
while !leaky_bucket.acquire(1) {
304304
counter.get_token_failed.fetch_add(1, Ordering::Relaxed);
305-
exception_handler.set(Exception::RxPpsThresholdExceeded);
305+
exception_handler.set(Exception::RxPpsThresholdExceeded, None);
306306
thread::sleep(Duration::from_millis(1));
307307
}
308308

agent/src/ebpf_dispatcher.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -490,7 +490,7 @@ impl EbpfDispatcher {
490490

491491
if !leaky_bucket.acquire(1) {
492492
counter.get_token_failed.fetch_add(1, Ordering::Relaxed);
493-
exception_handler.set(Exception::RxPpsThresholdExceeded);
493+
exception_handler.set(Exception::RxPpsThresholdExceeded, None);
494494
continue;
495495
}
496496

agent/src/exception.rs

Lines changed: 72 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -14,15 +14,19 @@
1414
* limitations under the License.
1515
*/
1616

17+
use std::collections::HashMap;
1718
use std::sync::{
1819
atomic::{AtomicU64, Ordering},
19-
Arc,
20+
Arc, Mutex,
2021
};
2122

2223
use public::proto::agent::Exception;
2324

2425
#[derive(Clone, Debug, Default)]
25-
pub struct ExceptionHandler(Arc<AtomicU64>);
26+
pub struct ExceptionHandler {
27+
exception: Arc<AtomicU64>,
28+
descriptions: Arc<Mutex<HashMap<u64, String>>>,
29+
}
2630

2731
impl ExceptionHandler {
2832
const AUTO_CLEAR_BITS: u64 = Exception::NpbNoGwArp as u64
@@ -38,21 +42,52 @@ impl ExceptionHandler {
3842
| Exception::NpbSocketError as u64
3943
| Exception::DataBpsThresholdExceeded as u64;
4044

41-
pub fn set(&self, e: Exception) {
42-
self.0.fetch_or(e as u64, Ordering::SeqCst);
45+
pub fn set(&self, e: Exception, description: Option<String>) {
46+
self.exception.fetch_or(e as u64, Ordering::SeqCst);
47+
if let Some(d) = description {
48+
self.descriptions.lock().unwrap().insert(e as u64, d);
49+
}
4350
}
4451

4552
pub fn has(&self, e: Exception) -> bool {
4653
let e = e as u64;
47-
self.0.load(Ordering::Relaxed) & e == e
54+
self.exception.load(Ordering::Relaxed) & e == e
4855
}
4956

5057
pub fn clear(&self, e: Exception) {
51-
self.0.fetch_and(!(e as u64), Ordering::SeqCst);
58+
self.exception.fetch_and(!(e as u64), Ordering::SeqCst);
59+
self.descriptions.lock().unwrap().remove(&(e as u64));
5260
}
5361

54-
pub fn take(&self) -> u64 {
55-
self.0.fetch_and(!Self::AUTO_CLEAR_BITS, Ordering::SeqCst)
62+
pub fn take(&self) -> (u64, Option<String>) {
63+
let bits = self
64+
.exception
65+
.fetch_and(!Self::AUTO_CLEAR_BITS, Ordering::SeqCst);
66+
let mut descriptions = self.descriptions.lock().unwrap();
67+
let mut result = vec![];
68+
for i in 0..64 {
69+
let bit = 1u64 << i;
70+
if bits & bit == bit {
71+
let mut description = if bit & Self::AUTO_CLEAR_BITS == bit {
72+
descriptions.remove(&bit)
73+
} else {
74+
descriptions.get(&bit).cloned()
75+
};
76+
77+
if let Some(d) = description.take() {
78+
result.push(d);
79+
}
80+
}
81+
}
82+
83+
(
84+
bits,
85+
if result.is_empty() {
86+
None
87+
} else {
88+
Some(result.join(";"))
89+
},
90+
)
5691
}
5792
}
5893

@@ -65,9 +100,9 @@ mod tests {
65100
let mut expected = 0u64;
66101
let h = ExceptionHandler::default();
67102

68-
h.set(Exception::DiskNotEnough);
103+
h.set(Exception::DiskNotEnough, None);
69104
expected |= Exception::DiskNotEnough as u64;
70-
assert_eq!(h.take(), expected);
105+
assert_eq!(h.take().0, expected);
71106

72107
let exceptions = vec![
73108
Exception::DiskNotEnough,
@@ -79,17 +114,39 @@ mod tests {
79114
];
80115
expected = 0;
81116
for e in exceptions {
82-
h.set(e);
117+
h.set(e, None);
83118
expected |= e as u64;
84-
assert_eq!(h.0.load(Ordering::Relaxed), expected);
119+
assert_eq!(h.exception.load(Ordering::Relaxed), expected);
85120
}
86121

87122
h.clear(Exception::DiskNotEnough);
88123
expected &= !(Exception::DiskNotEnough as u64);
89-
assert_eq!(h.0.load(Ordering::Relaxed), expected);
124+
assert_eq!(h.exception.load(Ordering::Relaxed), expected);
90125

91-
assert_eq!(h.take(), expected);
126+
assert_eq!(h.take().0, expected);
92127
expected &= !(ExceptionHandler::AUTO_CLEAR_BITS);
93-
assert_eq!(h.0.load(Ordering::Relaxed), expected);
128+
assert_eq!(h.exception.load(Ordering::Relaxed), expected);
129+
130+
let h = ExceptionHandler::default();
131+
h.set(
132+
Exception::ControllerSocketError,
133+
Some("controller socket error".to_string()),
134+
);
135+
h.set(
136+
Exception::MemNotEnough,
137+
Some("memory not enough".to_string()),
138+
);
139+
let (bits, descriptions) = h.take();
140+
assert_eq!(
141+
bits,
142+
(Exception::ControllerSocketError as u64) | (Exception::MemNotEnough as u64)
143+
);
144+
assert_eq!(
145+
descriptions,
146+
Some("memory not enough;controller socket error".to_string())
147+
);
148+
let (bits, descriptions) = h.take();
149+
assert_eq!(bits, Exception::MemNotEnough as u64);
150+
assert_eq!(descriptions, Some("memory not enough".to_string()));
94151
}
95152
}

agent/src/integration_collector.rs

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -217,8 +217,9 @@ async fn aggregate_with_catch_exception(
217217
.body(e.to_string().into())
218218
.unwrap()
219219
} else {
220-
error!("integration collector error: {}", e);
221-
exception_handler.set(Exception::IntegrationSocketError);
220+
let error_msg = format!("integration collector error: {}", e);
221+
error!("{}", error_msg);
222+
exception_handler.set(Exception::IntegrationSocketError, Some(error_msg));
222223
Response::builder()
223224
.status(StatusCode::INTERNAL_SERVER_ERROR)
224225
.body(e.to_string().into())
@@ -1216,8 +1217,13 @@ impl MetricServer {
12161217
sleep(Duration::from_secs(1));
12171218
continue;
12181219
}
1219-
error!("integration collector error: {} with addr={}", e, addr);
1220-
exception_handler.set(Exception::IntegrationSocketError);
1220+
let error_msg = format!(
1221+
"integration collector error: {} with addr={}",
1222+
e, addr
1223+
);
1224+
error!("{}", error_msg);
1225+
exception_handler
1226+
.set(Exception::IntegrationSocketError, Some(error_msg));
12211227
sleep(Duration::from_secs(60));
12221228
continue;
12231229
}
@@ -1306,8 +1312,9 @@ impl MetricServer {
13061312
info!("integration collector started");
13071313
info!("integration collector listening on http://{}", addr);
13081314
if let Err(e) = server.await {
1309-
error!("external metric collector error: {}", e);
1310-
exception_handler.set(Exception::IntegrationSocketError);
1315+
let error_msg = format!("external metric collector error: {}", e);
1316+
error!("{}", error_msg);
1317+
exception_handler.set(Exception::IntegrationSocketError, Some(error_msg));
13111318
}
13121319
}
13131320

agent/src/platform/kubernetes/api_watcher.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -699,8 +699,8 @@ impl ApiWatcher {
699699
}
700700
Err(e) => {
701701
let err = format!("kubernetes_api_sync grpc call failed: {}", e);
702-
exception_handler.set(Exception::ControllerSocketError);
703702
error!("{}", err);
703+
exception_handler.set(Exception::ControllerSocketError, Some(err.clone()));
704704
err_msgs.lock().unwrap().push(err);
705705
return;
706706
}
@@ -736,8 +736,8 @@ impl ApiWatcher {
736736
.block_on(session.grpc_kubernetes_api_sync_with_statsd(msg))
737737
{
738738
let err = format!("kubernetes_api_sync grpc call failed: {}", e);
739-
exception_handler.set(Exception::ControllerSocketError);
740739
error!("{}", err);
740+
exception_handler.set(Exception::ControllerSocketError, Some(err.clone()));
741741
err_msgs.lock().unwrap().push(err);
742742
}
743743
}

agent/src/platform/synchronizer.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -330,8 +330,7 @@ impl Synchronizer {
330330
}
331331
}
332332
Err(e) => {
333-
args.exception_handler.set(Exception::ControllerSocketError);
334-
error!(
333+
let error_msg = format!(
335334
"send platform {} with genesis_sync grpc call failed: {}",
336335
if args.version == args.peer_version {
337336
"heartbeat"
@@ -340,6 +339,9 @@ impl Synchronizer {
340339
},
341340
e
342341
);
342+
error!("{}", error_msg);
343+
args.exception_handler
344+
.set(Exception::ControllerSocketError, Some(error_msg));
343345
if !Self::wait_for_running(&args.running, &args.timer, config.sync_interval)
344346
{
345347
break 'outer;

agent/src/rpc/remote_exec.rs

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -292,8 +292,10 @@ impl Interior {
292292
let mut stream = match client.remote_execute(responser).await {
293293
Ok(stream) => stream,
294294
Err(e) => {
295-
warn!("calling server remote_execute rpc failed: {:?}", e);
296-
self.exc.set(pb::Exception::ControllerSocketError);
295+
let error_msg = format!("calling server remote_execute rpc failed: {:?}", e);
296+
warn!("{}", error_msg);
297+
self.exc
298+
.set(pb::Exception::ControllerSocketError, Some(error_msg));
297299
tokio::time::sleep(RPC_RETRY_INTERVAL).await;
298300
continue;
299301
}
@@ -312,8 +314,11 @@ impl Interior {
312314
break;
313315
}
314316
Err(e) => {
315-
warn!("receiving server remote_execute rpc has error: {:?}", e);
316-
self.exc.set(pb::Exception::ControllerSocketError);
317+
let error_msg =
318+
format!("receiving server remote_execute rpc has error: {:?}", e);
319+
warn!("{}", error_msg);
320+
self.exc
321+
.set(pb::Exception::ControllerSocketError, Some(error_msg));
317322
tokio::time::sleep(RPC_RECONNECT_INTERVAL).await;
318323
break;
319324
}

agent/src/rpc/session.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -228,8 +228,10 @@ impl Session {
228228
self.client.write().channel.replace(channel);
229229
}
230230
Err(e) => {
231-
self.exception_handler.set(Exception::ControllerSocketError);
232-
error!("{}", e);
231+
let error_msg = format!("Failed to dial controller: {}", e);
232+
error!("{}", error_msg);
233+
self.exception_handler
234+
.set(Exception::ControllerSocketError, Some(error_msg));
233235
}
234236
}
235237
}

0 commit comments

Comments
 (0)