forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontrol_server.rs
More file actions
149 lines (135 loc) · 4.83 KB
/
control_server.rs
File metadata and controls
149 lines (135 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#![allow(missing_docs)]
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use axum::{
extract::{rejection::JsonRejection, Json},
response::{IntoResponse, Response},
routing::{get, post},
Router,
};
use futures::FutureExt;
use http::StatusCode;
use hyper::server::{self, conn::Http};
use serde_json::json;
use stream_cancel::Tripwire;
use tokio::{net::UnixListener, sync::Mutex};
use tokio_stream::wrappers::UnixListenerStream;
use crate::{
config::ConfigBuilder,
topology::{ReloadOutcome, TopologyController},
};
/// HTTP control server served over a Unix domain socket.
///
/// Constructed with `ControlServer::bind` and driven to completion with
/// `ControlServer::run`.
pub struct ControlServer {
/// Bound Unix socket listener that incoming connections are accepted from.
listener: UnixListener,
/// Shared topology controller; the `/config` handler locks it to reload the topology.
topology_controller: Arc<Mutex<TopologyController>>,
/// Signal that, once resolved, triggers graceful shutdown of the server.
shutdown_signal: Tripwire,
/// Filesystem path of the socket; logged at startup and removed after the server stops.
socket_path: PathBuf,
}
impl ControlServer {
    /// Binds a control server to the Unix domain socket at `socket_path`.
    ///
    /// Any file already present at `socket_path` (for example a socket left behind by a
    /// previous run) is removed before binding.
    ///
    /// # Errors
    ///
    /// Returns an error if an existing entry at `socket_path` cannot be removed, or if
    /// binding the listener fails.
    pub fn bind(
        socket_path: impl AsRef<Path>,
        topology_controller: Arc<Mutex<TopologyController>>,
        shutdown_signal: Tripwire,
    ) -> Result<Self, crate::Error> {
        let socket_path = socket_path.as_ref();

        // Remove any lingering socket from previous runs. Removing unconditionally and
        // tolerating `NotFound` avoids a check-then-remove race, and also clears dangling
        // symlinks, which an existence check would report as absent.
        if let Err(error) = std::fs::remove_file(socket_path) {
            if error.kind() != std::io::ErrorKind::NotFound {
                return Err(error.into());
            }
        }

        let listener = UnixListener::bind(socket_path)?;
        Ok(ControlServer {
            listener,
            topology_controller,
            shutdown_signal,
            socket_path: socket_path.to_path_buf(),
        })
    }

    /// Serves the control API on the bound socket until the shutdown signal fires.
    ///
    /// Routes:
    /// - `GET /ping` responds with `pong`.
    /// - `POST /config` builds a config from the JSON-encoded `ConfigBuilder` in the
    ///   request body and reloads the topology with it, responding `201 Created` on
    ///   success and an `ApiError` response otherwise.
    ///
    /// The socket file is removed on a best-effort basis once the server stops, whether
    /// it stopped cleanly or with an error.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying server terminates with an error.
    pub async fn run(self) -> Result<(), crate::Error> {
        let app = Router::new()
            .route("/ping", get(|| async { "pong" }))
            .route(
                "/config",
                post(
                    |payload: Result<Json<ConfigBuilder>, JsonRejection>| async move {
                        // Do not wait for the lock: if the controller is busy, report
                        // that to the caller immediately instead of queueing the reload.
                        let Ok(mut controller) = self.topology_controller.try_lock() else {
                            return Err(ApiError::Locked);
                        };
                        let Json(builder) = payload.map_err(ApiError::Json)?;
                        let new_config = builder.build().map_err(ApiError::Build)?;
                        match controller.reload(Some(new_config)).await {
                            ReloadOutcome::Success => Ok(StatusCode::CREATED),
                            ReloadOutcome::MissingApiKey => Err(ApiError::MissingApiKey),
                            // TODO: return these errors up from inner topology methods
                            ReloadOutcome::RolledBack => Err(ApiError::RolledBack(vec![])),
                            ReloadOutcome::FatalError => Err(ApiError::Fatal(vec![])),
                            ReloadOutcome::NoConfig => {
                                unreachable!("Some(config) was passed above")
                            }
                        }
                    },
                ),
            );

        let accept = server::accept::from_stream(UnixListenerStream::new(self.listener));
        let server = server::Builder::new(accept, Http::new()).serve(app.into_make_service());
        let graceful = server.with_graceful_shutdown(self.shutdown_signal.map(|_| ()));

        info!(message = "Starting Vector control server.", socket_path = ?self.socket_path);
        let outcome = graceful.await;

        // Try to clean up after ourselves. This happens before the server result is
        // propagated so that a server error does not leave the socket file behind.
        std::fs::remove_file(&self.socket_path).ok();

        outcome?;
        Ok(())
    }
}
/// Failure modes of the `POST /config` endpoint, each rendered as a JSON error response.
enum ApiError {
/// The topology controller lock could not be acquired without waiting (409 Conflict).
Locked,
/// The request body was rejected by the JSON extractor; the response reuses the
/// rejection's own status code and body text.
Json(JsonRejection),
/// Building the config from the submitted builder failed; carries the build errors
/// (422 Unprocessable Entity).
Build(Vec<String>),
/// The reload reported a missing API key (401 Unauthorized).
MissingApiKey,
/// The reload failed and the previous config was restored (422 Unprocessable Entity).
/// The handler currently always passes an empty error list.
RolledBack(Vec<String>),
/// The reload failed and rolling back to the previous config also failed
/// (503 Service Unavailable). The handler currently always passes an empty error list.
Fatal(Vec<String>),
}
impl IntoResponse for ApiError {
    /// Renders the error as an HTTP status code plus a JSON body.
    ///
    /// Every body carries a human-readable `"reason"`; variants that hold a list of
    /// error strings also expose it under `"errors"`.
    fn into_response(self) -> Response {
        // Pick the status and the raw JSON value first; the `Json` wrapper is applied
        // once at the end instead of in every arm.
        let (status, body) = match self {
            Self::Locked => (
                StatusCode::CONFLICT,
                json!({ "reason": "topology currently locked" }),
            ),
            Self::Json(rejection) => {
                let status = rejection.status();
                let reason = rejection.body_text();
                (status, json!({ "reason": reason }))
            }
            Self::Build(errors) => (
                StatusCode::UNPROCESSABLE_ENTITY,
                json!({
                    "reason": "config build error",
                    "errors": errors,
                }),
            ),
            Self::MissingApiKey => (
                StatusCode::UNAUTHORIZED,
                json!({ "reason": "missing API key" }),
            ),
            Self::RolledBack(errors) => (
                StatusCode::UNPROCESSABLE_ENTITY,
                json!({
                    "reason": "reload error, rolled back to previous config",
                    "errors": errors,
                }),
            ),
            Self::Fatal(errors) => (
                StatusCode::SERVICE_UNAVAILABLE,
                json!({
                    "reason": "fatal reload error, failed to roll back to previous config",
                    "errors": errors,
                }),
            ),
        };

        (status, Json(body)).into_response()
    }
}