Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Stream: Future in Sequenza

Ricorda come abbiamo utilizzato il ricevitore per il nostro canale async in precedenza nella sezione “Inviare Dati Tra Due Task Usando il Passaggio di Messaggi” di questo capitolo. Il metodo async recv produce una sequenza di elementi nel tempo. Questo è un esempio di un modello molto più generale noto come stream. Molti concetti sono naturalmente rappresentati come stream: elementi che diventano disponibili in una coda, blocchi di dati che vengono estratti in modo incrementale dal filesystem quando l’intero set di dati è troppo grande per la memoria del computer, o dati che arrivano attraverso la rete nel tempo. Poiché gli stream sono future, possiamo usarli con qualsiasi altro tipo di future e combinarli in modi interessanti. Ad esempio, possiamo raggruppare gli eventi per evitare di innescare troppe chiamate di rete, impostare timeout su sequenze di operazioni di lunga durata o limitare gli eventi dell’interfaccia utente per evitare di svolgere lavori inutili.

Abbiamo visto una sequenza di elementi nel Capitolo 13, quando abbiamo esaminato il trait Iterator nella sezione “Il Trait Iterator e il Metodo next, ma ci sono due differenze tra gli iteratori e il ricevitore del canale async. La prima differenza sono le tempistiche: gli iteratori sono sincroni, mentre il ricevitore del canale è asincrono. La seconda è l’API. Quando lavoriamo direttamente con Iterator, chiamiamo il suo metodo sincrono next. Con lo stream trpl::Receiver, in particolare, abbiamo invece chiamato un metodo asincrono recv. A parte questo, le API si somigliano molto, e questa somiglianza non è una coincidenza. Uno stream è come una forma asincrona di iterazione. Mentre il trpl::Receiver aspetta specificamente di ricevere messaggi, però, l’API dello stream di uso generale è molto più ampia: fornisce il prossimo elemento come fa Iterator, ma in modo asincrono.

La somiglianza tra iteratori e stream in Rust significa che possiamo effettivamente creare uno stream da qualsiasi iteratore. Come con un iteratore, possiamo lavorare con uno stream chiamando il suo metodo next e poi aspettare l’output, come nel Listato 17-21, che per ora non si compilerà.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

fn main() {
    trpl::block_on(async {
        let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = valori.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(valore) = stream.next().await {
            println!("Il valore era: {valore}");
        }
    });
}
Listato 17-21: Creazione di uno stream da un iteratore e stampa dei suoi valori

Iniziamo con un array di numeri, che convertiamo in un iteratore e poi chiamiamo map su di esso per raddoppiare tutti i valori. Poi convertiamo l’iteratore in uno stream usando la funzione trpl::stream_from_iter. Successivamente, iteriamo sugli elementi nello stream man mano che arrivano con il ciclo while let.

Sfortunatamente, quando proviamo a eseguire il codice, non si compila ma invece riporta che non c’è alcun metodo next disponibile:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter<I>` in the current scope
  --> src/main.rs:10:41
   |
10 |         while let Some(valore) = stream.next().await {
   |                                         ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
 1 + use crate::trpl::StreamExt;
   |
 1 + use futures_util::stream::stream::StreamExt;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(valore) = stream.try_next().await {
   |                                         ++++

Come spiega questo output, la ragione dell’errore del compilatore è che abbiamo bisogno del trait giusto in scope per poter utilizzare il metodo next. Dato il nostro discorso finora, potresti ragionevolmente aspettarti che quel trait sia Stream, ma in realtà è StreamExt. Abbreviazione di estensione, Ext è un modello comune nella comunità Rust per estendere un trait con un altro.

Il trait Stream definisce un’interfaccia a basso livello che combina efficacemente i trait Iterator e Future. StreamExt fornisce un insieme di API di livello superiore costruite sulla base di Stream, inclusi il metodo next e altri metodi utili simili a quelli forniti dal trait Iterator. Stream e StreamExt non fanno ancora parte della libreria standard di Rust, ma la maggior parte dei crate dell’ecosistema utilizza definizioni simili.

La soluzione all’errore del compilatore è aggiungere una dichiarazione use per trpl::StreamExt, come nel Listato 17-22.

File: src/main.rs
extern crate trpl; // necessario per test mdbook

use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        // --taglio--
        let iter = valori.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(valore) = stream.next().await {
            println!("Il valore era: {valore}");
        }
    });
}
Listato 17-22: Utilizzo con successo di un iteratore come base per uno stream

Con tutti questi pezzi messi insieme, questo codice funziona come vogliamo! Inoltre, ora che abbiamo StreamExt in scope, possiamo utilizzare tutti i suoi metodi utili, proprio come con gli iteratori.