East - IO Coroutine Scheduler Module

IO Manager

熟悉了协程调度器之后,我们再来分析一下IO协程调度器,顾名思义,它主要用来调用IO相关的task。

在之前的协程调度器一节中,我们知道,协程调度器是一个线程池,每个线程都会从队列中取出任务并以放在协程中运行,如果一个任务一旦开始执行,就必须执行完,那么这个协程调度器就是一个单纯的线程池而已,但是学习了协程之后,我们是可以让一个任务在执行过程中,让出CPU,把资源让出去的,那么什么时候让出去呢?当某个任务确实不需要CPU或者占着CPU也做不了事情的时候,比如,网络IO的相关函数,当我们调用某个IO相关的API并且是阻塞调用的时候,在等待数据就绪的时间我们其实没必要占着CPU,这个时间可以将资源让出去做其他的事情,如此,协程就派上了用场!

Implement

IOManager 继承了 Scheduler, 并且重写了idle(), tickle(), stopping()函数。
前面提到,Scheduler中的idle并没有做什么事情,但是IOManager的idle函数就非常重要了,在某个线程进入idle函数的时候,意味着它当前没有任务可执行,idle会执行epoll_wait,这个调用会携带一个超时时间,结果返回后,将超时的事件放入调度队列,其他触发的事件也会按照类型放入调度队列,最后idle协程切换出去转换为调度协程,开始取任务执行。

我们先了解几个核心函数:

tickle()函数, 用来唤醒陷入idle的线程

1
2
3
4
5
6
7
8
void IOManager::tickle() {
if (!hasIdleThreads()) //如果没有空闲的线程,直接返回,没必要唤醒
return;
//我们约定,m_tickleFds[0]是读端,m_tickleFds[1]是发送端,在这里写入数据
//其他线程在epoll_wait就可以读取到事件,从而达到唤醒的目的
int cnt = write(m_tickleFds[1], "t", 1);
EAST_ASSERT2(cnt == 1, "tickle pipe failed");
}

核心函数-idle()函数
核心处都做了注释,再简单梳理下流程:

  1. 取出最近的超时器和3s做个对比取小的

  2. 执行epoll_wait,携带第一步计算得到的时间

  3. 执行完毕后,找出所有已经超时的任务放到任务队列中

  4. 获取触发的事件,根据事件类型将对应的回调函数也加入到调度队列中,如果对应的fd ctx的事件没有全部执行完毕,通过epoll_ctl修改,若处理完毕,就删除对应的fd

  5. 将自身(idle协程)切换出去,转到t_master_fiber,即调度协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
void IOManager::idle() {
ELOG_DEBUG(g_logger) << "idle";
constexpr uint32_t MAX_EVENTS = 256; //一次epoll_wait最多处理的事件数
//使用智能指针管理epoll_event数组,避免内存泄漏
std::unique_ptr<epoll_event[]> ep_events(new epoll_event[MAX_EVENTS]);

while (true) {
uint64_t next_timeout{0};
if (stopping(next_timeout)) { //获取下一个定时器的超时时间,同时返回IOManager是否正在停止

ELOG_DEBUG(g_logger) << "IOManager stopping";
break;
}

int res{0};
do {
constexpr int MAX_TIMEOUT = 3000;
//看看现在最靠前的定时器是否小于这个超时时间,取较小的一个
if (next_timeout != ~0ull)
next_timeout = std::min(MAX_TIMEOUT, (int)next_timeout);
else
next_timeout = MAX_EVENTS;

ELOG_DEBUG(g_logger) << "epoll wait, timeout: " << next_timeout;
res = epoll_wait(m_epfd, ep_events.get(), MAX_EVENTS, (int)next_timeout);

if (res < 0 && errno == EINTR) {
continue;
} else {
break;
}
} while (true);

std::vector<std::function<void()>> timer_cbs{};
listExpiredCb(timer_cbs); //获取所有已经超时的定时器的回调函数
if (!timer_cbs.empty()) {
schedule(timer_cbs.begin(),
timer_cbs.end()); //将符合条件的timer的回调放进去
timer_cbs.clear();
}

ELOG_DEBUG(g_logger) << "idle: epoll wait, res: " << res;
for (int i = 0; i < res; ++i) {
epoll_event& event = ep_events[i];
if (event.data.fd == m_tickleFds[0]) {
uint8_t dummy{};
while (read(event.data.fd, &dummy, 1) > 0) //如果是被tickle唤醒的,将所有的数据全都读取出来
;
continue;
}

FdContext* fd_ctx = static_cast<FdContext*>(event.data.ptr);
FdContext::MutextType::LockGuard lock(fd_ctx->mutex); //对fd_ctx加锁,防止其他线程在epoll_wait期间修改fd_ctx的状态
ELOG_DEBUG(g_logger) << "epoll wait, event:" << event.events;
if (event.events & (EPOLLERR | EPOLLHUP)) {
event.events |= EPOLLIN | EPOLLOUT;
}
int real_events{NONE};
if (event.events & EPOLLIN) {
real_events |= READ;
}
if (event.events & EPOLLOUT) {
real_events |= WRITE;
}

int left_events =
(~real_events) & fd_ctx->events; //这次没有触发的事件之后继续触发
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;

ELOG_DEBUG(g_logger) << __FUNCTION__ << ", fd: " << event.data.fd
<< ", fd ctx's fd: " << fd_ctx->fd
<< ", fd ctx's events: " << fd_ctx->events
<< ", event: " << event.events
<< ", left_events: " << left_events << ", op: " << op
<< ", real_events: " << real_events;
event.events = left_events | EPOLLET; //边缘触发模式
int res2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event); //将没有处理完的事件继续放进去或者是处理完了就删除掉不再监听
if (res2 != 0) {
ELOG_ERROR(g_logger)
<< "epoll_ctl failed, ep fd: " << m_epfd << ", op: " << op
<< ", fd: " << fd_ctx->fd << ", fd events: " << fd_ctx->events
<< ", errno: " << errno << ", strerrno: " << strerror(errno);
continue;
}

if (real_events & READ) {
fd_ctx->triggerEvent(READ); //触发读事件,会将回调函数放入调度器的任务队列中
--m_pendingEventCount;
}
if (real_events & WRITE) {
fd_ctx->triggerEvent(WRITE); //触发写事件,会将回调函数放入调度器的任务队列中
--m_pendingEventCount;
}
}
auto cur_fiber = Fiber::GetThis();
auto raw_ptr = cur_fiber.get();
cur_fiber.reset();
ELOG_DEBUG(g_logger) << "ready to yield, cur fiber id: " << raw_ptr->getId()
<< ", cur fiber state: " << raw_ptr->getState();
raw_ptr->yield(); //将当前协程切换到后台执行,进入调度器协程,开始执行任务
}
}
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2015-2025 Xudong0722
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信