Skip to content

Commit fc4caa8

Browse files
authored
feat: resolve some unwrap
1 parent 17531be commit fc4caa8

4 files changed

Lines changed: 46 additions & 53 deletions

File tree

src/config/etcd.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
1-
use std::{
2-
sync::{Arc, Mutex},
3-
time::Duration,
4-
};
1+
use std::{sync::Arc, time::Duration};
52

6-
use anyhow::{Result, anyhow};
3+
use anyhow::{Context, Result, anyhow};
74
use async_trait::async_trait;
85
use backon::{ConstantBuilder, Retryable};
96
use dashmap::{DashMap, Entry};
107
use etcd_client::{GetOptions, PutOptions, WatchOptions};
118
use log::{debug, error, warn};
129
use serde::Deserialize;
1310
use tokio::{
14-
sync::{Notify, mpsc},
11+
sync::{Mutex, Notify, mpsc},
1512
task::JoinHandle,
1613
time::sleep,
1714
};
@@ -22,8 +19,6 @@ use crate::config::{ConfigEvent, ConfigEventReceiver, ConfigProvider, GetEntry,
2219
const MAX_BACKOFF: Duration = Duration::from_secs(60);
2320
/// Initial backoff delay.
2421
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
25-
/// Timeout for waiting the watch supervisor task to stop on shutdown.
26-
const SHUTDOWN_WAIT: Duration = Duration::from_secs(10);
2722

2823
#[derive(Clone, Debug, Deserialize)]
2924
pub struct Config {
@@ -66,7 +61,7 @@ impl EtcdConfigProvider {
6661
)
6762
.notify(|err, dur| error!("Failed to connect to etcd: {err}, retrying after {:?}", dur))
6863
.await
69-
.map_err(|err| anyhow!("Failed to connect to etcd: {err}, retry exhausted"))?;
64+
.context("failed to connect to etcd and retry exhausted")?;
7065
let txs = Arc::new(DashMap::<String, mpsc::Sender<ConfigEvent>>::new());
7166
let shutdown = Arc::new(Notify::new());
7267

@@ -379,23 +374,22 @@ impl ConfigProvider for EtcdConfigProvider {
379374
}
380375
}
381376

382-
async fn shutdown(&self) -> anyhow::Result<()> {
377+
async fn shutdown(&self) -> Result<()> {
383378
// Signal the supervisor to stop.
384379
self.shutdown.notify_one();
385380

386381
// Close all dispatch channels so consumers see channel-closed.
387382
self.txs.clear();
388383

389-
// Wait for the supervisor task to exit (with timeout).
390-
let handle = self.supervisor_handle.lock().unwrap().take();
391-
if let Some(h) = handle {
392-
match tokio::time::timeout(SHUTDOWN_WAIT, h).await {
393-
Ok(Ok(())) => {}
394-
Ok(Err(e)) => warn!("etcd supervisor task panicked: {}", e),
395-
Err(_) => warn!(
396-
"etcd supervisor task did not stop within {:?}",
397-
SHUTDOWN_WAIT
398-
),
384+
let handle = self.supervisor_handle.lock().await.take();
385+
if let Some(mut h) = handle {
386+
match tokio::time::timeout(Duration::from_secs(10), &mut h).await {
387+
Ok(joined) => joined.context("failed to shutdown watch supervisor")?,
388+
Err(_) => {
389+
return Err(anyhow!(
390+
"timed out waiting for watch supervisor to shutdown"
391+
));
392+
}
399393
}
400394
}
401395
Ok(())

src/config/types.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ pub mod defaults {
1111
use super::*;
1212

1313
pub fn listen() -> SocketAddr {
14-
"0.0.0.0:3000".parse().unwrap()
14+
SocketAddr::from(([0, 0, 0, 0], 3000))
1515
}
1616

1717
pub fn admin_listen() -> SocketAddr {
18-
"127.0.0.1:3001".parse().unwrap()
18+
SocketAddr::from(([127, 0, 0, 1], 3001))
1919
}
2020

2121
pub fn server() -> Server {

src/main.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{process::exit, sync::Arc};
22

33
use aisix::{config::Config, *};
4+
use anyhow::{Context, Result};
45
use axum::Router;
56
use clap::Parser;
67
use log::{error, info};
@@ -15,16 +16,17 @@ struct Args {
1516
}
1617

1718
#[tokio::main]
18-
async fn main() {
19+
async fn main() -> Result<()> {
1920
let args = Args::parse();
2021

21-
let (ob_shutdown_signal, ob_shutdown_task) = init_observability();
22+
let (ob_shutdown_signal, ob_shutdown_task) =
23+
init_observability().context("failed to initialize observability")?;
2224

23-
let config = Arc::new(config::load(args.config).expect("Failed to load configuration"));
25+
let config = Arc::new(config::load(args.config).context("failed to load configuration")?);
2426

2527
let config_provider = config::create_provider(&config)
2628
.await
27-
.expect("Failed to create config provider");
29+
.context("failed to create config provider")?;
2830
let resources =
2931
Arc::new(config::entities::ResourceRegistry::new(config_provider.clone()).await);
3032

@@ -35,8 +37,11 @@ async fn main() {
3537

3638
let mut exception = false;
3739
select! {
38-
_ = tokio::signal::ctrl_c() => {
39-
info!("Stopping, see you next time!");
40+
res = tokio::signal::ctrl_c() => {
41+
if let Err(e) = res {
42+
error!("Failed to listen for shutdown signal: {}", e);
43+
exception = true;
44+
}
4045
}
4146
res = serve_proxy(config.clone(), proxy_router.clone()) => {
4247
if let Err(e) = res {
@@ -57,14 +62,16 @@ async fn main() {
5762
exception = true;
5863
}
5964

65+
info!("Stopping, see you next time!");
6066
let _ = ob_shutdown_signal.send(());
6167
ob_shutdown_task
6268
.await
63-
.expect("Failed to shutdown observability");
69+
.context("failed to shutdown observability")?;
70+
6471
exit(if exception { 1 } else { 0 });
6572
}
6673

67-
fn init_observability() -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) {
74+
fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> {
6875
use std::{borrow::Cow, time::Duration};
6976

7077
use fastrace::collector::Config;
@@ -89,7 +96,7 @@ fn init_observability() -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) {
8996
// log
9097
logforth::starter_log::builder()
9198
.dispatch(|d| {
92-
d.filter(EnvFilterBuilder::from_default_env_or("info").build())
99+
d.filter(EnvFilterBuilder::from_default_env_or("info,opentelemetry_sdk=off").build())
93100
.append(Stdout::default().with_layout(TextLayout::default()))
94101
})
95102
.dispatch(|d| {
@@ -102,7 +109,7 @@ fn init_observability() -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) {
102109
let reporter = OpenTelemetryReporter::new(
103110
SpanExporter::builder()
104111
.build()
105-
.expect("initialize otlp exporter"),
112+
.context("failed to initialize otlp exporter")?,
106113
Cow::Owned(Resource::builder().build()),
107114
InstrumentationScope::builder(INSTRUMENTATION_NAME)
108115
.with_version(env!("CARGO_PKG_VERSION"))
@@ -114,20 +121,18 @@ fn init_observability() -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) {
114121
);
115122

116123
// metric
117-
let exporter = opentelemetry_otlp::MetricExporter::builder()
118-
.build()
119-
.unwrap();
124+
let exporter = opentelemetry_otlp::MetricExporter::builder().build()?;
120125

121126
let reader = PeriodicReader::builder(exporter).build();
122127

123128
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
124129
let meter = meter_provider.meter(INSTRUMENTATION_NAME);
125130

126131
metrics::set_global_recorder(OpenTelemetryRecorder::new(meter))
127-
.expect("initialize metrics recorder");
132+
.context("failed to initialize metrics recorder")?;
128133
utils::metrics::describe_metrics();
129134

130-
// shuting down signal handler
135+
// shutting down signal handler
131136
let shutdown_handle = tokio::spawn(async move {
132137
let _ = rx.await;
133138

@@ -141,7 +146,7 @@ fn init_observability() -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) {
141146
logforth::core::default_logger().exit();
142147
});
143148

144-
(tx, shutdown_handle)
149+
Ok((tx, shutdown_handle))
145150
}
146151

147152
async fn serve_proxy(config: Arc<Config>, router: Router) -> Result<(), std::io::Error> {

src/providers/mock.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,7 @@ impl Provider for MockProvider {
4747
role: "assistant".to_string(),
4848
content: format!(
4949
"Hello! 👋 Current time: {:?}",
50-
std::time::SystemTime::now()
51-
.duration_since(std::time::UNIX_EPOCH)
52-
.unwrap()
53-
.as_nanos() as u64
50+
epoch_duration().as_nanos() as u64
5451
),
5552
},
5653
finish_reason: Some("stop".to_string()),
@@ -68,10 +65,7 @@ impl Provider for MockProvider {
6865
request: ChatCompletionRequest,
6966
) -> Result<BoxStream<'static, Result<ChatCompletionChunk, ProviderError>>, ProviderError> {
7067
let id = "ae343f5c-6383-4c33-90e3-26421324b5c5".to_string();
71-
let created = std::time::SystemTime::now()
72-
.duration_since(std::time::UNIX_EPOCH)
73-
.unwrap()
74-
.as_secs();
68+
let created = epoch_duration().as_secs();
7569
let model = request.model.clone();
7670

7771
let mut chunks: Vec<ChatCompletionChunk> = Vec::new();
@@ -93,13 +87,7 @@ impl Provider for MockProvider {
9387
usage: None,
9488
});
9589

96-
let time = format!(
97-
"{:?}",
98-
std::time::SystemTime::now()
99-
.duration_since(std::time::UNIX_EPOCH)
100-
.unwrap()
101-
.as_nanos() as u64
102-
);
90+
let time = format!("{:?}", epoch_duration().as_nanos() as u64);
10391
let latest_message = request
10492
.messages
10593
.iter()
@@ -238,6 +226,12 @@ impl Provider for MockProvider {
238226
}
239227
}
240228

229+
fn epoch_duration() -> Duration {
230+
std::time::SystemTime::now()
231+
.duration_since(std::time::UNIX_EPOCH)
232+
.expect("system clock must be after UNIX_EPOCH")
233+
}
234+
241235
fn embedding_inputs(request: &EmbeddingRequest) -> Vec<String> {
242236
let value = serde_json::to_value(&request.input).unwrap_or(Value::Null);
243237
match value {

0 commit comments

Comments
 (0)