Skip to content

Commit 81824ce

Browse files
committed
Shutdown :)
1 parent ebf4867 commit 81824ce

2 files changed

Lines changed: 41 additions & 14 deletions

File tree

src/lib.rs

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55

66
pub struct ThreadPool {
77
workers: Vec<Worker>,
8-
sender: mpsc::Sender<Job>,
8+
sender: Option<mpsc::Sender<Job>>,
99
}
1010

1111
#[derive(Debug)]
@@ -14,6 +14,11 @@ pub enum PoolCreationError {
1414
NotANumber, // Usually handled during string-to-int parsing
1515
}
1616

17+
///The Worker picks up code that needs to be run and runs the code in its thread.
18+
struct Worker {
19+
id: usize,
20+
thread: thread::JoinHandle<()>,
21+
}
1722
///Type alias for a trait objetc that holds the type of closure that execute receives
1823
type Job = Box<dyn FnOnce() + Send + 'static>;
1924

@@ -38,9 +43,13 @@ impl ThreadPool {
3843
}
3944

4045
// 3. Return the struct
41-
Ok(ThreadPool { workers, sender })
46+
Ok(ThreadPool {
47+
workers,
48+
sender: Some(sender),
49+
})
4250
}
43-
51+
///Worker structs that we just created to fetch the code to run
52+
///from a queue held in the ThreadPool and send that code to its thread to run.
4453
pub fn execute<F>(&self, f: F)
4554
where
4655
F: FnOnce() + Send + 'static, //the closure will be executed only one time
@@ -49,22 +58,38 @@ impl ThreadPool {
4958
{
5059
let job = Box::new(f);
5160

52-
self.sender.send(job).unwrap();
61+
self.sender.as_ref().unwrap().send(job).unwrap();
5362
}
5463
}
55-
struct Worker {
56-
id: usize,
57-
thread: thread::JoinHandle<()>,
58-
}
5964

65+
impl Drop for ThreadPool {
66+
fn drop(&mut self) {
67+
drop(self.sender.take());
68+
69+
for worker in self.workers.drain(..) {
70+
println!("Shutting down worker {}", worker.id);
71+
72+
worker.thread.join().unwrap();
73+
}
74+
}
75+
}
6076
impl Worker {
77+
///The Arc type will let multiple Worker instances own the receiver,
78+
///and Mutex will ensure that only one Worker gets a job from the receiver at a time
6179
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
6280
let thread = thread::spawn(move || loop {
63-
let job = receiver.lock().unwrap().recv().unwrap();
64-
65-
println!("Worker {id} got a job; executing.");
81+
let message = receiver.lock().unwrap().recv();
6682

67-
job();
83+
match message {
84+
Ok(job) => {
85+
println!("Worker {id} got a job; executing");
86+
job();
87+
}
88+
Err(_) => {
89+
println!("Worker{id} disconected; shutting down.");
90+
break;
91+
}
92+
}
6893
});
6994

7095
Worker { id, thread }

src/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::{
1919
time::Duration,
2020
};
2121

22-
use web_server_made_rusty::{PoolCreationError, ThreadPool};
22+
use web_server_made_rusty::ThreadPool;
2323

2424
const LCL_ADDR: &str = "127.0.0.1:7878";
2525
fn main() {
@@ -41,13 +41,15 @@ fn main() {
4141
ThreadPool::new(1).unwrap()
4242
}
4343
};
44-
for stream in listener.incoming() {
44+
for stream in listener.incoming().take(2) {
4545
let stream = stream.unwrap();
4646

4747
pool.execute(|| {
4848
handle_connection(stream);
4949
});
5050
}
51+
52+
println!("Shutting down.");
5153
}
5254

5355
fn handle_connection(mut stream: TcpStream) {

0 commit comments

Comments
 (0)