Rust异步编程async/.await原理解析(一)

在这个教程中我们将详细分析rust异步代码async/.await的 内部运行机制。我们将使用async-std库而不是tokio,因为 这是第一个支持async/.await语法的rust库。async/.await原理 解析教程分为两部分,这是第一部分。

0、准备Rust练习环境

首先让我们先创建一个Cargo项目:

1
~$ cargo new --bin sleepus-interruptus

如果你期望和教程使用的编译器保持一致,可以添加一个内容为 1.39.0的rust-toolchain文件。

在继续下面的内容之前,先运行cargo run确保环境没有问题。

1、一个交替显示的Rust程序

我们要写一个简单的程序,它可以显示10次Sleepus消息,每次间隔 0.5秒;同时显示5次Interruptus消息,每次间隔1秒。下面是相当 简单的rust实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::thread::{sleep};
use std::time::Duration;

fn sleepus() {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500));
}
}

fn interruptus() {
for i in 1..=5 {
println!("Interruptus {}", i);
sleep(Duration::from_millis(1000));
}
}

fn main() {
sleepus();
interruptus();
}

不过,上面的代码会同步执行两个操作,它会先显示完所有的Sleepus消息, 然后再显示Interruptus消息。而我们期望的是这两种消息交织显示,也就是说 Interruptus消息可以打断Sleepus消息的显示。

有两个办法可以实现交织显示的目标。显而易见的一个是为每个函数创建一个 单独的线程,然后等待线程执行完毕。

1
2
3
4
5
6
7
8
9
use std::thread::{sleep, spawn};

fn main() {
let sleepus = spawn(sleepus);
let interruptus = spawn(interruptus);

sleepus.join().unwrap();
interruptus.join().unwrap();
}

需要指出的是:

  • 我们使用spawn(sleepus)而不是spawn(sleepus())来创建线程。后者将 立即执行sleepus()然后将其执行结果传给spawn,这不是我们期望的
  • 我在主函数种使用join()来等待子线程结束,并使用unwrap()来处理 可以发生的故障,因为我懒。

另一种实现方法是创建一个辅助线程,然后在主线程种调用其中一个函数:

1
2
3
4
5
6
fn main() {
let sleepus = spawn(sleepus);
interruptus();

sleepus.join().unwrap();
}

这种方法效率更高,因为只需要额外创建一个线程,并且也没有什么副作用, 因此我推荐使用这个方法。

不过这两种方法都不是异步解决方案!我们使用两个由操作系统管理的线程 来并发执行两个同步任务!接下来让我们尝试如何在单一线程内让两个任务 协作执行!

2、用Rust异步async/.await实现交替显示程序

我们将从较高层次的抽象开始,然后逐步深入rust异步编程的细节。 现在让我们以async风格重写前面的应用。

首先在Cargo.toml中添加以下依赖:

1
async-std = { version = "1.2.0", features = ["attributes"] }

现在我们可以将应用重写为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use async_std::task::{sleep, spawn};
use std::time::Duration;

async fn sleepus() {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500)).await;
}
}

async fn interruptus() {
for i in 1..=5 {
println!("Interruptus {}", i);
sleep(Duration::from_millis(1000)).await;
}
}

#[async_std::main]
async fn main() {
let sleepus = spawn(sleepus());
interruptus().await;

sleepus.await;
}

主要的修改说明如下:

  • 我们不再使用std::thread中的sleep和spawn函数,而是采用async_std::task。
  • 在sleepus和interruptus函数前都加async
  • 在调用sleep之后,我们补充了.await。注意不是.await()调用,而是一个新语法
  • 在主函数上使用#[async_std::main]属性
  • 主函数前也有async关键字
  • 我们现在使用spawn(sleepus())而不是spawn(sleepus),这表示直接调用sleepus 并将结果传给spawn
  • 对interruptus()的调用增加.await
  • 对sleepus不再使用join(),而是改用.await语法

看起来有很多修改,不过实际上,我们的代码结构和之前的版本基本是一致的。 现在程序运行和我们的期望一致:采用单一线程进行无阻塞调用。

接下来让我们分析上述修改到底意味着什么。

3、async关键字的作用

在函数定义前添加async主要做了以下3个事:

  • 这将允许你在函数体内使用.await语法。我们接下来会深入探讨这一点
  • 它修改了函数的返回类型。async fn foo() -> Bar 实际上返回的是 impl std::future::Future<Output=Bar>
  • 它自动将结果值封装进一个新的Future对象。我们下面会详细展示这一点

现在让我们展开说明第2点。在Rust的标准库中有一个名为Future的trait, Future有一个关联类型Output。这个trait的意思是:我承诺当我完成任务时, 会给你一个类型为Output的值。例如你可以想象一个异步HTTP客户端可能会 这样实现:

1
2
3
impl HttpRequest {
fn perform(self) -> impl Future<Output=HttpResponse> { ... }
}

在发送HTTP请求时需要一些无阻塞的I/O,我们并不希望阻塞调用线程,但是 需要最终得到响应结果。

async fn sleepus()的结果类型隐含为()。因此我们的Future的Output 也应该为()。这意味着我们需要修改函数为:

1
fn sleepus() -> impl std::future::Future<Output=()>

不过如果只修改这里,编译就会出现如下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/main.rs:7:9
|
4 | fn sleepus() -> impl std::future::Future<Output=()> {
| ------- this is not `async`
...
7 | sleep(Duration::from_millis(500)).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

error[E0277]: the trait bound `(): std::future::Future` is not satisfied
--> src/main.rs:4:17
|
4 | fn sleepus() -> impl std::future::Future<Output=()> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::future::Future` is not implemented for `()`
|
= note: the return type of a function must have a statically known size

第一个错误信息很直接:你只能在async函数或代码块中使用.await语法。 我们还没有接触到异步代码块,不过看起来就是这样:

1
2
3
async {
// async noises intensify
}

第二个错误消息就是第一个的结果:async关键字要求函数返回类型是impl Future。 如果没有这个关键字,我们的loop结果类型是(),这显然不满足要求。

将整个函数体用一个异步代码块包裹起来就解决问题了:

1
2
3
4
5
6
7
8
fn sleepus() -> impl std::future::Future<Output=()> {
async {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500)).await;
}
}
}

4、.await语法的作用

可能我们并不需要所有这些async/.await。如果我们移除sleepus的.await 会怎么样?令人吃惊的是,居然编译通过了,虽然给出了一个警告:

1
2
3
4
5
6
7
8
warning: unused implementer of `std::future::Future` that must be used
--> src/main.rs:8:13
|
8 | sleep(Duration::from_millis(500));
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them

我们在生成一个Future值但没有使用它。如果查看程序的输出,你可以 理解编译器的警告是什么意思了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Interruptus 1
Sleepus 1
Sleepus 2
Sleepus 3
Sleepus 4
Sleepus 5
Sleepus 6
Sleepus 7
Sleepus 8
Sleepus 9
Sleepus 10
Interruptus 2
Interruptus 3
Interruptus 4
Interruptus 5

我们所有的Sleepus消息输出都没有延迟。问题在于对sleep的调用 实际上没有让当前线程休息,它只是生成一个实现了Future的值, 然后当承诺最终实现时,我们知道的确发生了延迟。但是由于我们简单 地忽略了Future,因此实际上没有利用延迟。

为了理解.await语法到底做了什么,我们接下来直接使用Future值来实现 我们的函数。首先从不用async块开始。

5、不使用async关键字的Rust异步代码

如果我们丢掉async代码块,看起来就是这样:

1
2
3
4
5
6
fn sleepus() -> impl std::future::Future<Output=()> {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500));
}
}

这样编译会出现以下错误:

1
2
3
4
5
6
error[E0277]: the trait bound `(): std::future::Future` is not satisfied
--> src/main.rs:4:17
|
4 | fn sleepus() -> impl std::future::Future<Output=()> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::future::Future` is not implemented for `()`
|

上面错误是由于for循环的结果类型为(),它没有实现Future这个trait。 修复这个问题的一种办法是在for循环后面加一句话使其返回Future的实现类型。 我们已经知道可以用这个:sleep:

1
2
3
4
5
6
7
fn sleepus() -> impl std::future::Future<Output=()> {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500));
}
sleep(Duration::from_millis(0))
}

现在我们依然会看到在for循环内存在未使用的Future值的警告信息, 不过返回值那个错误已经解决掉了。这个sleep调用实际上什么 也没做,我们可以将其替换为一个真正的占位Future:

1
2
3
4
5
6
7
fn sleepus() -> impl std::future::Future<Output=()> {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500));
}
async_std::future::ready(())
}

6、实现自己的Future

为了打破沙锅问到底,让我们再深入一步,不适用async_std库中的ready函数, 而是定义自己的实现Future的结构。让我们称之为DoNothing。

1
2
3
4
5
6
7
8
9
10
11
use std::future::Future;

struct DoNothing;

fn sleepus() -> impl Future<Output=()> {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500));
}
DoNothing
}

问题在于DoNothing还没有提供Future实现。我们接下来将进行一些 编译器驱动的开发,让rustc告诉我们如何修复这个程序。第一个错误信息是:

1
the trait bound `DoNothing: std::future::Future` is not satisfied

因此让我们补上这个trait的实现:

1
2
impl Future for DoNothing {
}

继续报错:

1
2
3
4
5
6
7
8
error[E0046]: not all trait items implemented, missing: `Output`, `poll`
--> src/main.rs:7:1
|
7 | impl Future for DoNothing {
| ^^^^^^^^^^^^^^^^^^^^^^^^^ missing `Output`, `poll` in implementation
|
= note: `Output` from trait: `type Output;`
= note: `poll` from trait: `fn(std::pin::Pin<&mut Self>, &mut std::task::Context<'_>) -> std::task::Poll<<Self as std::future::Future>::Output>`

我们还不是真正了解Pin<&mut Self>或者Context,不过我们知道Output。因为我们 之前返回(),现在让我们照做。

1
2
3
4
5
6
7
8
9
10
use std::pin::Pin;
use std::task::{Context, Poll};

impl Future for DoNothing {
type Output = ();

fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
unimplemented!()
}
}

喔!编译通过了!当然在运行时它会失败,因为我们的unimplemented!()调用:

1
thread 'async-std/executor' panicked at 'not yet implemented', src/main.rs:13:9

现在让我们尝试实现poll。我们需要返回一个值其类型为Poll<Self::Output>或者 Poll<()>。 让我们看一下Poll的定义:

1
2
3
4
pub enum Poll<T> {
Ready(T),
Pending,
}

利用一些基本的推理,我们可以理解Ready表示“我们的Future已经完成,这是输出”, 而Pending表示“还没完事儿”。假设我们的DoNothing希望立即返回()类型的输出, 可以这样:

1
2
3
fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
Poll::Ready(())
}

恭喜!你刚刚实现了自己的第一个Future结构!

7、async与函数返回值

还记得我们说过async对函数做的第三件事吗:自动将结果值封装为一个新的Future。 我们接下来展示这一点。

首先简化sleepus的定义:

1
2
3
fn sleepus() -> impl Future<Output=()> {
DoNothing
}

编译和运行正常。现在切换回async风格:

1
2
3
async fn sleepus() {
DoNothing
}

这时候会报错:

1
2
3
4
5
6
7
8
error[E0271]: type mismatch resolving `<impl std::future::Future as std::future::Future>::Output == ()`
--> src/main.rs:17:20
|
17 | async fn sleepus() {
| ^ expected struct `DoNothing`, found ()
|
= note: expected type `DoNothing`
found type `()`

可以看到,当你有了一个async函数或代码块,结果会自动封装到一个Future实现对象里。 因此我们需要返回一个impl Future<Output=DoNothing>。现在我们的类型需要是Output=()

处理很简单,只需要在DoNothing后面简单添加.await:

1
2
3
async fn sleepus() {
DoNothing.await
}

这让我们对.await的作用增加了一点直觉:它从DoNothing中提取Output值。不过, 我们依然并不真正了解它是如何实现的。现在让我们实现一个更复杂的Future来 继续探索。


原文链接:Down and dirty with Future - Rust Crash Course lesson 8

汇智网翻译整理,转载请标明出处