Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Stream: Future in Sequenza

Fino ad ora in questo capitolo, ci siamo principalmente concentrati su future singole. L’unica grande eccezione è stata il canale async che abbiamo usato. Ricorda come abbiamo utilizzato il ricevitore per il nostro canale async in precedenza in questo capitolo nella sezione “Conteggiare su Due Task Usando il Passaggio di Messaggi”. Il metodo async recv produce una sequenza di elementi nel tempo. Questo è un esempio di un modello molto più generale noto come stream.

Abbiamo visto una sequenza di elementi nel Capitolo 13, quando abbiamo esaminato il trait Iterator nella sezione “Il Trait Iterator e il Metodo next, ma ci sono due differenze tra gli iteratori e il ricevitore del canale async. La prima differenza sono le tempistiche: gli iteratori sono sincroni, mentre il ricevitore del canale è asincrono. La seconda è l’API. Quando lavoriamo direttamente con Iterator, chiamiamo il suo metodo sincrono next. Con lo stream trpl::Receiver, in particolare, abbiamo invece chiamato un metodo asincrono recv. A parte questo, le API si somigliano molto, e questa somiglianza non è una coincidenza. Uno stream è come una forma asincrona di iterazione. Mentre il trpl::Receiver aspetta specificamente di ricevere messaggi, però, l’API dello stream di uso generale è molto più ampia: fornisce il prossimo elemento come fa Iterator, ma in modo asincrono.

La somiglianza tra iteratori e stream in Rust significa che possiamo effettivamente creare uno stream da qualsiasi iteratore. Come con un iteratore, possiamo lavorare con uno stream chiamando il suo metodo next e poi aspettare l’output, come nel Listato 17-30.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

fn main() {
    trpl::run(async {
        let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = valori.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(valore) = stream.next().await {
            println!("Il valore era: {valore}");
        }
    });
}
Listato 17-30: Creare uno stream da un iteratore e stampare i suoi valori

Iniziamo con un array di numeri, che convertiamo in un iteratore e poi chiamiamo map su di esso per raddoppiare tutti i valori. Poi convertiamo l’iteratore in uno stream usando la funzione trpl::stream_from_iter. Successivamente, iteriamo sugli elementi nello stream man mano che arrivano con il ciclo while let.

Sfortunatamente, quando proviamo a eseguire il codice, non si compila, ma invece riporta che non c’è alcun metodo next disponibile:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter<I>` in the current scope
  --> src/main.rs:10:41
   |
10 |         while let Some(valore) = stream.next().await {
   |                                         ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
 1 + use crate::trpl::StreamExt;
   |
 1 + use futures_util::stream::stream::StreamExt;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(valore) = stream.try_next().await {
   |                                         ++++

Come spiega questo output, la ragione dell’errore del compilatore è che abbiamo bisogno del trait giusto in scope per poter utilizzare il metodo next. Dato il nostro discorso finora, potresti ragionevolmente aspettarti che quel trait sia Stream, ma in realtà è StreamExt. Abbreviazione di estensione, Ext è un modello comune nella comunità Rust per estendere un trait con un altro.

Spiegheremo i trait Stream e StreamExt in modo un po’ più dettagliato alla fine del capitolo, ma per ora tutto ciò che devi sapere è che il trait Stream definisce un’interfaccia a basso livello che combina efficacemente i trait Iterator e Future. StreamExt fornisce un insieme di API di livello superiore costruite sulla base di Stream, inclusi il metodo next e altri metodi utili simili a quelli forniti dal trait Iterator. Stream e StreamExt non fanno ancora parte della libreria standard di Rust, ma la maggior parte dei crate dell’ecosistema utilizza la stessa definizione.

La soluzione all’errore del compilatore è aggiungere una dichiarazione use per trpl::StreamExt, come nel Listato 17-31.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = valori.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(valore) = stream.next().await {
            println!("Il valore era: {valore}");
        }
    });
}
Listato 17-31: Utilizzare con successo un iteratore come base per uno stream

Con tutti questi pezzi messi insieme, questo codice funziona come vogliamo! Inoltre, ora che abbiamo StreamExt in scope, possiamo utilizzare tutti i suoi metodi utili, proprio come con gli iteratori. Ad esempio, nel Listato 17-32, utilizziamo il metodo filter per filtrare tutto tranne i multipli di tre e cinque.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let valori = 1..101;
        let iter = valori.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtrato =
            stream.filter(|val| val % 3 == 0 || val % 5 == 0);

        while let Some(valore) = filtrato.next().await {
            println!("Il valore era: {valore}");
        }
    });
}
Listato 17-32: Filtrare uno stream con il metodo StreamExt::filter

Certo, questo non è molto interessante, dato che potremmo fare lo stesso con normali iteratori e senza alcun async. Vediamo cosa possiamo fare che è unico per gli stream.

Combinare Stream

Molti concetti sono naturalmente rappresentati come stream: elementi che diventano disponibili in una coda, porzioni di dati che vengono estratti incrementalmente dal filesystem quando l’intero set di dati è troppo grande per la memoria del computer, o dati che arrivano attraverso la rete nel tempo. Poiché gli stream sono future, possiamo usarli con qualsiasi altro tipo di future e combinarli in modi interessanti. Ad esempio, possiamo raggruppare eventi per evitare di attivare troppe chiamate di rete, impostare timeout su sequenze di operazioni a lungo termine, o limitare gli eventi dell’interfaccia utente per evitare di fare lavoro inutile.

Iniziamo costruendo un piccolo stream di messaggi come sostituto di uno stream di dati che potremmo vedere da un WebSocket o un altro protocollo di comunicazione in tempo reale, come mostrato nel Listato 17-33.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messaggi = ricevi_messaggi();

        while let Some(messaggio) = messaggi.next().await {
            println!("{messaggio}");
        }
    });
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for messaggio in messaggi {
        tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
Listato 17-33: Utilizza il ricevitore rx come ReceiverStream

Per prima cosa, creiamo una funzione chiamata ricevi_messaggi che restituisce impl Stream<Item = String>. Per la sua implementazione, creiamo un canale async, iteriamo sulle prime 10 lettere dell’alfabeto inglese e le inviamo attraverso il canale.

Utilizziamo anche un nuovo type: ReceiverStream, che converte il ricevitore rx da trpl::channel in uno Stream con un metodo next. Tornando a main, utilizziamo un ciclo while let per stampare tutti i messaggi dallo stream.

Quando eseguiamo questo codice, otteniamo esattamente i risultati che ci aspetteremmo:

$ cargo run
Messaggio: 'a'
Messaggio: 'b'
Messaggio: 'c'
Messaggio: 'd'
Messaggio: 'e'
Messaggio: 'f'
Messaggio: 'g'
Messaggio: 'h'
Messaggio: 'i'
Messaggio: 'j'

Ancora una volta, potremmo fare questo con l’API Receiver regolare o anche con l’API Iterator regolare, quindi aggiungiamo una funzionalità che richiede stream: aggiungere un timeout che si applica a ogni elemento nello stream e un ritardo sugli elementi che emettiamo, come mostrato nel Listato 17-34.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messaggi =
            pin!(ricevi_messaggi().timeout(Duration::from_millis(200)));

        while let Some(risultato) = messaggi.next().await {
            match risultato {
                Ok(messaggio) => println!("{messaggio}"),
                Err(ragione) => eprintln!("Problema: {ragione:?}"),
            }
        }
    })
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for messaggio in messaggi {
        tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
Listato 17-34: Utilizzo del metodo StreamExt::timeout per impostare un limite di tempo sugli elementi in uno stream

Iniziamo aggiungendo un timeout allo stream con il metodo timeout, che proviene dal trait StreamExt. Poi aggiorniamo il corpo del ciclo while let, perché ora lo stream restituisce un Result. La variante Ok indica che un messaggio è arrivato in tempo; la variante Err indica che il timeout è scaduto prima che arrivasse un messaggio. Facciamo il match su quel risultato e stampiamo il messaggio quando lo riceviamo con successo o stampiamo una notifica riguardo al timeout. Infine, nota che fissiamo i messaggi con pin! dopo aver applicato il timeout, perché timeout produce uno stream che deve essere fissato per essere letto.

Tuttavia, poiché non ci sono ritardi tra i messaggi, questo timeout non cambia il comportamento del programma. Ora aggiungiamo un ritardo variabile ai messaggi che inviamo, come mostrato nel Listato 17-35.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messaggi =
            pin!(ricevi_messaggi().timeout(Duration::from_millis(200)));

        while let Some(risultato) = messaggi.next().await {
            match risultato {
                Ok(messaggio) => println!("{messaggio}"),
                Err(ragione) => eprintln!("Problema: {ragione:?}"),
            }
        }
    })
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (indice, messaggio) in messaggi.into_iter().enumerate() {
            let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(tempo_dormita)).await;

            tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listato 17-35: Invio di messaggi attraverso tx con un ritardo async senza rendere ricevi_messaggi una funzione async

In ricevi_messaggi, utilizziamo il metodo enumerate dell’iteratore con l’array messaggi in modo da poter ottenere l’indice di ogni elemento che stiamo inviando insieme all’elemento stesso. Poi applichiamo un ritardo di 100 millisecondi agli elementi con indice pari e un ritardo di 300 millisecondi agli elementi con indice dispari per simulare i diversi ritardi che potremmo vedere da uno stream di messaggi nel mondo reale. Poiché il nostro timeout è di 200 millisecondi, questo dovrebbe influenzare metà dei messaggi.

Per “dormire” tra i messaggi nella funzione ricevi_messaggi senza bloccare, dobbiamo usare async. Tuttavia, non possiamo rendere ricevi_messaggi stessa una funzione async, perché altrimenti restituiremmo un Future<Output = Stream<Item = String>> invece di uno Stream<Item = String>. Il chiamante dovrebbe attendere ricevi_messaggi stessa per accedere allo stream. Ma ricorda: tutto in una data future avviene linearmente; la concorrenza avviene tra le future. Attendere ricevi_messaggi richiederebbe che inviasse tutti i messaggi, incluso il ritardo tra ogni messaggio, prima di restituire il ricevitore dello stream. Di conseguenza, il timeout sarebbe inutile. Non ci sarebbero ritardi nello stream stesso; si verificherebbero tutti ancor prima che lo stream fosse disponibile.

Invece, lasciamo ricevi_messaggi come una funzione regolare che restituisce uno stream e invece creiamo un task per gestire le chiamate async a sleep.

Nota: Chiamare spawn_task in questo modo funziona perché abbiamo già impostato il nostro runtime; se non lo avessimo fatto, causerebbe un panic. Altre implementazioni scelgono compromessi diversi: potrebbero avviare un nuovo runtime e evitare il panic, ma avrebbero un po’ di overhead extra, oppure potrebbero semplicemente non fornire un modo autonomo per avviare un task senza un riferimento a un runtime. Assicurati di sapere quale compromesso ha scelto il tuo runtime e scrivi il tuo codice di conseguenza!

Ora il nostro codice ha un risultato molto più interessante. Tra ogni coppia di messaggi, appare un errore Problema: Elapsed(()).

$cargo run
Messaggio: 'a'
Problema: Elapsed(())
Messaggio: 'b'
Messaggio: 'c'
Problema: Elapsed(())
Messaggio: 'd'
Messaggio: 'e'
Problema: Elapsed(())
Messaggio: 'f'
Messaggio: 'g'
Problema: Elapsed(())
Messaggio: 'h'
Messaggio: 'i'
Problema: Elapsed(())
Messaggio: 'j'

Il timeout non impedisce ai messaggi di arrivare alla fine. Riceviamo ancora tutti i messaggi originali, perché il nostro canale è illimitato: può contenere quanti più messaggi possiamo inserire in memoria. Se il messaggio non arriva prima del timeout, il nostro gestore di stream ne terrà conto, ma quando interroga di nuovo lo stream, il messaggio potrebbe ora essere arrivato.

Puoi ottenere un comportamento diverso se necessario utilizzando altri tipi di canali o altri tipi di stream in modo più generale. Vediamo uno di questi in pratica combinando uno stream di intervalli di tempo con questo stream di messaggi.

Unire Stream

Per prima cosa, creiamo un altro stream, che invierà un elemento ogni millisecondo se lo lasciamo girare direttamente. Per semplicità, possiamo usare la funzione sleep per inviare un messaggio con un ritardo e combinarlo con lo stesso approccio che abbiamo usato in ricevi_messaggi per creare uno stream da un canale. La differenza è che questa volta, stiamo per restituire il conteggio degli intervalli che sono trascorsi, quindi il type di ritorno sarà impl Stream<Item = u32>, e possiamo chiamare la funzione ricevi_intervalli (vedi Listato 17-36).

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messaggi =
            pin!(ricevi_messaggi().timeout(Duration::from_millis(200)));

        while let Some(risultato) = messaggi.next().await {
            match risultato {
                Ok(messaggio) => println!("{messaggio}"),
                Err(ragione) => eprintln!("Problema: {ragione:?}"),
            }
        }
    })
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (indice, messaggio) in messaggi.into_iter().enumerate() {
            let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(tempo_dormita)).await;

            tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn ricevi_intervalli() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut conteggio = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            conteggio += 1;
            tx.send(conteggio).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listato 17-36: Creare uno stream con un contatore che verrà inviato una volta ogni millisecondo

Iniziamo definendo un conteggio nel task. (Potremmo definirlo anche al di fuori del task, ma il codice risulterà più chiaro se limitiamo ogni variabile allo scope che la riguarda.) Poi creiamo un ciclo infinito. Ogni iterazione del ciclo “dorme” asincronamente per un millisecondo, incrementa il conteggio e poi lo invia attraverso il canale. Poiché tutto questo è incapsulato nell’attività creata da spawn_task, tutto, incluso il ciclo infinito, verrà de-allocato insieme al runtime.

Questo tipo di ciclo infinito, che termina solo quando l’intero runtime viene eliminato, è abbastanza comune in Rust async: molti programmi devono continuare a girare indefinitamente. Con async, questo non blocca nulla, purché ci sia almeno un punto di attesa in ogni iterazione del ciclo.

Ora, tornando al blocco async della nostra funzione principale, possiamo tentare di unire gli stream messaggi e intervalli, come mostrato nel Listato 17-37.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200));
        let intervalli = ricevi_intervalli();
        let uniti = messaggi.merge(intervalli);

        while let Some(risultato) = uniti.next().await {
            match risultato {
                Ok(messaggio) => println!("{messaggio}"),
                Err(ragione) => eprintln!("Problema: {ragione:?}"),
            }
        }
    })
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (indice, messaggio) in messaggi.into_iter().enumerate() {
            let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(tempo_dormita)).await;

            tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn ricevi_intervalli() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut conteggio = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            conteggio += 1;
            tx.send(conteggio).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listato 17-37: Tentativo di unire gli stream messaggi e intervalli

Iniziamo chiamando ricevi_intervalli. Poi uniamo gli stream messaggi e intervalli con il metodo merge, che combina più stream in uno stream unico che produce elementi da qualsiasi degli stream sorgente non appena gli elementi sono disponibili, senza imporre alcun ordinamento particolare. Infine, iteriamo su quello stream unico invece che su messaggi.

A questo punto, né messaggiintervalli devono essere fissati o mutabili, perché entrambi finiranno nello stream unico uniti. Tuttavia, questa chiamata a merge non si compila! (Neanche la chiamata a next nel ciclo while let, ma ci torneremo.) Questo perché i due stream hanno type diversi. Lo stream messaggi ha il type Timeout<impl Stream<Item = String>>, dove Timeout è il type che implementa Stream per una chiamata di timeout. Lo stream intervalli ha il type impl Stream<Item = u32>. Per unire questi due stream, dobbiamo trasformare uno di essi per farlo corrispondere all’altro. Rielaboreremo lo stream degli intervalli, perché messaggi è già nel formato di base che vogliamo e deve gestire gli errori di timeout (vedi Listato 17-38).

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200));
        let intervalli = ricevi_intervalli()
            .map(|conteggio| format!("Intervallo: {conteggio}"))
            .timeout(Duration::from_secs(10));
        let uniti = messaggi.merge(intervalli);
        let mut stream = pin!(uniti);

        while let Some(risultato) = stream.next().await {
            match risultato {
                Ok(messaggio) => println!("{messaggio}"),
                Err(ragione) => eprintln!("Problema: {ragione:?}"),
            }
        }
    })
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (indice, messaggio) in messaggi.into_iter().enumerate() {
            let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(tempo_dormita)).await;

            tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn ricevi_intervalli() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut conteggio = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            conteggio += 1;
            tx.send(conteggio).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listato 17-38: Allineare il type dello stream intervalli con il type dello stream messaggi

Per prima cosa, possiamo usare il metodo map per trasformare gli intervalli in una stringa. In secondo luogo, dobbiamo abbinare il Timeout da messaggi. Siccome non vogliamo effettivamente un timeout per intervalli, possiamo semplicemente creare un timeout che sia più lungo delle altre durate che stiamo usando. Qui, creiamo un timeout di 10 secondi con Duration::from_secs(10). Infine, dobbiamo rendere stream mutabile, in modo che le chiamate next del ciclo while let possano iterare attraverso lo stream, e fissarlo in modo che sia sicuro farlo. Questo ci porta quasi dove dobbiamo essere. Tutto è dello stesso type. Se lo esegui in questo momento, però, ci saranno due problemi. Primo, non si fermerà mai! Dovrai fermarlo con ctrl-C. Secondo, i messaggi dall’alfabeto inglese saranno sepolti in mezzo a tutti i messaggi del contatore degli intervalli:

--taglio--
Intervallo: 43
Intervallo: 44
Intervallo: 45
Messaggio: 'a'
Intervallo: 46
Intervallo: 47
Intervallo: 48
--taglio--

Il Listato 17-39 mostra un modo per risolvere questi ultimi due problemi.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200));
        let intervalli = ricevi_intervalli()
            .map(|conteggio| format!("Intervallo: {conteggio}"))
            .throttle(Duration::from_millis(100))
            .timeout(Duration::from_secs(10));
        let uniti = messaggi.merge(intervalli).take(20);
        let mut stream = pin!(uniti);

        while let Some(risultato) = stream.next().await {
            match risultato {
                Ok(messaggio) => println!("{messaggio}"),
                Err(ragione) => eprintln!("Problema: {ragione:?}"),
            }
        }
    })
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (indice, messaggio) in messaggi.into_iter().enumerate() {
            let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(tempo_dormita)).await;

            tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn ricevi_intervalli() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut conteggio = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            conteggio += 1;
            tx.send(conteggio).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listato 17-39: Utilizzo di throttle e take per gestire gli stream uniti

Per prima cosa, utilizziamo il metodo throttle sullo stream intervalli in modo che non sovraccarichi lo stream messaggi. Throttling è un modo per limitare la frequenza con cui una funzione verrà chiamata, o, in questo caso, quanto spesso lo stream verrà interrogato. Una volta ogni 100 millisecondi dovrebbe andare bene, perché è più o meno quanto spesso arrivano i nostri messaggi.

Per limitare il numero di elementi che accetteremo da uno stream, applichiamo il metodo take allo stream uniti, perché vogliamo limitare l’output finale, non solo uno stream o l’altro.

Ora, quando eseguiamo il programma, si ferma dopo aver estratto 20 elementi dallo stream, e gli intervalli non sovraccaricano i messaggi. Non otteniamo Interval: 100 o Interval: 200 e così via, ma invece otteniamo Interval: 1, Interval: 2, e così via, anche se abbiamo uno stream sorgente che può produrre un evento ogni millisecondo. Questo perché la chiamata a throttle produce un nuovo stream che incapsula lo stream originale, in modo che lo stream originale venga interrogato solo alla velocità di throttle, non alla sua “velocità nativa”. Non abbiamo un sacco di messaggi di intervallo non gestiti che scegliamo di ignorare. Invece, evitiamo di non produrre quei messaggi di intervallo! Questa è l’innata “pigrizia” dei future di Rust che entra in gioco, permettendoci di scegliere le nostre caratteristiche prestazionali.

$ cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.94s
     Running `target/debug/async_await`
Intervallo: 1
Messaggio: 'a'
Intervallo: 2
Intervallo: 3
Problema: Elapsed(())
Intervallo: 4
Messaggio: 'b'
Intervallo: 5
Messaggio: 'c'
Intervallo: 6
Intervallo: 7
Problema: Elapsed(())
Intervallo: 8
Messaggio: 'd'
Intervallo: 9
Messaggio: 'e'
Intervallo: 10
Intervallo: 11
Problema: Elapsed(())
Intervallo: 12

C’è un’ultima cosa che dobbiamo gestire: gli errori! Con entrambi questi stream basati su canali, le chiamate a send potrebbero fallire quando l’altra estremità del canale si chiude, e questo è solo una questione di come il runtime esegue le future che compongono lo stream. Fino ad ora, abbiamo ignorato questa possibilità chiamando unwrap, ma in un’applicazione ben progettata, dovremmo gestire esplicitamente l’errore, almeno terminando il ciclo in modo da non provare a inviare ulteriori messaggi. Il Listato 17-40 mostra una semplice strategia per gli errori: stampare il problema e poi uscire dai cicli con break.

extern crate trpl; // necessario per test mdbook

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200));
        let intervalli = ricevi_intervalli()
            .map(|conteggio| format!("Intervallo: {conteggio}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let uniti = messaggi.merge(intervalli).take(20);
        let mut stream = pin!(uniti);

        while let Some(risultato) = stream.next().await {
            match risultato {
                Ok(elemento) => println!("{elemento}"),
                Err(ragione) => eprintln!("Problema: {ragione:?}"),
            }
        }
    });
}

fn ricevi_messaggi() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (indice, messaggio) in messaggi.into_iter().enumerate() {
            let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(tempo_dormita)).await;

            if let Err(errore_invio) = tx.send(format!("Messaggio: '{messaggio}'")) {
                eprintln!("Impossibile inviare messaggio '{messaggio}': {errore_invio}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn ricevi_intervalli() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut conteggio = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            conteggio += 1;

            if let Err(errore_invio) = tx.send(conteggio) {
                eprintln!("Impossibile inviare intervallo {conteggio}: {errore_invio}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
Listato 17-40: Gestione degli errori e chiusura dei cicli

Come al solito, il modo corretto per gestire un errore di invio di un messaggio varierà; assicurati solo di avere una strategia per gestirlo.

Ora che abbiamo visto un sacco di async nella pratica, facciamo un passo indietro e approfondiamo alcuni dettagli su come Rust usa Future, Stream e gli altri trait chiave per far funzionare l’async.