muduo中的EventLoop

muduo主要采用Reactor模式来实现C++网络库。一个EventLoop对应一个线程。服务器就是在EventLoop中来实现对网络连接的管理的,所有的socket的读写都在该loop中实现。EventLoop的执行大体步骤如下:

EventLoop执行流程图

代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

pendingFunctors的定义如下:

1
2
typedef boost::function<void()> Functor;
std::vector<Functor> pendingFunctors_;

在EventLoop中,只有pendingFunctors暴露给其他的线程,所以对于该成员的修改需要mutex来保护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void EventLoop::runInLoop(Functor&& cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}

void EventLoop::queueInLoop(Functor&& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb)); // emplace_back
}

if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}

在遍历pendingFunctors的时候,采用的是先剪切,再遍历:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;

{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}

for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}

这样可以在遍历的时候,不会阻塞其他线程往EventLoop里面注册函数,并且,由于回调函数里面有可能也会调用queueInLoop函数,所以这样也可以避免死循环,先剪切后,遍历的是新的functors,原来的pendingfunctors为空,如果回调函数又执行queueInLoop的话,注册的是pendingFunctors,跟新的Functors无关。所以可以安全的遍历新的Functors。

由于EventLoop有可能会一直阻塞在poll中,此时如果有回调函数需要EventLoop马上执行的话,那么需要立马唤醒EventLoop的阻塞,为了实现该功能,EventLoop使用了Linux现有的eventfd。

eventfd:实现了线程之间事件通知的方式,eventfd的缓冲区大小是sizeof(uint64_t);向其write可以递增这个计数器,read操作可以读取,并进行清零;eventfd也可以放到监听队列中,当计数器不是0时,有可读事件发生,可以进行读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}

int createEventfd()
{

int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_SYSERR << "Failed in eventfd";
abort();
}
return evtfd;
}

void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}

void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}

EventLoop提供一个接口来直接退出EventLoop循环,该接口为public,允许外面的对象访问:

1
2
3
4
5
6
7
8
9
10
11
void EventLoop::quit()
{
quit_ = true;
// There is a chance that loop() just executes while(!quit_) and exits,
// then EventLoop destructs, then we are accessing an invalid object.
// Can be fixed using mutex_ in both places.
if (!isInLoopThread())
{
wakeup();
}
}