金融交易世界中,获取准确及时的信息至关重要。抓住交易良机的关键在于掌握实时数据。数据更新越快,可发现的赚钱机会就越多。这也是为何在高频交易领域,tick数据备受重视。与传统的行情数据相比,tick数据提供了更细致的市场变化记录,为交易者提供了更全面的视角。

首先,让我们简单了解一下什么是Tick数据。

什么是Tick数据?

我们日常看到的K线行情数据是基于时间单位的,而tick数据则记录了更细致的维度,即每次价格变化都被记录。举例来说:假设一个股票在一分钟内变动了30次价格,那么在一分钟的行情数据中,你只会看到4个价格:

  1. 开盘价
  2. 收盘价
  3. 最高价
  4. 最低价

因为只有这4项数据,就足够绘制出一个蜡烛图:

O=开盘价,L=最低价,H=最高价,C=收盘价

而另外的26次价格变动,则被忽略了。

Tick数据与此不同,它会提供特定时间内的所有变化,即刚才提到的30次价格变动都会被记录。也就是说,你能够在一分钟内看到股票价格变动了30次。因此,tick数据也被称为高频数据。

高频数据可以帮助我们更好地了解市场行为和微观结构,同时也能够在非常短的时间范围内探索各种交易策略和假设。相反,对于许多应用场景来说,仅采样离散的低频数据并不能提供足够全面的市场分析。基于每日数据的研究和分析很可能会忽略大量的重要信息。

这些本该被忽略的、看似混乱的价格变动其实包含了非常多宝贵的信息。传统的行情数据是每分钟更新一次报价,而tick数据真正做到了实时更新,即每发生一笔交易都记录在内,它的更新时间是不固定的,完全随机的,因为你无法知道下一次交易发生在什么时候。因此,我们可以从交易之间的时间间隔中推测出目前市场的波动性、流动性等市场趋势。

如果你拥有长时间的,比如说5年的tick历史数据库,你就可以用来进行回测。

Tick数据的用途

高频数据已经广泛运用于量化交易的各个环节,其中比较经典的应用场景是回溯测试,即所谓的回测(Backtesting)。

回测是量化交易里非常重要的环节,当需要验证一套交易策略是否有效时,最简单的方法就是将其应用到真实的历史行情中,观察整个策略在其中的收益和最大回撤。回溯测试是基于这样一种理念,即如果我这套策略在过去表现非常好,那么它可能在今天,甚至是未来,都会有非常不错的收益。而tick数据,则能为你构建这种真实的历史环境,用于验证你的交易策略。

Tick数据还被用于量化交易的风险管理。详细的tick数据通过提供对市场流动性、滑点和订单执行质量等信息,来帮助交易员管理风险。Tick数据所提供的碎片化信息是市场微观结构研究员的重要工具。另外,tick数据甚至被用在法律监管行业,金融机构通常需要访问tick数据以满足法规合规和报告的要求。

区分高频数据的质量

与大部分产品一样,tick数据也有质量高低之分,其质量直接影响着后续的应用效果。低质量的tick数据主要表现在数据损坏上,下面是几种典型的数据损坏表现:

数据中断

由于网络中断或系统故障,ticks可能在某段时间内没有被记录下来,导致数据中断。

无效交易

指交易价格为零,甚至是负数的tick记录。比如在数据中发现USDEUR的价格是零,显然是错误的,这种数据需要移除。

重复数据

比如USDEUR在同一时刻出现了多个价格相同的ticks,这可能是因为数据记录不完整或系统错误导致的。

重复时间戳

比如某个货币对在某一时刻实际上没有变化,但由于系统错误添加了多个时间戳,使数据看起来变化非常频繁。这种情况也有可能是人为的。有些数据提供商为了让他们的数据看起来更新非常快,故意给旧的数据添加时间戳,导致ticks不断增加,而价格实际上没有任何变化。

排除人为因素,tick数据损坏的主要原因是由于数据量过大导致的。比如在外汇交易中,一个货币对一天可能有几千笔交易,将所有货币一天的交易都记录下来,数据量会非常庞大,一两周下来可能会积累几百万条数据。此外,信号质量低、信号丢失或信号延迟也可能导致高频数据序列的损坏。

想要清理或检查其中的错误是非常困难的。处理这种量级的数据,Excel会比较吃力,一般的数据提供商都会使用更专业的工具来清理,比如谷歌开发的OpenRefine,或者使用Python的Pandas来编写数据清理脚本。

不同市场的数据差异

外汇市场和股票市场的数据相差比较大。外汇市场属于场外交易,数据难以获取,而股票、商品这种中心化的市场,所有交易都发生在交易所内,他们会负责记录每一笔的交易变化。这种市场的差异也能影响数据的质量。

如何利用Infoway API的数据接口查询tick数据

#1 获取K线:

下面演示如何用Infoway API的接口查询苹果股票的行情数据:

import requests

api_url = 'https://data.infoway.io/stock/batch_kline/1/10/AAPL.US'

# 申请token: www.infoway.io
# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'yourApikey'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

#2 获取成交报价:

import requests

api_url = 'https://data.infoway.io/stock/batch_trade/AAPL.US'

# 申请token: www.infoway.io
# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'application/json',
    'apiKey': 'yourApikey'
}

# 发送GET请求
response = requests.get(api_url, headers=headers)

# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")

#3 通过websocket订阅获取实时股票行情数据:

我们可以通过WebSocket一次性订阅股票的K线、最新成交、盘口,下面是代码:

import json
import time
import schedule
import threading
import websocket
from loguru import logger

class WebsocketExample:
    def __init__(self):
        self.session = None
        self.ws_url = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
        self.reconnecting = False

    def connect_all(self):
        """建立WebSocket连接并启动自动重连机制"""
        try:
            self.connect(self.ws_url)
            self.start_reconnection(self.ws_url)
        except Exception as e:
            logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")

    def start_reconnection(self, url):
        """启动定时重连检查"""
        def check_connection():
            if not self.is_connected():
                logger.debug("Reconnection attempt...")
                self.connect(url)
        
        # 使用线程定期检查连接状态
        threading.Thread(target=lambda: schedule.every(10).seconds.do(check_connection), daemon=True).start()

    def is_connected(self):
        """检查WebSocket连接状态"""
        return self.session and self.session.connected

    def connect(self, url):
        """建立WebSocket连接"""
        try:
            if self.is_connected():
                self.session.close()
            
            self.session = websocket.WebSocketApp(
                url,
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )
            
            # 启动WebSocket连接(非阻塞模式)
            threading.Thread(target=self.session.run_forever, daemon=True).start()
        except Exception as e:
            logger.error(f"Failed to connect to the server: {str(e)}")

    def on_open(self, ws):
        """WebSocket连接建立成功后的回调"""
        logger.info(f"Connection opened")
        
        try:
            # 发送实时成交明细订阅请求
            trade_send_obj = {
                "code": 10000,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "BTCUSDT"}
            }
            self.send_message(trade_send_obj)
            
            # 不同请求之间间隔一段时间
            time.sleep(5)
            
            # 发送实时盘口数据订阅请求
            depth_send_obj = {
                "code": 10003,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "BTCUSDT"}
            }
            self.send_message(depth_send_obj)
            
            # 不同请求之间间隔一段时间
            time.sleep(5)
            
            # 发送实时K线数据订阅请求
            kline_data = {
                "arr": [
                    {
                        "type": 1,
                        "codes": "BTCUSDT"
                    }
                ]
            }
            kline_send_obj = {
                "code": 10006,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": kline_data
            }
            self.send_message(kline_send_obj)
            
            # 启动定时心跳任务
            threading.Thread(target=lambda: schedule.every(30).seconds.do(self.ping), daemon=True).start()
            
        except Exception as e:
            logger.error(f"Error sending initial messages: {str(e)}")

    def on_message(self, ws, message):
        """接收消息的回调"""
        try:
            logger.info(f"Message received: {message}")
        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")

    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭的回调"""
        logger.info(f"Connection closed: {close_status_code} - {close_msg}")

    def on_error(self, ws, error):
        """错误处理的回调"""
        logger.error(f"WebSocket error: {str(error)}")

    def send_message(self, message_obj):
        """发送消息到WebSocket服务器"""
        if self.is_connected():
            try:
                self.session.send(json.dumps(message_obj))
            except Exception as e:
                logger.error(f"Error sending message: {str(e)}")
        else:
            logger.warning("Cannot send message: Not connected")

    def ping(self):
        """发送心跳包"""
        ping_obj = {
            "code": 10010,
            "trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
        }
        self.send_message(ping_obj)

# 使用示例
if __name__ == "__main__":
    ws_client = WebsocketExample()
    ws_client.connect_all()
    
    # 保持主线程运行
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Exiting...")
        if ws_client.is_connected():
            ws_client.session.close()

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐