Applicare la Concorrenza con Async
In questa sezione, vedremo come usare async per affrontare alcune sfide di concorrenza che abbiamo già visto con i thread nel Capitolo 16. Dato che abbiamo già parlato dei concetti chiave, ci concentreremo sulle differenze tra thread e future.
In molti casi, le API per lavorare con la concorrenza usando async sono molto simili a quelle per usare i thread. In altri casi, finiscono per essere piuttosto diverse. Anche quando le API sembrano simili tra thread e async, spesso hanno comportamenti diversi e quasi sempre caratteristiche di prestazioni differenti.
Creare un Nuovo Task con spawn_task
La prima operazione che abbiamo affrontato in “Creare un Nuovo Thread con
spawn
” era contare su due thread separati.
Facciamo la stessa cosa usando async. Il crate trpl
fornisce una funzione
spawn_task
che sembra molto simile all’API thread::spawn
, e una funzione
sleep
che è una versione async dell’API thread::sleep
. Possiamo usarle
insieme per implementare l’esempio di conteggio, come mostrato nel Listato 17-6.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("ciao numero {i} dal primo task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("ciao numero {i} dal secondo task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
Come punto di partenza, impostiamo la nostra funzione main
con trpl::run
in
modo che la nostra funzione di livello superiore possa essere async.
Nota: Da questo punto in poi nel capitolo, ogni esempio includerà lo stesso esatto codice di incapsulamento con
trpl::run
inmain
, quindi spesso lo salteremo proprio come facciamo conmain
. Non dimenticare di includerlo nel tuo codice!
Poi scriviamo due loop all’interno di quel blocco, ciascuno contenente una
chiamata a trpl::sleep
, che aspetta mezzo secondo (500 millisecondi) prima di
inviare il prossimo messaggio. Mettiamo un loop nel corpo di un
trpl::spawn_task
e l’altro è un ciclo for
nel task principale. Aggiungiamo
anche un await
dopo le chiamate sleep
.
Questo codice si comporta in modo simile all’implementazione basata su thread, inclusa la possibilità che tu possa vedere i messaggi apparire in un ordine diverso nel tuo terminale quando lo esegui:
ciao numero 1 dal secondo task!
ciao numero 1 dal primo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
Questa versione si ferma non appena il ciclo for
nel corpo del blocco async
principale finisce, perché il task avviato da spawn_task
viene chiuso quando
la funzione main
termina. Se vuoi che si esegua fino al completamento del
task, dovrai usare un join handle per aspettare che il primo task si
completi. Con i thread, abbiamo usato il metodo join
per “bloccare” fino a
quando il thread avesse finito di eseguirsi. Nel Listato 17-7, possiamo usare
await
per fare la stessa cosa, perché l’handle del task stesso è un
future. Il suo type Output
è un Result
, quindi dopo averlo atteso
(await), dobbiamo anche esporlo (unwrap).
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("ciao numero {i} dal primo task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("ciao numero {i} dal secondo task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
await
con un join handle per eseguire un task fino al completamentoQuesta versione aggiornata si esegue fino a quando entrambi i loop finiscono.
ciao numero 1 dal secondo task!
ciao numero 1 dal primo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
ciao numero 6 dal primo task!
ciao numero 7 dal primo task!
ciao numero 8 dal primo task!
ciao numero 9 dal primo task!
Finora, sembra che async e thread ci diano gli stessi risultati di base,
solo con una sintassi diversa: usando await
invece di chiamare join
sull’handle, e aspettando le chiamate sleep
.
La differenza più grande è che non abbiamo dovuto avviare un altro thread del
sistema operativo per farlo. In realtà, non dobbiamo nemmeno avviare un task
qui. Poiché i blocchi async si compilano in future anonime, possiamo mettere
ogni loop in un blocco async e far eseguire al runtime entrambe fino al
completamento usando la funzione trpl::join
.
Nella sezione “Attendere Che Tutti i Thread Finiscano”, abbiamo mostrato come usare il metodo join
sul type JoinHandle
restituito quando si chiama std::thread::spawn
. La funzione trpl::join
è
simile, ma per le future. Quando gli dai due future, produce una singola
nuova future il cui output è una tupla che contiene l’output di ciascuna
future che hai passato una volta che entrambe si completano. Quindi, nel
Listato 17-8, usiamo trpl::join
per aspettare che sia fut1
che fut2
finiscano. Non aspettiamo fut1
e fut2
ma invece la nuova future prodotta
da trpl::join
. Ignoriamo l’output, perché è solo una tupla che contiene due
valori unitari.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("ciao numero {i} dal primo task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("ciao numero {i} dal secondo task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
trpl::join
per aspettare due future anonimeQuando lo eseguiamo, vediamo entrambe le future eseguirsi fino al completamento:
ciao numero 1 dal primo task!
ciao numero 1 dal secondo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
ciao numero 6 dal primo task!
ciao numero 7 dal primo task!
ciao numero 8 dal primo task!
ciao numero 9 dal primo task!
Ora, vedrai lo stesso ordine ogni volta, il che è molto diverso da quello che
abbiamo visto con i thread. Questo perché la funzione trpl::join
è equa,
il che significa che controlla ciascuna future con la stessa frequenza,
alternando tra loro, e non lascia che una “corra avanti” se l’altra è pronta.
Con i thread, il sistema operativo decide quale thread controllare e per
quanto tempo farlo eseguire. Con async Rust, il runtime decide quale task
controllare. (Nella pratica, i dettagli si complicano perché un runtime
async potrebbe in realtà usare i thread del sistema operativo come parte
della gestione della concorrenza, quindi garantire l’equità può essere più
lavoro per un runtime, ma è comunque possibile!) I runtime non devono
garantire l’equità per qualsiasi operazione data, e spesso offrono diverse API
per farti scegliere se vuoi l’equità o meno.
Prova alcune di queste varianti sull’attesa dei future e vedi cosa fanno:
- Rimuovi il blocco async da uno o entrambi i loop.
- Aspetta ogni blocco async immediatamente dopo averlo definito.
- Incapsula solo il primo loop in un blocco async e aspetta il future risultante dopo il corpo del secondo loop.
Per una sfida extra, cerca di capire quale sarà l’output in ciascun caso prima di eseguire il codice!
Conteggiare su Due Task Usando il Passaggio di Messaggi
Condividere dati tra future sarà familiare: useremo di nuovo il passaggio di messaggi, ma questa volta con le versioni async dei type e delle funzioni. Prenderemo una strada leggermente diversa rispetto a quella che abbiamo preso in “Usare il Passaggio di Messaggi per Trasferire Dati tra Thread” per illustrare alcune delle differenze chiave tra concorrenza basata su thread e concorrenza basata su future. Nel Listato 17-9, inizieremo con un singolo blocco async, non creando un task separato come avevamo creato un thread separato.
extern crate trpl; // necessario per test mdbook fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("ciao"); tx.send(val).unwrap(); let ricevuto = rx.recv().await.unwrap(); println!("ricevuto '{ricevuto}'"); }); }
tx
e rx
Qui, usiamo trpl::channel
, una versione async dell’API del canale
multi-produttore, singolo-consumatore che abbiamo usato con i thread nel
Capitolo 16. La versione async dell’API è solo un po’ diversa dalla versione
basata su thread: usa un ricevitore rx
mutabile piuttosto che immutabile, e
il suo metodo recv
produce una future che dobbiamo aspettare piuttosto che
produrre il valore direttamente. Ora possiamo inviare messaggi dal mittente al
ricevitore. Nota che non dobbiamo avviare un thread separato o nemmeno un
task; dobbiamo solo aspettare la chiamata rx.recv
.
Il metodo sincrono Receiver::recv
in std::mpsc::channel
blocca fino a quando
non riceve un messaggio. Il metodo trpl::Receiver::recv
non lo fa, perché è
async. Invece di bloccare, restituisce il controllo al runtime fino a quando
non viene ricevuto un messaggio o la estremità di invio del canale si chiude. Al
contrario, non aspettiamo la chiamata send
, perché non blocca. Non ne ha
bisogno, perché il canale in cui lo stiamo inviando è senza vincoli.
Nota: Poiché tutto questo codice async si esegue in un blocco async in una chiamata
trpl::run
, tutto al suo interno può evitare di bloccare. Tuttavia, il codice fuori da esso si bloccherà sulla funzionerun
che restituisce. Questo è proprio lo scopo della funzionetrpl::run
: ci permette di scegliere dove bloccare su un insieme di codice async, e quindi dove passare tra codice sincrono e asincrono. In molti runtime asincroni,run
è effettivamente chiamatoblock_on
proprio per questo motivo.
Nota due cose in questo esempio. Prima di tutto, il messaggio arriverà subito. Secondo, anche se usiamo una future qui, non c’è ancora concorrenza. Tutto nell’elenco accade in sequenza, proprio come farebbe se non ci fossero future coinvolte.
Affrontiamo la prima parte inviando una serie di messaggi e “dormendo” tra di loro, come mostrato nel Listato 17-10.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let valori = vec![
String::from("ciao"),
String::from("dalla"),
String::from("future"),
String::from("!!!"),
];
for valore in valori {
tx.send(valore).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(val) = rx.recv().await {
println!("ricevuto '{val}'");
}
});
}
await
tra ogni messaggioOltre ad inviare i messaggi, dobbiamo riceverli. In questo caso, poiché sappiamo
quanti messaggi stanno arrivando, potremmo farlo manualmente chiamando
rx.recv().await
quattro volte. Nel mondo reale, tuttavia, di solito stiamo
aspettando un numero sconosciuto di messaggi, quindi dobbiamo continuare ad
aspettare fino a quando non determiniamo che non ci sono più messaggi.
Nel Listato 16-10, abbiamo usato un ciclo for
per elaborare tutti gli elementi
ricevuti da un canale sincrono. Rust non ha ancora un modo per scrivere un ciclo
for
su una serie asincrona di elementi, quindi dobbiamo usare un ciclo che
non abbiamo visto prima: il ciclo condizionale while let
. Questo è la versione
ciclo della costruzione if let
che abbiamo visto nella sezione “Controllare
il Flusso con if let
e let else
”. Il ciclo
continuerà ad eseguirsi finché il pattern specificato continua a corrispondere
al valore.
La chiamata rx.recv
produce una future, che aspettiamo. Il runtime metterà
in pausa la future fino a quando non sarà pronta. Una volta che arriva un
messaggio, la future si risolverà in Some(messaggio)
tutte le volte che
arriva un messaggio. Quando il canale si chiude, indipendentemente dal fatto che
siano arrivati alcuni messaggi, la future si risolverà invece in None
per
indicare che non ci sono più valori e quindi dobbiamo smettere di aspettare.
Il ciclo while let
mette insieme tutto questo. Se il risultato della chiamata
rx.recv().await
è Some(messaggio)
, otteniamo accesso al messaggio e possiamo
usarlo nel corpo del ciclo, proprio come potremmo fare con if let
. Se il
risultato è None
, il ciclo termina. Ogni volta che il ciclo si completa,
raggiunge di nuovo il punto di attesa, quindi il runtime lo mette di nuovo in
pausa fino a quando non arriva un altro messaggio.
Il codice invia e riceve ora tutti i messaggi con successo. Purtroppo, ci sono ancora un paio di problemi. Innanzitutto, i messaggi non arrivano a intervalli di mezzo secondo. Arrivano tutti insieme, 2 secondi (2.000 millisecondi) dopo aver avviato il programma. In secondo luogo, questo programma non si arresta mai! Invece, aspetta per sempre nuovi messaggi. Dovrai interromperlo usando ctrl-C.
Iniziamo esaminando perché i messaggi arrivano tutti insieme dopo il ritardo
cumulativo, piuttosto che arrivare con ritardi tra ciascuno. All’interno di un
dato blocco async, l’ordine in cui compaiono le parole chiave await
nel
codice è anche l’ordine in cui vengono eseguite quando il programma si avvia.
C’è un singolo blocco async nel Listato 17-10, quindi tutto in esso si esegue
linearmente. Non c’è ancora concorrenza. Tutti i tx.send
accadono, intercalati
con tutte le chiamate trpl::sleep
e i loro punti di attesa associati. Solo
allora il ciclo while let
può passare in rassegna alcuni dei punti di attesa
sulle chiamate recv
.
Per ottenere il comportamento che vogliamo, dove il ritardo accade tra ogni
messaggio, dobbiamo mettere le operazioni tx
e rx
nei loro blocchi async
separati, come mostrato nel Listato 17-11. In questo modo il runtime può
eseguire ciascuno di essi separatamente usando trpl::join
, proprio come
nell’esempio del conteggio. Ancora una volta, aspettiamo il risultato della
chiamata a trpl::join
, non le future singole. Se avessimo aspettato le
future singole in sequenza, saremmo tornati a un flusso sequenziale, proprio
quello che stiamo cercando di non fare.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let valori = vec![
String::from("ciao"),
String::from("dalla"),
String::from("future"),
String::from("!!!"),
];
for valore in valori {
tx.send(valore).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(val) = rx.recv().await {
println!("ricevuto '{val}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send
e recv
nei loro blocchi async
e aspettare le future per quei blocchiCon il codice aggiornato nel Listato 17-11, i messaggi vengono stampati a intervalli di 500 millisecondi, piuttosto che tutti insieme dopo 2 secondi.
Il programma non si arresta comunque, perché il ciclo while let
interagisce
con trpl::join
:
- La future restituita da
trpl::join
si completa solo una volta che entrambe le future passate ad esso si sono completate. - La future
tx
si completa una volta che ha finito di dormire dopo aver inviato l’ultimo messaggio invals
. - La future
rx
non si completerà fino a quando il ciclowhile let
non termina. - Il ciclo
while let
non terminerà fino a quando l’attesa dirx.recv
produceNone
. - L’attesa di
rx.recv
restituiràNone
solo una volta che l’altra estremità del canale è chiusa. - Il canale si chiuderà solo se chiamiamo
rx.close
o quando l’estremità invio,tx
, viene eliminata. - Non chiamiamo
rx.close
da nessuna parte, etx
non verrà eliminato fino a quando il blocco async più esterno passato atrpl::run
non termina. - Il blocco non può terminare perché è bloccato su
trpl::join
in attesa di completamento, il che ci riporta all’inizio di questo elenco.
Potremmo chiudere manualmente rx
chiamando rx.close
da qualche parte, ma non
ha molto senso. Fermarsi dopo aver gestito un numero arbitrario di messaggi
farebbe chiudere il programma, ma potremmo perdere messaggi. Abbiamo bisogno di
un altro modo per assicurarci che tx
venga eliminato prima della fine della
funzione.
Al momento, il blocco async in cui inviamo i messaggi prende in prestito solo
tx
perché inviare un messaggio non richiede la ownership, ma se potessimo
spostare tx
in quel blocco async, verrebbe eliminato una volta che quel
blocco termina. Nella sezione del Capitolo 13 “Catturare i Reference o
Trasferire la Ownership”, hai imparato come
usare la parola chiave move
con le chiusure, e, come discusso nella sezione
del Capitolo 16 “Usare le Chiusure move
con i Thread
”, spesso dobbiamo spostare i dati nelle chiusure quando lavoriamo con
i thread. Le stesse dinamiche di base si applicano ai blocchi async, quindi
la parola chiave move
funziona con i blocchi async proprio come fa con le
chiusure.
Nel Listato 17-12, cambiamo il blocco usato per inviare messaggi da async
a
async move
. Quando eseguiamo questa versione del codice, si chiude
correttamente dopo che l’ultimo messaggio è stato inviato e ricevuto.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let valori = vec![ String::from("ciao"), String::from("dalla"), String::from("future"), String::from("!!!"), ]; for valore in valori { tx.send(valore).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(val) = rx.recv().await { println!("ricevuto '{val}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
Questo canale async è anche un canale multi-produttore, quindi possiamo
chiamare clone
su tx
se vogliamo inviare messaggi da più future, come
mostrato nel Listato 17-13.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let valori = vec![ String::from("ciao"), String::from("dalla"), String::from("future"), String::from("!!!"), ]; for val in valori { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(valore) = rx.recv().await { println!("ricevuto '{valore}'"); } }; let tx_fut = async move { let valori = vec![ String::from("altri"), String::from("messaggi"), String::from("per"), String::from("te"), ]; for val in valori { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
Prima di tutto, cloniamo tx
, creando tx1
fuori dal primo blocco async.
Spostiamo tx1
in quel blocco proprio come abbiamo fatto prima con tx
. Poi,
in seguito, spostiamo l’originale tx
in un nuovo blocco async, dove
inviamo più messaggi con un ritardo leggermente minore. Abbiamo messo questo
nuovo blocco async dopo il blocco async per ricevere messaggi, ma andrebbe
bene anche se messo prima. La chiave è l’ordine in cui le future vengono
attese, non quello in cui vengono create.
Entrambi i blocchi async per inviare messaggi devono essere blocchi async move
in modo che sia tx
che tx1
vengano eliminati quando quei blocchi
finiscono. Altrimenti, finiremo di nuovo nello stesso ciclo infinito da cui
siamo partiti. Infine, passiamo da trpl::join
a trpl::join3
per gestire la
future aggiuntiva.
Ora vediamo tutti i messaggi da entrambe le future di invio, e poiché le future di invio usano ritardi leggermente diversi dopo l’invio, i messaggi vengono anche ricevuti a quegli intervalli diversi.
ricevuto 'ciao'
ricevuto 'altri'
ricevuto 'dalla'
ricevuto 'future'
ricevuto 'messaggi'
ricevuto '!!!'
ricevuto 'per'
ricevuto 'te'
Questo è un buon inizio, ma ci limita a solo una manciata di future: due con
join
, o tre con join3
. Vediamo come potremmo lavorare con più future.