
在当今数字化金融时代,个人投资者通过 API 接口实现量化交易已成为可能。本文将详细介绍如何利用 iTick 提供的港美股实时行情 API,结合富途牛牛(Futu OpenAPI)的交易接口,构建一个完整的个人量化交易系统。我们将从系统架构设计开始,逐步深入到具体实现,最后提供完整的 Python 代码和回测策略。
一、系统架构设计
本系统采用模块化设计,各组件之间松耦合,便于单独扩展和维护。以下是完整的系统架构:
二、技术选型与准备
1. 行情数据源 - iTick API
iTick 提供稳定可靠的港美股实时行情数据,具有以下特点:
- 低延迟:毫秒级行情推送
- 全面覆盖:港股、美股、A 股(通过港股通)等全球市场
- 丰富数据类型:tick 数据、分钟线、日线等
- 稳定可靠:99.9%的可用性保证
2. 交易接口 - 富途牛牛 Futu OpenAPI
富途牛牛开放平台提供完善的交易 API:
- 支持港股、美股、A 股(港股通)交易
- 提供模拟交易环境
- 完善的文档和社区支持
- 相对较高的交易执行速度
3. 开发环境准备
# 所需Python库
import requests # 用于API调用
import pandas as pd # 数据处理
import numpy as np # 数值计算
from futu import * # 富途OpenAPI
import matplotlib.pyplot as plt # 可视化
import sqlite3 # 本地数据存储
import time # 时间控制
import threading # 多线程处理
三、系统核心模块实现
1. 行情数据获取模块
class ITickDataFetcher:
def __init__(self):
self.headers = {
"accept": "application/json",
"token": ITICK_API_KEY
}
def get_realtime_data(self,symbol,region):
"""获取实时行情数据"""
url = f"https://api.itick.org/stock/quote?symbol={symbol}®ion={region}"
try:
response = requests.get(url, headers=self.headers)
data = response.json()
return self._parse_data(data)
except Exception as e:
print(f"获取实时数据失败: {str(e)}")
return None
def get_historical_data(self,symbol, region, freq='1'):
"""获取历史数据"""
url = f"https://api.itick.org/stock/kline?symbol={symbol}®ion={region}&kType={freq}"
try:
response = requests.get(url, headers=self.headers)
data = response.json()
df = self._parse_historical_data(data)
except Exception as e:
print(f"获取{symbol}历史数据失败: {str(e)}")
return None
def _parse_data(self, raw_data):
"""解析实时数据"""
# 实现数据解析逻辑
pass
def _parse_historical_data(self, raw_data):
"""解析历史数据"""
# 实现历史数据解析逻辑
pass
2. 数据预处理模块
class DataProcessor:
def __init__(self):
self.conn = sqlite3.connect('quant_trading.db')
def clean_data(self, df):
"""数据清洗"""
# 处理缺失值
df = df.dropna()
# 去除异常值
df = df[(df['v'] > 0) & (df['c'] > 0)]
return df
def calculate_technical_indicators(self, df):
"""计算技术指标"""
# 移动平均线
df['ma5'] = df['c'].rolling(5).mean()
df['ma20'] = df['c'].rolling(20).mean()
# RSI
delta = df['c'].diff()
gain = (delta.where(delta > 0, 0)).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
# MACD
exp12 = df['c'].ewm(span=12, adjust=False).mean()
exp26 = df['c'].ewm(span=26, adjust=False).mean()
df['macd'] = exp12 - exp26
df['signal'] = df['macd'].ewm(span=9, adjust=False).mean()
return df
def save_to_db(self, df, table_name):
"""保存数据到数据库"""
df.to_sql(table_name, self.conn, if_exists='append', index=False)
def load_from_db(self, table_name):
"""从数据库加载数据"""
return pd.read_sql(f"SELECT * FROM {table_name}", self.conn)
3. 策略引擎
class DualMovingAverageStrategy:
def __init__(self, fast_window=5, slow_window=20):
self.fast_window = fast_window
self.slow_window = slow_window
self.position = 0 # 0:无持仓, 1:多头, -1:空头
self.signals = []
def generate_signals(self, df):
"""生成交易信号"""
signals = pd.DataFrame(index=df.index)
signals['price'] = df['c']
signals['fast_ma'] = df['c'].rolling(self.fast_window).mean()
signals['slow_ma'] = df['c'].rolling(self.slow_window).mean()
# 生成交易信号
signals['signal'] = 0
signals['signal'][self.fast_window:] = np.where(
signals['fast_ma'][self.fast_window:] > signals['slow_ma'][self.fast_window:], 1, 0)
# 计算实际的买卖信号
signals['positions'] = signals['signal'].diff()
return signals
def on_tick(self, tick_data):
"""实时行情处理"""
# 实现实时行情处理逻辑
pass
4. 交易执行模块
class FutuTrader:
def __init__(self, host='127.0.0.1', port=11111):
self.quote_ctx = OpenQuoteContext(host=host, port=port)
self.trade_ctx = OpenTradeContext(host=host, port=port)
def get_account_info(self):
"""获取账户信息"""
ret, data = self.trade_ctx.accinfo_query()
if ret == RET_OK:
return data
else:
print(f"获取账户信息失败: {data}")
return None
def place_order(self, symbol, price, quantity, trd_side, order_type=OrderType.NORMAL):
"""下单"""
ret, data = self.trade_ctx.place_order(
price=price,
qty=quantity,
code=symbol,
trd_side=trd_side,
order_type=order_type,
trd_env=TrdEnv.SIMULATE # 模拟交易,实盘改为TrdEnv.REAL
)
if ret == RET_OK:
print(f"下单成功: {data}")
return data
else:
print(f"下单失败: {data}")
return None
def close_all_positions(self):
"""平掉所有仓位"""
ret, data = self.trade_ctx.position_list_query()
if ret == RET_OK:
for _, row in data.iterrows():
if row['qty'] > 0:
self.place_order(
row['code'],
row['cost_price'],
row['qty'],
TrdSide.SELL
)
else:
print(f"获取持仓失败: {data}")
def subscribe(self, symbols, subtype_list=[SubType.QUOTE]):
"""订阅行情"""
ret, data = self.quote_ctx.subscribe(symbols, subtype_list)
if ret != RET_OK:
print(f"订阅行情失败: {data}")
def unsubscribe(self, symbols, subtype_list=[SubType.QUOTE]):
"""取消订阅"""
ret, data = self.quote_ctx.unsubscribe(symbols, subtype_list)
if ret != RET_OK:
print(f"取消订阅失败: {data}")
def __del__(self):
self.quote_ctx.close()
self.trade_ctx.close()
四、回测系统实现
class BacktestEngine:
def __init__(self, initial_capital=100000):
self.initial_capital = initial_capital
self.results = None
def run_backtest(self, strategy, data):
"""运行回测"""
signals = strategy.generate_signals(data)
# 初始化投资组合
portfolio = pd.DataFrame(index=signals.index)
portfolio['signal'] = signals['positions']
portfolio['price'] = signals['price']
portfolio['cash'] = self.initial_capital
portfolio['shares'] = 0
portfolio['total'] = self.initial_capital
# 执行交易
for i in range(1, len(portfolio)):
# 买入信号
if portfolio['signal'][i] == 1:
shares_to_buy = portfolio['cash'][i-1] // portfolio['price'][i]
portfolio['shares'][i] = portfolio['shares'][i-1] + shares_to_buy
portfolio['cash'][i] = portfolio['cash'][i-1] - shares_to_buy * portfolio['price'][i]
# 卖出信号
elif portfolio['signal'][i] == -1:
portfolio['cash'][i] = portfolio['cash'][i-1] + portfolio['shares'][i-1] * portfolio['price'][i]
portfolio['shares'][i] = 0
# 无信号
else:
portfolio['shares'][i] = portfolio['shares'][i-1]
portfolio['cash'][i] = portfolio['cash'][i-1]
# 更新总资产
portfolio['total'][i] = portfolio['cash'][i] + portfolio['shares'][i] * portfolio['price'][i]
self.results = portfolio
return portfolio
def analyze_results(self):
"""分析回测结果"""
if self.results is None:
print("请先运行回测")
return None
returns = self.results['total'].pct_change()
cumulative_returns = (returns + 1).cumprod()
# 计算年化收益率
total_days = len(self.results)
annualized_return = (cumulative_returns.iloc[-1] ** (252/total_days)) - 1
# 计算最大回撤
max_drawdown = (self.results['total'].cummax() - self.results['total']).max()
# 计算夏普比率
sharpe_ratio = returns.mean() / returns.std() * np.sqrt(252)
# 可视化
plt.figure(figsize=(12, 6))
plt.plot(cumulative_returns)
plt.title("Cumulative Returns")
plt.xlabel("Date")
plt.ylabel("Returns")
plt.grid(True)
plt.show()
return {
'annualized_return': annualized_return,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe_ratio,
'final_value': self.results['total'].iloc[-1]
}
五、总结
本文详细介绍了如何利用iTick港美股实时行情API和富途牛牛交易接口构建个人量化交易系统。系统包含以下核心组件:
- 行情数据获取模块:从iTick API获取实时和历史数据
- 数据预处理模块:清洗数据并计算技术指标
- 策略引擎:实现双均线交易策略
- 交易执行模块:通过富途OpenAPI执行交易
- 回测系统:评估策略历史表现