免责声明:本文基于个人使用体验,与任何厂商无商业关系。内容仅供技术交流参考,不构成投资建议。


一、前言

量化交易会产生大量数据:行情数据、交易记录、策略状态、回测结果等。如何高效存储和查询这些数据?2026年了,数据库设计在量化交易中越来越重要。

今天分享一下我在量化交易数据库设计方面的实践经验。


二、数据库需求分析

1. 数据类型

数据类型 特点 存储需求
行情数据 高频、量大 时间序列数据库
交易记录 结构化、查询多 关系型数据库
策略状态 实时更新 内存数据库+持久化
回测结果 分析为主 关系型数据库

2. 查询需求

# 常见查询需求
queries = [
    "查询某品种的历史K线",
    "查询某时间段的交易记录",
    "查询策略的实时状态",
    "查询回测结果对比",
    "统计分析交易表现",
]

三、数据库选择

1. 时间序列数据库

适用场景:行情数据存储

选择

数据库 特点 适用
InfluxDB 专为时序数据设计 高频行情数据
TimescaleDB PostgreSQL扩展 需要SQL查询
ClickHouse 列式存储 大数据分析

示例

# 使用InfluxDB存储行情数据
from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, database='trading')

def save_quote(symbol, quote):
    """保存行情数据"""
    json_body = [{
        "measurement": "quotes",
        "tags": {
            "symbol": symbol
        },
        "time": quote.datetime,
        "fields": {
            "open": quote.open,
            "high": quote.high,
            "low": quote.low,
            "close": quote.close,
            "volume": quote.volume,
        }
    }]
    client.write_points(json_body)

# 查询
def query_quotes(symbol, start_time, end_time):
    """查询行情数据"""
    query = f'''
    SELECT * FROM quotes
    WHERE symbol = '{symbol}'
    AND time >= '{start_time}'
    AND time <= '{end_time}'
    '''
    result = client.query(query)
    return result

2. 关系型数据库

适用场景:交易记录、策略配置

选择

数据库 特点 适用
MySQL 成熟稳定 中小规模
PostgreSQL 功能强大 复杂查询
SQLite 轻量级 单机应用

示例

import sqlite3
import pandas as pd

# 创建数据库
conn = sqlite3.connect('trading.db')

# 创建表
def create_tables(conn):
    """创建表"""
    cursor = conn.cursor()
    
    # 交易记录表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS trades (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        symbol TEXT NOT NULL,
        direction TEXT NOT NULL,
        volume INTEGER NOT NULL,
        price REAL NOT NULL,
        pnl REAL,
        order_id TEXT,
        strategy_name TEXT
    )
    ''')
    
    # 策略配置表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS strategies (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL UNIQUE,
        params TEXT,
        status TEXT,
        created_at TEXT
    )
    ''')
    
    conn.commit()

create_tables(conn)

3. 内存数据库

适用场景:实时状态、缓存

选择

数据库 特点 适用
Redis 高性能 缓存、实时状态
Memcached 简单 简单缓存

四、数据库设计实践

1. 行情数据表设计

# K线数据表
CREATE TABLE klines (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    symbol VARCHAR(20) NOT NULL,
    datetime DATETIME NOT NULL,
    period INT NOT NULL,  -- 周期(秒)
    open DECIMAL(10,2),
    high DECIMAL(10,2),
    low DECIMAL(10,2),
    close DECIMAL(10,2),
    volume BIGINT,
    open_interest BIGINT,
    UNIQUE KEY uk_symbol_datetime_period (symbol, datetime, period),
    INDEX idx_symbol_datetime (symbol, datetime)
);

# Tick数据表
CREATE TABLE ticks (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    symbol VARCHAR(20) NOT NULL,
    datetime DATETIME(3) NOT NULL,  -- 毫秒精度
    price DECIMAL(10,2),
    volume INT,
    bid_price1 DECIMAL(10,2),
    ask_price1 DECIMAL(10,2),
    bid_volume1 INT,
    ask_volume1 INT,
    UNIQUE KEY uk_symbol_datetime (symbol, datetime),
    INDEX idx_symbol_datetime (symbol, datetime)
);

2. 交易记录表设计

# 交易记录表
CREATE TABLE trades (
    id BIGINT PRIMARY KEY AUTOINCREMENT,
    timestamp DATETIME NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    direction VARCHAR(10) NOT NULL,  -- BUY/SELL
    offset VARCHAR(10) NOT NULL,      -- OPEN/CLOSE
    volume INT NOT NULL,
    price DECIMAL(10,2) NOT NULL,
    commission DECIMAL(10,2),         -- 手续费
    slippage DECIMAL(10,2),          -- 滑点
    pnl DECIMAL(10,2),               -- 盈亏
    order_id VARCHAR(50),
    strategy_name VARCHAR(50),
    signal_info TEXT,                -- 信号信息(JSON)
    INDEX idx_timestamp (timestamp),
    INDEX idx_symbol (symbol),
    INDEX idx_strategy (strategy_name)
);

3. 策略状态表设计

# 策略状态表
CREATE TABLE strategy_states (
    id BIGINT PRIMARY KEY AUTOINCREMENT,
    strategy_name VARCHAR(50) NOT NULL,
    timestamp DATETIME NOT NULL,
    equity DECIMAL(12,2),            -- 权益
    position_info TEXT,              -- 持仓信息(JSON)
    signal_info TEXT,                -- 信号信息(JSON)
    error_info TEXT,                 -- 错误信息
    INDEX idx_strategy_timestamp (strategy_name, timestamp)
);

五、数据操作实践

1. 数据写入

def save_trade(conn, trade):
    """保存交易记录"""
    cursor = conn.cursor()
    cursor.execute('''
    INSERT INTO trades 
    (timestamp, symbol, direction, offset, volume, price, pnl, order_id, strategy_name)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        trade['timestamp'],
        trade['symbol'],
        trade['direction'],
        trade['offset'],
        trade['volume'],
        trade['price'],
        trade.get('pnl', 0),
        trade.get('order_id'),
        trade.get('strategy_name')
    ))
    conn.commit()

# 批量写入(提高性能)
def save_trades_batch(conn, trades):
    """批量保存交易记录"""
    cursor = conn.cursor()
    cursor.executemany('''
    INSERT INTO trades 
    (timestamp, symbol, direction, offset, volume, price, pnl, order_id, strategy_name)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', [
        (
            t['timestamp'], t['symbol'], t['direction'], t['offset'],
            t['volume'], t['price'], t.get('pnl', 0),
            t.get('order_id'), t.get('strategy_name')
        )
        for t in trades
    ])
    conn.commit()

2. 数据查询

def query_trades(conn, symbol=None, start_date=None, end_date=None, strategy=None):
    """查询交易记录"""
    query = "SELECT * FROM trades WHERE 1=1"
    params = []
    
    if symbol:
        query += " AND symbol = ?"
        params.append(symbol)
    
    if start_date:
        query += " AND timestamp >= ?"
        params.append(start_date)
    
    if end_date:
        query += " AND timestamp <= ?"
        params.append(end_date)
    
    if strategy:
        query += " AND strategy_name = ?"
        params.append(strategy)
    
    query += " ORDER BY timestamp"
    
    df = pd.read_sql_query(query, conn, params=params)
    return df

# 使用
trades = query_trades(
    conn,
    symbol='SHFE.rb2505',
    start_date='2025-01-01',
    end_date='2025-01-31'
)

3. 数据分析

def analyze_trades(conn, strategy_name=None):
    """分析交易记录"""
    query = "SELECT * FROM trades"
    if strategy_name:
        query += f" WHERE strategy_name = '{strategy_name}'"
    
    df = pd.read_sql_query(query, conn)
    
    if len(df) == 0:
        return None
    
    analysis = {
        'total_trades': len(df),
        'win_rate': (df['pnl'] > 0).sum() / len(df),
        'total_pnl': df['pnl'].sum(),
        'avg_profit': df[df['pnl'] > 0]['pnl'].mean() if (df['pnl'] > 0).any() else 0,
        'avg_loss': df[df['pnl'] < 0]['pnl'].mean() if (df['pnl'] < 0).any() else 0,
        'profit_loss_ratio': abs(df[df['pnl'] > 0]['pnl'].mean() / df[df['pnl'] < 0]['pnl'].mean()) if (df['pnl'] < 0).any() else 0,
    }
    
    return analysis

# 使用
analysis = analyze_trades(conn, strategy_name='ma_cross')
print(analysis)

六、性能优化

1. 索引优化

# 为常用查询字段创建索引
CREATE INDEX idx_trades_symbol_timestamp ON trades(symbol, timestamp);
CREATE INDEX idx_trades_strategy_timestamp ON trades(strategy_name, timestamp);
CREATE INDEX idx_klines_symbol_datetime ON klines(symbol, datetime);

2. 分区表

# 按时间分区(MySQL 5.7+)
CREATE TABLE trades_2025_01 PARTITION OF trades
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE trades_2025_02 PARTITION OF trades
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

3. 数据归档

def archive_old_data(conn, table_name, archive_date):
    """归档旧数据"""
    # 导出旧数据
    query = f"SELECT * FROM {table_name} WHERE timestamp < ?"
    old_data = pd.read_sql_query(query, conn, params=[archive_date])
    
    # 保存到归档文件
    old_data.to_csv(f"archive_{table_name}_{archive_date}.csv", index=False)
    
    # 删除旧数据
    cursor = conn.cursor()
    cursor.execute(f"DELETE FROM {table_name} WHERE timestamp < ?", [archive_date])
    conn.commit()

七、不同工具的数据库支持

工具 数据库支持 特点
TqSdk 需自己实现 灵活,可自定义
VnPy 有数据库模块 内置支持
掘金量化 平台数据 在线存储

八、我的数据库设计经验

作为一个从业二十年的期货量化交易者,分享几点数据库设计经验:

1. 数据库选择

我的选择:

  • 行情数据:使用InfluxDB(时序数据库)
  • 交易记录:使用MySQL(关系型数据库)
  • 实时状态:使用Redis(内存数据库)

2. 表设计

我的设计原则:

  • 按业务划分表
  • 合理使用索引
  • 考虑查询性能

3. 数据管理

我的管理方法:

  • 定期归档旧数据
  • 备份重要数据
  • 监控数据库性能

我目前使用TqSdk做交易,自己设计数据库。虽然多写一些代码,但更灵活,可以完全控制数据结构。

这只是我个人的经验,每个人需求不同,建议根据自己的情况设计。


九、总结

2026年期货量化交易数据库设计要点:

  1. 数据库选择:根据数据类型选择合适数据库
  2. 表设计:合理设计表结构,考虑查询需求
  3. 性能优化:索引、分区、归档
  4. 数据管理:定期备份、归档、监控

好的数据库设计是量化交易的重要基础,能帮助高效存储和查询数据。

本文仅作为技术介绍,不代表对任何工具的推荐。实际使用请自行评估。


声明:本文基于个人学习经验整理,仅供技术交流参考,不构成任何投资建议。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐