Lavorare con un Numero Qualsiasi di Future
Quando siamo passati dall’usare due future a tre nella sezione precedente,
abbiamo dovuto passare da join
a join3
. Sarebbe fastidioso dover chiamare
una funzione diversa ogni volta che cambiamo il numero di future che vogliamo
unire. Per fortuna, abbiamo una forma macro di join
a cui possiamo passare un
numero arbitrario di argomenti. Gestisce anche l’attesa delle future stessa.
Quindi, potremmo riscrivere il codice del Listato 17-13 per usare join!
invece
di join3
, come nel Listato 17-14.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let valori = vec![ String::from("ciao"), String::from("dalla"), String::from("future"), String::from("!!!"), ]; for val in valori { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(valore) = rx.recv().await { println!("ricevuto '{valore}'"); } }; let tx_fut = async move { let valori = vec![ String::from("altri"), String::from("messaggi"), String::from("per"), String::from("te"), ]; for val in valori { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
join!
per aspettare più futureQuesto è sicuramente un miglioramento rispetto allo scambio tra join
e join3
e join4
e così via! Tuttavia, anche questa macro funziona solo quando
conosciamo il numero di future in anticipo. Nel mondo reale di Rust, tuttavia,
mettere le future in una collezione e poi aspettare che alcune o tutte le
future si completino è il modello più comune.
Per controllare tutte le future in una qualche collezione, dovremo iterare e
unire su tutte loro. La funzione trpl::join_all
accetta qualsiasi type che
implementa il trait Iterator
, che hai imparato nel Capitolo 13 in “Il
Trait Iterator
e il Metodo next
”, quindi
sembra proprio la cosa giusta. Proviamo a mettere le nostre future in un
vettore e sostituire join!
con join_all
come mostrato nel Listato 17-15.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let valori = vec![
String::from("ciao"),
String::from("dalla"),
String::from("future"),
String::from("!!!"),
];
for val in valori {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(valore) = rx.recv().await {
println!("ricevuto '{valore}'");
}
};
let tx_fut = async move {
let valori = vec![
String::from("altri"),
String::from("messaggi"),
String::from("per"),
String::from("te"),
];
for val in valori {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let future = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(future).await;
});
}
join_all
Purtroppo, questo codice non si compila. Invece, otteniamo questo errore:
error[E0308]: mismatched types
--> src/main.rs:45:36
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let future = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
Some errors have detailed explanations: E0308, E0425.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `async_await` (bin "async_await") due to 2 previous errors
Questo potrebbe essere sorprendente. Dopotutto, nessuno dei blocchi async
restituisce nulla, quindi ciascuno produce un Future<Output = ()>
. Ricorda che
Future
è un trait, e che il compilatore crea una enum univoca per ogni
blocco async. Non puoi mettere due struct scritte a mano diverse in un
Vec
, e la stessa regola si applica alle enum diverse generate dal
compilatore.
Per farlo funzionare, dobbiamo usare gli oggetti trait, proprio come abbiamo
fatto in “Restituire Errori dalla Funzione esegui
” nel
Capitolo 12. (Parleremo degli oggetti trait in dettaglio nel Capitolo 18.)
Usare oggetti trait ci permette di trattare ciascuna delle future anonime
prodotte da questi type come fossero il medesimo type, perché tutti
implementano il trait Future
.
Nota: In “Utilizzare un’Enum per Memorizzare Più Type” nel Capitolo 8, abbiamo discusso un altro modo per includere più type in un
Vec
: usando una enum per rappresentare ciascun type che può apparire nel vettore. Non possiamo farlo qui, però. Per prima cosa, non abbiamo modo di nominare i diversi type, perché sono anonimi. Inoltre, il motivo per cui abbiamo aggiunto un vettore ejoin_all
in primo luogo era per poter lavorare con una collezione dinamica di future dove ci importa solo che abbiano lo stesso tipo di output.
Iniziamo incapsulando ciascuna future nel vec!
in una Box::new
, come
mostrato nel Listato 17-16.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let valori = vec![
String::from("ciao"),
String::from("dalla"),
String::from("future"),
String::from("!!!"),
];
for val in valori {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(valore) = rx.recv().await {
println!("ricevuto '{valore}'");
}
};
let tx_fut = async move {
let valori = vec![
String::from("altri"),
String::from("messaggi"),
String::from("per"),
String::from("te"),
];
for val in valori {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let future =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(future).await;
});
}
Box::new
per allineare i type delle future in un Vec
Purtroppo, questo codice non si compila ancora. In realtà, otteniamo lo stesso
errore di base che abbiamo ottenuto prima sia per la seconda che la terza
chiamata a Box::new
, oltre a nuovi errori che fanno riferimento al trait
Unpin
. Torneremo sugli errori Unpin
tra un momento. Prima, correggiamo gli
errori di type sulle chiamate Box::new
annotando esplicitamente il type
della variabile future
come nel Listato 17-17.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let valori = vec![
String::from("ciao"),
String::from("dalla"),
String::from("future"),
String::from("!!!"),
];
for val in valori {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(valore) = rx.recv().await {
println!("ricevuto '{valore}'");
}
};
let tx_fut = async move {
let valori = vec![
String::from("altri"),
String::from("messaggi"),
String::from("per"),
String::from("te"),
];
for val in valori {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let future: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(future).await;
});
}
Questa dichiarazione di type è un po’ complicata, quindi descriviamola pezzo per pezzo:
- Il type più interno è la future stessa. Annotiamo esplicitamente che
l’output della future è il type unitario
()
scrivendoFuture<Output = ()>
. - Quindi annotiamo il trait con
dyn
per marcarlo come dinamico. - L’intero reference al trait è incapsulato in una
Box
. - Infine, dichiariamo esplicitamente che
future
è unVec
che contiene questi elementi.
Questo ha già fatto una grande differenza. Ora, quando eseguiamo la
compilazione, otteniamo solo gli errori che menzionano Unpin
. Anche se ce ne
sono tre, i loro contenuti sono molto simili.
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:24
|
49 | trpl::join_all(future).await;
| -------------- ^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `join_all`
--> /home/utente/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:9
|
49 | trpl::join_all(future).await;
| ^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> /home/utente/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:32
|
49 | trpl::join_all(future).await;
| ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> /home/utente/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
Questo è un sacco da digerire, quindi facciamolo a pezzi. La prima parte del
messaggio ci dice che il primo blocco async (src/main.rs:8:23: 20:10
) non
implementa il trait Unpin
e suggerisce di usare pin!
o Box::pin
per
risolverlo. Più avanti nel capitolo, approfondiremo alcuni dettagli su Pin
e
Unpin
. Per il momento, però, possiamo semplicemente seguire il consiglio del
compilatore per sbloccarci. Nel Listato 17-18, iniziamo importando Pin
da
std::pin
. Quindi aggiorniamo l’annotazione di type per future
, con un
Pin
che incapsula ogni Box
. Infine, usiamo Box::pin
per sistemare le
stesse future.
extern crate trpl; // necessario per test mdbook use std::pin::Pin; // --taglio-- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let valori = vec![ String::from("ciao"), String::from("dalla"), String::from("future"), String::from("!!!"), ]; for val in valori { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(valore) = rx.recv().await { println!("ricevuto '{valore}'"); } }; let tx_fut = async move { let valori = vec![ String::from("altri"), String::from("messaggi"), String::from("per"), String::from("te"), ]; for val in valori { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let future: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)]; trpl::join_all(future).await; }); }
Pin
e Box::pin
per far sì che il type Vec
superi il controlloSe compiliamo ed eseguiamo questo, otteniamo finalmente l’output che speravamo:
ricevuto 'ciao'
ricevuto 'altri'
ricevuto 'dalla'
ricevuto 'messaggi'
ricevuto 'future'
ricevuto 'per'
ricevuto '!!!'
ricevuto 'te'
Bene!
C’è ancora un po’ da fare qui. Per prima cosa, usare Pin<Box<T>>
aggiunge una
piccola quantità di overhead perché mettiamo queste future nell’heap con
Box
, e lo stiamo facendo solo per far sì che i type si allineino. In realtà,
non abbiamo bisogno dell’allocazione nell’heap: queste future sono locali a
questa particolare funzione. Come notato prima, Pin
è esso stesso un type di
incapsulamento, quindi possiamo ottenere il beneficio di avere un singolo type
nel Vec
, la ragione per cui abbiamo usato Box
, senza fare un’allocazione
nell’heap. Possiamo perciò usare Pin
direttamente con ciascuna future,
usando la macro std::pin::pin
.
Tuttavia, dobbiamo ancora essere espliciti sul type del reference fissato;
altrimenti, Rust non saprà di interpretare questi come oggetti trait dinamici,
che è ciò di cui abbiamo bisogno che siano nel Vec
. Aggiungiamo pin
alla
nostra lista di importazioni da std::pin
e quindi possiamo usare pin!
con
ciascuna future quando la definiamo per poi definire future
come un Vec
che contiene reference mutabili fissati ai type future dinamici, come nel
Listato 17-19.
extern crate trpl; // necessario per test mdbook use std::pin::{Pin, pin}; // --taglio-- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --taglio-- let valori = vec![ String::from("ciao"), String::from("dalla"), String::from("future"), String::from("!!!"), ]; for val in valori { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --taglio-- while let Some(valore) = rx.recv().await { println!("ricevuto '{valore}'"); } }); let tx_fut = pin!(async move { // --taglio-- let valori = vec![ String::from("altri"), String::from("messaggi"), String::from("per"), String::from("te"), ]; for val in valori { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let future: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(future).await; }); }
Pin
direttamente con la macro pin!
per evitare allocazioni nell’heap non necessarieSiamo arrivati fin qui ignorando il fatto che potremmo avere type Output
diversi. Ad esempio, nel Listato 17-20, la future anonima per a
implementa
Future<Output = u32>
, la future anonima per b
implementa Future<Output = &str>
, e la future anonima per c
implementa Future<Output = bool>
.
extern crate trpl; // necessario per test mdbook fn main() { trpl::run(async { let a = async { 1u32 }; let b = async { "Ciao!" }; let c = async { true }; let (risultato_a, risultato_b, risultato_c) = trpl::join!(a, b, c); println!("{risultato_a}, {risultato_b}, {risultato_c}"); }); }
Possiamo usare trpl::join!
per aspettarle, perché ci permette di passare più
type di future e produce una tupla di quei type. Non possiamo usare
trpl::join_all
, perché richiede che tutte le future passate abbiano lo
stesso type. Ricorda, quell’errore è quello che ci ha fatto iniziare questa
avventura con Pin
!
Questo è un compromesso fondamentale: possiamo gestire un numero dinamico di
future con join_all
, purché abbiano tutte lo stesso type, oppure possiamo
gestire un numero fisso di future con le funzioni join
o la macro join!
,
anche se hanno type diversi. Questo è lo stesso scenario che affronteremmo
lavorando con qualsiasi altro type in Rust. Le future non sono speciali,
anche se abbiamo una bella sintassi per lavorare con loro, e questo è un bene.
Competizione tra Future
Quando “uniamo” le future con la famiglia di funzioni e macro join
,
richiediamo che tutte finiscano prima di andare avanti. A volte, però, abbiamo
bisogno che solo alcune future di un insieme finiscano prima di proseguire,
un po’ come mettere in competizione una future contro un’altra.
Nel Listato 17-21, utilizziamo di nuovo trpl::race
per eseguire due future,
lenta
e veloce
, l’una contro l’altra.
extern crate trpl; // necessario per test mdbook use std::time::Duration; fn main() { trpl::run(async { let lenta = async { println!("'lenta' iniziato."); trpl::sleep(Duration::from_millis(100)).await; println!("'lenta' finito."); }; let veloce = async { println!("'veloce' iniziato."); trpl::sleep(Duration::from_millis(50)).await; println!("'veloce' finito."); }; trpl::race(lenta, veloce).await; }); }
race
per ottenere il risultato di quale future finisce primaOgni future stampa un messaggio quando inizia l’esecuzione, si ferma per un
certo periodo di tempo chiamando e aspettando sleep
, e poi stampa un altro
messaggio quando finisce. Poi passiamo sia lenta
che veloce
a trpl::race
e
aspettiamo che una di esse finisca. (Il risultato qui non è troppo sorprendente:
veloce
vince.) A differenza di quando abbiamo usato race
ne “Il Nostro
Primo Programma Async”, qui ignoriamo
semplicemente l’istanza Either
che restituisce, perché tutto il comportamento
interessante avviene nel corpo dei blocchi async.
Nota che se inverti l’ordine degli argomenti a race
, l’ordine dei messaggi
“iniziati” cambia, anche se la future veloce
si conclude sempre per prima.
Questo perché l’implementazione di questa particolare funzione race
non è
equa. Esegue sempre le future passate come argomenti nell’ordine in cui sono
passate. Altre implementazioni sono eque e sceglieranno casualmente quale
future eseguire per prima. Indipendentemente dal fatto che l’implementazione
di race che stiamo usando sia equa, però, una delle future eseguirà fino
al primo await
nel suo corpo prima che un’altra attività possa iniziare.
Ricorda da “Il Nostro Primo Programma Async” che ad ogni punto di attesa, Rust dà a un runtime la possibilità di mettere in pausa l’attività e passare a un’altra se la future in attesa non è pronta. Anche l’inverso è vero: Rust mette in pausa solo i blocchi async e restituisce il controllo a un runtime in un punto di attesa. Tutto ciò che si trova tra i punti di attesa è sincrono.
Questo significa che se fai un sacco di lavoro in un blocco async senza un punto di attesa, quella future bloccherà qualsiasi altra future dal fare progressi. A volte potresti sentire questo comportamento riferito come una future che affama (starving) altre future. In alcuni casi, potrebbe non essere un grosso problema. Tuttavia, se stai facendo qualche tipo di elaborazione dispendiosa o lavoro a lungo termine, o se hai una future che continuerà a fare un particolare compito indefinitamente, dovrai pensare a quando e dove restituire il controllo al runtime.
Allo stesso modo, se hai operazioni bloccanti a lungo termine, l’async può essere uno strumento utile per fornire modi affinché diverse parti del programma si relazionino tra loro.
Ma come restituiresti il controllo al runtime in quei casi?
Restituire il Controllo al Runtime
Simuliamo un’operazione a lungo termine. Il Listato 17-22 introduce una funzione
lenta
.
extern crate trpl; // necessario per test mdbook use std::{thread, time::Duration}; fn main() { trpl::run(async { // Pià tardi chiameremo `lenta` da qui }); } fn lenta(nome: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{nome}' eseguita per {ms}ms"); }
thread::sleep
per simulare operazioni lenteQuesto codice utilizza std::thread::sleep
invece di trpl::sleep
in modo che
chiamare lenta
blocchi il thread corrente per un certo numero di
millisecondi. Possiamo usare lenta
per rappresentare operazioni del mondo
reale che sono sia a lungo termine che bloccanti.
Nel Listato 17-23, utilizziamo lenta
per emulare questo tipo di lavoro legato
alla CPU in un paio di future.
extern crate trpl; // necessario per test mdbook use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' iniziata."); lenta("a", 30); lenta("a", 10); lenta("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finita."); }; let b = async { println!("'b' iniziata."); lenta("b", 75); lenta("b", 10); lenta("b", 15); lenta("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finita."); }; trpl::race(a, b).await; }); } fn lenta(nome: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{nome}' eseguita per {ms}ms"); }
lenta
per simulare operazioni di lunga durataPer cominciare, ogni future restituisce il controllo al runtime dopo aver eseguito alcune operazioni lente. Se esegui questo codice, vedrai questo output:
'a' iniziata.
'a' eseguita per 30ms
'a' eseguita per 10ms
'a' eseguita per 20ms
'b' iniziata.
'b' eseguita per 75ms
'b' eseguita per 10ms
'b' eseguita per 15ms
'b' eseguita per 350ms
'a' finita.
Come nel nostro esempio precedente, race
termina non appena a
è completata.
Non c’è “intreccio” tra le due future, però. La future a
fa tutto il suo
lavoro fino a quando la chiamata a trpl::sleep
è in attesa, poi la future
b
fa tutto il suo lavoro fino a quando la sua chiamata a trpl::sleep
è in
attesa, e infine la future a
finisce. Per consentire a entrambe le future
lente di fare progressi, abbiamo bisogno di punti di attesa in modo da poter
restituire il controllo al runtime di tanto in tanto per consentire anche
all’altra di proseguire!
Possiamo già vedere questo tipo di passaggio avvenire nel Listato 17-23: se
rimuovessimo trpl::sleep
alla fine della future a
, essa completerebbe la
propria esecuzione senza che la future b
nemmeno cominciasse. Proviamo a
utilizzare la funzione sleep
come punto di partenza per consentire alle
operazioni di alternarsi nel fare progressi, come mostrato nel Listato 17-24.
extern crate trpl; // necessario per test mdbook use std::{thread, time::Duration}; fn main() { trpl::run(async { let un_ms = Duration::from_millis(1); let a = async { println!("'a' iniziata."); lenta("a", 30); trpl::sleep(un_ms).await; lenta("a", 10); trpl::sleep(un_ms).await; lenta("a", 20); trpl::sleep(un_ms).await; println!("'a' finita."); }; let b = async { println!("'b' iniziata."); lenta("b", 75); trpl::sleep(un_ms).await; lenta("b", 10); trpl::sleep(un_ms).await; lenta("b", 15); trpl::sleep(un_ms).await; lenta("b", 350); trpl::sleep(un_ms).await; println!("'b' finita."); }; trpl::race(a, b).await; }); } fn lenta(nome: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{nome}' eseguita per {ms}ms"); }
sleep
per consentire alle operazioni di alternarsi nel fare progressiNel Listato 17-24, aggiungiamo chiamate a trpl::sleep
con punti di attesa tra
ogni chiamata a lenta
. Ora il lavoro delle due future è intervallato:
'a' iniziata.
'a' eseguita per 30ms
'b' iniziata.
'b' eseguita per 75ms
'a' eseguita per 10ms
'b' eseguita per 10ms
'a' eseguita per 20ms
'b' eseguita per 15ms
'a' finita.
La future a
continua a lavorare per un po’ prima di restituire il controllo
a b
, perché chiama lenta
prima di chiamare trpl::sleep
, ma dopo ciò le
future si alternano ogni volta che una di esse incontra un punto di attesa. In
questo caso, abbiamo fatto ciò dopo ogni chiamata a lenta
, ma potremmo
suddividere il lavoro in qualsiasi modo abbia più senso per noi.
Ma non vogliamo davvero dormire qui, però: vogliamo eseguire le nostre
operazioni il più velocemente possibile e restituire il controllo al runtime
quando possibile. Possiamo farlo direttamente, utilizzando la funzione
yield_now
. Nel Listato 17-25, sostituiamo tutte quelle chiamate a sleep
con
yield_now
.
extern crate trpl; // necessario per test mdbook use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' iniziata."); lenta("a", 30); trpl::yield_now().await; lenta("a", 10); trpl::yield_now().await; lenta("a", 20); trpl::yield_now().await; println!("'a' finita."); }; let b = async { println!("'b' iniziata."); lenta("b", 75); trpl::yield_now().await; lenta("b", 10); trpl::yield_now().await; lenta("b", 15); trpl::yield_now().await; lenta("b", 350); trpl::yield_now().await; println!("'b' finita."); }; trpl::race(a, b).await; }); } fn lenta(nome: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{nome}' eseguita per {ms}ms"); }
yield_now
per consentire alle operazioni di alternarsi nel fare progressiQuesto codice è sia più chiaro riguardo all’intento reale sia può essere
significativamente più veloce rispetto all’uso di sleep
, perché i timer come
quello usato da sleep
hanno spesso limiti su quanto possono essere granulari.
La versione di sleep
che stiamo usando, ad esempio, dormirà sempre per almeno
un millisecondo, anche se le passiamo una Duration
di un nanosecondo. Ancora
una volta, i computer moderni sono veloci: possono fare molto in un
millisecondo!
Puoi vedere questo di persona impostando un piccolo benchmark, come quello nel Listato 17-26. (Questo non è un modo particolarmente rigoroso per fare test di prestazioni, ma è sufficiente a mostrare la differenza qui.)
extern crate trpl; // necessario per test mdbook use std::time::{Duration, Instant}; fn main() { trpl::run(async { let un_ns = Duration::from_nanos(1); let inizio = Instant::now(); async { for _ in 1..1000 { trpl::sleep(un_ns).await; } } .await; let tempo = Instant::now() - inizio; println!( "versione 'sleep' finita dopo {} secondi.", tempo.as_secs_f32() ); let inizio = Instant::now(); async { for _ in 1..1000 { trpl::yield_now().await; } } .await; let tempo = Instant::now() - inizio; println!( "versione 'yield' finita dopo {} secondi.", tempo.as_secs_f32() ); }); }
sleep
e yield_now
Qui, saltiamo tutte le stampe di stato, passiamo una Duration
di un
nanosecondo a trpl::sleep
, e lasciamo che ogni future giri da sola, senza
alternarci tra le future. Poi eseguiamo per 1.000 iterazioni e vediamo quanto
tempo impiega la future che utilizza trpl::sleep
rispetto alla future che
utilizza trpl::yield_now
.
versione 'sleep' finita dopo 1.1282331 secondi.
versione 'yield' finita dopo 0.000536924 secondi.
La versione con yield_now
è di gran lunga più veloce!
Questo significa che l’async può essere utile anche per compiti legati al calcolo, a seconda di cosa sta facendo il tuo programma, perché fornisce uno strumento utile per strutturare le relazioni tra le diverse parti del programma. Questa è una forma di multitasking cooperativo, in cui ogni future ha il potere di determinare quando restituisce il controllo tramite i punti di attesa. Ogni future ha quindi anche la responsabilità di evitare di bloccarsi troppo a lungo. In alcuni sistemi operativi embedded basati su Rust, questo è l’unico tipo di multi-tasking!
Nel codice reale, di solito non lavorerai direttamente alternando chiamate di funzione con punti di attesa su ogni singola riga, ovviamente. Anche se restituire il controllo in questo modo è relativamente poco costoso, non è gratuito. In molti casi, cercare di suddividere un compito legato al calcolo potrebbe renderlo significativamente più lento, quindi a volte è meglio per le prestazioni complessive lasciare che un’operazione si blocchi brevemente. Misura sempre per vedere quali sono i veri colli di bottiglia delle prestazioni del tuo codice. Tuttavia, la dinamica sottostante è importante da tenere a mente, se stai vedendo molto lavoro avvenire in serie che ti aspettavi avvenisse in parallelo!
Costruire le Nostre Astrazioni Async
Possiamo anche comporre le future insieme per creare nuovi schemi. Ad esempio,
possiamo costruire una funzione timeout
con i blocchi async che abbiamo già.
Quando abbiamo finito, il risultato sarà un altro blocco di costruzione che
potremmo usare per creare ancora più astrazioni async.
Il Listato 17-27 mostra come ci aspettiamo che funzioni questo timeout
con una
future lenta.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::run(async {
let lento = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finalmente finito"
};
match timeout(lento, Duration::from_secs(2)).await {
Ok(messaggio) => println!("Completato con '{messaggio}'"),
Err(durata) => {
println!("Fallito dopo {} secondi", durata.as_secs())
}
}
});
}
timeout
per eseguire un’operazione lenta con un limite di tempoImplementiamolo! Per cominciare, pensiamo all’API per timeout
:
- Deve essere essa stessa una funzione async in modo da poterla attendere.
- Il suo primo parametro dovrebbe essere una future da eseguire. Possiamo renderla generica per consentirle di funzionare con qualsiasi future.
- Il suo secondo parametro sarà il tempo massimo da attendere. Se usiamo una
Duration
, sarà facile passarla atrpl::sleep
. - Dovrebbe restituire un
Result
. Se la future completa con successo, ilResult
saràOk
con il valore prodotto dalla future. Se il timeout scade prima, ilResult
saràErr
con la durata che il timeout ha atteso.
Il Listato 17-28 mostra questa dichiarazione.
extern crate trpl; // necessario per test mdbook
use std::time::Duration;
fn main() {
trpl::run(async {
let lento = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finalmente finito"
};
match timeout(lento, Duration::from_secs(2)).await {
Ok(messaggio) => println!("Completato con '{messaggio}'"),
Err(durata) => {
println!("Fallito dopo {} secondi", durata.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_da_testare: F,
tempo_massimo: Duration,
) -> Result<F::Output, Duration> {
// Qui è dove metteremo l'implementazione!
}
timeout
Questo soddisfa i nostri obiettivi per i type. Ora pensiamo al comportamento
di cui abbiamo bisogno: vogliamo far competere la future passata contro la
durata fornita. Possiamo usare trpl::sleep
per creare una future che duri
quanto richiesto e usare trpl::race
per eseguirla contro la future che il
chiamante passa.
Sappiamo anche che race
non è equa, processando gli argomenti nell’ordine in
cui sono passati. Pertanto, passiamo future_da_testare
a race
per prima in
modo che abbia la possibilità di completare anche se tempo_massimo
è una
durata molto breve. Se future_da_testare
finisce prima, race
restituirà
Left
con l’output da future_da_testare
. Se il timer finisce prima, race
restituirà Right
con l’output del timer di ()
.
Nel Listato 17-29, facciamo il match sul risultato dell’attesa di
trpl::race
.
extern crate trpl; // necessario per test mdbook use std::time::Duration; use trpl::Either; // --taglio-- fn main() { trpl::run(async { let lento = async { trpl::sleep(Duration::from_secs(5)).await; "Finalmente finito" }; match timeout(lento, Duration::from_secs(2)).await { Ok(messaggio) => println!("Completato con '{messaggio}'"), Err(durata) => { println!("Fallito dopo {} secondi", durata.as_secs()) } } }); } async fn timeout<F: Future>( future_da_testare: F, tempo_massimo: Duration, ) -> Result<F::Output, Duration> { match trpl::race(future_da_testare, trpl::sleep(tempo_massimo)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(tempo_massimo), } }
timeout
con race
e sleep
Se future_da_testare
ha successo e otteniamo un Left(output)
, restituiamo
Ok(output)
. Se invece il timer finisce prima e otteniamo un Right(())
,
ignoriamo il ()
con _
e restituiamo Err(tempo_massimo)
.
Con questo, abbiamo un timeout
funzionante combinando più blocchi async. Se
eseguiamo il nostro codice, stamperà la modalità di errore dopo il timeout:
Fallito dopo 2 secondi
Poiché le future si compongono con altre future, puoi costruire strumenti davvero potenti utilizzando blocchi di costruzione async più piccoli. Ad esempio, puoi utilizzare questo stesso approccio per combinare timeout con ripetizioni, e a loro volta usarli con operazioni come chiamate di rete (uno degli esempi dall’inizio del capitolo).
Nella pratica, di solito lavorerai direttamente con async
e await
, e
secondariamente con funzioni e macro come join
, join_all
, race
, e così
via. Avrai bisogno di ricorrere a pin
solo di tanto in tanto per utilizzare le
future con quelle API.
Abbiamo ora visto diversi modi per lavorare con più future contemporaneamente. Prossimamente, vedremo come possiamo lavorare con più future in una sequenza nel tempo con gli stream. Ecco un paio di altre cose che potresti voler considerare prima, però:
- Abbiamo usato un
Vec
conjoin_all
per attendere che tutte le future in un gruppo finissero. Come potresti usare unVec
per elaborare un gruppo di future in sequenza invece? Quali sono i compromessi nel farlo? - Dai un’occhiata al type
futures::stream::FuturesUnordered
dal cratefutures
. Come sarebbe diverso usarlo rispetto a unVec
? (Non preoccuparti del fatto che provenga dalla partestream
del crate; funziona benissimo con qualsiasi collezione di future.)