I/O

1610591062

Tokio 中的 I/O 操作跟标准库 std的保持一致,只是他是异步的。他提供了用于读取数据的 AsyncRead Trait 及用于写入操作的 AayncWrite Trait,很多类型都实现了他们,比如 TcpStreamFileStdout 等,还有很多其他的基础类型如 Vec<u8>&[u8] ,他们让我们能够像使用 WriterReader 一般使用字节数组。

本页面会使用 Tokio 来实现包含基础 I/O 的读取跟写入的一些示例。下一节我们则将会来探索更高级的示例。

AsyncRead And AsyncWrite

这两个 Trait 基础设施为字节流提供了异步的读取跟写入能力,事实上这两个 Trait 的函数并不会被直接调用,就像你不会手动的去调用 Futurepoll 函数,他们更多的是会被其他的使用 Trait 来调用,如 AsyncReadExtAsyncWriteExt

接下来让我们来稍微看其中几个函数,所有的这些函数都是 async 并且必须使用 .await 来调用。

async fn read()

AsyncReadExt::read() 提供了用来读取数据到缓存然后返回读取字节数的异步函数。

Note: 当 read() 返回 Ok(0) 时,意味着读取的数据流已经关闭,后续再进行的 read() 调用都会马上返回 Ok(0)。对于 TcpStream 来说则意味着套接字的读取部分已经关闭。

use tokio::fs::File;
use tokio::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
  let mut f = File::open("foo.txt").await?;
  let mut buffer = [0; 10];
  
  // read up to 10 bytes
  let n = f.read(&mut buffer[..]).await?;
  
  println!("The bytes: {:?}", &buffer[..n]);
  Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end() 从数据流中读取所有的数据直到读完。

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokil::main]
async fn main() -> io::Result<()> {
  let mut f = File::open("foo.txt").await?;
  let mut buffer = Vec::new();
  
  // read to whole file
  f.read_to_end(&mut buffer).await?;
  Ok(())
}

async fn write()

AsyncWriteExt::write 将缓存中的数据写入到写入者中,然后返回写入的数据字节数。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
  let mut file = File::create("foo.txt").await?;
  
  // Writes some prefix of the byte string, but not necessarily all of it.
  let n = file.write(b"some bytes").await?;
  println!("Wrote the first {} bytes of 'some bytes'", n);
  Ok(())
}

async fn write_all()

AsyncWriteExt::write_all 写入缓存中所有的数据到写入者中。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
  let mut buffer = File::create("foo.txt").await?;
  
  buffer.write_all(b"some bytes").await?;
  Ok(())
}

每个 Trait 都包含了一些其他的函数,可以查看文档来全面的了解可用的函数。

Helper Functions

跟标准库 std 一样,tokio::io 模块也包含了许多适用于标准输入、标准输出及标准错误的实用函数,比如 tokio::io::copy 异步复制函数可用于从读取对象中读取所有数据到写入对象中。

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
  let mut reader: &[u8] = b"hello";
  let mut file = File::create("foo.txt").await?;
  
  io::copy(&mut reader, &mut file).await?;
  Ok(())
}

我们知道这里能够复制是因为字节数组同样实现了 AsyncRead

Echo Server

接下来会通过一个 Echo 服务来熟悉异步 IO。

该 Echo 服务会绑定一个 TcpListener 并用他在循环中接收进来的连接,都会该套接字中读取所有的数据,然后马上将数据写回该套接字。对于客户端来说,则是发送数据给服务端然后会接收到一模一样的回复。

我们会以两种不同的策略来实现 Echo 服务两次。

Using io::copy()

第一步我们先使用一个实用的 io::copy 函数来实现。

这是个 TCP 服务所以我们首先需要一个接收连接的循环,在每次接收到一个套接字时则会创建一个任务对其进行处理。

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
  let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
  
  loop {
    let (mut socket, _) = listener.accept().await?;
    
    tokio::spawn(async move {
      // Copy data here
    })
  }
}

正如我们之前介绍的,这个实用函数需要 ReaderWriter 两个参数,并从 Reader 中拷贝数据到 Writer。我们现在只有一个同时实现了 AsyncReadAsyncWriteTcpStream,并且因为 io::copy 的两个参数都需要 &mut 类型,所以我们现在手上的套接字不能同时作为两个参数传递给它。

#![allow(unused)]
fn main() {
// This fails to cpmpile
io::copy(&mut socket, &,ut socket).await
}

Spliting A Reader + Writer

为了解决这个问题,我们需要将套接字拆分为 ReaderWriter,而拆分的最佳方法取决于我们所需要拆分的具体类型。

任何一个 Reader + Writer 都可以通过 io::split 来拆分,这个函数获取一个参数然后将其拆分为 ReaderWriter 返回,返回的这两个句柄现在可以独立的使用,比如将他们用到不同的任务中。

举个例子,Echo 服务可以像这样来使用 WriterReader

use tokio::io::{self, AsyncReaderExt, AsyncWriterExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
  let socket = TcpStream::connect("127.0.0.1:6142").await?;
  let (mut rf, mut wr) = io::split(socket);
  
  // Write data in the background
  let write_data = tokio::spawn(async move {
    wr.write_all(b"hello\r\n").await?;
    wr.write_all(b"world\r\n").await?;
    
    Ok::<_,io::Error>(())
  });
  
  let mut buf = vec![0; 128];
  
  lop {
    let n = rd.read(&mut buf).await?;
    
    if n == 0 {
      break;
    }
    
    println!("GOT {:?}", &buf[..n]);
  }
  
  Ok(())
}

io::split 支持将任意的实现了 AsyncRead + AsyncWrite 的值拆分为独立的句柄,在函数的内部,他使用 ArcMutex 来实现这个功能。为了避免这个额外的负荷,我们可以使用 TcpStream 提供的两个特殊的拆分函数。

TcpStream::split 获取一个自身的引用并返回 ReaderWriter。因为使用了自身的引用,所以两个返回的句柄需要保留在调用了 split() 进行拆分在同一个任务中,这个特殊的 split 调用是无需任何代价的,他不需要使用到任何类似 ArcMutex 之类的东西。TcpStream 同时也提供使用了 Arc 作为实现的 into_split 用来支持将拆分的 ReaderWriter 应用到不同的任务中。

因为 io::copy 是在持有 TcpStream 的同一个任务中调用的,所以我们使用 TcpSteram::split,处理请求的任务现在变为了

#![allow(unused)]
fn main() {
tokio::spawn(async move {
  let (mut rd, mut wr) = soket.split();
  
  if io::copy(&mut rd, &mut wr).await.is_err() {
    eprintln!("failed to copy");
  }
});
}

完整的代码可以在 这里 找到。

Manual Copying

接下来我们来实现一个手动复制数据的 Echo 服务,这里面我们使用了 AsyncReadExt::readAsyncWriteExt::write_all

完整的代码如下:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
  let mut listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
  
  loop {
    let (mut socket, _) = listener.accept().await?;
    
    tokio::spawn(async move {
      let mut buf = vec![0; 1024];
      
      loop {
        match socket.read(&mut buf).await {
          // Return value of `Ok(0)` signfies that eh remote has closed
          Ok(0) => return,
          Ok(n) => {
            // Copy the data back to socket
            if socket.write_all(&buf[..n]).await.is_err() {
              // Unexpected socket error. There isn't much we can
              // do here so just stop processing.
              return;
            }
          }
          Err(_) => {
            // Unexpected socket error. There isn't must we can do
            // here so just stop processing.
            return;
          }
        }
      }
    });
  }
}

接下来逐步拆分说明,因为使用了 AsyncReadAsyncWrite 相关的东西,所以需要在当前代码中引入下面的两个 Trait

#![allow(unused)]
fn main() {
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
}

Allocating A Buffer

当前的实现方案需要将数据从套接字中读取到缓存,然后再将其从缓存中写回套接字

#![allow(unused)]
fn main() {
let mut buf = vec![0; 1024];
}

一个创建在栈中的缓存是需要明确避免的,正如早前提到的,所有的任务数据都需要保存到任务中,用来在 .await 调用之间使用。在我们的例子中, buf 会跨越 .await 调用使用,所有的任务数据都需要保存到单次申请中,你可以假定将使用 enum 来保存那些需要跨越 .await 调用的数据。

如果将缓存表现为栈中的数组,在每次创建任务时产生的内部数据结构看起来会是这样:

#![allow(unused)]
fn main() {
struct Task {
  // internal task fields here
  task: enum {
    AwaitingRead {
      socket: TcpStream,
      buf: [BufferType],
    },
    AwaitingWriteAll {
      socket: TcpStream,
      buf: [BufferType],
    }
  }
}
}

如果使用一个栈中的数组作为上面的缓存类型,他会被内联的保存到任务的数据结构中,这会让任务的数据结构变得非常庞大。而且,缓存的大小通常跟页的大小设为相同的,这会让任务的数据大小变成一个不是很合适的尺寸: $page-size + a-few_bytes

编译器会将异步代码块优化的比我们看到的基础的 enum 更好,在实际中,变量并不像枚举那样在不同的状态中移动,但是任务的数据结构大小仍然会像我们看到的那么大。

因此,使用专用独立的内存分配来使用缓存会是更好的方式。

Handling EOF

当我们所读取的 TCP 流被关闭了,对其调用的 read() 会返回 Ok(0),这意味着我们需要中断这个读取循环了,忘记中断这个读取的循环是很多常见 Bug 的来源

#![allow(unused)]
fn main() {
loop {
  match socket.read(&mut buf).await {
    Ok(0) => return
  }
}
}

没有中断这个读取的循环会导致 CPU 被占满 100%,当套接字关闭后,socket.read() 会马上返回,所以这个循环会无限循环下去。

完整的代码可以在 这里 找到。