Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Trasferire Dati tra Thread Usando il Passaggio di Messaggi

Un approccio sempre più diffuso per garantire una concomitanza sicura è il passaggio di messaggi (message passing), in cui i thread o gli attori comunicano inviandosi messaggi contenenti dati. Ecco l’idea in uno slogan tratto dalla documentazione del linguaggio Go: “Non comunicare condividendo la memoria; condividi invece la memoria comunicando.”

Per realizzare la concomitanza tramite invio di messaggi, la libreria standard di Rust fornisce un’implementazione dei canali. Un canale è un concetto generale di programmazione con cui i dati vengono inviati da un thread all’altro.

Puoi immaginare un canale nella programmazione come un canale d’acqua direzionale, come un ruscello o un fiume. Se metti una paperella di gomma in un fiume, questa viaggerà a valle fino alla fine del corso d’acqua.

Un canale ha due estremità: una trasmettitore e una ricevitore. L’estremità del trasmettitore è il punto a monte in cui metti la paperella di gomma nel fiume, mentre l’estremità del ricevitore è il punto in cui la paperella di gomma finisce a valle. Una parte del tuo codice chiama i metodi del trasmettitore con i dati che vuoi inviare, mentre un’altra parte controlla la ricezione dei messaggi in arrivo. Un canale si dice chiuso se una delle due estremità del trasmettitore o del ricevitore viene cancellata.

Qui lavoreremo su un programma che ha un thread che genera valori e li invia attraverso un canale, e un altro thread che riceve i valori e li stampa. Per illustrare la funzione, invieremo semplici valori tra i thread utilizzando un canale. Una volta che avrai acquisito familiarità con la tecnica, potrai utilizzare i canali per qualsiasi thread che abbia bisogno di comunicare tra loro, come ad esempio un sistema di chat o un sistema in cui molti thread eseguono parti di un calcolo e inviano i risultati parziali a un thread che aggrega i risultati.

Iniziamo nel Listato 16-6 creando semplicemente un canale senza fargli fare nulla. Nota che questo non verrà ancora compilato perché Rust non può dire che tipo di valori vogliamo inviare attraverso il canale.

File: src/main.rs
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
Listato 16-6: Creare un canale e assegnare le due estremità a tx e rx

Creiamo un nuovo canale utilizzando la funzione mpsc::channel; mpsc sta per multiple producer, single consumer. In breve, il modo in cui la libreria standard di Rust implementa i canali significa che un canale può avere più punti di invio (produttori) che producono valori, ma un solo punto di ricezione (consumatore) che li riceve. Immagina più ruscelli che confluiscono in un unico grande fiume: tutto ciò che viene inviato lungo uno qualsiasi dei ruscelli finirà in un unico fiume alla fine. Inizieremo con un singolo produttore per ora, ma aggiungeremo più produttori quando questo esempio funzionerà.

La funzione mpsc::channel restituisce una tupla, in cui il primo elemento è l’estremità di invio, il trasmettitore, e il secondo elemento è l’estremità di ricezione, il ricevitore. Le abbreviazioni tx e rx sono tradizionalmente utilizzate in molti campi per indicare rispettivamente il trasmettitore e il ricevitore, quindi chiamiamo le nostre variabili in questo modo per indicare ciascuna estremità. Stiamo utilizzando un’istruzione let con un pattern che destruttura la tupla; parleremo più approfonditamente dell’uso dei pattern nelle istruzioni let e della destrutturazione nel Capitolo 19. Per ora, sappi che l’utilizzo di un’istruzione let in questo modo è un approccio conveniente per estrarre i pezzi della tupla restituita da mpsc::channel.

Spostiamo l’estremità di trasmissione in un thread generato e facciamogli inviare una stringa in modo che il thread generato comunichi con il thread principale, come mostrato nel Listato 16-7. Questo è come mettere una paperella di gomma nel fiume a monte o inviare un messaggio di chat da un thread all’altro.

File: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("ciao");
        tx.send(val).unwrap();
    });
}
Listato 16-7: Spostamento di tx in un thread generato e invio di “ciao”

Anche in questo caso, usiamo thread::spawn per generare un nuovo thread e poi usiamo move per spostare tx nella chiusura in modo che il thread generato possieda tx. Il thread generato deve possedere il trasmettitore per poter inviare messaggi attraverso il canale.

Il trasmettitore ha un metodo send (invio) che accetta il valore che vogliamo inviare. Il metodo send restituisce un type Result<T, E>, quindi se il ricevitore è già stato cancellato e non c’è nessuno che possa ricevere quanto inviato, l’operazione di invio restituirà un errore. In questo esempio, chiamiamo unwrap per andare in panic in caso di errore. Ma in un’applicazione reale, lo gestiremmo in modo corretto: torna al Capitolo 9 per rivedere le strategie per una corretta gestione degli errori.

Nel Listato 16-8, otterremo il valore dal ricevitore nel thread principale. È come recuperare la paperella di gomma dall’acqua alla fine del fiume o ricevere un messaggio di chat.

File: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("ciao");
        tx.send(val).unwrap();
    });

    let ricevuto = rx.recv().unwrap();
    println!("Ricevuto: {ricevuto}");
}
Listato 16-8: Ricevere il valore “ciao” nel thread principale e stamparlo

Il ricevitore ha due metodi utili: recv e try_recv. Utilizzeremo recv, abbreviazione di receive (ricevi), che bloccherà l’esecuzione del thread principale e aspetterà che un valore venga ricevuto dal canale. Una volta ricevuto un valore, recv lo restituirà in un Result<T, E>. Quando il trasmettitore si chiude, recv restituirà un errore per segnalare che non arriveranno altri valori.

Il metodo try_recv invece non aspetterà, ma restituisce immediatamente un Result<T, E>: un valore Ok che contiene un messaggio se è disponibile e un valore Err se non ci sono messaggi questa volta. L’uso di try_recv è utile se questo thread ha altro lavoro da fare mentre aspetta i messaggi: potremmo scrivere un ciclo che chiama try_recv di tanto in tanto, gestisce un messaggio se è disponibile e altrimenti svolge altro lavoro per un po’ di tempo fino a quando non viene controllato di nuovo.

In questo esempio abbiamo usato recv per semplicità; non abbiamo altro lavoro da fare per il thread principale oltre all’attesa dei messaggi, quindi bloccare il thread principale è appropriato.

Quando eseguiamo il codice nel Listato 16-8, vedremo il valore stampato dal thread principale:

Ricevuto: ciao

Perfetto!

Trasferire Ownership Attraverso i Canali

Le regole di ownership giocano un ruolo fondamentale nell’invio dei messaggi perché ti aiutano a scrivere codice sicuro e concorrente. Prevenire gli errori nella programmazione concorrente è il vantaggio di pensare in termini di ownership in tutti i tuoi programmi Rust. Facciamo un esperimento per mostrare come i canali e la ownership lavorino insieme per prevenire i problemi: proveremo a usare un valore val nel thread generato dopo che lo abbiamo inviato nel canale. Prova a compilare il codice nel Listato 16-9 per vedere perché questo codice non è consentito.

File: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("ciao");
        tx.send(val).unwrap();
        println!("val è {val}");
    });

    let ricevuto = rx.recv().unwrap();
    println!("Ricevuto: {ricevuto}");
}
Listato 16-9: Tentativo di utilizzare val dopo averlo inviato nel canale

In questo caso, cerchiamo di stampare val dopo averlo inviato nel canale tramite tx.send. Consentire questa operazione sarebbe una cattiva idea: una volta che il valore è stato inviato a un altro thread, questo thread potrebbe modificarlo o liberarne la memoria prima che noi cerchiamo di utilizzarlo di nuovo. Potenzialmente, le modifiche dell’altro thread potrebbero causare errori o risultati inaspettati a causa di dati incoerenti o inesistenti. Tuttavia, Rust ci dà un errore se proviamo a compilare il codice del Listato 16-9:

$ cargo run
   Compiling passaggio-messaggio v0.1.0 (file:///progetti/passaggio-messaggio)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
 8 |         let val = String::from("ciao");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
 9 |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val è {val}");
   |                          ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `passaggio-messaggio` (bin "passaggio-messaggio") due to 1 previous error

Il nostro errore di concorrenza ha causato un errore in fase di compilazione. La funzione send prende ownership del suo parametro e quando il valore viene inviato, è il destinatario che ne prende la ownership. Questo ci impedisce di utilizzare accidentalmente il valore dopo averlo inviato; il sistema di ownership controlla che tutto sia a posto.

Inviare Più Valori

Il codice del Listato 16-8 è stato compilato ed eseguito, ma non mostrava chiaramente che due thread separati stavano parlando tra loro attraverso il canale.

Nel Listato 16-10, abbiamo apportato alcune modifiche che dimostreranno che il codice del Listato 16-8 è in esecuzione simultanea: il thread generato ora invierà più messaggi e farà una pausa di un secondo tra un messaggio e l’altro.

File: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let valori = vec![
            String::from("ciao"),
            String::from("dal"),
            String::from("thread"),
            String::from("!!!"),
        ];

        for valore in valori {
            tx.send(valore).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for ricevuto in rx {
        println!("Ricevuto: {ricevuto}");
    }
}
Listato 16-10: Invio di più messaggi e pausa tra uno e l’altro

Questa volta, il thread generato ha un vettore di stringhe che vogliamo inviare al thread principale. Le iteriamo, inviandole singolarmente, e facciamo una pausa tra una e l’altra chiamando la funzione thread::sleep con un valore Duration di 1 secondo.

Nel thread principale, non chiamiamo più esplicitamente la funzione recv, ma trattiamo rx come un iteratore. Per ogni valore ricevuto, lo stampiamo. Quando il canale viene chiuso perché i messaggi inviati finiscono, l’iterazione termina.

Quando esegui il codice del Listato 16-10, dovresti vedere il seguente output con una pausa di 1 secondo tra una riga e l’altra:

Ricevuto: ciao
Ricevuto: dal
Ricevuto: thread
Ricevuto: !!!

Poiché non abbiamo alcun codice che mette in pausa o ritarda il ciclo for nel thread principale, possiamo dire che il thread principale sta effettivamente aspettando di ricevere i valori dal thread generato.

Creare più Produttori

Prima abbiamo detto che mpsc è l’acronimo di multiple producer, single consumer. Mettiamo in pratica mpsc ed espandiamo il codice del Listato 16-10 per creare thread multipli che tutti inviano i valori allo stesso ricevitore. Possiamo farlo clonando il trasmettitore, come mostrato nel Listato 16-11.

File: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --taglio--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let valori = vec![
            String::from("ciao"),
            String::from("dal"),
            String::from("thread"),
            String::from("!!!"),
        ];

        for valore in valori {
            tx1.send(valore).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let valori = vec![
            String::from("ancora"),
            String::from("messaggi"),
            String::from("per"),
            String::from("te"),
        ];

        for valore in valori {
            tx.send(valore).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for ricevuto in rx {
        println!("Ricevuto: {ricevuto}");
    }

    // --taglio--
}
Listato 16-11: Invio di più messaggi da più produttori

Questa volta, prima di creare il primo thread generato, chiamiamo clone sul trasmettitore. In questo modo avremo un nuovo trasmettitore da passare al primo thread generato. Passiamo poi il trasmettitore originale a un secondo thread generato. In questo modo avremo due thread, ognuno dei quali invierà messaggi diversi all’unico ricevitore.

Quando esegui il codice, l’output dovrebbe essere simile a questo:

Ricevuto: ciao
Ricevuto: altri
Ricevuto: dal
Ricevuto: messaggi
Ricevuto: thread
Ricevuto: per
Ricevuto: !!!
Ricevuto: te

Potresti vedere i valori in un altro ordine, a seconda del tuo sistema. Questo è ciò che rende la concorrenza interessante e difficile. Se sperimenti con thread::sleep, dandogli vari valori nei diversi thread, ogni esecuzione sarà più non deterministica e creerà ogni volta un output diverso.

Ora che abbiamo visto come funzionano i canali, analizziamo un altro metodo di concorrenza.