client.go 代码:
func main() {
flag.Parse()
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
Dial 源码:
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
DialContext 源码:
省略次部分代码
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
for _, opt := range opts {
opt.apply(&cc.dopts)
}
return cc, nil
}
connect() 方法:
func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle {
ac.mu.Unlock()
return nil
}
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
ac.resetTransport()
return nil
}
进入 resetTransport() 源码
func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
addrs := ac.addrs
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
dialDuration := minConnectTimeout
if ac.dopts.minConnectTimeout != nil {
dialDuration = ac.dopts.minConnectTimeout()
}
if dialDuration < backoffFor {
dialDuration = backoffFor
}
connectDeadline := time.Now().Add(dialDuration)
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
ac.cc.resolveNow(resolver.ResolveNowOptions{})
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure, err)
b := ac.resetBackoff
ac.mu.Unlock()
timer := time.NewTimer(backoffFor)
select {
case <-timer.C:
ac.mu.Lock()
ac.backoffIdx++
ac.mu.Unlock()
case <-b:
timer.Stop()
case <-ac.ctx.Done():
timer.Stop()
return
}
ac.mu.Lock()
if ac.state != connectivity.Shutdown {
ac.updateConnectivityState(connectivity.Idle, err)
}
ac.mu.Unlock()
return
}
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
}
如何计算重试连接等待时间?
进入Backoff方法
func (bc Exponential) Backoff(retries int) time.Duration {
if retries == 0 {
return bc.Config.BaseDelay
}
backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
for backoff < max && retries > 0 {
backoff *= bc.Config.Multiplier
retries--
}
if backoff > max {
backoff = max
}
backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
if backoff < 0 {
return 0
}
return time.Duration(backoff)
}
连接失败后,客户端会进行重试连接。
重试次数越多,等待下一次连接时间也会变长,但不能超过MaxDelay值。
更多Go云原生学习资料,收录于Github:
https://github.com/metashops/GoFamily
本文由
mdnice
多平台发布
grpc 版本1.50client.go 代码:func main() { flag.Parse() // Set up a connection to the server. conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials.
来自:指月 https://lixueduan.com
原文:https://lixueduan.com/post/
grpc
/09-retry/
本文主要记录了如何使用
gRPC
中的 自动
重试
功能。
1. 概述
gRPC
系列相关代码见 Github
gRPC
中已经内置了 retry 功能,可以直接使用,不需要我们手动来实现,非常方便。
2. Demo
Server
为了测试 retry 功能,
服务端
做了一点调整。
记录
客户端
的请求次数,只有满足条件的那一次(这里就是请求次数模4等于0的那一次)才
文章目录RPCRPC工作模式
gRPC
gRPC
的特性与优缺点1.基于HTTP/22.IDL使用ProtoBuf3.多语言支持HTTP和RPC的优缺点Go语言中的RPC支持与处理
grpc
protobuffer 为什么高效Protocol buffers 反序列化
RPC(Remote ProcedureCall,远程过程调用) 是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络细节的应用程序通信协议。
RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OS
项目中碰到了
grpc
双向流和单向流出现了
服务端
流context报错:context cancel error,而
客户端
无感知
连接
断开,依然认为还在
连接
中,导致数据传输中断。
遇到这种情况从以下几个维度去思考问题发生的可能:
网络
是否
稳定,能否复现
客户端
和
服务端
是否
有对context进行cancel
客户端
和
服务端
是否
有去处理context的cancel信号
由于出现的问题不是在开发环境中,因此开始只能看日志进行排查,但是日志中只得到了
服务端
会
收到context的cancel信号,而
客户端
没有任何
keepalive ping是一种通过transport发送HTTP2 ping来检查通道当前
是否
工作的方法。它是周期性发送的,如果在某个超时周期内该ping没有得到对等方的确认,则传输断开
连接
。
本指南记录了
gRPC
core中控制keepalive ping行为方式。
keepalive ping由两个重要的通道参数控制:
GRPC
_ARG_KEEPALIVE_TIME_MS
此channe...
我们以发送数据的场景,来介绍
重试
机制
。
1、
grpc
框架中的
重试
机制
的核心或者说实现主体思路?
假设
客户端
向服务器端发起服务请求,整个过程
会
经历很多阶段,并不是所有的阶段都能
重试
,只有某些阶段可以进行
重试
;
当某个操作执行成功后,将该操作缓存到一个切片里,假设已经将2个阶段的操作缓存到了切片里;
执行第3个操作时,假设执行
失败
;
那么此时,首先判断
是否
允许
重试
:
若允许
重试
的话,重新依次执行切片里的函数,最后在
重试
一下第3个操作即可;
2、
grpc
框架
重试
机制