2026年期货量化交易数据库设计_数据存储与查询实践
本文介绍了量化交易日志系统的重要性及设计方法。日志系统对问题定位和策略分析至关重要,能帮助快速诊断交易异常、评估策略表现。文章详细讲解了日志系统的三个关键设计要素:1) 合理设置日志级别(DEBUG/INFO/WARNING等);2) 按功能分类管理日志(信号、订单、持仓等);3) 采用RotatingFileHandler实现日志轮转存储(10MB/30天)。此外,还展示了交易记录的JSON结构
·
免责声明:本文基于个人使用体验,与任何厂商无商业关系。内容仅供技术交流参考,不构成投资建议。
一、前言
量化交易会产生大量数据:行情数据、交易记录、策略状态、回测结果等。如何高效存储和查询这些数据?2026年了,数据库设计在量化交易中越来越重要。
今天分享一下我在量化交易数据库设计方面的实践经验。
二、数据库需求分析
1. 数据类型
| 数据类型 | 特点 | 存储需求 |
|---|---|---|
| 行情数据 | 高频、量大 | 时间序列数据库 |
| 交易记录 | 结构化、查询多 | 关系型数据库 |
| 策略状态 | 实时更新 | 内存数据库+持久化 |
| 回测结果 | 分析为主 | 关系型数据库 |
2. 查询需求
# 常见查询需求
queries = [
"查询某品种的历史K线",
"查询某时间段的交易记录",
"查询策略的实时状态",
"查询回测结果对比",
"统计分析交易表现",
]
三、数据库选择
1. 时间序列数据库
适用场景:行情数据存储
选择:
| 数据库 | 特点 | 适用 |
|---|---|---|
| InfluxDB | 专为时序数据设计 | 高频行情数据 |
| TimescaleDB | PostgreSQL扩展 | 需要SQL查询 |
| ClickHouse | 列式存储 | 大数据分析 |
示例:
# 使用InfluxDB存储行情数据
from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost', port=8086, database='trading')
def save_quote(symbol, quote):
"""保存行情数据"""
json_body = [{
"measurement": "quotes",
"tags": {
"symbol": symbol
},
"time": quote.datetime,
"fields": {
"open": quote.open,
"high": quote.high,
"low": quote.low,
"close": quote.close,
"volume": quote.volume,
}
}]
client.write_points(json_body)
# 查询
def query_quotes(symbol, start_time, end_time):
"""查询行情数据"""
query = f'''
SELECT * FROM quotes
WHERE symbol = '{symbol}'
AND time >= '{start_time}'
AND time <= '{end_time}'
'''
result = client.query(query)
return result
2. 关系型数据库
适用场景:交易记录、策略配置
选择:
| 数据库 | 特点 | 适用 |
|---|---|---|
| MySQL | 成熟稳定 | 中小规模 |
| PostgreSQL | 功能强大 | 复杂查询 |
| SQLite | 轻量级 | 单机应用 |
示例:
import sqlite3
import pandas as pd
# 创建数据库
conn = sqlite3.connect('trading.db')
# 创建表
def create_tables(conn):
"""创建表"""
cursor = conn.cursor()
# 交易记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
symbol TEXT NOT NULL,
direction TEXT NOT NULL,
volume INTEGER NOT NULL,
price REAL NOT NULL,
pnl REAL,
order_id TEXT,
strategy_name TEXT
)
''')
# 策略配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
params TEXT,
status TEXT,
created_at TEXT
)
''')
conn.commit()
create_tables(conn)
3. 内存数据库
适用场景:实时状态、缓存
选择:
| 数据库 | 特点 | 适用 |
|---|---|---|
| Redis | 高性能 | 缓存、实时状态 |
| Memcached | 简单 | 简单缓存 |
四、数据库设计实践
1. 行情数据表设计
# K线数据表
CREATE TABLE klines (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
symbol VARCHAR(20) NOT NULL,
datetime DATETIME NOT NULL,
period INT NOT NULL, -- 周期(秒)
open DECIMAL(10,2),
high DECIMAL(10,2),
low DECIMAL(10,2),
close DECIMAL(10,2),
volume BIGINT,
open_interest BIGINT,
UNIQUE KEY uk_symbol_datetime_period (symbol, datetime, period),
INDEX idx_symbol_datetime (symbol, datetime)
);
# Tick数据表
CREATE TABLE ticks (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
symbol VARCHAR(20) NOT NULL,
datetime DATETIME(3) NOT NULL, -- 毫秒精度
price DECIMAL(10,2),
volume INT,
bid_price1 DECIMAL(10,2),
ask_price1 DECIMAL(10,2),
bid_volume1 INT,
ask_volume1 INT,
UNIQUE KEY uk_symbol_datetime (symbol, datetime),
INDEX idx_symbol_datetime (symbol, datetime)
);
2. 交易记录表设计
# 交易记录表
CREATE TABLE trades (
id BIGINT PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
symbol VARCHAR(20) NOT NULL,
direction VARCHAR(10) NOT NULL, -- BUY/SELL
offset VARCHAR(10) NOT NULL, -- OPEN/CLOSE
volume INT NOT NULL,
price DECIMAL(10,2) NOT NULL,
commission DECIMAL(10,2), -- 手续费
slippage DECIMAL(10,2), -- 滑点
pnl DECIMAL(10,2), -- 盈亏
order_id VARCHAR(50),
strategy_name VARCHAR(50),
signal_info TEXT, -- 信号信息(JSON)
INDEX idx_timestamp (timestamp),
INDEX idx_symbol (symbol),
INDEX idx_strategy (strategy_name)
);
3. 策略状态表设计
# 策略状态表
CREATE TABLE strategy_states (
id BIGINT PRIMARY KEY AUTOINCREMENT,
strategy_name VARCHAR(50) NOT NULL,
timestamp DATETIME NOT NULL,
equity DECIMAL(12,2), -- 权益
position_info TEXT, -- 持仓信息(JSON)
signal_info TEXT, -- 信号信息(JSON)
error_info TEXT, -- 错误信息
INDEX idx_strategy_timestamp (strategy_name, timestamp)
);
五、数据操作实践
1. 数据写入
def save_trade(conn, trade):
"""保存交易记录"""
cursor = conn.cursor()
cursor.execute('''
INSERT INTO trades
(timestamp, symbol, direction, offset, volume, price, pnl, order_id, strategy_name)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
trade['timestamp'],
trade['symbol'],
trade['direction'],
trade['offset'],
trade['volume'],
trade['price'],
trade.get('pnl', 0),
trade.get('order_id'),
trade.get('strategy_name')
))
conn.commit()
# 批量写入(提高性能)
def save_trades_batch(conn, trades):
"""批量保存交易记录"""
cursor = conn.cursor()
cursor.executemany('''
INSERT INTO trades
(timestamp, symbol, direction, offset, volume, price, pnl, order_id, strategy_name)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', [
(
t['timestamp'], t['symbol'], t['direction'], t['offset'],
t['volume'], t['price'], t.get('pnl', 0),
t.get('order_id'), t.get('strategy_name')
)
for t in trades
])
conn.commit()
2. 数据查询
def query_trades(conn, symbol=None, start_date=None, end_date=None, strategy=None):
"""查询交易记录"""
query = "SELECT * FROM trades WHERE 1=1"
params = []
if symbol:
query += " AND symbol = ?"
params.append(symbol)
if start_date:
query += " AND timestamp >= ?"
params.append(start_date)
if end_date:
query += " AND timestamp <= ?"
params.append(end_date)
if strategy:
query += " AND strategy_name = ?"
params.append(strategy)
query += " ORDER BY timestamp"
df = pd.read_sql_query(query, conn, params=params)
return df
# 使用
trades = query_trades(
conn,
symbol='SHFE.rb2505',
start_date='2025-01-01',
end_date='2025-01-31'
)
3. 数据分析
def analyze_trades(conn, strategy_name=None):
"""分析交易记录"""
query = "SELECT * FROM trades"
if strategy_name:
query += f" WHERE strategy_name = '{strategy_name}'"
df = pd.read_sql_query(query, conn)
if len(df) == 0:
return None
analysis = {
'total_trades': len(df),
'win_rate': (df['pnl'] > 0).sum() / len(df),
'total_pnl': df['pnl'].sum(),
'avg_profit': df[df['pnl'] > 0]['pnl'].mean() if (df['pnl'] > 0).any() else 0,
'avg_loss': df[df['pnl'] < 0]['pnl'].mean() if (df['pnl'] < 0).any() else 0,
'profit_loss_ratio': abs(df[df['pnl'] > 0]['pnl'].mean() / df[df['pnl'] < 0]['pnl'].mean()) if (df['pnl'] < 0).any() else 0,
}
return analysis
# 使用
analysis = analyze_trades(conn, strategy_name='ma_cross')
print(analysis)
六、性能优化
1. 索引优化
# 为常用查询字段创建索引
CREATE INDEX idx_trades_symbol_timestamp ON trades(symbol, timestamp);
CREATE INDEX idx_trades_strategy_timestamp ON trades(strategy_name, timestamp);
CREATE INDEX idx_klines_symbol_datetime ON klines(symbol, datetime);
2. 分区表
# 按时间分区(MySQL 5.7+)
CREATE TABLE trades_2025_01 PARTITION OF trades
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE trades_2025_02 PARTITION OF trades
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
3. 数据归档
def archive_old_data(conn, table_name, archive_date):
"""归档旧数据"""
# 导出旧数据
query = f"SELECT * FROM {table_name} WHERE timestamp < ?"
old_data = pd.read_sql_query(query, conn, params=[archive_date])
# 保存到归档文件
old_data.to_csv(f"archive_{table_name}_{archive_date}.csv", index=False)
# 删除旧数据
cursor = conn.cursor()
cursor.execute(f"DELETE FROM {table_name} WHERE timestamp < ?", [archive_date])
conn.commit()
七、不同工具的数据库支持
| 工具 | 数据库支持 | 特点 |
|---|---|---|
| TqSdk | 需自己实现 | 灵活,可自定义 |
| VnPy | 有数据库模块 | 内置支持 |
| 掘金量化 | 平台数据 | 在线存储 |
八、我的数据库设计经验
作为一个从业二十年的期货量化交易者,分享几点数据库设计经验:
1. 数据库选择
我的选择:
- 行情数据:使用InfluxDB(时序数据库)
- 交易记录:使用MySQL(关系型数据库)
- 实时状态:使用Redis(内存数据库)
2. 表设计
我的设计原则:
- 按业务划分表
- 合理使用索引
- 考虑查询性能
3. 数据管理
我的管理方法:
- 定期归档旧数据
- 备份重要数据
- 监控数据库性能
我目前使用TqSdk做交易,自己设计数据库。虽然多写一些代码,但更灵活,可以完全控制数据结构。
这只是我个人的经验,每个人需求不同,建议根据自己的情况设计。
九、总结
2026年期货量化交易数据库设计要点:
- 数据库选择:根据数据类型选择合适数据库
- 表设计:合理设计表结构,考虑查询需求
- 性能优化:索引、分区、归档
- 数据管理:定期备份、归档、监控
好的数据库设计是量化交易的重要基础,能帮助高效存储和查询数据。
本文仅作为技术介绍,不代表对任何工具的推荐。实际使用请自行评估。
声明:本文基于个人学习经验整理,仅供技术交流参考,不构成任何投资建议。
更多推荐

所有评论(0)