I/O
1610591062
Tokio 中的 I/O
操作跟标准库 std
的保持一致,只是他是异步的。他提供了用于读取数据的 AsyncRead Trait
及用于写入操作的 AayncWrite Trait
,很多类型都实现了他们,比如 TcpStream
、File
、Stdout
等,还有很多其他的基础类型如 Vec<u8>
、 &[u8]
,他们让我们能够像使用 Writer
跟 Reader
一般使用字节数组。
本页面会使用 Tokio 来实现包含基础 I/O
的读取跟写入的一些示例。下一节我们则将会来探索更高级的示例。
AsyncRead
And AsyncWrite
这两个 Trait
基础设施为字节流提供了异步的读取跟写入能力,事实上这两个 Trait
的函数并不会被直接调用,就像你不会手动的去调用 Future
的 poll
函数,他们更多的是会被其他的使用 Trait
来调用,如 AsyncReadExt
跟 AsyncWriteExt
。
接下来让我们来稍微看其中几个函数,所有的这些函数都是 async
并且必须使用 .await
来调用。
async fn read()
AsyncReadExt::read()
提供了用来读取数据到缓存然后返回读取字节数的异步函数。
Note: 当 read()
返回 Ok(0)
时,意味着读取的数据流已经关闭,后续再进行的 read()
调用都会马上返回 Ok(0)
。对于 TcpStream
来说则意味着套接字的读取部分已经关闭。
use tokio::fs::File; use tokio::{self, AsyncReadExt}; #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = [0; 10]; // read up to 10 bytes let n = f.read(&mut buffer[..]).await?; println!("The bytes: {:?}", &buffer[..n]); Ok(()) }
async fn read_to_end()
AsyncReadExt::read_to_end()
从数据流中读取所有的数据直到读完。
use tokio::io::{self, AsyncReadExt}; use tokio::fs::File; #[tokil::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = Vec::new(); // read to whole file f.read_to_end(&mut buffer).await?; Ok(()) }
async fn write()
AsyncWriteExt::write
将缓存中的数据写入到写入者中,然后返回写入的数据字节数。
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let mut file = File::create("foo.txt").await?; // Writes some prefix of the byte string, but not necessarily all of it. let n = file.write(b"some bytes").await?; println!("Wrote the first {} bytes of 'some bytes'", n); Ok(()) }
async fn write_all()
AsyncWriteExt::write_all
写入缓存中所有的数据到写入者中。
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let mut buffer = File::create("foo.txt").await?; buffer.write_all(b"some bytes").await?; Ok(()) }
每个 Trait
都包含了一些其他的函数,可以查看文档来全面的了解可用的函数。
Helper Functions
跟标准库 std
一样,tokio::io
模块也包含了许多适用于标准输入、标准输出及标准错误的实用函数,比如 tokio::io::copy
异步复制函数可用于从读取对象中读取所有数据到写入对象中。
use tokio::fs::File; use tokio::io; #[tokio::main] async fn main() -> io::Result<()> { let mut reader: &[u8] = b"hello"; let mut file = File::create("foo.txt").await?; io::copy(&mut reader, &mut file).await?; Ok(()) }
我们知道这里能够复制是因为字节数组同样实现了 AsyncRead
。
Echo Server
接下来会通过一个 Echo 服务来熟悉异步 IO。
该 Echo 服务会绑定一个 TcpListener
并用他在循环中接收进来的连接,都会该套接字中读取所有的数据,然后马上将数据写回该套接字。对于客户端来说,则是发送数据给服务端然后会接收到一模一样的回复。
我们会以两种不同的策略来实现 Echo 服务两次。
Using io::copy()
第一步我们先使用一个实用的 io::copy
函数来实现。
这是个 TCP 服务所以我们首先需要一个接收连接的循环,在每次接收到一个套接字时则会创建一个任务对其进行处理。
use tokio::io; use tokio::net::TcpListener; #[tokio::main] async fn main() -> io::Result<()> { let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap(); loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { // Copy data here }) } }
正如我们之前介绍的,这个实用函数需要 Reader
跟 Writer
两个参数,并从 Reader
中拷贝数据到 Writer
。我们现在只有一个同时实现了 AsyncRead
跟 AsyncWrite
的 TcpStream
,并且因为 io::copy
的两个参数都需要 &mut
类型,所以我们现在手上的套接字不能同时作为两个参数传递给它。
#![allow(unused)] fn main() { // This fails to cpmpile io::copy(&mut socket, &,ut socket).await }
Spliting A Reader + Writer
为了解决这个问题,我们需要将套接字拆分为 Reader
跟 Writer
,而拆分的最佳方法取决于我们所需要拆分的具体类型。
任何一个 Reader
+ Writer
都可以通过 io::split
来拆分,这个函数获取一个参数然后将其拆分为 Reader
跟 Writer
返回,返回的这两个句柄现在可以独立的使用,比如将他们用到不同的任务中。
举个例子,Echo 服务可以像这样来使用 Writer
跟 Reader
use tokio::io::{self, AsyncReaderExt, AsyncWriterExt}; use tokio::net::TcpStream; #[tokio::main] async fn main() -> io::Result<()> { let socket = TcpStream::connect("127.0.0.1:6142").await?; let (mut rf, mut wr) = io::split(socket); // Write data in the background let write_data = tokio::spawn(async move { wr.write_all(b"hello\r\n").await?; wr.write_all(b"world\r\n").await?; Ok::<_,io::Error>(()) }); let mut buf = vec![0; 128]; lop { let n = rd.read(&mut buf).await?; if n == 0 { break; } println!("GOT {:?}", &buf[..n]); } Ok(()) }
io::split
支持将任意的实现了 AsyncRead
+ AsyncWrite
的值拆分为独立的句柄,在函数的内部,他使用 Arc
跟 Mutex
来实现这个功能。为了避免这个额外的负荷,我们可以使用 TcpStream
提供的两个特殊的拆分函数。
TcpStream::split
获取一个自身的引用并返回 Reader
跟 Writer
。因为使用了自身的引用,所以两个返回的句柄需要保留在调用了 split()
进行拆分在同一个任务中,这个特殊的 split
调用是无需任何代价的,他不需要使用到任何类似 Arc
或 Mutex
之类的东西。TcpStream
同时也提供使用了 Arc
作为实现的 into_split
用来支持将拆分的 Reader
跟 Writer
应用到不同的任务中。
因为 io::copy
是在持有 TcpStream
的同一个任务中调用的,所以我们使用 TcpSteram::split
,处理请求的任务现在变为了
#![allow(unused)] fn main() { tokio::spawn(async move { let (mut rd, mut wr) = soket.split(); if io::copy(&mut rd, &mut wr).await.is_err() { eprintln!("failed to copy"); } }); }
完整的代码可以在 这里 找到。
Manual Copying
接下来我们来实现一个手动复制数据的 Echo 服务,这里面我们使用了 AsyncReadExt::read
跟 AsyncWriteExt::write_all
。
完整的代码如下:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; #[tokio::main] async fn main() -> io::Result<()> { let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap(); loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = vec![0; 1024]; loop { match socket.read(&mut buf).await { // Return value of `Ok(0)` signfies that eh remote has closed Ok(0) => return, Ok(n) => { // Copy the data back to socket if socket.write_all(&buf[..n]).await.is_err() { // Unexpected socket error. There isn't much we can // do here so just stop processing. return; } } Err(_) => { // Unexpected socket error. There isn't must we can do // here so just stop processing. return; } } } }); } }
接下来逐步拆分说明,因为使用了 AsyncRead
跟 AsyncWrite
相关的东西,所以需要在当前代码中引入下面的两个 Trait
#![allow(unused)] fn main() { use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; }
Allocating A Buffer
当前的实现方案需要将数据从套接字中读取到缓存,然后再将其从缓存中写回套接字
#![allow(unused)] fn main() { let mut buf = vec![0; 1024]; }
一个创建在栈中的缓存是需要明确避免的,正如早前提到的,所有的任务数据都需要保存到任务中,用来在 .await
调用之间使用。在我们的例子中, buf
会跨越 .await
调用使用,所有的任务数据都需要保存到单次申请中,你可以假定将使用 enum
来保存那些需要跨越 .await
调用的数据。
如果将缓存表现为栈中的数组,在每次创建任务时产生的内部数据结构看起来会是这样:
#![allow(unused)] fn main() { struct Task { // internal task fields here task: enum { AwaitingRead { socket: TcpStream, buf: [BufferType], }, AwaitingWriteAll { socket: TcpStream, buf: [BufferType], } } } }
如果使用一个栈中的数组作为上面的缓存类型,他会被内联的保存到任务的数据结构中,这会让任务的数据结构变得非常庞大。而且,缓存的大小通常跟页的大小设为相同的,这会让任务的数据大小变成一个不是很合适的尺寸: $page-size + a-few_bytes
。
编译器会将异步代码块优化的比我们看到的基础的 enum
更好,在实际中,变量并不像枚举那样在不同的状态中移动,但是任务的数据结构大小仍然会像我们看到的那么大。
因此,使用专用独立的内存分配来使用缓存会是更好的方式。
Handling EOF
当我们所读取的 TCP 流被关闭了,对其调用的 read()
会返回 Ok(0)
,这意味着我们需要中断这个读取循环了,忘记中断这个读取的循环是很多常见 Bug 的来源
#![allow(unused)] fn main() { loop { match socket.read(&mut buf).await { Ok(0) => return } } }
没有中断这个读取的循环会导致 CPU 被占满 100%,当套接字关闭后,socket.read()
会马上返回,所以这个循环会无限循环下去。
完整的代码可以在 这里 找到。