client.go 代码:

func main() {
 flag.Parse()
 // Set up a connection to the server.
 conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 if err != nil {
  log.Fatalf("did not connect: %v", err)
 }
 defer conn.Close()
 c := pb.NewGreeterClient(conn)

 // Contact the server and print out its response.
 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 defer cancel()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
 if err != nil {
  log.Fatalf("could not greet: %v", err)
 }
 log.Printf("Greeting: %s", r.GetMessage())
}

Dial 源码:

func Dial(target string, opts ...DialOption) (*ClientConn, error) {
 return DialContext(context.Background(), target, opts...)
}

DialContext 源码:

省略次部分代码

// 首先会创建 ClientConn,初始化相关字段
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
 cc := &ClientConn{
  target:            target,
  csMgr:             &connectivityStateManager{},
  conns:             make(map[*addrConn]struct{}),
  dopts:             defaultDialOptions(),
  blockingpicker:    newPickerWrapper(),
  czData:            new(channelzData),
  firstResolveEvent: grpcsync.NewEvent(),
 }

 // 将用户设置的连接参数更新到客户端连接器 ClientConn
 for _, opt := range opts {
  opt.apply(&cc.dopts)
 }

 return cc, nil
}

connect() 方法:

func (ac *addrConn) connect() error {
 ac.mu.Lock()
 // if 校验状态
 if ac.state == connectivity.Shutdown {
  ac.mu.Unlock()
  return errConnClosing
 }
 if ac.state != connectivity.Idle {
  ac.mu.Unlock()
  return nil
 }
 ac.updateConnectivityState(connectivity.Connecting, nil)
 ac.mu.Unlock()
 // 主要看这个方法,重试连接
 ac.resetTransport()
 return nil
}

进入 resetTransport() 源码

func (ac *addrConn) resetTransport() {
 ac.mu.Lock()
 // 判断状态若为 shutdown,则不再连接直接推出
 if ac.state == connectivity.Shutdown {
  ac.mu.Unlock()
  return
 }

 addrs := ac.addrs
 // 连接失败,需要进行重试的
 // Backoff 是需要等待的时间,ac.backoffIdx表示第几次重试
 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
 // 计算出本次向gRPC服务尝试建立TCP连接的最长时间
 // 若超过这个时间还是连接不上,则主动断开,等待尝试下次连接
 // This will be the duration that dial gets to finish.
 dialDuration := minConnectTimeout
 if ac.dopts.minConnectTimeout != nil {
  dialDuration = ac.dopts.minConnectTimeout()
 }

 if dialDuration < backoffFor {
  // Give dial more time as we keep failing to connect.
  dialDuration = backoffFor
 }
 connectDeadline := time.Now().Add(dialDuration)

 // 更新结构addrConn状态为connecting
 ac.updateConnectivityState(connectivity.Connecting, nil)
 ac.mu.Unlock()

 // 向服务器连接失败后需要做的逻辑
 if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
  ac.cc.resolveNow(resolver.ResolveNowOptions{})
  // After exhausting all addresses, the addrConn enters
  // TRANSIENT_FAILURE.
  ac.mu.Lock()
  if ac.state == connectivity.Shutdown {
   ac.mu.Unlock()
   return
  }
  ac.updateConnectivityState(connectivity.TransientFailure, err)

  // Backoff.
  b := ac.resetBackoff
  ac.mu.Unlock()

  // 定时的超时间
  timer := time.NewTimer(backoffFor)
  // 1.timer.C如果连接超时,重试的次数+1,继续下一次重试连接,但需要等待一段时间
  // 2. b,直接关闭,将继续进行重新连接
  // 2. ac.ctx.Done,走这里的话,上下文结束,这里不会再次重试了
  select {
  case <-timer.C:
   ac.mu.Lock()
   ac.backoffIdx++
   ac.mu.Unlock()
  case <-b:
   timer.Stop()
  case <-ac.ctx.Done():
   timer.Stop()
   return
  }

  ac.mu.Lock()
  // 状态 != shutdown就更新为空闲状态
  if ac.state != connectivity.Shutdown {
   ac.updateConnectivityState(connectivity.Idle, err)
  }
  ac.mu.Unlock()
  return
 }
 // 连接成功,重新设置backoff为原始值0
 ac.mu.Lock()
 ac.backoffIdx = 0
 ac.mu.Unlock()
}

如何计算重试连接等待时间?

进入Backoff方法


func (bc Exponential) Backoff(retries int) time.Duration {
 if retries == 0 {
  return bc.Config.BaseDelay
 }
 backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
 for backoff < max && retries > 0 {
  // 幂次方
  backoff *= bc.Config.Multiplier
  retries--
 }
 // 不能超过最大延时时间
 if backoff > max {
  backoff = max
 }
 // Randomize backoff delays so that if a cluster of requests start at
 // the same time, they won't operate in lockstep.
 backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
 if backoff < 0 {
  return 0
 }
 return time.Duration(backoff)
}
连接失败后,客户端会进行重试连接。 重试次数越多,等待下一次连接时间也会变长,但不能超过MaxDelay值。

更多Go云原生学习资料,收录于Github: https://github.com/metashops/GoFamily

本文由 mdnice 多平台发布

grpc 版本1.50client.go 代码:func&nbsp;main()&nbsp;{&nbsp;flag.Parse()&nbsp;//&nbsp;Set&nbsp;up&nbsp;a&nbsp;connection&nbsp;to&nbsp;the&nbsp;server.&nbsp;conn,&nbsp;err&nbsp;:=&nbsp;grpc.Dial(*addr,&nbsp;grpc.WithTransportCredentials(insecure.NewCredentials.
来自:指月 https://lixueduan.com 原文:https://lixueduan.com/post/ grpc /09-retry/ 本文主要记录了如何使用 gRPC 中的 自动 重试 功能。 1. 概述 gRPC 系列相关代码见 Github gRPC 中已经内置了 retry 功能,可以直接使用,不需要我们手动来实现,非常方便。 2. Demo Server 为了测试 retry 功能, 服务端 做了一点调整。 记录 客户端 的请求次数,只有满足条件的那一次(这里就是请求次数模4等于0的那一次)才
文章目录RPCRPC工作模式 gRPC gRPC 的特性与优缺点1.基于HTTP/22.IDL使用ProtoBuf3.多语言支持HTTP和RPC的优缺点Go语言中的RPC支持与处理 grpc protobuffer 为什么高效Protocol buffers 反序列化 RPC(Remote ProcedureCall,远程过程调用) 是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络细节的应用程序通信协议。 RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OS
项目中碰到了 grpc 双向流和单向流出现了 服务端 流context报错:context cancel error,而 客户端 无感知 连接 断开,依然认为还在 连接 中,导致数据传输中断。 遇到这种情况从以下几个维度去思考问题发生的可能: 网络 是否 稳定,能否复现 客户端 服务端 是否 有对context进行cancel 客户端 服务端 是否 有去处理context的cancel信号 由于出现的问题不是在开发环境中,因此开始只能看日志进行排查,但是日志中只得到了 服务端 收到context的cancel信号,而 客户端 没有任何
keepalive ping是一种通过transport发送HTTP2 ping来检查通道当前 是否 工作的方法。它是周期性发送的,如果在某个超时周期内该ping没有得到对等方的确认,则传输断开 连接 。 本指南记录了 gRPC core中控制keepalive ping行为方式。 keepalive ping由两个重要的通道参数控制: GRPC _ARG_KEEPALIVE_TIME_MS 此channe...
我们以发送数据的场景,来介绍 重试 机制 。 1、 grpc 框架中的 重试 机制 的核心或者说实现主体思路? 假设 客户端 向服务器端发起服务请求,整个过程 经历很多阶段,并不是所有的阶段都能 重试 ,只有某些阶段可以进行 重试 ; 当某个操作执行成功后,将该操作缓存到一个切片里,假设已经将2个阶段的操作缓存到了切片里; 执行第3个操作时,假设执行 失败 ; 那么此时,首先判断 是否 允许 重试 : 若允许 重试 的话,重新依次执行切片里的函数,最后在 重试 一下第3个操作即可; 2、 grpc 框架 重试 机制