纯c协程框架NtyCo实现与原理
- 前言
- 1. 为什么会有协程?协程解决了哪些问题?
-
- 网络IO优化
- IO 同步 操作性能测试
- IO 异步 操作性能测试
- 协程的诞生
- 2. 原语
-
- yield()
- schedule
- resume()
- 代码 图 进行理解
- 如何实现yield和resume
- 3. 切换
-
- 寄存器介绍
- 汇编实现切换
- 4. 协程操作流程
-
- 如何使用协程,协程api
- 协程工作流程
-
- 创建协程
- 回调协程的子过程
- 协程封装posix api异步原理
- 使用案例简单
- 5. 协程 与 调度器 结构体定义
-
- 协程定义
- 调度器定义
- 6. 调度的策略
-
- 生产者消费模式
- 多状态运行
- 7. 协程api 与 hook
-
- 需要包装为异步posix api分析
- hook
- 8. 多核模式
- 9. 协程性能测试
前言
??c 两个更好的协程库libgo和libco,本文使用纯c代码实现协程ntyco。
??ntyco源码地址——> gopherWxf/NtyCo——>源码中有注释,本文是对的ntyco整体梳理。
??如果了解过golang请注意协程,golang调度器的策略见→ 深入理解GMP模型 ,golang的协程与ntyco协程不同,ntyco基于调度策略 io事件 来做的
。
??协程应用于以下三个方面
1. 文件io的操作 2. 网络io的操作 3. mysql,redis操作第三方库等
??本专栏的知识点是通过的网上课学习,梳理总结写文章,对c/c linux对课程感兴趣的读者可以点击链接 C/C 介绍后台高级服务器课程 详细查看课程服务。
1. 为什么会有协程?协程解决了哪些问题?
网络IO优化
??在CS,BS在开发模式下,服务器的吞吐量是一个值得关注的参数。吞吐量等于1秒内的业务处理次数,因此业务处理实际上是由 网络IO时间 业务处理时间 组成的。
??不同的业务有不同的业务处理时间,因此应根据业务场景优化业务处理时间。IO时间可以优化。
??也就是说,如何提高recv和send性能?Linux以服务器百万并发实现为例,使用epoll管理百万长连接,测试IO同步操作和异步操作的性能差异。
??对于响应式服务器,所有客户端的操作都来自于这个大循环,服务器处理网络IO,handle(sockfd) 实现有两种方式。IO同步;第二,IO异步。
IO 同步 操作性能测试
??对于IO 在同步操作方面,handle(sockfd) 函数内部实现如下,我们发现IO操作(recv,send)与 epoll_wait 在同一处理过程中。也就是说,同步:检测IO 与 读写IO 在同一个过程中
??Linux以同步的方式实现服务器百万并发实现。我们测试了每千个连接,耗时7.5秒左右。 优点:
- sockfd 管理方便
- 代码逻辑清晰
缺点:
- 依赖服务器程序 epoll_wait 循环,响应速度慢。
- 程序性能差
IO 异步 操作性能测试
??对于IO 对于异步操作,handle(sockfd) 函数内部将 sockfd 的操作,push 到线程池中 , 读写由其它线程进行,也就是说,异步:检测IO 与 读写IO 不在同一个过程中
。
??handle(sockfd) 函数将sockfd处理放在另一个线程中进行处理IO操作(recv,send) 与 epoll_wait 解耦不在处理过程中实现,称为IO 异步操作。如下图所示,每千个连接需要2次时间.5秒左右。
优点:
- 好的子模块规划
- 程序性能高
缺点:
- 由于子模块容易规划,模块之间的模块 sockfd 管理极其麻烦。每个子线程都需要管理 sockfd,避免在 IO 操作时,sockfd 关闭或其他异常。一般来说,需要避免一个fd多线程操作。
协程的诞生
对比项 | IO 同步操作 | IO 异步操作 | 协程 |
---|---|---|---|
Sockfd 管理 | 管理方便 | 多个线程共同管理 | 管理方便 |
代码逻辑 | 整个程序逻辑清晰 | 子模块逻辑清晰 | 整个程序逻辑清晰 |
程序性能 | 响应时间长,性能差 | 响应时间短,性能好 | 响应时间短,性能好 |
??我们从上面知道IO同步操作,编写代码逻辑清晰,但效率低;IO异步操作,fd管理复杂,但效率高。因此,协程出现了。
??协程:将两者结合起来,异步性能通过同步编程实现
。也就是说,在编写代码时,同步;操作逻辑,异步。如下图所示,代码是同步的,我们通过hook,它运行时,是异步的。
??有人可能会说,上面IO异步操作性能测试显然使用了两个线程,当然比一个线程同步IO快点。明确一个概念:同步和异步必须描述两者之间的关系
:
同步:检测IO 与 读写IO 在同一个过程中
异步:检测IO 与 读写IO 不在同一个过程中
??然后了解以下内容QA问题,在理解本文协程时要注意理解,然后回头看。
??Q:为什么协程效率更高?reactor用的是epoll_wait, 协程调度也是如此epoll_wait,总的来说不一样吗?
??A:协程性能和单线程不加业务分析reactor是差不多的 ,但是编程要容易得多。首先,使用协程业务代码相对简单,一个协程对应于一个fd,协程内部有业务逻辑;reactor提供的recv_cb和send_cb所有的业务流程。
??Q:有了业务分析,效率不一样吗?
??A:业务部分,如数据库操作,耗时堵塞IO,协程可以通过hook,把recv和send将数据库变成异步IO堵塞的时间切换到其他协程,所有堵塞等待的地方都会导致切换。reactor业务部分数据库IO只能等。
Q:为什么会有协程,协程解决了什么问题?
A:IO同步操作,写代码逻辑清晰,但是效率低;而IO异步操作,fd管理复杂,但是效率高。协程解决了IO同步效率低,IO异步fd管理发杂的问题,协程结合两者的优点,实现了以同步的编程方式,实现异步的性能
。
2. 原语
yield()
先来理解yield的含义,让出,将当前的执行流程让出,让出给调度器。
那么什么时候需要yield让出呢?很明显在recv之前,send之前,也就是在io之前,因为我们不知道io是否准备就绪了,所以我们先将fd加入epoll中,然后yield让出,将执行流程给调度器运行。
schedule
schedule调度器做什么事情呢?调度器就是io检测,调度器就是不断的调用epoll_wait,来检测哪些fd准备就绪了,然后就恢复相应fd的执行流程执行现场
。注意schedule不是原语,schedule是调度器。
resume()
从上面我们得知恢复是被schedule恢复的,那么现在恢复到了原来流程的哪里呢?其实是恢复到了yield的下一条代码处
。通常下面的代码都会将fd从epoll中移除,然后执行recv或send操作,因为一旦被resume,就说明肯定是准备就绪的。
代码+图 进行理解
那么如何以同步的编程方式来实现异步的性能呢?我们是否可以包装一层recv(不包装用hook也可以,后面再写hook),在调用recv之前(假设现在的流程为A),将fd加入epoll中,然后将执行流程让出切换给一个调度器(假设调度器的流程为schedule),调度器里面执行epoll_wait,由调度器选择接下来恢复执行哪个流程(这个流程一定是就绪的,因为是从epoll_wait中返回的),可能会恢复到流程B中,那么流程B先将自己的事件从epoll中摘除EPOLL_CTL_DEL,然后就可以执行recv了。我们发现检测IO 与 读写IO 不在同一个流程里
,实现了异步,而我们代码看起来却是同步的。
我们从中发现两个原语操作,让出yield
和恢复resume
如何实现yield和resume
- yield:从io操作流程切换到调度器流程(让出)
- resume:从调度器流程切换到io操作流程(恢复)
到现在为止应该能理解yield和resume的意思了,但是对于初学者来说,肯定会有疑问,这个切换怎么实现?
如何实现yield和resume:
- setjmp/longjmp
- ucontext
- 用汇编代码自己实现切换
本文采用汇编代码实现切换_switch()。
- yield=_switch(A,B)
- resume=_switch(B,A)
//new_ctx[%rdi]:即将运行协程的上下文寄存器列表; cur_ctx[%rsi]:正在运行协程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
// yield让出
void nty_coroutine_yield(nty_coroutine *co) {
_switch(&co->sched->ctx, &co->ctx);
}
// resume协程恢复执行
int nty_coroutine_resume(nty_coroutine *co) {
//...
nty_schedule * sched = nty_coroutine_get_sched();
sched->curr_thread = co;
_switch(&co->ctx, &co->sched->ctx);
//...
}
如何从一个协程切换到另一个协程呢?我们只需要将当前协程的上下文从寄存器组中保存下来;将下一个要运行的协程的上下文放到寄存器组上去,即可实现协程的切换。
3. 切换
寄存器介绍
下面介绍的都是x86_64的寄存器。
- %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,依次对应第1参数,第2参数…(这里我们只需关注%rdi和%rsi)
- %rbx,%rbp,%r12,%r13,%14,%15 用作数据存储,遵循被调用者使用规则,简单说就是随便用,调用子函数之前要备份它,以防他被修改
- new_ctx是一个指针,指向一块内存,它现在存在%rid里面,同理cur_ctx存在%rsi里面
- %rsp代表栈顶,%rbp代表栈底,%eip代表cpu下一条待取指令的地址(这也就是为什么resume之后会接着运行代码流程的原因)
//new_ctx[%rdi]:即将运行协程的上下文寄存器列表; cur_ctx[%rsi]:正在运行协程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
汇编实现切换
//寄存器 cpu上下文
typedef struct _nty_cpu_ctx {
void *rsp;//栈顶
void *rbp;//栈底
void *eip;//CPU通过EIP寄存器读取即将要执行的指令
void *edi;
void *esi;
void *rbx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
//new_ctx[%rdi]:即将运行协程的上下文寄存器列表; cur_ctx[%rsi]:正在运行协程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
//默认x86_64
__asm__(
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
" movq %rsp, 0(%rsi) # save stack_pointer \n"
" movq %rbp, 8(%rsi) # save frame_pointer \n"
" movq (%rsp), %rax # save insn_pointer \n"
" movq %rax, 16(%rsi) # save eip \n"
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"
" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx # restore rbx,r12-r15 \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n"
" movq %rax, (%rsp) # restore eip \n"
" ret # 出栈,回到栈指针,执行eip指向的指令。\n"
);
上下文切换,就是将 CPU 的寄存器暂时保存,再将即将运行的协程的上下文寄存器,分别mov 到相对应的寄存器上。此时上下文完成切换。
4. 协程的运行流程
协程如何使用,协程的api
在网络IO编程的时候,如果每次accept返回的时候,为新来的fd单独分配一个线程,这一个fd对应一个线程,就不会存在多个线程共用一个fd的问题了,虽然这样代码逻辑清晰易读,但是这是无稽之谈,线程创建与线程调度的代价是很大的
但是如果把线程换成协程,线程API的思维来使用协程,那不久可行了吗?
NtyCo封装了两类接口
- 一类是协程本身的api
//创建协程
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg);
//调度器运行
void nty_schedule_run(void);
- 一类是posix api的异步封装协程api
//POSIX 异步封装 API
int nty_socket(int domain, int type, int protocol);
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len);
ssize_t nty_recv(int fd, void *buf, size_t len, int flags);
ssize_t nty_send(int fd, const void *buf, size_t len, int flags);
int nty_close(int fd);
int nty_connect(int fd, struct sockaddr *name, socklen_t len);
ssize_t nty_recvfrom(int fd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t nty_sendto(int fd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
协程工作流程
创建协程
创建协程的时候,进行了如何的工作?
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg);
- nty_coroutine **new_co :需要传入空的协程的对象,这个对象是由内部创建的,并且在函数返回的时候,会返回一个内部创建的协程对象。
- proc_coroutine func :协程的子过程。当协程被调度的时候,就会执行该 函数。
- void *arg :需要传入到新协程子过程中的参数。
协程不存在亲属关系,都是一致的调度关系,接受调度器的调度。调用 create API就会创建一个新协程,新协程就会加入到调度器的就绪队列中。
回调协程的子过程
在 create 协程后,何时回调子过程?何种方式回调子过程?我们知道CPU的EIP寄存器就是存储cpu下一条指令的地址,我们可以把回调函数的地址存储到 EIP 中。这样在resume之后,就会执行协程的子过程了。
// eip 执行入口
static void _exec(void *lt) {
nty_coroutine *co = (nty_coroutine *) lt;
co->func(co->arg);
}
// 初始化协程栈
static void nty_coroutine_init(nty_coroutine *co) {
void **stack = (void **) (co->stack + co->stack_size);
stack[-3] = NULL;
stack[-2] = (void *) co;//设置参数
co->ctx.rsp = (void *) stack - (4 * sizeof(void *));
co->ctx.rbp = (void *) stack - (3 * sizeof(void *));
co->ctx.eip = (void *) _exec;//设置回调函数入口
co->status = BIT(NTY_COROUTINE_STATUS_READY);
}
协程封装posix api异步原理
在 send 与 recv 调用的时候,如何实现异步操作的?先来看一下一段代码: 在进行 IO 操作(recv,send)之前,先执行了 epoll_ctl 的 del 操作,将相应的 sockfd 从 epfd中删除掉,在执行完 IO 操作(recv,send)再进行 epoll_ctl 的 add 的动作。这段代码看起来似乎好像没有什么作用。
如果是在多个上下文中,这样的做法就很有意义了。能够保证 sockfd 只在一个上下文中能够操作 IO 的。不会出现在多个上下文同时对一个 IO 进行操作的。协程的 IO 异步操作正式是采用此模式进行的。
详细说明见目录中2.原语->代码+图 进行理解
// 创建协程recv接口
ssize_t nty_recv(int fd, void *buf, size_t len, int flags) {
struct epoll_event ev;
ev.events = POLLIN | POLLERR | POLLHUP;
ev.data.fd = fd;
//加入epoll,然后yield
nty_epoll_inner(&ev, 1, 1);
//resume
ssize_t ret = recv(fd, buf, len, flags);
return ret;
}
// 加入epoll,更改状态,加入wait集合,然后yield与resume
static int nty_epoll_inner(struct epoll_event *ev, int ev_num, int timeout) {
nty_schedule * sched = nty_coroutine_get_sched();
nty_coroutine *co = sched->curr_thread;
int i;
for (i = 0; i < ev_num; i++) {
epoll_ctl(sched->epfd, EPOLL_CTL_ADD, ev->data.fd, ev);
co->events = ev->events;
//加入wait集合,添加wait状态
nty_schedule_sched_wait(co, ev->data.fd, ev->events, timeout);
}
//yield
nty_coroutine_yield(co);
for (i = 0; i < ev_num; i++) {
epoll_ctl(sched->epfd, EPOLL_CTL_DEL, ev->data.fd, ev);
//移除wait集合,移除wait状态
nty_schedule_desched_wait(ev->data.fd);
}
return ev_num;
}
一个简单的使用案例
可以看到,我们编写代码只需以同步的编程方式,就能实现异步的性能了。
#include "nty_coroutine.h"
#include <arpa/inet.h>
void server_reader(void *arg) {
int fd = *(int *) arg;
ssize_t ret;
struct pollfd fds;
fds.fd = fd;
fds.events = POLLIN;
while (1) {
char buf[1024] = {
0};
ret = nty_recv(fd, buf, 1024, 0);
if (ret > 0) {
nty_send(fd, buf, strlen(buf), 0);
}
else if (ret == 0) {
nty_close(fd);
break;
}
}
}
void server(void *arg) {
unsigned short port = *(unsigned short *) arg;
int fd = nty_socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) return;
struct sockaddr_in local, remote;
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
bind(fd, (struct sockaddr *) &local, sizeof(struct sockaddr_in));
listen(fd, 128);
while (1) {
socklen_t len = sizeof(struct sockaddr_in);
int cli_fd = nty_accept(fd, (struct sockaddr *) &remote, &len);
printf("new client comming\n");
nty_coroutine *read_co;
nty_coroutine_create(&read_co, server_reader, &cli_fd);
}
}
int main(int argc, char *argv[]) {
nty_coroutine *co = NULL;
unsigned short port = 8080;
nty_coroutine_create(&co, server, &port);
nty_schedule_run(); //run
return 0;
}
5. 协程 与 调度器 结构体定义
协程定义
一个协程会有哪些状态呢?如果协程sleep了,那么就是睡眠状态,如果协程刚创建出来,那它肯定是就绪状态,如果协程在等待数据的到来,那就是等待状态。这里这里定义协程的三个运行状态{就绪,睡眠,等待}。
-
新创建的协程,加入就绪集合等待调度
-
io未就绪的协程,加入等待集合等待epoll_wait
-
有sleep操作的协程,加入睡眠集合
-
就绪集合没有设置优先级,所以在就绪集合里面的协程优先级一样,那么就可以用队列来存储,先进先出
-
等待集合就是等待IO准备就绪,这个等待IO是有时间长短的,这里用红黑树来存储
-
睡眠集合需要按照睡眠时间的长短进行唤醒,所以也用红黑树存储,key为睡眠时长
我们描述了每一个协程有自己的上下文环境,需要保存 CPU 的寄存器 ctx;需要有子过程的回调函数 func;需要有子过程回调函数的参数 arg;需要定义自己的栈空stack;需要有自己栈空间的大小 stack_size;需要定义协程的创建时间birth;需要定义协程当前的运行状态 status;需要定当前运行状态的结点(ready_next, wait_node, sleep_node);需要定义协程 id;需要定义调度器的全局对象 sched。
typedef struct _nty_coroutine {
//cpu ctx
nty_cpu_ctx ctx;
// func
proc_coroutine func;
void *arg;
// create time
uint64_t birth;
//stack
void *stack;
size_t stack_size;
size_t last_stack_size;
//status
nty_coroutine_status status;
//root
nty_schedule *sched;
//co id
uint64_t id;
//fd event
int fd;
uint16_t events;
//sleep time
uint64_t sleep_usecs;
//set
RB_ENTRY(_nty_coroutine) sleep_node;
RB_ENTRY(_nty_coroutine) wait_node;
TAILQ_ENTRY(_nty_coroutine) ready_node;
} nty_coroutine;
调度器定义
每个协程所需要使用的,而且不同的,就是协程的属性,那么每个协程所需要的,且相同的,就是调度器的属性。用来管理所有协程的属性,作为调度器的属性。调度器的属性,需要有保存 CPU 的寄存器上下文 ctx,可以从协程运行状态yield 到调度器运行的。从协程到调度器用 yield,从调度器到协程用 resume。
typedef struct _nty_schedule {
// create time
uint64_t birth;
//cpu ctx
nty_cpu_ctx ctx;
//stack_size
size_t stack_size;
//coroutine num
int spawned_coroutines;
//default_timeout
uint64_t default_timeout;
//当前调度的协程
struct _nty_coroutine *curr_thread;
//页大小
int page_size;
//epoll fd
int epfd;
//线程通知相关,暂未实现
int eventfd;
//events
struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
int num_new_events;
//set
nty_coroutine_queue ready;
nty_coroutine_rbtree_sleep sleeping;
nty_coroutine_rbtree_wait waiting;
} nty_schedule;
6. 调度的策略
调度器的实现,有两种方案,一种是生产者消费者模式,另一种多状态运行。
生产者消费者模式
逻辑代码如下
多状态运行
逻辑代码如下
7. 协程api 与 hook
需要封装为异步的posix api分析
所有对io的操作,我们都需要取重新封装一遍。为什么不能用posix api,而是我们需要再去封装一次呢?比如我们调用recv的时候,如果我们调用系统的,那么这个fd怎么yield到调度器上呢,所以我们需要在posix api的基础上封装,当然有些接口需要封装,有些不需要。
就像下面的伪代码一样,从同步的recv变成异步的ney_recv
//伪代码
ney_recv(){
epoll add fd;
yield();
epoll del fd;
recv(fd);
}
站在同步封装成异步的角度,如果不需要判断io是否就绪的这些api,则不需要封装为异步的。
需要封装的api,这些api在实现的时候,皆采用上面伪代码的策略
1. accept()
2. connect()
3. recv()
4. read()
5. send()
6. write()
7. recvfrom()
8. sendto()
不需要封装的api,这些api因为不会引起阻塞,所以不用封装。
socket()
listen()
close()
fcntl()
setsockopt()
getsockopt()
hook
我们有两种策略封装上面的api,第一种就是定义Nty_XXX(),框架独立定义一套标准接口出来。但是这种方法,如果跟mysql,redis建立连接,但是不去修改它们提供的客户端源码开发包的时候,就会发现连不上去,因为其源码用的是posix api,recv和send。而协程用的是nty_recv()和nty_send()。两者之间没有关联。
第二种就是使用hook,做成跟系统调用,跟posix api一样的接口,那么一样的接口就会引起冲突。这个冲突我们就使用hook来解决。
hook提供了两个接口;1. dlsym()是针对系统的,系统原始的api。2. dlopen()是针对第三方的库。
我们现在使用mysql的包去连接mysql来做hook演示,可以看到我们将系统调用的api截获了。对应下面的代码,就是这个意思。
原来的系统调用,被hook后,都变成了xxx_f , 所以我们就可以使用原来的名字,比如read,connect。而mysql包里面走的read,connect这些函数,就变成我们写的函数了,如果我们协程提供的api使用hook的做法,那么mysql,redis这类静态库,动态库的read,write都不用修改,使用了hook,就都会走我们定义的函数。
简单来说,原来的系统调用connect,被改名为connect_f , 所以原来的名字connect就空出来了,交由我们用户实现。而像mysql-dev源码里面用的都是connect,所以就会走我们用户写的函数了。
connect_f = dlsym(RTLD_NEXT, "connect");
#define _GNU_SOURCE