零基础OKEX-API入门及实践03-Websocket API

零基础OKEX-API入门及实践03-Websocket API

一、WebSocket API的简单应用

RestAPI已经介绍过了,参考下面两篇文章。这篇来讲讲OKEX的WebSocket API怎么用。

Rest API简单理解是一问一答,你请求得到什么数据,服务器就给你什么数据。但Rest API有严格的频率限制,如果想拿到交易所实时的挂单数据及每一笔交易数据,Rest API就没办法满足要求了,这时候就要用到WebSocket API。

交易所的WebSocket API,说白了就是你向网站服务器的某个频道发起订阅,这个频道上有数据更新就自动推送给你,就和订牛奶,订报纸一样。以下代码可以获取OKEX币币交易BTC-USDT交易对的实时成交数据,可以感受一下。(wss的这个网址被墙了,com改成me也不行,可以租了个香港的阿里云,跑自动交易的程序也是要放在服务器上的,可以学一下怎么用云服务器,很简单)

import asyncio
import websockets
import json
import zlib
def inflate(data):
    decompress = zlib.decompressobj(
            -zlib.MAX_WBITS  # see above
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated
async def subscribe_without_login(url, channels):
    async with websockets.connect(url) as websocket:
        sub_param = {"op": "subscribe", "args": channels}
        sub_str = json.dumps(sub_param)
        await  websocket.send(sub_str)
        print(f"send: {sub_str}")
        print("receive:")
        res = await websocket.recv()
        res = inflate(res)
        print(f"{res}")
        while True:
            res = await websocket.recv()
            res = inflate(res)
            print(f"{res}")
if __name__ == '__main__':
    url = 'wss://real.okex.com:8443/ws/v3'
    channels = "spot/trade:BTC-USDT"
    loop = asyncio.get_event_loop()
    loop.run_until_complete(subscribe_without_login(url, channels))
    loop.close()
  • inflate()是数据解压函数
  • asyncio.get_event_loop()获取当前的事件循环
  • loop.run_until_complete()启动协程
  • loop.close()关闭事件循环

运行结果如下:

上面的代码来源于OKEX的open-api-v3-sdk [1] ,(最新版本的sdk已经更新 [2] ),对于只有基础编程知识的人来说会有很多疑问,asyncio,async, await都是什么?asyncio是一个异步I/O库,可以参考官方的文档说明。这里涉及到协程这个概念,我也刚懂一点,不懂的可以阅读文末的参考文献。

WebSocket API是属于I/O 密集型应用场景,什么意思?上面代码中的语句websocket.recv()用于接收服务器推送的消息数据,问题是这个数据什么时候来是不知道的,如果程序一直在这等,效率极低。那想一下,如果遇到需要等待结果的语句,CPU自动切换到其他地方继续工作,那效率是不是会高很多,这就是 async/await 关键字可以实现的功能(上述代码因为没有并发启动还实现不了,可以看下面的例子)。

二、协程的相关说明

做以下几点说明:

1、协程,又称微线程,纤程,英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行. [3]

2、asyncio模块提供了使用协程构建并发应用的工具。它使用一种单线程单进程的的方式实现并发,应用的各个部分彼此合作, 可以显示的切换任务,一般会在程序阻塞I/O操作的时候发生上下文切换,如等待读写文件,或者请求网络。 [4]

3、协程(Coroutine)本质上是一个函数,但要用async def声明,await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。而且await后面的对象需要是一个Awaitable,或者实现了相关的协议的可等待对象 [5]

4、事件循环(Event loop)是asyncio应用的核心。Eventloop实例提供了注册、取消、执行任务和回调的方法。事件循环在线程中运行(通常是主线程),并在其线程中执行所有回调和任务。当一个任务在事件循环中运行时,没有其他任务可以在同一个线程中运行。 当一个任务执行一个 await 表达式时,正在运行的任务被挂起,事件循环执行下一个任务 [6]

5、async with...as是一个异步上下文管理器,这样写可以确保connect的关闭问题。 [7] [8]

三、实现心跳ping-pong

OKEX对WebSocket API做了以下限制:

连接限制 :1次/s
订阅限制 :每小时240次
连接上ws后如果一直没有数据返回,30s 后自动断开链接, 建议用户进行以下操作:
1,每次接收到消息后,用户设置一个定时器 ,定时N秒。
2,如果定时器被触发(N 秒内没有收到新消息),发送字符串 'ping'。
3,期待一个文字字符串'pong'作为回应。如果在 N秒内未收到,请发出错误或重新连接。
出现网络问题会自动断开连接

那现在用其他的协程来实现定时30s发送'ping'的功能,这部分OKEX的SDK没有,自己写的,如有错误或者好的实现方式还请告知。

import asyncio
import websockets
import json
import zlib
def inflate(data):
    decompress = zlib.decompressobj(
            -zlib.MAX_WBITS  # see above
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated
class TimeCount:
    def __init__(self):
        self.sCount = 0
        self.sDelta = 1
        self.sNoData = 30
        self.sNoPong = 5
        self.webStatus = 'YES'
async def time_trigger(TC_wss):
    while True:
        await asyncio.sleep(TC_wss.sDelta)
        TC_wss.sCount += TC_wss.sDelta
        print('TC_wss.sCount', TC_wss.sCount)
        if TC_wss.sCount > TC_wss.sNoData + TC_wss.sNoPong:
            TC_wss.webStatus = 'NO'
        if TC_wss.webStatus == 'NO':
            raise IOError
async def pingTest(url,TC_wss):
    async with websockets.connect(url) as websocket:
        while True:
            if TC_wss.sCount >= TC_wss.sNoData:
                await websocket.send('ping')
                res = await websocket.recv()
                res = inflate(res)
                res_str = str(res, encoding='utf-8')
                print('pong??',res_str)
                if res_str == 'pong':
                    TC_wss.sCount = 0
                else:
                    TC_wss.webStatus = 'NO'
            else:
                await asyncio.sleep(TC_wss.sDelta)
            if TC_wss.webStatus == 'NO':
                raise IOError
async def subscribe_without_login(url, channels, TC_wss):
    async with websockets.connect(url) as websocket:
        sub_param = {"op": "subscribe", "args": channels}
        sub_str = json.dumps(sub_param)
        await  websocket.send(sub_str)
        print(f"send: {sub_str}")
        print("receive:")
        res = await websocket.recv()
        res = inflate(res)
        print(f"{res}")
        while True:
            res = await websocket.recv()
            res = inflate(res)
            print(f"{res}")
            TC_wss.sCount = 0
            if TC_wss.webStatus == 'NO':
                raise IOError
if __name__ == '__main__':
    TC_wss = TimeCount()