Trasferire Dati tra Thread Usando il Passaggio di Messaggi
Un approccio sempre più diffuso per garantire una concomitanza sicura è il passaggio di messaggi (message passing), in cui i thread o gli attori comunicano inviandosi messaggi contenenti dati. Ecco l’idea in uno slogan tratto dalla documentazione del linguaggio Go: “Non comunicare condividendo la memoria; condividi invece la memoria comunicando.”
Per realizzare la concomitanza tramite invio di messaggi, la libreria standard di Rust fornisce un’implementazione dei canali. Un canale è un concetto generale di programmazione con cui i dati vengono inviati da un thread all’altro.
Puoi immaginare un canale nella programmazione come un canale d’acqua direzionale, come un ruscello o un fiume. Se metti una paperella di gomma in un fiume, questa viaggerà a valle fino alla fine del corso d’acqua.
Un canale ha due estremità: una trasmettitore e una ricevitore. L’estremità del trasmettitore è il punto a monte in cui metti la paperella di gomma nel fiume, mentre l’estremità del ricevitore è il punto in cui la paperella di gomma finisce a valle. Una parte del tuo codice chiama i metodi del trasmettitore con i dati che vuoi inviare, mentre un’altra parte controlla la ricezione dei messaggi in arrivo. Un canale si dice chiuso se una delle due estremità del trasmettitore o del ricevitore viene cancellata.
Qui lavoreremo su un programma che ha un thread che genera valori e li invia attraverso un canale, e un altro thread che riceve i valori e li stampa. Per illustrare la funzione, invieremo semplici valori tra i thread utilizzando un canale. Una volta che avrai acquisito familiarità con la tecnica, potrai utilizzare i canali per qualsiasi thread che abbia bisogno di comunicare tra loro, come ad esempio un sistema di chat o un sistema in cui molti thread eseguono parti di un calcolo e inviano i risultati parziali a un thread che aggrega i risultati.
Iniziamo nel Listato 16-6 creando semplicemente un canale senza fargli fare nulla. Nota che questo non verrà ancora compilato perché Rust non può dire che tipo di valori vogliamo inviare attraverso il canale.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx
e rx
Creiamo un nuovo canale utilizzando la funzione mpsc::channel
; mpsc
sta per
multiple producer, single consumer. In breve, il modo in cui la libreria
standard di Rust implementa i canali significa che un canale può avere più punti
di invio (produttori) che producono valori, ma un solo punto di ricezione
(consumatore) che li riceve. Immagina più ruscelli che confluiscono in un
unico grande fiume: tutto ciò che viene inviato lungo uno qualsiasi dei ruscelli
finirà in un unico fiume alla fine. Inizieremo con un singolo produttore per
ora, ma aggiungeremo più produttori quando questo esempio funzionerà.
La funzione mpsc::channel
restituisce una tupla, in cui il primo elemento è
l’estremità di invio, il trasmettitore, e il secondo elemento è l’estremità di
ricezione, il ricevitore. Le abbreviazioni tx
e rx
sono tradizionalmente
utilizzate in molti campi per indicare rispettivamente il trasmettitore e il
ricevitore, quindi chiamiamo le nostre variabili in questo modo per indicare
ciascuna estremità. Stiamo utilizzando un’istruzione let
con un pattern che
destruttura la tupla; parleremo più approfonditamente dell’uso dei pattern
nelle istruzioni let
e della destrutturazione nel Capitolo 19. Per ora, sappi
che l’utilizzo di un’istruzione let
in questo modo è un approccio conveniente
per estrarre i pezzi della tupla restituita da mpsc::channel
.
Spostiamo l’estremità di trasmissione in un thread generato e facciamogli inviare una stringa in modo che il thread generato comunichi con il thread principale, come mostrato nel Listato 16-7. Questo è come mettere una paperella di gomma nel fiume a monte o inviare un messaggio di chat da un thread all’altro.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("ciao"); tx.send(val).unwrap(); }); }
tx
in un thread generato e invio di “ciao”
Anche in questo caso, usiamo thread::spawn
per generare un nuovo thread e
poi usiamo move
per spostare tx
nella chiusura in modo che il thread
generato possieda tx
. Il thread generato deve possedere il trasmettitore per
poter inviare messaggi attraverso il canale.
Il trasmettitore ha un metodo send
(invio) che accetta il valore che
vogliamo inviare. Il metodo send
restituisce un type Result<T, E>
, quindi
se il ricevitore è già stato cancellato e non c’è nessuno che possa ricevere
quanto inviato, l’operazione di invio restituirà un errore. In questo esempio,
chiamiamo unwrap
per andare in panic in caso di errore. Ma in
un’applicazione reale, lo gestiremmo in modo corretto: torna al Capitolo 9 per
rivedere le strategie per una corretta gestione degli errori.
Nel Listato 16-8, otterremo il valore dal ricevitore nel thread principale. È come recuperare la paperella di gomma dall’acqua alla fine del fiume o ricevere un messaggio di chat.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("ciao"); tx.send(val).unwrap(); }); let ricevuto = rx.recv().unwrap(); println!("Ricevuto: {ricevuto}"); }
“ciao”
nel thread principale e stamparloIl ricevitore ha due metodi utili: recv
e try_recv
. Utilizzeremo recv
,
abbreviazione di receive (ricevi), che bloccherà l’esecuzione del thread
principale e aspetterà che un valore venga ricevuto dal canale. Una volta
ricevuto un valore, recv
lo restituirà in un Result<T, E>
. Quando il
trasmettitore si chiude, recv
restituirà un errore per segnalare che non
arriveranno altri valori.
Il metodo try_recv
invece non aspetterà, ma restituisce immediatamente un
Result<T, E>
: un valore Ok
che contiene un messaggio se è disponibile e un
valore Err
se non ci sono messaggi questa volta. L’uso di try_recv
è utile
se questo thread ha altro lavoro da fare mentre aspetta i messaggi: potremmo
scrivere un ciclo che chiama try_recv
di tanto in tanto, gestisce un messaggio
se è disponibile e altrimenti svolge altro lavoro per un po’ di tempo fino a
quando non viene controllato di nuovo.
In questo esempio abbiamo usato recv
per semplicità; non abbiamo altro lavoro
da fare per il thread principale oltre all’attesa dei messaggi, quindi
bloccare il thread principale è appropriato.
Quando eseguiamo il codice nel Listato 16-8, vedremo il valore stampato dal thread principale:
Ricevuto: ciao
Perfetto!
Trasferire Ownership Attraverso i Canali
Le regole di ownership giocano un ruolo fondamentale nell’invio dei messaggi
perché ti aiutano a scrivere codice sicuro e concorrente. Prevenire gli errori
nella programmazione concorrente è il vantaggio di pensare in termini di
ownership in tutti i tuoi programmi Rust. Facciamo un esperimento per mostrare
come i canali e la ownership lavorino insieme per prevenire i problemi:
proveremo a usare un valore val
nel thread generato dopo che lo abbiamo
inviato nel canale. Prova a compilare il codice nel Listato 16-9 per vedere
perché questo codice non è consentito.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("ciao");
tx.send(val).unwrap();
println!("val è {val}");
});
let ricevuto = rx.recv().unwrap();
println!("Ricevuto: {ricevuto}");
}
val
dopo averlo inviato nel canaleIn questo caso, cerchiamo di stampare val
dopo averlo inviato nel canale
tramite tx.send
. Consentire questa operazione sarebbe una cattiva idea: una
volta che il valore è stato inviato a un altro thread, questo thread
potrebbe modificarlo o liberarne la memoria prima che noi cerchiamo di
utilizzarlo di nuovo. Potenzialmente, le modifiche dell’altro thread
potrebbero causare errori o risultati inaspettati a causa di dati incoerenti o
inesistenti. Tuttavia, Rust ci dà un errore se proviamo a compilare il codice
del Listato 16-9:
$ cargo run
Compiling passaggio-messaggio v0.1.0 (file:///progetti/passaggio-messaggio)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("ciao");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val è {val}");
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `passaggio-messaggio` (bin "passaggio-messaggio") due to 1 previous error
Il nostro errore di concorrenza ha causato un errore in fase di compilazione. La
funzione send
prende ownership del suo parametro e quando il valore viene
inviato, è il destinatario che ne prende la ownership. Questo ci impedisce di
utilizzare accidentalmente il valore dopo averlo inviato; il sistema di
ownership controlla che tutto sia a posto.
Inviare Più Valori
Il codice del Listato 16-8 è stato compilato ed eseguito, ma non mostrava chiaramente che due thread separati stavano parlando tra loro attraverso il canale.
Nel Listato 16-10, abbiamo apportato alcune modifiche che dimostreranno che il codice del Listato 16-8 è in esecuzione simultanea: il thread generato ora invierà più messaggi e farà una pausa di un secondo tra un messaggio e l’altro.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let valori = vec![
String::from("ciao"),
String::from("dal"),
String::from("thread"),
String::from("!!!"),
];
for valore in valori {
tx.send(valore).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for ricevuto in rx {
println!("Ricevuto: {ricevuto}");
}
}
Questa volta, il thread generato ha un vettore di stringhe che vogliamo
inviare al thread principale. Le iteriamo, inviandole singolarmente, e
facciamo una pausa tra una e l’altra chiamando la funzione thread::sleep
con
un valore Duration
di 1 secondo.
Nel thread principale, non chiamiamo più esplicitamente la funzione recv
, ma
trattiamo rx
come un iteratore. Per ogni valore ricevuto, lo stampiamo. Quando
il canale viene chiuso perché i messaggi inviati finiscono, l’iterazione
termina.
Quando esegui il codice del Listato 16-10, dovresti vedere il seguente output con una pausa di 1 secondo tra una riga e l’altra:
Ricevuto: ciao
Ricevuto: dal
Ricevuto: thread
Ricevuto: !!!
Poiché non abbiamo alcun codice che mette in pausa o ritarda il ciclo for
nel
thread principale, possiamo dire che il thread principale sta effettivamente
aspettando di ricevere i valori dal thread generato.
Creare più Produttori
Prima abbiamo detto che mpsc
è l’acronimo di multiple producer, single
consumer. Mettiamo in pratica mpsc
ed espandiamo il codice del Listato 16-10
per creare thread multipli che tutti inviano i valori allo stesso ricevitore.
Possiamo farlo clonando il trasmettitore, come mostrato nel Listato 16-11.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --taglio--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let valori = vec![
String::from("ciao"),
String::from("dal"),
String::from("thread"),
String::from("!!!"),
];
for valore in valori {
tx1.send(valore).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let valori = vec![
String::from("ancora"),
String::from("messaggi"),
String::from("per"),
String::from("te"),
];
for valore in valori {
tx.send(valore).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for ricevuto in rx {
println!("Ricevuto: {ricevuto}");
}
// --taglio--
}
Questa volta, prima di creare il primo thread generato, chiamiamo clone
sul
trasmettitore. In questo modo avremo un nuovo trasmettitore da passare al primo
thread generato. Passiamo poi il trasmettitore originale a un secondo thread
generato. In questo modo avremo due thread, ognuno dei quali invierà messaggi
diversi all’unico ricevitore.
Quando esegui il codice, l’output dovrebbe essere simile a questo:
Ricevuto: ciao
Ricevuto: altri
Ricevuto: dal
Ricevuto: messaggi
Ricevuto: thread
Ricevuto: per
Ricevuto: !!!
Ricevuto: te
Potresti vedere i valori in un altro ordine, a seconda del tuo sistema. Questo è
ciò che rende la concorrenza interessante e difficile. Se sperimenti con
thread::sleep
, dandogli vari valori nei diversi thread, ogni esecuzione sarà
più non deterministica e creerà ogni volta un output diverso.
Ora che abbiamo visto come funzionano i canali, analizziamo un altro metodo di concorrenza.