Da Server Single-Thread a Server Multi-Thread
Al momento, il server elaborerà ogni richiesta a turno, il che significa che non elaborerà una seconda connessione fino a quando la prima connessione non avrà finito di essere elaborata. Se il server riceve sempre più richieste, questa esecuzione seriale sarebbe sempre meno ottimale. Se il server riceve una richiesta che richiede molto tempo per essere elaborata, le richieste successive dovranno aspettare fino a quando la richiesta lunga non sarà finita, anche se le nuove richieste potrebbero essere elaborate rapidamente. Dovremo risolvere questo problema, ma prima osserviamo il problema in azione.
Simulare una Richiesta Lenta
Vedremo come una richiesta che impiega molto tempo a essere processata possa influenzare le altre richieste fatte alla nostra implementazione attuale del server. Il Listato 21-10 implementa la gestione di una richiesta ad /attesa con una risposta lenta simulata, che farà attendere il server per cinque secondi prima di rispondere.
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --taglio-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); gestisci_connessione(stream); } } fn gestisci_connessione(mut stream: TcpStream) { // --taglio-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"), "GET /attesa HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "ciao.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --taglio-- let contenuto = fs::read_to_string(filename).unwrap(); let lunghezza = contenuto.len(); let risposta = format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}"); stream.write_all(risposta.as_bytes()).unwrap(); }
Siamo passati da if a match ora che abbiamo tre casi. Dobbiamo
esplicitamente fare match su una slice di request_line per trovare
corrispondenza con dei valori letterali stringa; match non fa riferimenti e
de-referenziamenti automatici, come fa il metodo di uguaglianza.
Il primo ramo è uguale al blocco if del Listato 21-9. Il secondo ramo fa
match di una richiesta ad /attesa. Quando viene ricevuta quella richiesta,
il server attende per cinque secondi prima di inviare la pagina HTML di
successo. Il terzo ramo è lo stesso del blocco else del Listato 21-9.
Puoi vedere quanto sia primitivo il nostro server: librerie reali gestirebbero il riconoscimento di richieste multiple in un modo molto meno verboso!
Avvia il server usando cargo run. Poi, apri due finestre del browser: una per
http://127.0.0.1:7878 e l’altra per http://127.0.0.1:7878/attesa. Se
inserisci l’URI / alcune volte, come prima, vedrai che risponde rapidamente.
Ma se inserisci /attesa e poi carichi /, vedrai che / aspetta fino a
quando sleep non ha completato l’attesa per i suoi cinque secondi completi
prima di caricarsi.
Ci sono molteplici tecniche che potremmo usare per evitare che le richieste si accumulino dietro a una richiesta lenta, inclusa l’uso di async come abbiamo fatto nel Capitolo 17; quella che implementeremo è un thread pool.
Migliorare la Produttività con un Thread Pool
Un thread pool è un gruppo di thread generati che sono pronti e in attesa di gestire un compito. Quando il programma riceve un nuovo compito, assegna uno dei thread nel gruppo al compito, e quel thread elaborerà il compito. I thread rimanenti nel gruppo sono disponibili per gestire qualsiasi altro compito che arrivi mentre il primo thread sta elaborando. Quando il primo thread ha finito di elaborare il suo compito, viene restituito al gruppo di thread inattivi, pronto per gestire un nuovo compito. Un thread pool ti permette quindi di elaborare connessioni concorrentemente, aumentando la produttività del tuo server.
Limiteremo il numero di thread nel gruppo a un numero piccolo per proteggerci da attacchi DoS; se il programma creasse un nuovo thread per ogni richiesta in arrivo, qualcuno che fa 10 milioni di richieste al nostro server potrebbe causare grossi problemi utilizzando tutte le risorse del nostro server e bloccando l’elaborazione delle richieste fino a fermarla.
Quindi, invece di generare thread illimitati, avremo un numero fisso di
thread in attesa nel gruppo. Le richieste in arrivo vengono mandate al gruppo
per l’elaborazione. Il gruppo manterrà una coda di richieste in arrivo. Ogni
thread del gruppo prenderà una richiesta da questa coda, la gestirà e poi
chiederà un’altra richiesta dalla coda. Con questo modello, possiamo elaborare
fino a N richieste simultaneamente, dove N è il numero di thread. Se
ogni thread sta rispondendo a una richiesta a lungo termine, le richieste
successive possono ancora accumularsi nella coda, ma abbiamo aumentato il numero
di richieste a lungo termine che possiamo gestire prima di raggiungere quel
punto.
Questa tecnica è solo una delle molte maniere per migliorare la produttività di un server web. Altre opzioni che potresti esplorare sono il modello fork/join, il modello I/O async a singolo thread, e il modello I/O async multi-thread. Se sei interessato a questo argomento, puoi leggere ed informarti su queste ed altre soluzioni e provare a implementarle; con un linguaggio di basso livello come Rust, tutte queste opzioni sono possibili.
Prima di iniziare a implementare un pool di thread, parliamo prima di come dovrebbe essere usato un pool. Quando stai cercando di progettare codice, scrivere prima l’interfaccia client può aiutare a guidare il tuo design. Scrivi l’API del codice così che sia strutturata nel modo in cui vuoi chiamarla; poi implementa la funzionalità dentro questa struttura invece di implementare prima la funzionalità e poi progettare l’API pubblica.
In modo simile a come abbiamo usato lo sviluppo guidato dai test nel progetto del Capitolo 12, qui invece useremo lo sviluppo guidato dal compilatore. Scriveremo il codice che chiama le funzioni che vogliamo, e poi guarderemo agli errori dal compilatore per determinare cosa dovremmo cambiare dopo per far funzionare il codice. Prima di farlo, tuttavia, esploreremo la tecnica che non useremo come punto di partenza.
Generare un Thread per Ogni Richiesta
Per prima cosa, esploriamo come potrebbe apparire il nostro codice se creasse un nuovo thread per ogni connessione. Come detto, questa non è la nostra soluzione finale a causa dei problemi legati al numero illimitato di thread che potrebbero essere creati, ma è un punto di partenza per avere un server multi-thread funzionante. Poi, aggiungeremo il thread pool come miglioramento, e confrontare le due soluzioni sarà più facile.
Il Listato 21-11 mostra le modifiche da fare a main per generare un nuovo
thread per gestire ogni stream nel ciclo for.
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { gestisci_connessione(stream); }); } } fn gestisci_connessione(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "ciao.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contenuto = fs::read_to_string(filename).unwrap(); let lunghezza = contenuto.len(); let risposta = format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}"); stream.write_all(risposta.as_bytes()).unwrap(); }
Come hai imparato nel Capitolo 16, thread::spawn creerà un nuovo thread e
poi eseguirà il codice della chiusura nel nuovo thread. Se esegui questo
codice e carichi /attesa nel tuo browser, poi / in altre due schede del
browser, vedrai infatti che le richieste a / non devono aspettare che
/attesa finisca. Tuttavia, come abbiamo menzionato, questo alla fine
sovraccaricherà il sistema perché staresti creando nuovi thread senza alcun
limite.
Potresti anche ricordare dal Capitolo 17 che questa è esattamente la situazione in cui async e await eccellono! Tieni a mente questo mentre costruiamo il thread pool e pensa a come le cose sarebbero diverse o uguali con async.
Creare un Numero Finito di Thread
Vogliamo che il nostro thread pool funzioni in un modo simile e familiare in
modo che passare dai thread a un thread pool non richieda grandi
cambiamenti al codice che usa la nostra API. Il Listato 21-12 mostra
l’interfaccia ipotetica per una struct ThreadPool che vogliamo usare invece di
thread::spawn.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
gestisci_connessione(stream);
});
}
}
fn gestisci_connessione(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "ciao.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contenuto = fs::read_to_string(filename).unwrap();
let lunghezza = contenuto.len();
let risposta =
format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");
stream.write_all(risposta.as_bytes()).unwrap();
}
ThreadPoolUsiamo ThreadPool::new per creare un nuovo gruppo di thread con un numero
configurabile di thread, in questo caso 4. Poi, nel ciclo for,
pool.execute ha un interfaccia simile a thread::spawn nel senso che prende
una chiusura che il gruppo dovrebbe eseguire per ogni stream. Dobbiamo
implementare pool.execute in modo che prenda la chiusura e la dia a un
thread nel gruppo per eseguirla. Questo codice non si compilerà ancora, ma lo
proveremo in modo che il compilatore ci guidi su come sistemarlo.
Costruire ThreadPool Usando lo Sviluppo Guidato dal Compilatore
Apporta le modifiche riportate nel Listato 21-12 a src/main.rs, quindi
utilizziamo gli errori del compilatore da cargo check per guidare il nostro
sviluppo. Ecco il primo errore che otteniamo:
$ cargo check
Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `ciao` (bin "ciao") due to 1 previous error
Ottimo! Questo errore ci dice che abbiamo bisogno di un type o modulo
ThreadPool, quindi ne creeremo uno adesso. La nostra implementazione di
ThreadPool sarà indipendente dal tipo di lavoro svolto dal nostro server web.
Quindi, trasformiamo il crate ciao da un crate binario a un crate
libreria per contenere la nostra implementazione di ThreadPool. Dopo essere
passati a un crate libreria, potremmo anche utilizzare la libreria del thread
pool separata per qualsiasi lavoro che vogliamo svolgere utilizzando un gruppo
di thread, non solo per eseguire richieste web.
Creiamo un file src/lib.rs che contenga quanto segue, ovvero la definizione
più semplice di una struct ThreadPool che possiamo avere per ora:
pub struct ThreadPool;
Quindi, modifichiamo il file main.rs per portare ThreadPool nello scope
dal crate della libreria aggiungendo il seguente codice all’inizio di
src/main.rs:
use ciao::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
gestisci_connessione(stream);
});
}
}
fn gestisci_connessione(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "ciao.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contenuto = fs::read_to_string(filename).unwrap();
let lunghezza = contenuto.len();
let risposta =
format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");
stream.write_all(risposta.as_bytes()).unwrap();
}
Questo codice ancora non funzionerà, ma controlliamolo di nuovo per ottenere il prossimo errore che dobbiamo risolvere:
$ cargo check
Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `ciao` (bin "ciao") due to 1 previous error
Questo errore indica che ora dobbiamo creare una funzione associata denominata
new per ThreadPool. Sappiamo anche che new deve avere un parametro che può
accettare 4 come argomento e dovrebbe restituire un’istanza ThreadPool.
Implementiamo la funzione new più semplice che abbia queste caratteristiche:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(dimensione: usize) -> ThreadPool {
ThreadPool
}
}
Abbiamo scelto usize come type del parametro dimensione perché sappiamo
che un numero negativo di thread non ha alcun senso. Sappiamo anche che
useremo questo 4 come numero di elementi in una collezione di thread, che è
lo scopo del type usize, come discusso nella sezione “Il Type
Intero”del Capitolo 3.
Controlliamo nuovamente il codice:
$ cargo check
Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `ciao` (bin "ciao") due to 1 previous error
Ora l’errore si verifica perché non abbiamo un metodo execute su ThreadPool.
Ricorda dalla sezione “Creare un Numero Finito di
Thread” che abbiamo deciso che il nostro
thread pool dovrebbe avere un’interfaccia simile a thread::spawn. In
aggiunta, implementeremo la funzione execute in modo che prenda la chiusura
che riceve e la dia a un thread inattivo nel gruppo per eseguirla.
Definiremo il metodo execute su ThreadPool in modo che accetti una chiusura
come parametro. Ricorda dalla sezione “Restituire i Valori Catturati dalle
Chiusure” del Capitolo 13 che possiamo
accettare chiusure come parametri con tre diversi trait: Fn, FnMut e
FnOnce. Dobbiamo decidere quale tipo di chiusura utilizzare in questo caso.
Sappiamo che finiremo per fare qualcosa di simile all’implementazione della
libreria standard thread::spawn, quindi possiamo guardare quali sono i vincoli
definiti nella firma di thread::spawn sul suo parametro. La documentazione ci
mostra quanto segue:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Il parametro di type F è quello che ci interessa in questo caso; il
parametro di type T è relativo al valore di ritorno, e non ci interessa.
Possiamo vedere che spawn utilizza FnOnce come vincolo di trait su F.
Probabilmente è quello che vogliamo anche noi, perché alla fine passeremo
l’argomento che otteniamo in execute a spawn. Possiamo essere ulteriormente
sicuri che FnOnce sia il trait che vogliamo utilizzare perché il thread
per l’esecuzione di una richiesta eseguirà solo una volta la chiusura di quella
richiesta, il che corrisponde a Once in FnOnce.
Il parametro di type F ha anche il vincolo di trait Send e il vincolo di
lifetime 'static, che sono utili nella nostra situazione: abbiamo bisogno di
Send per trasferire la chiusura da un thread all’altro e di 'static perché
non sappiamo quanto tempo impiegherà il thread per eseguire quanto richiesto.
Creiamo un metodo execute su ThreadPool che prenderà un parametro generico
di type F con questi vincoli:
pub struct ThreadPool;
impl ThreadPool {
// --taglio--
pub fn new(dimensione: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Continuiamo a usare () dopo FnOnce perché questo FnOnce rappresenta una
chiusura che non accetta parametri e restituisce il type unitario ().
Proprio come nelle definizioni delle funzioni, il type di ritorno può essere
omesso dalla firma, ma anche se non abbiamo parametri, abbiamo comunque bisogno
delle parentesi.
Ancora una volta, questa è l’implementazione più semplice del metodo execute:
non fa nulla, ma stiamo solo cercando di far compilare il nostro codice.
Controlliamo di nuovo:
$ cargo check
Checking ciao v0.1.0 (file:///progetti/ciao)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Si compila! Ma nota che se provi cargo run e fai una richiesta nel browser,
vedrai gli errori nel browser che abbiamo visto all’inizio del capitolo. La
nostra libreria non sta ancora chiamando la chiusura passata a execute!
Nota: un detto che potreste sentire riguardo ai linguaggi con compilatori rigorosi, come Haskell e Rust, è “Se il codice si compila, funziona”. Ma questo detto non è universalmente vero. Il nostro progetto si compila, ma non fa assolutamente nulla! Se stessimo realizzando un progetto reale e completo, questo sarebbe un buon momento per iniziare a scrivere dei test unitari per verificare che il codice si compili e abbia il comportamento che desideriamo.
Una considerazione: cosa cambierebbe qui se eseguissimo una future invece di una chiusura?
Validare il Numero di Thread in new
Non stiamo facendo nulla con i parametri di new e execute. Implementiamo il
corpo di queste funzioni con il comportamento desiderato. Per iniziare, pensiamo
a new. In precedenza abbiamo scelto un type senza segno per il parametro
dimensione perché un gruppo con un numero negativo di thread non ha senso.
Tuttavia, anche un gruppo con zero thread non ha senso, ma zero è un usize
perfettamente valido. Aggiungeremo del codice per verificare che dimensione
sia maggiore di zero prima di restituire un’istanza ThreadPool e faremo andare
in panic il programma se riceve uno zero utilizzando la macro assert!, come
mostrato nel Listato 21-13.
pub struct ThreadPool;
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
ThreadPool
}
// --taglio--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool::new per generare un errore se dimensione è zeroAbbiamo anche aggiunto della documentazione per il nostro ThreadPool con
commenti di documentazione. Nota che abbiamo seguito le buone pratiche di
documentazione aggiungendo una sezione che evidenzia le situazioni in cui la
nostra funzione può generare un panic, come discusso nel Capitolo 14. Prova ad
eseguire cargo doc --open e clicca sulla struct ThreadPool per vedere come
appare la documentazione generata per new!
Invece di aggiungere la macro assert! come abbiamo fatto qui, potremmo
cambiare new in build e restituire un Result come abbiamo fatto con
Config::build nel progetto I/O nel Listato 12-9. Ma in questo caso abbiamo
deciso che cercare di creare un thread pool senza alcun thread dovrebbe
essere un errore irrecuperabile. Se ti senti ambizioso, prova a scrivere una
funzione chiamata build con la seguente firma per confrontarla con la funzione
new:
pub fn build(size: usize) -> Result<ThreadPool, ErroreCreazionePool>
Creare Spazio per Memorizzare i Thread
Ora che abbiamo un modo per sapere che abbiamo un numero valido di thread da
memorizzare nel gruppo, possiamo creare quei thread e memorizzarli nella
struct ThreadPool prima di restituire la struct. Ma come si “memorizza” un
thread? Diamo un’altra occhiata alla firma di thread::spawn:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
La funzione spawn restituisce un JoinHandle<T>, dove T è il type che la
chiusura restituisce. Proviamo anche noi a usare JoinHandle e vediamo cosa
succede. Nel nostro caso, le chiusure che stiamo passando al thread pool
gestiranno la connessione e non restituiranno nulla, quindi T sarà il type
unitario ().
Il codice nel Listato 21-14 verrà compilato, ma non creerà ancora alcun
thread. Abbiamo modificato la definizione di ThreadPool per contenere un
vettore di istanze thread::JoinHandle<()>, inizializzato il vettore con una
capacità di dimensione, impostato un ciclo for che eseguirà del codice per
creare i thread e restituito un’istanza ThreadPool che li contiene.
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --taglio--
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let mut threads = Vec::with_capacity(dimensione);
for _ in 0..dimensione {
// crea qualche thread e memorizzali in un vettore
}
ThreadPool { threads }
}
// --taglio--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool per contenere i threadAbbiamo portato std::thread nello scope della libreria crate perché stiamo
utilizzando thread::JoinHandle come type degli elementi nel vettore in
ThreadPool.
Una volta ricevuta una dimensione valida, il nostro ThreadPool crea un nuovo
vettore in grado di contenere dimensione elementi. La funzione with_capacity
svolge lo stesso compito di Vec::new, ma con un’importante differenza:
pre-alloca lo spazio nel vettore. Poiché sappiamo che dobbiamo memorizzare
dimensione elementi nel vettore, eseguire questa allocazione in anticipo è
leggermente più efficiente rispetto all’utilizzo di Vec::new, che ridimensiona
se stesso man mano che vengono inseriti gli elementi.
Quando esegui nuovamente cargo check, dovrebbe avere esito positivo.
Inviare Codice da ThreadPool a un Thread
Abbiamo lasciato un commento nel ciclo for nel Listato 21-14 riguardo alla
creazione di thread. Qui vedremo come creare effettivamente i thread. La
libreria standard fornisce thread::spawn come metodo per creare thread, e
thread::spawn si aspetta di ricevere del codice che il thread deve eseguire
non appena viene creato. Tuttavia, nel nostro caso, vogliamo creare i thread e
farli attendere il codice che invieremo in seguito. L’implementazione dei
thread della libreria standard non include alcun modo per farlo; dobbiamo
implementarlo manualmente.
Implementeremo questo comportamento introducendo una nuova struttura dati
intermedia tra ThreadPool e i thread che gestirà questo nuovo comportamento.
Chiameremo questa struttura dati Worker, che è un termine comune nelle
implementazioni di pooling. Il Worker raccoglie il codice che deve essere
eseguito ed esegue il codice nel suo thread.
Pensate alle persone che lavorano in cucina in un ristorante: i lavoratori aspettano che arrivino le ordinazioni dai clienti, quindi sono responsabili di prendere quelle ordinazioni e soddisfarle.
Invece di memorizzare un vettore di istanze JoinHandle<()> nel thread
pool, memorizzeremo le istanze della struct Worker. Ogni Worker
memorizzerà una singola istanza JoinHandle<()>. Quindi, implementeremo un
metodo su Worker che prenderà una chiusura di codice da eseguire e la invierà
al thread già in esecuzione per l’esecuzione. Assegneremo anche a ciascun
Worker un id in modo da poter distinguere tra le diverse istanze di Worker
nel gruppo quando facciamo logging o debugging.
Ecco il nuovo processo che avverrà quando creeremo un ThreadPool.
Implementeremo il codice che invia la chiusura al thread dopo aver impostato
Worker in questo modo:
- Definire una struttura
Workerche contenga unide unJoinHandle<()>. - Modificare
ThreadPoolin modo che contenga un vettore di istanzeWorker. - Definire una funzione
Worker::newche accetta un numeroide restituisce un’istanzaWorkerche contiene l’ide un thread generato con una chiusura vuota. - In
ThreadPool::new, utilizzare il contatore del cicloforper generare unid, creare un nuovoWorkercon quell’ide memorizzare ilWorkernel vettore.
Se sei pronto per una sfida, prova a implementare queste modifiche da solo prima di guardare il codice nel Listato 21-15.
Pronto? Ecco il Listato 21-15 con un modo per apportare le modifiche precedenti.
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --taglio--
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera _panic_ se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --taglio--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool per contenere istanze Worker invece di contenere direttamente i threadAbbiamo cambiato il nome del campo su ThreadPool da threads a workers
perché ora contiene istanze Worker invece di istanze JoinHandle<()>. Usiamo
il contatore nel ciclo for come argomento per Worker::new e memorizziamo
ogni nuovo Worker nel vettore chiamato workers.
Il codice esterno (come il nostro server in src/main.rs) non ha bisogno di
conoscere i dettagli di implementazione relativi all’uso di una struct
Worker all’interno di ThreadPool, quindi rendiamo privata la struct
Worker e la sua funzione new. La funzione Worker::new utilizza l’id che
le forniamo e memorizza un’istanza JoinHandle<()> creata generando un nuovo
thread utilizzando una chiusura vuota.
Nota: se il sistema operativo non è in grado di creare un thread perché non ci sono risorse di sistema sufficienti,
thread::spawnandrà in panic. Ciò causerà il panic dell’intero server, anche se la creazione di alcuni thread potrebbe avere esito positivo. Per semplicità, questo comportamento va bene, ma in un’implementazione di thread pool di produzione, probabilmente si preferirà utilizzarestd::thread::Buildere il suospawnche restituisce inveceResult.
Questo codice verrà compilato e memorizzerà il numero di istanze Worker che
abbiamo specificato come argomento di ThreadPool::new. Ma non stiamo ancora
elaborando la chiusura che otteniamo in execute. Vediamo come farlo.
Inviare Richieste ai Thread Tramite Canali
Il prossimo problema che affronteremo è che le chiusure fornite a
thread::spawn non fanno assolutamente nulla. Attualmente, otteniamo la
chiusura che vogliamo eseguire nel metodo execute. Ma dobbiamo fornire a
thread::spawn una chiusura da eseguire quando creiamo ogni Worker durante la
creazione del ThreadPool.
Vogliamo che le struct Worker che abbiamo appena creato recuperino il codice
da eseguire da una coda contenuta nel ThreadPool e lo inviino al proprio
thread per l’esecuzione.
I canali che abbiamo imparato a conoscere nel Capitolo 16, un modo semplice per
comunicare tra due thread, sarebbero perfetti per questo caso d’uso. Useremo
un canale da far funzionare come coda di lavori, e execute invierà un lavoro
dal ThreadPool alle istanze Worker, che invieranno il lavoro al proprio
thread. Ecco il piano:
- Il
ThreadPoolcreerà un canale e diverrà l’estremità mittente. - Ogni
Workerdiverrà il ricevitore. - Creeremo una nuova struct
Jobche conterrà le chiusure che vogliamo inviare lungo il canale. - Il metodo
executeinvierà il lavoro che vuole eseguire tramite il mittente. - Nel suo thread, il
Workereseguirà un ciclo sul suo ricevitore ed eseguirà le chiusure di tutti i lavori che riceve.
Iniziamo creando un canale in ThreadPool::new e conservando il mittente
nell’istanza ThreadPool, come mostrato nel Listato 21-16. La struttura Job
per ora non contiene nulla, ma sarà il type di elemento che invieremo nel
canale.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --taglio--
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id));
}
ThreadPool { workers, mittente }
}
// --taglio--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool per memorizzare il mittente di un canale che trasmette istanze JobIn ThreadPool::new, creiamo il nostro nuovo canale e facciamo in modo che il
gruppo mantenga l’estremità del mittente. Questo verrà compilato con successo.
Proviamo a passare un ricevitore del canale a ciascun Worker mentre il
thread pool crea il canale. Sappiamo che vogliamo utilizzare il ricevitore
nel thread che le istanze Worker generano, quindi faremo riferimento al
parametro ricevitore nella chiusura. Il codice nel Listato 21-17 non è ancora
completamente compilabile.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --taglio--
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, ricevitore));
}
ThreadPool { workers, mittente }
}
// --taglio--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --taglio--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, ricevitore: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
ricevitore;
});
Worker { id, thread }
}
}
WorkerAbbiamo apportato alcune piccole e semplici modifiche: passiamo il ricevitore a
Worker::new, per poi usarlo all’interno della chiusura.
Quando proviamo a controllare questo codice, otteniamo questo errore:
$ cargo check
Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:30:42
|
25 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
29 | for id in 0..dimensione {
| ----------------------- inside of this loop
30 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:57:33
|
57 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
29 ~ let mut value = Worker::new(id, receiver);
30 ~ for id in 0..dimensione {
31 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `ciao` (lib) due to 1 previous error
Il codice sta cercando di passare ricevitore a più istanze Worker. Questo
non funzionerà, come ricorderai dal Capitolo 16: l’implementazione del canale
che Rust fornisce è multi-produttore, singolo consumatore. Ciò significa che
non possiamo semplicemente clonare l’estremità di ricezione del canale per
correggere questo codice. Inoltre, non vogliamo inviare un messaggio più volte a
più ricevitori; vogliamo un unico elenco di messaggi con più istanze Worker in
modo che ogni messaggio venga elaborato una sola volta.
Inoltre, rimuovere un lavoro dalla coda del canale comporta la mutazione del
ricevitore, quindi i thread hanno bisogno di un modo sicuro per condividere
e modificare ricevitore; altrimenti, potremmo ottenere condizioni di
competizione (come descritto nel capitolo 16).
Ricorda i puntatori intelligenti thread-safe discussi nel Capitolo 16: per
condividere la ownership tra più thread e consentire ai thread di
modificare il valore, dobbiamo utilizzare Arc<Mutex<T>>. Il type Arc
consentirà a più istanze Worker di possedere il ricevitore, mentre Mutex
garantirà che solo un Worker alla volta riceva un lavoro dal ricevitore. Il
Listato 21-18 mostra le modifiche che dobbiamo apportare.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --taglio--
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --taglio--
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool { workers, mittente }
}
// --taglio--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --taglio--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --taglio--
let thread = thread::spawn(|| {
ricevitore;
});
Worker { id, thread }
}
}
Worker utilizzando Arc e MutexIn ThreadPool::new, inseriamo il ricevitore in un Arc e in un Mutex. Per
ogni nuovo Worker, cloniamo l’Arc per aumentare il conteggio dei reference
in modo che le istanze Worker possano condividere la ownership del
ricevitore.
Con queste modifiche, il codice viene compilato! Ci siamo quasi!
Implementare il Metodo execute
Implementiamo infine il metodo execute su ThreadPool. Modificheremo anche
Job da una struct a un alias di type per un oggetto trait che contiene
il type della chiusura che execute riceve. Come discusso nella sezione
“Sinonimi e Alias di Type” nel Capitolo 20,
gli alias di type ci consentono di abbreviare i type lunghi per
facilitarne l’uso. Guarda il Listato 21-19.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: mpsc::Sender<Job>,
}
// --taglio--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --taglio--
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool { workers, mittente }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.mittente.send(job).unwrap();
}
}
// --taglio--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
ricevitore;
});
Worker { id, thread }
}
}
Job per una Box che contiene ogni chiusura e quindi invio del lavoro al canaleDopo aver creato una nuova istanza Job utilizzando la chiusura ottenuta in
execute, inviamo quel lavoro tramite l’estremità mittente del canale.
Chiamiamo unwrap su send nel caso in cui l’invio fallisca. Ciò potrebbe
accadere se, ad esempio, interrompiamo l’esecuzione di tutti i nostri thread,
il che significa che l’estremità ricevente ha smesso di ricevere nuovi messaggi.
Al momento, non possiamo interrompere l’esecuzione dei nostri thread: i nostri
thread continuano a essere eseguiti finché esiste il pool. Il motivo per cui
utilizziamo unwrap è che sappiamo che il caso di errore non si verificherà, ma
il compilatore non lo sa.
Ma non abbiamo ancora finito! Nel Worker, la nostra chiusura passata a
thread::spawn continua a fare riferimento solo all’estremità ricevente del
canale. Invece, abbiamo bisogno che la chiusura continui a girare all’infinito,
chiedendo all’estremità ricevente del canale un lavoro ed eseguendolo quando lo
riceve. Apportiamo la modifica mostrata nel Listato 21-20 a Worker::new.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool { workers, mittente }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.mittente.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --taglio--
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = ricevitore.lock().unwrap().recv().unwrap();
println!("Worker {id} ha un lavoro; in esecuzione.");
job();
}
});
Worker { id, thread }
}
}
WorkerQui, chiamiamo prima lock sul ricevitore per acquisire il mutex, quindi
chiamiamo unwrap per generare un panic in caso di errori. L’acquisizione di
un blocco potrebbe non riuscire se il mutex è in uno stato poisoned
(avvelenato), cosa che può accadere se un altro thread va in panic mentre
mantiene il blocco invece di rilasciarlo. In questa situazione, chiamare
unwrap per far andare in panic questo thread è l’azione corretta da
intraprendere. Potresti anche cambiare questo unwrap in un expect con un
messaggio di errore che sia significativo per te.
Se otteniamo il blocco sul mutex, chiamiamo recv per ricevere un Job dal
canale. Un ultimo unwrap supera anche qui eventuali errori, che potrebbero
verificarsi se il thread che detiene il mittente si è arrestato, in modo
simile a come il metodo send restituisce Err se il ricevente si arresta.
La chiamata a recv blocca, quindi se non ci sono ancora lavori, il thread
corrente attenderà fino a quando non sarà disponibile un lavoro. Il Mutex<T>
assicura che solo un thread Worker alla volta tenti di richiedere un lavoro.
Il nostro gruppo di thread è ora in uno stato funzionante! Esegui cargo run
e fai alcune richieste:
$ cargo run
Compiling ciao v0.1.0 (file:///progetti/ciao)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `ciao` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.57s
Running `target/debug/ciao`
Worker 1 ha un lavoro; in esecuzione.
Worker 1 ha un lavoro; in esecuzione.
Worker 0 ha un lavoro; in esecuzione.
Worker 3 ha un lavoro; in esecuzione.
Worker 2 ha un lavoro; in esecuzione.
Worker 1 ha un lavoro; in esecuzione.
Successo! Ora abbiamo un thread pool che esegue le connessioni in modo asincrono. Non vengono mai creati più di quattro thread, quindi il nostro sistema non andrà in sovraccarico se il server riceve molte richieste. Se effettuiamo una richiesta a /attesa, il server sarà in grado di servire altre richieste facendo in modo che un altro thread le esegua.
Nota: se apri /attesa in più finestre del browser contemporaneamente, potrebbero caricarsi una alla volta a intervalli di cinque secondi. Alcuni browser web eseguono più istanze della stessa richiesta in sequenza per motivi di cache. Questa limitazione non è causata dal nostro server web.
Questo è un buon momento per fare una pausa e considerare come il codice nei Listati 21-18, 21-19 e 21-20 sarebbe diverso se utilizzassimo le future invece di una chiusura per il lavoro da svolgere. Quali type cambierebbero? In che modo le firme dei metodi sarebbero diverse, se lo fossero? Quali parti del codice rimarrebbero invariate?
Dopo aver appreso il funzionamento del ciclo while let nei capitoli 17 e 19,
potresti chiederti perché non abbiamo scritto il codice del thread Worker
come mostrato nel Listato 21-21.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
mittente: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione é il numero di thread nel gruppo.
///
/// # Panics
///
/// La funzione `new` genera panic se la dimensione é zero.
pub fn new(dimensione: usize) -> ThreadPool {
assert!(dimensione > 0);
let (mittente, ricevitore) = mpsc::channel();
let ricevitore = Arc::new(Mutex::new(ricevitore));
let mut workers = Vec::with_capacity(dimensione);
for id in 0..dimensione {
workers.push(Worker::new(id, Arc::clone(&ricevitore)));
}
ThreadPool { workers, mittente }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.mittente.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --taglio--
impl Worker {
fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = ricevitore.lock().unwrap().recv() {
println!("Worker {id} ha un lavoro; in esecuzione.");
job();
}
});
Worker { id, thread }
}
}
Worker::new utilizzando while letQuesto codice viene compilato ed eseguito, ma non produce il comportamento
concorrente desiderato: una richiesta lenta continuerà a causare l’attesa delle
altre richieste per essere elaborate. Il motivo è piuttosto sottile: la struct
Mutex non ha un metodo pubblico unlock perché la ownership del blocco si
basa sulla longevità di MutexGuard<T> all’interno del
LockResult<MutexGuard<T>> che il metodo lock restituisce. In fase di
compilazione, il controllo dei prestiti può quindi applicare la regola secondo
cui non è possibile accedere a una risorsa protetta da un Mutex a meno che non
si detenga il blocco. Tuttavia, questa implementazione può anche comportare il
mantenimento del blocco più a lungo del previsto se non si presta attenzione
alla lifetime del MutexGuard<T>.
Il codice nel Listato 21-20 che utilizza let job = ricevitore.lock().unwrap().recv().unwrap(); funziona perché con let,
qualsiasi valore temporaneo utilizzato nell’espressione a destra del segno di
uguale viene immediatamente eliminato al termine dell’istruzione let.
Tuttavia, while let (e if let e match) non elimina i valori temporanei
fino alla fine del blocco associato. Nel Listato 21-21, il blocco rimane attivo
per tutta la durata della chiamata a job(), il che significa che altre istanze
Worker non possono ricevere lavori.