diff --git a/wait_for_ping_message.py b/wait_for_ping_message.py new file mode 100755 index 0000000000000000000000000000000000000000..d391a6de0eba84b04b5d2fee4d4c889ca4c0af65 --- /dev/null +++ b/wait_for_ping_message.py @@ -0,0 +1,60 @@ +#!/usr/bin/python3 +# https://binance-docs.github.io/apidocs/spot/en/#test-new-order-trade + +# we have one thread to maintain the dataset, another thread would run the pytorch model every 5 seconds, to see if it's time to place order. + +import json, time +from websocket import create_connection + +# [(Snapshot_Time, [(buy_price1, amount1), ...] , [(sell_price1, amount1), ...]), ...] +# [(1620457034392, [(56000, 0.01), (55900, 1), (55700, 30), ...] , [(57000, 0.01), (57100, 1), ...] ), (1620457034394, [...]), ...] +# The snapshots should has almost identical time-interval. Good for LSTM. +# Time axis: [history, older, newer, ..., latest] +realtime_shortterm_dataset_aggtrade = [] +realtime_shortterm_dataset_aggtrade_size = 1024 + +# [(Trade_Time, PRICE, AMOUNT), ...] +# [(1620457034392, 56000, 0.5), (1620457034394, 56001, 0.05), ...] +# The trades usually have various time-interval. TODO: transform it to [(WeightedAvgPrice, Volume), ...] for every 1 minutes? +# Time axis: [history, older, newer, ..., latest] +realtime_shortterm_dataset_depth = [] +realtime_shortterm_dataset_depth_size = 1024*1024 + +def dataset_maintain_thread_main(): + global realtime_shortterm_dataset_aggtrade, realtime_shortterm_dataset_aggtrade_size, realtime_shortterm_dataset_depth, realtime_shortterm_dataset_depth_size + + ws = create_connection("wss://stream.binance.com:9443/ws/btcusdt") + ws.send(json.dumps({ + "method": "SUBSCRIBE", + "params": ["btcusdt@depth"], + "id": 1, + })) + + subs_response = ws.recv() + if '"result":null' not in subs_response: + raise RuntimeError("Failed to subscribe: server says: " + subs_response) + + _debug_tmp = 0 + while True: + message = ws.recv() + # print("DEBUG: message=", message) + parsed = json.loads(message) + if 'a' in parsed.keys() and 'b' in parsed.keys(): + # Is a depth snapshot update + a = 1 + else: + print("ERROR: unexpected message (maybe ping message): ", message) + + ###### DEBUG ONLY + # if int(time.time()) % 60 == 0 and _debug_tmp != int(time.time()): + # _debug_tmp = int(time.time()) + # print("DEBUG: dumping realtime_shortterm_dataset_aggtrade================") + # print(realtime_shortterm_dataset_aggtrade) + # print("DEBUG: dumping realtime_shortterm_dataset_depth================") + # print(realtime_shortterm_dataset_depth) + # print("DEBUG: dumping END ==|||||||||||||||||||||||||||||================") + + ws.close() + +dataset_maintain_thread_main() + diff --git a/ws.py b/ws.py old mode 100644 new mode 100755 index 3dbded367c5c2e6664499251fc6e8239dc88c384..cb7c128b67bb068ce9b9fd0d1cbc77366e27084a --- a/ws.py +++ b/ws.py @@ -1,3 +1,4 @@ +#!/usr/bin/python3 # https://binance-docs.github.io/apidocs/spot/en/#test-new-order-trade # we have one thread to maintain the dataset, another thread would run the pytorch model every 5 seconds, to see if it's time to place order. @@ -19,6 +20,10 @@ realtime_shortterm_dataset_aggtrade_size = 1024 realtime_shortterm_dataset_depth = [] realtime_shortterm_dataset_depth_size = 1024*1024 +# The trading thread would not start working, before finish analysing longterm dataset. +# Time-interval for longterm dataset is 1 minute. +longterm_dataset = [] + def dataset_maintain_thread_main(): global realtime_shortterm_dataset_aggtrade, realtime_shortterm_dataset_aggtrade_size, realtime_shortterm_dataset_depth, realtime_shortterm_dataset_depth_size @@ -35,24 +40,34 @@ def dataset_maintain_thread_main(): _debug_tmp = 0 while True: - result = ws.recv() - # print("DEBUG: result=", result) - parsed = json.loads(result) + message = ws.recv() + # print("DEBUG: message=", message) + parsed = json.loads(message) if 'p' in parsed.keys(): # Is a trade message trade_time, price, amount = parsed.get('T'), parsed.get('p'), parsed.get('q') - realtime_shortterm_dataset_depth.append((trade_time, price, amount)) + realtime_shortterm_dataset_aggtrade.append((trade_time, price, amount)) + if len(realtime_shortterm_dataset_aggtrade) > realtime_shortterm_dataset_aggtrade_size: + del realtime_shortterm_dataset_aggtrade[0] elif 'b' in parsed.keys(): # Is a depth snapshot update - print('TODO') - - if int(time.time()) % 60 == 0 and _debug_tmp != int(time.time()): - _debug_tmp = int(time.time()) - print("DEBUG: dumping realtime_shortterm_dataset_aggtrade================") - print(realtime_shortterm_dataset_aggtrade) - print("DEBUG: dumping realtime_shortterm_dataset_depth================") - print(realtime_shortterm_dataset_depth) - print("DEBUG: dumping END ==|||||||||||||||||||||||||||||================") + snapshot_time, buy_makers, sell_makers = parsed.get('E'), parsed.get('b'), parsed.get('a') + buy_makes_parsed = [(float(ele[0]), float(ele[1])) for ele in buy_makers] + sell_makes_parsed = [(float(ele[0]), float(ele[1])) for ele in sell_makers] + realtime_shortterm_dataset_depth.append((snapshot_time, buy_makes_parsed, sell_makes_parsed)) + if len(realtime_shortterm_dataset_depth) > realtime_shortterm_dataset_depth_size: + del realtime_shortterm_dataset_depth[0] + else: + print("ERROR: unexpected message (maybe ping message): ", message) + + ###### DEBUG ONLY + # if int(time.time()) % 60 == 0 and _debug_tmp != int(time.time()): + # _debug_tmp = int(time.time()) + # print("DEBUG: dumping realtime_shortterm_dataset_aggtrade================") + # print(realtime_shortterm_dataset_aggtrade) + # print("DEBUG: dumping realtime_shortterm_dataset_depth================") + # print(realtime_shortterm_dataset_depth) + # print("DEBUG: dumping END ==|||||||||||||||||||||||||||||================") ws.close()