Stream: Future in Sequenza
Ricorda come abbiamo utilizzato il ricevitore per il nostro canale async in
precedenza nella sezione “Inviare Dati Tra Due Task Usando il Passaggio di
Messaggi” di questo capitolo. Il metodo async
recv produce una sequenza di elementi nel tempo. Questo è un esempio di un
modello molto più generale noto come stream. Molti concetti sono naturalmente
rappresentati come stream: elementi che diventano disponibili in una coda,
blocchi di dati che vengono estratti in modo incrementale dal filesystem quando
l’intero set di dati è troppo grande per la memoria del computer, o dati che
arrivano attraverso la rete nel tempo. Poiché gli stream sono future,
possiamo usarli con qualsiasi altro tipo di future e combinarli in modi
interessanti. Ad esempio, possiamo raggruppare gli eventi per evitare di
innescare troppe chiamate di rete, impostare timeout su sequenze di operazioni
di lunga durata o limitare gli eventi dell’interfaccia utente per evitare di
svolgere lavori inutili.
Abbiamo visto una sequenza di elementi nel Capitolo 13, quando abbiamo esaminato
il trait Iterator nella sezione “Il Trait Iterator e il Metodo
next”, ma ci sono due differenze tra gli
iteratori e il ricevitore del canale async. La prima differenza sono le
tempistiche: gli iteratori sono sincroni, mentre il ricevitore del canale è
asincrono. La seconda è l’API. Quando lavoriamo direttamente con Iterator,
chiamiamo il suo metodo sincrono next. Con lo stream trpl::Receiver, in
particolare, abbiamo invece chiamato un metodo asincrono recv. A parte questo,
le API si somigliano molto, e questa somiglianza non è una coincidenza. Uno
stream è come una forma asincrona di iterazione. Mentre il trpl::Receiver
aspetta specificamente di ricevere messaggi, però, l’API dello stream di uso
generale è molto più ampia: fornisce il prossimo elemento come fa Iterator, ma
in modo asincrono.
La somiglianza tra iteratori e stream in Rust significa che possiamo
effettivamente creare uno stream da qualsiasi iteratore. Come con un
iteratore, possiamo lavorare con uno stream chiamando il suo metodo next e
poi aspettare l’output, come nel Listato 17-21, che per ora non si compilerà.
extern crate trpl; // necessario per test mdbook
fn main() {
trpl::block_on(async {
let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = valori.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(valore) = stream.next().await {
println!("Il valore era: {valore}");
}
});
}
Iniziamo con un array di numeri, che convertiamo in un iteratore e poi chiamiamo
map su di esso per raddoppiare tutti i valori. Poi convertiamo l’iteratore in
uno stream usando la funzione trpl::stream_from_iter. Successivamente,
iteriamo sugli elementi nello stream man mano che arrivano con il ciclo while let.
Sfortunatamente, quando proviamo a eseguire il codice, non si compila ma invece
riporta che non c’è alcun metodo next disponibile:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter<I>` in the current scope
--> src/main.rs:10:41
|
10 | while let Some(valore) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(valore) = stream.try_next().await {
| ++++
Come spiega questo output, la ragione dell’errore del compilatore è che abbiamo
bisogno del trait giusto in scope per poter utilizzare il metodo next.
Dato il nostro discorso finora, potresti ragionevolmente aspettarti che quel
trait sia Stream, ma in realtà è StreamExt. Abbreviazione di estensione,
Ext è un modello comune nella comunità Rust per estendere un trait con un
altro.
Il trait Stream definisce un’interfaccia a basso livello che combina
efficacemente i trait Iterator e Future. StreamExt fornisce un insieme
di API di livello superiore costruite sulla base di Stream, inclusi il metodo
next e altri metodi utili simili a quelli forniti dal trait Iterator.
Stream e StreamExt non fanno ancora parte della libreria standard di Rust,
ma la maggior parte dei crate dell’ecosistema utilizza definizioni simili.
La soluzione all’errore del compilatore è aggiungere una dichiarazione use per
trpl::StreamExt, come nel Listato 17-22.
extern crate trpl; // necessario per test mdbook use trpl::StreamExt; fn main() { trpl::block_on(async { let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; // --taglio-- let iter = valori.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(valore) = stream.next().await { println!("Il valore era: {valore}"); } }); }
Con tutti questi pezzi messi insieme, questo codice funziona come vogliamo!
Inoltre, ora che abbiamo StreamExt in scope, possiamo utilizzare tutti i
suoi metodi utili, proprio come con gli iteratori.