bRPC network send/receive flow diagram, from the official documentation

1. Single Connection

bRPC supports short connections, connection pools, and single connections; the first two are very common approaches. A single connection means that all clients within a process share at most one connection to a given server, multiple requests are processed concurrently on that connection, and replies do not have to come back in the order the requests were sent.

With a connection pool, the client takes an available connection out of the pool for exclusive use when issuing a request and writes the request into it; the server processes the request it receives on that connection and finally writes the reply back on it. For that connection, reads and writes never happen at the same time, so it effectively works in half-duplex mode. With a single connection, requests can be written continuously while replies are read at the same time, i.e. full-duplex mode, but this requires the ability to locate the request from a reply, which can be solved with a UUID-style correlation id. Continuous writing and reading also yield better batching and fewer system calls. In a multi-threaded environment, bRPC's reads and writes on a single connection are wait-free.
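
A minimal sketch (not bRPC's actual implementation) of how replies can be matched back to requests on a full-duplex single connection: each request carries a locally generated correlation id, and a table maps that id to the waiting caller. The PendingTable class and its members below are made-up names for illustration only.

    #include <cstdint>
    #include <functional>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    // Illustrative only: map a per-request correlation id to the callback that
    // should consume the matching reply. bRPC's real mechanism is richer
    // (call ids, versioned sockets, bthread-level synchronization).
    class PendingTable {
    public:
      // Register a caller and get the id to embed in the outgoing request,
      // so the server can echo it back in the reply.
      uint64_t Register(std::function<void(const std::string&)> on_response) {
        std::lock_guard<std::mutex> g(mu_);
        const uint64_t id = next_id_++;
        pending_[id] = std::move(on_response);
        return id;
      }
      // Called by the single reader whenever a reply carrying `id` arrives,
      // in whatever order the server chose to send replies.
      bool Complete(uint64_t id, const std::string& payload) {
        std::function<void(const std::string&)> cb;
        {
          std::lock_guard<std::mutex> g(mu_);
          auto it = pending_.find(id);
          if (it == pending_.end()) return false;
          cb = std::move(it->second);
          pending_.erase(it);
        }
        cb(payload);
        return true;
      }
    private:
      std::mutex mu_;
      uint64_t next_id_ = 1;
      std::unordered_map<uint64_t, std::function<void(const std::string&)>> pending_;
    };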

2. Message Sending

A "message" is a bounded binary string written out to a connection. With a single connection, multiple threads may send messages to the same connection; the operation is clearly not atomic, so packets sent by different threads need to be queued efficiently. bRPC implements this with an MPSC queue, in the following steps:

  • For each connection, maintain an MPSC singly linked list. When a thread needs to write a message it tries to grab the connection's exclusive write right (an atomic operation); the thread that wins performs one write, while threads that fail only push their message onto the head of the list (also atomic) and suspend, waiting for the reply;
  • The thread holding the write right batches up the packets queued by itself and by other threads according to the connection's list, writing as much as possible but only once, so that its own request does not incur excessive latency; if the connection's buffer is full and nothing more can be written, it starts a new KeepWrite bthread to carry out the remaining writes;
  • The KeepWrite bthread writes all packets in the list to the connection until the list is empty, suspending and waiting for epoll to wake it up whenever the connection's buffer is full.
  • The code is examined below. The single-connection implementation lives in src/brpc/socket.cpp ; the write path is:

    // socket.cpp: write an IOBuf
    int Socket::Write(butil::IOBuf *data, const WriteOptions *options_in) {
      WriteOptions opt;
      if (options_in) {
        opt = *options_in;
      if (data->empty()) {
        return SetError(opt.id_wait, EINVAL);
      if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
        LOG(ERROR) << "pipelined_count=" << opt.pipelined_count << " is too large";
        return SetError(opt.id_wait, EOVERFLOW);
      if (Failed()) {
        const int rc = ConductError(opt.id_wait);
        if (rc <= 0) {
          return rc;
      if (!opt.ignore_eovercrowded && _overcrowded) {
        return SetError(opt.id_wait, EOVERCROWDED);
      // Get a WriteRequest object from the object pool
      WriteRequest *req = butil::get_object<WriteRequest>();
      if (!req) {
        return SetError(opt.id_wait, ENOMEM);
      req->data.swap(*data);  // move the data into the request
      // Set `req->next' to UNCONNECTED so that the KeepWrite thread will
      // wait until it points to a valid WriteRequest or NULL.
      // Set the next pointer to an invalid value first; later checks rely on this
      req->next = WriteRequest::UNCONNECTED;
      req->id_wait = opt.id_wait;
      req->set_pipelined_count_and_user_message(opt.pipelined_count,
                                                DUMMY_USER_MESSAGE, opt.with_auth);
      return StartWrite(req, opt);  // start the write
    int Socket::StartWrite(WriteRequest *req, const WriteOptions &opt) {
      // Release fence makes sure the thread getting request sees *req
      // Atomically swap the list head with the request to be written; the old head comes back as prev_head
      WriteRequest *const prev_head =
          _write_head.exchange(req, butil::memory_order_release);
      if (prev_head != NULL) {
        // Someone is writing to the fd. The KeepWrite thread may spin
        // until req->next to be non-UNCONNECTED. This process is not
        // lock-free, but the duration is so short(1~2 instructions,
        // depending on compiler) that the spin rarely occurs in practice
        // (I've not seen any spin in highly contended tests).
        // A non-NULL prev_head means another thread holds the write right and is writing.
        // In that case point next at prev_head.
        // Before this operation, next points to WriteRequest::UNCONNECTED, so the writing thread can spin-wait on it.
        req->next = prev_head;
        return 0;
      int saved_errno = 0;
      bthread_t th;
      SocketUniquePtr ptr_for_keep_write;
      ssize_t nw = 0;
      // We've got the right to write.
      // We obtained the connection's write right; point next at NULL.
      // Everything below is guaranteed to run in a single-threaded context.
      req->next = NULL;
      // Connect to remote_side() if not.
      // Try to connect to the remote side if not connected yet
      int ret = ConnectIfNot(opt.abstime, req);
      if (ret < 0) {
        saved_errno = errno;
        SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
        goto FAIL_TO_WRITE;
      } else if (ret == 1) {
        // We are doing connection. Callback `KeepWriteIfConnected'
        // will be called with `req' at any moment after
        return 0;
      // NOTE: Setup() MUST be called after Connect which may call app_connect,
      // which is assumed to run before any SocketMessage.AppendAndDestroySelf()
      // in some protocols(namely RTMP).
      req->Setup(this);  // role not entirely clear; set aside for now
      if (ssl_state() != SSL_OFF) {
        // Writing into SSL may block the current bthread, always write
        // in the background.
        // For SSL, always write in the background
        goto KEEPWRITE_IN_BACKGROUND;
      // Write once in the calling thread. If the write is not complete,
      // continue it in KeepWrite thread.
      if (_conn) {
        butil::IOBuf *data_arr[1] = {&req->data};
        nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
      } else {
        // Perform one write; the default size_hint is 1MB
        nw = req->data.cut_into_file_descriptor(fd());
      if (nw < 0) {
        // RTMP may return EOVERCROWDED
        if (errno != EAGAIN && errno != EOVERCROWDED) {
          saved_errno = errno;
          // EPIPE is common in pooled connections + backup requests.
          PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
          SetFailed(saved_errno, "Fail to write into %s: %s",
                    description().c_str(), berror(saved_errno));
          goto FAIL_TO_WRITE;  // jump on failure
      } else {
        AddOutputBytes(nw);
      if (IsWriteComplete(req, true, NULL)) {
        // Everything was written; return right away
        ReturnSuccessfulWriteRequest(req);
        return 0;
    KEEPWRITE_IN_BACKGROUND:
      // The write is incomplete; start a background bthread to continue it
      ReAddress(&ptr_for_keep_write);  // copy the current pointer into a new SocketUniquePtr
      req->socket = ptr_for_keep_write.release();
      if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) !=
          0) {
        LOG(FATAL) << "Fail to start KeepWrite";
        // If the bthread fails to start, fall back to writing synchronously
        KeepWrite(req);
      return 0;
    FAIL_TO_WRITE:
      // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
      // `on_reset' callback inside the id object) so that we immediately
      // know this socket has failed inside the `on_reset' callback
      ReleaseAllFailedWriteRequests(req);
      errno = saved_errno;
      return -1;
    // iobuf_inl.h
    inline ssize_t IOBuf::cut_into_file_descriptor(int fd, size_t size_hint) {
      return pcut_into_file_descriptor(fd, -1, size_hint);
    // iobuf.cpp
    ssize_t IOBuf::pcut_into_file_descriptor(int fd, off_t offset,
                                             size_t size_hint) {
      if (empty()) {
        return 0;
      const size_t nref = std::min(_ref_num(), IOBUF_IOV_MAX);
      struct iovec vec[nref];  // convert the IOBuf into iovecs for a batched write
      size_t nvec = 0;
      size_t cur_len = 0;
      do {
        IOBuf::BlockRef const &r = _ref_at(nvec);
        vec[nvec].iov_base = r.block->data + r.offset;
        vec[nvec].iov_len = r.length;
        ++nvec;
        cur_len += r.length;
      } while (nvec < nref && cur_len < size_hint);  // size_hint is not a hard limit
      ssize_t nw = 0;
      if (offset >= 0) {
        static iobuf::iov_function pwritev_func = iobuf::get_pwritev_func();
        nw = pwritev_func(fd, vec, nvec, offset);
      } else {
        nw = ::writev(fd, vec, nvec);  // non-blocking batched write
      if (nw > 0) {
        pop_front(nw);  // pop the successfully written portion
      return nw;
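
    Before moving on to KeepWrite, here is a distilled sketch of the publish side of this MPSC write queue, stripped of bRPC's connection handling and error paths. Node, g_write_head, kUnlinked and publish are made-up names; they only illustrate the exchange-based hand-off that Write()/StartWrite() perform on _write_head.

    #include <atomic>

    struct Node {
      std::atomic<Node*> next{nullptr};
      int payload = 0;
    };

    // Sentinel meaning "this node was exchanged in, but its next is not linked yet"
    // (the counterpart of WriteRequest::UNCONNECTED).
    static Node* const kUnlinked = reinterpret_cast<Node*>(1);

    std::atomic<Node*> g_write_head{nullptr};

    // Returns true when the caller became the single writer of this queue.
    bool publish(Node* n) {
      n->next.store(kUnlinked, std::memory_order_relaxed);
      Node* prev = g_write_head.exchange(n, std::memory_order_release);
      if (prev != nullptr) {
        // Someone else holds the write right; just link behind the previous head.
        n->next.store(prev, std::memory_order_release);
        return false;
      }
      n->next.store(nullptr, std::memory_order_relaxed);
      return true;
    }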
    

    The KeepWrite flow is:

    void *Socket::KeepWrite(void *void_arg) {
      g_vars->nkeepwrite << 1;
      WriteRequest *req = static_cast<WriteRequest *>(void_arg);
      SocketUniquePtr s(req->socket);  // restore the socket's unique_ptr
      // When error occurs, spin until there's no more requests instead of
      // returning directly otherwise _write_head is permantly non-NULL which
      // makes later Write() abnormal.
      WriteRequest *cur_tail = NULL;
      do {
        // req was written, skip it.
        if (req->next != NULL && req->data.empty()) {
          WriteRequest *const saved_req = req;
          req = req->next;
          // Packets already fully written are promptly removed from the list and their callers notified
          s->ReturnSuccessfulWriteRequest(saved_req);
        const ssize_t nw = s->DoWrite(req);  // attempt a write
        if (nw < 0) {
          if (errno != EAGAIN && errno != EOVERCROWDED) {
            const int saved_errno = errno;
            PLOG(WARNING) << "Fail to keep-write into " << *s;
            s->SetFailed(saved_errno, "Fail to keep-write into %s: %s",
                         s->description().c_str(), berror(saved_errno));
            break;
        } else {
          s->AddOutputBytes(nw);
        // Release WriteRequest until non-empty data or last request.
        while (req->next != NULL && req->data.empty()) {
          WriteRequest *const saved_req = req;
          req = req->next;
          s->ReturnSuccessfulWriteRequest(saved_req);
        // TODO(gejun): wait for epollout when we actually have written
        // all the data. This weird heuristic reduces 30us delay...
        // Update(12/22/2015): seem not working. better switch to correct code.
        // Update(1/8/2016, r31823): Still working.
        // Update(8/15/2017): Not working, performance downgraded.
        // if (nw <= 0 || req->data.empty()/*note*/) {
        if (nw <= 0) {
          g_vars->nwaitepollout << 1;
          bool pollin = (s->_on_edge_triggered_events != NULL);
          // NOTE: Waiting epollout within timeout is a must to force
          // KeepWrite to check and setup pending WriteRequests periodically,
          // which may turn on _overcrowded to stop pending requests from
          // growing infinitely.
          const timespec duetime =
              butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
          const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
          if (rc < 0 && errno != ETIMEDOUT) {
            const int saved_errno = errno;
            PLOG(WARNING) << "Fail to wait epollout of " << *s;
            s->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
                         s->description().c_str(), berror(saved_errno));
            break;
        if (NULL == cur_tail) {
          for (cur_tail = req; cur_tail->next != NULL; cur_tail = cur_tail->next);  // advance to the current tail
        // Return when there's no more WriteRequests and req is completely
        // written.
        // Check whether everything has been written
        if (s->IsWriteComplete(cur_tail, (req == cur_tail), &cur_tail)) {
          CHECK_EQ(cur_tail, req);
          s->ReturnSuccessfulWriteRequest(req);
          return NULL;
      } while (1);
      // Error occurred, release all requests until no new requests.
      s->ReleaseAllFailedWriteRequests(req);
      return NULL;
    ssize_t Socket::DoWrite(WriteRequest *req) {
      // Group butil::IOBuf in the list into a batch array.
      butil::IOBuf *data_list[DATA_LIST_MAX];
      size_t ndata = 0;
      for (WriteRequest *p = req; p != NULL && ndata < DATA_LIST_MAX; p = p->next) {
        data_list[ndata++] = &p->data;  // 收集一批待写入的数据包后批量写入
      if (ssl_state() == SSL_OFF) {
        // Write IOBuf in the batch array into the fd.
        if (_conn) {
          return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
        } else {
          ssize_t nw = butil::IOBuf::cut_multiple_into_file_descriptor(
              fd(), data_list, ndata);
          return nw;
      CHECK_EQ(SSL_CONNECTED, ssl_state());
      if (_conn) {
        // TODO: Separate SSL stuff from SocketConnection
        return _conn->CutMessageIntoSSLChannel(_ssl_session, data_list, ndata);
      int ssl_error = 0;
      ssize_t nw = butil::IOBuf::cut_multiple_into_SSL_channel(
          _ssl_session, data_list, ndata, &ssl_error);
      switch (ssl_error) {
      case SSL_ERROR_NONE:
        break;
      case SSL_ERROR_WANT_READ:
        // Disable renegotiation
        errno = EPROTO;
        return -1;
      case SSL_ERROR_WANT_WRITE:
        errno = EAGAIN;
        break;
      default: {
        const unsigned long e = ERR_get_error();
        if (e != 0) {
          LOG(WARNING) << "Fail to write into ssl_fd=" << fd() << ": "
                       << SSLError(ERR_get_error());
          errno = ESSL;
        } else {
          // System error with corresponding errno set
          PLOG(WARNING) << "Fail to write into ssl_fd=" << fd();
        break;
      return nw;  // 返回成功写入的长度
    // Check if there're new requests appended.
    // If yes, point old_head to to reversed new requests and return false;
    // If no:
    //    old_head is fully written, set _write_head to NULL and return true;
    //    old_head is not written yet, keep _write_head unchanged and return false;
    // `old_head' is last new_head got from this function or (in another word)
    // tail of current writing list.
    // `singular_node' is true iff `old_head' is the only node in its list.
    bool Socket::IsWriteComplete(Socket::WriteRequest *old_head, bool singular_node,
                                 Socket::WriteRequest **new_tail) {
      CHECK(NULL == old_head->next);
      // Try to set _write_head to NULL to mark that the write is done.
      WriteRequest *new_head = old_head;
      WriteRequest *desired = NULL;
      bool return_when_no_more = true;
      if (!old_head->data.empty() || !singular_node) {
        // The write list cannot be considered finished yet (the current node is non-empty or the list has more than one node)
        desired = old_head;
        // Write is obviously not complete if old_head is not fully written.
        return_when_no_more = false;
      // CAS to check whether more packets still need to be written
      if (_write_head.compare_exchange_strong(new_head, desired,
                                              butil::memory_order_acquire)) {
        // No one added new requests.
        if (new_tail) {
          *new_tail = old_head;
        return return_when_no_more;
      // The CAS failed; new_head now holds the latest head of the write list
      CHECK_NE(new_head, old_head);
      // Above acquire fence pairs release fence of exchange in Write() to make
      // sure that we see all fields of requests set.
      // Someone added new requests.
      // Reverse the list until old_head.
      WriteRequest *tail = NULL;
      WriteRequest *p = new_head;
      // Reverse the list, from new_head down to old_head
      do {
        while (p->next == WriteRequest::UNCONNECTED) {
          // TODO(gejun): elaborate this
          // As mentioned above, p->next may briefly point to UNCONNECTED; spin-wait here
          sched_yield();
        WriteRequest *const saved_next = p->next;
        p->next = tail;
        tail = p;
        p = saved_next;
        CHECK(p != NULL);
      } while (p != old_head);
      // Link old list with new list.
      old_head->next = tail;
      // Call Setup() from oldest to newest, notice that the calling sequence
      // matters for protocols using pipelined_count, this is why we don't
      // calling Setup in above loop which is from newest to oldest.
      for (WriteRequest *q = tail; q; q = q->next) {
        q->Setup(this);
      // After the reversal the head is old_head and the tail is new_head, whose next is NULL.
      // This segment will be written as a batch on the next write.
      if (new_tail) {
        *new_tail = new_head;
      return false;
    
    Diagram of the write-request list
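
    Continuing the earlier sketch (same hypothetical Node, g_write_head and kUnlinked), the drain side mirrors IsWriteComplete: the writer tries to CAS the head back to nullptr; if the CAS fails, new nodes were published while it was writing, and they are reversed into FIFO order behind old_head before writing continues.

    // Writer-side completion check. `old_head` is the last node the writer has
    // fully handled (its next is nullptr). Returns nullptr when the queue is
    // drained, otherwise the new end of the list after relinking the freshly
    // published nodes in FIFO order behind old_head.
    Node* check_complete(Node* old_head) {
      Node* expected = old_head;
      if (g_write_head.compare_exchange_strong(expected, nullptr,
                                               std::memory_order_acquire)) {
        return nullptr;  // nobody published while we were writing: done
      }
      // `expected` is now the newest published node; the chain runs
      // newest -> ... -> oldest -> old_head. Reverse it into FIFO order.
      Node* tail = nullptr;
      Node* p = expected;
      do {
        Node* nxt;
        while ((nxt = p->next.load(std::memory_order_acquire)) == kUnlinked) {
          // The publisher exchanged the head but has not linked next yet; spin.
        }
        p->next.store(tail, std::memory_order_relaxed);
        tail = p;
        p = nxt;
      } while (p != old_head);
      old_head->next.store(tail, std::memory_order_relaxed);  // continue from old_head->next
      return expected;  // the newest node is the new end of the list
    }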

    3. Message Receiving

    Traditional RPC frameworks usually have dedicated IO threads listen on and read connections. The problem is that at any moment one thread can only read one connection, so when several busy connections cluster on the same IO thread, some of those connections see their reads delayed, hurting availability. bRPC uses an EventDispatcher to watch whether a connection is ready; when a connection becomes readable, a new bthread is started in the current thread and execution switches to it immediately to do the read, which gives better cache locality. The bthread the EventDispatcher was running in is re-queued and, through work stealing, continues on some other worker thread. With this approach, the contention bRPC incurs when reading the same connection is wait-free. When several messages are parsed out of one connection, new bthreads are likewise started immediately to process them concurrently. Messages across connections and within one connection are therefore both handled concurrently in bRPC, so even under high load messages from different sources are processed promptly, reducing long-tail latency.
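
    A much-simplified sketch of this dispatch model, using plain epoll and std::thread purely for illustration (bRPC uses bthreads, so the handler runs immediately in the current worker while the poller itself migrates to another worker via work stealing):

    #include <sys/epoll.h>
    #include <cerrno>
    #include <cstdint>
    #include <cstdio>
    #include <thread>

    // Hypothetical per-connection handler; in bRPC this role is played by
    // _on_edge_triggered_events (OnNewConnections / OnNewMessages).
    void handle_readable(uint64_t conn_id) {
      std::printf("connection %llu is readable\n",
                  static_cast<unsigned long long>(conn_id));
    }

    void poll_loop(int epfd) {
      epoll_event events[32];
      while (true) {
        const int n = epoll_wait(epfd, events, 32, -1);
        if (n < 0) {
          if (errno == EINTR) continue;
          std::perror("epoll_wait");
          return;
        }
        for (int i = 0; i < n; ++i) {
          if (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
            // Hand the connection to a separate execution unit and go straight
            // back to polling, instead of reading it inline in the IO thread.
            std::thread(handle_readable, events[i].data.u64).detach();
          }
        }
      }
    }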

    Let's first look at the EventDispatcher code (only the Linux implementation is kept):

    // event_dispatcher.cpp
    DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");  // defaults to 1
    EventDispatcher &GetGlobalEventDispatcher(int fd) {
      pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
      if (FLAGS_event_dispatcher_num == 1) {
        return g_edisp[0];
      int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
      return g_edisp[index];
    void InitializeGlobalDispatchers() {
      g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
      for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
        const bthread_attr_t attr =
            FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
        CHECK_EQ(0, g_edisp[i].Start(&attr));  // start the EventDispatcher
      // This atexit is will be run before g_task_control.stop() because above
      // Start() initializes g_task_control by creating bthread (to run
      // epoll/kqueue).
      CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
    // Dispatch edge-triggered events of file descriptors to consumers
    // running in separate bthreads.
    class EventDispatcher {
      friend class Socket;
    public:
      EventDispatcher();
      virtual ~EventDispatcher();
      // Start this dispatcher in a bthread.
      // Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
      // create bthreads running user callbacks.
      // Returns 0 on success, -1 otherwise.
      virtual int Start(const bthread_attr_t *consumer_thread_attr);
      // True iff this dispatcher is running in a bthread
      bool Running() const;
      // Stop bthread of this dispatcher.
      void Stop();
      // Suspend calling thread until bthread of this dispatcher stops.
      void Join();
      // When edge-triggered events happen on `fd', call
      // `on_edge_triggered_events' of `socket_id'.
      // Notice that this function also transfers ownership of `socket_id',
      // When the file descriptor is removed from internal epoll, the Socket
      // will be dereferenced once additionally.
      // Returns 0 on success, -1 otherwise.
      int AddConsumer(SocketId socket_id, int fd);
      // Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
      // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
      // be used instead of EPOLL_CTL_ADD. When event arrives,
      // `Socket::HandleEpollOut' will be called with `socket_id'
      // Returns 0 on success, -1 otherwise and errno is set
      int AddEpollOut(SocketId socket_id, int fd, bool pollin);
      // Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
      // will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
      // Returns 0 on success, -1 otherwise and errno is set
      int RemoveEpollOut(SocketId socket_id, int fd, bool pollin);
    private:
      DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
      // Calls Run()
      static void *RunThis(void *arg);
      // Thread entry.
      void Run();
      // Remove the file descriptor `fd' from epoll.
      int RemoveConsumer(int fd);
      // The epoll to watch events.
      int _epfd;
      // false unless Stop() is called.
      volatile bool _stop;
      // identifier of hosting bthread
      bthread_t _tid;
      // The attribute of bthreads calling user callbacks.
      bthread_attr_t _consumer_thread_attr;
      // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
      int _wakeup_fds[2];
    EventDispatcher::EventDispatcher()
        : _epfd(-1), _stop(false), _tid(0),
          _consumer_thread_attr(BTHREAD_ATTR_NORMAL) {
      _epfd = epoll_create(1024 * 1024);
      if (_epfd < 0) {
        PLOG(FATAL) << "Fail to create epoll";
        return;
      CHECK_EQ(0, butil::make_close_on_exec(_epfd));
      _wakeup_fds[0] = -1;
      _wakeup_fds[1] = -1;
      if (pipe(_wakeup_fds) != 0) {
        // used to wake up epoll when stopping
        PLOG(FATAL) << "Fail to create pipe";
        return;
    int EventDispatcher::Start(const bthread_attr_t *consumer_thread_attr) {
      if (_epfd < 0) {
        LOG(FATAL) << "epoll was not created";
        return -1;
      if (_tid != 0) {
        LOG(FATAL) << "Already started this dispatcher(" << this
                   << ") in bthread=" << _tid;
        return -1;
      // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
      // everyting seems sane to the thread.
      _consumer_thread_attr =
          (consumer_thread_attr ? *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
      // Polling thread uses the same attr for consumer threads (NORMAL right
      // now). Previously, we used small stack (32KB) which may be overflowed
      // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
      // is also a potential issue for consumer threads, using the same attr
      // should be a reasonable solution.
      // start a bthread to run the event loop
      int rc =
          bthread_start_background(&_tid, &_consumer_thread_attr, RunThis, this);
      if (rc) {
        LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
        return -1;
      return 0;
    void *EventDispatcher::RunThis(void *arg) {
      ((EventDispatcher *)arg)->Run();
      return NULL;
    void EventDispatcher::Run() {
      while (!_stop) {
        epoll_event e[32];
        const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);  // note: no timeout is set
        if (_stop) {
          // epoll_ctl/epoll_wait should have some sort of memory fencing
          // guaranteeing that we(after epoll_wait) see _stop set before
          // epoll_ctl.
          break;
        if (n < 0) {
          if (EINTR == errno) {
            // We've checked _stop, no wake-up will be missed.
            continue;
          PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
          break;
        for (int i = 0; i < n; ++i) {
          if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
            // We don't care about the return value.
            // handle read events
            Socket::StartInputEvent(e[i].data.u64, e[i].events,
                                    _consumer_thread_attr);
        for (int i = 0; i < n; ++i) {
          if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
            // We don't care about the return value.
            // handle write events
            Socket::HandleEpollOut(e[i].data.u64);
    void EventDispatcher::Stop() {
      _stop = true;
      if (_epfd >= 0) {
        epoll_event evt = {EPOLLOUT, {NULL}};
        // On stop, add a writable pipe fd to wake up the epoll_wait that has no timeout
        epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
    // Register for read events
    int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
      if (_epfd < 0) {
        errno = EINVAL;
        return -1;
      epoll_event evt;
      evt.events = EPOLLIN | EPOLLET;  // edge-triggered
      evt.data.u64 = socket_id;  // store the socket_id directly
      return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
    // Register for write events
    int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
      if (_epfd < 0) {
        errno = EINVAL;
        return -1;
      epoll_event evt;
      evt.data.u64 = socket_id;
      evt.events = EPOLLOUT | EPOLLET;
      if (pollin) {
        evt.events |= EPOLLIN;
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
          // This fd has been removed from epoll via `RemoveConsumer',
          // in which case errno will be ENOENT
          return -1;
      } else {
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
          return -1;
      return 0;
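
    One detail worth calling out: Run() passes no timeout to epoll_wait, so Stop() has to make the fd set produce an event by itself, which is what registering the pipe's write end with EPOLLOUT achieves (the write end of an empty pipe is immediately writable). A minimal standalone sketch of the same trick, with illustrative names:

    #include <sys/epoll.h>

    // Assumes `epfd` is a valid epoll instance and the polling loop checks a
    // stop flag after every epoll_wait return, as EventDispatcher::Run does.
    bool request_stop(int epfd, int wakeup_pipe_write_fd) {
      epoll_event evt = {};
      evt.events = EPOLLOUT;               // an empty pipe's write end is writable,
      evt.data.fd = wakeup_pipe_write_fd;  // so epoll_wait returns right away
      return epoll_ctl(epfd, EPOLL_CTL_ADD, wakeup_pipe_write_fd, &evt) == 0;
    }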
    

    Calling epoll_wait and epoll_ctl concurrently from multiple threads is thread-safe; see reference 2. By default bRPC starts one bthread to run the EventDispatcher's event loop, and a connection is added to the EventDispatcher when it is created:

    // socket.cpp
    int Socket::Create(const SocketOptions &options, SocketId *id) {
      ...
      if (m->ResetFileDescriptor(options.fd) != 0) {
        const int saved_errno = errno;
        PLOG(ERROR) << "Fail to ResetFileDescriptor";
        m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
                     berror(saved_errno));
        return -1;
      ...
    int Socket::ResetFileDescriptor(int fd) {
      // Reset message sizes when fd is changed.
      _last_msg_size = 0;
      _avg_msg_size = 0;
      // MUST store `_fd' before adding itself into epoll device to avoid
      // race conditions with the callback function inside epoll
      _fd.store(fd, butil::memory_order_release);
      _reset_fd_real_us = butil::gettimeofday_us();
      // Check whether the fd is valid
      if (!ValidFileDescriptor(fd)) {
        return 0;
      // OK to fail, non-socket fd does not support this.
      if (butil::get_local_side(fd, &_local_side) != 0) {
        _local_side = butil::EndPoint();
      // FIXME : close-on-exec should be set by new syscalls or worse: set right
      // after fd-creation syscall. Setting at here has higher probabilities of
      // race condition.
      butil::make_close_on_exec(fd);
      // Make the fd non-blocking.
      // non-blocking mode
      if (butil::make_non_blocking(fd) != 0) {
        PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
        return -1;
      // turn off nagling.
      // OK to fail, namely unix domain socket does not support this.
      butil::make_no_delay(fd);
      if (_tos > 0 && setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {
        PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos;
      if (FLAGS_socket_send_buffer_size > 0) {
        int buff_size = FLAGS_socket_send_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) {
          PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to " << buff_size;
      if (FLAGS_socket_recv_buffer_size > 0) {
        int buff_size = FLAGS_socket_recv_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) {
          PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to " << buff_size;
      if (_on_edge_triggered_events) {
        // Watch the connection for read events
        if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
          PLOG(ERROR) << "Fail to add SocketId=" << id() << " into EventDispatcher";
          _fd.store(-1, butil::memory_order_release);
          return -1;
      return 0;
    

    When a read event becomes available, the event loop calls the connection's StartInputEvent function:

    // socket.cpp
    int Socket::StartInputEvent(SocketId id, uint32_t events,
                                const bthread_attr_t &thread_attr) {
      SocketUniquePtr s;
      if (Address(id, &s) < 0) {
        return -1;
      if (NULL == s->_on_edge_triggered_events) {
        // Callback can be NULL when receiving error epoll events
        // (Added into epoll by `WaitConnected')
        return 0;
      if (s->fd() < 0) {
        CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
        return -1;
      // Passing e[i].events causes complex visibility issues and
      // requires stronger memory fences, since reading the fd returns
      // error as well, we don't pass the events.
      // _nevent counts the currently pending events, ensuring at most one bthread handles read events
      if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
        // According to the stats, above fetch_add is very effective. In a
        // server processing 1 million requests per second, this counter
        // is just 1500~1700/s
        g_vars->neventthread << 1;
        bthread_t tid;
        // transfer ownership as well, don't use s anymore!
        Socket *const p = s.release();
        bthread_attr_t attr = thread_attr;
        attr.keytable_pool = p->_keytable_pool;
        // start a new bthread to handle the read event
        if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
          LOG(FATAL) << "Fail to start ProcessEvent";
          ProcessEvent(p);
      return 0;
    void *Socket::ProcessEvent(void *arg) {
      // the enclosed Socket is valid and free to access inside this function.
      SocketUniquePtr s(static_cast<Socket *>(arg));
      // call _on_edge_triggered_events to handle the event
      s->_on_edge_triggered_events(s.get());
      return NULL;
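
    The concurrency control here is just the _nevent counter: the poller does fetch_add(1) and only the 0 -> 1 transition starts a consumer bthread; the consumer later tries to CAS the counter back to 0 (MoreReadEvents, shown in section 4) and keeps reading when the CAS fails. A distilled sketch with made-up names:

    #include <atomic>

    std::atomic<int> nevent{0};  // pending read events on one connection

    // Poller side: returns true when the caller should start a consumer.
    bool on_readable_event() {
      return nevent.fetch_add(1, std::memory_order_acq_rel) == 0;
    }

    // Consumer side, called when the current batch of input is exhausted.
    // Returns true if new events arrived meanwhile and reading must continue.
    bool more_read_events(int* progress) {
      // Try to swap the observed count back to 0; failure updates *progress to
      // the latest value and tells the consumer to loop once more.
      return !nevent.compare_exchange_strong(*progress, 0,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire);
    }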
    

    _on_edge_triggered_events takes one of two values, OnNewConnections or OnNewMessages. The former handles new connections and is detailed in the next section; here we first look at the latter, which handles incoming messages:

    // Handle new messages
    void InputMessenger::OnNewMessages(Socket *m) {
      // Notes:
      // - If the socket has only one message, the message will be parsed and
      //   processed in this bthread. nova-pbrpc and http works in this way.
      // - If the socket has several messages, all messages will be parsed (
      //   meaning cutting from butil::IOBuf. serializing from protobuf is part of
      //   "process") in this bthread. All messages except the last one will be
      //   processed in separate bthreads. To minimize the overhead, scheduling
      //   is batched(notice the BTHREAD_NOSIGNAL and bthread_flush).
      // - Verify will always be called in this bthread at most once and before
      //   any process.
      InputMessenger *messenger = static_cast<InputMessenger *>(m->user());
      const InputMessageHandler *handlers = messenger->_handlers;
      int progress = Socket::PROGRESS_INIT;
      // Notice that all *return* no matter successful or not will run last
      // message, even if the socket is about to be closed. This should be
      // OK in most cases.
      std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
      bool read_eof = false;
      while (!read_eof) {
        const int64_t received_us = butil::cpuwide_time_us();
        const int64_t base_realtime = butil::gettimeofday_us() - received_us;
        // Calculate bytes to be read.
        size_t once_read = m->_avg_msg_size * 16;
        if (once_read < MIN_ONCE_READ) {
          once_read = MIN_ONCE_READ;
        } else if (once_read > MAX_ONCE_READ) {
          once_read = MAX_ONCE_READ;
        // Read.
        // read data from the connection
        const ssize_t nr = m->DoRead(once_read);
        if (nr <= 0) {
          if (0 == nr) {
            // Set `read_eof' flag and proceed to feed EOF into `Protocol'
            // (implied by m->_read_buf.empty), which may produce a new
            // `InputMessageBase' under some protocols such as HTTP
            LOG_IF(WARNING, FLAGS_log_connection_close)
                << *m << " was closed by remote side";
            read_eof = true;
          } else if (errno != EAGAIN) {
            if (errno == EINTR) {
              continue; // just retry
            const int saved_errno = errno;
            PLOG(WARNING) << "Fail to read from " << *m;
            m->SetFailed(saved_errno, "Fail to read from %s: %s",
                         m->description().c_str(), berror(saved_errno));
            return;
          } else if (!m->MoreReadEvents(&progress)) {
            // When there is nothing more to read, the bthread exits
            return;
          } else { // new events during processing
            continue;
        m->AddInputBytes(nr);
        // Avoid this socket to be closed due to idle_timeout_s
        m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);
        size_t last_size = m->_read_buf.length();
        int num_bthread_created = 0;
        while (1) {
          size_t index = 8888;
          // try to parse a message
          ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
          if (!pr.is_ok()) {
            if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
              // incomplete message, re-read.
              // However, some buffer may have been consumed
              // under protocols like HTTP. Record this size
              m->_last_msg_size += (last_size - m->_read_buf.length());
              break;
            } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
              LOG(WARNING) << "Close " << *m << " due to unknown message: "
                           << butil::ToPrintable(m->_read_buf);
              m->SetFailed(EINVAL, "Close %s due to unknown message",
                           m->description().c_str());
              return;
            } else {
              LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
              m->SetFailed(EINVAL, "Close %s: %s", m->description().c_str(),
                           pr.error_str());
              return;
          m->AddInputMessages(1);
          // Calculate average size of messages
          const size_t cur_size = m->_read_buf.length();
          if (cur_size == 0) {
            // _read_buf is consumed, it's good timing to return blocks
            // cached internally back to TLS, otherwise the memory is not
            // reused until next message arrives which is quite uncertain
            // in situations that most connections are idle.
            m->_read_buf.return_cached_blocks();
          m->_last_msg_size += (last_size - cur_size);
          last_size = cur_size;
          const size_t old_avg = m->_avg_msg_size;
          if (old_avg != 0) {
            m->_avg_msg_size =
                (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size) /
                MSG_SIZE_WINDOW;
          } else {
            m->_avg_msg_size = m->_last_msg_size;
          m->_last_msg_size = 0;
          if (pr.message() == NULL) { // the Process() step can be skipped.
            continue;
          pr.message()->_received_us = received_us;
          pr.message()->_base_real_us = base_realtime;
          // This unique_ptr prevents msg to be lost before transfering
          // ownership to last_msg
          DestroyingPtr<InputMessageBase> msg(pr.message());
          // hand the previous message over for processing
          QueueMessage(last_msg.release(), &num_bthread_created, m->_keytable_pool);
          if (handlers[index].process == NULL) {
            LOG(ERROR) << "process of index=" << index << " is NULL";
            continue;
          m->ReAddress(&msg->_socket);
          m->PostponeEOF();
          msg->_process = handlers[index].process;
          msg->_arg = handlers[index].arg;
          if (handlers[index].verify != NULL) {
            int auth_error = 0;
            if (0 == m->FightAuthentication(&auth_error)) {
              // Get the right to authenticate
              if (handlers[index].verify(msg.get())) {
                m->SetAuthentication(0);
              } else {
                m->SetAuthentication(ERPCAUTH);
                LOG(WARNING) << "Fail to authenticate " << *m;
                m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
                             m->description().c_str());
                return;
            } else {
              LOG_IF(FATAL, auth_error != 0)
                  << "Impossible! Socket should have been "
                     "destroyed when authentication failed";
          if (!m->is_read_progressive()) {
            // Transfer ownership to last_msg
            last_msg.reset(msg.release());
          } else {
            QueueMessage(msg.release(), &num_bthread_created, m->_keytable_pool);
            bthread_flush();
            num_bthread_created = 0;
        if (num_bthread_created) {
          bthread_flush();
      if (read_eof) {
        m->SetEOF();
    static void QueueMessage(InputMessageBase *to_run_msg, int *num_bthread_created,
                             bthread_keytable_pool_t *keytable_pool) {
      if (!to_run_msg) {
        return;
      // Create bthread for last_msg. The bthread is not scheduled
      // until bthread_flush() is called (in the worse case).
      // TODO(gejun): Join threads.
      bthread_t th;
      bthread_attr_t tmp =
          (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) |
          BTHREAD_NOSIGNAL;
      tmp.keytable_pool = keytable_pool;
      // start a new bthread to process the parsed message
      if (bthread_start_background(&th, &tmp, ProcessInputMessage, to_run_msg) ==
          0) {
        ++*num_bthread_created;
      } else {
        ProcessInputMessage(to_run_msg);
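
    The point of BTHREAD_NOSIGNAL plus bthread_flush() is to pay the worker wake-up cost once per batch of parsed messages instead of once per message. A minimal usage sketch, assuming the bthread headers are available; handle_one is a made-up worker function:

    #include <bthread/bthread.h>
    #include <cstdio>

    static void* handle_one(void* arg) {  // hypothetical per-message worker
      std::printf("processing message %ld\n", reinterpret_cast<long>(arg));
      return nullptr;
    }

    void start_batch(long nmsg) {
      const bthread_attr_t attr = BTHREAD_ATTR_NORMAL | BTHREAD_NOSIGNAL;
      int started = 0;
      for (long i = 0; i < nmsg; ++i) {
        bthread_t th;
        if (bthread_start_background(&th, &attr, handle_one,
                                     reinterpret_cast<void*>(i)) == 0) {
          ++started;  // created, but workers are not signalled yet
        } else {
          handle_one(reinterpret_cast<void*>(i));  // degrade to inline execution
        }
      }
      if (started > 0) {
        bthread_flush();  // wake the workers once for the whole batch
      }
    }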
    

    4. Maintain Connection

    The Socket Connect flow:

    int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
      if (_fd.load(butil::memory_order_consume) >= 0) {
        return 0;
      // Have to hold a reference for `req'
      // keep the Socket alive within this scope
      SocketUniquePtr s;
      ReAddress(&s);
      req->socket = s.get();
      if (_conn) {
        if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) {
          return -1;
      } else {
        // Connect; note that the KeepWriteIfConnected callback is set, with data = req
        if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
          return -1;
      s.release();
      return 1;
    int Socket::Connect(const timespec* abstime, int (*on_connect)(int, int, void*),
                        void* data) {
      if (_ssl_ctx) {
        _ssl_state = SSL_CONNECTING;
      } else {
        _ssl_state = SSL_OFF;
      butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
      if (sockfd < 0) {
        PLOG(ERROR) << "Fail to create socket";
        return -1;
      CHECK_EQ(0, butil::make_close_on_exec(sockfd));
      // We need to do async connect (to manage the timeout by ourselves).
      // non-blocking
      CHECK_EQ(0, butil::make_non_blocking(sockfd));
      // Attempt the connect; in non-blocking mode this usually returns EINPROGRESS
      struct sockaddr_in serv_addr;
      bzero((char*)&serv_addr, sizeof(serv_addr));
      serv_addr.sin_family = AF_INET;
      serv_addr.sin_addr = remote_side().ip;
      serv_addr.sin_port = htons(remote_side().port);
      const int rc =
          ::connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
      if (rc != 0 && errno != EINPROGRESS) {
        PLOG(WARNING) << "Fail to connect to " << remote_side();
        return -1;
      if (on_connect) {
        // Since on_connect = KeepWriteIfConnected is set, this path is taken.
        // A new EpollOutRequest is constructed here; mind its lifetime.
        EpollOutRequest* req = new (std::nothrow) EpollOutRequest;
        if (req == NULL) {
          LOG(FATAL) << "Fail to new EpollOutRequest";
          return -1;
        req->fd = sockfd;
        req->timer_id = 0;
        req->on_epollout_event = on_connect;  // record on_connect
        req->data = data;  // this data is the WriteRequest passed in
        // A temporary Socket to hold `EpollOutRequest', which will
        // be added into epoll device soon
        SocketId connect_id;
        SocketOptions options;
        options.user = req;
        // Build a temporary Socket around the EpollOutRequest to handle the connect
        if (Socket::Create(options, &connect_id) != 0) {
          LOG(FATAL) << "Fail to create Socket";
          delete req;
          return -1;
        // From now on, ownership of `req' has been transferred to
        // `connect_id'. We hold an additional reference here to
        // ensure `req' to be valid in this scope
        SocketUniquePtr s;  // keep the Socket alive
        CHECK_EQ(0, Socket::Address(connect_id, &s));
        // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
        // be called with `req' when epoll event reaches
        // Add connect_id to epoll; Socket::HandleEpollOut is called when the connection becomes usable or fails
        if (GetGlobalEventDispatcher(sockfd).AddEpollOut(connect_id, sockfd,
                                                         false) != 0) {
          const int saved_errno = errno;
          PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
          s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s", (int)sockfd,
                       berror(saved_errno));
          return -1;
        // Register a timer for EpollOutRequest. Note that the timeout
        // callback has no race with the one above as both of them try
        // to `SetFailed' `connect_id' while only one of them can succeed
        // It also work when `HandleEpollOutRequest' has already been
        // called before adding the timer since it will be removed
        // inside destructor of `EpollOutRequest' after leaving this scope
        if (abstime) {
          // add timeout handling
          int rc = bthread_timer_add(&req->timer_id, *abstime,
                                     HandleEpollOutTimeout, (void*)connect_id);
          if (rc) {
            LOG(ERROR) << "Fail to add timer: " << berror(rc);
            s->SetFailed(rc, "Fail to add timer: %s", berror(rc));
            return -1;
      } else {
        if (WaitEpollOut(sockfd, false, abstime) != 0) {
          PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;
          return -1;
        if (CheckConnected(sockfd) != 0) {
          return -1;
      return sockfd.release();
    // epoll callback
    int Socket::HandleEpollOut(SocketId id) {
      SocketUniquePtr s;
      // Since Sockets might have been `SetFailed' before they were
      // added into epoll, these sockets miss the signal inside
      // `SetFailed' and therefore must be signalled here using
      // `AddressFailedAsWell' to prevent waiting forever
      if (Socket::AddressFailedAsWell(id, &s) < 0) {
        // Ignore recycled sockets
        return -1;
      EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
      if (req != NULL) {
        // For Connect, the EpollOutRequest created earlier is retrieved here
        return s->HandleEpollOutRequest(0, req);
      // Currently `WaitEpollOut' needs `_epollout_butex'
      // TODO(jiangrujie): Remove this in the future
      s->_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
      bthread::butex_wake_except(s->_epollout_butex, 0);
      return 0;
    int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
      // Only one thread can `SetFailed' this `Socket' successfully
      // Also after this `req' will be destroyed when its reference
      // hits zero
      if (SetFailed() != 0) {
        return -1;
      // We've got the right to call user callback
      // The timer will be removed inside destructor of EpollOutRequest
      // Remove the previously created connect_id from epoll
      GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
      // Invoke the callback, i.e. on_connect, i.e. KeepWriteIfConnected
      return req->on_epollout_event(req->fd, error_code, req->data);
    int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
      WriteRequest* req = static_cast<WriteRequest*>(data);
      Socket* s = req->socket;
      if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
        // Run ssl connect in a new bthread to avoid blocking
        // the current bthread (thus blocking the EventDispatcher)
        bthread_t th;
        // start a new bthread to run CheckConnectedAndKeepWrite
        google::protobuf::Closure* thrd_func =
            brpc::NewCallback(Socket::CheckConnectedAndKeepWrite, fd, err, data);
        if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, RunClosure,
                                            thrd_func)) == 0) {
          return 0;
        } else {
          // Note that thrd_func leaks here; the author has submitted a PR to brpc that is awaiting merge
          PLOG(ERROR) << "Fail to start bthread";
          // Fall through with non zero `err'
      CheckConnectedAndKeepWrite(fd, err, data);
      return 0;
    void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) {
      butil::fd_guard sockfd(fd);
      WriteRequest* req = static_cast<WriteRequest*>(data);
      Socket* s = req->socket;
      CHECK_GE(sockfd, 0);
      if (err == 0 && s->CheckConnected(sockfd) == 0 &&
          s->ResetFileDescriptor(sockfd) == 0) {
        if (s->_app_connect) {
          s->_app_connect->StartConnect(req->socket, AfterAppConnected, req);
        } else {
          // Successfully created a connection
          AfterAppConnected(0, req);
        // Release this socket for KeepWrite
        sockfd.release();
      } else {
        if (err == 0) {
          err = errno ? errno : -1;
        AfterAppConnected(err, req);
    void Socket::AfterAppConnected(int err, void* data) {
      WriteRequest* req = static_cast<WriteRequest*>(data);
      if (err == 0) {
        Socket* const s = req->socket;
        SharedPart* sp = s->GetSharedPart();
        if (sp) {
          sp->num_continuous_connect_timeouts.store(0, butil::memory_order_relaxed);
        // requests are not setup yet. check the comment on Setup() in Write()
        req->Setup(s);
        bthread_t th;
        // finally start a new bthread to run KeepWrite
        if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) !=
            0) {
          PLOG(WARNING) << "Fail to start KeepWrite";
          KeepWrite(req);
      } else {
        SocketUniquePtr s(req->socket);
        if (err == ETIMEDOUT) {
          SharedPart* sp = s->GetOrNewSharedPart();
          if (sp->num_continuous_connect_timeouts.fetch_add(
                  1, butil::memory_order_relaxed) + 1 >=
              FLAGS_connect_timeout_as_unreachable) {
            // the race between store and fetch_add(in another thread) is
            // OK since a critial error is about to return.
            sp->num_continuous_connect_timeouts.store(0,
                                                      butil::memory_order_relaxed);
            err = ENETUNREACH;
        s->SetFailed(err, "Fail to connect %s: %s", s->description().c_str(),
                     berror(err));
        s->ReleaseAllFailedWriteRequests(req);
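
    The asynchronous connect above boils down to the standard non-blocking connect pattern: connect() returns EINPROGRESS, the fd is watched for EPOLLOUT, and once it becomes writable the deferred result is read back with SO_ERROR (roughly what Socket::CheckConnected does). A bare-bones sketch without the EpollOutRequest machinery; names are illustrative:

    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <cerrno>
    #include <cstdint>
    #include <cstring>

    // Start a non-blocking connect; returns the fd (the connect may still be pending).
    int start_connect(const char* ip, uint16_t port) {
      int fd = socket(AF_INET, SOCK_STREAM, 0);
      if (fd < 0) return -1;
      fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
      sockaddr_in addr;
      std::memset(&addr, 0, sizeof(addr));
      addr.sin_family = AF_INET;
      addr.sin_port = htons(port);
      inet_pton(AF_INET, ip, &addr.sin_addr);
      if (connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 &&
          errno != EINPROGRESS) {  // immediate failure
        close(fd);
        return -1;
      }
      return fd;  // register the fd for EPOLLOUT; writable means the connect finished
    }

    // After EPOLLOUT fires: fetch the deferred connect result.
    int check_connected(int fd) {
      int err = 0;
      socklen_t len = sizeof(err);
      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0) return -1;
      return err;  // 0 on success, otherwise the errno of the failed connect
    }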
    

    The Socket Accept flow:

    // server.cpp
    int Server::StartInternal(const butil::ip_t& ip,
                              const PortRange& port_range,
                              const ServerOptions *opt) {
      ...
      // start listening
      butil::fd_guard sockfd(tcp_listen(_listen_addr));
      // Transfer ownership of the listening sockfd to the Acceptor object
      if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                           _default_ssl_ctx) != 0) {
        LOG(ERROR) << "Fail to start acceptor";
        return -1;
      ...
    // acceptor.cpp
    int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
                              const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
      ...
      SocketOptions options;
      options.fd = listened_fd;
      options.user = this;  // set the socket's user
      options.on_edge_triggered_events = OnNewConnections;  // set the trigger callback
      if (Socket::Create(options, &_acception_id) != 0) {
        // Close-idle-socket thread will be stopped inside destructor
        LOG(FATAL) << "Fail to create _acception_id";
        return -1;
      ...
    // When epoll finds listened_fd ready, OnNewConnections is triggered
    void Acceptor::OnNewConnections(Socket* acception) {
      int progress = Socket::PROGRESS_INIT;
      do {
        // handle new connections until EAGAIN
        OnNewConnectionsUntilEAGAIN(acception);
        if (acception->Failed()) {
          return;
        // all currently readable events must be consumed
      } while (acception->MoreReadEvents(&progress));
    void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
      while (1) {
        struct sockaddr in_addr;
        socklen_t in_len = sizeof(in_addr);
        // accept the client's connect request
        butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
        if (in_fd < 0) {
          // no EINTR because listened fd is non-blocking.
          if (errno == EAGAIN) {
            // until EAGAIN
            return;
          // Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
          // will be closed. Continue to consume all the events until EAGAIN
          // instead.
          // If the accept was failed, the error may repeat constantly,
          // limit frequency of logging.
          PLOG_EVERY_SECOND(ERROR)
              << "Fail to accept from listened_fd=" << acception->fd();
          continue;
        Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
        if (NULL == am) {
          LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
          acception->SetFailed(EINVAL,
                               "Impossible! acception->user() MUST be Acceptor");
          return;
        SocketId socket_id;
        SocketOptions options;
        options.keytable_pool = am->_keytable_pool;
        options.fd = in_fd;
        options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
        options.user = acception->user();  // i.e. the Acceptor object
        options.on_edge_triggered_events = InputMessenger::OnNewMessages;  // subsequent messages are handled by OnNewMessages
        options.initial_ssl_ctx = am->_ssl_ctx;
        if (Socket::Create(options, &socket_id) != 0) {
          LOG(ERROR) << "Fail to create Socket";
          continue;
        in_fd.release();  // transfer ownership to socket_id
        // There's a funny race condition here. After Socket::Create, messages
        // from the socket are already handled and a RPC is possibly done
        // before the socket is added into _socket_map below. This is found in
        // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
        // on machines with few cores) where the _messenger.ConnectionCount()
        // may surprisingly be 0 even if the RPC is already done.
        SocketUniquePtr sock;
        if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
          bool is_running = true;
            BAIDU_SCOPED_LOCK(am->_map_mutex);
            is_running = (am->status() == RUNNING);
            // Always add this socket into `_socket_map' whether it
            // has been `SetFailed' or not, whether `Acceptor' is
            // running or not. Otherwise, `Acceptor::BeforeRecycle'
            // may be called (inside Socket::OnRecycle) after `Acceptor'
            // has been destroyed
            am->_socket_map.insert(socket_id, ConnectStatistics());
          if (!is_running) {
            LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                         << " has been stopped, discard newly created " << *sock;
            sock->SetFailed(ELOGOFF,
                            "Acceptor on fd=%d has been stopped, "
                            "discard newly created %s",
                            acception->fd(), sock->description().c_str());
            return;
        }  // else: The socket has already been destroyed, Don't add its id
           // into _socket_map
    inline bool Socket::MoreReadEvents(int* progress) {
        // Fail to CAS means that new events arrived.
        // A failed CAS means new events arrived and must still be handled
        return !_nevent.compare_exchange_strong(
            *progress, 0, butil::memory_order_release,
                butil::memory_order_acquire);
    

    The Socket Revive flow:

    // When a Socket fails, a periodic task is added to check whether the connection becomes usable again
    int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
      ...
      // Do health-checking even if we're not connected before, needed
      // by Channel to revive never-connected socket when server side
      // comes online.
      if (_health_check_interval_s > 0) {
        GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
        StartHealthCheck(id(),
                         GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
      ...
    // health_check.cpp
    void StartHealthCheck(SocketId id, int64_t delay_ms) {
      // start the periodic task
      PeriodicTaskManager::StartTaskAt(new HealthCheckTask(id),
                                       butil::milliseconds_from_now(delay_ms));
    // periodic_task.cpp
    void PeriodicTaskManager::StartTaskAt(PeriodicTask* task, const timespec& abstime) {
      if (task == NULL) {
        LOG(ERROR) << "Param[task] is NULL";
        return;
      bthread_timer_t timer_id;
      // arm a timer that runs RunPeriodicTaskThread
      const int rc = bthread_timer_add(
        &timer_id, abstime, RunPeriodicTaskThread, task);
      if (rc != 0) {
        LOG(ERROR) << "Fail to add timer for RunPerodicTaskThread";
        task->OnDestroyingTask();
        return;
    static void RunPeriodicTaskThread(void* arg) {
      bthread_t th = 0;
      // When the time is up, start a new bthread that calls PeriodicTaskThread
      int rc = bthread_start_background(
        &th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg);
      if (rc != 0) {
        LOG(ERROR) << "Fail to start PeriodicTaskThread";
        static_cast<PeriodicTask*>(arg)->OnDestroyingTask();
        return;
    static void* PeriodicTaskThread(void* arg) {
      PeriodicTask* task = static_cast<PeriodicTask*>(arg);
      timespec abstime;
      // Run OnTriggeringTask; if it returns false, destroy the task
      if (!task->OnTriggeringTask(&abstime)) {
        task->OnDestroyingTask();
        return NULL;
      // otherwise put the task back onto the timer
      PeriodicTaskManager::StartTaskAt(task, abstime);
      return NULL;
    bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
      SocketUniquePtr ptr;
      const int rc = Socket::AddressFailedAsWell(_id, &ptr);  // obtain the pointer
      CHECK(rc != 0);
      if (rc < 0) {
        RPC_VLOG << "SocketId=" << _id
          << " was abandoned before health checking";
        return false;
      // Note: Making a Socket re-addessable is hard. An alternative is
      // creating another Socket with selected internal fields to replace
      // failed Socket. Although it avoids concurrent issues with in-place
      // revive, it changes SocketId: many code need to watch SocketId 
      // and update on change, which is impractical. Another issue with
      // this method is that it has to move "selected internal fields" 
      // which may be accessed in parallel, not trivial to be moved.
      // Finally we choose a simple-enough solution: wait until the
      // reference count hits `expected_nref', which basically means no
      // one is addressing the Socket(except here). Because the Socket 
      // is not addressable, the reference count will not increase 
      // again. This solution is not perfect because the `expected_nref'
      // is implementation specific. In our case, one reference comes 
      // from SocketMapInsert(socket_map.cpp), one reference is here. 
      // Although WaitAndReset() could hang when someone is addressing
      // the failed Socket forever (also indicating bug), this is not an 
      // issue in current code. 
      // The English comment above is thorough; in short, wait until the reference count drops to 2 before doing the Reset, which keeps it thread-safe
      if (_first_time) {  // Only check at first time.
        _first_time = false;
        if (ptr->WaitAndReset(2/*note*/) != 0) {
          LOG(INFO) << "Cancel checking " << *ptr;
          return false;
      // g_vars must not be NULL because it is newed at the creation of
      // first Socket. When g_vars is used, the socket is at health-checking
      // state, which means the socket must be created and then g_vars can
      // not be NULL.
      g_vars->nhealthcheck << 1;
      int hc = 0;
      if (ptr->_user) {
        hc = ptr->_user->CheckHealth(ptr.get());
      } else {
        // call Socket::CheckHealth
        hc = ptr->CheckHealth();
      if (hc == 0) {
        if (ptr->CreatedByConnect()) {
          g_vars->channel_conn << -1;
        if (!FLAGS_health_check_path.empty()) {
          ptr->_ninflight_app_health_check.fetch_add(
            1, butil::memory_order_relaxed);
        ptr->Revive();  // restore the version and reference count
        ptr->_hc_count = 0;
        if (!FLAGS_health_check_path.empty()) {
          HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
        return false;
      } else if (hc == ESTOP) {
        LOG(INFO) << "Cancel checking " << *ptr;
        return false;
      ++ ptr->_hc_count;
      *next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
      return true;
    // socket.cpp
    int Socket::WaitAndReset(int32_t expected_nref) {
      const uint32_t id_ver = VersionOfSocketId(_this_id);
      uint64_t vref;
      // Wait until nref == expected_nref.
      while (1) {
        // The acquire fence pairs with release fence in Dereference to avoid
        // inconsistent states to be seen by others.
        vref = _versioned_ref.load(butil::memory_order_acquire);
        if (VersionOfVRef(vref) != id_ver + 1) {
          LOG(WARNING) << "SocketId=" << _this_id << " is already alive or recycled";
          return -1;
        if (NRefOfVRef(vref) > expected_nref) {
          // sleep while waiting for the reference count
          if (bthread_usleep(1000L/*FIXME*/) < 0) {
            PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
            return -1;
        } else if (NRefOfVRef(vref) < expected_nref) {
          RPC_VLOG << "SocketId=" << _this_id 
            << " was abandoned during health checking";
          return -1;
        } else {
          break;
      // It's safe to close previous fd (provided expected_nref is correct).
      // Once the reference count matches, close the old fd
      const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
      if (ValidFileDescriptor(prev_fd)) {
        if (_on_edge_triggered_events != NULL) {
          GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd);
        close(prev_fd);
        if (CreatedByConnect()) {
          g_vars->channel_conn << -1;
      ...  // perform the remaining Reset steps
    // We don't care about the return value of Revive.
    void Socket::Revive() {
      const uint32_t id_ver = VersionOfSocketId(_this_id);
      uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
      while (1) {
        CHECK_EQ(id_ver + 1, VersionOfVRef(vref));  // the vref version was incremented by 1 in SetFailed
        int32_t nref = NRefOfVRef(vref);
        if (nref <= 1) {
          CHECK_EQ(1, nref);
          LOG(WARNING) << *this << " was abandoned during revival";
          return;
        // +1 is the additional ref added in Create(). TODO(gejun): we should
        // remove this additional nref someday.
        // CAS the vref version back down to the usable state
        if (_versioned_ref.compare_exchange_weak(
          vref, MakeVRef(id_ver, nref + 1/*note*/),
          butil::memory_order_release,
          butil::memory_order_relaxed)) {
          // Set this flag to true since we add additional ref again
          // clear the recycle flag
          _recycle_flag.store(false, butil::memory_order_relaxed);
          if (_user) {
            // call AfterRevived, which by default logs "Revived ... (Connectable)"
            _user->AfterRevived(this);
          } else {
            LOG(INFO) << "Revived " << *this << " (Connectable)";
          return;
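
    Both WaitAndReset and Revive manipulate _versioned_ref, a single 64-bit atomic that packs a 32-bit version in the high half and a 32-bit reference count in the low half; bumping the version in SetFailed is what makes a failed Socket temporarily un-addressable. A minimal sketch of the packing and of a Revive-style CAS; the helper names mirror bRPC's but this is not the original code:

    #include <atomic>
    #include <cstdint>

    // High 32 bits = version, low 32 bits = reference count.
    inline uint64_t MakeVRef(uint32_t version, int32_t nref) {
      return (static_cast<uint64_t>(version) << 32) | static_cast<uint32_t>(nref);
    }
    inline uint32_t VersionOfVRef(uint64_t vref) { return static_cast<uint32_t>(vref >> 32); }
    inline int32_t NRefOfVRef(uint64_t vref) { return static_cast<int32_t>(vref & 0xFFFFFFFFull); }

    // Revive-style transition: a failed socket carries version id_ver + 1; CAS the
    // version back down (and add one reference) so the id becomes addressable again.
    bool try_revive(std::atomic<uint64_t>& versioned_ref, uint32_t id_ver) {
      uint64_t vref = versioned_ref.load(std::memory_order_relaxed);
      while (VersionOfVRef(vref) == id_ver + 1 && NRefOfVRef(vref) > 1) {
        if (versioned_ref.compare_exchange_weak(
                vref, MakeVRef(id_ver, NRefOfVRef(vref) + 1),
                std::memory_order_release, std::memory_order_relaxed)) {
          return true;  // back on the addressable version, plus the extra ref from Create()
        }
      }
      return false;
    }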
    

    References

  • "bRPC IO", incubator-brpc
  • "epoll_wait(2)", Linux Manual Page