Tick数据是什么,要如何获取?
我们日常看到的K线行情数据是基于时间单位的,而tick数据则记录了更细致的维度,即每次价格变化都被记录。
金融交易世界中,获取准确及时的信息至关重要。抓住交易良机的关键在于掌握实时数据。数据更新越快,可发现的赚钱机会就越多。这也是为何在高频交易领域,tick数据备受重视。与传统的行情数据相比,tick数据提供了更细致的市场变化记录,为交易者提供了更全面的视角。
首先,让我们简单了解一下什么是Tick数据。
什么是Tick数据?
我们日常看到的K线行情数据是基于时间单位的,而tick数据则记录了更细致的维度,即每次价格变化都被记录。举例来说:假设一个股票在一分钟内变动了30次价格,那么在一分钟的行情数据中,你只会看到4个价格:
- 开盘价
- 收盘价
- 最高价
- 最低价
因为只有这4项数据,就足够绘制出一个蜡烛图:
O=开盘价,L=最低价,H=最高价,C=收盘价
而另外的26次价格变动,则被忽略了。
Tick数据与此不同,它会提供特定时间内的所有变化,即刚才提到的30次价格变动都会被记录。也就是说,你能够在一分钟内看到股票价格变动了30次。因此,tick数据也被称为高频数据。
高频数据可以帮助我们更好地了解市场行为和微观结构,同时也能够在非常短的时间范围内探索各种交易策略和假设。相反,对于许多应用场景来说,仅采样离散的低频数据并不能提供足够全面的市场分析。基于每日数据的研究和分析很可能会忽略大量的重要信息。
这些本该被忽略的、看似混乱的价格变动其实包含了非常多宝贵的信息。传统的行情数据是每分钟更新一次报价,而tick数据真正做到了实时更新,即每发生一笔交易都记录在内,它的更新时间是不固定的,完全随机的,因为你无法知道下一次交易发生在什么时候。因此,我们可以从交易之间的时间间隔中推测出目前市场的波动性、流动性等市场趋势。
如果你拥有长时间的,比如说5年的tick历史数据库,你就可以用来进行回测。
Tick数据的用途
高频数据已经广泛运用于量化交易的各个环节,其中比较经典的应用场景是回溯测试,即所谓的回测(Backtesting)。
回测是量化交易里非常重要的环节,当需要验证一套交易策略是否有效时,最简单的方法就是将其应用到真实的历史行情中,观察整个策略在其中的收益和最大回撤。回溯测试是基于这样一种理念,即如果我这套策略在过去表现非常好,那么它可能在今天,甚至是未来,都会有非常不错的收益。而tick数据,则能为你构建这种真实的历史环境,用于验证你的交易策略。
Tick数据还被用于量化交易的风险管理。详细的tick数据通过提供对市场流动性、滑点和订单执行质量等信息,来帮助交易员管理风险。Tick数据所提供的碎片化信息是市场微观结构研究员的重要工具。另外,tick数据甚至被用在法律监管行业,金融机构通常需要访问tick数据以满足法规合规和报告的要求。
区分高频数据的质量
与大部分产品一样,tick数据也有质量高低之分,其质量直接影响着后续的应用效果。低质量的tick数据主要表现在数据损坏上,下面是几种典型的数据损坏表现:
数据中断
由于网络中断或系统故障,ticks可能在某段时间内没有被记录下来,导致数据中断。
无效交易
指交易价格为零,甚至是负数的tick记录。比如在数据中发现USDEUR的价格是零,显然是错误的,这种数据需要移除。
重复数据
比如USDEUR在同一时刻出现了多个价格相同的ticks,这可能是因为数据记录不完整或系统错误导致的。
重复时间戳
比如某个货币对在某一时刻实际上没有变化,但由于系统错误添加了多个时间戳,使数据看起来变化非常频繁。这种情况也有可能是人为的。有些数据提供商为了让他们的数据看起来更新非常快,故意给旧的数据添加时间戳,导致ticks不断增加,而价格实际上没有任何变化。
排除人为因素,tick数据损坏的主要原因是由于数据量过大导致的。比如在外汇交易中,一个货币对一天可能有几千笔交易,将所有货币一天的交易都记录下来,数据量会非常庞大,一两周下来可能会积累几百万条数据。此外,信号质量低、信号丢失或信号延迟也可能导致高频数据序列的损坏。
想要清理或检查其中的错误是非常困难的。处理这种量级的数据,Excel会比较吃力,一般的数据提供商都会使用更专业的工具来清理,比如谷歌开发的OpenRefine,或者使用Python的Pandas来编写数据清理脚本。
不同市场的数据差异
外汇市场和股票市场的数据相差比较大。外汇市场属于场外交易,数据难以获取,而股票、商品这种中心化的市场,所有交易都发生在交易所内,他们会负责记录每一笔的交易变化。这种市场的差异也能影响数据的质量。
如何利用Infoway API的数据接口查询tick数据
#1 获取K线:
下面演示如何用Infoway API的接口查询苹果股票的行情数据:
import requests
api_url = 'https://data.infoway.io/stock/batch_kline/1/10/AAPL.US'
# 申请token: www.infoway.io
# 设置请求头
headers = {
'User-Agent': 'Mozilla/5.0',
'Accept': 'application/json',
'apiKey': 'yourApikey'
}
# 发送GET请求
response = requests.get(api_url, headers=headers)
# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")
#2 获取成交报价:
import requests
api_url = 'https://data.infoway.io/stock/batch_trade/AAPL.US'
# 申请token: www.infoway.io
# 设置请求头
headers = {
'User-Agent': 'Mozilla/5.0',
'Accept': 'application/json',
'apiKey': 'yourApikey'
}
# 发送GET请求
response = requests.get(api_url, headers=headers)
# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")
#3 通过websocket订阅获取实时股票行情数据:
我们可以通过WebSocket一次性订阅股票的K线、最新成交、盘口,下面是代码:
import json
import time
import schedule
import threading
import websocket
from loguru import logger
class WebsocketExample:
def __init__(self):
self.session = None
self.ws_url = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
self.reconnecting = False
def connect_all(self):
"""建立WebSocket连接并启动自动重连机制"""
try:
self.connect(self.ws_url)
self.start_reconnection(self.ws_url)
except Exception as e:
logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")
def start_reconnection(self, url):
"""启动定时重连检查"""
def check_connection():
if not self.is_connected():
logger.debug("Reconnection attempt...")
self.connect(url)
# 使用线程定期检查连接状态
threading.Thread(target=lambda: schedule.every(10).seconds.do(check_connection), daemon=True).start()
def is_connected(self):
"""检查WebSocket连接状态"""
return self.session and self.session.connected
def connect(self, url):
"""建立WebSocket连接"""
try:
if self.is_connected():
self.session.close()
self.session = websocket.WebSocketApp(
url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 启动WebSocket连接(非阻塞模式)
threading.Thread(target=self.session.run_forever, daemon=True).start()
except Exception as e:
logger.error(f"Failed to connect to the server: {str(e)}")
def on_open(self, ws):
"""WebSocket连接建立成功后的回调"""
logger.info(f"Connection opened")
try:
# 发送实时成交明细订阅请求
trade_send_obj = {
"code": 10000,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "BTCUSDT"}
}
self.send_message(trade_send_obj)
# 不同请求之间间隔一段时间
time.sleep(5)
# 发送实时盘口数据订阅请求
depth_send_obj = {
"code": 10003,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "BTCUSDT"}
}
self.send_message(depth_send_obj)
# 不同请求之间间隔一段时间
time.sleep(5)
# 发送实时K线数据订阅请求
kline_data = {
"arr": [
{
"type": 1,
"codes": "BTCUSDT"
}
]
}
kline_send_obj = {
"code": 10006,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": kline_data
}
self.send_message(kline_send_obj)
# 启动定时心跳任务
threading.Thread(target=lambda: schedule.every(30).seconds.do(self.ping), daemon=True).start()
except Exception as e:
logger.error(f"Error sending initial messages: {str(e)}")
def on_message(self, ws, message):
"""接收消息的回调"""
try:
logger.info(f"Message received: {message}")
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
def on_close(self, ws, close_status_code, close_msg):
"""连接关闭的回调"""
logger.info(f"Connection closed: {close_status_code} - {close_msg}")
def on_error(self, ws, error):
"""错误处理的回调"""
logger.error(f"WebSocket error: {str(error)}")
def send_message(self, message_obj):
"""发送消息到WebSocket服务器"""
if self.is_connected():
try:
self.session.send(json.dumps(message_obj))
except Exception as e:
logger.error(f"Error sending message: {str(e)}")
else:
logger.warning("Cannot send message: Not connected")
def ping(self):
"""发送心跳包"""
ping_obj = {
"code": 10010,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
}
self.send_message(ping_obj)
# 使用示例
if __name__ == "__main__":
ws_client = WebsocketExample()
ws_client.connect_all()
# 保持主线程运行
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Exiting...")
if ws_client.is_connected():
ws_client.session.close()

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)