async def hello ( websocket , path ) : # path标识请求路径,可以来自定义需求 name = await websocket . recv ( ) print ( f"< { name } " ) greeting = f"Hello { name } !" await websocket . send ( greeting ) print ( f"> { greeting } " ) start_server = websockets . serve ( hello , "localhost" , 8765 ) asyncio . get_event_loop ( ) . run_until_complete ( start_server ) asyncio . get_event_loop ( ) . run_forever ( )

客户端 :

客户端连接到server,执行协程处理器 handler,当协程退出后与server断开连接。

# WS client example
import asyncio
import websockets
async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        name = input("What's your name? ")
        await websocket.send(name)
        print(f"> {name}")
        greeting = await websocket.recv()
        print(f"< {greeting}")
asyncio.get_event_loop().run_until_complete(hello())

connect()通过 上下文管理器保证了协程退出之前关闭socket连接。

SSL安全连接示例

安全的WebSocket连接提高了保密性和可靠性,降低了了使用不安全的proxy代理服务的风险。

WSS协议之于WS就像HTTPS之于HTTP: 连接是用传输层安全(TLS)加密的,TLS通常被称为安全套接字层(SSL)。WSS需要类似HTTPS的TLS证书。

下面介绍如改写上面的服务器示例以提供安全连接。请参阅ssl模块的文档配置上下文。

# WSS (WS over TLS) server example, with a self-signed certificate import asyncio import pathlib import ssl import websockets async def hello(websocket, path): name = await websocket.recv() print(f"< {name}") greeting = f"Hello {name}!" await websocket.send(greeting) print(f"> {greeting}") ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) localhost_pem = pathlib.Path(__file__).with_name("localhost.pem") ssl_context.load_cert_chain(localhost_pem) start_server = websockets.serve( hello, "localhost", 8765, ssl=ssl_context asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()

改写相应的客户端安全连接实现:

# WSS (WS over TLS) client example, with a self-signed certificate
import asyncio
import pathlib
import ssl
import websockets
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
ssl_context.load_verify_locations(localhost_pem)
async def hello():
    uri = "wss://localhost:8765"
    async with websockets.connect(
        uri, ssl=ssl_context
    ) as websocket:
        name = input("What's your name? ")
        await websocket.send(name)
        print(f"> {name}")
        greeting = await websocket.recv()
        print(f"< {greeting}")
asyncio.get_event_loop().run_until_complete(hello())

此处客户端需要一个ssl上下文,因为此处使用的是一个 自签名的CA证书。

如果使用有效的证书(即由Python安装信任的CA签名)连接到安全的WebSocket服务器的客户端可以简单地将ssl=True传递给connect(),而不必构建ssl上下文。

使用浏览器连接到服务器

从浏览器连接到我们自己实现的服务器:

从console里运行以下程序:

# WS server that sends messages at random intervals
import asyncio
import datetime
import random
import websockets
async def time(websocket, path):
    while True:
        now = datetime.datetime.utcnow().isoformat() + "Z"
        await websocket.send(now)
        await asyncio.sleep(random.random() * 3)
start_server = websockets.serve(time, "127.0.0.1", 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

将以下代码写入html文件,并从浏览器里打开

<!DOCTYPE html>
        <title>WebSocket demo</title>
    </head>
        <script>
            var ws = new WebSocket("ws://127.0.0.1:5678/"),
                messages = document.createElement('ul');
            ws.onmessage = function (event) {
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode(event.data);
                message.appendChild(content);
                messages.appendChild(message);
            document.body.appendChild(messages);
        </script>
    </body>
</html>

多客户端同步

一个websocket服务器可以从多个客户端接收消息,处理消息并同步状态到所有客户端。

以下示例展示了任意一客户端将一变量counter进行增加或减少后,所有已连接的客户端都能同步更新counter的实时值。

协程模块 asyncio保证了变量的更新是按先后顺序的。

服务端代码:

# WS server example that synchronizes state across clients
import asyncio
import json
import logging
import websockets
logging.basicConfig()
STATE = {"value": 0}
# 保存所有在线客户端
USERS = set()
def state_event():
    return json.dumps({"type": "state", **STATE})
def users_event():
    return json.dumps({"type": "users", "count": len(USERS)})
# 更新所有客户端显示的counter值
async def notify_state():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = state_event()
        await asyncio.wait([user.send(message) for user in USERS])
# 通知客户端在线数量
async def notify_users():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = users_event()
        await asyncio.wait([user.send(message) for user in USERS])
# 注册客户端
async def register(websocket):
    USERS.add(websocket)
    await notify_users()
# 注销客户端
async def unregister(websocket):
    USERS.remove(websocket)
    await notify_users()
async def counter(websocket, path):
    # register(websocket) sends user_event() to websocket
    await register(websocket)
    try:
        await websocket.send(state_event())
        # 迭代websocket以不断接收消息,此处要求对象实现了 __iter__()、__await__()、 __aenter__()、 __aexit__() 方法。
        async for message in websocket:
            data = json.loads(message)
            if data["action"] == "minus":
                STATE["value"] -= 1
                await notify_state()
            elif data["action"] == "plus":
                STATE["value"] += 1
                await notify_state()
            else:
                logging.error("unsupported event: {}", data)
    finally:
    # 客户端断开后,退出上面的 for 循环,即客户端协程退出后
        await unregister(websocket)
start_server = websockets.serve(counter, "localhost", 6789)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

将以下代码写入html文件,并用浏览器打开多个页面模拟多个ws客户端

<title>WebSocket demo</title> <style type="text/css"> body { font-family: "Courier New", sans-serif; text-align: center; .buttons { font-size: 4em; display: flex; justify-content: center; .button, .value { line-height: 1; padding: 2rem; margin: 2rem; border: medium solid; min-height: 1em; min-width: 1em; .button { cursor: pointer; user-select: none; .minus { color: red; .plus { color: green; .value { min-width: 2em; .state { font-size: 2em; </style> </head> <div class="buttons"> <div class="minus button">-</div> <div class="value">?</div> <div class="plus button">+</div> </div> <div class="state"> <span class="users">?</span> online </div> <script> var minus = document.querySelector('.minus'), plus = document.querySelector('.plus'), value = document.querySelector('.value'), users = document.querySelector('.users'), websocket = new WebSocket("ws://127.0.0.1:6789/"); minus.onclick = function (event) { websocket.send(JSON.stringify({action: 'minus'})); plus.onclick = function (event) { websocket.send(JSON.stringify({action: 'plus'})); websocket.onmessage = function (event) { data = JSON.parse(event.data); switch (data.type) { case 'state': value.textContent = data.value; break; case 'users': users.textContent = ( data.count.toString() + " user" + (data.count == 1 ? "" : "s")); break; default: console.error( "unsupported event", data); </script> </body> </html>

Server常用模板

你要在连接的生命周期里处理多条消息,你必须实现一个loop,以下为你提供了一个构建Websocket Server的基础模板。

接收消息并且传递到消费者协程中。

async def consumer_handler(websocket, path):
    async for message in websocket:
        await consumer(message)

当客户端断开连接后终止迭代。

从生产者协程生成消息,并且发送出去

async def producer_handler(websocket, path):
    while True:
        message = await producer()
        await websocket.send(message)

此处生产者代表你的产生消息的业务逻辑。

注意:当客户端断开连接后,send() 会引发 ConnectionClosed异常,从而从 while True的 loop 中 退出。

生产者+消费者

你可以将上面两种模式结合起来,两个协程任务并行。

async def handler(websocket, path):
    consumer_task = asyncio.ensure_future(
        consumer_handler(websocket, path))
    producer_task = asyncio.ensure_future(
        producer_handler(websocket, path))
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED,
    for task in pending:
        task.cancel()

注册客户端

参考上面多客户端同步的代码,你需要记录所有已连接的客户端,当他们连接到server时进行注册,断开时注销。

connected = set()
async def handler(websocket, path):
    # Register.
    connected.add(websocket)
    try:
        # Implement logic here.
        await asyncio.wait([ws.send("Hello!") for ws in connected])
        await asyncio.sleep(10)
    finally:
        # Unregister.
        connected.remove(websocket)

这个简单的示例展示了如何在内存中跟踪连接的客户端,这只在运行单个进程时有效。在实际应用程序中,handler可以注册到消息代理broker 上的某些channels。

Cheat sheet

Server

Client

  • 使用connect()创建一个客户端,它类似于asyncio中loop的create_connection()。您还可以将它用作异步上下文管理器。

    • 对于高级定制,您可以继承WebSocketClientProtocol的子类,并将这个子类或工厂函数作为 create_protocol 参数传递。
  • 随时调用recv()和send()来接收和发送消息。

  • 如果你愿意,你可以ping()或pong(),但一般不需要。

  • 如果没有使用connect()作为上下文管理器,请调用close()来终止连接。

Debugging

如果你不了解websocket的工作原理,请打开日志调试

import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

如果你不了解asyncio库, 建议查看官方文档develop with asyncio.

传递额外的参数到handler

import asyncio
import functools
import websockets
async def handler(websocket, path, extra_argument):
    ...
bound_handler = functools.partial(handler, extra_argument='spam')
start_server = websockets.serve(bound_handler, '127.0.0.1', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

实现此结果的另一种方法是在存在extra_argument变量的范围内定义handler协程,而不是通过参数注入它。

Deployment

如何关闭server

  • 使用异步上下文管理器 async with
  • 调用 close() 方法,然后等待它自身的wait_closed()方法执行结束。

Unix系统上(windows就不要试了),退出通常是通过发送一个信号来触发的。

import asyncio
import signal
import websockets
async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)
async def echo_server(stop):
    async with websockets.serve(echo, "localhost", 8765):
        await stop
loop = asyncio.get_event_loop()
# The stop condition is set when receiving SIGTERM.
stop = loop.create_future()
loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
# Run the server until the stop condition is met.
loop.run_until_complete(echo_server(stop))

如果你的server不是运行在主线程上,可以使用 call_soon_threadsafe().

Websocket是HTTP/1.1.的扩展,在同一个端口上同时提供HTTP和WebSocket是ok的。

WebSocket的作者并不认为这是一个好主意,因为HTTP和WebSocket的操作特性有很大的不同。

websockets使用process_request参数钩子,为响应HTTP请求提供了最低限度的支持。典型的用例包括健康检查。这里有一个例子

# WS echo server with HTTP endpoint at /health/
import asyncio
import http
import websockets
async def health_check(path, request_headers):
    if path == "/health/":
        return http.HTTPStatus.OK, [], b"OK\n"
async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)
start_server = websockets.serve(
    echo, "localhost", 8765, process_request=health_check
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Extensions

websockets 支持扩展

RFC 7692

WebSocket Per-Message Deflate

serve() and connect() 默认支持Per-Message Deflate, 你可以
通过参数禁用compression=None.

如果希望自定义Per-Message Deflate参数,还可以显式配置每个消息的Deflate扩展。

import websockets
from websockets.extensions import permessage_deflate
websockets.serve(
    ...,
    extensions=[
        permessage_deflate.ServerPerMessageDeflateFactory(
            server_max_window_bits=11,
            client_max_window_bits=11,
            compress_settings={'memLevel': 4},
from websockets.extensions import permessage_deflate
websockets.connect(
    ...,
    extensions=[
        permessage_deflate.ClientPerMessageDeflateFactory(
            server_max_window_bits=11,
            client_max_window_bits=11,
            compress_settings={'memLevel': 4},

参考API文档ServerPerMessageDeflateFactoryClientPerMessageDeflateFactory 了解更多细节。

python websockets 网络聊天室V1 程序打包链接:https://pan.baidu.com/s/1L0Cwur-VnD-BLjkf5n1K1w 提取码:gsqt 将程序打包成exe文件 可以在局域网下通信,不用傻傻的在同一台电脑自娱自乐 聊天记录保存在chat.log里 注:由于编程能力有限,退出务必是按ctrl+c,直接关闭窗口会生成孤儿进程 我都写了啥 py里有p_web 和 p_chat_server 两个进程,分别负责处理HTML的路由和聊天的后端 主要看html 里的ws.onmessage 和 py里的async def chat(websocket 不必担心握手、ping和pong等细节,WebSocket规范中的其他行为,websockets会在幕后自动处理,这样你就可以专注于你的应用程序!websockets 是一个专注于正确性、简洁性、健壮性和性能的Python库,用于构建WebSocket服务器和客户端。 websockets 是现在 python 最火的 websocket 依赖库,优于原版 websocket 和 ws4。 项目地址:aaugustin / websockets 文档地址:官方文档 我就不搬官网 demo 了,想看自己一看即可。 这里我们研究一下断线重连。 服务端很简单,只接受消息: import asyncio import websockets async def hello(websocket, path): while True: message_queues ={}client_socket_fd_map={}defstart_socket_select_server():sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)sock.bind(('0.0.... 不要将网络用于HTTP和WebSocket请求,而是直接调用Python方法 API不得假定您了解PyQt或泄漏PyQt内部 不要采用任何特定的后端编程风格(通道,线程,异步等) 最小的依赖关系(仅PyQt) 安装PyQt4(例如apt-get install python-qt4 )或PySide(例如apt-get install python-pyside )。 克隆此存储库,然后python setup.py install 。 from webkitwindow import WebkitWindow, Message class ExampleHandler(): def sta python websockets 提供的 websocket 实现框架,支持双向通信,长连接。 编程比socket 更简单,异步方式实现 服务器端 、客户端 ,javascript 的 websocket源代码。 Python websockets是用于在Python中构建WebSocket服务器和客户端的库,它基于asyncio异步IO建立,提供基于协程的API。运行代码会报错:AttributeError: module ‘websocket‘ has no attribute ‘enableTrace‘发现一个问题就是,客户端根本不需要加载公钥证书。无论是js浏览器客户端去连,还是使用python写的客户端去连。而服务端需要公钥和私钥。依赖的三方库为: websocket, websocket-client. 用websocket搭个聊天室实时互动小游戏,确实非常方便,但在线人数多了 就没那么简单了。一到300人在线就开始掉了。后来经过调整 终于好多了。以下是改进的方案记录:调整websocket.io的传输方式websoket.io Socket.IO 是一个功能非常强大的框架,能够帮助你构建基于 WebSocket 的跨浏览器的实时应用。支持主流浏览器,多种平台,多种传输模式。关于传输模式主要有两... url = 'wss://xxxxxx.net'#websocket连接地址,地址为虚拟地址 #websocket.enableTrace(True) ...