Arresto Ordinato e Pulizia
Il codice nel Listato 21-20 risponde alle richieste in modo asincrono attraverso
l’uso di un thread pool, come volevamo. Riceviamo alcuni avvisi sui campi
workers, id e thread che non stiamo utilizzando in modo diretto, il che ci
ricorda che non stiamo ripulendo alcunché. Quando utilizziamo il metodo meno
elegante ctrl-C per arrestare il thread principale,
anche tutti gli altri thread vengono immediatamente arrestati, anche se sono
nel mezzo dell’elaborazione di una richiesta.
A seguire, implementeremo il trait Drop per chiamare join su ciascuno dei
thread nel gruppo in modo che possano completare le richieste su cui stanno
lavorando prima della chiusura. Quindi implementeremo un modo per comunicare ai
thread che devono smettere di accettare nuove richieste e chiudersi. Per
vedere questo codice in azione, modificheremo il nostro server in modo che
accetti solo due richieste prima di chiudere correttamente il suo gruppo di
thread.
Una cosa da notare mentre procediamo: nulla di tutto ciò influisce sulle parti del codice che gestiscono l’esecuzione delle chiusure, quindi tutto qui sarebbe esattamente lo stesso se utilizzassimo un thread pool per un runtime asincrono.
Implementare il Trait Drop su ThreadPool
Iniziamo con l’implementazione di Drop sul nostro thread pool. Quando il
gruppo viene eliminato, tutti i nostri thread dovrebbero unirsi per
assicurarsi di completare il loro lavoro. Il Listato 21-22 mostra un primo
tentativo di implementazione di Drop; questo codice non funziona ancora
perfettamente.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool { workers, mittente }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.mittente.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Spegnimento worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = ricevitore.lock().unwrap().recv().unwrap();
println!("Worker {id} ha un lavoro; in esecuzione.");
job();
}
});
Worker { id, thread }
}
}
Per prima cosa, eseguiamo un ciclo su ciascuno dei thread del gruppo
workers. Usiamo &mut per questo perché self è un reference mutabile e
abbiamo anche bisogno di poter mutare worker. Per ogni worker, stampiamo un
messaggio che dice che questa particolare istanza di Worker si sta spegnendo,
quindi chiamiamo join sul thread di quell’istanza di Worker. Se la
chiamata a join fallisce, utilizziamo unwrap per far andare Rust in panic
e procedere a uno spegnimento non ordinato.
Ecco l’errore che otteniamo quando compiliamo questo codice:
$ cargo check
Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:53:13
|
53 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rust/library/std/src/thread/mod.rs:1921:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `ciao` (lib) due to 1 previous error
L’errore ci dice che non possiamo chiamare join perché abbiamo solo un
prestito mutabile di ogni worker e join assume la ownership del suo
argomento. Per risolvere questo problema, dobbiamo spostare il thread fuori
dall’istanza Worker che possiede thread in modo che join possa consumare
il thread. Un modo per farlo è quello di adottare lo stesso approccio che
abbiamo usato nel Listato 18-15. Se Worker contenesse un
Option<thread::JoinHandle<()>>, potremmo chiamare il metodo take su Option
per spostare il valore fuori dalla variante Some e lasciare una variante
None al suo posto. In altre parole, un Worker in esecuzione avrebbe una
variante Some in thread e, quando volessimo ripulire un Worker,
sostituiremmo Some con None in modo che il Worker non abbia più un
thread da eseguire.
Tuttavia, l’unico caso in cui ciò si verificherebbe sarebbe quando si elimina
Worker. In cambio, dovremmo gestire un Option<thread::JoinHandle<()>>
ovunque accedessimo a worker.thread. Il Rust idiomatico usa abbastanza spesso
Option, ma quando ti ritrovi a incapsulare qualcosa che sai sarà sempre
presente in un Option come scappatoia, è una buona idea cercare approcci
alternativi per rendere il tuo codice più pulito e meno soggetto a errori.
In questo caso, esiste un’alternativa migliore: il metodo Vec::drain. Accetta
un parametro di intervallo per specificare quali elementi rimuovere dal vettore
e restituisce un iteratore di tali elementi. Passando la sintassi
dell’intervallo .. si rimuoveranno tutti i valori dal vettore.
Quindi, dobbiamo aggiornare l’implementazione drop di ThreadPool in questo
modo:
#![allow(unused)] fn main() { use std::{ sync::{Arc, Mutex, mpsc}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, mittente: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Crea un nuovo ThreadPool. /// /// La dimensione é il numero di thread nel gruppo. /// /// # Panics /// /// La funzione `new` genera panic se la dimensione é zero. pub fn new(dimensione: usize) -> ThreadPool { assert!(dimensione > 0); let (mittente, ricevitore) = mpsc::channel(); let ricevitore = Arc::new(Mutex::new(ricevitore)); let mut workers = Vec::with_capacity(dimensione); for id in 0..dimensione { workers.push(Worker::new(id, Arc::clone(&ricevitore))); } ThreadPool { workers, mittente } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.mittente.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Spegnimento worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = ricevitore.lock().unwrap().recv().unwrap(); println!("Worker {id} ha un lavoro; in esecuzione."); job(); } }); Worker { id, thread } } } }
Questo risolve l’errore del compilatore e non richiede altre modifiche al nostro codice. Nota che, poiché drop può essere chiamato in caso di panic, anche unwrap potrebbe andare in panic e causare un doppio panic, che blocca immediatamente il programma e interrompe qualsiasi operazione di pulizia in corso. Questo va bene per un programma di esempio, ma non è consigliabile per il codice di produzione.
Segnalare ai Thread di Interrompere l’Attesa di Lavori
Con tutte le modifiche apportate, il nostro codice viene compilato senza alcun
avviso. Tuttavia, la cattiva notizia è che questo codice non funziona ancora nel
modo desiderato. La chiave è la logica nelle chiusure eseguite dai thread
delle istanze Worker: al momento, chiamiamo join, ma questo non chiude i
thread, perché il loop che eseguono cerca continuamente lavori. Se proviamo
a cancellare il nostro ThreadPool con la nostra attuale implementazione di
drop, il thread principale rimarrà bloccato per sempre, in attesa che il
primo thread finisca.
Per risolvere questo problema, dovremo modificare l’implementazione di drop in
ThreadPool e poi modificare il ciclo Worker.
Per prima cosa, modificheremo l’implementazione di drop in ThreadPool per
eliminare esplicitamente il mittente prima di attendere il completamento dei
thread. Il Listato 21-23 mostra le modifiche apportate a ThreadPool per
eliminare esplicitamente il mittente. A differenza del thread, qui abbiamo
bisogno di usare un Option per poter spostare mittente fuori da
ThreadPool con Option::take.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: Option<mpsc::Sender<Job>>,
}
// --taglio--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
// --taglio--
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool {
workers,
mittente: Some(mittente),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.mittente.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.mittente.take());
for worker in self.workers.drain(..) {
println!("Spegnimento worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = ricevitore.lock().unwrap().recv().unwrap();
println!("Worker {id} ha un lavoro; in esecuzione.");
job();
}
});
Worker { id, thread }
}
}
mittente prima di unire i thread WorkerL’eliminazione di mittente chiude il canale, indicando che non verranno più
inviati messaggi. Quando ciò accade, tutte le chiamate a recv che le istanze
Worker eseguono nel ciclo infinito restituiranno un errore. Nel Listato 21-24,
modifichiamo il ciclo Worker per uscire correttamente dal ciclo in tal caso,
il che significa che i thread termineranno quando l’implementazione drop di
ThreadPool chiamerà join su di essi.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool {
workers,
mittente: Some(mittente),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.mittente.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.mittente.take());
for worker in self.workers.drain(..) {
println!("Spegnimento worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let messaggio = ricevitore.lock().unwrap().recv();
match messaggio {
Ok(job) => {
println!("Worker {id} ha un lavoro; in esecuzione.");
job();
}
Err(_) => {
println!("Worker {id} disconnesso; spegnimento.");
break;
}
}
}
});
Worker { id, thread }
}
}
recv restituisce un errorePer vedere questo codice in azione, modifichiamo main in modo che accetti solo
due richieste prima di chiudere correttamente il server, come mostrato nel
Listato 21-25.
use ciao::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
gestisci_connessione(stream);
});
}
println!("Spegnimento.");
}
fn gestisci_connessione(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "ciao.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contenuto = fs::read_to_string(filename).unwrap();
let lunghezza = contenuto.len();
let risposta =
format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");
stream.write_all(risposta.as_bytes()).unwrap();
}
Non vorresti che un server web reale si spegnesse dopo aver servito solo due richieste. Questo codice dimostra semplicemente che lo spegnimento avviene ordinatamente e la pulizia funziona correttamente.
Il metodo take è definito nel trait Iterator e limita l’iterazione al
massimo ai primi due elementi. Il ThreadPool uscirà dallo scope alla fine di
main e verrà eseguita l’implementazione drop.
Avvia il server con cargo run ed effettua tre richieste. La terza richiesta
dovrebbe generare un errore e nel terminale dovresti vedere un output simile a
questo:
$ cargo run
Compiling ciao v0.1.0 (file:///progetti/ciao)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.52s
Running `target/debug/ciao`
Worker 0 ha un lavoro; in esecuzione.
Spegnimento.
Spegnimento worker 0
Worker 3 ha un lavoro; in esecuzione.
Worker 1 disconnesso; spegnimento.
Worker 2 disconnesso; spegnimento.
Worker 3 disconnesso; spegnimento.
Worker 0 disconnesso; spegnimento.
Spegnimento worker 1
Spegnimento worker 2
Spegnimento worker 3
Potresti vedere un ordine diverso degli ID Worker e dei messaggi stampati.
Possiamo vedere come funziona questo codice dai messaggi: le istanze Worker 0
e 3 hanno ricevuto le prime due richieste. Il server ha smesso di accettare
connessioni dopo la seconda connessione e l’implementazione Drop su
ThreadPool inizia l’esecuzione prima ancora che Worker 3 inizi il suo
lavoro. L’eliminazione di mittente disconnette tutte le istanze Worker e
dice loro di chiudersi. Ciascuna istanza Worker stampa un messaggio quando si
disconnette, quindi il thread pool chiama join per attendere che ogni
thread Worker finisca.
Nota un aspetto interessante di questa particolare esecuzione: il ThreadPool
ha eliminato il mittente e, prima che qualsiasi Worker ricevesse un errore,
abbiamo provato a unire Worker 0. Worker 0 non aveva ancora ricevuto un
errore da recv, quindi il thread principale si è bloccato, in attesa che
Worker 0 terminasse. Nel frattempo, Worker 3 ha ricevuto un lavoro e poi
tutti i thread hanno ricevuto un errore. Quando Worker 0 ha terminato, il
thread principale ha atteso che le restanti istanze Worker terminassero. A
quel punto, tutte erano uscite dai loro cicli e si erano fermate.
Congratulazioni! Abbiamo completato il nostro progetto: ora abbiamo un server web di base che utilizza un thread pool per rispondere in modo asincrono. Siamo in grado di eseguire un arresto ordinato del server, che pulisce tutti i thread nel pool.
Ecco il codice completo come riferimento:
use ciao::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
gestisci_connessione(stream);
});
}
println!("Spegnimento.");
}
fn gestisci_connessione(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "ciao.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contenuto = fs::read_to_string(filename).unwrap();
let lunghezza = contenuto.len();
let risposta =
format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");
stream.write_all(risposta.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool {
workers,
mittente: Some(mittente),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.mittente.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.mittente.take());
for worker in &mut self.workers {
println!("Spegnimento worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let messaggio = ricevitore.lock().unwrap().recv();
match messaggio {
Ok(job) => {
println!("Worker {id} ha un lavoro; in esecuzione.");
job();
}
Err(_) => {
println!("Worker {id} disconnesso; spegnimento.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Potremmo fare di più qui! Se vuoi continuare a migliorare questo progetto, ecco alcune idee:
- Aggiungi altra documentazione a
ThreadPoole ai suoi metodi pubblici. - Aggiungi dei test delle funzionalità della libreria.
- Modifica le chiamate a
unwrapper una gestione degli errori più robusta. - Utilizza
ThreadPoolper eseguire alcune attività diverse dalla gestione delle richieste web. - Trova un crate che gestisce thread pool su crates.io e implementa un server web simile utilizzando il crate. Quindi, confronta la sua API e la sua robustezza con il thread pool che abbiamo implementato.
Riepilogo
Complimenti! Sei arrivato alla fine del libro! Ti ringraziamo per averci accompagnato in questo viaggio alla scoperta di Rust. Ora sei pronto per implementare i tuoi progetti Rust e aiutare gli altri nei loro. Ricorda che esiste una comunità accogliente di altri Rustacean che saranno felici di aiutarti con qualsiasi sfida incontrerai nel tuo viaggio con Rust.