Source code for hftbacktest.data.utils.binancehistmktdata

import csv
from typing import Optional, Literal

import numpy as np
from numpy.typing import NDArray

from .. import merge_on_local_timestamp, correct, validate_data
from ... import DEPTH_EVENT, TRADE_EVENT, DEPTH_SNAPSHOT_EVENT


[docs]def convert_snapshot( snapshot_filename: str, output_filename: Optional[str] = None, feed_latency: float = 0, has_header: Optional[bool] = None, ) -> NDArray: r""" Converts Binance Historical Market Data files into a format compatible with HftBacktest. Since it doesn't have a local timestamp, it lacks feed latency information, which can result in a significant discrepancy between live and backtest results. Collecting feed data yourself or obtaining the high quality of data from a data vendor is strongly recommended. https://www.binance.com/en/landing/data Args: snapshot_filename: Snapshot filename output_filename: If provided, the converted data will be saved to the specified filename in ``npz`` format. feed_latency: Artificial feed latency value to be added to the exchange timestamp to create local timestamp. has_header: True if the given file has a header, it will automatically detect it if set to None. Returns: Converted data compatible with HftBacktest. """ ss_bid = [] ss_ask = [] timestamp_col = None side_col = None price_col = None qty_col = None # Reads snapshot file print('Reading %s' % snapshot_filename) with open(snapshot_filename, 'r', newline='') as f: reader = csv.reader(f, delimiter=',') for row in reader: if timestamp_col is None: if has_header is None: if row[0] == 'symbol': has_header = True else: has_header = False if has_header: header = row else: header = [ 'symbol', 'timestamp', 'trans_id', 'first_update_id', 'last_update_id', 'side', 'update_type', 'price', 'qty' ] if len(header) != len(row): raise ValueError timestamp_col = header.index('timestamp') side_col = header.index('side') price_col = header.index('price') qty_col = header.index('qty') if has_header: continue exch_timestamp = int(row[timestamp_col]) loc_timestamp = exch_timestamp + feed_latency side = 1 if row[side_col] == 'b' else -1 price = float(row[price_col]) qty = float(row[qty_col]) if side == 1: ss_bid.append([ DEPTH_SNAPSHOT_EVENT, exch_timestamp, loc_timestamp, side, price, qty ]) else: ss_ask.append([ DEPTH_SNAPSHOT_EVENT, exch_timestamp, loc_timestamp, side, price, qty ]) snapshot = [] snapshot += [cols for cols in sorted(ss_bid, key=lambda v: -float(v[4]))] snapshot += [cols for cols in sorted(ss_ask, key=lambda v: float(v[4]))] snapshot = np.asarray(snapshot, np.float64) if output_filename is not None: np.savez(output_filename, data=snapshot) return snapshot
[docs]def convert( depth_filename: str, trades_filename: str, output_filename: Optional[str] = None, buffer_size: int = 100_000_000, feed_latency: float = 0, base_latency: float = 0, method: Literal['separate', 'adjust'] = 'separate', depth_has_header: Optional[bool] = None, trades_has_header: Optional[bool] = None ) -> NDArray: r""" Converts Binance Historical Market Data files into a format compatible with HftBacktest. Since it doesn't have a local timestamp, it lacks feed latency information, which can result in a significant discrepancy between live and backtest results. Collecting feed data yourself or obtaining the high quality of data from a data vendor is strongly recommended. https://www.binance.com/en/landing/data Args: depth_filename: Depth data filename trades_filename: Trades data filename output_filename: If provided, the converted data will be saved to the specified filename in ``npz`` format. buffer_size: Sets a preallocated row size for the buffer. feed_latency: Artificial feed latency value to be added to the exchange timestamp to create local timestamp. base_latency: The value to be added to the feed latency. See :func:`.correct_local_timestamp`. method: The method to correct reversed exchange timestamp events. See :func:`..validation.correct`. depth_has_header: True if the given file has a header, it will automatically detect it if set to None. trades_has_header: True if the given file has a header, it will automatically detect it if set to None. Returns: Converted data compatible with HftBacktest. """ tmp_depth = np.empty((buffer_size, 6), np.float64) row_num = 0 timestamp_col = None side_col = None price_col = None qty_col = None print('Reading %s' % depth_filename) with open(depth_filename, 'r', newline='') as f: reader = csv.reader(f, delimiter=',') for row in reader: if timestamp_col is None: if depth_has_header is None: if row[0] == 'symbol': depth_has_header = True else: depth_has_header = False if depth_has_header: header = row else: header = [ 'symbol', 'timestamp', 'trans_id', 'first_update_id', 'last_update_id', 'side', 'update_type', 'price', 'qty' ] if len(header) != len(row): raise ValueError timestamp_col = header.index('timestamp') side_col = header.index('side') price_col = header.index('price') qty_col = header.index('qty') if depth_has_header: continue exch_timestamp = int(row[timestamp_col]) loc_timestamp = exch_timestamp + feed_latency side = 1 if row[side_col] == 'b' else -1 price = float(row[price_col]) qty = float(row[qty_col]) # Insert DEPTH_EVENT tmp_depth[row_num] = [ DEPTH_EVENT, exch_timestamp, loc_timestamp, side, price, qty ] row_num += 1 tmp_depth = tmp_depth[:row_num] tmp_trades = np.empty((buffer_size, 6), np.float64) row_num = 0 timestamp_col = None side_col = None price_col = None qty_col = None print('Reading %s' % trades_filename) with open(trades_filename, 'r', newline='') as f: reader = csv.reader(f, delimiter=',') for row in reader: if timestamp_col is None: if trades_has_header is None: if row[0] == 'id': trades_has_header = True else: trades_has_header = False if trades_has_header: header = row else: header = [ 'id', 'price', 'qty', 'quote_qty', 'time', 'is_buyer_maker' ] if len(header) != len(row): raise ValueError timestamp_col = header.index('time') side_col = header.index('is_buyer_maker') price_col = header.index('price') qty_col = header.index('qty') if trades_has_header: continue exch_timestamp = int(row[timestamp_col]) loc_timestamp = exch_timestamp + feed_latency side = -1 if row[side_col] else 1 # trade initiator's side price = float(row[price_col]) qty = float(row[qty_col]) # Insert TRADE_EVENT tmp_trades[row_num] = [ TRADE_EVENT, exch_timestamp, loc_timestamp, side, price, qty ] row_num += 1 tmp_trades = tmp_trades[:row_num] # A mingled exchange timestamp is frequently observed on Binance. # But, because the data doesn't have a local timestamp, there's difficulty in preserving the received order while # keeping the local timestamp in sequence. # A simple solution is to sort by the derived local timestamp to resolve the issue. tmp_depth = tmp_depth[tmp_depth[:, 2].argsort()] tmp_trades = tmp_trades[tmp_trades[:, 2].argsort()] print('Merging') data = merge_on_local_timestamp(tmp_depth, tmp_trades) data = correct(data, base_latency=base_latency, method=method) # Validate again. num_corr = validate_data(data) if num_corr < 0: raise ValueError if output_filename is not None: print('Saving to %s' % output_filename) np.savez(output_filename, data=data) return data