Stream: Future in Sequenza
Fino ad ora in questo capitolo, ci siamo principalmente concentrati su future
singole. L’unica grande eccezione è stata il canale async che abbiamo usato.
Ricorda come abbiamo utilizzato il ricevitore per il nostro canale async in
precedenza in questo capitolo nella sezione “Conteggiare su Due Task Usando
il Passaggio di Messaggi”. Il metodo async
recv
produce una sequenza di elementi nel tempo. Questo è un esempio di un
modello molto più generale noto come stream.
Abbiamo visto una sequenza di elementi nel Capitolo 13, quando abbiamo esaminato
il trait Iterator
nella sezione “Il Trait Iterator
e il Metodo
next
”, ma ci sono due differenze tra gli
iteratori e il ricevitore del canale async. La prima differenza sono le
tempistiche: gli iteratori sono sincroni, mentre il ricevitore del canale è
asincrono. La seconda è l’API. Quando lavoriamo direttamente con Iterator
,
chiamiamo il suo metodo sincrono next
. Con lo stream trpl::Receiver
, in
particolare, abbiamo invece chiamato un metodo asincrono recv
. A parte questo,
le API si somigliano molto, e questa somiglianza non è una coincidenza. Uno
stream è come una forma asincrona di iterazione. Mentre il trpl::Receiver
aspetta specificamente di ricevere messaggi, però, l’API dello stream di uso
generale è molto più ampia: fornisce il prossimo elemento come fa Iterator
, ma
in modo asincrono.
La somiglianza tra iteratori e stream in Rust significa che possiamo
effettivamente creare uno stream da qualsiasi iteratore. Come con un
iteratore, possiamo lavorare con uno stream chiamando il suo metodo next
e
poi aspettare l’output, come nel Listato 17-30.
extern crate trpl; // necessario per test mdbook
fn main() {
trpl::run(async {
let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = valori.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(valore) = stream.next().await {
println!("Il valore era: {valore}");
}
});
}
Iniziamo con un array di numeri, che convertiamo in un iteratore e poi chiamiamo
map
su di esso per raddoppiare tutti i valori. Poi convertiamo l’iteratore in
uno stream usando la funzione trpl::stream_from_iter
. Successivamente,
iteriamo sugli elementi nello stream man mano che arrivano con il ciclo while let
.
Sfortunatamente, quando proviamo a eseguire il codice, non si compila, ma invece
riporta che non c’è alcun metodo next
disponibile:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter<I>` in the current scope
--> src/main.rs:10:41
|
10 | while let Some(valore) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(valore) = stream.try_next().await {
| ++++
Come spiega questo output, la ragione dell’errore del compilatore è che abbiamo
bisogno del trait giusto in scope per poter utilizzare il metodo next
.
Dato il nostro discorso finora, potresti ragionevolmente aspettarti che quel
trait sia Stream
, ma in realtà è StreamExt
. Abbreviazione di estensione,
Ext
è un modello comune nella comunità Rust per estendere un trait con un
altro.
Spiegheremo i trait Stream
e StreamExt
in modo un po’ più dettagliato alla
fine del capitolo, ma per ora tutto ciò che devi sapere è che il trait
Stream
definisce un’interfaccia a basso livello che combina efficacemente i
trait Iterator
e Future
. StreamExt
fornisce un insieme di API di livello
superiore costruite sulla base di Stream
, inclusi il metodo next
e altri
metodi utili simili a quelli forniti dal trait Iterator
. Stream
e
StreamExt
non fanno ancora parte della libreria standard di Rust, ma la
maggior parte dei crate dell’ecosistema utilizza la stessa definizione.
La soluzione all’errore del compilatore è aggiungere una dichiarazione use
per
trpl::StreamExt
, come nel Listato 17-31.
extern crate trpl; // necessario per test mdbook use trpl::StreamExt; fn main() { trpl::run(async { let valori = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = valori.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(valore) = stream.next().await { println!("Il valore era: {valore}"); } }); }
Con tutti questi pezzi messi insieme, questo codice funziona come vogliamo!
Inoltre, ora che abbiamo StreamExt
in scope, possiamo utilizzare tutti i
suoi metodi utili, proprio come con gli iteratori. Ad esempio, nel Listato
17-32, utilizziamo il metodo filter
per filtrare tutto tranne i multipli di
tre e cinque.
extern crate trpl; // necessario per test mdbook use trpl::StreamExt; fn main() { trpl::run(async { let valori = 1..101; let iter = valori.map(|n| n * 2); let stream = trpl::stream_from_iter(iter); let mut filtrato = stream.filter(|val| val % 3 == 0 || val % 5 == 0); while let Some(valore) = filtrato.next().await { println!("Il valore era: {valore}"); } }); }
StreamExt::filter
Certo, questo non è molto interessante, dato che potremmo fare lo stesso con normali iteratori e senza alcun async. Vediamo cosa possiamo fare che è unico per gli stream.
Combinare Stream
Molti concetti sono naturalmente rappresentati come stream: elementi che diventano disponibili in una coda, porzioni di dati che vengono estratti incrementalmente dal filesystem quando l’intero set di dati è troppo grande per la memoria del computer, o dati che arrivano attraverso la rete nel tempo. Poiché gli stream sono future, possiamo usarli con qualsiasi altro tipo di future e combinarli in modi interessanti. Ad esempio, possiamo raggruppare eventi per evitare di attivare troppe chiamate di rete, impostare timeout su sequenze di operazioni a lungo termine, o limitare gli eventi dell’interfaccia utente per evitare di fare lavoro inutile.
Iniziamo costruendo un piccolo stream di messaggi come sostituto di uno stream di dati che potremmo vedere da un WebSocket o un altro protocollo di comunicazione in tempo reale, come mostrato nel Listato 17-33.
extern crate trpl; // necessario per test mdbook use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messaggi = ricevi_messaggi(); while let Some(messaggio) = messaggi.next().await { println!("{messaggio}"); } }); } fn ricevi_messaggi() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for messaggio in messaggi { tx.send(format!("Messaggio: '{messaggio}'")).unwrap(); } ReceiverStream::new(rx) }
rx
come ReceiverStream
Per prima cosa, creiamo una funzione chiamata ricevi_messaggi
che restituisce
impl Stream<Item = String>
. Per la sua implementazione, creiamo un canale
async, iteriamo sulle prime 10 lettere dell’alfabeto inglese e le inviamo
attraverso il canale.
Utilizziamo anche un nuovo type: ReceiverStream
, che converte il ricevitore
rx
da trpl::channel
in uno Stream
con un metodo next
. Tornando a main
,
utilizziamo un ciclo while let
per stampare tutti i messaggi dallo stream.
Quando eseguiamo questo codice, otteniamo esattamente i risultati che ci aspetteremmo:
$ cargo run
Messaggio: 'a'
Messaggio: 'b'
Messaggio: 'c'
Messaggio: 'd'
Messaggio: 'e'
Messaggio: 'f'
Messaggio: 'g'
Messaggio: 'h'
Messaggio: 'i'
Messaggio: 'j'
Ancora una volta, potremmo fare questo con l’API Receiver
regolare o anche con
l’API Iterator
regolare, quindi aggiungiamo una funzionalità che richiede
stream: aggiungere un timeout che si applica a ogni elemento nello stream
e un ritardo sugli elementi che emettiamo, come mostrato nel Listato 17-34.
extern crate trpl; // necessario per test mdbook use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messaggi = pin!(ricevi_messaggi().timeout(Duration::from_millis(200))); while let Some(risultato) = messaggi.next().await { match risultato { Ok(messaggio) => println!("{messaggio}"), Err(ragione) => eprintln!("Problema: {ragione:?}"), } } }) } fn ricevi_messaggi() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for messaggio in messaggi { tx.send(format!("Messaggio: '{messaggio}'")).unwrap(); } ReceiverStream::new(rx) }
StreamExt::timeout
per impostare un limite di tempo sugli elementi in uno streamIniziamo aggiungendo un timeout allo stream con il metodo timeout
, che
proviene dal trait StreamExt
. Poi aggiorniamo il corpo del ciclo while let
, perché ora lo stream restituisce un Result
. La variante Ok
indica
che un messaggio è arrivato in tempo; la variante Err
indica che il timeout
è scaduto prima che arrivasse un messaggio. Facciamo il match
su quel
risultato e stampiamo il messaggio quando lo riceviamo con successo o stampiamo
una notifica riguardo al timeout. Infine, nota che fissiamo i messaggi con
pin!
dopo aver applicato il timeout, perché timeout produce uno stream
che deve essere fissato per essere letto.
Tuttavia, poiché non ci sono ritardi tra i messaggi, questo timeout non cambia il comportamento del programma. Ora aggiungiamo un ritardo variabile ai messaggi che inviamo, come mostrato nel Listato 17-35.
extern crate trpl; // necessario per test mdbook use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messaggi = pin!(ricevi_messaggi().timeout(Duration::from_millis(200))); while let Some(risultato) = messaggi.next().await { match risultato { Ok(messaggio) => println!("{messaggio}"), Err(ragione) => eprintln!("Problema: {ragione:?}"), } } }) } fn ricevi_messaggi() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (indice, messaggio) in messaggi.into_iter().enumerate() { let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(tempo_dormita)).await; tx.send(format!("Messaggio: '{messaggio}'")).unwrap(); } }); ReceiverStream::new(rx) }
tx
con un ritardo async senza rendere ricevi_messaggi
una funzione asyncIn ricevi_messaggi
, utilizziamo il metodo enumerate
dell’iteratore con
l’array messaggi
in modo da poter ottenere l’indice di ogni elemento che
stiamo inviando insieme all’elemento stesso. Poi applichiamo un ritardo di 100
millisecondi agli elementi con indice pari e un ritardo di 300 millisecondi agli
elementi con indice dispari per simulare i diversi ritardi che potremmo vedere
da uno stream di messaggi nel mondo reale. Poiché il nostro timeout è di 200
millisecondi, questo dovrebbe influenzare metà dei messaggi.
Per “dormire” tra i messaggi nella funzione ricevi_messaggi
senza bloccare,
dobbiamo usare async. Tuttavia, non possiamo rendere ricevi_messaggi
stessa
una funzione async, perché altrimenti restituiremmo un Future<Output = Stream<Item = String>>
invece di uno Stream<Item = String>
. Il chiamante
dovrebbe attendere ricevi_messaggi
stessa per accedere allo stream. Ma
ricorda: tutto in una data future avviene linearmente; la concorrenza avviene
tra le future. Attendere ricevi_messaggi
richiederebbe che inviasse tutti
i messaggi, incluso il ritardo tra ogni messaggio, prima di restituire il
ricevitore dello stream. Di conseguenza, il timeout sarebbe inutile. Non ci
sarebbero ritardi nello stream stesso; si verificherebbero tutti ancor prima
che lo stream fosse disponibile.
Invece, lasciamo ricevi_messaggi
come una funzione regolare che restituisce
uno stream e invece creiamo un task per gestire le chiamate async a
sleep
.
Nota: Chiamare
spawn_task
in questo modo funziona perché abbiamo già impostato il nostro runtime; se non lo avessimo fatto, causerebbe un panic. Altre implementazioni scelgono compromessi diversi: potrebbero avviare un nuovo runtime e evitare il panic, ma avrebbero un po’ di overhead extra, oppure potrebbero semplicemente non fornire un modo autonomo per avviare un task senza un riferimento a un runtime. Assicurati di sapere quale compromesso ha scelto il tuo runtime e scrivi il tuo codice di conseguenza!
Ora il nostro codice ha un risultato molto più interessante. Tra ogni coppia di
messaggi, appare un errore Problema: Elapsed(())
.
$cargo run
Messaggio: 'a'
Problema: Elapsed(())
Messaggio: 'b'
Messaggio: 'c'
Problema: Elapsed(())
Messaggio: 'd'
Messaggio: 'e'
Problema: Elapsed(())
Messaggio: 'f'
Messaggio: 'g'
Problema: Elapsed(())
Messaggio: 'h'
Messaggio: 'i'
Problema: Elapsed(())
Messaggio: 'j'
Il timeout non impedisce ai messaggi di arrivare alla fine. Riceviamo ancora tutti i messaggi originali, perché il nostro canale è illimitato: può contenere quanti più messaggi possiamo inserire in memoria. Se il messaggio non arriva prima del timeout, il nostro gestore di stream ne terrà conto, ma quando interroga di nuovo lo stream, il messaggio potrebbe ora essere arrivato.
Puoi ottenere un comportamento diverso se necessario utilizzando altri tipi di canali o altri tipi di stream in modo più generale. Vediamo uno di questi in pratica combinando uno stream di intervalli di tempo con questo stream di messaggi.
Unire Stream
Per prima cosa, creiamo un altro stream, che invierà un elemento ogni
millisecondo se lo lasciamo girare direttamente. Per semplicità, possiamo usare
la funzione sleep
per inviare un messaggio con un ritardo e combinarlo con lo
stesso approccio che abbiamo usato in ricevi_messaggi
per creare uno stream
da un canale. La differenza è che questa volta, stiamo per restituire il
conteggio degli intervalli che sono trascorsi, quindi il type di ritorno sarà
impl Stream<Item = u32>
, e possiamo chiamare la funzione ricevi_intervalli
(vedi Listato 17-36).
extern crate trpl; // necessario per test mdbook use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messaggi = pin!(ricevi_messaggi().timeout(Duration::from_millis(200))); while let Some(risultato) = messaggi.next().await { match risultato { Ok(messaggio) => println!("{messaggio}"), Err(ragione) => eprintln!("Problema: {ragione:?}"), } } }) } fn ricevi_messaggi() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (indice, messaggio) in messaggi.into_iter().enumerate() { let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(tempo_dormita)).await; tx.send(format!("Messaggio: '{messaggio}'")).unwrap(); } }); ReceiverStream::new(rx) } fn ricevi_intervalli() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut conteggio = 0; loop { trpl::sleep(Duration::from_millis(1)).await; conteggio += 1; tx.send(conteggio).unwrap(); } }); ReceiverStream::new(rx) }
Iniziamo definendo un conteggio
nel task. (Potremmo definirlo anche al di
fuori del task, ma il codice risulterà più chiaro se limitiamo ogni variabile
allo scope che la riguarda.) Poi creiamo un ciclo infinito. Ogni iterazione
del ciclo “dorme” asincronamente per un millisecondo, incrementa il conteggio e
poi lo invia attraverso il canale. Poiché tutto questo è incapsulato
nell’attività creata da spawn_task
, tutto, incluso il ciclo infinito, verrà
de-allocato insieme al runtime.
Questo tipo di ciclo infinito, che termina solo quando l’intero runtime viene eliminato, è abbastanza comune in Rust async: molti programmi devono continuare a girare indefinitamente. Con async, questo non blocca nulla, purché ci sia almeno un punto di attesa in ogni iterazione del ciclo.
Ora, tornando al blocco async della nostra funzione principale, possiamo
tentare di unire gli stream messaggi
e intervalli
, come mostrato nel
Listato 17-37.
extern crate trpl; // necessario per test mdbook
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200));
let intervalli = ricevi_intervalli();
let uniti = messaggi.merge(intervalli);
while let Some(risultato) = uniti.next().await {
match risultato {
Ok(messaggio) => println!("{messaggio}"),
Err(ragione) => eprintln!("Problema: {ragione:?}"),
}
}
})
}
fn ricevi_messaggi() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (indice, messaggio) in messaggi.into_iter().enumerate() {
let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(tempo_dormita)).await;
tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn ricevi_intervalli() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut conteggio = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
conteggio += 1;
tx.send(conteggio).unwrap();
}
});
ReceiverStream::new(rx)
}
messaggi
e intervalli
Iniziamo chiamando ricevi_intervalli
. Poi uniamo gli stream messaggi
e
intervalli
con il metodo merge
, che combina più stream in uno stream
unico che produce elementi da qualsiasi degli stream sorgente non appena gli
elementi sono disponibili, senza imporre alcun ordinamento particolare. Infine,
iteriamo su quello stream unico invece che su messaggi
.
A questo punto, né messaggi
né intervalli
devono essere fissati o mutabili,
perché entrambi finiranno nello stream unico uniti
. Tuttavia, questa
chiamata a merge
non si compila! (Neanche la chiamata a next
nel ciclo
while let
, ma ci torneremo.) Questo perché i due stream hanno type
diversi. Lo stream messaggi
ha il type Timeout<impl Stream<Item = String>>
, dove Timeout
è il type che implementa Stream
per una chiamata
di timeout
. Lo stream intervalli
ha il type impl Stream<Item = u32>
.
Per unire questi due stream, dobbiamo trasformare uno di essi per farlo
corrispondere all’altro. Rielaboreremo lo stream degli intervalli, perché
messaggi
è già nel formato di base che vogliamo e deve gestire gli errori di
timeout (vedi Listato 17-38).
extern crate trpl; // necessario per test mdbook
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200));
let intervalli = ricevi_intervalli()
.map(|conteggio| format!("Intervallo: {conteggio}"))
.timeout(Duration::from_secs(10));
let uniti = messaggi.merge(intervalli);
let mut stream = pin!(uniti);
while let Some(risultato) = stream.next().await {
match risultato {
Ok(messaggio) => println!("{messaggio}"),
Err(ragione) => eprintln!("Problema: {ragione:?}"),
}
}
})
}
fn ricevi_messaggi() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (indice, messaggio) in messaggi.into_iter().enumerate() {
let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(tempo_dormita)).await;
tx.send(format!("Messaggio: '{messaggio}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn ricevi_intervalli() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut conteggio = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
conteggio += 1;
tx.send(conteggio).unwrap();
}
});
ReceiverStream::new(rx)
}
intervalli
con il type dello stream messaggi
Per prima cosa, possiamo usare il metodo map
per trasformare gli intervalli
in una stringa. In secondo luogo, dobbiamo abbinare il Timeout
da messaggi
.
Siccome non vogliamo effettivamente un timeout per intervalli
, possiamo
semplicemente creare un timeout che sia più lungo delle altre durate che
stiamo usando. Qui, creiamo un timeout di 10 secondi con
Duration::from_secs(10)
. Infine, dobbiamo rendere stream
mutabile, in modo
che le chiamate next
del ciclo while let
possano iterare attraverso lo
stream, e fissarlo in modo che sia sicuro farlo. Questo ci porta quasi dove
dobbiamo essere. Tutto è dello stesso type. Se lo esegui in questo momento,
però, ci saranno due problemi. Primo, non si fermerà mai! Dovrai fermarlo con
ctrl-C. Secondo, i messaggi dall’alfabeto inglese saranno
sepolti in mezzo a tutti i messaggi del contatore degli intervalli:
--taglio--
Intervallo: 43
Intervallo: 44
Intervallo: 45
Messaggio: 'a'
Intervallo: 46
Intervallo: 47
Intervallo: 48
--taglio--
Il Listato 17-39 mostra un modo per risolvere questi ultimi due problemi.
extern crate trpl; // necessario per test mdbook use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200)); let intervalli = ricevi_intervalli() .map(|conteggio| format!("Intervallo: {conteggio}")) .throttle(Duration::from_millis(100)) .timeout(Duration::from_secs(10)); let uniti = messaggi.merge(intervalli).take(20); let mut stream = pin!(uniti); while let Some(risultato) = stream.next().await { match risultato { Ok(messaggio) => println!("{messaggio}"), Err(ragione) => eprintln!("Problema: {ragione:?}"), } } }) } fn ricevi_messaggi() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (indice, messaggio) in messaggi.into_iter().enumerate() { let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(tempo_dormita)).await; tx.send(format!("Messaggio: '{messaggio}'")).unwrap(); } }); ReceiverStream::new(rx) } fn ricevi_intervalli() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut conteggio = 0; loop { trpl::sleep(Duration::from_millis(1)).await; conteggio += 1; tx.send(conteggio).unwrap(); } }); ReceiverStream::new(rx) }
throttle
e take
per gestire gli stream unitiPer prima cosa, utilizziamo il metodo throttle
sullo stream intervalli
in
modo che non sovraccarichi lo stream messaggi
. Throttling è un modo per
limitare la frequenza con cui una funzione verrà chiamata, o, in questo caso,
quanto spesso lo stream verrà interrogato. Una volta ogni 100 millisecondi
dovrebbe andare bene, perché è più o meno quanto spesso arrivano i nostri
messaggi.
Per limitare il numero di elementi che accetteremo da uno stream, applichiamo
il metodo take
allo stream uniti
, perché vogliamo limitare l’output
finale, non solo uno stream o l’altro.
Ora, quando eseguiamo il programma, si ferma dopo aver estratto 20 elementi
dallo stream, e gli intervalli non sovraccaricano i messaggi. Non otteniamo
Interval: 100
o Interval: 200
e così via, ma invece otteniamo Interval: 1
,
Interval: 2
, e così via, anche se abbiamo uno stream sorgente che può
produrre un evento ogni millisecondo. Questo perché la chiamata a throttle
produce un nuovo stream che incapsula lo stream originale, in modo che lo
stream originale venga interrogato solo alla velocità di throttle, non alla
sua “velocità nativa”. Non abbiamo un sacco di messaggi di intervallo non
gestiti che scegliamo di ignorare. Invece, evitiamo di non produrre quei
messaggi di intervallo! Questa è l’innata “pigrizia” dei future di Rust che
entra in gioco, permettendoci di scegliere le nostre caratteristiche
prestazionali.
$ cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.94s
Running `target/debug/async_await`
Intervallo: 1
Messaggio: 'a'
Intervallo: 2
Intervallo: 3
Problema: Elapsed(())
Intervallo: 4
Messaggio: 'b'
Intervallo: 5
Messaggio: 'c'
Intervallo: 6
Intervallo: 7
Problema: Elapsed(())
Intervallo: 8
Messaggio: 'd'
Intervallo: 9
Messaggio: 'e'
Intervallo: 10
Intervallo: 11
Problema: Elapsed(())
Intervallo: 12
C’è un’ultima cosa che dobbiamo gestire: gli errori! Con entrambi questi
stream basati su canali, le chiamate a send
potrebbero fallire quando
l’altra estremità del canale si chiude, e questo è solo una questione di come il
runtime esegue le future che compongono lo stream. Fino ad ora, abbiamo
ignorato questa possibilità chiamando unwrap
, ma in un’applicazione ben
progettata, dovremmo gestire esplicitamente l’errore, almeno terminando il ciclo
in modo da non provare a inviare ulteriori messaggi. Il Listato 17-40 mostra una
semplice strategia per gli errori: stampare il problema e poi uscire dai cicli
con break
.
extern crate trpl; // necessario per test mdbook use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messaggi = ricevi_messaggi().timeout(Duration::from_millis(200)); let intervalli = ricevi_intervalli() .map(|conteggio| format!("Intervallo: {conteggio}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let uniti = messaggi.merge(intervalli).take(20); let mut stream = pin!(uniti); while let Some(risultato) = stream.next().await { match risultato { Ok(elemento) => println!("{elemento}"), Err(ragione) => eprintln!("Problema: {ragione:?}"), } } }); } fn ricevi_messaggi() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messaggi = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (indice, messaggio) in messaggi.into_iter().enumerate() { let tempo_dormita = if indice % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(tempo_dormita)).await; if let Err(errore_invio) = tx.send(format!("Messaggio: '{messaggio}'")) { eprintln!("Impossibile inviare messaggio '{messaggio}': {errore_invio}"); break; } } }); ReceiverStream::new(rx) } fn ricevi_intervalli() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut conteggio = 0; loop { trpl::sleep(Duration::from_millis(1)).await; conteggio += 1; if let Err(errore_invio) = tx.send(conteggio) { eprintln!("Impossibile inviare intervallo {conteggio}: {errore_invio}"); break; }; } }); ReceiverStream::new(rx) }
Come al solito, il modo corretto per gestire un errore di invio di un messaggio varierà; assicurati solo di avere una strategia per gestirlo.
Ora che abbiamo visto un sacco di async nella pratica, facciamo un passo
indietro e approfondiamo alcuni dettagli su come Rust usa Future
, Stream
e
gli altri trait chiave per far funzionare l’async.