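# NOTE: the following three lines are web-forum page residue (a register/login
# banner captured with the paste); they are not part of the script.
# 马上注册,结交更多好友,享用更多功能^_^
# 您需要 登录 才可以下载或查看,没有账号?立即注册
# x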
from websocket import create_connection
import gzip
import time
import json
from concurrent.futures import ThreadPoolExecutor
def get_ws_clinet():
    """Return a connected WebSocket to the Huobi market endpoint.

    Connection attempts are repeated indefinitely: whenever
    ``create_connection`` raises, a notice is printed and the next attempt
    is made after a five second pause.
    """
    endpoint = "wss://api.huobipro.com/ws"
    while True:
        try:
            connection = create_connection(endpoint)
        except Exception:
            # Any failure is treated as transient: report it, wait, retry.
            print('connect ws error,retry...')
            time.sleep(5)
        else:
            return connection
def get_subscribe_data(trade, output_path=r"G:\lianghua\lianghua1.txt"):
    """Subscribe to one market channel and stream its closing prices.

    Opens a dedicated WebSocket connection, sends the subscription request
    and then loops forever. Every received frame is gunzipped and
    JSON-decoded; frames without a ``tick`` payload are treated as
    control/heartbeat traffic and answered, while for each tick the closing
    price is printed and written to ``output_path``.

    Args:
        trade: Subscription request dict, e.g.
            ``{"sub": "market.btcusdt.kline.1min", "id": "id10"}``. The
            second dot-separated field of ``trade["sub"]`` is used as the
            display name.
        output_path: File that receives the latest ``name``/close line.
            It is opened in ``'w'`` mode, so each tick replaces the
            previous content.

    This function does not return normally; it ends only when an exception
    (for example from a dropped connection) propagates out of the receive
    loop, at which point the socket is closed.
    """
    name = trade['sub'].split(".")[1]
    trade_json = json.dumps(trade)
    ws = get_ws_clinet()
    try:
        ws.send(trade_json)
        while True:
            compressed = ws.recv()
            result = json.loads(gzip.decompress(compressed).decode('utf-8'))
            # Example tick frame:
            # {"ch":"market.btcusdt.kline.1min","ts":1609249181634,
            #  "tick":{"id":1609249140,"open":26846.04,"close":26865.54,
            #          "low":26844.23,"high":26865.55,
            #          "amount":4.39722167701809,
            #          "vol":118064.40113187107,"count":280}}
            data = result.get("tick", {})
            if not data:
                # Not a market tick: heartbeat or other control frame.
                server_ping = result.get("ping")
                if server_ping is not None:
                    # Per the Huobi market WebSocket docs the server sends
                    # {"ping": <n>} and expects {"pong": <n>} in reply;
                    # unanswered pings get the connection dropped.
                    ws.send(json.dumps({"pong": server_ping}))
                    continue
                # Timestamp carried by a pong reply or any other frame.
                ts = result.get("pong", None) or result.get("ts", None)
                if not ts:
                    # No timestamp at all: issue the subscription again.
                    ws.send(trade_json)
                else:
                    # Retained from the original flow: send a client-side
                    # ping carrying the frame's "ts" value.
                    # NOTE(review): "ts" is absent when only "pong" was
                    # present, so this can send null -- confirm intent.
                    ws.send(json.dumps({"ping": result.get("ts")}))
                continue
            bt_close = data.get("close")
            print(name, '-----------', bt_close, type(bt_close))
            # write() takes exactly one string, so format the line first.
            # NOTE(review): every worker thread writes to the same path in
            # 'w' mode, so the file only holds the most recent line.
            with open(output_path, 'w', encoding='utf-8') as out:
                out.write(f"{name} ----------- {bt_close}\n")
    finally:
        # Release the socket when the loop is left through an exception.
        ws.close()
if __name__ == '__main__':
    # One subscription request per market channel (1-minute klines).
    # NOTE(review): "market.bsvusdt.kline.1min" appears twice, which opens
    # two identical subscriptions -- confirm whether another symbol was meant.
    trades = [
        {"sub": "market.btcusdt.kline.1min", "id": "id10"},
        {"sub": "market.ethusdt.kline.1min", "id": "id10"},
        {"sub": "market.linkusdt.kline.1min", "id": "id10"},
        {"sub": "market.bchusdt.kline.1min", "id": "id10"},
        {"sub": "market.ltcusdt.kline.1min", "id": "id10"},
        {"sub": "market.bsvusdt.kline.1min", "id": "id10"},
        {"sub": "market.bsvusdt.kline.1min", "id": "id10"},
        {"sub": "market.adausdt.kline.1min", "id": "id10"},
        {"sub": "market.eosusdt.kline.1min", "id": "id10"},
    ]

    def _report_worker_exit(future):
        """Print the exception that ended a subscription worker, if any."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            # Without this, an exception raised inside a worker stays hidden
            # in its Future and the thread just stops silently.
            print('subscribe worker stopped:', repr(error))

    # Each task loops forever, so every subscription needs its own thread.
    # With the default pool size, tasks beyond the worker count would sit in
    # the queue and never start on machines with few CPUs.
    executor = ThreadPoolExecutor(max_workers=len(trades))
    for trade in trades:
        future = executor.submit(get_subscribe_data, trade)
        future.add_done_callback(_report_worker_exit)
# (stray "|" character left over from the forum paste; not part of the script)