Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Applicare la Concorrenza con Async

In questa sezione, vedremo come usare async per affrontare alcune sfide di concorrenza che abbiamo già visto con i thread nel Capitolo 16. Dato che abbiamo già parlato dei concetti chiave, ci concentreremo sulle differenze tra thread e future.

In molti casi, le API per lavorare con la concorrenza usando async sono molto simili a quelle per usare i thread. In altri casi, finiscono per essere piuttosto diverse. Anche quando le API sembrano simili tra thread e async, spesso hanno comportamenti diversi e quasi sempre caratteristiche di prestazioni differenti.

Creare un Nuovo Task con spawn_task

La prima operazione che abbiamo affrontato in “Creare un Nuovo Thread con spawn era contare su due thread separati. Facciamo la stessa cosa usando async. Il crate trpl fornisce una funzione spawn_task che sembra molto simile all’API thread::spawn, e una funzione sleep che è una versione async dell’API thread::sleep. Possiamo usarle insieme per implementare l’esempio di conteggio, come mostrato nel Listato 17-6.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("ciao numero {i} dal primo task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("ciao numero {i} dal secondo task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listato 17-6: Creare un nuovo task per stampare una cosa mentre il task principale ne stampa un’altra

Come punto di partenza, impostiamo la nostra funzione main con trpl::run in modo che la nostra funzione di livello superiore possa essere async.

Nota: Da questo punto in poi nel capitolo, ogni esempio includerà lo stesso esatto codice di incapsulamento con trpl::run in main, quindi spesso lo salteremo proprio come facciamo con main. Non dimenticare di includerlo nel tuo codice!

Poi scriviamo due loop all’interno di quel blocco, ciascuno contenente una chiamata a trpl::sleep, che aspetta mezzo secondo (500 millisecondi) prima di inviare il prossimo messaggio. Mettiamo un loop nel corpo di un trpl::spawn_task e l’altro è un ciclo for nel task principale. Aggiungiamo anche un await dopo le chiamate sleep.

Questo codice si comporta in modo simile all’implementazione basata su thread, inclusa la possibilità che tu possa vedere i messaggi apparire in un ordine diverso nel tuo terminale quando lo esegui:

ciao numero 1 dal secondo task!
ciao numero 1 dal primo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!

Questa versione si ferma non appena il ciclo for nel corpo del blocco async principale finisce, perché il task avviato da spawn_task viene chiuso quando la funzione main termina. Se vuoi che si esegua fino al completamento del task, dovrai usare un join handle per aspettare che il primo task si completi. Con i thread, abbiamo usato il metodo join per “bloccare” fino a quando il thread avesse finito di eseguirsi. Nel Listato 17-7, possiamo usare await per fare la stessa cosa, perché l’handle del task stesso è un future. Il suo type Output è un Result, quindi dopo averlo atteso (await), dobbiamo anche esporlo (unwrap).

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("ciao numero {i} dal primo task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("ciao numero {i} dal secondo task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listato 17-7: Usare await con un join handle per eseguire un task fino al completamento

Questa versione aggiornata si esegue fino a quando entrambi i loop finiscono.

ciao numero 1 dal secondo task!
ciao numero 1 dal primo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
ciao numero 6 dal primo task!
ciao numero 7 dal primo task!
ciao numero 8 dal primo task!
ciao numero 9 dal primo task!

Finora, sembra che async e thread ci diano gli stessi risultati di base, solo con una sintassi diversa: usando await invece di chiamare join sull’handle, e aspettando le chiamate sleep.

La differenza più grande è che non abbiamo dovuto avviare un altro thread del sistema operativo per farlo. In realtà, non dobbiamo nemmeno avviare un task qui. Poiché i blocchi async si compilano in future anonime, possiamo mettere ogni loop in un blocco async e far eseguire al runtime entrambe fino al completamento usando la funzione trpl::join.

Nella sezione “Attendere Che Tutti i Thread Finiscano”, abbiamo mostrato come usare il metodo join sul type JoinHandle restituito quando si chiama std::thread::spawn. La funzione trpl::join è simile, ma per le future. Quando gli dai due future, produce una singola nuova future il cui output è una tupla che contiene l’output di ciascuna future che hai passato una volta che entrambe si completano. Quindi, nel Listato 17-8, usiamo trpl::join per aspettare che sia fut1 che fut2 finiscano. Non aspettiamo fut1 e fut2 ma invece la nuova future prodotta da trpl::join. Ignoriamo l’output, perché è solo una tupla che contiene due valori unitari.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("ciao numero {i} dal primo task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("ciao numero {i} dal secondo task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listato 17-8: Usare trpl::join per aspettare due future anonime

Quando lo eseguiamo, vediamo entrambe le future eseguirsi fino al completamento:

ciao numero 1 dal primo task!
ciao numero 1 dal secondo task!
ciao numero 2 dal primo task!
ciao numero 2 dal secondo task!
ciao numero 3 dal primo task!
ciao numero 3 dal secondo task!
ciao numero 4 dal primo task!
ciao numero 4 dal secondo task!
ciao numero 5 dal primo task!
ciao numero 6 dal primo task!
ciao numero 7 dal primo task!
ciao numero 8 dal primo task!
ciao numero 9 dal primo task!

Ora, vedrai lo stesso ordine ogni volta, il che è molto diverso da quello che abbiamo visto con i thread. Questo perché la funzione trpl::join è equa, il che significa che controlla ciascuna future con la stessa frequenza, alternando tra loro, e non lascia che una “corra avanti” se l’altra è pronta. Con i thread, il sistema operativo decide quale thread controllare e per quanto tempo farlo eseguire. Con async Rust, il runtime decide quale task controllare. (Nella pratica, i dettagli si complicano perché un runtime async potrebbe in realtà usare i thread del sistema operativo come parte della gestione della concorrenza, quindi garantire l’equità può essere più lavoro per un runtime, ma è comunque possibile!) I runtime non devono garantire l’equità per qualsiasi operazione data, e spesso offrono diverse API per farti scegliere se vuoi l’equità o meno.

Prova alcune di queste varianti sull’attesa dei future e vedi cosa fanno:

  • Rimuovi il blocco async da uno o entrambi i loop.
  • Aspetta ogni blocco async immediatamente dopo averlo definito.
  • Incapsula solo il primo loop in un blocco async e aspetta il future risultante dopo il corpo del secondo loop.

Per una sfida extra, cerca di capire quale sarà l’output in ciascun caso prima di eseguire il codice!

Conteggiare su Due Task Usando il Passaggio di Messaggi

Condividere dati tra future sarà familiare: useremo di nuovo il passaggio di messaggi, ma questa volta con le versioni async dei type e delle funzioni. Prenderemo una strada leggermente diversa rispetto a quella che abbiamo preso in “Usare il Passaggio di Messaggi per Trasferire Dati tra Thread per illustrare alcune delle differenze chiave tra concorrenza basata su thread e concorrenza basata su future. Nel Listato 17-9, inizieremo con un singolo blocco async, non creando un task separato come avevamo creato un thread separato.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("ciao");
        tx.send(val).unwrap();

        let ricevuto = rx.recv().await.unwrap();
        println!("ricevuto '{ricevuto}'");
    });
}
Listato 17-9: Creare un canale async e assegnare le due metà a tx e rx

Qui, usiamo trpl::channel, una versione async dell’API del canale multi-produttore, singolo-consumatore che abbiamo usato con i thread nel Capitolo 16. La versione async dell’API è solo un po’ diversa dalla versione basata su thread: usa un ricevitore rx mutabile piuttosto che immutabile, e il suo metodo recv produce una future che dobbiamo aspettare piuttosto che produrre il valore direttamente. Ora possiamo inviare messaggi dal mittente al ricevitore. Nota che non dobbiamo avviare un thread separato o nemmeno un task; dobbiamo solo aspettare la chiamata rx.recv.

Il metodo sincrono Receiver::recv in std::mpsc::channel blocca fino a quando non riceve un messaggio. Il metodo trpl::Receiver::recv non lo fa, perché è async. Invece di bloccare, restituisce il controllo al runtime fino a quando non viene ricevuto un messaggio o la estremità di invio del canale si chiude. Al contrario, non aspettiamo la chiamata send, perché non blocca. Non ne ha bisogno, perché il canale in cui lo stiamo inviando è senza vincoli.

Nota: Poiché tutto questo codice async si esegue in un blocco async in una chiamata trpl::run, tutto al suo interno può evitare di bloccare. Tuttavia, il codice fuori da esso si bloccherà sulla funzione run che restituisce. Questo è proprio lo scopo della funzione trpl::run: ci permette di scegliere dove bloccare su un insieme di codice async, e quindi dove passare tra codice sincrono e asincrono. In molti runtime asincroni, run è effettivamente chiamato block_on proprio per questo motivo.

Nota due cose in questo esempio. Prima di tutto, il messaggio arriverà subito. Secondo, anche se usiamo una future qui, non c’è ancora concorrenza. Tutto nell’elenco accade in sequenza, proprio come farebbe se non ci fossero future coinvolte.

Affrontiamo la prima parte inviando una serie di messaggi e “dormendo” tra di loro, come mostrato nel Listato 17-10.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let valori = vec![
            String::from("ciao"),
            String::from("dalla"),
            String::from("future"),
            String::from("!!!"),
        ];

        for valore in valori {
            tx.send(valore).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(val) = rx.recv().await {
            println!("ricevuto '{val}'");
        }
    });
}
Listato 17-10: Inviare e ricevere più messaggi sul canale async e dormire con un await tra ogni messaggio

Oltre ad inviare i messaggi, dobbiamo riceverli. In questo caso, poiché sappiamo quanti messaggi stanno arrivando, potremmo farlo manualmente chiamando rx.recv().await quattro volte. Nel mondo reale, tuttavia, di solito stiamo aspettando un numero sconosciuto di messaggi, quindi dobbiamo continuare ad aspettare fino a quando non determiniamo che non ci sono più messaggi.

Nel Listato 16-10, abbiamo usato un ciclo for per elaborare tutti gli elementi ricevuti da un canale sincrono. Rust non ha ancora un modo per scrivere un ciclo for su una serie asincrona di elementi, quindi dobbiamo usare un ciclo che non abbiamo visto prima: il ciclo condizionale while let. Questo è la versione ciclo della costruzione if let che abbiamo visto nella sezione “Controllare il Flusso con if let e let else. Il ciclo continuerà ad eseguirsi finché il pattern specificato continua a corrispondere al valore.

La chiamata rx.recv produce una future, che aspettiamo. Il runtime metterà in pausa la future fino a quando non sarà pronta. Una volta che arriva un messaggio, la future si risolverà in Some(messaggio) tutte le volte che arriva un messaggio. Quando il canale si chiude, indipendentemente dal fatto che siano arrivati alcuni messaggi, la future si risolverà invece in None per indicare che non ci sono più valori e quindi dobbiamo smettere di aspettare.

Il ciclo while let mette insieme tutto questo. Se il risultato della chiamata rx.recv().await è Some(messaggio), otteniamo accesso al messaggio e possiamo usarlo nel corpo del ciclo, proprio come potremmo fare con if let. Se il risultato è None, il ciclo termina. Ogni volta che il ciclo si completa, raggiunge di nuovo il punto di attesa, quindi il runtime lo mette di nuovo in pausa fino a quando non arriva un altro messaggio.

Il codice invia e riceve ora tutti i messaggi con successo. Purtroppo, ci sono ancora un paio di problemi. Innanzitutto, i messaggi non arrivano a intervalli di mezzo secondo. Arrivano tutti insieme, 2 secondi (2.000 millisecondi) dopo aver avviato il programma. In secondo luogo, questo programma non si arresta mai! Invece, aspetta per sempre nuovi messaggi. Dovrai interromperlo usando ctrl-C.

Iniziamo esaminando perché i messaggi arrivano tutti insieme dopo il ritardo cumulativo, piuttosto che arrivare con ritardi tra ciascuno. All’interno di un dato blocco async, l’ordine in cui compaiono le parole chiave await nel codice è anche l’ordine in cui vengono eseguite quando il programma si avvia.

C’è un singolo blocco async nel Listato 17-10, quindi tutto in esso si esegue linearmente. Non c’è ancora concorrenza. Tutti i tx.send accadono, intercalati con tutte le chiamate trpl::sleep e i loro punti di attesa associati. Solo allora il ciclo while let può passare in rassegna alcuni dei punti di attesa sulle chiamate recv.

Per ottenere il comportamento che vogliamo, dove il ritardo accade tra ogni messaggio, dobbiamo mettere le operazioni tx e rx nei loro blocchi async separati, come mostrato nel Listato 17-11. In questo modo il runtime può eseguire ciascuno di essi separatamente usando trpl::join, proprio come nell’esempio del conteggio. Ancora una volta, aspettiamo il risultato della chiamata a trpl::join, non le future singole. Se avessimo aspettato le future singole in sequenza, saremmo tornati a un flusso sequenziale, proprio quello che stiamo cercando di non fare.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let valori = vec![
                String::from("ciao"),
                String::from("dalla"),
                String::from("future"),
                String::from("!!!"),
            ];

            for valore in valori {
                tx.send(valore).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(val) = rx.recv().await {
                println!("ricevuto '{val}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listato 17-11: Separare send e recv nei loro blocchi async e aspettare le future per quei blocchi

Con il codice aggiornato nel Listato 17-11, i messaggi vengono stampati a intervalli di 500 millisecondi, piuttosto che tutti insieme dopo 2 secondi.

Il programma non si arresta comunque, perché il ciclo while let interagisce con trpl::join:

  • La future restituita da trpl::join si completa solo una volta che entrambe le future passate ad esso si sono completate.
  • La future tx si completa una volta che ha finito di dormire dopo aver inviato l’ultimo messaggio in vals.
  • La future rx non si completerà fino a quando il ciclo while let non termina.
  • Il ciclo while let non terminerà fino a quando l’attesa di rx.recv produce None.
  • L’attesa di rx.recv restituirà None solo una volta che l’altra estremità del canale è chiusa.
  • Il canale si chiuderà solo se chiamiamo rx.close o quando l’estremità invio, tx, viene eliminata.
  • Non chiamiamo rx.close da nessuna parte, e tx non verrà eliminato fino a quando il blocco async più esterno passato a trpl::run non termina.
  • Il blocco non può terminare perché è bloccato su trpl::join in attesa di completamento, il che ci riporta all’inizio di questo elenco.

Potremmo chiudere manualmente rx chiamando rx.close da qualche parte, ma non ha molto senso. Fermarsi dopo aver gestito un numero arbitrario di messaggi farebbe chiudere il programma, ma potremmo perdere messaggi. Abbiamo bisogno di un altro modo per assicurarci che tx venga eliminato prima della fine della funzione.

Al momento, il blocco async in cui inviamo i messaggi prende in prestito solo tx perché inviare un messaggio non richiede la ownership, ma se potessimo spostare tx in quel blocco async, verrebbe eliminato una volta che quel blocco termina. Nella sezione del Capitolo 13 “Catturare i Reference o Trasferire la Ownership, hai imparato come usare la parola chiave move con le chiusure, e, come discusso nella sezione del Capitolo 16 “Usare le Chiusure move con i Thread, spesso dobbiamo spostare i dati nelle chiusure quando lavoriamo con i thread. Le stesse dinamiche di base si applicano ai blocchi async, quindi la parola chiave move funziona con i blocchi async proprio come fa con le chiusure.

Nel Listato 17-12, cambiamo il blocco usato per inviare messaggi da async a async move. Quando eseguiamo questa versione del codice, si chiude correttamente dopo che l’ultimo messaggio è stato inviato e ricevuto.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let valori = vec![
                String::from("ciao"),
                String::from("dalla"),
                String::from("future"),
                String::from("!!!"),
            ];

            for valore in valori {
                tx.send(valore).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(val) = rx.recv().await {
                println!("ricevuto '{val}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listato 17-12: Una revisione del codice nel Listato 17-11 che si chiude correttamente al completamento

Questo canale async è anche un canale multi-produttore, quindi possiamo chiamare clone su tx se vogliamo inviare messaggi da più future, come mostrato nel Listato 17-13.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let valori = vec![
                String::from("ciao"),
                String::from("dalla"),
                String::from("future"),
                String::from("!!!"),
            ];

            for val in valori {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(valore) = rx.recv().await {
                println!("ricevuto '{valore}'");
            }
        };

        let tx_fut = async move {
            let valori = vec![
                String::from("altri"),
                String::from("messaggi"),
                String::from("per"),
                String::from("te"),
            ];

            for val in valori {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
Listato 17-13: Usare più produttori con blocchi async

Prima di tutto, cloniamo tx, creando tx1 fuori dal primo blocco async. Spostiamo tx1 in quel blocco proprio come abbiamo fatto prima con tx. Poi, in seguito, spostiamo l’originale tx in un nuovo blocco async, dove inviamo più messaggi con un ritardo leggermente minore. Abbiamo messo questo nuovo blocco async dopo il blocco async per ricevere messaggi, ma andrebbe bene anche se messo prima. La chiave è l’ordine in cui le future vengono attese, non quello in cui vengono create.

Entrambi i blocchi async per inviare messaggi devono essere blocchi async move in modo che sia tx che tx1 vengano eliminati quando quei blocchi finiscono. Altrimenti, finiremo di nuovo nello stesso ciclo infinito da cui siamo partiti. Infine, passiamo da trpl::join a trpl::join3 per gestire la future aggiuntiva.

Ora vediamo tutti i messaggi da entrambe le future di invio, e poiché le future di invio usano ritardi leggermente diversi dopo l’invio, i messaggi vengono anche ricevuti a quegli intervalli diversi.

ricevuto 'ciao'
ricevuto 'altri'
ricevuto 'dalla'
ricevuto 'future'
ricevuto 'messaggi'
ricevuto '!!!'
ricevuto 'per'
ricevuto 'te'

Questo è un buon inizio, ma ci limita a solo una manciata di future: due con join, o tre con join3. Vediamo come potremmo lavorare con più future.