一个常见的说法是,线程可以做到 async/await
所能做的一切,且更简单。那么,为什么大家选择 async/await
呢?
Rust 是一种低级语言,它不会隐藏协程的复杂性。这与像 Go 这样的语言相反,在 Go 中,异步是默认发生的,程序员甚至不需要考虑它。
聪明的程序员试图避免复杂性。因此,他们看到 async/await
中的额外复杂性,并质疑为什么需要它。当考虑到存在一个合理的替代方案——操作系统线程时,这个问题尤其相关。
让我们通过 async
来进行一次思维之旅吧。
背景概览
通常,代码是线性的;一件事情在另一件事情之后运行。它看起来像这样:
fn main() {
foo();
bar();
baz();
}
很简单,对吧?然而,有时你会想同时运行很多事情。这方面的典型例子是 web 服务器。考虑以下用线性代码编写的:
fn main() -> io::Result<()> {
let socket = TcpListener::bind("0.0.0.0:80")?;
loop {
let (client, _) = socket.accept()?;
handle_client(client)?;
}
}
想象一下,如果 handle_client
需要几毫秒,并且两个客户端同时尝试连接到你的 web 服务器。你会遇到一个严重的问题!
客户端 #1 连接到 web 服务器,并被 accept()
函数接受。它开始运行 handle_client()
。
客户端 #2 连接到 web 服务器。然而,由于 accept()
当前没有运行,我们必须等待 handle_client()
完成客户端 #1 的运行。
几毫秒后,我们回到 accept()
。客户端 #2 可以连接。
现在想象一下,如果有两百万个同时客户端。在队列的末尾,你必须等待几分钟,web 服务器才能帮助你。它很快就会变得不可扩展。
显然,初期的 web 试图解决这个问题。最初的解决方案是引入线程。通过将一些寄存器的值和程序的栈保存到内存中,操作系统可以停止一个程序,用另一个程序替换它,然后再后继续运行那个程序。本质上,它允许多个例程(或“线程”,或“进程”)在同一个 CPU 上运行。
使用线程,我们可以将上述代码重写如下:
fn main() -> io::Result<()> {
let socket = TcpListener::bind("0.0.0.0:80")?;
loop {
let (client, _) = socket.accept()?;
thread::spawn(move || handle_client(client));
}
}
现在,客户端由一个与处理新连接等待不同的线程处理。太棒了!通过允许并发线程访问,这避免了问题。
客户端 #1 被服务器接受。服务器生成一个调用 handle_client
的线程。
最终,handle_client
在某处阻塞。操作系统保存处理客户端 #1 的线程,并将主线程带回来。
主线程接受客户端 #2。它生成一个单独的线程来处理客户端 #2。在只有几微秒的延迟后,客户端 #1 和客户端 #2 并行运行。
线程在考虑到生产级 web 服务器拥有几十个 CPU 核心时特别好用。不仅仅是操作系统可以给人一种所有这些线程同时运行的错觉;实际上,操作系统可以让它们真正同时运行。
最终,出于我稍后将详细说明的原因,程序员希望将这种并发性从操作系统空间带到用户空间。用户空间并发性有许多不同的模型。有事件驱动编程、actor
和协程。Rust 选择的是 async/await
。
简单来说,你将程序编译成一个状态机的集合,这些状态机可以独立于彼此运行。Rust 本身提供了一种创建状态机的机制;async
和 await
的机制。使用 smol
编写的上述程序将如下所示:
#[apply(smol_macros::main!)]
async fn main(ex: &smol::Executor) -> io::Result<()> {
let socket = TcpListener::bind("0.0.0.0:80").await?;
loop {
let (client, _) = socket.accept().await?;
ex.spawn(async move {
handle_client(client).await;
}).detach();
}
}
主函数前面有 async
关键字。这意味着它不是一个传统函数,而是一个返回状态机的函数。大致上,函数的内容对应于该状态机。
await
包括另一个状态机作为当前运行状态机的一部分。对于 accept()
,这意味着状态机将把它作为一个步骤包含在内。
最终,一个内部函数将会产生结果,或者放弃控制。例如,当 accept()
等待新连接时。在这一点上,整个状态机将把执行权交给更高级别的执行器。对我们来说,那是 smol::Executor
。
一旦执行被产生,执行器将用另一个正在并发运行的状态机替换当前状态机,该状态机是通过 spawn
函数生成的。
我们将一个异步块传递给 spawn
函数。这个块代表一个完全新的状态机,独立于由 main
函数创建的状态机。这个状态机所做的一切都是运行 handle_client
函数。
一旦 main
产生结果,就选择一个客户端来代替它运行。一旦那个客户端产生结果,循环就会重复。
你现在可以处理数百万的并发客户端。
当然,像这样的用户空间并发性引入了复杂性的提升。当你使用线程时,你不必处理执行器、任务和状态机等。
如果你是一个理智的人,你可能会问:“我们为什么需要做所有这些事情?线程工作得很好;对于 99% 的程序,我们不需要涉及任何用户空间并发性。引入新复杂性是技术债务,技术债务会花费我们的时间和金钱。”
“那么,我们为什么不使用线程呢?”
超时问题
也许 Rust 最大的优势之一是可组合性。它提供了一组可以嵌套、构建、组合和扩展的抽象。
我记得让我坚持使用 Rust 的是 Iterator trait
。它可以让我将某个东西变成 Iterator
,应用一些不同的组合器,然后将结果 Iterator
传递给任何接受 Iterator
的函数,这让我大开眼界。
它继续给我留下深刻印象。假设你想从另一个线程接收一列表整数,只取那些立即可用的整数,丢弃任何不是偶数的整数,给它们全部加一,然后将它们推到一个新列表上。
在某些其他语言中,这将是五十行代码和一个辅助函数。在 Rust 中,可以用五行完成:
let (send, recv) = mpsc::channel();
my_list.extend(
recv.try_iter()
.filter(|x| x & 1 == 0)
.map(|x| x + 1)
);
async/await
最好的事情是,它允许你将这种可组合性应用于 I/O 限制函数。假设你有一个新的客户端要求;你想在上面的函数中添加一个超时。假设我们的 handle_client
函数看起来像这样:
async fn handle_client(client: TcpStream) -> io::Result<()> {
let mut data = vec![];
client.read_to_end(&mut data).await?;
let response = do_something_with_data(data).await?;
client.write_all(&response).await?;
Ok(())
}
如果我们想添加一个三秒钟的超时,我们可以组合两个组合器来做到这一点:
race
函数同时运行两个 future
。
Timer future
等待一段时间后返回。
最终的代码看起来像这样:
async fn handle_client(client: TcpStream) -> io::Result<()> {
// 处理实际连接的 future
let driver = async move {
let mut data = vec![];
client.read_to_end(&mut data).await?;
let response = do_something_with_data(data).await?;
client.write_all(&response).await?;
Ok(())
};
// 处理等待超时的 future
let timeout = async {
Timer::after(Duration::from_secs(3)).await;
// 我们刚刚超时了!返回一个错误。
Err(io::ErrorKind::TimedOut.into())
};
// 并行运行两者
driver.race(timeout).await
}
我发现这是一个非常简单的过程。你所要做的就是将你的现有代码包装在一个异步块中,然后将其与另一个 future 竞速。
这种方法的额外好处是,它适用于任何类型的流。在这里,我们使用 TcpStream
。然而,我们可以很容易地将其替换为任何实现 impl AsyncRead + AsyncWrite
的东西。async
可以轻松地适应你需要的任何模式。
用线程实现
如果我们想在我们的线程示例中实现这一点呢?
fn handle_client(client: TcpStream) -> io::Result<()> {
let mut data = vec![];
client.read_to_end(&mut data)?;
let response = do_something_with_data(data)?;
client.write_all(&response)?;
Ok(())
}
这并不容易。通常,你不能在阻塞代码中中断 read
或 write
系统调用,除非做一些灾难性的事情,比如关闭文件描述符(在 Rust 中无法做到)。
幸运的是,TcpStream
有两个函数 set_read_timeout
和 set_write_timeout
,可以用来分别设置读写超时。然而,我们不能天真地使用它。想象一个客户端每 2.9 秒发送一个字节,只是为了重置超时。
所以我们需要在这里稍微防御性地编程。由于 Rust 组合器的强大,我们可以编写自己的类型,包装 TcpStream
来编程超时。
// `TcpStream` 的截止日期感知包装器
struct DeadlineStream {
tcp: TcpStream,
deadline: Instant,
}
impl DeadlineStream {
// 创建一个新的 `DeadlineStream`,经过一段时间后过期
fn new(tcp: TcpStream, timeout: Duration) -> Self {
Self {
tcp,
deadline: Instant::now() + timeout,
}
}
}
impl io::Read for DeadlineStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// 设置截止日期
let time_left = self.deadline.saturating_duration_since(Instant::now());
self.tcp.set_read_timeout(Some(time_left))?;
// 从流中读取
self.tcp.read(buf)
}
}
impl io::Write for DeadlineStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// 设置截止日期
let time_left = self.deadline.saturating_duration_since(Instant::now());
self.tcp.set_write_timeout(Some(time_left))?;
// 从流中读取
self.tcp.write(buf)
}
}
// 创建包装器
let client = DeadlineStream::new(client, Duration::from_secs(3));
let mut data = vec![];
client.read_to_end(&mut data)?;
let response = do_something_with_data(data)?;
client.write_all(&response)?;
Ok(())
一方面,可以认为这是优雅的。我们使用 Rust 的能力用一个相对简单的组合器解决了问题。我相信它会运行得很好。
另一方面,这绝对是 hacky。
我们锁定了自己使用 TcpStream
。Rust 中没有特质来抽象使用 set_read_timeout
和 set_write_timeout
类型。所以如果要使用任何类型的写入器,需要额外的工作。
这涉及到设置超时的额外系统调用。
我认为这种类型对于 web 服务器要求的实际逻辑来说,使用起来要笨重得多。
异步成功案例
这就是为什么 HTTP 生态系统采用 async/await
作为其主要运行机制的原因,即使是客户端也是如此。你可以取任何进行 HTTP 调用的函数,并使其适应你想要的任何用例。
tower
可能是我能想到的这种现象最好的例子,这也是让我意识到 async/await
可以有多强大的东西。如果你将你的服务实现为一个异步函数,你会得到超时、速率限制、负载均衡、对冲和背压处理。所有这些都是无负担实现的。
不管你使用的是什么运行时,或者你的服务实际上在做什么。你可以将它扔给 tower
,使其更加健壮。
macroquad
是一个小型 Rust 游戏引擎,旨在使游戏开发尽可能简单。它的主函数使用 async/await
来运行其引擎。这是因为 async/await
确实是在 Rust 中表达需要停下来等待其他事情的线性函数的最佳方式。
在实践中,这可能非常强大。想象一下,同时轮询你的游戏服务器和你的 GUI 框架的网络连接,在同一线程上。可能性是无限的。
提升异步的形象
我认为问题不在于有人认为线程比异步更好。我认为问题是异步的好处没有被广泛传播。这导致一些人对异步的好处有误解。
如果这是一个教育问题,我认为值得看一下教育材料。这是 Rust Async Book 在比较 async/await
和操作系统线程时所说的:
操作系统线程不需要对编程模型做任何改变,这使得并发表达非常容易。然而,线程间的同步可能会很困难,性能开销也很大。线程池可以缓解这些成本,但不足以支持大规模的 I/O 密集型工作负载。
—— Rust Async Book
我认为这是整个异步社区的一个一贯问题。当有人问“为什么我们想用这个而不是操作系统线程”时,人们倾向于挥挥手说“异步开销更小。除此之外,其他都一样。”
这就是 web 服务器作者转向 async/await
的原因。这就是他们如何解决 C10k
问题的。但这不会是其他人转向 async/await 的原因。
c10k 问题:https://en.wikipedia.org/wiki/C10k_problem
性能提升是不稳定的,可能会在错误的情况下消失。有很多情况下,线程工作流程可以比等效的异步工作流程更快(主要是在 CPU 密集型任务的情况下)。可能以前我们过分强调了异步 Rust 的短暂性能优势,但低估了它的语义优势。
在最坏的情况下,这会导致人们对 async/await
置之不理,认为它是“你为小众用例而求助的奇怪事物”。它应该被视为一个强大的编程模型,让你能够简洁地表达在同步 Rust 中无法表达的模式,而不需要数十个线程和通道。
有一种趋势是试图使异步 Rust “就像同步 Rust 一样”,这种方式鼓励了负面比较。当我说到“趋势”时,我的意思是这是 Rust 项目的明确路线图,即“编写异步 Rust 代码应该像编写同步代码一样容易,除了偶尔的 async 和 await 关键字。”
我拒绝这种框架,因为它根本不可能。这就像试图在一个滑雪坡上举办披萨派对。我们不应该试图将我们的模型强行塞入不友好的惯用法,以迎合拒绝采用另一种模式的程序员。我们应该努力突出 Rust 的 async/await
生态系统的优势;它的可组合性和它的能力。我们应该努力使 async/await
成为程序员达到并发性时的默认选择。我们不应该试图使同步 Rust 和异步 Rust 相同,我们应该接受差异。
该文章在 2024/4/28 21:30:25 编辑过