Applicare la Concorrenza con Async
In questa sezione, vedremo come usare async per affrontare alcune sfide di concorrenza che abbiamo già visto con i thread nel Capitolo 16. Dato che abbiamo già parlato dei concetti chiave, ci concentreremo sulle differenze tra thread e future.
In molti casi, le API per lavorare con la concorrenza usando async sono molto simili a quelle per usare i thread. In altri casi, finiscono per essere piuttosto diverse. Anche quando le API sembrano simili tra thread e async, spesso hanno comportamenti diversi e quasi sempre caratteristiche di prestazioni differenti.
Creare un Nuovo Task con spawn_task
La prima operazione che abbiamo affrontato nella sezione “Creare un Nuovo
Thread con spawn” del Capitolo 16 era
effettuare un conteggio su due thread separati. Facciamo la stessa cosa usando
async. Il crate trpl fornisce una funzione spawn_task che sembra molto
simile all’API thread::spawn, e una funzione sleep che è una versione
async dell’API thread::sleep. Possiamo usarle insieme per implementare
l’esempio di conteggio, come mostrato nel Listato 17-6.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::block_on(async { trpl::spawn_task(async { for i in 1..10 { println!("ciao numero {i} dal primo task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("ciao numero {i} dal secondo task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
Come punto di partenza, impostiamo la nostra funzione main con
trpl::block_on in modo che la nostra funzione di livello superiore possa
essere async.
Nota: Da questo punto in poi nel capitolo, ogni esempio includerà lo stesso esatto codice di incapsulamento con
trpl::block_oninmain, quindi spesso lo salteremo proprio come facciamo conmain. Ricordati di includerlo nel tuo codice!
Poi scriviamo due loop all’interno di quel blocco, ciascuno contenente una
chiamata a trpl::sleep, che aspetta mezzo secondo (500 millisecondi) prima di
inviare il prossimo messaggio. Mettiamo un ciclo nel corpo di un
trpl::spawn_task e l’altro è un ciclo for nel task principale. Aggiungiamo
anche un await dopo le chiamate sleep.
Questo codice si comporta in modo simile all’implementazione basata su thread, inclusa la possibilità che tu possa vedere i messaggi apparire in un ordine diverso nel tuo terminale quando lo esegui:
ciao numero 1 dal secondo task!
ciao numero 1 dal primo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
Questa versione si ferma non appena il ciclo for nel corpo del blocco async
principale finisce, perché il task avviato da spawn_task viene chiuso quando
la funzione main termina. Se vuoi che si esegua fino al completamento del
task, dovrai usare un join handle per aspettare che il primo task si
completi. Con i thread, abbiamo usato il metodo join per “bloccare” fino a
quando il thread avesse finito di eseguirsi. Nel Listato 17-7, possiamo usare
await per fare la stessa cosa, perché l’handle del task stesso è una
future. Il suo type Output è un Result, quindi dopo averlo atteso
(await), dobbiamo anche esporlo (unwrap).
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("ciao numero {i} dal primo task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("ciao numero {i} dal secondo task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
await con un join handle per eseguire un task fino al completamentoQuesta versione aggiornata si esegue fino a quando entrambi i cicli finiscono:
ciao numero 1 dal secondo task!
ciao numero 1 dal primo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
ciao numero 6 dal primo task!
ciao numero 7 dal primo task!
ciao numero 8 dal primo task!
ciao numero 9 dal primo task!
Finora, sembra che async e thread ci diano simili risultati, solo con una
sintassi diversa: usando await invece di chiamare join sull’handle, e
aspettando le chiamate sleep.
La differenza più grande è che non abbiamo dovuto avviare un altro thread del
sistema operativo per farlo. In realtà, non dobbiamo nemmeno avviare un task
qui. Poiché i blocchi async si compilano in future anonime, possiamo mettere
ogni ciclo in un blocco async e far eseguire al runtime entrambe fino al
completamento usando la funzione trpl::join.
Nella sezione “Attendere Che Tutti i Thread Finiscano” del Capitolo 16, abbiamo mostrato come usare il metodo join sul
type JoinHandle restituito quando si chiama std::thread::spawn. La
funzione trpl::join è simile, ma per le future. Quando gli dai due future,
produce una singola nuova future il cui output è una tupla che contiene
l’output di ciascuna future che hai passato una volta che entrambe si
completano. Quindi, nel Listato 17-8, usiamo trpl::join per aspettare che sia
fut1 che fut2 finiscano. Non aspettiamo fut1 e fut2 ma invece la nuova
future prodotta da trpl::join. Ignoriamo l’output, perché è solo una tupla
che contiene due valori unitari.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("ciao numero {i} dal primo task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("ciao numero {i} dal secondo task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
trpl::join per aspettare due future anonimeQuando lo eseguiamo, vediamo entrambe le future eseguirsi fino al completamento:
ciao numero 1 dal primo task!
ciao numero 1 dal secondo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
ciao numero 6 dal primo task!
ciao numero 7 dal primo task!
ciao numero 8 dal primo task!
ciao numero 9 dal primo task!
Ora, vedrai lo stesso ordine ogni volta, il che è molto diverso da quello che
abbiamo visto con i thread e con trpl::spawn_task nel Listato 17-7. Questo
perché la funzione trpl::join è equa, il che significa che controlla
ciascuna future con la stessa frequenza, alternando tra loro, e non lascia che
una “corra avanti” se l’altra è pronta. Con i thread, il sistema operativo
decide quale thread controllare e per quanto tempo farlo eseguire. Con async
Rust, il runtime decide quale task controllare. (Nella pratica, i dettagli
si complicano perché un runtime async potrebbe in realtà usare i thread
del sistema operativo come parte della gestione della concorrenza, quindi
garantire l’equità può essere più lavoro per un runtime, ma è comunque
possibile!) I runtime non devono garantire l’equità per qualsiasi operazione
data, e spesso offrono diverse API per farti scegliere se vuoi l’equità o meno.
Prova alcune di queste varianti sull’attesa delle future e vedi cosa fanno:
- Rimuovi il blocco async da uno o entrambi i cicli.
- Aspetta ogni blocco async immediatamente dopo averlo definito.
- Incapsula solo il primo ciclo in un blocco async e aspetta la future risultante dopo il corpo del secondo ciclo.
Per una sfida extra, cerca di capire quale sarà l’output in ciascun caso prima di eseguire il codice!
Inviare Dati Tra Due Task Usando il Passaggio di Messaggi
Condividere dati tra future sarà anch’esso familiare: useremo di nuovo il passaggio di messaggi, ma questa volta con le versioni async dei type e delle funzioni. Prenderemo una strada leggermente diversa rispetto a quella che abbiamo preso nella sezione “Trasferire Dati tra Thread Usando il Passaggio di Messaggi” del Capitolo 16 per illustrare alcune delle differenze chiave tra concorrenza basata su thread e concorrenza basata su future. Nel Listato 17-9, inizieremo con un singolo blocco async, non creando un task separato come avevamo creato un thread separato.
extern crate trpl; // necessario per test mdbook fn main() { trpl::block_on(async { let (tx, mut rx) = trpl::channel(); let val = String::from("ciao"); tx.send(val).unwrap(); let ricevuto = rx.recv().await.unwrap(); println!("ricevuto '{ricevuto}'"); }); }
tx e rxQui, usiamo trpl::channel, una versione async dell’API del canale
multi-produttore, singolo-consumatore che abbiamo usato con i thread nel
Capitolo 16. La versione async dell’API è solo un po’ diversa dalla versione
basata su thread: usa un ricevitore rx mutabile piuttosto che immutabile, e
il suo metodo recv produce una future che dobbiamo aspettare piuttosto che
produrre il valore direttamente. Ora possiamo inviare messaggi dal mittente al
ricevitore. Nota che non dobbiamo avviare un thread separato o nemmeno un
task; dobbiamo solo aspettare la chiamata rx.recv.
Il metodo sincrono Receiver::recv in std::mpsc::channel blocca fino a quando
non riceve un messaggio. Il metodo trpl::Receiver::recv non lo fa, perché è
async. Invece di bloccare, restituisce il controllo al runtime fino a quando
non viene ricevuto un messaggio o la estremità di invio del canale si chiude. Al
contrario, non aspettiamo la chiamata send, perché non blocca. Non ne ha
bisogno, perché il canale in cui lo stiamo inviando è senza vincoli.
Nota: Poiché tutto questo codice async si esegue in un blocco async in una chiamata
trpl::block_on, tutto al suo interno può evitare di bloccare. Tuttavia, il codice fuori da esso si bloccherà sulla funzioneblock_onche restituisce. Questo è proprio lo scopo della funzionetrpl::block_on: ci permette di scegliere dove bloccare in un insieme di codice async, e quindi dove passare tra codice sincrono e asincrono.
Nota due cose in questo esempio. Prima di tutto, il messaggio arriverà subito. Secondo, anche se usiamo una future qui, non c’è ancora concorrenza. Tutto nell’elenco accade in sequenza, proprio come farebbe se non ci fossero future coinvolte.
Affrontiamo la prima parte inviando una serie di messaggi e “dormendo” tra di loro, come mostrato nel Listato 17-10.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let valori = vec![
String::from("ciao"),
String::from("dalla"),
String::from("future"),
String::from("!!!"),
];
for valore in valori {
tx.send(valore).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(val) = rx.recv().await {
println!("ricevuto '{val}'");
}
});
}
await tra ogni messaggioOltre ad inviare i messaggi, dobbiamo riceverli. In questo caso, poiché sappiamo
quanti messaggi stanno arrivando, potremmo farlo manualmente chiamando
rx.recv().await quattro volte. Nel mondo reale, tuttavia, di solito stiamo
aspettando un numero sconosciuto di messaggi, quindi dobbiamo continuare ad
aspettare fino a quando non determiniamo che non ci sono più messaggi.
Nel Listato 16-10, abbiamo usato un ciclo for per elaborare tutti gli elementi
ricevuti da un canale sincrono. Rust non ha ancora un modo per scrivere un ciclo
for su una serie di elementi prodotta asincronamente, quindi dobbiamo usare
un ciclo che non avevamo ancora visto: il ciclo condizionale while let. Questo
è la versione ciclo del costrutto if let che abbiamo visto nella sezione
“Controllare il Flusso con if let e let else” del Capitolo 6. Il ciclo continuerà ad eseguirsi finché il pattern
specificato continua a corrispondere al valore.
La chiamata rx.recv produce una future, che aspettiamo. Il runtime metterà
in pausa la future fino a quando non sarà pronta. Una volta che arriva un
messaggio, la future si risolverà in Some(messaggio) tutte le volte che
arriva un messaggio. Quando il canale si chiude, indipendentemente dal fatto che
siano arrivati alcuni messaggi, la future si risolverà invece in None per
indicare che non ci sono più valori e quindi dobbiamo smettere di aspettare.
Il ciclo while let mette insieme tutto questo. Se il risultato della chiamata
rx.recv().await è Some(messaggio), otteniamo accesso al messaggio e possiamo
usarlo nel corpo del ciclo, proprio come potremmo fare con if let. Se il
risultato è None, il ciclo termina. Ogni volta che il ciclo si completa,
raggiunge di nuovo il punto di attesa, quindi il runtime lo mette di nuovo in
pausa fino a quando non arriva un altro messaggio.
Il codice invia e riceve ora tutti i messaggi con successo. Purtroppo, ci sono ancora un paio di problemi. Innanzitutto, i messaggi non arrivano a intervalli di mezzo secondo. Arrivano tutti insieme, 2 secondi (2.000 millisecondi) dopo aver avviato il programma. In secondo luogo, questo programma non si arresta mai! Invece, aspetta per sempre nuovi messaggi. Dovrai interromperlo usando ctrl-C.
Il Codice Dentro Un Blocco Async Viene Eseguito Linearmente
Iniziamo esaminando perché i messaggi arrivano tutti insieme dopo il ritardo
cumulativo, piuttosto che arrivare con ritardi tra ciascuno. All’interno di un
dato blocco async, l’ordine in cui compaiono le parole chiave await nel
codice è anche l’ordine in cui vengono eseguite quando il programma si avvia.
C’è un singolo blocco async nel Listato 17-10, quindi tutto in esso si esegue
linearmente. Non c’è ancora concorrenza. Tutte le chiamate a tx.send accadono,
intercalate con tutte le chiamate trpl::sleep e i loro punti di attesa
associati. Solo allora il ciclo while let può passare in rassegna alcuni dei
punti di attesa sulle chiamate recv.
Per ottenere il comportamento che vogliamo, dove il ritardo accade tra ogni
messaggio, dobbiamo mettere le operazioni tx e rx nei loro blocchi async
separati, come mostrato nel Listato 17-11. In questo modo il runtime può
eseguire ciascuno di essi separatamente usando trpl::join, proprio come nel
Listato 17-8. Ancora una volta, aspettiamo il risultato della chiamata a
trpl::join, non le future singole. Se avessimo aspettato le future singole
in sequenza, saremmo tornati a un flusso sequenziale, proprio quello che stiamo
cercando di non fare.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let valori = vec![
String::from("ciao"),
String::from("dalla"),
String::from("future"),
String::from("!!!"),
];
for valore in valori {
tx.send(valore).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(val) = rx.recv().await {
println!("ricevuto '{val}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send e recv nei loro blocchi async e aspettare le future per quei blocchiCon il codice aggiornato nel Listato 17-11, i messaggi vengono stampati a intervalli di 500 millisecondi, piuttosto che tutti insieme dopo 2 secondi.
Spostare la Ownership Dentro un Blocco Async
Il programma non si arresta comunque, per il modo in cui il ciclo while let
interagisce con trpl::join:
- La future restituita da
trpl::joinsi completa solo una volta che entrambe le future passate ad esso si sono completate. - La future
tx_futsi completa una volta che ha finito di dormire dopo aver inviato l’ultimo messaggio invals. - La future
rx_futnon si completerà fino a quando il ciclowhile letnon termina. - Il ciclo
while letnon terminerà fino a quando l’attesa dirx.recvproduceNone. - L’attesa di
rx.recvrestituiràNonesolo una volta che l’altra estremità del canale è chiusa. - Il canale si chiuderà solo se chiamiamo
rx.closeo quando l’estremità di invio,tx, viene eliminata. - Non chiamiamo
rx.closeda nessuna parte, etxnon verrà eliminato fino a quando il blocco async più esterno passato atrpl::block_onnon termina. - Il blocco non può terminare perché è bloccato su
trpl::joinin attesa di completamento, il che ci riporta all’inizio di questo elenco.
Al momento, il blocco async in cui inviamo i messaggi prende solo in prestito
tx perché inviare un messaggio non richiede la ownership, ma se potessimo
spostare tx in quel blocco async, verrebbe eliminato una volta che quel
blocco termina. Nella sezione “Catturare i Reference o Trasferire la
Ownership” del Capitolo 13, hai imparato come
usare la parola chiave move con le chiusure, e, come discusso nella sezione
“Usare le Chiusure move con i Thread” del
Capitolo 16, spesso dobbiamo spostare i dati nelle chiusure quando lavoriamo con
i thread. Le stesse dinamiche di base si applicano ai blocchi async, quindi
la parola chiave move funziona con i blocchi async proprio come fa con le
chiusure.
Nel Listato 17-12, cambiamo il blocco usato per inviare messaggi da async a
async move.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::block_on(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { // --taglio-- let valori = vec![ String::from("ciao"), String::from("dalla"), String::from("future"), String::from("!!!"), ]; for valore in valori { tx.send(valore).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(val) = rx.recv().await { println!("ricevuto '{val}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
Quando eseguiamo questa versione del codice, si chiude correttamente dopo che l’ultimo messaggio è stato inviato e ricevuto. Adesso vediamo cosa deve cambiare per consentirci di inviare dati da più di una future.
Unire un Certo Numero di Future Con la Macro join!
Questo canale async è anche un canale multi-produttore, quindi possiamo
chiamare clone su tx se vogliamo inviare messaggi da più future, come
mostrato nel Listato 17-13.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::block_on(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let valori = vec![ String::from("ciao"), String::from("dalla"), String::from("future"), String::from("!!!"), ]; for val in valori { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(valore) = rx.recv().await { println!("ricevuto '{valore}'"); } }; let tx_fut = async move { let valori = vec![ String::from("altri"), String::from("messaggi"), String::from("per"), String::from("te"), ]; for val in valori { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
Prima di tutto, cloniamo tx, creando tx1 fuori dal primo blocco async.
Spostiamo tx1 in quel blocco proprio come abbiamo fatto prima con tx. Poi,
in seguito, spostiamo l’originale tx in un nuovo blocco async, dove
inviamo più messaggi con un ritardo leggermente maggiore. Abbiamo messo questo
nuovo blocco async dopo il blocco async per ricevere messaggi, ma andrebbe
bene anche se messo prima. La chiave è l’ordine in cui le future vengono
attese, non quello in cui vengono create.
Entrambi i blocchi async per inviare messaggi devono essere blocchi async move in modo che sia tx che tx1 vengano eliminati quando quei blocchi
finiscono. Altrimenti, finiremo di nuovo nello stesso ciclo infinito da cui
siamo partiti.
Infine, passiamo da trpl::join a trpl::join! per gestire la future
aggiuntiva: la macro join! accetta un numero arbitrario di future come
parametro, ma questo numero deve essere conosciuto durante la compilazione.
Discuteremo come gestire un numero non definito di future più avanti nel
capitolo.
Ora vediamo tutti i messaggi da entrambe le future di invio, e poiché le future di invio usano ritardi leggermente diversi dopo l’invio, i messaggi vengono anche ricevuti a quegli intervalli diversi:
ricevuto 'ciao'
ricevuto 'altri'
ricevuto 'dalla'
ricevuto 'future'
ricevuto 'messaggi'
ricevuto '!!!'
ricevuto 'per'
ricevuto 'te'
Abbiamo visto come gestire il passaggio di messaggi per inviare dati tra future, come il codice sia eseguito sequenzialmente all’interno di un blocco async, come spostare la ownership dentro il blocco async, e come unire più future. Ora vedremo come e perché informare il runtime di passare all’esecuzione di un altro task.