muduo中的Channel和Poller

Channel

每一个Channel对象只负责一个文件描述符的IO时间分发,但她并不拥有这个文件描述符,也不会在析构的时候关闭该文件描述符。channel把不同的IO事件分发为不同的回调,例如readCallBack,writeCallback等,而回调用boost::function表示,用户无需继承channel。用户一般不直接使用,而回使用更上一层的封装,如TcpConnection。
channel的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
namespace muduo
{
namespace net
{

class EventLoop;

class Channel : boost::noncopyable
{
public:
typedef boost::function<void()> EventCallback;
typedef boost::function<void(Timestamp)> ReadEventCallback;

Channel(EventLoop* loop, int fd);
~Channel();

void handleEvent(Timestamp receiveTime);
void setReadCallback(const ReadEventCallback& cb)
{ readCallback_ = cb; }

void setWriteCallback(const EventCallback& cb)
{ writeCallback_ = cb; }

void setCloseCallback(const EventCallback& cb)
{ closeCallback_ = cb; }

void setErrorCallback(const EventCallback& cb)
{ errorCallback_ = cb; }


void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
bool isWriting() const { return events_ & kWriteEvent; }
bool isReading() const { return events_ & kReadEvent; }

private:

EventLoop* loop_;
const int fd_;
int events_;
int revents_; // it's the received event types of epoll or poll
int index_; // used by Poller.

ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};

}
}
//revents_由poller来设置,根据触发的事件类型来执行相应的回调
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}

if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}

if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}

Poller

Poller calss为IO multiplexing的封装。Poller并不拥有channel,channel在析构前必须自己unregister(EventLoop::removeChannel()),避免空悬指针。
Poller使用两个数据结构来记录需要监听的channel:

1
2
3
4
typedef std::map<int, Channel*> ChannelMap;
typedef std::vector<struct pollfd> PollFdList;
ChannelMap channels_;
PollFdList pollfds_;

pollfds_用来记录需要监听的文件描述符以及对应监听的事件,每一个channel在这里面都有一个元素与之对应,元素的下标就是channel成员变量中的index_,poller可以跟根据每一个channel中的index来对pollfds_进行更新(removeChannel, updateChannel等操作)。

channels_用来记录每一个文件描述符对应的channel指针,这里的key为channel的文件描述符,不可以为channel的index_,因为index_会改变。当Poller获得文件描述符的监听事件后,就根据channels_来获得有时间发生的文件描述符的channel,并设置对应channel的revents_(实际发生的事件),接着将每一个有事件的channel放进一个vector里面,最后将vector返回给EventLoop,作为activateChannels。

之所以需要返回activateChannels,而不是在遍历发生事件的文件描述符的同时执行对应的channel中的handleEvent函数,是因为handleEvent函数有可能会对pollfds_进行修改,如删除channel,使得在迭代期间pollfds_大小发生改变,这是一件很危险的事情。另一个原因就是简化Poller的职责,Poller只负责监听,不负责事件的分发与处理。

如果某一个channel暂时不关心任何事件,那么可以吧pollfd.fd设置为负数,这样poll会忽略此文件描述符。不能将pollfd.events设置为0,因为无法屏蔽POLLERR事件。muduo的改进做法是吧pollfd.fd设为channel->fd()的相反数减一(文件描述符从0开始,减一是为了兼容0,因为0的相反数还是0)。

但需要删除channel时,由于pollfds_使用的是vector来存储channel的,可以根据channle本身的index_来确定该channel在vector中所在的位置。如果直接删除该位置的话,那么后面的channel都需要上移一个位置,导致后面的channel都需要更新自己的index_,这样效率会很低。这里有一个小技巧,就是直接将vector最后的元素A与该位置进行呼唤,只需要修改A的index_就好,其他的channel不受影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void PollPoller::removeChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
assert(channel->isNoneEvent());
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
size_t n = channels_.erase(channel->fd());
assert(n == 1); (void)n;
if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
{
pollfds_.pop_back();
}
else
{
int channelAtEnd = pollfds_.back().fd;
iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
if (channelAtEnd < 0)
{
channelAtEnd = -channelAtEnd-1;
}
channels_[channelAtEnd]->set_index(idx);
pollfds_.pop_back();
}
}