Linux下原生异步IO接口Libaio的用法

2011-10-29 13:30 / no comment / 151 views /

Libaio是Linux下原生的异步IO接口,其项目地址在这里。关于异步I/O的介绍参看这里。linux下有两种异步I/O库:一种是glibc实现的aio,其函数命名以aio_*开始,在用户态通过线程+阻塞模型实现;另一种是Linux内核中实现的异步I/O,但是glibc并没有对它进行封装。Libaio库对Linux内核态的异步IO进行了封装,提供以以io_*开头的接口函数。Libaio由oracle维护,在Oracle数据库中使用。网上对其使用方法讨论较少,这里做个简单说明。做开发,首先需要安装开发包(gcc不自带):Redhat:sudo yum install libaio-devel;Ubuntu:sudo apt-get install libaio-dev。Libaio的使用过程分以下几步:libaio的初始化,io请求的下发和回收,libaio销毁。

一、libaio接口

libaio提供下面五个主要API函数:

int io_setup(int maxevents, io_context_t *ctxp);
int io_destroy(io_context_t ctx);
int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);
int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event *evt);
int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

五个宏定义

void io_set_callback(struct iocb *iocb, io_callback_t cb);
void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
void io_prep_pwritev(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);
void io_prep_preadv(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);

这五个宏定义都是操作struct iocb的结构体。struct iocb是libaio中很重要的一个结构体,用于表示IO,但是其结构略显复杂,为了保持封装性不建议直接操作其元素而用上面五个宏定义操作。

二、libaio的初始化和销毁

观察libaio五个主要API,都用到类型为io_context的变量,这个变量为libaio的工作空间。不用具体去了解这个变量的结构,只需要了解其相关操作。创建和销毁libaio分别用到io_setup(也可以用io_queue_init,区别只是名字不一样而已)和io_destroy。

int io_setup(int maxevents, io_context_t *ctxp);

创建可以接收最多maxevents个请求的异步I/O环境变量。注意:ctxp不能指向其它环境变量,而且必需初始化为零。

一点实际经验

如果要操作多个磁盘,可以有两种办法:

  1. 为每一盘创建个io_context,每个盘的I/O发到它自己的io_context。好处是:在回收的时候在每个盘相关联的io_context中取回其请求。
  2. 只创建一个io_context,所有盘的I/O都发到这个io_context。

实验中发现,用第1种办法在刚开始运行的时候总是出现一些写请求出错运行一会后错误不再出现,用第2种方法则没有这种情况。个人感觉应该是libaio库的问题,也许在其它版本中不存在这个问题,待查证。

int io_destroy(io_context_t ctx);

io_destroy用于销毁环境变量,同时可以取消当前正在进行的I/O。

三、libaio读写请求的下发和回收

1. 请求下发

libaio的读写请求都用io_submit下发。下发前通过io_prep_pwrite和io_prep_pread生成iocb的结构体,做为io_submit的参数。这个结构体中指定了读写类型、起始扇区、长度和设备标志符。

libaio的初始化不是针对一个具体设备进行初始,而是创建一个libaio的工作环境。读写请求下发到哪个设备是通过open函数打开的设备标志符指定。

io_submit返回当前队列中的I/O数,如果队列中没有I/O等待返回零,出错返回负数。

2. 请求返回

读写请求下发之后,使用io_getevents函数等待io结束信号:

int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

io_getevents返回events的数组,其参数events为数组首地址,nr为数组长度(即最大返回的event数),min_nr为最少返回的events数。timeout可填NULL表示无等待超时。io_event结构体的声明为:

struct io_event {
    PADDEDptr(void *data, __pad1);
    PADDEDptr(struct iocb *obj,  __pad2);
    PADDEDul(res,  __pad3);
    PADDEDul(res2, __pad4);
};

其中,res为实际完成的字节数;res2为读写成功状态,0表示成功;obj为之前下发的struct iocb结构体。这里有必要了解一下struct iocb这个结构体的主要内容:

iocbp->iocb.u.c.nbytes 字节数
iocbp->iocb.u.c.offset 偏移
iocbp->iocb.u.c.buf 缓冲空间
iocbp->iocb.u.c.flags 读写

3. 自定义字段

struct iocb除了自带的元素外,还留有供用户自定义的元素,包括回调函数和void *的data指针。如果在请求下发前用io_set_callback绑定用户自定义的回调函数,那么请求返回后就可以显示的调用该函数。回调函数的类型为:

void callback_function(io_context_t ctx, struct iocb *iocb, long res, long res2);

另外,还可以通过iocbp->data指针挂上用户自己的数据。

注意:实际使用中发现回调函数和data指针不能同时用,可能回调函数本身就是使用的data指针。

四、使用例子

通过上面的说明并不能完整的了解libaio的用法,下面通过简单的例子进一步说明。

一个例子:http://www.hadoopor.com/archiver/tid-956.html

另一个例子:

#include <stdlib.h>
#include <stdio.h>
#include <libaio.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <libaio.h>

int srcfd=-1;
int odsfd=-1;

#define AIO_BLKSIZE  1024
#define AIO_MAXIO 64

static void wr_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
       if(res2 != 0)
       {
              printf(“aio write error\n”);
       }
       if(res != iocb->u.c.nbytes)
       {
              printf( “write missed bytes expect %d got %d\n”, iocb->u.c.nbytes, res);
              exit(1);
       }
       free(iocb->u.c.buf);
       free(iocb);
}

static void rd_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
       /*library needs accessors to look at iocb*/
       int iosize = iocb->u.c.nbytes;
       char *buf = (char *)iocb->u.c.buf;
       off_t offset = iocb->u.c.offset;
       int  tmp;
       char *wrbuff = NULL;
       if(res2 != 0)
       {
             printf(“aio read\n”);
       }
       if(res != iosize)
       {
              printf( “read missing bytes expect %d got %d”, iocb->u.c.nbytes, res);
              exit(1);
       }
       /*turn read into write*/
       tmp = posix_memalign((void **)&wrbuff, getpagesize(), AIO_BLKSIZE);
       if(tmp < 0)
       {
              printf(“posix_memalign222\n”);
              exit(1);
       }
       snprintf(wrbuff, iosize + 1, “%s”, buf);
       printf(“wrbuff-len = %d:%s\n”, strlen(wrbuff), wrbuff);
       printf(“wrbuff_len = %d\n”, strlen(wrbuff));
       free(buf);
       io_prep_pwrite(iocb, odsfd, wrbuff, iosize, offset);
       io_set_callback(iocb, wr_done);
       if(1!= (res=io_submit(ctx, 1, &iocb)))
              printf(“io_submit write error\n”);
       printf(“\nsubmit  %d  write request\n”, res);
}

void main(int args,void * argv[])
{
    int length = sizeof(“abcdefg”);
    char * content = (char * )malloc(length);
    io_context_t myctx;
    int rc;
    char * buff=NULL;
    int offset=0;
    int num,i,tmp;
    if(args<3)
    {
        printf(“the number of param is wrong\n”);
        exit(1);
    }

      if((srcfd=open(argv[1],O_RDWR))<0)
      {
        printf(“open srcfile error\n”);
        exit(1);
      }

      printf(“srcfd=%d\n”,srcfd);

      lseek(srcfd,0,SEEK_SET);
      write(srcfd,”abcdefg”,length);
      lseek(srcfd,0,SEEK_SET);
      read(srcfd,content,length);

      printf(“write in the srcfile successful,content is %s\n”,content);

      if((odsfd=open(argv[2],O_RDWR))<0)
      {
        close(srcfd);
        printf(“open odsfile error\n”);
        exit(1);
      }

    memset(&myctx, 0, sizeof(myctx));
    io_queue_init(AIO_MAXIO, &myctx);
       struct iocb *io = (struct iocb*)malloc(sizeof(struct iocb));
       int iosize = AIO_BLKSIZE;
       tmp = posix_memalign((void **)&buff, getpagesize(), AIO_BLKSIZE);
        if(tmp < 0)
        {
              printf(“posix_memalign error\n”);
              exit(1);
         }
        if(NULL == io)
        {
            printf( “io out of memeory\n”);
             exit(1);
        }

        io_prep_pread(io, srcfd, buff, iosize, offset);
        io_set_callback(io, rd_done);
         printf(“START…\n\n”);
         rc = io_submit(myctx, 1, &io);
          if(rc < 0)
               printf(“io_submit read error\n”);
          printf(“\nsubmit  %d  read request\n”, rc);
          //m_io_queue_run(myctx);
       struct io_event events[AIO_MAXIO];
          io_callback_t cb;
          num = io_getevents(myctx, 1, AIO_MAXIO, events, NULL);
          printf(“\n%d io_request completed\n\n”, num);
          for(i=0;i<num;i++)
         {
              cb = (io_callback_t)events[i].data;
              struct iocb *io = events[i].obj;
              printf(“events[%d].data = %x, res = %d, res2 = %d\n”, i, cb, events[i].res, events[i].res2);
              cb(myctx, io, events[i].res, events[i].res2);
          }
}

参考资料:

  • Linux下用户态直接读写磁盘扇区
  • Linux下异步IO(libaio)的使用以及性能
  • nginx 0.8.x稳定版对linux aio的支持
  • Project: libaio-oracle
  • Kernel Asynchronous I/O (AIO) Support for Linux
  • 深入浅出异步I/O模型
  • Tags: , .

    Get a Trackback link

    No Comments Yet

    You can be the first to comment!

    Leave a comment

    XHTML: You can use these tags: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>