零基础OKEX-API入门及实践03-Websocket API
一、WebSocket API的简单应用
RestAPI已经介绍过了,参考下面两篇文章。这篇来讲讲OKEX的WebSocket API怎么用。
Rest API简单理解是一问一答,你请求得到什么数据,服务器就给你什么数据。但Rest API有严格的频率限制,如果想拿到交易所实时的挂单数据及每一笔交易数据,Rest API就没办法满足要求了,这时候就要用到WebSocket API。
交易所的WebSocket API,说白了就是你向网站服务器的某个频道发起订阅,这个频道上有数据更新就自动推送给你,就和订牛奶,订报纸一样。以下代码可以获取OKEX币币交易BTC-USDT交易对的实时成交数据,可以感受一下。(wss的这个网址被墙了,com改成me也不行,可以租了个香港的阿里云,跑自动交易的程序也是要放在服务器上的,可以学一下怎么用云服务器,很简单)
import asyncio
import websockets
import json
import zlib
def inflate(data):
decompress = zlib.decompressobj(
-zlib.MAX_WBITS # see above
inflated = decompress.decompress(data)
inflated += decompress.flush()
return inflated
async def subscribe_without_login(url, channels):
async with websockets.connect(url) as websocket:
sub_param = {"op": "subscribe", "args": channels}
sub_str = json.dumps(sub_param)
await websocket.send(sub_str)
print(f"send: {sub_str}")
print("receive:")
res = await websocket.recv()
res = inflate(res)
print(f"{res}")
while True:
res = await websocket.recv()
res = inflate(res)
print(f"{res}")
if __name__ == '__main__':
url = 'wss://real.okex.com:8443/ws/v3'
channels = "spot/trade:BTC-USDT"
loop = asyncio.get_event_loop()
loop.run_until_complete(subscribe_without_login(url, channels))
loop.close()
- inflate()是数据解压函数
- asyncio.get_event_loop()获取当前的事件循环
- loop.run_until_complete()启动协程
- loop.close()关闭事件循环
运行结果如下:
上面的代码来源于OKEX的open-api-v3-sdk [1] ,(最新版本的sdk已经更新 [2] ),对于只有基础编程知识的人来说会有很多疑问,asyncio,async, await都是什么?asyncio是一个异步I/O库,可以参考官方的文档说明。这里涉及到协程这个概念,我也刚懂一点,不懂的可以阅读文末的参考文献。
WebSocket API是属于I/O 密集型应用场景,什么意思?上面代码中的语句websocket.recv()用于接收服务器推送的消息数据,问题是这个数据什么时候来是不知道的,如果程序一直在这等,效率极低。那想一下,如果遇到需要等待结果的语句,CPU自动切换到其他地方继续工作,那效率是不是会高很多,这就是 async/await 关键字可以实现的功能(上述代码因为没有并发启动还实现不了,可以看下面的例子)。
二、协程的相关说明
做以下几点说明:
1、协程,又称微线程,纤程,英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行. [3]
2、asyncio模块提供了使用协程构建并发应用的工具。它使用一种单线程单进程的的方式实现并发,应用的各个部分彼此合作, 可以显示的切换任务,一般会在程序阻塞I/O操作的时候发生上下文切换,如等待读写文件,或者请求网络。 [4]
3、协程(Coroutine)本质上是一个函数,但要用async def声明,await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。而且await后面的对象需要是一个Awaitable,或者实现了相关的协议的可等待对象 [5] 。
4、事件循环(Event loop)是asyncio应用的核心。Eventloop实例提供了注册、取消、执行任务和回调的方法。事件循环在线程中运行(通常是主线程),并在其线程中执行所有回调和任务。当一个任务在事件循环中运行时,没有其他任务可以在同一个线程中运行。
当一个任务执行一个
await
表达式时,正在运行的任务被挂起,事件循环执行下一个任务
[6]
。
5、async with...as是一个异步上下文管理器,这样写可以确保connect的关闭问题。 [7] [8]
三、实现心跳ping-pong
OKEX对WebSocket API做了以下限制:
连接限制 :1次/s
订阅限制 :每小时240次
连接上ws后如果一直没有数据返回,30s 后自动断开链接, 建议用户进行以下操作:
1,每次接收到消息后,用户设置一个定时器 ,定时N秒。
2,如果定时器被触发(N 秒内没有收到新消息),发送字符串 'ping'。
3,期待一个文字字符串'pong'作为回应。如果在 N秒内未收到,请发出错误或重新连接。
出现网络问题会自动断开连接
那现在用其他的协程来实现定时30s发送'ping'的功能,这部分OKEX的SDK没有,自己写的,如有错误或者好的实现方式还请告知。
import asyncio
import websockets
import json
import zlib
def inflate(data):
decompress = zlib.decompressobj(
-zlib.MAX_WBITS # see above
inflated = decompress.decompress(data)
inflated += decompress.flush()
return inflated
class TimeCount:
def __init__(self):
self.sCount = 0
self.sDelta = 1
self.sNoData = 30
self.sNoPong = 5
self.webStatus = 'YES'
async def time_trigger(TC_wss):
while True:
await asyncio.sleep(TC_wss.sDelta)
TC_wss.sCount += TC_wss.sDelta
print('TC_wss.sCount', TC_wss.sCount)
if TC_wss.sCount > TC_wss.sNoData + TC_wss.sNoPong:
TC_wss.webStatus = 'NO'
if TC_wss.webStatus == 'NO':
raise IOError
async def pingTest(url,TC_wss):
async with websockets.connect(url) as websocket:
while True:
if TC_wss.sCount >= TC_wss.sNoData:
await websocket.send('ping')
res = await websocket.recv()
res = inflate(res)
res_str = str(res, encoding='utf-8')
print('pong??',res_str)
if res_str == 'pong':
TC_wss.sCount = 0
else:
TC_wss.webStatus = 'NO'
else:
await asyncio.sleep(TC_wss.sDelta)
if TC_wss.webStatus == 'NO':
raise IOError
async def subscribe_without_login(url, channels, TC_wss):
async with websockets.connect(url) as websocket:
sub_param = {"op": "subscribe", "args": channels}
sub_str = json.dumps(sub_param)
await websocket.send(sub_str)
print(f"send: {sub_str}")
print("receive:")
res = await websocket.recv()
res = inflate(res)
print(f"{res}")
while True:
res = await websocket.recv()
res = inflate(res)
print(f"{res}")
TC_wss.sCount = 0
if TC_wss.webStatus == 'NO':
raise IOError
if __name__ == '__main__':
TC_wss = TimeCount()