大家好!我是晨晨💓
希望大家多多支持我!
为了感谢每一个关注我的小可爱:💓文章留言“学习”即可获取晨晨精心准备的学习大礼包书籍教程,都是无偿分享滴💓
最后——如果文章有帮助到你,记得“关注”、“点赞”、“评论”三连哦~
————————————————

本文参考Python官方文档: https://docs.python.org/zh-cn/3.8/library/asyncio-stream.html#examples

本文参考Python官方文档针对官方文档示例进行解析,解析不完整只为了便于理解

流是用于处理网络连接的高级async/await-ready原语。流允许发送和接收数据,而不需要使用回调或低级协议和传输。

Stream函数

下面的高级 asyncio 函数可以用来创建和处理流: coroutine asyncio. open_connection

(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

建立网络连接并返回一对 (reader, writer) 对象。

返回的 reader 和 writer 对象是 StreamReader StreamWriter 类的实例。

注意:使用ayncio.open_connection()方法创建和处理流时只有在await时才返回reader和writer对象

为了方便测试我们在本地搭建一个nginx服务器,首页index.html内容为“Hello World” 在这里插入图片描述
示例:

import asyncio
async def wget(host):
    connect = asyncio.open_connection(host,80)
    print(type(connect))
    reader,writer = await connect
    print(type(reader),type(writer))
async def main():
    # 获取表头主机列表
    hosts = ['192.168.1.100']
   # 根据主机列表获取一个tasks列表
    tasks = [asyncio.create_task(wget(host)) for host in hosts]
    # 等待任务列表执行结果
    await asyncio.gather(*tasks)
asyncio.run(main() 

运行输出如下:

<class 'coroutine'>
<class 'asyncio.streams.StreamReader'> <class 'asyncio.streams.StreamWriter'>
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000236AA956F70>
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 116, in __del__
    self.close()
  File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 719, in call_soon
    self._check_closed()
  File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

解析:运行报错是因为获取了writer对象但是并没有写数据

打印类型可以看到在这里插入图片描述
完善代码使用writer对象发送请求至服务器,然后reader对象就可以收取服务器发送过来的数据

# asyncio.open_connection创建数据流 start
import asyncio
async def wget(host):
    connect = asyncio.open_connection(host,80)
    print(type(connect))
    reader,writer = await connect
    print(type(reader),type(writer))
    # 定义请求头部,格式是固定格式
    header = 'GET / HTTP/1.0\r\n Host:{0}\r\n\r\n'.format(host)
    # 通过writer对象往http服务器发送请求,请求是二进制格式的需要使用encode()方法编码
    writer.write(header.encode('utf-8'))
    # writer.write方法需要与drain()方法一起使用
    await writer.drain()
    # 阻塞获取服务器发送过来的所有数据,read()方法一次性获取所有数据,数据多可以使用readline()方法一行行获取
    data = await reader.read()
    # 打印获取的数据,获取数据为二进制格式不加decode()解码则打印原始数据
    print(data.decode())
    # 关闭writer需要和writer.wait_closed()一起使用,这里可以省略
    writer.close()
    await writer.wait_closed()
async def main():
    hosts = ['192.168.1.100']
    tasks = [asyncio.create_task(wget(host)) for host in hosts]
    await asyncio.gather(*tasks)
asyncio.run(main())
# asyncio.open_connection创建数据流 end

输出如下:

<class 'coroutine'>
<class 'asyncio.streams.StreamReader'> <class 'asyncio.streams.StreamWriter'>
HTTP/1.1 200 OK
Server: nginx/1.14.0
Date: Sat, 30 Oct 2021 09:37:17 GMT
Content-Type: text/html
Content-Length: 12
Last-Modified: Fri, 29 Oct 2021 07:41:00 GMT
Connection: close
ETag: "617ba58c-c"
Accept-Ranges: bytes
Hello World

本次代码演示了连接http服务器并且向服务器发送一个GET请求,服务器收到GET请求以后把数据返回给客户,然后通过reader对象获取到服务器发送过来的数据。

注意:本次发送的是一个GET请求,格式是固定的
在这里插入图片描述
本次发送的完整数据为

'GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n'

对应关系如下图在这里插入图片描述

拆分解析如下

GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n
GET #请求方法为GET
/ # 请求URL为/即根目录
HTTP/1.0 # 协议版本
\r\n # 回车符和换行符
Host # 头部字段为Host
: # 固定格式的符号:
192.168.1.100 # Host的值,即本次请求的主机值
\r\n # 请求头部的回车符和换行符
\r\n # 最后的回车符和换行符

使用流的TCP回显客户端和服务器

使用流的TCP回显客户端

tcp_stream_client.py
import asyncio
# 回显客户端协程函数,传递参数message发送给服务器端,服务器端接收信息原样返回
async def tcp_echo_client(message):
    # 创建reader,writer对象分别用于接收和发送信息
    reader,writer = await asyncio.open_connection('127.0.0.1',8888)
    print(f'Send:{message!r}')
    # 往服务器端写信息,需要编码后发送
    writer.write(message.encode())
    # await writer.drain()
    # 从服务器端读取信息读取100个字节
    data = await reader.read(100)
    # 打印解码后的信息
    print(f'Received:{data.decode()!r}')
    print('Close the connection')
    writer.close()
    # await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))

注意:这里没有使用writer.drain()writer.wait_closed()也可以

使用流的TCP回显服务器端

tcp_stream_server.py
import asyncio
# 启动服务后当客户端建立新连接时调用该函数
# 接受参数为reader,writer
# reader是类StreamReader的实例,而writer是类StreamWriter的实例
# 即客户端和服务器端的reader和writer是一一对应的,分别用于接收对方数据流和往对方发送数据流
async def handle_echo(reader, writer):
    # 服务器从客户端读取信息
    # 即客户端通过writer往服务器写的信息
    data = await reader.read(100)
    # 信息解码
    message = data.decode()
    # 该方法获取客户端的ip地址信息
    addr = writer.get_extra_info('peername')
    print(f"Received {message!r} from {addr!r}")
    print(f"Send: {message!r}")
    # 服务器端把从客户端读取的信息又发送给客户端
    writer.write(data)
    await writer.drain()
    # 关闭连接
    print("Close the connection")
    writer.close()
async def main():
    # start_server()方法启动套接字服务,返回一个server对象
    # 当一个新的客户端连接被建立时,回调函数会被调用。该函数会接收到一对参数(reader,writer)
    # reader是类StreamReader的实例,而writer是类StreamWriter的实例
    # client_connected_cb 即可以是普通的可调用对象也可以是一个 协程函数; 如果它是一个协程函数,它将自动作为 Task 被调度。
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    # 以下方法可以获取启动的ip地址和端口信息返回一个元组其实即使start_server方法传递的ip和端口信息('127.0.0.1',8888)
    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')
    # 启动服务端
    # Server对象是异步上下文管理器。当用于async with语句时,异步上下文管理器可以确保Server对象被关闭
    # 并且在async with完成后不接受新的连接。
    async with server:
        # server_forver()方法
        # 开始接受连接,直到协程被取消。server_forever任务的取消将导致服务器被关闭
        await server.serve_forever()
asyncio.run(main())

打开两个窗口,先启动服务器端在这里插入图片描述
服务器端开启了8888端口等待客户端连接

运行客户端

运行客户端的时候客户端和服务器端建立了连接才启动协程函数handle_echo在这里插入图片描述
服务器端接收的信息为

在这里插入图片描述
你们的支持是我最大的动力!!记得三连哦~

关注小编获取更多精彩内容!

制作不易,记得一键三连哦!!

在页面上点击组件,选择了一个文件,页面上的上传文件组件数据有了变化,于是页面通知后台重新执行一遍代码,同时页面把上传文件组件的文件数据返回给后台,代码开始重新执行第一句代码时,函数返回了页面给的文件数据,因此函数有了返回值,不再是None。千万别忘了,如果没有选择任何工作表,要提前跳出执行看看,效果,很不错。方法很简单,独定义一个函数,接收文件对象,函数里面就是 Pandas 加载数据的代码,然后在函数上方打上装饰器,表明这是一个缓存函数,为了证明其缓存函数生效,我们在函数里面打印内容,到控制台。 pip install git+https://github.com/sandabuliu/python-stream.git git clone https://github.com/sandabuliu/python-stream.git cd python-agent python setup.py install QuickStart Examples Word Count from pystream.executor.source import Memory from pystream.executor.executor import Map, Iterator, ReducebyKey data = Memory([ Python数据流 Tributary是用于在python中构造数据流图的库。 不像在python许多其他DAG库(,,, , , 等),支路不是被设计数据/ ETL管道或调度考虑。 相反,支流与 , , streamz或pyfunctional之类的库更相似,因为它被设计用作数据模型的实现。 greeks库就是一个这样的例子,它利用支流建立了用于期权定价的数据模型。 用pip安装: pip install tributary 或使用conda: conda install -c conda-forge tributary 或从来源: python setup.py install 注意:如果是从源代码安装或通过pip安装, .graphviz()使用.graphviz()方法可视化图形,还需要Graphviz本身。 支流提供几种流: HTTP流 HTTPStream是 Python 的 HTTP 客户端库,它使用方便的基于资源的接口包装标准库 HTTP 客户端,还提供对增量 JSON 文档检索和 RFC 6570 URI 模板的支持。 HTTPStream 托管在 PyPI 上,因此要安装,只需使用pip : pip install httpstream >>> from httpstream import get >>> get("https://api.duckduckgo.com/?q=neo4j&format=json").content {'Abstract': 'Neo4j is an open-source graph database, implemented in Java.', 'AbstractSource': 'Wikipedia', 'AbstractText': 'Ne “流”是个不简单的东西。从FILE *开始初具雏形,到C++iostream,到Java繁杂的流系统,到Boost.iostreams,流机制从一种貌似可有可无的小玩意,到在实践中大显威力,最后直到以一种全新的程序设计哲学引发人们的深思——尽管实际上,和所有新潮语言里面新奇的玩意一样,这在计算机科学中从来也不是什么新鲜东西,而是几十年前就完备的理论。然而在这里,我们抛开理论,只来讨论一个通俗的问题... read():一次读取文件的全部内容,容易导致爆内存,read(size)控制读取大小 readline():每次读取一行 readlines():一次读取所有内容并按行返回list 数据流的基本解析和转换一、前言二、数据封装1、字符串编码1、数字编码3、binascii编码3.1 a2b_hex3.2 b2a_hex4、hex方法三、数据解析1、字符串解码2、数字解码四、总结 数据是我们在编写代码过程中,尤其是通信过程中,是经常要使用到的,牵扯到数据的封装,解析,转换,这一张我们为大家介绍基本的数据操作 二、数据封装 1、字符串编码 一般我们发送字符串之前,要对字符串进行编码,形成字节流,使用encode方法,一般英文字符使用的都是utf-8编码 import socket 一个由flask封装起来的算法,一个由django封装的后台,我希望在django里通过requests调用flask的算法接口,flask可以分析一帧返回一帧结果,追求分析结果的实时返回,而不是完全分析完再完整返回结果 能想到的模式暂时有三种: 一问一答:等待完整的分析结果然后返回,最不济就用这种 我要你给(长链接):flask返回一个generator,django取next就得到下一... Redis增删改查:独立消费创建消费组消费控制消息的长度 redis5.0 新特性,官方介绍:https://redis.io/topics/streams-intro 参考:https://blog.csdn.net/shellquery/article/details/80562422 增删改查: xadd(self, name, fields, id=’*’, maxlen=None, ... python会在文本模式下做一些转换:当在windows下用文本模式读取文件时,python会将\r\n转换成\n,相反,当在windows下用文本模式写文件时,会把\n转换成\r\n。懒惰行迭代的原因:在读取非常大的文件,readlines会占用太多的内存,而懒惰行迭代只是读取文件中实际需要的部分。file.readline(n)当n为空是,读取单独一行,并读取文件换行符,当n为非负值时读取文件的前n个字符。file.readlines()读取一个文件中的所有行,并以列表的返回。 前言只有光头才能变强。文本已收录至我的GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3y上一篇讲解到了Lambda表... 点击上方“Python共享之家”,进行关注回复“资源”即可获赠Python学习资料今日鸡汤海内存知己,天涯若比邻。大家好,我是皮皮。一、前言前几天在Python最强王者交流群【德善堂小儿推...