// socket.cpp, writing an IOBuf
int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
    WriteOptions opt;
    if (options_in) {
        opt = *options_in;
    }
    if (data->empty()) {
        return SetError(opt.id_wait, EINVAL);
    }
    if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
        LOG(ERROR) << "pipelined_count=" << opt.pipelined_count << " is too large";
        return SetError(opt.id_wait, EOVERFLOW);
    }
    if (Failed()) {
        const int rc = ConductError(opt.id_wait);
        if (rc <= 0) {
            return rc;
        }
    }
    if (!opt.ignore_eovercrowded && _overcrowded) {
        return SetError(opt.id_wait, EOVERCROWDED);
    }

    // Get a WriteRequest from the object pool.
    WriteRequest* req = butil::get_object<WriteRequest>();
    if (!req) {
        return SetError(opt.id_wait, ENOMEM);
    }

    req->data.swap(*data);  // move the data into the request
    // Set `req->next' to UNCONNECTED so that the KeepWrite thread will
    // wait until it points to a valid WriteRequest or NULL.
    // Mark `next' as invalid first; the checks below depend on this.
    req->next = WriteRequest::UNCONNECTED;
    req->id_wait = opt.id_wait;
    req->set_pipelined_count_and_user_message(opt.pipelined_count,
                                              DUMMY_USER_MESSAGE, opt.with_auth);
    return StartWrite(req, opt);  // hand off to the actual write
}

int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
    // Release fence makes sure the thread getting request sees *req
    // Atomically swap the list head with the request to be written;
    // the previous head comes back as prev_head.
    WriteRequest* const prev_head =
        _write_head.exchange(req, butil::memory_order_release);
    if (prev_head != NULL) {
        // Someone is writing to the fd. The KeepWrite thread may spin
        // until req->next to be non-UNCONNECTED. This process is not
        // lock-free, but the duration is so short(1~2 instructions,
        // depending on compiler) that the spin rarely occurs in practice
        // (I've not seen any spin in highly contended tests).
        // A non-NULL prev_head means another thread holds the write
        // permission and is writing. Point `next' to prev_head here; before
        // this assignment `next' is WriteRequest::UNCONNECTED, so the writing
        // thread can spin-wait on it.
        req->next = prev_head;
        return 0;
    }

    int saved_errno = 0;
    bthread_t th;
    SocketUniquePtr ptr_for_keep_write;
    ssize_t nw = 0;

    // We've got the right to write.
    // We own the write permission of this connection; set `next' to NULL.
    // Everything below runs effectively single-threaded.
    req->next = NULL;

    // Connect to remote_side() if not.
    // Try to connect to the remote side.
    int ret = ConnectIfNot(opt.abstime, req);
    if (ret < 0) {
        saved_errno = errno;
        SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
        goto FAIL_TO_WRITE;
    } else if (ret == 1) {
        // We are doing connection. Callback `KeepWriteIfConnected'
        // will be called with `req' at any moment after
        return 0;
    }

    // NOTE: Setup() MUST be called after Connect which may call app_connect,
    // which is assumed to run before any SocketMessage.AppendAndDestroySelf()
    // in some protocols(namely RTMP).
    req->Setup(this);  // not sure about its purpose yet; setting it aside for now

    if (ssl_state() != SSL_OFF) {
        // Writing into SSL may block the current bthread, always write
        // in the background.
        // For SSL, always write in the background.
        goto KEEPWRITE_IN_BACKGROUND;
    }

    // Write once in the calling thread. If the write is not complete,
    // continue it in KeepWrite thread.
    if (_conn) {
        butil::IOBuf* data_arr[1] = { &req->data };
        nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
    } else {
        // Write once; the default size_hint is 1MB.
        nw = req->data.cut_into_file_descriptor(fd());
    }
    if (nw < 0) {
        // RTMP may return EOVERCROWDED
        if (errno != EAGAIN && errno != EOVERCROWDED) {
            saved_errno = errno;
            // EPIPE is common in pooled connections + backup requests.
            PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
            SetFailed(saved_errno, "Fail to write into %s: %s",
                      description().c_str(), berror(saved_errno));
            goto FAIL_TO_WRITE;  // jump out on failure
        }
    } else {
        AddOutputBytes(nw);
    }
    if (IsWriteComplete(req, true, NULL)) {
        // Everything has been written; return directly.
        ReturnSuccessfulWriteRequest(req);
        return 0;
    }

KEEPWRITE_IN_BACKGROUND:
    // The write is not complete yet; start a background bthread to keep writing.
    ReAddress(&ptr_for_keep_write);  // duplicate the current pointer into a new SocketUniquePtr
    req->socket = ptr_for_keep_write.release();
    if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
        LOG(FATAL) << "Fail to start KeepWrite";
        // If starting the bthread fails, fall back to calling KeepWrite synchronously.
        KeepWrite(req);
    }
    return 0;

FAIL_TO_WRITE:
    // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
    // `on_reset' callback inside the id object) so that we immediately
    // know this socket has failed inside the `on_reset' callback
    ReleaseAllFailedWriteRequests(req);
    errno = saved_errno;
    return -1;
}

// iobuf_inl.h
inline ssize_t IOBuf::cut_into_file_descriptor(int fd, size_t size_hint) {
    return pcut_into_file_descriptor(fd, -1, size_hint);
}

// iobuf.cpp
ssize_t IOBuf::pcut_into_file_descriptor(int fd, off_t offset, size_t size_hint) {
    if (empty()) {
        return 0;
    }

    const size_t nref = std::min(_ref_num(), IOBUF_IOV_MAX);
    struct iovec vec[nref];
    // Convert the IOBuf refs into an iovec array and write them in one batch.
    size_t nvec = 0;
    size_t cur_len = 0;
    do {
        IOBuf::BlockRef const& r = _ref_at(nvec);
        vec[nvec].iov_base = r.block->data + r.offset;
        vec[nvec].iov_len = r.length;
        ++nvec;
        cur_len += r.length;
    } while (nvec < nref && cur_len < size_hint);  // size_hint is not a precise limit

    ssize_t nw = 0;
    if (offset >= 0) {
        static iobuf::iov_function pwritev_func = iobuf::get_pwritev_func();
        nw = pwritev_func(fd, vec, nvec, offset);
    } else {
        nw = ::writev(fd, vec, nvec);  // non-blocking batched write
    }
    if (nw > 0) {
        pop_front(nw);  // pop the part that was actually written
    }
    return nw;
}
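To make the iovec batching above concrete, here is a minimal standalone sketch (not brpc code; the `write_batch` helper and its use of std::string buffers are invented for illustration). It gathers several non-contiguous buffers into a single writev() call and then drops only the bytes the kernel actually accepted, which mirrors what pop_front(nw) does for IOBuf blocks:

#include <sys/uio.h>
#include <unistd.h>
#include <string>
#include <vector>

ssize_t write_batch(int fd, std::vector<std::string>& parts) {
    std::vector<struct iovec> vec;
    vec.reserve(parts.size());
    for (auto& p : parts) {
        vec.push_back({ const_cast<char*>(p.data()), p.size() });
    }
    ssize_t nw = ::writev(fd, vec.data(), static_cast<int>(vec.size()));
    if (nw <= 0) {
        return nw;  // 0 or -1 with errno set (e.g. EAGAIN on a non-blocking fd)
    }
    // Drop exactly the written bytes, possibly cutting a part in half.
    size_t remain = static_cast<size_t>(nw);
    while (remain > 0 && !parts.empty()) {
        if (remain >= parts.front().size()) {
            remain -= parts.front().size();
            parts.erase(parts.begin());
        } else {
            parts.front().erase(0, remain);
            remain = 0;
        }
    }
    return nw;
}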
The flow of KeepWrite is:
void* Socket::KeepWrite(void* void_arg) {
    g_vars->nkeepwrite << 1;
    WriteRequest* req = static_cast<WriteRequest*>(void_arg);
    SocketUniquePtr s(req->socket);  // restore the socket's unique_ptr

    // When error occurs, spin until there's no more requests instead of
    // returning directly otherwise _write_head is permantly non-NULL which
    // makes later Write() abnormal.
    WriteRequest* cur_tail = NULL;
    do {
        // req was written, skip it.
        if (req->next != NULL && req->data.empty()) {
            // Fully written requests are removed from the list promptly
            // and their callers notified.
            WriteRequest* const saved_req = req;
            req = req->next;
            s->ReturnSuccessfulWriteRequest(saved_req);
        }
        const ssize_t nw = s->DoWrite(req);  // try to write
        if (nw < 0) {
            if (errno != EAGAIN && errno != EOVERCROWDED) {
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to keep-write into " << *s;
                s->SetFailed(saved_errno, "Fail to keep-write into %s: %s",
                             s->description().c_str(), berror(saved_errno));
                break;
            }
        } else {
            s->AddOutputBytes(nw);
        }
        // Release WriteRequest until non-empty data or last request.
        while (req->next != NULL && req->data.empty()) {
            WriteRequest* const saved_req = req;
            req = req->next;
            s->ReturnSuccessfulWriteRequest(saved_req);
        }

        // TODO(gejun): wait for epollout when we actually have written
        // all the data. This weird heuristic reduces 30us delay...
        // Update(12/22/2015): seem not working. better switch to correct code.
        // Update(1/8/2016, r31823): Still working.
        // Update(8/15/2017): Not working, performance downgraded.
        // if (nw <= 0 || req->data.empty()/*note*/) {
        if (nw <= 0) {
            g_vars->nwaitepollout << 1;
            bool pollin = (s->_on_edge_triggered_events != NULL);
            // NOTE: Waiting epollout within timeout is a must to force
            // KeepWrite to check and setup pending WriteRequests periodically,
            // which may turn on _overcrowded to stop pending requests from
            // growing infinitely.
            const timespec duetime =
                butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
            const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
            if (rc < 0 && errno != ETIMEDOUT) {
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to wait epollout of " << *s;
                s->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
                             s->description().c_str(), berror(saved_errno));
                break;
            }
        }
        if (NULL == cur_tail) {
            for (cur_tail = req; cur_tail->next != NULL;
                 cur_tail = cur_tail->next);
        }
        // Return when there's no more WriteRequests and req is completely
        // written.
        // Check whether everything has been written.
        if (s->IsWriteComplete(cur_tail, (req == cur_tail), &cur_tail)) {
            CHECK_EQ(cur_tail, req);
            s->ReturnSuccessfulWriteRequest(req);
            return NULL;
        }
    } while (1);

    // Error occurred, release all requests until no new requests.
    s->ReleaseAllFailedWriteRequests(req);
    return NULL;
}
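brpc's WaitEpollOut parks the bthread until the fd becomes writable or the timeout expires (internally it uses a butex, not shown here). As a rough plain-POSIX analogue of that "writable or timeout" wait, purely for illustration and not how brpc implements it:

#include <poll.h>

// Returns 1 if writable, 0 on timeout, -1 on error (errno set).
int wait_writable(int fd, int timeout_ms) {
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;
    return poll(&pfd, 1, timeout_ms);
}

The timeout matters: it forces the keep-write loop to wake up periodically and re-examine the pending list even when the fd stays unwritable.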
ssize_t Socket::DoWrite(WriteRequest* req) {
    // Group butil::IOBuf in the list into a batch array.
    butil::IOBuf* data_list[DATA_LIST_MAX];
    size_t ndata = 0;
    // Collect a batch of pending data, then write it in one shot.
    for (WriteRequest* p = req; p != NULL && ndata < DATA_LIST_MAX; p = p->next) {
        data_list[ndata++] = &p->data;
    }

    if (ssl_state() == SSL_OFF) {
        // Write IOBuf in the batch array into the fd.
        if (_conn) {
            return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
        } else {
            ssize_t nw = butil::IOBuf::cut_multiple_into_file_descriptor(
                fd(), data_list, ndata);
            return nw;
        }
    }

    CHECK_EQ(SSL_CONNECTED, ssl_state());
    if (_conn) {
        // TODO: Separate SSL stuff from SocketConnection
        return _conn->CutMessageIntoSSLChannel(_ssl_session, data_list, ndata);
    }
    int ssl_error = 0;
    ssize_t nw = butil::IOBuf::cut_multiple_into_SSL_channel(
        _ssl_session, data_list, ndata, &ssl_error);
    switch (ssl_error) {
    case SSL_ERROR_NONE:
        break;
    case SSL_ERROR_WANT_READ:
        // Disable renegotiation
        errno = EPROTO;
        return -1;
    case SSL_ERROR_WANT_WRITE:
        errno = EAGAIN;
        break;
    default: {
        const unsigned long e = ERR_get_error();
        if (e != 0) {
            LOG(WARNING) << "Fail to write into ssl_fd=" << fd() << ": "
                         << SSLError(ERR_get_error());
            errno = ESSL;
        } else {
            // System error with corresponding errno set
            PLOG(WARNING) << "Fail to write into ssl_fd=" << fd();
        }
        break;
    }
    }
    return nw;  // number of bytes successfully written
}

// Check if there're new requests appended.
// If yes, point old_head to reversed new requests and return false;
// If no:
//    old_head is fully written, set _write_head to NULL and return true;
//    old_head is not written yet, keep _write_head unchanged and return false;
// `old_head' is last new_head got from this function or (in another word)
// tail of current writing list.
// `singular_node' is true iff `old_head' is the only node in its list.
bool Socket::IsWriteComplete(Socket::WriteRequest* old_head, bool singular_node,
                             Socket::WriteRequest** new_tail) {
    CHECK(NULL == old_head->next);
    // Try to set _write_head to NULL to mark that the write is done.
    WriteRequest* new_head = old_head;
    WriteRequest* desired = NULL;
    bool return_when_no_more = true;
    if (!old_head->data.empty() || !singular_node) {
        // We cannot conclude yet that the list is fully written (the current
        // node is non-empty, or the list has more than one node).
        desired = old_head;
        // Write is obviously not complete if old_head is not fully written.
        return_when_no_more = false;
    }
    // CAS to check whether new requests still need to be written.
    if (_write_head.compare_exchange_strong(
            new_head, desired, butil::memory_order_acquire)) {
        // No one added new requests.
        if (new_tail) {
            *new_tail = old_head;
        }
        return return_when_no_more;
    }
    // The CAS failed; new_head now holds the latest head of the write list.
    CHECK_NE(new_head, old_head);
    // Above acquire fence pairs release fence of exchange in Write() to make
    // sure that we see all fields of requests set.
    // Someone added new requests.
    // Reverse the list until old_head.
    WriteRequest* tail = NULL;
    WriteRequest* p = new_head;
    // Reverse the list from new_head back to old_head.
    do {
        while (p->next == WriteRequest::UNCONNECTED) {
            // TODO(gejun): elaborate this
            // As mentioned earlier, p->next may briefly point to UNCONNECTED,
            // so we spin-wait here.
            sched_yield();
        }
        WriteRequest* const saved_next = p->next;
        p->next = tail;
        tail = p;
        p = saved_next;
        CHECK(p != NULL);
    } while (p != old_head);

    // Link old list with new list.
    old_head->next = tail;
    // Call Setup() from oldest to newest, notice that the calling sequence
    // matters for protocols using pipelined_count, this is why we don't
    // calling Setup in above loop which is from newest to oldest.
    for (WriteRequest* q = tail; q; q = q->next) {
        q->Setup(this);
    }
    // After the reversal, the head is old_head and the tail is new_head,
    // whose next points to NULL. This segment will be written in batch
    // during the next round of writing.
    if (new_tail) {
        *new_tail = new_head;
    }
    return false;
}

Diagram of the write-request list:
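Taken together, Write/StartWrite/IsWriteComplete turn _write_head into a multi-producer single-consumer queue: every producer exchanges itself into the head, the producer that saw NULL becomes the single writer, and the writer reverses the newest-first list into oldest-first order before draining it. Below is a minimal standalone sketch of the same pattern. All names (Node, push, drain, g_unconnected) are invented; the real code reuses WriteRequest objects as list nodes and CASes against old_head instead of detaching the whole list, but the UNCONNECTED spin is exactly the one sketched here:

#include <atomic>
#include <cstdio>
#include <thread>

struct Node {
    int value;
    std::atomic<Node*> next{nullptr};
};

// Sentinel playing the role of WriteRequest::UNCONNECTED.
static Node g_unconnected;
static std::atomic<Node*> g_head{nullptr};

// Producer: returns true if the caller won the right to drain.
bool push(Node* n) {
    n->next.store(&g_unconnected, std::memory_order_relaxed);
    Node* prev = g_head.exchange(n, std::memory_order_release);
    if (prev != nullptr) {
        n->next.store(prev, std::memory_order_release);
        return false;        // someone else is draining
    }
    n->next.store(nullptr, std::memory_order_relaxed);
    return true;             // we own the drain, like StartWrite
}

// Consumer: detach everything pushed so far and process oldest-first,
// spinning briefly when a producer has exchanged itself in but has not
// linked `next' yet -- the same window IsWriteComplete spins on.
void drain() {
    Node* newest = g_head.exchange(nullptr, std::memory_order_acquire);
    Node* oldest_first = nullptr;
    for (Node* p = newest; p != nullptr;) {
        Node* next;
        while ((next = p->next.load(std::memory_order_acquire)) == &g_unconnected) {
            std::this_thread::yield();  // producer is between exchange and link
        }
        p->next.store(oldest_first, std::memory_order_relaxed);
        oldest_first = p;
        p = next;
    }
    for (Node* p = oldest_first; p != nullptr;
         p = p->next.load(std::memory_order_relaxed)) {
        std::printf("write node %d\n", p->value);
    }
}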
// event_dispatcher.cpp
DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");  // defaults to 1

EventDispatcher& GetGlobalEventDispatcher(int fd) {
    pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
    if (FLAGS_event_dispatcher_num == 1) {
        return g_edisp[0];
    }
    int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
    return g_edisp[index];
}

void InitializeGlobalDispatchers() {
    g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
    for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
        const bthread_attr_t attr =
            FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
        CHECK_EQ(0, g_edisp[i].Start(&attr));  // start each EventDispatcher
    }
    // This atexit will be run before g_task_control.stop() because above
    // Start() initializes g_task_control by creating bthread (to run
    // epoll/kqueue).
    CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}

// Dispatch edge-triggered events of file descriptors to consumers
// running in separate bthreads.
class EventDispatcher {
friend class Socket;
public:
    EventDispatcher();
    virtual ~EventDispatcher();

    // Start this dispatcher in a bthread.
    // Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
    // create bthreads running user callbacks.
    // Returns 0 on success, -1 otherwise.
    virtual int Start(const bthread_attr_t* consumer_thread_attr);

    // True iff this dispatcher is running in a bthread
    bool Running() const;

    // Stop bthread of this dispatcher.
    void Stop();

    // Suspend calling thread until bthread of this dispatcher stops.
    void Join();

    // When edge-triggered events happen on `fd', call
    // `on_edge_triggered_events' of `socket_id'.
    // Notice that this function also transfers ownership of `socket_id',
    // When the file descriptor is removed from internal epoll, the Socket
    // will be dereferenced once additionally.
    // Returns 0 on success, -1 otherwise.
    int AddConsumer(SocketId socket_id, int fd);

    // Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
    // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
    // be used instead of EPOLL_CTL_ADD. When event arrives,
    // `Socket::HandleEpollOut' will be called with `socket_id'
    // Returns 0 on success, -1 otherwise and errno is set
    int AddEpollOut(SocketId socket_id, int fd, bool pollin);

    // Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
    // will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
    // Returns 0 on success, -1 otherwise and errno is set
    int RemoveEpollOut(SocketId socket_id, int fd, bool pollin);

private:
    DISALLOW_COPY_AND_ASSIGN(EventDispatcher);

    // Calls Run()
    static void* RunThis(void* arg);

    // Thread entry.
    void Run();

    // Remove the file descriptor `fd' from epoll.
    int RemoveConsumer(int fd);

    // The epoll to watch events.
    int _epfd;

    // false unless Stop() is called.
    volatile bool _stop;

    // identifier of hosting bthread
    bthread_t _tid;

    // The attribute of bthreads calling user callbacks.
    bthread_attr_t _consumer_thread_attr;

    // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
    int _wakeup_fds[2];
};

EventDispatcher::EventDispatcher()
    : _epfd(-1)
    , _stop(false)
    , _tid(0)
    , _consumer_thread_attr(BTHREAD_ATTR_NORMAL) {
    _epfd = epoll_create(1024 * 1024);
    if (_epfd < 0) {
        PLOG(FATAL) << "Fail to create epoll";
        return;
    }
    CHECK_EQ(0, butil::make_close_on_exec(_epfd));

    _wakeup_fds[0] = -1;
    _wakeup_fds[1] = -1;
    // The pipe is used to wake up epoll when stopping.
    if (pipe(_wakeup_fds) != 0) {
        PLOG(FATAL) << "Fail to create pipe";
        return;
    }
}

int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
    if (_epfd < 0) {
        LOG(FATAL) << "epoll was not created";
        return -1;
    }
    if (_tid != 0) {
        LOG(FATAL) << "Already started this dispatcher(" << this
                   << ") in bthread=" << _tid;
        return -1;
    }

    // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
    // everything seems sane to the thread.
    _consumer_thread_attr =
        (consumer_thread_attr ? *consumer_thread_attr : BTHREAD_ATTR_NORMAL);

    // Polling thread uses the same attr for consumer threads (NORMAL right
    // now). Previously, we used small stack (32KB) which may be overflowed
    // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
    // is also a potential issue for consumer threads, using the same attr
    // should be a reasonable solution.
    // Start a bthread to run the event loop.
    int rc = bthread_start_background(&_tid, &_consumer_thread_attr, RunThis, this);
    if (rc) {
        LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
        return -1;
    }
    return 0;
}

void* EventDispatcher::RunThis(void* arg) {
    ((EventDispatcher*)arg)->Run();
    return NULL;
}

void EventDispatcher::Run() {
    while (!_stop) {
        epoll_event e[32];
        const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);  // note: no timeout is set
        if (_stop) {
            // epoll_ctl/epoll_wait should have some sort of memory fencing
            // guaranteeing that we(after epoll_wait) see _stop set before
            // epoll_ctl.
            break;
        }
        if (n < 0) {
            if (EINTR == errno) {
                // We've checked _stop, no wake-up will be missed.
                continue;
            }
            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
            break;
        }
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                // Handle read events.
                Socket::StartInputEvent(e[i].data.u64, e[i].events,
                                        _consumer_thread_attr);
            }
        }
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                // Handle write events.
                Socket::HandleEpollOut(e[i].data.u64);
            }
        }
    }
}

void EventDispatcher::Stop() {
    _stop = true;
    if (_epfd >= 0) {
        epoll_event evt = { EPOLLOUT, { NULL } };
        // On stop, add the always-writable pipe fd to wake up the
        // epoll_wait that has no timeout.
        epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
    }
}

// Register interest in read events.
int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
    if (_epfd < 0) {
        errno = EINVAL;
        return -1;
    }
    epoll_event evt;
    evt.events = EPOLLIN | EPOLLET;  // edge-triggered
    evt.data.u64 = socket_id;        // the socket_id is stored directly
    return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
}

// Register interest in write events.
int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
    if (_epfd < 0) {
        errno = EINVAL;
        return -1;
    }
    epoll_event evt;
    evt.data.u64 = socket_id;
    evt.events = EPOLLOUT | EPOLLET;
    if (pollin) {
        evt.events |= EPOLLIN;
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            // This fd has been removed from epoll via `RemoveConsumer',
            // in which case errno will be ENOENT
            return -1;
        }
    } else {
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
            return -1;
        }
    }
    return 0;
}
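The wake-up trick in Stop() deserves a closer look: since epoll_wait is called with no timeout, simply setting _stop would not wake the loop. Registering the write end of an empty pipe with EPOLLOUT does, because that fd is always writable. A minimal standalone sketch (not brpc code) of the same idea:

#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>

int main() {
    int epfd = epoll_create1(0);
    int wakeup_fds[2];
    if (epfd < 0 || pipe(wakeup_fds) != 0) {
        perror("setup");
        return 1;
    }
    // ... normally other fds are added here and a loop calls epoll_wait ...

    // "Stop": the write end of an empty pipe is always writable, so adding
    // it with EPOLLOUT makes the next (or current) epoll_wait return.
    epoll_event evt = {};
    evt.events = EPOLLOUT;
    epoll_ctl(epfd, EPOLL_CTL_ADD, wakeup_fds[1], &evt);

    epoll_event out[4];
    int n = epoll_wait(epfd, out, 4, -1);  // returns immediately with 1 event
    std::printf("epoll_wait returned %d\n", n);
    close(wakeup_fds[0]); close(wakeup_fds[1]); close(epfd);
    return 0;
}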
// socket.cpp
int Socket::Create(const SocketOptions& options, SocketId* id) {
    ...
    if (m->ResetFileDescriptor(options.fd) != 0) {
        const int saved_errno = errno;
        PLOG(ERROR) << "Fail to ResetFileDescriptor";
        m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
                     berror(saved_errno));
        return -1;
    }
    ...
}

int Socket::ResetFileDescriptor(int fd) {
    // Reset message sizes when fd is changed.
    _last_msg_size = 0;
    _avg_msg_size = 0;
    // MUST store `_fd' before adding itself into epoll device to avoid
    // race conditions with the callback function inside epoll
    _fd.store(fd, butil::memory_order_release);
    _reset_fd_real_us = butil::gettimeofday_us();
    // Check whether the fd is valid.
    if (!ValidFileDescriptor(fd)) {
        return 0;
    }
    // OK to fail, non-socket fd does not support this.
    if (butil::get_local_side(fd, &_local_side) != 0) {
        _local_side = butil::EndPoint();
    }

    // FIXME : close-on-exec should be set by new syscalls or worse: set right
    // after fd-creation syscall. Setting at here has higher probabilities of
    // race condition.
    butil::make_close_on_exec(fd);

    // Make the fd non-blocking.
    if (butil::make_non_blocking(fd) != 0) {
        PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
        return -1;
    }
    // turn off nagling.
    // OK to fail, namely unix domain socket does not support this.
    butil::make_no_delay(fd);
    if (_tos > 0 &&
        setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {
        PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos;
    }

    if (FLAGS_socket_send_buffer_size > 0) {
        int buff_size = FLAGS_socket_send_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) {
            PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to " << buff_size;
        }
    }

    if (FLAGS_socket_recv_buffer_size > 0) {
        int buff_size = FLAGS_socket_recv_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) {
            PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to " << buff_size;
        }
    }

    if (_on_edge_triggered_events) {
        // Register for read events of this connection.
        if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
            PLOG(ERROR) << "Fail to add SocketId=" << id()
                        << " into EventDispatcher";
            _fd.store(-1, butil::memory_order_release);
            return -1;
        }
    }
    return 0;
}
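For reference, butil::make_non_blocking and butil::make_no_delay are small wrappers; a plain-POSIX sketch of what such helpers presumably boil down to (an assumption for illustration, not the brpc implementation itself):

#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

int make_non_blocking_sketch(int fd) {
    const int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int make_no_delay_sketch(int fd) {
    int one = 1;  // disable Nagle so small writes are not delayed
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
}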
When a read event becomes available, the event loop calls the connection's StartInputEvent function:
// socket.cpp
int Socket::StartInputEvent(SocketId id, uint32_t events,
                            const bthread_attr_t& thread_attr) {
    SocketUniquePtr s;
    if (Address(id, &s) < 0) {
        return -1;
    }
    if (NULL == s->_on_edge_triggered_events) {
        // Callback can be NULL when receiving error epoll events
        // (Added into epoll by `WaitConnected')
        return 0;
    }
    if (s->fd() < 0) {
        CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
        return -1;
    }

    // Passing e[i].events causes complex visibility issues and
    // requires stronger memory fences, since reading the fd returns
    // error as well, we don't pass the events.
    // _nevent counts pending events and guarantees that at most one
    // bthread is handling read events of this socket.
    if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
        // According to the stats, above fetch_add is very effective. In a
        // server processing 1 million requests per second, this counter
        // is just 1500~1700/s
        g_vars->neventthread << 1;

        bthread_t tid;
        // transfer ownership as well, don't use s anymore!
        Socket* const p = s.release();
        bthread_attr_t attr = thread_attr;
        attr.keytable_pool = p->_keytable_pool;
        // Start a new bthread to handle the read events.
        if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
            LOG(FATAL) << "Fail to start ProcessEvent";
            ProcessEvent(p);
        }
    }
    return 0;
}

void* Socket::ProcessEvent(void* arg) {
    // the enclosed Socket is valid and free to access inside this function.
    SocketUniquePtr s(static_cast<Socket*>(arg));
    // Call _on_edge_triggered_events to handle the event.
    s->_on_edge_triggered_events(s.get());
    return NULL;
}
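The _nevent trick is worth isolating: every event bumps a counter, only the bump from 0 starts a consumer, and the consumer keeps looping as long as resetting the counter to 0 via CAS fails (a failed CAS means new events arrived while it was working; MoreReadEvents, shown later, is exactly that CAS). A standalone sketch with invented names:

#include <atomic>
#include <cstdio>

std::atomic<int> g_nevent{0};

void on_event() {                       // called by the event loop
    if (g_nevent.fetch_add(1, std::memory_order_acq_rel) == 0) {
        // We are the first: run the consumer (inlined here for brevity).
        int progress = 1;               // the event that started us is already counted
        do {
            std::puts("consume everything currently readable");
        } while (!g_nevent.compare_exchange_strong(
                     progress, 0, std::memory_order_release,
                     std::memory_order_acquire));  // CAS failure => more events arrived
    }
    // Otherwise an existing consumer will observe the new count and loop again.
}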
// Handle new messages.
void InputMessenger::OnNewMessages(Socket* m) {
    // Notes:
    // - If the socket has only one message, the message will be parsed and
    //   processed in this bthread. nova-pbrpc and http works in this way.
    // - If the socket has several messages, all messages will be parsed (
    //   meaning cutting from butil::IOBuf. serializing from protobuf is part of
    //   "process") in this bthread. All messages except the last one will be
    //   processed in separate bthreads. To minimize the overhead, scheduling
    //   is batched(notice the BTHREAD_NOSIGNAL and bthread_flush).
    // - Verify will always be called in this bthread at most once and before
    //   any process.
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;

    // Notice that all *return* no matter successful or not will run last
    // message, even if the socket is about to be closed. This should be
    // OK in most cases.
    std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
    bool read_eof = false;
    while (!read_eof) {
        const int64_t received_us = butil::cpuwide_time_us();
        const int64_t base_realtime = butil::gettimeofday_us() - received_us;

        // Calculate bytes to be read.
        size_t once_read = m->_avg_msg_size * 16;
        if (once_read < MIN_ONCE_READ) {
            once_read = MIN_ONCE_READ;
        } else if (once_read > MAX_ONCE_READ) {
            once_read = MAX_ONCE_READ;
        }

        // Read from the connection.
        const ssize_t nr = m->DoRead(once_read);
        if (nr <= 0) {
            if (0 == nr) {
                // Set `read_eof' flag and proceed to feed EOF into `Protocol'
                // (implied by m->_read_buf.empty), which may produce a new
                // `InputMessageBase' under some protocols such as HTTP
                LOG_IF(WARNING, FLAGS_log_connection_close)
                    << *m << " was closed by remote side";
                read_eof = true;
            } else if (errno != EAGAIN) {
                if (errno == EINTR) {
                    continue;  // just retry
                }
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to read from " << *m;
                m->SetFailed(saved_errno, "Fail to read from %s: %s",
                             m->description().c_str(), berror(saved_errno));
                return;
            } else if (!m->MoreReadEvents(&progress)) {
                // No more readable messages; this bthread exits.
                return;
            } else {
                // new events during processing
                continue;
            }
        }

        m->AddInputBytes(nr);
        // Avoid this socket to be closed due to idle_timeout_s
        m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);

        size_t last_size = m->_read_buf.length();
        int num_bthread_created = 0;
        while (1) {
            size_t index = 8888;
            // Try to parse a message.
            ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
            if (!pr.is_ok()) {
                if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
                    // incomplete message, re-read.
                    // However, some buffer may have been consumed
                    // under protocols like HTTP. Record this size
                    m->_last_msg_size += (last_size - m->_read_buf.length());
                    break;
                } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
                    LOG(WARNING) << "Close " << *m << " due to unknown message: "
                                 << butil::ToPrintable(m->_read_buf);
                    m->SetFailed(EINVAL, "Close %s due to unknown message",
                                 m->description().c_str());
                    return;
                } else {
                    LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
                    m->SetFailed(EINVAL, "Close %s: %s",
                                 m->description().c_str(), pr.error_str());
                    return;
                }
            }

            m->AddInputMessages(1);
            // Calculate average size of messages
            const size_t cur_size = m->_read_buf.length();
            if (cur_size == 0) {
                // _read_buf is consumed, it's good timing to return blocks
                // cached internally back to TLS, otherwise the memory is not
                // reused until next message arrives which is quite uncertain
                // in situations that most connections are idle.
                m->_read_buf.return_cached_blocks();
            }
            m->_last_msg_size += (last_size - cur_size);
            last_size = cur_size;
            const size_t old_avg = m->_avg_msg_size;
            if (old_avg != 0) {
                m->_avg_msg_size = (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
                                   / MSG_SIZE_WINDOW;
            } else {
                m->_avg_msg_size = m->_last_msg_size;
            }
            m->_last_msg_size = 0;

            if (pr.message() == NULL) {  // the Process() step can be skipped.
                continue;
            }
            pr.message()->_received_us = received_us;
            pr.message()->_base_real_us = base_realtime;

            // This unique_ptr prevents msg to be lost before transfering
            // ownership to last_msg
            DestroyingPtr<InputMessageBase> msg(pr.message());
            // Queue the previously parsed message for processing.
            QueueMessage(last_msg.release(), &num_bthread_created,
                         m->_keytable_pool);
            if (handlers[index].process == NULL) {
                LOG(ERROR) << "process of index=" << index << " is NULL";
                continue;
            }
            m->ReAddress(&msg->_socket);
            m->PostponeEOF();
            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;

            if (handlers[index].verify != NULL) {
                int auth_error = 0;
                if (0 == m->FightAuthentication(&auth_error)) {
                    // Get the right to authenticate
                    if (handlers[index].verify(msg.get())) {
                        m->SetAuthentication(0);
                    } else {
                        m->SetAuthentication(ERPCAUTH);
                        LOG(WARNING) << "Fail to authenticate " << *m;
                        m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
                                     m->description().c_str());
                        return;
                    }
                } else {
                    LOG_IF(FATAL, auth_error != 0) <<
                        "Impossible! Socket should have been "
                        "destroyed when authentication failed";
                }
            }

            if (!m->is_read_progressive()) {
                // Transfer ownership to last_msg
                last_msg.reset(msg.release());
            } else {
                QueueMessage(msg.release(), &num_bthread_created,
                             m->_keytable_pool);
                bthread_flush();
                num_bthread_created = 0;
            }
        }
        if (num_bthread_created) {
            bthread_flush();
        }
    }

    if (read_eof) {
        m->SetEOF();
    }
}

static void QueueMessage(InputMessageBase* to_run_msg,
                         int* num_bthread_created,
                         bthread_keytable_pool_t* keytable_pool) {
    if (!to_run_msg) {
        return;
    }
    // Create bthread for last_msg. The bthread is not scheduled
    // until bthread_flush() is called (in the worse case).

    // TODO(gejun): Join threads.
    bthread_t th;
    bthread_attr_t tmp =
        (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL)
        | BTHREAD_NOSIGNAL;
    tmp.keytable_pool = keytable_pool;
    // Start a new bthread to process the parsed message.
    if (bthread_start_background(&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
        ++*num_bthread_created;
    } else {
        ProcessInputMessage(to_run_msg);
    }
}
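The read-size heuristic above adapts to traffic: each read asks for roughly 16 average-sized messages, and the average is a windowed moving value updated per message. A tiny sketch of the arithmetic (names and constants are stand-ins, e.g. assuming MSG_SIZE_WINDOW acts like a window of 10):

#include <algorithm>
#include <cstddef>

const size_t kWindow = 10;           // stand-in for MSG_SIZE_WINDOW
const size_t kMinOnceRead = 4096;    // stand-in for MIN_ONCE_READ
const size_t kMaxOnceRead = 524288;  // stand-in for MAX_ONCE_READ

size_t update_avg(size_t old_avg, size_t last_msg_size) {
    // Windowed update: e.g. old_avg=1000, last=2000 gives (1000*9 + 2000)/10 = 1100.
    return old_avg ? (old_avg * (kWindow - 1) + last_msg_size) / kWindow
                   : last_msg_size;
}

size_t next_once_read(size_t avg_msg_size) {
    // Read roughly 16 average-sized messages per syscall, clamped to a range.
    return std::min(kMaxOnceRead, std::max(kMinOnceRead, avg_msg_size * 16));
}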
int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
    if (_fd.load(butil::memory_order_consume) >= 0) {
        return 0;
    }

    // Have to hold a reference for `req'
    // Keep the Socket alive within this scope.
    SocketUniquePtr s;
    ReAddress(&s);
    req->socket = s.get();
    if (_conn) {
        if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) {
            return -1;
        }
    } else {
        // Connect; note that KeepWriteIfConnected is set as the callback,
        // with data = req.
        if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
            return -1;
        }
    }
    s.release();
    return 1;
}

int Socket::Connect(const timespec* abstime,
                    int (*on_connect)(int, int, void*), void* data) {
    if (_ssl_ctx) {
        _ssl_state = SSL_CONNECTING;
    } else {
        _ssl_state = SSL_OFF;
    }
    butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
    if (sockfd < 0) {
        PLOG(ERROR) << "Fail to create socket";
        return -1;
    }
    CHECK_EQ(0, butil::make_close_on_exec(sockfd));
    // We need to do async connect (to manage the timeout by ourselves).
    // Non-blocking mode.
    CHECK_EQ(0, butil::make_non_blocking(sockfd));

    // Try to connect; a non-blocking connect usually returns EINPROGRESS.
    struct sockaddr_in serv_addr;
    bzero((char*)&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr = remote_side().ip;
    serv_addr.sin_port = htons(remote_side().port);
    const int rc = ::connect(
        sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    if (rc != 0 && errno != EINPROGRESS) {
        PLOG(WARNING) << "Fail to connect to " << remote_side();
        return -1;
    }
    if (on_connect) {
        // Since on_connect = KeepWriteIfConnected was set, this path is taken.
        // A new EpollOutRequest is constructed here; pay attention to its lifetime.
        EpollOutRequest* req = new(std::nothrow) EpollOutRequest;
        if (req == NULL) {
            LOG(FATAL) << "Fail to new EpollOutRequest";
            return -1;
        }
        req->fd = sockfd;
        req->timer_id = 0;
        req->on_epollout_event = on_connect;  // on_connect is recorded here
        req->data = data;                     // `data' is the WriteRequest passed in

        // A temporary Socket to hold `EpollOutRequest', which will
        // be added into epoll device soon
        SocketId connect_id;
        SocketOptions options;
        options.user = req;
        // Create a temporary Socket from the EpollOutRequest to handle the connect.
        if (Socket::Create(options, &connect_id) != 0) {
            LOG(FATAL) << "Fail to create Socket";
            delete req;
            return -1;
        }
        // From now on, ownership of `req' has been transferred to
        // `connect_id'. We hold an additional reference here to
        // ensure `req' to be valid in this scope
        SocketUniquePtr s;  // keep the Socket alive
        CHECK_EQ(0, Socket::Address(connect_id, &s));

        // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
        // be called with `req' when epoll event reaches
        // Add connect_id into epoll; when the connection becomes usable or
        // fails, Socket::HandleEpollOut will be called.
        if (GetGlobalEventDispatcher(sockfd).AddEpollOut(connect_id, sockfd, false) != 0) {
            const int saved_errno = errno;
            PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
            s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
                         (int)sockfd, berror(saved_errno));
            return -1;
        }

        // Register a timer for EpollOutRequest. Note that the timeout
        // callback has no race with the one above as both of them try
        // to `SetFailed' `connect_id' while only one of them can succeed
        // It also work when `HandleEpollOutRequest' has already been
        // called before adding the timer since it will be removed
        // inside destructor of `EpollOutRequest' after leaving this scope
        if (abstime) {
            // Handle the connect timeout.
            int rc = bthread_timer_add(&req->timer_id, *abstime,
                                       HandleEpollOutTimeout, (void*)connect_id);
            if (rc) {
                LOG(ERROR) << "Fail to add timer: " << berror(rc);
                s->SetFailed(rc, "Fail to add timer: %s", berror(rc));
                return -1;
            }
        }
    } else {
        if (WaitEpollOut(sockfd, false, abstime) != 0) {
            PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;
            return -1;
        }
        if (CheckConnected(sockfd) != 0) {
            return -1;
        }
    }
    return sockfd.release();
}

// Epoll callback.
int Socket::HandleEpollOut(SocketId id) {
    SocketUniquePtr s;
    // Since Sockets might have been `SetFailed' before they were
    // added into epoll, these sockets miss the signal inside
    // `SetFailed' and therefore must be signalled here using
    // `AddressFailedAsWell' to prevent waiting forever
    if (Socket::AddressFailedAsWell(id, &s) < 0) {
        // Ignore recycled sockets
        return -1;
    }

    EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
    if (req != NULL) {
        // For Connect, the EpollOutRequest created earlier is retrieved here.
        return s->HandleEpollOutRequest(0, req);
    }

    // Currently `WaitEpollOut' needs `_epollout_butex'
    // TODO(jiangrujie): Remove this in the future
    s->_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
    bthread::butex_wake_except(s->_epollout_butex, 0);
    return 0;
}

int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
    // Only one thread can `SetFailed' this `Socket' successfully
    // Also after this `req' will be destroyed when its reference
    // hits zero
    if (SetFailed() != 0) {
        return -1;
    }
    // We've got the right to call user callback
    // The timer will be removed inside destructor of EpollOutRequest
    // Remove the previously created connect_id from epoll.
    GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
    // Invoke the callback, i.e. on_connect, i.e. KeepWriteIfConnected.
    return req->on_epollout_event(req->fd, error_code, req->data);
}

int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
    WriteRequest* req = static_cast<WriteRequest*>(data);
    Socket* s = req->socket;
    if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
        // Run ssl connect in a new bthread to avoid blocking
        // the current bthread (thus blocking the EventDispatcher)
        bthread_t th;
        // Start a new bthread to run CheckConnectedAndKeepWrite.
        google::protobuf::Closure* thrd_func = brpc::NewCallback(
            Socket::CheckConnectedAndKeepWrite, fd, err, data);
        if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
                                            RunClosure, thrd_func)) == 0) {
            return 0;
        } else {
            // Note: thrd_func leaks memory here; the author has submitted a
            // PR to brpc for this, which is pending merge.
            PLOG(ERROR) << "Fail to start bthread";
            // Fall through with non zero `err'
        }
    }
    CheckConnectedAndKeepWrite(fd, err, data);
    return 0;
}

void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) {
    butil::fd_guard sockfd(fd);
    WriteRequest* req = static_cast<WriteRequest*>(data);
    Socket* s = req->socket;
    CHECK_GE(sockfd, 0);
    if (err == 0 && s->CheckConnected(sockfd) == 0 &&
        s->ResetFileDescriptor(sockfd) == 0) {
        if (s->_app_connect) {
            s->_app_connect->StartConnect(req->socket, AfterAppConnected, req);
        } else {
            // Successfully created a connection
            AfterAppConnected(0, req);
        }
        // Release this socket for KeepWrite
        sockfd.release();
    } else {
        if (err == 0) {
            err = errno ? errno : -1;
        }
        AfterAppConnected(err, req);
    }
}

void Socket::AfterAppConnected(int err, void* data) {
    WriteRequest* req = static_cast<WriteRequest*>(data);
    if (err == 0) {
        Socket* const s = req->socket;
        SharedPart* sp = s->GetSharedPart();
        if (sp) {
            sp->num_continuous_connect_timeouts.store(0, butil::memory_order_relaxed);
        }
        // requests are not setup yet. check the comment on Setup() in Write()
        req->Setup(s);
        bthread_t th;
        // Finally, start a new bthread to run KeepWrite.
        if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
            PLOG(WARNING) << "Fail to start KeepWrite";
            KeepWrite(req);
        }
    } else {
        SocketUniquePtr s(req->socket);
        if (err == ETIMEDOUT) {
            SharedPart* sp = s->GetOrNewSharedPart();
            if (sp->num_continuous_connect_timeouts.fetch_add(
                    1, butil::memory_order_relaxed) + 1 >=
                FLAGS_connect_timeout_as_unreachable) {
                // the race between store and fetch_add(in another thread) is
                // OK since a critial error is about to return.
                sp->num_continuous_connect_timeouts.store(
                    0, butil::memory_order_relaxed);
                err = ENETUNREACH;
            }
        }
        s->SetFailed(err, "Fail to connect %s: %s",
                     s->description().c_str(), berror(err));
        s->ReleaseAllFailedWriteRequests(req);
    }
}
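The key to this whole path is the non-blocking connect: connect() returns EINPROGRESS, the fd is watched for EPOLLOUT, and only when it becomes writable is the result checked. A plain-POSIX sketch of what a check like Socket::CheckConnected amounts to (the helper name is invented; this is an assumption about the standard technique, not brpc's exact code):

#include <sys/socket.h>
#include <cerrno>

int check_connected_sketch(int sockfd) {
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &len) != 0) {
        return -1;              // getsockopt itself failed, errno is set
    }
    if (err != 0) {
        errno = err;            // e.g. ECONNREFUSED, ETIMEDOUT
        return -1;
    }
    return 0;                   // the async connect completed successfully
}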
The Accept flow of Socket:
// server.cpp
int Server::StartInternal(const butil::ip_t& ip, const PortRange& port_range,
                          const ServerOptions* opt) {
    ...
    // Start listening.
    butil::fd_guard sockfd(tcp_listen(_listen_addr));
    // Transfer ownership of the listening sockfd to the Acceptor.
    if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                         _default_ssl_ctx) != 0) {
        LOG(ERROR) << "Fail to start acceptor";
        return -1;
    }
    ...
}

// acceptor.cpp
int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
                          const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
    ...
    SocketOptions options;
    options.fd = listened_fd;
    options.user = this;                                  // set the socket's user
    options.on_edge_triggered_events = OnNewConnections;  // set the trigger callback
    if (Socket::Create(options, &_acception_id) != 0) {
        // Close-idle-socket thread will be stopped inside destructor
        LOG(FATAL) << "Fail to create _acception_id";
        return -1;
    }
    ...
}

// When epoll reports that listened_fd is readable, OnNewConnections is triggered.
void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        // Handle new connections until EAGAIN.
        OnNewConnectionsUntilEAGAIN(acception);
        if (acception->Failed()) {
            return;
        }
        // All currently pending read events must be consumed.
    } while (acception->MoreReadEvents(&progress));
}

void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
    while (1) {
        struct sockaddr in_addr;
        socklen_t in_len = sizeof(in_addr);
        // accept the client's connect request
        butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
        if (in_fd < 0) {
            // no EINTR because listened fd is non-blocking.
            if (errno == EAGAIN) {  // loop until EAGAIN
                return;
            }
            // Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
            // will be closed. Continue to consume all the events until EAGAIN
            // instead.
            // If the accept was failed, the error may repeat constantly,
            // limit frequency of logging.
            PLOG_EVERY_SECOND(ERROR)
                << "Fail to accept from listened_fd=" << acception->fd();
            continue;
        }

        Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
        if (NULL == am) {
            LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
            acception->SetFailed(EINVAL,
                                 "Impossible! acception->user() MUST be Acceptor");
            return;
        }

        SocketId socket_id;
        SocketOptions options;
        options.keytable_pool = am->_keytable_pool;
        options.fd = in_fd;
        options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
        options.user = acception->user();  // i.e. the Acceptor object
        // Subsequent messages on this connection are handled by OnNewMessages.
        options.on_edge_triggered_events = InputMessenger::OnNewMessages;
        options.initial_ssl_ctx = am->_ssl_ctx;
        if (Socket::Create(options, &socket_id) != 0) {
            LOG(ERROR) << "Fail to create Socket";
            continue;
        }
        in_fd.release();  // transfer ownership to socket_id

        // There's a funny race condition here. After Socket::Create, messages
        // from the socket are already handled and a RPC is possibly done
        // before the socket is added into _socket_map below. This is found in
        // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
        // on machines with few cores) where the _messenger.ConnectionCount()
        // may surprisingly be 0 even if the RPC is already done.
        SocketUniquePtr sock;
        if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
            bool is_running = true;
            {
                BAIDU_SCOPED_LOCK(am->_map_mutex);
                is_running = (am->status() == RUNNING);
                // Always add this socket into `_socket_map' whether it
                // has been `SetFailed' or not, whether `Acceptor' is
                // running or not. Otherwise, `Acceptor::BeforeRecycle'
                // may be called (inside Socket::OnRecycle) after `Acceptor'
                // has been destroyed
                am->_socket_map.insert(socket_id, ConnectStatistics());
            }
            if (!is_running) {
                LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                             << " has been stopped, discard newly created " << *sock;
                sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
                                "discard newly created %s",
                                acception->fd(), sock->description().c_str());
                return;
            }
        } // else: The socket has already been destroyed, Don't add its id
          //       into _socket_map
    }
}

inline bool Socket::MoreReadEvents(int* progress) {
    // Fail to CAS means that new events arrived.
    // A failed CAS means new events (connections) arrived and must keep
    // being handled.
    return !_nevent.compare_exchange_strong(
        *progress, 0,
        butil::memory_order_release, butil::memory_order_acquire);
}
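For context, the listening fd handed to the Acceptor needs to be non-blocking so that the accept loop can stop at EAGAIN. A plain-POSIX sketch of what a helper like tcp_listen presumably sets up (the function is invented for illustration; the real butil::tcp_listen may differ in details):

#include <arpa/inet.h>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

int tcp_listen_sketch(const char* ip, uint16_t port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }
    int one = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    inet_pton(AF_INET, ip, &addr.sin_addr);
    if (bind(fd, (sockaddr*)&addr, sizeof(addr)) != 0 || listen(fd, 1024) != 0) {
        close(fd);
        return -1;
    }
    // Non-blocking, so accept() returns EAGAIN instead of blocking
    // once the backlog is drained.
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    return fd;
}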
The Revive flow of Socket:
// When a Socket fails, a periodic task is registered to check whether the
// connection becomes usable again.
int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
    ...
    // Do health-checking even if we're not connected before, needed
    // by Channel to revive never-connected socket when server side
    // comes online.
    if (_health_check_interval_s > 0) {
        GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
        StartHealthCheck(
            id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
    }
    ...
}

// health_check.cpp
void StartHealthCheck(SocketId id, int64_t delay_ms) {
    // Start the periodic task.
    PeriodicTaskManager::StartTaskAt(new HealthCheckTask(id),
                                     butil::milliseconds_from_now(delay_ms));
}

// periodic_task.cpp
void PeriodicTaskManager::StartTaskAt(PeriodicTask* task, const timespec& abstime) {
    if (task == NULL) {
        LOG(ERROR) << "Param[task] is NULL";
        return;
    }
    bthread_timer_t timer_id;
    // Register a timer that runs RunPeriodicTaskThread.
    const int rc = bthread_timer_add(&timer_id, abstime, RunPeriodicTaskThread, task);
    if (rc != 0) {
        LOG(ERROR) << "Fail to add timer for RunPerodicTaskThread";
        task->OnDestroyingTask();
        return;
    }
}

static void RunPeriodicTaskThread(void* arg) {
    bthread_t th = 0;
    // When the timer fires, start a new bthread that calls PeriodicTaskThread.
    int rc = bthread_start_background(
        &th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg);
    if (rc != 0) {
        LOG(ERROR) << "Fail to start PeriodicTaskThread";
        static_cast<PeriodicTask*>(arg)->OnDestroyingTask();
        return;
    }
}

static void* PeriodicTaskThread(void* arg) {
    PeriodicTask* task = static_cast<PeriodicTask*>(arg);
    timespec abstime;
    // Run OnTriggeringTask; if it returns false, destroy the task.
    if (!task->OnTriggeringTask(&abstime)) {  // end
        task->OnDestroyingTask();
        return NULL;
    }
    // Otherwise re-arm the timer.
    PeriodicTaskManager::StartTaskAt(task, abstime);
    return NULL;
}

bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
    SocketUniquePtr ptr;
    const int rc = Socket::AddressFailedAsWell(_id, &ptr);  // get the pointer
    CHECK(rc != 0);
    if (rc < 0) {
        RPC_VLOG << "SocketId=" << _id
                 << " was abandoned before health checking";
        return false;
    }
    // Note: Making a Socket re-addessable is hard. An alternative is
    // creating another Socket with selected internal fields to replace
    // failed Socket. Although it avoids concurrent issues with in-place
    // revive, it changes SocketId: many code need to watch SocketId
    // and update on change, which is impractical. Another issue with
    // this method is that it has to move "selected internal fields"
    // which may be accessed in parallel, not trivial to be moved.
    // Finally we choose a simple-enough solution: wait until the
    // reference count hits `expected_nref', which basically means no
    // one is addressing the Socket(except here). Because the Socket
    // is not addressable, the reference count will not increase
    // again. This solution is not perfect because the `expected_nref'
    // is implementation specific. In our case, one reference comes
    // from SocketMapInsert(socket_map.cpp), one reference is here.
    // Although WaitAndReset() could hang when someone is addressing
    // the failed Socket forever (also indicating bug), this is not an
    // issue in current code.
    // The English comment above is quite detailed. In short, we wait until
    // the reference count drops to 2 before running Reset, which keeps this
    // thread-safe.
    if (_first_time) {  // Only check at first time.
        _first_time = false;
        if (ptr->WaitAndReset(2/*note*/) != 0) {
            LOG(INFO) << "Cancel checking " << *ptr;
            return false;
        }
    }

    // g_vars must not be NULL because it is newed at the creation of
    // first Socket. When g_vars is used, the socket is at health-checking
    // state, which means the socket must be created and then g_vars can
    // not be NULL.
    g_vars->nhealthcheck << 1;
    int hc = 0;
    if (ptr->_user) {
        hc = ptr->_user->CheckHealth(ptr.get());
    } else {
        // Call Socket::CheckHealth.
        hc = ptr->CheckHealth();
    }
    if (hc == 0) {
        if (ptr->CreatedByConnect()) {
            g_vars->channel_conn << -1;
        }
        if (!FLAGS_health_check_path.empty()) {
            ptr->_ninflight_app_health_check.fetch_add(
                1, butil::memory_order_relaxed);
        }
        ptr->Revive();  // restore the version and the reference count
        ptr->_hc_count = 0;
        if (!FLAGS_health_check_path.empty()) {
            HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
        }
        return false;
    } else if (hc == ESTOP) {
        LOG(INFO) << "Cancel checking " << *ptr;
        return false;
    }
    ++ptr->_hc_count;
    *next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
    return true;
}

// socket.cpp
int Socket::WaitAndReset(int32_t expected_nref) {
    const uint32_t id_ver = VersionOfSocketId(_this_id);
    uint64_t vref;
    // Wait until nref == expected_nref.
    while (1) {
        // The acquire fence pairs with release fence in Dereference to avoid
        // inconsistent states to be seen by others.
        vref = _versioned_ref.load(butil::memory_order_acquire);
        if (VersionOfVRef(vref) != id_ver + 1) {
            LOG(WARNING) << "SocketId=" << _this_id
                         << " is already alive or recycled";
            return -1;
        }
        if (NRefOfVRef(vref) > expected_nref) {
            // Sleep while waiting for the reference count to drop.
            if (bthread_usleep(1000L/*FIXME*/) < 0) {
                PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
                return -1;
            }
        } else if (NRefOfVRef(vref) < expected_nref) {
            RPC_VLOG << "SocketId=" << _this_id
                     << " was abandoned during health checking";
            return -1;
        } else {
            break;
        }
    }

    // It's safe to close previous fd (provided expected_nref is correct).
    // Once the reference count is as expected, close the old fd.
    const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
    if (ValidFileDescriptor(prev_fd)) {
        if (_on_edge_triggered_events != NULL) {
            GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd);
        }
        close(prev_fd);
        if (CreatedByConnect()) {
            g_vars->channel_conn << -1;
        }
    }
    ...  // other Reset operations follow
}

// We don't care about the return value of Revive.
void Socket::Revive() {
    const uint32_t id_ver = VersionOfSocketId(_this_id);
    uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
    while (1) {
        // The version of vref was incremented by 1 in SetFailed.
        CHECK_EQ(id_ver + 1, VersionOfVRef(vref));

        int32_t nref = NRefOfVRef(vref);
        if (nref <= 1) {
            CHECK_EQ(1, nref);
            LOG(WARNING) << *this << " was abandoned during revival";
            return;
        }
        // +1 is the additional ref added in Create(). TODO(gejun): we should
        // remove this additional nref someday.
        // CAS the version of vref back down to the usable state.
        if (_versioned_ref.compare_exchange_weak(
                vref, MakeVRef(id_ver, nref + 1/*note*/),
                butil::memory_order_release,
                butil::memory_order_relaxed)) {
            // Set this flag to true since we add additional ref again
            // Clear the recycle flag.
            _recycle_flag.store(false, butil::memory_order_relaxed);
            if (_user) {
                // Call AfterRevived, which by default logs "Revived ... (Connectable)".
                _user->AfterRevived(this);
            } else {
                LOG(INFO) << "Revived " << *this << " (Connectable)";
            }
            return;
        }
    }
}
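To make the versioned reference count more tangible, here is a standalone sketch of what SetFailed and Revive manipulate, with invented names and under the assumption that the 64-bit word packs a 32-bit version in the high half and a 32-bit refcount in the low half: failing bumps the version so stale SocketIds can no longer address the object, and reviving CASes the version back down while restoring the extra reference.

#include <atomic>
#include <cstdint>
#include <cstdio>

inline uint64_t MakeVRefSketch(uint32_t version, int32_t nref) {
    return (static_cast<uint64_t>(version) << 32) | static_cast<uint32_t>(nref);
}
inline uint32_t VersionOfVRefSketch(uint64_t vref) { return vref >> 32; }
inline int32_t  NRefOfVRefSketch(uint64_t vref)    { return static_cast<int32_t>(vref & 0xFFFFFFFF); }

int main() {
    // A live object addressed by version 6 with 2 references.
    std::atomic<uint64_t> versioned_ref{MakeVRefSketch(6, 2)};

    // "SetFailed": bump the version so Address() with the old version fails.
    uint64_t v = versioned_ref.load();
    versioned_ref.store(MakeVRefSketch(VersionOfVRefSketch(v) + 1, NRefOfVRefSketch(v)));

    // "Revive": CAS the version back down and re-add the extra reference.
    v = versioned_ref.load();
    versioned_ref.compare_exchange_strong(
        v, MakeVRefSketch(VersionOfVRefSketch(v) - 1, NRefOfVRefSketch(v) + 1));

    std::printf("version=%u nref=%d\n",
                VersionOfVRefSketch(versioned_ref.load()),
                NRefOfVRefSketch(versioned_ref.load()));
    return 0;
}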