Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Da Server Single-Thread a Server Multi-Thread

Al momento, il server elaborerà ogni richiesta a turno, il che significa che non elaborerà una seconda connessione fino a quando la prima connessione non avrà finito di essere elaborata. Se il server riceve sempre più richieste, questa esecuzione seriale sarebbe sempre meno ottimale. Se il server riceve una richiesta che richiede molto tempo per essere elaborata, le richieste successive dovranno aspettare fino a quando la richiesta lunga non sarà finita, anche se le nuove richieste potrebbero essere elaborate rapidamente. Dovremo risolvere questo problema, ma prima osserviamo il problema in azione.

Simulare una Richiesta Lenta

Vedremo come una richiesta che impiega molto tempo a essere processata possa influenzare le altre richieste fatte alla nostra implementazione attuale del server. Il Listato 21-10 implementa la gestione di una richiesta ad /attesa con una risposta lenta simulata, che farà attendere il server per cinque secondi prima di rispondere.

File: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --taglio--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        gestisci_connessione(stream);
    }
}

fn gestisci_connessione(mut stream: TcpStream) {
    // --taglio--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
        "GET /attesa HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "ciao.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --taglio--

    let contenuto = fs::read_to_string(filename).unwrap();
    let lunghezza = contenuto.len();

    let risposta =
        format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");

    stream.write_all(risposta.as_bytes()).unwrap();
}
Listato 21-10: Simulare una richiesta lenta aspettando per 5 secondi

Siamo passati da if a match ora che abbiamo tre casi. Dobbiamo esplicitamente fare match su una slice di request_line per trovare corrispondenza con dei valori letterali stringa; match non fa riferimenti e de-referenziamenti automatici, come fa il metodo di uguaglianza.

Il primo ramo è uguale al blocco if del Listato 21-9. Il secondo ramo fa match di una richiesta ad /attesa. Quando viene ricevuta quella richiesta, il server attende per cinque secondi prima di inviare la pagina HTML di successo. Il terzo ramo è lo stesso del blocco else del Listato 21-9.

Puoi vedere quanto sia primitivo il nostro server: librerie reali gestirebbero il riconoscimento di richieste multiple in un modo molto meno verboso!

Avvia il server usando cargo run. Poi, apri due finestre del browser: una per http://127.0.0.1:7878 e l’altra per http://127.0.0.1:7878/attesa. Se inserisci l’URI / alcune volte, come prima, vedrai che risponde rapidamente. Ma se inserisci /attesa e poi carichi /, vedrai che / aspetta fino a quando sleep non ha completato l’attesa per i suoi cinque secondi completi prima di caricarsi.

Ci sono molteplici tecniche che potremmo usare per evitare che le richieste si accumulino dietro a una richiesta lenta, inclusa l’uso di async come abbiamo fatto nel Capitolo 17; quella che implementeremo è un thread pool.

Migliorare la Produttività con un Thread Pool

Un thread pool è un gruppo di thread generati che sono pronti e in attesa di gestire un compito. Quando il programma riceve un nuovo compito, assegna uno dei thread nel gruppo al compito, e quel thread elaborerà il compito. I thread rimanenti nel gruppo sono disponibili per gestire qualsiasi altro compito che arrivi mentre il primo thread sta elaborando. Quando il primo thread ha finito di elaborare il suo compito, viene restituito al gruppo di thread inattivi, pronto per gestire un nuovo compito. Un thread pool ti permette quindi di elaborare connessioni concorrentemente, aumentando la produttività del tuo server.

Limiteremo il numero di thread nel gruppo a un numero piccolo per proteggerci da attacchi DoS; se il programma creasse un nuovo thread per ogni richiesta in arrivo, qualcuno che fa 10 milioni di richieste al nostro server potrebbe causare grossi problemi utilizzando tutte le risorse del nostro server e bloccando l’elaborazione delle richieste fino a fermarla.

Quindi, invece di generare thread illimitati, avremo un numero fisso di thread in attesa nel gruppo. Le richieste in arrivo vengono mandate al gruppo per l’elaborazione. Il gruppo manterrà una coda di richieste in arrivo. Ogni thread del gruppo prenderà una richiesta da questa coda, la gestirà e poi chiederà un’altra richiesta dalla coda. Con questo modello, possiamo elaborare fino a N richieste simultaneamente, dove N è il numero di thread. Se ogni thread sta rispondendo a una richiesta a lungo termine, le richieste successive possono ancora accumularsi nella coda, ma abbiamo aumentato il numero di richieste a lungo termine che possiamo gestire prima di raggiungere quel punto.

Questa tecnica è solo una delle molte maniere per migliorare la produttività di un server web. Altre opzioni che potresti esplorare sono il modello fork/join, il modello I/O async a singolo thread, e il modello I/O async multi-thread. Se sei interessato a questo argomento, puoi leggere ed informarti su queste ed altre soluzioni e provare a implementarle; con un linguaggio di basso livello come Rust, tutte queste opzioni sono possibili.

Prima di iniziare a implementare un pool di thread, parliamo prima di come dovrebbe essere usato un pool. Quando stai cercando di progettare codice, scrivere prima l’interfaccia client può aiutare a guidare il tuo design. Scrivi l’API del codice così che sia strutturata nel modo in cui vuoi chiamarla; poi implementa la funzionalità dentro questa struttura invece di implementare prima la funzionalità e poi progettare l’API pubblica.

In modo simile a come abbiamo usato lo sviluppo guidato dai test nel progetto del Capitolo 12, qui invece useremo lo sviluppo guidato dal compilatore. Scriveremo il codice che chiama le funzioni che vogliamo, e poi guarderemo agli errori dal compilatore per determinare cosa dovremmo cambiare dopo per far funzionare il codice. Prima di farlo, tuttavia, esploreremo la tecnica che non useremo come punto di partenza.

Generare un Thread per Ogni Richiesta

Per prima cosa, esploriamo come potrebbe apparire il nostro codice se creasse un nuovo thread per ogni connessione. Come detto, questa non è la nostra soluzione finale a causa dei problemi legati al numero illimitato di thread che potrebbero essere creati, ma è un punto di partenza per avere un server multi-thread funzionante. Poi, aggiungeremo il thread pool come miglioramento, e confrontare le due soluzioni sarà più facile.

Il Listato 21-11 mostra le modifiche da fare a main per generare un nuovo thread per gestire ogni stream nel ciclo for.

File: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            gestisci_connessione(stream);
        });
    }
}

fn gestisci_connessione(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "ciao.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contenuto = fs::read_to_string(filename).unwrap();
    let lunghezza = contenuto.len();

    let risposta =
        format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");

    stream.write_all(risposta.as_bytes()).unwrap();
}
Listato 21-11: Generare un nuovo thread per ogni stream

Come hai imparato nel Capitolo 16, thread::spawn creerà un nuovo thread e poi eseguirà il codice della chiusura nel nuovo thread. Se esegui questo codice e carichi /attesa nel tuo browser, poi / in altre due schede del browser, vedrai infatti che le richieste a / non devono aspettare che /attesa finisca. Tuttavia, come abbiamo menzionato, questo alla fine sovraccaricherà il sistema perché staresti creando nuovi thread senza alcun limite.

Potresti anche ricordare dal Capitolo 17 che questa è esattamente la situazione in cui async e await eccellono! Tieni a mente questo mentre costruiamo il thread pool e pensa a come le cose sarebbero diverse o uguali con async.

Creare un Numero Finito di Thread

Vogliamo che il nostro thread pool funzioni in un modo simile e familiare in modo che passare dai thread a un thread pool non richieda grandi cambiamenti al codice che usa la nostra API. Il Listato 21-12 mostra l’interfaccia ipotetica per una struct ThreadPool che vogliamo usare invece di thread::spawn.

File: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            gestisci_connessione(stream);
        });
    }
}

fn gestisci_connessione(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "ciao.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contenuto = fs::read_to_string(filename).unwrap();
    let lunghezza = contenuto.len();

    let risposta =
        format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");

    stream.write_all(risposta.as_bytes()).unwrap();
}
Listato 21-12: La nostra interfaccia ideale per ThreadPool

Usiamo ThreadPool::new per creare un nuovo gruppo di thread con un numero configurabile di thread, in questo caso 4. Poi, nel ciclo for, pool.execute ha un interfaccia simile a thread::spawn nel senso che prende una chiusura che il gruppo dovrebbe eseguire per ogni stream. Dobbiamo implementare pool.execute in modo che prenda la chiusura e la dia a un thread nel gruppo per eseguirla. Questo codice non si compilerà ancora, ma lo proveremo in modo che il compilatore ci guidi su come sistemarlo.

Costruire ThreadPool Usando lo Sviluppo Guidato dal Compilatore

Apporta le modifiche riportate nel Listato 21-12 a src/main.rs, quindi utilizziamo gli errori del compilatore da cargo check per guidare il nostro sviluppo. Ecco il primo errore che otteniamo:

$ cargo check
    Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `ciao` (bin "ciao") due to 1 previous error

Ottimo! Questo errore ci dice che abbiamo bisogno di un type o modulo ThreadPool, quindi ne creeremo uno adesso. La nostra implementazione di ThreadPool sarà indipendente dal tipo di lavoro svolto dal nostro server web. Quindi, trasformiamo il crate ciao da un crate binario a un crate libreria per contenere la nostra implementazione di ThreadPool. Dopo essere passati a un crate libreria, potremmo anche utilizzare la libreria del thread pool separata per qualsiasi lavoro che vogliamo svolgere utilizzando un gruppo di thread, non solo per eseguire richieste web.

Creiamo un file src/lib.rs che contenga quanto segue, ovvero la definizione più semplice di una struct ThreadPool che possiamo avere per ora:

File: src/lib.rs
pub struct ThreadPool;

Quindi, modifichiamo il file main.rs per portare ThreadPool nello scope dal crate della libreria aggiungendo il seguente codice all’inizio di src/main.rs:

File: src/main.rs
use ciao::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            gestisci_connessione(stream);
        });
    }
}

fn gestisci_connessione(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "ciao.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "ciao.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contenuto = fs::read_to_string(filename).unwrap();
    let lunghezza = contenuto.len();

    let risposta =
        format!("{status_line}\r\nContent-Length: {lunghezza}\r\n\r\n{contenuto}");

    stream.write_all(risposta.as_bytes()).unwrap();
}

Questo codice ancora non funzionerà, ma controlliamolo di nuovo per ottenere il prossimo errore che dobbiamo risolvere:

$ cargo check
    Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `ciao` (bin "ciao") due to 1 previous error

Questo errore indica che ora dobbiamo creare una funzione associata denominata new per ThreadPool. Sappiamo anche che new deve avere un parametro che può accettare 4 come argomento e dovrebbe restituire un’istanza ThreadPool. Implementiamo la funzione new più semplice che abbia queste caratteristiche:

File: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(dimensione: usize) -> ThreadPool {
        ThreadPool
    }
}

Abbiamo scelto usize come type del parametro dimensione perché sappiamo che un numero negativo di thread non ha alcun senso. Sappiamo anche che useremo questo 4 come numero di elementi in una collezione di thread, che è lo scopo del type usize, come discusso nella sezione “Il Type Intero”del Capitolo 3.

Controlliamo nuovamente il codice:

$ cargo check
    Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `ciao` (bin "ciao") due to 1 previous error

Ora l’errore si verifica perché non abbiamo un metodo execute su ThreadPool. Ricorda dalla sezione “Creare un Numero Finito di Thread che abbiamo deciso che il nostro thread pool dovrebbe avere un’interfaccia simile a thread::spawn. In aggiunta, implementeremo la funzione execute in modo che prenda la chiusura che riceve e la dia a un thread inattivo nel gruppo per eseguirla.

Definiremo il metodo execute su ThreadPool in modo che accetti una chiusura come parametro. Ricorda dalla sezione “Restituire i Valori Catturati dalle Chiusure” del Capitolo 13 che possiamo accettare chiusure come parametri con tre diversi trait: Fn, FnMut e FnOnce. Dobbiamo decidere quale tipo di chiusura utilizzare in questo caso. Sappiamo che finiremo per fare qualcosa di simile all’implementazione della libreria standard thread::spawn, quindi possiamo guardare quali sono i vincoli definiti nella firma di thread::spawn sul suo parametro. La documentazione ci mostra quanto segue:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Il parametro di type F è quello che ci interessa in questo caso; il parametro di type T è relativo al valore di ritorno, e non ci interessa. Possiamo vedere che spawn utilizza FnOnce come vincolo di trait su F. Probabilmente è quello che vogliamo anche noi, perché alla fine passeremo l’argomento che otteniamo in execute a spawn. Possiamo essere ulteriormente sicuri che FnOnce sia il trait che vogliamo utilizzare perché il thread per l’esecuzione di una richiesta eseguirà solo una volta la chiusura di quella richiesta, il che corrisponde a Once in FnOnce.

Il parametro di type F ha anche il vincolo di trait Send e il vincolo di lifetime 'static, che sono utili nella nostra situazione: abbiamo bisogno di Send per trasferire la chiusura da un thread all’altro e di 'static perché non sappiamo quanto tempo impiegherà il thread per eseguire quanto richiesto. Creiamo un metodo execute su ThreadPool che prenderà un parametro generico di type F con questi vincoli:

File: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    // --taglio--
    pub fn new(dimensione: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Continuiamo a usare () dopo FnOnce perché questo FnOnce rappresenta una chiusura che non accetta parametri e restituisce il type unitario (). Proprio come nelle definizioni delle funzioni, il type di ritorno può essere omesso dalla firma, ma anche se non abbiamo parametri, abbiamo comunque bisogno delle parentesi.

Ancora una volta, questa è l’implementazione più semplice del metodo execute: non fa nulla, ma stiamo solo cercando di far compilare il nostro codice. Controlliamo di nuovo:

$ cargo check
    Checking ciao v0.1.0 (file:///progetti/ciao)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

Si compila! Ma nota che se provi cargo run e fai una richiesta nel browser, vedrai gli errori nel browser che abbiamo visto all’inizio del capitolo. La nostra libreria non sta ancora chiamando la chiusura passata a execute!

Nota: un detto che potreste sentire riguardo ai linguaggi con compilatori rigorosi, come Haskell e Rust, è “Se il codice si compila, funziona”. Ma questo detto non è universalmente vero. Il nostro progetto si compila, ma non fa assolutamente nulla! Se stessimo realizzando un progetto reale e completo, questo sarebbe un buon momento per iniziare a scrivere dei test unitari per verificare che il codice si compili e abbia il comportamento che desideriamo.

Una considerazione: cosa cambierebbe qui se eseguissimo una future invece di una chiusura?

Validare il Numero di Thread in new

Non stiamo facendo nulla con i parametri di new e execute. Implementiamo il corpo di queste funzioni con il comportamento desiderato. Per iniziare, pensiamo a new. In precedenza abbiamo scelto un type senza segno per il parametro dimensione perché un gruppo con un numero negativo di thread non ha senso. Tuttavia, anche un gruppo con zero thread non ha senso, ma zero è un usize perfettamente valido. Aggiungeremo del codice per verificare che dimensione sia maggiore di zero prima di restituire un’istanza ThreadPool e faremo andare in panic il programma se riceve uno zero utilizzando la macro assert!, come mostrato nel Listato 21-13.

File: “src/lib.rs”
pub struct ThreadPool;

impl ThreadPool {
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        ThreadPool
    }

    // --taglio--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listato 21-13: Implementazione di ThreadPool::new per generare un errore se dimensione è zero

Abbiamo anche aggiunto della documentazione per il nostro ThreadPool con commenti di documentazione. Nota che abbiamo seguito le buone pratiche di documentazione aggiungendo una sezione che evidenzia le situazioni in cui la nostra funzione può generare un panic, come discusso nel Capitolo 14. Prova ad eseguire cargo doc --open e clicca sulla struct ThreadPool per vedere come appare la documentazione generata per new!

Invece di aggiungere la macro assert! come abbiamo fatto qui, potremmo cambiare new in build e restituire un Result come abbiamo fatto con Config::build nel progetto I/O nel Listato 12-9. Ma in questo caso abbiamo deciso che cercare di creare un thread pool senza alcun thread dovrebbe essere un errore irrecuperabile. Se ti senti ambizioso, prova a scrivere una funzione chiamata build con la seguente firma per confrontarla con la funzione new:

pub fn build(size: usize) -> Result<ThreadPool, ErroreCreazionePool>

Creare Spazio per Memorizzare i Thread

Ora che abbiamo un modo per sapere che abbiamo un numero valido di thread da memorizzare nel gruppo, possiamo creare quei thread e memorizzarli nella struct ThreadPool prima di restituire la struct. Ma come si “memorizza” un thread? Diamo un’altra occhiata alla firma di thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

La funzione spawn restituisce un JoinHandle<T>, dove T è il type che la chiusura restituisce. Proviamo anche noi a usare JoinHandle e vediamo cosa succede. Nel nostro caso, le chiusure che stiamo passando al thread pool gestiranno la connessione e non restituiranno nulla, quindi T sarà il type unitario ().

Il codice nel Listato 21-14 verrà compilato, ma non creerà ancora alcun thread. Abbiamo modificato la definizione di ThreadPool per contenere un vettore di istanze thread::JoinHandle<()>, inizializzato il vettore con una capacità di dimensione, impostato un ciclo for che eseguirà del codice per creare i thread e restituito un’istanza ThreadPool che li contiene.

File: “src/lib.rs”
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --taglio--
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let mut threads = Vec::with_capacity(dimensione);

        for _ in 0..dimensione {
            // crea qualche thread e memorizzali in un vettore
        }

        ThreadPool { threads }
    }
    // --taglio--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listato 21-14: Creazione di un vettore per ThreadPool per contenere i thread

Abbiamo portato std::thread nello scope della libreria crate perché stiamo utilizzando thread::JoinHandle come type degli elementi nel vettore in ThreadPool.

Una volta ricevuta una dimensione valida, il nostro ThreadPool crea un nuovo vettore in grado di contenere dimensione elementi. La funzione with_capacity svolge lo stesso compito di Vec::new, ma con un’importante differenza: pre-alloca lo spazio nel vettore. Poiché sappiamo che dobbiamo memorizzare dimensione elementi nel vettore, eseguire questa allocazione in anticipo è leggermente più efficiente rispetto all’utilizzo di Vec::new, che ridimensiona se stesso man mano che vengono inseriti gli elementi.

Quando esegui nuovamente cargo check, dovrebbe avere esito positivo.

Inviare Codice da ThreadPool a un Thread

Abbiamo lasciato un commento nel ciclo for nel Listato 21-14 riguardo alla creazione di thread. Qui vedremo come creare effettivamente i thread. La libreria standard fornisce thread::spawn come metodo per creare thread, e thread::spawn si aspetta di ricevere del codice che il thread deve eseguire non appena viene creato. Tuttavia, nel nostro caso, vogliamo creare i thread e farli attendere il codice che invieremo in seguito. L’implementazione dei thread della libreria standard non include alcun modo per farlo; dobbiamo implementarlo manualmente.

Implementeremo questo comportamento introducendo una nuova struttura dati intermedia tra ThreadPool e i thread che gestirà questo nuovo comportamento. Chiameremo questa struttura dati Worker, che è un termine comune nelle implementazioni di pooling. Il Worker raccoglie il codice che deve essere eseguito ed esegue il codice nel suo thread.

Pensate alle persone che lavorano in cucina in un ristorante: i lavoratori aspettano che arrivino le ordinazioni dai clienti, quindi sono responsabili di prendere quelle ordinazioni e soddisfarle.

Invece di memorizzare un vettore di istanze JoinHandle<()> nel thread pool, memorizzeremo le istanze della struct Worker. Ogni Worker memorizzerà una singola istanza JoinHandle<()>. Quindi, implementeremo un metodo su Worker che prenderà una chiusura di codice da eseguire e la invierà al thread già in esecuzione per l’esecuzione. Assegneremo anche a ciascun Worker un id in modo da poter distinguere tra le diverse istanze di Worker nel gruppo quando facciamo logging o debugging.

Ecco il nuovo processo che avverrà quando creeremo un ThreadPool. Implementeremo il codice che invia la chiusura al thread dopo aver impostato Worker in questo modo:

  1. Definire una struttura Worker che contenga un id e un JoinHandle<()>.
  2. Modificare ThreadPool in modo che contenga un vettore di istanze Worker.
  3. Definire una funzione Worker::new che accetta un numero id e restituisce un’istanza Worker che contiene l’id e un thread generato con una chiusura vuota.
  4. In ThreadPool::new, utilizzare il contatore del ciclo for per generare un id, creare un nuovo Worker con quell’id e memorizzare il Worker nel vettore.

Se sei pronto per una sfida, prova a implementare queste modifiche da solo prima di guardare il codice nel Listato 21-15.

Pronto? Ecco il Listato 21-15 con un modo per apportare le modifiche precedenti.

File: “src/lib.rs”
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --taglio--
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera _panic_ se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let mut workers = Vec::with_capacity(dimensione);

        for id in 0..dimensione {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --taglio--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listato 21-15: Modifica di ThreadPool per contenere istanze Worker invece di contenere direttamente i thread

Abbiamo cambiato il nome del campo su ThreadPool da threads a workers perché ora contiene istanze Worker invece di istanze JoinHandle<()>. Usiamo il contatore nel ciclo for come argomento per Worker::new e memorizziamo ogni nuovo Worker nel vettore chiamato workers.

Il codice esterno (come il nostro server in src/main.rs) non ha bisogno di conoscere i dettagli di implementazione relativi all’uso di una struct Worker all’interno di ThreadPool, quindi rendiamo privata la struct Worker e la sua funzione new. La funzione Worker::new utilizza l’id che le forniamo e memorizza un’istanza JoinHandle<()> creata generando un nuovo thread utilizzando una chiusura vuota.

Nota: se il sistema operativo non è in grado di creare un thread perché non ci sono risorse di sistema sufficienti, thread::spawn andrà in panic. Ciò causerà il panic dell’intero server, anche se la creazione di alcuni thread potrebbe avere esito positivo. Per semplicità, questo comportamento va bene, ma in un’implementazione di thread pool di produzione, probabilmente si preferirà utilizzare std::thread::Builder e il suo spawn che restituisce invece Result.

Questo codice verrà compilato e memorizzerà il numero di istanze Worker che abbiamo specificato come argomento di ThreadPool::new. Ma non stiamo ancora elaborando la chiusura che otteniamo in execute. Vediamo come farlo.

Inviare Richieste ai Thread Tramite Canali

Il prossimo problema che affronteremo è che le chiusure fornite a thread::spawn non fanno assolutamente nulla. Attualmente, otteniamo la chiusura che vogliamo eseguire nel metodo execute. Ma dobbiamo fornire a thread::spawn una chiusura da eseguire quando creiamo ogni Worker durante la creazione del ThreadPool.

Vogliamo che le struct Worker che abbiamo appena creato recuperino il codice da eseguire da una coda contenuta nel ThreadPool e lo inviino al proprio thread per l’esecuzione.

I canali che abbiamo imparato a conoscere nel Capitolo 16, un modo semplice per comunicare tra due thread, sarebbero perfetti per questo caso d’uso. Useremo un canale da far funzionare come coda di lavori, e execute invierà un lavoro dal ThreadPool alle istanze Worker, che invieranno il lavoro al proprio thread. Ecco il piano:

  1. Il ThreadPool creerà un canale e diverrà l’estremità mittente.
  2. Ogni Worker diverrà il ricevitore.
  3. Creeremo una nuova struct Job che conterrà le chiusure che vogliamo inviare lungo il canale.
  4. Il metodo execute invierà il lavoro che vuole eseguire tramite il mittente.
  5. Nel suo thread, il Worker eseguirà un ciclo sul suo ricevitore ed eseguirà le chiusure di tutti i lavori che riceve.

Iniziamo creando un canale in ThreadPool::new e conservando il mittente nell’istanza ThreadPool, come mostrato nel Listato 21-16. La struttura Job per ora non contiene nulla, ma sarà il type di elemento che invieremo nel canale.

File: “src/lib.rs”
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    mittente: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --taglio--
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let (mittente, ricevitore) = mpsc::channel();

        let mut workers = Vec::with_capacity(dimensione);

        for id in 0..dimensione {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, mittente }
    }
    // --taglio--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listato 21-16: Modifica di ThreadPool per memorizzare il mittente di un canale che trasmette istanze Job

In ThreadPool::new, creiamo il nostro nuovo canale e facciamo in modo che il gruppo mantenga l’estremità del mittente. Questo verrà compilato con successo.

Proviamo a passare un ricevitore del canale a ciascun Worker mentre il thread pool crea il canale. Sappiamo che vogliamo utilizzare il ricevitore nel thread che le istanze Worker generano, quindi faremo riferimento al parametro ricevitore nella chiusura. Il codice nel Listato 21-17 non è ancora completamente compilabile.

File: “src/lib.rs”
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    mittente: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --taglio--
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let (mittente, ricevitore) = mpsc::channel();

        let mut workers = Vec::with_capacity(dimensione);

        for id in 0..dimensione {
            workers.push(Worker::new(id, ricevitore));
        }

        ThreadPool { workers, mittente }
    }
    // --taglio--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --taglio--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, ricevitore: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            ricevitore;
        });

        Worker { id, thread }
    }
}
Listato 21-17: Passaggio del ricevitore a ciascun Worker

Abbiamo apportato alcune piccole e semplici modifiche: passiamo il ricevitore a Worker::new, per poi usarlo all’interno della chiusura.

Quando proviamo a controllare questo codice, otteniamo questo errore:

$ cargo check
    Checking ciao v0.1.0 (file:///progetti/ciao)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:30:42
   |
25 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
29 |         for id in 0..dimensione {
   |         ----------------------- inside of this loop
30 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:57:33
   |
57 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
29 ~         let mut value = Worker::new(id, receiver);
30 ~         for id in 0..dimensione {
31 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `ciao` (lib) due to 1 previous error

Il codice sta cercando di passare ricevitore a più istanze Worker. Questo non funzionerà, come ricorderai dal Capitolo 16: l’implementazione del canale che Rust fornisce è multi-produttore, singolo consumatore. Ciò significa che non possiamo semplicemente clonare l’estremità di ricezione del canale per correggere questo codice. Inoltre, non vogliamo inviare un messaggio più volte a più ricevitori; vogliamo un unico elenco di messaggi con più istanze Worker in modo che ogni messaggio venga elaborato una sola volta.

Inoltre, rimuovere un lavoro dalla coda del canale comporta la mutazione del ricevitore, quindi i thread hanno bisogno di un modo sicuro per condividere e modificare ricevitore; altrimenti, potremmo ottenere condizioni di competizione (come descritto nel capitolo 16).

Ricorda i puntatori intelligenti thread-safe discussi nel Capitolo 16: per condividere la ownership tra più thread e consentire ai thread di modificare il valore, dobbiamo utilizzare Arc<Mutex<T>>. Il type Arc consentirà a più istanze Worker di possedere il ricevitore, mentre Mutex garantirà che solo un Worker alla volta riceva un lavoro dal ricevitore. Il Listato 21-18 mostra le modifiche che dobbiamo apportare.

File: “src/lib.rs”
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --taglio--

pub struct ThreadPool {
    workers: Vec<Worker>,
    mittente: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --taglio--
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let (mittente, ricevitore) = mpsc::channel();

        let ricevitore = Arc::new(Mutex::new(ricevitore));

        let mut workers = Vec::with_capacity(dimensione);

        for id in 0..dimensione {
            workers.push(Worker::new(id, Arc::clone(&ricevitore)));
        }

        ThreadPool { workers, mittente }
    }

    // --taglio--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --taglio--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --taglio--
        let thread = thread::spawn(|| {
            ricevitore;
        });

        Worker { id, thread }
    }
}
Listato 21-18: Condivisione del ricevitore tra le istanze Worker utilizzando Arc e Mutex

In ThreadPool::new, inseriamo il ricevitore in un Arc e in un Mutex. Per ogni nuovo Worker, cloniamo l’Arc per aumentare il conteggio dei reference in modo che le istanze Worker possano condividere la ownership del ricevitore.

Con queste modifiche, il codice viene compilato! Ci siamo quasi!

Implementare il Metodo execute

Implementiamo infine il metodo execute su ThreadPool. Modificheremo anche Job da una struct a un alias di type per un oggetto trait che contiene il type della chiusura che execute riceve. Come discusso nella sezione “Sinonimi e Alias di Type nel Capitolo 20, gli alias di type ci consentono di abbreviare i type lunghi per facilitarne l’uso. Guarda il Listato 21-19.

File: “src/lib.rs”
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    mittente: mpsc::Sender<Job>,
}

// --taglio--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --taglio--
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let (mittente, ricevitore) = mpsc::channel();

        let ricevitore = Arc::new(Mutex::new(ricevitore));

        let mut workers = Vec::with_capacity(dimensione);

        for id in 0..dimensione {
            workers.push(Worker::new(id, Arc::clone(&ricevitore)));
        }

        ThreadPool { workers, mittente }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.mittente.send(job).unwrap();
    }
}

// --taglio--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            ricevitore;
        });

        Worker { id, thread }
    }
}
Listato 21-19: Creazione di un alias di type Job per una Box che contiene ogni chiusura e quindi invio del lavoro al canale

Dopo aver creato una nuova istanza Job utilizzando la chiusura ottenuta in execute, inviamo quel lavoro tramite l’estremità mittente del canale. Chiamiamo unwrap su send nel caso in cui l’invio fallisca. Ciò potrebbe accadere se, ad esempio, interrompiamo l’esecuzione di tutti i nostri thread, il che significa che l’estremità ricevente ha smesso di ricevere nuovi messaggi. Al momento, non possiamo interrompere l’esecuzione dei nostri thread: i nostri thread continuano a essere eseguiti finché esiste il pool. Il motivo per cui utilizziamo unwrap è che sappiamo che il caso di errore non si verificherà, ma il compilatore non lo sa.

Ma non abbiamo ancora finito! Nel Worker, la nostra chiusura passata a thread::spawn continua a fare riferimento solo all’estremità ricevente del canale. Invece, abbiamo bisogno che la chiusura continui a girare all’infinito, chiedendo all’estremità ricevente del canale un lavoro ed eseguendolo quando lo riceve. Apportiamo la modifica mostrata nel Listato 21-20 a Worker::new.

File: “src/lib.rs”
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    mittente: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let (mittente, ricevitore) = mpsc::channel();

        let ricevitore = Arc::new(Mutex::new(ricevitore));

        let mut workers = Vec::with_capacity(dimensione);

        for id in 0..dimensione {
            workers.push(Worker::new(id, Arc::clone(&ricevitore)));
        }

        ThreadPool { workers, mittente }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.mittente.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --taglio--

impl Worker {
    fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = ricevitore.lock().unwrap().recv().unwrap();

                println!("Worker {id} ha un lavoro; in esecuzione.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listato 21-20: Ricezione ed esecuzione dei lavori nel thread dell’istanza Worker

Qui, chiamiamo prima lock sul ricevitore per acquisire il mutex, quindi chiamiamo unwrap per generare un panic in caso di errori. L’acquisizione di un blocco potrebbe non riuscire se il mutex è in uno stato poisoned (avvelenato), cosa che può accadere se un altro thread va in panic mentre mantiene il blocco invece di rilasciarlo. In questa situazione, chiamare unwrap per far andare in panic questo thread è l’azione corretta da intraprendere. Potresti anche cambiare questo unwrap in un expect con un messaggio di errore che sia significativo per te.

Se otteniamo il blocco sul mutex, chiamiamo recv per ricevere un Job dal canale. Un ultimo unwrap supera anche qui eventuali errori, che potrebbero verificarsi se il thread che detiene il mittente si è arrestato, in modo simile a come il metodo send restituisce Err se il ricevente si arresta.

La chiamata a recv blocca, quindi se non ci sono ancora lavori, il thread corrente attenderà fino a quando non sarà disponibile un lavoro. Il Mutex<T> assicura che solo un thread Worker alla volta tenti di richiedere un lavoro.

Il nostro gruppo di thread è ora in uno stato funzionante! Esegui cargo run e fai alcune richieste:

$ cargo run
   Compiling ciao v0.1.0 (file:///progetti/ciao)
warning: field `workers` is never read    
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `ciao` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.57s
     Running `target/debug/ciao`
Worker 1 ha un lavoro; in esecuzione.
Worker 1 ha un lavoro; in esecuzione.
Worker 0 ha un lavoro; in esecuzione.
Worker 3 ha un lavoro; in esecuzione.
Worker 2 ha un lavoro; in esecuzione.
Worker 1 ha un lavoro; in esecuzione.

Successo! Ora abbiamo un thread pool che esegue le connessioni in modo asincrono. Non vengono mai creati più di quattro thread, quindi il nostro sistema non andrà in sovraccarico se il server riceve molte richieste. Se effettuiamo una richiesta a /attesa, il server sarà in grado di servire altre richieste facendo in modo che un altro thread le esegua.

Nota: se apri /attesa in più finestre del browser contemporaneamente, potrebbero caricarsi una alla volta a intervalli di cinque secondi. Alcuni browser web eseguono più istanze della stessa richiesta in sequenza per motivi di cache. Questa limitazione non è causata dal nostro server web.

Questo è un buon momento per fare una pausa e considerare come il codice nei Listati 21-18, 21-19 e 21-20 sarebbe diverso se utilizzassimo le future invece di una chiusura per il lavoro da svolgere. Quali type cambierebbero? In che modo le firme dei metodi sarebbero diverse, se lo fossero? Quali parti del codice rimarrebbero invariate?

Dopo aver appreso il funzionamento del ciclo while let nei capitoli 17 e 19, potresti chiederti perché non abbiamo scritto il codice del thread Worker come mostrato nel Listato 21-21.

File: “src/lib.rs”
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    mittente: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Crea un nuovo ThreadPool.
    ///
    /// La dimensione é il numero di thread nel gruppo.
    ///
    /// # Panics
    ///
    /// La funzione `new` genera panic se la dimensione é zero.
    pub fn new(dimensione: usize) -> ThreadPool {
        assert!(dimensione > 0);

        let (mittente, ricevitore) = mpsc::channel();

        let ricevitore = Arc::new(Mutex::new(ricevitore));

        let mut workers = Vec::with_capacity(dimensione);

        for id in 0..dimensione {
            workers.push(Worker::new(id, Arc::clone(&ricevitore)));
        }

        ThreadPool { workers, mittente }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.mittente.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --taglio--

impl Worker {
    fn new(id: usize, ricevitore: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = ricevitore.lock().unwrap().recv() {
                println!("Worker {id} ha un lavoro; in esecuzione.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listato 21-21: Un’implementazione alternativa di Worker::new utilizzando while let

Questo codice viene compilato ed eseguito, ma non produce il comportamento concorrente desiderato: una richiesta lenta continuerà a causare l’attesa delle altre richieste per essere elaborate. Il motivo è piuttosto sottile: la struct Mutex non ha un metodo pubblico unlock perché la ownership del blocco si basa sulla longevità di MutexGuard<T> all’interno del LockResult<MutexGuard<T>> che il metodo lock restituisce. In fase di compilazione, il controllo dei prestiti può quindi applicare la regola secondo cui non è possibile accedere a una risorsa protetta da un Mutex a meno che non si detenga il blocco. Tuttavia, questa implementazione può anche comportare il mantenimento del blocco più a lungo del previsto se non si presta attenzione alla lifetime del MutexGuard<T>.

Il codice nel Listato 21-20 che utilizza let job = ricevitore.lock().unwrap().recv().unwrap(); funziona perché con let, qualsiasi valore temporaneo utilizzato nell’espressione a destra del segno di uguale viene immediatamente eliminato al termine dell’istruzione let. Tuttavia, while let (e if let e match) non elimina i valori temporanei fino alla fine del blocco associato. Nel Listato 21-21, il blocco rimane attivo per tutta la durata della chiamata a job(), il che significa che altre istanze Worker non possono ricevere lavori.