muduo中的TCPConnection

在muduo中,socket的事件分发由channel来完成,针对不同的事件,channel负责调用不同的回调函数。TCPConnection则是对channel的封装,TCPConnection用来表示一个已建立的链接,每一个TCPConnection负责一个channel。TCPConnection属于应用层的类,拥有TCP链接的属性,如对端socket信息、链接信息等。TCPConnection有着不同事件处理函数,并且对外提供三个回调函数接口,应用可以对于不同的事件注册不同的回调函数,在TCPConnection执行完自己的事件处理流程后,会调用上一层注册的相应回调函数。TCPConnection直接将自己的事件处理函数作为回调函数注册到channel中,这样对于在channel中,实际上执行的事件处理函数为TCPConnection中的事件处理函数,用户可以通过向TCPConnection注册回调函数来控制socket不同事件所应采取的操作。

TCPConnection中有着接受和输出buffer缓冲,数据的读取都需要和buffer打交道,发送数据要比读取数据难,需要注意两个地方:

  1. 当没有数据可以发送的时候,需要关闭读事件,否则的话会造成busy loop。
  2. 当发送的数据堆积在buffer中,client一直没有接收或是数据接收很慢造成buffer太大的时候,需要进行处理,否则的话,会造成内存浪费。

TCPConnection有两个回调函数,WriteCompleteCallBackHighWaterMarkCallBack,当buffer没有数据发送时,可以执行WriteCompleteCallBack函数,当buffer太大时,可以执行HighWaterMarkCallBack函数(一般默认是直接关闭链接,回收内存),TCPConnection暴露这两个回调接口,提供该上一层注册。
TCPConnection的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//TCPConnection维护的部分信息以及属性
EventLoop* loop_;
const string name_;
StateE state_; // FIXME: use atomic variable
// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
//该回调函数在链接建立时执行
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
size_t highWaterMark_;
Buffer inputBuffer_;
Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
boost::any context_;
bool reading_;


//建立连接
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024),
reading_(true)
{
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
boost::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
boost::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}

void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading();

connectionCallback_(shared_from_this());
}

//关闭连接
void TcpConnection::forceCloseInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnected || state_ == kDisconnecting)
{
// as if we received 0 byte in handleRead();
handleClose();
}
}

//事件处理函数

//当socket可读,但是读取数据返回0的时候,说明对端关闭了链接,此时,执行handleClose函数
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{

handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}

void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}

void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();

TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);
// must be the last line
closeCallback_(guardThis);
}

void TcpConnection::handleError()
{
int err = sockets::getSocketError(channel_->fd());
LOG_ERROR << "TcpConnection::handleError [" << name_
<< "] - SO_ERROR = " << err << " " << strerror_tl(err);
}