本文介绍了如何利用gRPC的异步API(gRPCAsyncIOAPI)和Python的AsyncIO库来处理高并发网络通信场景。通过创建proto文件定义服务,编译生成Python代码,然后构建异步服务器和客户端。服务器端使用grpc.aio.server启动,客户端使用asyncwith语句进行连接。示例展示了如何在异步环境中编写和执行服务端和客户端的代码,以实现高效的异步请求处理。 摘要生成于 ,由 DeepSeek-R1 满血版支持,

gRPC因其传输速度快,很适合业务量大、高并发的网络通信场景,线程池的实现方式性能受限,而AsyncIO异步方式是1个高性能的处理并发请求的框架,gRPC 应用了 python AsyncIO模块技术,编写并提供了一套异步API接口集— gRPC AsyncIO API,其性能稳定,非常适合于高并发、大流量的网络通信场景。

下面以实例来说明如何实现异步 gRPC的过程。
本文实例已在 windows10, ubuntu上运行测试通过。

gRPC相关知识点及示例,可参考本人文章

1、准备 probobuf 接口文件

按下面内容新建 demo.proto 文件

syntax = "proto3";
package demo;
service RouteGuide {
  rpc GetFeature(Point) returns (Feature) {}
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
message Feature {
  string name = 1;
  Point location = 2;

编译proto 文件

python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. demo.proto

生成如下两个文件:
demo_pb2.py
demo_pb2_grpc.py

2. 异步Server端实现

gRPC Server构造器

由于异步方式是通过协程来执行任务,gRPC异步接口是基于AsyncIO开发,因此其稳定还是可靠的。Server 构造器比较简单

grpc.aio.server( 
	migration_thread_pool=None, 
	handlers=None, 
	interceptors=None, 
	options=None, 
	aximum_concurrent_rpcs=None, 
	compression=None

构造器参数,通常只需要设置 maximum_concurrent_rpcs 参数即可,即允许rpc最大数量,默认无限制。 由于Asyncio是基于单线程运行,因此使用默认值即可。

服务端的主要方法

本例不使用SSL, 绑定网络地址方法:
add_insecure_port(address)

下面3个方法是异步方式实现:
async start() 启动服务
async stop(grace) 停止服务
async wait_for_termination(timeout=None) 中止event loop循环

服务器代码
接口消息类中,接口任务函数要用异步方式来执行。
注意:不要在异步执行的函数中使用阻塞进程的语句。

from concurrent import futures from datetime import datetime import grpc import demo_pb2 import demo_pb2_grpc import asyncio class RouteGuideServicer(demo_pb2_grpc.RouteGuideServicer): """Provides methods that implement functionality of route guide server.""" async def GetFeature(self, request, context): print(datetime.now(),'\n',request) return demo_pb2.Feature(name="abc", location=request) async def server(): server = grpc.aio.server(maximum_concurrent_rpcs=100) demo_pb2_grpc.add_RouteGuideServicer_to_server( RouteGuideServicer(), server) server.add_insecure_port('[::]:50051') await server.start() await server.wait_for_termination() if __name__ == '__main__': asyncio.run(server())

3. 异步客户端实现

当服务器采用异步方式时,gRPC的客户端采用普通方式,异步方式都可以。 本节介绍异步客户端的实现

异步客户端构建器

channel 类是客户端类,非加密客户端的构建方法:
grpc.aio.insecure_channel(target, options=None, compression=None, interceptors=None

客户端代码
客户端stub 层的接口调用函数要有异步方式执行。

from __future__ import print_function
import grpc
import demo_pb2
import demo_pb2_grpc
import asyncio
CHANNEL_OPTIONS = [('grpc.lb_policy_name', 'pick_first'),
                   ('grpc.enable_retries', 0),
                   ('grpc.keepalive_timeout_ms', 10000)]
async def guide_get_feature(stub):
    point = demo_pb2.Point(latitude=409146138, longitude=-746188906)
    feature = await stub.GetFeature(point)
    if not feature.location:
        print("Server returned incomplete feature")
        return
    if feature.name:
        print("Feature called %s at %s" % (feature.name, feature.location))
    else:
        print("Found no feature at %s" % feature.location)
async def main():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    # with grpc.aio.insecure_channel('localhost:50051') as channel:
    async with grpc.aio.insecure_channel(target='localhost:50051',
                                         options=CHANNEL_OPTIONS) as channel:
        stub = demo_pb2_grpc.RouteGuideStub(channel)
        print("-------------- GetFeature --------------")
        await guide_get_feature(stub)
if __name__ == '__main__':
    asyncio.run(main())

4. 测试代码

打开两个终端窗口,分别运行server.py, client.py
server.py 窗口

(enva) D:\workplace\python\Simple_project\grpc\gRPC_demo>py server_a.py
2023-02-05 23:57:34.983625
 latitude: 409146138
longitude: -746188906
2023-02-05 23:57:36.518631
 latitude: 409146138
longitude: -746188906

client.py 窗口

(enva) D:\workplace\python\Simple_project\grpc\gRPC_demo>py client_a.py
-------------- GetFeature --------------
Feature called abc at latitude: 409146138
longitude: -746188906
(enva) D:\workplace\python\Simple_project\grpc\gRPC_demo>py client_a.py
-------------- GetFeature --------------
Feature called abc at latitude: 409146138
longitude: -746188906
(enva) D:\workplace\python\Simple_project\grpc\gRPC_demo>

异步gRPC与之前普通方式gRPC实际过程基本一致,主要的区别如下:

  • 异步server构造器不需要线程池参数,异步协程是在同1个线程中执行
  • 异步gRPC要求接口函数及调用都使用 async – await 来修饰
  • 执行时使用异步 event loop,即通过ayscio.run( )来运行。
去发现同类优质开源项目:https://gitcode.com/ 是一个基于 Tornado Web 框架的轻量级 Python 异步 RPC 库。它允许你轻松地在分布式系统中实现服务间通信,并提供高性能、低延迟的远程调用。 异步支持:利用 Tornado 的非阻塞 I/O 和协程特性,实现高效的并发处理。 简单易... service StreamRpc{ rpc GetServerResult(Requests) returns (Reply); rpc GetServerStream(Requests) returns (stream Reply); rpc ClientSendStream(stream Requests) returns (Reply); rpc ServerCl
转载自:https://www.jianshu.com/p/43fdfeb105ff description: 只要代码可以跑起来, 很多难题都会迎刃而解. so, keep coding and stay hungry. 之前用 swoole 写 server 时就接触过 protobuf, 本来以为基于 protobuf 的 grpc, 上手起来会轻轻松松, 没想到结结实实的折腾了许久, 从 php 开始配置 grpc 需要的环境, 到无奈转到 grpc 最亲和 的 go 语言, 又无奈面对各种 g
教程提供了 Python 程序员使用 gRPC 的基本介绍。通过浏览此示例,您将学习如何:在 .proto 文件中定义服务。 使用协议缓冲区编译器生成服务器和客户端代码。 使用 Python gRPC API 为您的服务编写一个简单的客户端和服务器。 它假设您已经阅读了gRPC 简介并且熟悉协议缓冲区. 您可以在proto3 语言指南中找到更多信息和Python 生成的代码指南.我们的示例是一个简单的路线映射应用程序,它允许客户端获取有关其路线上的特征的信息,创建路线摘要,并与服务器和其他客户端交换路线信