Skip to content

Commit 2340842

Browse files
committed
make TCP interface less error-prone
1 parent efeb82b commit 2340842

1 file changed

Lines changed: 58 additions & 22 deletions

File tree

open-codegen/opengen/templates/tcp/tcp_server.rs

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ extern crate clap;
99

1010
use std::{
1111
io::{prelude::Read, Write},
12-
net::TcpListener,
12+
net::{TcpListener, TcpStream},
1313
};
1414

1515
use clap::{Arg, App};
@@ -80,14 +80,35 @@ struct OptimizerSolution<'a> {
8080
cost: f64,
8181
}
8282

83-
fn pong(stream: &mut std::net::TcpStream, code: i32) {
84-
let error_message = format!(
83+
fn write_bytes_to_stream(stream: &mut TcpStream, payload: &[u8], context: &str) -> bool {
84+
if let Err(err) = stream.write_all(payload) {
85+
warn!("{}: {}", context, err);
86+
return false;
87+
}
88+
true
89+
}
90+
91+
fn write_json_to_stream<T: Serialize>(
92+
stream: &mut TcpStream,
93+
payload: &T,
94+
context: &str,
95+
) -> bool {
96+
let payload_json = match serde_json::to_vec_pretty(payload) {
97+
Ok(payload_json) => payload_json,
98+
Err(err) => {
99+
error!("{}: {}", context, err);
100+
return false;
101+
}
102+
};
103+
write_bytes_to_stream(stream, &payload_json, context)
104+
}
105+
106+
fn pong(stream: &mut TcpStream, code: i32) {
107+
let pong_message = format!(
85108
{% raw %}"{{\n\t\"Pong\" : {}\n}}\n"{% endraw %},
86109
code
87110
);
88-
stream
89-
.write_all(error_message.as_bytes())
90-
.expect("cannot write to stream");
111+
write_bytes_to_stream(stream, pong_message.as_bytes(), "could not write pong to stream");
91112
}
92113

93114
/// Writes an error to the communication stream
@@ -99,25 +120,22 @@ struct ErrorResponse<'a> {
99120
message: &'a str,
100121
}
101122

102-
fn write_error_message(stream: &mut std::net::TcpStream, code: i32, error_msg: &str) {
123+
fn write_error_message(stream: &mut TcpStream, code: i32, error_msg: &str) {
103124
let error_response = ErrorResponse {
104125
response_type: "Error",
105126
code,
106127
message: error_msg,
107128
};
108-
let error_message = serde_json::to_string_pretty(&error_response).unwrap();
109129
warn!("TCP error {}: {}", code, error_msg);
110-
stream
111-
.write_all(error_message.as_bytes())
112-
.expect("cannot write to stream");
130+
write_json_to_stream(stream, &error_response, "could not write error response to stream");
113131
}
114132

115133
/// Serializes the solution and solution status and returns it
116134
/// to the client
117135
fn return_solution_to_client(
118136
status: AlmOptimizerStatus,
119137
solution: &[f64],
120-
stream: &mut std::net::TcpStream,
138+
stream: &mut TcpStream,
121139
) {
122140
let empty_vec : [f64; 0] = Default::default();
123141
let solution: OptimizerSolution = OptimizerSolution {
@@ -134,10 +152,7 @@ fn return_solution_to_client(
134152
cost: status.cost(),
135153

136154
};
137-
let solution_json = serde_json::to_string_pretty(&solution).unwrap();
138-
stream
139-
.write_all(solution_json.as_bytes())
140-
.expect("cannot write to stream");
155+
write_json_to_stream(stream, &solution, "could not write optimizer solution to stream");
141156
}
142157

143158
/// Handles an execution request
@@ -146,7 +161,7 @@ fn execution_handler(
146161
execution_parameter: &ExecutionParameter,
147162
u: &mut [f64],
148163
p: &mut [f64],
149-
stream: &mut std::net::TcpStream,
164+
stream: &mut TcpStream,
150165
) {
151166
// ----------------------------------------------------
152167
// Set initial value
@@ -221,20 +236,41 @@ fn run_server(tcp_config: &TcpServerConfiguration) {
221236
let mut p = [0.0; {{meta.optimizer_name|upper}}_NUM_PARAMETERS];
222237
let mut cache = initialize_solver();
223238
info!("Done");
224-
let listener = TcpListener::bind(format!("{}:{}", tcp_config.ip, tcp_config.port)).unwrap();
239+
let listener = match TcpListener::bind(format!("{}:{}", tcp_config.ip, tcp_config.port)) {
240+
Ok(listener) => listener,
241+
Err(err) => {
242+
error!(
243+
"failed to bind TCP server at {}:{}: {}",
244+
tcp_config.ip,
245+
tcp_config.port,
246+
err
247+
);
248+
return;
249+
}
250+
};
225251
let mut u = [0.0; {{meta.optimizer_name|upper}}_NUM_DECISION_VARIABLES];
226252
info!("listening started, ready to accept connections at {}:{}", tcp_config.ip, tcp_config.port);
227253
'incoming: for stream in listener.incoming() {
228-
let mut stream = stream.unwrap();
254+
let mut stream = match stream {
255+
Ok(stream) => stream,
256+
Err(err) => {
257+
warn!("failed to accept incoming TCP connection: {}", err);
258+
continue;
259+
}
260+
};
229261

230262
//The following is more robust compared to `read_to_string`
231263
let mut bytes_buffer = vec![0u8; READ_BUFFER_SIZE];
232264
let mut read_data_length = 1;
233265
let mut buffer = String::new();
234266
while read_data_length != 0 {
235-
read_data_length = stream
236-
.read(&mut bytes_buffer)
237-
.expect("could not read stream");
267+
read_data_length = match stream.read(&mut bytes_buffer) {
268+
Ok(read_data_length) => read_data_length,
269+
Err(err) => {
270+
warn!("could not read stream: {}", err);
271+
continue 'incoming;
272+
}
273+
};
238274
let new_string = match String::from_utf8(bytes_buffer[0..read_data_length].to_vec()) {
239275
Ok(new_string) => new_string,
240276
Err(err) => {

0 commit comments

Comments
 (0)