时序单因子检验 (time-series single-factor test)

heshuang 2025-05-08 02:19:27 +08:00
parent 1168ebe9bb
commit 8874077570
6 changed files with 619 additions and 0 deletions

.gitignore vendored (1 line changed)

@@ -268,6 +268,7 @@ Backup of *.doc*
# Excel Backup File
*.xlk
*.csv
*.pkl
# PowerPoint temporary
~$*.ppt*

common/helpers.py (new file, 467 lines)

@@ -0,0 +1,467 @@
import os
import platform
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt, gridspec
from scipy import stats
from scipy.stats import spearmanr
from rich.progress import track
import statsmodels.api as sm
plt.style.use('default')
plt.rcParams['figure.facecolor'] = 'white'
plt.rcParams['font.family'] = 'STHeiti' if platform.system() == 'Darwin' else 'SimHei'
plt.rcParams['axes.unicode_minus'] = False
def create_dir_not_exist(path):
    os.makedirs(path, exist_ok=True)
# Cross-sectional standardization (z-score within each row)
def standardize(df: pd.DataFrame) -> pd.DataFrame:
return df.sub(df.mean(axis=1, skipna=True), axis=0).div(df.std(axis=1, skipna=True), axis=0)
def __filter_extreme_MAD(series, _n=3 * 1.4826):
    # Nothing to winsorize if the whole cross-section is NaN
    if series.isna().all():
        return series
    median = series.median(skipna=True)
    mad_value = ((series - median).abs()).median(skipna=True)
    return series.clip(median - _n * mad_value, median + _n * mad_value)
# MAD: winsorize outliers around the cross-sectional median
def mad(df, n=3 * 1.4826):
    # Clip outliers row by row (each row is one cross-section)
    df = df.apply(lambda x: __filter_extreme_MAD(x, _n=n), axis=1)
    return df
def neutralization(df_factor: pd.DataFrame, mcap: pd.DataFrame):
    # Market-cap neutralization: replace each cross-section with the residual
    # of an OLS regression of the factor on log market cap (no intercept)
    df = df_factor.copy()
    mcap = np.log10(mcap).copy()
    time_periods = df_factor.index.tolist()
    for t in time_periods:
        try:
            x = mcap.loc[t]
            y = df.loc[t]
            df.loc[t] = sm.OLS(y.astype(float), x.astype(float), hasconst=False, missing='drop').fit().resid
        except Exception:
            # Skip cross-sections where the regression cannot be fit (e.g. all NaN)
            pass
    return df
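# A minimal preprocessing sketch (illustrative only; `raw_factor` and `market_cap`
# are hypothetical wide DataFrames, rows = timestamps, columns = symbols):
#
#   clean_factor = standardize(mad(raw_factor))
#   neutral_factor = neutralization(clean_factor, market_cap)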
def filter_rebalance_data(df_factor, df_rtn, offset, hold_hour):
"""
根据持仓周期和偏移时间过滤调仓时间点的数据
参数
df_factor : pd.DataFrame - 因子值数据必须包含'timestamp'
df_rtn : pd.DataFrame - 收益率数据必须包含'timestamp'
offset : int - 每日开仓偏移小时数0-23
hold_hour : int - 持仓周期小时数
返回
(pd.DataFrame, pd.DataFrame) - 过滤后的因子和收益率数据
"""
df_factor.reset_index(drop=False, inplace=True)
df_rtn.reset_index(drop=False, inplace=True)
    # Merge the two datasets so the timestamps line up
    merged = pd.merge(df_factor, df_rtn, on='timestamp', suffixes=('_factor', '_rtn'))
    # Make sure the time column is datetime
    merged['timestamp'] = pd.to_datetime(merged['timestamp'])
    # Keep offset within 0-23
    offset = offset % 24
    # Build the candidate rebalance time series
    min_time = merged['timestamp'].min()
    max_time = merged['timestamp'].max()
    # First candidate: midnight of the first data day plus the offset
    current = pd.Timestamp(min_time.date()) + pd.DateOffset(hours=offset)
    # Advance to the first candidate inside the data range
    while current < min_time:
        current += pd.DateOffset(hours=hold_hour)
    # Collect all candidate rebalance times
    rebalance_times = []
    while current <= max_time:
        rebalance_times.append(current)
        current += pd.DateOffset(hours=hold_hour)
    # Bars are stamped at HH:59:59 (see data_clean.py), so shift each candidate back one second
    for i in range(0, len(rebalance_times)):
        rebalance_times[i] = rebalance_times[i] - pd.Timedelta(seconds=1)
    # Keep only candidates that actually exist in the merged data
    valid_times = merged[(merged['timestamp']).isin(rebalance_times)]['timestamp']
    # Filter both inputs to the valid rebalance timestamps
    df_factor = df_factor[df_factor['timestamp'].isin(valid_times)]
    df_rtn = df_rtn[df_rtn['timestamp'].isin(valid_times)]
    df_factor.set_index('timestamp', inplace=True)
    df_rtn.set_index('timestamp', inplace=True)
return df_factor, df_rtn
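# Worked example (hypothetical inputs): with offset=8 and hold_hour=4, candidates
# fall at 08:00, 12:00, 16:00, ... and shift to 07:59:59, 11:59:59, ... to match
# the HH:59:59 bar stamps produced in data_clean.py:
#
#   factor_4h, rtn_4h = filter_rebalance_data(factor_df, rtn_df, offset=8, hold_hour=4)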
def fast_ts_factor_rtn_IC(df_factor: pd.DataFrame, df_rtn: pd.DataFrame, name:str):
result = spearmanr(df_factor.values.tolist(), df_rtn.values.tolist())[0]
report = {
'name': name,
'IC': round(result, 4),
}
report = pd.DataFrame([report])
return report
def fast_ts_rolling_IC(df_factor: pd.DataFrame, df_rtn: pd.DataFrame, name:str, window:int):
df = pd.merge(df_factor, df_rtn, on='timestamp')
result = []
for i in range(len(df) - window + 1):
window_df = df.iloc[i:i + window]
corr = window_df.corr(method='spearman').iloc[0, 1]
result.append(corr)
spearman_corr = pd.Series(result, index=df.index[window - 1:])
return pd.DataFrame(spearman_corr, columns=['corr'])
def fast_factor_rtn_IC(df: pd.DataFrame, rtn: pd.DataFrame, factor_name: str):
result = df.corrwith(rtn, axis=1, method='spearman').dropna(how='all')
t_stat = stats.ttest_1samp(result, 0)[0]
report = {
'name': factor_name,
'IC mean': round(result.mean(), 4),
'IC std': round(result.std(), 4),
'IR': round(result.mean()/result.std(), 4),
'IC>0': round(len(result[result>0].dropna())/len(result),4),
'ABS_IC>2%': round(len(result[abs(result) > 0.02].dropna()) / len(result), 4),
't_stat': round(t_stat, 4),
}
report = pd.DataFrame([report])
return result, report
def ic_plot_by_period(ic_df: pd.DataFrame, period: str, factor_name: str):
    # Resample IC to the given period (e.g. 'W', 'M') and plot the period means
    ic_period = ic_df.resample(period).mean()
    ic_period.index = ic_period.index.strftime('%F')
    ic_period.plot(kind='bar', title=factor_name)
    plt.show()
def cum_ic_merge_figure(ic_df_dict: dict):
ic_num = len(ic_df_dict)
titles = [f"{x}" for x in ic_df_dict.keys()]
num_cols = min(4, ic_num)
num_rows = (ic_num + num_cols - 1) // num_cols
fig = plt.figure(figsize=(15, 10))
gs = gridspec.GridSpec(num_rows, num_cols)
for i, title in enumerate(titles):
ax = fig.add_subplot(gs[i // num_cols, i % num_cols])
df = ic_df_dict[title]
df.cumsum().plot(ax=ax)
ax.set_title(title)
plt.tight_layout()
plt.show()
def cum_ic_merge_figure_v2(ic_df: pd.DataFrame):
    import warnings
    # Silence UserWarning noise from matplotlib
    warnings.filterwarnings("ignore", category=UserWarning)
    fig = plt.figure(figsize=(12, 6))
    gs = gridspec.GridSpec(1, 2, width_ratios=[3, 1])  # left 75%, right 25%
    # Plot the cumulative IC curves on the left
    ax = plt.subplot(gs[0])
    ic_df.cumsum().plot(ax=ax)
    # Place the legend in the right-hand panel
    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    # Tidy the layout and show
    plt.tight_layout()
    plt.show()
def parameters_IC_figure(df: pd.DataFrame, ic_column_name: str):
plt.figure(figsize=(10, 6))
plt.plot(df["parameter"], df[ic_column_name], marker="o", linestyle="-", color="b")
plt.title(f"Parameter vs {ic_column_name}", fontsize=16)
plt.xlabel("Parameter", fontsize=14)
plt.ylabel(f"{ic_column_name}", fontsize=14)
plt.grid(True)
plt.show()
def factor_corr_filter(factor_names: list, factor_performance: pd.DataFrame, highest=0.7):
    # Rank factors by |ICIR|, then greedily keep the best remaining factor and
    # drop any other factor whose correlation with it exceeds `highest`
    _ic_summary = factor_performance[factor_names].mean() / factor_performance[factor_names].std()
    passed_icir_factor_names = _ic_summary.abs().sort_values(ascending=False).index.tolist()
    corr_df = factor_performance[passed_icir_factor_names].corr()
    length = corr_df.shape[1]
    passed_list = []
    for i in range(length):
        try:
            passed_list.append(corr_df.index.tolist()[0])
            corr_df = corr_df[(abs(corr_df.iloc[:, 0]) < highest)]
            corr_df = corr_df[corr_df.index.tolist()]
        except IndexError:
            # No candidate factors left
            break
    return passed_list
def factor_exposure_hist(df: pd.DataFrame, name: str, bins: int = 50):
df.stack().to_frame(name).hist(bins=bins)
plt.show()
def fast_group_g(factor_name: str, df_factor: pd.DataFrame, df_rtn: pd.DataFrame, group_num: int = 10):
group = df_factor.stack().to_frame('factor')
group_rtn = df_rtn.stack().to_frame('rtn')
group['rtn'] = group_rtn['rtn']
group.dropna(inplace=True)
group.reset_index(inplace=True)
group.columns = ['timestamp', 'symbol', 'factor', 'rtn']
datetime_period = group.timestamp.drop_duplicates().tolist()
group_return = pd.DataFrame()
    for i in range(0, len(datetime_period)):
        single = group[group.timestamp == datetime_period[i]].sort_values(by='factor', ascending=True)
        # Bin into quantile groups
        try:
            single.loc[:, 'group'] = pd.qcut(single.factor, group_num, list(range(1, group_num + 1))).to_list()
        except Exception:
            # Skip this period if binning fails
            continue
_group_result = single.groupby('group')
_group_ret_dict = {}
factor_mean_df = pd.DataFrame()
for index, _group in _group_result:
group_ret = _group['rtn'].mean()
group_factor_mean = _group['factor'].mean()
_group_ret_dict['timestamp'] = datetime_period[i]
_group_ret_dict['G'] = index
_group_ret_dict['factor_value_mean'] = group_factor_mean
_group_ret_dict['ret_mean'] = group_ret
_temp = pd.DataFrame(_group_ret_dict, index=[0])
group_return = pd.concat([group_return, _temp], axis=0)
adjust_times = sorted(list(set(group_return['timestamp'].tolist())))
ic_df = pd.DataFrame(index=adjust_times, columns=['ic'], dtype='float64')
for i in adjust_times:
current = group_return[group_return['timestamp'] == i]
current.set_index('timestamp', inplace=True)
cur_factor_df = current[['factor_value_mean']]
cur_ret_df = current[['ret_mean']]
_ic = float(spearmanr(cur_factor_df.values.tolist(), cur_ret_df.values.tolist())[0])
ic_df.loc[i] = _ic
t_stat = stats.ttest_1samp(ic_df, 0)[0][0]
group_return_by_G = group_return.groupby('G')
result_temp = {}
result = pd.DataFrame()
for index, _group in group_return_by_G:
_ret = (_group['ret_mean'] + 1).cumprod()
result_temp['G'] = f"G{index}"
result_temp['ret_mean'] = _ret.iloc[-1] - 1
_result_df = pd.DataFrame(result_temp, index=[0])
result = pd.concat([result, _result_df], axis=0)
result.reset_index(inplace=True, drop=True)
corr_value = spearmanr(result.index.tolist(), result['ret_mean'].tolist())[0]
report = {
'name': factor_name,
'IC mean': round(float(ic_df.mean().iloc[0]), 4),
'IC std': round(float(ic_df.std().iloc[0]), 4),
'IR': round(float(ic_df.mean().iloc[0] / ic_df.std().iloc[0]), 4),
'IC>0': round(len(ic_df[ic_df > 0].dropna()) / len(ic_df), 4),
'ABS_IC>2%': round(len(ic_df[abs(ic_df) > 0.02].dropna()) / len(ic_df), 4),
't_stat': round(t_stat, 4),
'grouped_corr': round(corr_value, 4),
}
report = pd.DataFrame([report])
return report, ic_df
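# Usage sketch (illustrative; wide DataFrames with rows = timestamps, columns = symbols):
#
#   report, ic_df = fast_group_g('my_factor', factor_wide_df, rtn_wide_df, group_num=10)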
def group_g(df_factor: pd.DataFrame, df_rtn: pd.DataFrame, factor_name: str, offset: int, mode: str = None, group_num: int = 10):
    def check_binning_feasibility(single_series, _group_num):
        """
        Check whether the factor cross-section at this timestamp can be binned.
        """
        # Check 1: enough unique values to form the bins
        unique_count = single_series.nunique()
        if unique_count < _group_num:
            return False, f"too few unique values: {unique_count} < {_group_num}"
        # Check 2: zero standard deviation (all values identical)
        if single_series.std() == 0:
            return False, "zero standard deviation"
        # Check 3: duplicated quantile edges (quick pre-check before qcut)
        quantiles = single_series.quantile(np.linspace(0, 1, _group_num + 1))
        if quantiles.duplicated().any():
            return False, "duplicated quantile edges"
        return True, "passed"
group = df_factor.stack().to_frame('factor')
group_rtn = df_rtn.stack().to_frame('rtn')
group['rtn'] = group_rtn['rtn']
group.dropna(inplace=True)
group.reset_index(inplace=True)
group.columns = ['timestamp', 'symbol', 'factor', 'rtn']
datetime_period = group.timestamp.drop_duplicates().tolist()
group_return = pd.DataFrame()
failed_dtypes = {
'timestamp': 'datetime64[ns]',
'error_type': 'object',
'error_msg': 'object'
}
failed_periods = pd.DataFrame(columns=['timestamp', 'error_type', 'error_msg']).astype(failed_dtypes)
last_selected = {}
    for i in track(range(0, len(datetime_period)), description="[green]Binning cross-sections, computing turnover and layered returns...", total=len(datetime_period)):
single = group[group.timestamp == datetime_period[i]].sort_values(by='factor', ascending=True)
is_feasible, error_msg = check_binning_feasibility(single['factor'], group_num)
current_timestamp = datetime_period[i]
        if not is_feasible:
            # Record the failed period
            failed_periods = pd.concat(
                [failed_periods, pd.DataFrame([[current_timestamp, 'BINNING_ERROR', error_msg]], columns=failed_periods.columns)],
                ignore_index=True)
            continue  # Skip binning for this period
        try:
            single.loc[:, 'group'] = pd.qcut(single.factor, group_num, list(range(1, group_num + 1))).to_list()
        except Exception as e:
            # Catch any other qcut failure
            failed_periods = pd.concat(
                [failed_periods, pd.DataFrame([[current_timestamp, 'Q_CUT_ERROR', str(e)]], columns=failed_periods.columns)],
                ignore_index=True)
            continue
_group_result = single.groupby('group')
current_selected = {}
for index, _group in _group_result:
group_ret = _group['rtn'].mean()
_selected = _group['symbol'].tolist()
current_selected[index] = _selected
            if last_selected == {}:
                # First period: treat the whole basket as newly opened
                _turnover = 1.0
            else:
                _turnover = len(set(_selected).difference(set(last_selected[index]))) / len(last_selected[index])
_group_ret_dict = {'timestamp': datetime_period[i], 'G': index, 'ret': group_ret, 'turnover_ratio': _turnover}
_temp = pd.DataFrame(_group_ret_dict, index=[0])
group_return = pd.concat([group_return, _temp], axis=0)
last_selected = current_selected
group_return_by_G = group_return.groupby('G')
result_temp = {}
result = pd.DataFrame()
turnover_ratio_list = []
net_curve_list = []
for index, _group in group_return_by_G:
_ret = (_group['ret'] + 1).cumprod()
result_temp['G'] = f"G{index}"
result_temp['ret'] = _ret.iloc[-1] - 1
_result_df = pd.DataFrame(result_temp, index=[0])
_turnover_df = _group[['timestamp', 'turnover_ratio']].set_index('timestamp').sort_index()
_turnover_df.rename(columns={'turnover_ratio': f'G{index}_turnover_ratio'}, inplace=True)
turnover_ratio_list.append(_turnover_df)
result = pd.concat([result, _result_df], axis=0)
_curve_df = _group[['timestamp', 'ret']].set_index('timestamp').sort_index()
_curve_df['net'] = (_curve_df['ret'] + 1).cumprod()
_curve_df.rename(columns={'net': f'G{index}_net'}, inplace=True)
del _curve_df['ret']
net_curve_list.append(_curve_df)
result.reset_index(inplace=True, drop=True)
corr_value = spearmanr(result.index.tolist(), result['ret'].tolist())[0]
result['net'] = result['ret'] + 1
net_curve = pd.concat(net_curve_list, axis=1, ignore_index=False)
benchmark_ret = net_curve.mean(axis=1)
turnover_ratio = pd.concat(turnover_ratio_list, ignore_index=False, axis=1)
turnover_ratio = turnover_ratio.iloc[1:]
start_time = df_factor.index[0]
end_time = df_factor.index[-1]
    time_delta = ((end_time - start_time).days + 1) * 24
    # Annualize terminal net value: 8760 = hours per year
    result['annual_ret'] = ((result['net']) ** (8760 / time_delta)) - 1
    fig, axs = plt.subplots(2, 3, figsize=(16, 8))
    # Subplot 1: layered annualized return (monotonicity check)
    result['annual_ret'].plot(kind='bar', ax=axs[0, 0], title=f'{factor_name}_OFFSET{offset} layered annualized return (monotonicity {corr_value})')
    # Subplot 2: layered cumulative net value curves
    if mode == 'long_short':
        long_short_curve = net_curve[['G1_net', 'G10_net']].copy()
        long_short_curve['benchmark'] = benchmark_ret
        long_short_curve.plot(ax=axs[0, 1], title=f'{factor_name}_OFFSET{offset} long-short test')
    else:
        net_curve.plot(ax=axs[0, 1], title=f'{factor_name}_OFFSET{offset} layered cumulative net value')
    # Subplot 3: layered terminal net value bars
    result['net'].plot(kind='bar', ax=axs[0, 2], title=f'{factor_name}_OFFSET{offset} layered net value')
    # Subplot 4: year-by-year layered returns
    yby_performance = net_curve.pct_change().resample('YE').apply(lambda x: (1 + x).cumprod().iloc[-1]).T
    yby_performance = yby_performance.replace(0, np.nan).dropna(how='all')
    yby_performance = yby_performance - 1
    yby_performance.plot(kind='bar', ax=axs[1, 0], title=f'{factor_name}_OFFSET{offset} year-by-year layered returns',
                         color=['powderblue', 'lightskyblue', 'cornflowerblue', 'steelblue', 'royalblue'])
    # Subplot 5: average turnover per layer
    turnover_ratio.mean().plot(kind='bar', ax=axs[1, 1], title=f'{factor_name}_OFFSET{offset} average layer turnover')
    # Subplot 6: timestamps where binning failed
    axs[1, 2].scatter(failed_periods['timestamp'].tolist(), [1] * len(failed_periods))
    axs[1, 2].set_title(f'{factor_name}_OFFSET{offset} failed binning timestamps')
    plt.tight_layout()
    plt.show()
    if not failed_periods.empty:
        print(f"Found {len(failed_periods)} problematic periods:")
        print(failed_periods)
def cta_ts_group_by_ret(df_factor: pd.DataFrame, df_ret: pd.DataFrame, factor_name: str, ic: float, group_nums: int = 10):
df = pd.merge(df_factor, df_ret, on='timestamp', how='left')
df = df.sort_values(by='factorVal')
    sample_nums = df.shape[0]
    # Split the factor-sorted sample into group_nums near-equal buckets
    num1, num2 = divmod(sample_nums, group_nums)
    group_result = {}
    for i in range(group_nums):
        start_idx = i * num1 + min(i, num2)
        end_idx = (i + 1) * num1 + min(i + 1, num2)
        grouped_df = df.iloc[start_idx:end_idx]
        group_result[i + 1] = grouped_df.mean()
grouped_df = pd.DataFrame(group_result).T
plt.figure(figsize=(15, 8))
plt.title(f"{factor_name} RollingIC:{ic}")
bars = plt.bar(grouped_df.index, grouped_df.ret) # 绘制柱状图
# 添加因子值标注核心代码仅3行
for i, bar in enumerate(bars):
plt.text(bar.get_x() + bar.get_width() / 2, # x位置居中
bar.get_height() + 0.0001, # y位置微高于柱子
f"{grouped_df.factorVal[i + 1]:.4f}", # 显示4位小数
ha='center', va='bottom', fontsize=9) # 对齐方式
plt.grid(ls='--', alpha=0.5)
plt.show()
def cta_ts_group_v2(df_factor: pd.DataFrame, df_ret: pd.DataFrame, factor_name: str, ic: float, group_nums: int = 10):
df = pd.merge(df_factor, df_ret, on='timestamp', how='inner')
grouped, bins = pd.qcut(df.iloc[:, 0], group_nums, labels=list(range(1, group_nums + 1)), retbins=True)
df.loc[:, 'group'] = grouped
    # Mean return per quantile group
    group_ret = df.groupby('group', observed=False).mean().ret
    ax = group_ret.plot(kind='bar')
    plt.title(f"{factor_name} RollingIC:{ic}")
    bin_labels = [f"({bins[i]:.2f}~{bins[i + 1]:.2f}]" for i in range(len(bins) - 1)]
    for i, group_num in enumerate(group_ret.index):
        ax.text(i, group_ret[group_num] + 0.005,
                bin_labels[group_num - 1],  # group_num-1 maps back to the bin_labels index
                ha='center', rotation=45, fontsize=8)
    plt.tight_layout()  # avoid label overlap
plt.show()
def cta_rolling_ts_standard(df_factor: pd.DataFrame, n: int):
    # Rolling z-score over a 5n-bar window; shift(1) keeps the stats strictly historical
    mean = df_factor.rolling(n * 5).mean().shift(1)
    std = df_factor.rolling(n * 5).std().shift(1)
    zscore = (df_factor - mean) / std
    return zscore
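# A minimal end-to-end sketch of the time-series single-factor test these helpers
# support (illustrative; `factor_df` and `ret_df` are hypothetical single-column
# DataFrames sharing a 'timestamp' index):
#
#   rolling_ic = fast_ts_rolling_IC(factor_df, ret_df, 'mtm_24', window=24 * 30)
#   cum_ic_merge_figure_v2(rolling_ic)
#   cta_ts_group_v2(factor_df, ret_df, 'mtm_24', round(rolling_ic['corr'].mean(), 4), 10)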

factors/mtm.py (new file, 16 lines)

@@ -0,0 +1,16 @@
import pandas as pd
def mtm(df: pd.DataFrame, n: int):
    # n-bar momentum: close-to-close return over the last n bars
    value = df['close'] / df['close'].shift(n) - 1
    return value
def param_traversal():
    # Sweep n from 5 to 300 in steps of 5; each tuple is one parameter set
    result = []
    step = 5
    for i in range(5, 300 + step, step):
        result.append((i,))
    return result
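# param_traversal() yields [(5,), (10,), ..., (300,)]; data_clean.py unpacks each
# tuple as positional arguments after the price frame, i.e. implementation(copy(df), *params).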

runtime/analysis.py (new file, 64 lines)

@@ -0,0 +1,64 @@
import os
import re
import colorama
import pandas as pd
from rich.progress import track
import settings
import common.helpers as helpers
if __name__ == '__main__':
colorama.init(autoreset=True)
ret = pd.read_pickle(f"../cache/{settings.factor_name}/ret.pkl")
filenames = [f for f in os.listdir(f"../cache/{settings.factor_name}/{settings.symbol}") if
f.endswith('.pkl') and re.sub(r'(?<!^)([A-Z])', r'_\1', settings.factor_name).lower() in f]
ic_performance_collection = pd.DataFrame()
rolling_ic_df_collection = pd.DataFrame()
factor_dict = {}
    for _filename in track(filenames, description="[green]Collecting factor and return data, running per-parameter IC tests"):
_factor_value = pd.read_pickle(f"../cache/{settings.factor_name}/{settings.symbol}/{_filename}")
factor_name = _filename.replace('.pkl', '')
factor_dict[factor_name] = _factor_value
_ret = ret.copy()
_ret = _ret.loc[_factor_value.index[0]:_factor_value.index[-1]]
_rolling_ic_df = helpers.fast_ts_rolling_IC(_factor_value, _ret, factor_name, 24 * 30)
_rolling_ic_mean = _rolling_ic_df.mean().iloc[0]
_rolling_ic_std = _rolling_ic_df.std().iloc[0]
report = {
'name': factor_name,
'rolling IC mean': round(_rolling_ic_mean, 4),
'rolling IC std': round(_rolling_ic_std, 4),
'rolling IR': round(_rolling_ic_mean / _rolling_ic_std, 4)
}
ic_performance = pd.DataFrame([report])
ic_performance_collection = pd.concat([ic_performance_collection, ic_performance])
rolling_ic_df_collection[factor_name] = _rolling_ic_df
helpers.factor_exposure_hist(_factor_value, factor_name, 100)
    # Extract the numeric parameter from each factor name
    ic_performance_collection["parameter"] = ic_performance_collection["name"].apply(
        lambda x: int(re.search(r"\d+", x).group()))
    # Sort by parameter value
    ic_performance_collection = ic_performance_collection.sort_values("parameter")
parameter_ic_df = ic_performance_collection[['parameter', 'rolling IC mean']].copy()
helpers.parameters_IC_figure(parameter_ic_df, "rolling IC mean")
del ic_performance_collection['parameter']
    # ic_performance_collection = ic_performance_collection[ic_performance_collection['IC'].abs() >= 0.05]
    # if ic_performance_collection.empty:
    #     print("No parameters passed the factor IC test")
    #     exit()
    # Rank parameters by |rolling IC mean| and keep those above the 0.05 threshold
    ic_performance_collection['IC abs'] = ic_performance_collection['rolling IC mean'].abs()
    ic_performance_collection.sort_values('IC abs', ascending=False, inplace=True)
    del ic_performance_collection['IC abs']
    ic_performance_collection = ic_performance_collection[ic_performance_collection['rolling IC mean'].abs() >= 0.05]
print(ic_performance_collection)
rolling_ic_df_collection = rolling_ic_df_collection[ic_performance_collection['name'].tolist()]
helpers.cum_ic_merge_figure_v2(rolling_ic_df_collection)
for i in ic_performance_collection['name'].tolist():
_factor_value = factor_dict[i]
_ret = ret.copy()
# helpers.cta_ts_group_by_ret(_factor_value, _ret, f"{i}_{settings.symbol}", ic_performance_collection[ic_performance_collection['name']==i]['rolling IC mean'].values[0], 5)
helpers.cta_ts_group_v2(_factor_value, _ret, f"{i}_{settings.symbol}", ic_performance_collection[ic_performance_collection['name'] == i]['rolling IC mean'].values[0], 5)

runtime/data_clean.py (new file, 58 lines)

@@ -0,0 +1,58 @@
import importlib
import inspect
from copy import copy
import pandas as pd
import colorama
import settings
from rich.progress import track
from rich import print
from common.helpers import create_dir_not_exist
klines_folder_path = '../data/klines/'
start_date = pd.to_datetime(settings.start_time)
end_date = pd.to_datetime(settings.end_time)
filename = klines_folder_path + settings.symbol + '-USDT.csv'
colorama.init(autoreset=True)
df = pd.read_csv(filename, encoding='gbk', skiprows=1, parse_dates=['candle_begin_time'])
# Stamp each bar at the end of its hour (HH:59:59) instead of the open
df['candle_begin_time'] = df['candle_begin_time'] + pd.Timedelta(hours=1) - pd.Timedelta(seconds=1)
df.rename(columns={'candle_begin_time': 'timestamp'}, inplace=True)
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
# Forward return over the next signal_hold_hours bars, aligned to the signal bar
ret = df['close'].pct_change(settings.signal_hold_hours).shift(-settings.signal_hold_hours).dropna(how='all').to_frame()
df = df[start_date:end_date]
ret = ret[start_date:end_date]
ret.rename(columns={'close': 'ret'}, inplace=True)
factor_implement = tuple()
module = importlib.import_module(f'factors.{settings.factor_name}')
# Pick the factor function out of the module (anything defined there except param_traversal)
for name, obj in inspect.getmembers(module):
    if inspect.isfunction(obj) and obj.__module__ == f"factors.{settings.factor_name}":
        if name != 'param_traversal':
            factor_implement = (name, obj)
factor_params_list = getattr(module, 'param_traversal')()
create_dir_not_exist(f'../cache/{settings.factor_name}')
create_dir_not_exist(f'../cache/{settings.factor_name}/{settings.symbol}')
ret.to_pickle(f'../cache/{settings.factor_name}/ret.pkl')
print(f"{len(factor_params_list)} parameter sets to compute")
for i in track(factor_params_list, description='[green]Computing factor exposures per parameter set...', total=len(factor_params_list)):
factor_name = factor_implement[0]
implementation = factor_implement[1]
factor_value = implementation(copy(df), *i)
factor_value = factor_value.dropna(axis=0, how='all')
factor_value = factor_value.to_frame()
factor_value.columns = ['factorVal']
_file_name = f"{factor_name}_{i}.pkl"
factor_value.to_pickle(f'../cache/{settings.factor_name}/{settings.symbol}/{_file_name}')

runtime/settings.py (new file, 13 lines)

@@ -0,0 +1,13 @@
symbol = "bnb".upper()       # trading pair base asset (quoted against USDT)
factor_name = 'mtm'          # factor module name under factors/
start_time = '2024-01-01'    # sample start
end_time = '2025-12-31'      # sample end
signal_hold_hours = 4        # holding period per signal, in hours
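# Assumed run order (inferred from the code above, not stated in the commit):
# run runtime/data_clean.py first to cache per-parameter factor exposures and
# returns under ../cache/<factor_name>/, then runtime/analysis.py to run the
# rolling-IC tests and grouping plots.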