From 88740775708c9506c1fe5d0e7e199082e1236de3 Mon Sep 17 00:00:00 2001
From: heshuang <296673733@qq.com>
Date: Thu, 8 May 2025 02:19:27 +0800
Subject: [PATCH] Time-series single-factor testing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 .gitignore            |   1 +
 common/helpers.py     | 467 ++++++++++++++++++++++++++++++++++++++++++
 factors/mtm.py        |  16 ++
 runtime/analysis.py   |  64 ++++++
 runtime/data_clean.py |  58 ++++++
 runtime/settings.py   |  13 ++
 6 files changed, 619 insertions(+)
 create mode 100644 common/helpers.py
 create mode 100644 factors/mtm.py
 create mode 100644 runtime/analysis.py
 create mode 100644 runtime/data_clean.py
 create mode 100644 runtime/settings.py

diff --git a/.gitignore b/.gitignore
index 04bb586..44de2b5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -268,6 +268,7 @@ Backup of *.doc*
 # Excel Backup File
 *.xlk
 *.csv
+*.pkl
 
 # PowerPoint temporary
 ~$*.ppt*
diff --git a/common/helpers.py b/common/helpers.py
new file mode 100644
index 0000000..0b0aa9e
--- /dev/null
+++ b/common/helpers.py
@@ -0,0 +1,467 @@
+import os
+import platform
+
+import numpy as np
+import pandas as pd
+from matplotlib import pyplot as plt, gridspec
+from scipy import stats
+from scipy.stats import spearmanr
+from rich.progress import track
+import statsmodels.api as sm
+
+
+plt.style.use('default')
+plt.rcParams['figure.facecolor'] = 'white'
+# Pick a CJK-capable font so non-ASCII labels render on macOS and Windows
+plt.rcParams['font.family'] = 'STHeiti' if platform.system() == 'Darwin' else 'SimHei'
+plt.rcParams['axes.unicode_minus'] = False
+
+
+def create_dir_not_exist(path):
+    os.makedirs(path, exist_ok=True)
+
+
+# Cross-sectional z-score standardization
+def standardize(df: pd.DataFrame) -> pd.DataFrame:
+    return df.sub(df.mean(axis=1, skipna=True), axis=0).div(df.std(axis=1, skipna=True), axis=0)
+
+
+def __filter_extreme_MAD(series, _n=3 * 1.4826):
+    median = series.median(skipna=True)
+    new_median = (series - median).abs().median(skipna=True)
+    return series.clip(median - _n * new_median, median + _n * new_median)
+
+
+# MAD winsorization: clip cross-sectional outliers at median +/- n * MAD
+def mad(df, n=3 * 1.4826):
+    return df.apply(lambda x: __filter_extreme_MAD(x, _n=n), axis=1)
+
+
+def neutralization(df_factor: pd.DataFrame, mcap: pd.DataFrame):
+    """Market-cap neutralization: per cross-section, regress the factor on
+    log market cap (no intercept) and keep the residuals."""
+    df = df_factor.copy()
+    mcap = np.log10(mcap).copy()
+    for t in df_factor.index:
+        try:
+            x = mcap.loc[t]
+            y = df.loc[t]
+            df.loc[t] = sm.OLS(y.astype(float), x.astype(float), hasconst=False, missing='drop').fit().resid
+        except Exception:
+            # Leave cross-sections where the regression cannot be fitted unchanged
+            pass
+    return df
+
+
+def filter_rebalance_data(df_factor, df_rtn, offset, hold_hour):
+    """
+    Keep only the rows that fall on rebalance timestamps, given a holding
+    period and a daily opening offset.
+
+    Parameters:
+        df_factor : pd.DataFrame - factor values indexed by 'timestamp'
+        df_rtn    : pd.DataFrame - returns indexed by 'timestamp'
+        offset    : int - daily opening offset in hours (0-23)
+        hold_hour : int - holding period in hours
+
+    Returns:
+        (pd.DataFrame, pd.DataFrame) - filtered factor and return data
+    """
+    df_factor = df_factor.reset_index(drop=False)
+    df_rtn = df_rtn.reset_index(drop=False)
+    # Merge the two frames so the timestamps are aligned
+    merged = pd.merge(df_factor, df_rtn, on='timestamp', suffixes=('_factor', '_rtn'))
+
+    # Make sure the time column is datetime
+    merged['timestamp'] = pd.to_datetime(merged['timestamp'])
+
+    # Clamp the offset into the 0-23 range
+    offset = offset % 24
+
+    # Build the candidate rebalance times
+    min_time = merged['timestamp'].min()
+    max_time = merged['timestamp'].max()
+
+    # First candidate: midnight of the first day in the data, plus the offset
+    current = pd.Timestamp(min_time.date()) + pd.DateOffset(hours=offset)
+
+    # Advance to the first candidate inside the data range
+    while current < min_time:
+        current += pd.DateOffset(hours=hold_hour)
+
+    # Collect all candidate rebalance times
+    rebalance_times = []
+    while current <= max_time:
+        rebalance_times.append(current)
+        current += pd.DateOffset(hours=hold_hour)
+
+    # Shift each candidate back one second to match bar-close timestamps
+    rebalance_times = [t - pd.Timedelta(seconds=1) for t in rebalance_times]
+
+    # Keep only the candidates that actually exist in the data
+    valid_times = merged[merged['timestamp'].isin(rebalance_times)]['timestamp']
+
+    df_factor = df_factor[df_factor['timestamp'].isin(valid_times)].set_index('timestamp')
+    df_rtn = df_rtn[df_rtn['timestamp'].isin(valid_times)].set_index('timestamp')
+    return df_factor, df_rtn
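+
+
+# A minimal usage sketch (hypothetical frames, not part of the pipeline): with
+# hourly bars stamped at the bar close (..., 07:59:59, 08:59:59, ...), keep
+# only the 08:59:59 row of each day on a 24-hour holding cycle:
+#
+#     f, r = filter_rebalance_data(df_factor, df_rtn, offset=9, hold_hour=24)
+#     assert f.index.equals(r.index)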
+
+
+def fast_ts_factor_rtn_IC(df_factor: pd.DataFrame, df_rtn: pd.DataFrame, name: str):
+    # Full-sample rank IC between a single factor series and its forward returns
+    result = spearmanr(df_factor.values.tolist(), df_rtn.values.tolist())[0]
+    report = {
+        'name': name,
+        'IC': round(result, 4),
+    }
+    return pd.DataFrame([report])
+
+
+def fast_ts_rolling_IC(df_factor: pd.DataFrame, df_rtn: pd.DataFrame, name: str, window: int):
+    # Rolling rank IC over a sliding window of `window` observations
+    df = pd.merge(df_factor, df_rtn, on='timestamp')
+    result = []
+    for i in range(len(df) - window + 1):
+        window_df = df.iloc[i:i + window]
+        corr = window_df.corr(method='spearman').iloc[0, 1]
+        result.append(corr)
+    spearman_corr = pd.Series(result, index=df.index[window - 1:])
+    return pd.DataFrame(spearman_corr, columns=['corr'])
+
+
+def fast_factor_rtn_IC(df: pd.DataFrame, rtn: pd.DataFrame, factor_name: str):
+    # Cross-sectional rank IC per timestamp, plus a summary report
+    result = df.corrwith(rtn, axis=1, method='spearman').dropna(how='all')
+    t_stat = stats.ttest_1samp(result, 0)[0]
+    report = {
+        'name': factor_name,
+        'IC mean': round(result.mean(), 4),
+        'IC std': round(result.std(), 4),
+        'IR': round(result.mean() / result.std(), 4),
+        'IC>0': round(len(result[result > 0].dropna()) / len(result), 4),
+        'ABS_IC>2%': round(len(result[abs(result) > 0.02].dropna()) / len(result), 4),
+        't_stat': round(t_stat, 4),
+    }
+    return result, pd.DataFrame([report])
+
+
+def ic_plot_by_period(ic_df: pd.DataFrame, period: str, factor_name: str):
+    ic_week = ic_df.resample(period).mean()
+    ic_week.index = ic_week.index.strftime('%F')
+    ic_week.plot(kind='bar', title=factor_name)
+    plt.show()
+
+
+def cum_ic_merge_figure(ic_df_dict: dict):
+    # One cumulative-IC subplot per entry, at most 4 columns
+    ic_num = len(ic_df_dict)
+    titles = [f"{x}" for x in ic_df_dict.keys()]
+    num_cols = min(4, ic_num)
+    num_rows = (ic_num + num_cols - 1) // num_cols
+    fig = plt.figure(figsize=(15, 10))
+    gs = gridspec.GridSpec(num_rows, num_cols)
+    for i, title in enumerate(titles):
+        ax = fig.add_subplot(gs[i // num_cols, i % num_cols])
+        df = ic_df_dict[title]
+        df.cumsum().plot(ax=ax)
+        ax.set_title(title)
+    plt.tight_layout()
+    plt.show()
+
+
+def cum_ic_merge_figure_v2(ic_df: pd.DataFrame):
+    import warnings
+
+    # Silence UserWarnings raised while drawing many overlapping legend entries
+    warnings.filterwarnings("ignore", category=UserWarning)
+    fig = plt.figure(figsize=(12, 6))
+    gs = gridspec.GridSpec(1, 2, width_ratios=[3, 1])  # left 75%, right 25%
+
+    # Cumulative IC curves on the left
+    ax = plt.subplot(gs[0])
+    ic_df.cumsum().plot(ax=ax)
+
+    # Legend on the right
+    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
+
+    plt.tight_layout()
+    plt.show()
+
+
+def parameters_IC_figure(df: pd.DataFrame, ic_column_name: str):
+    plt.figure(figsize=(10, 6))
+    plt.plot(df["parameter"], df[ic_column_name], marker="o", linestyle="-", color="b")
+    plt.title(f"Parameter vs {ic_column_name}", fontsize=16)
+    plt.xlabel("Parameter", fontsize=14)
+    plt.ylabel(f"{ic_column_name}", fontsize=14)
+    plt.grid(True)
+    plt.show()
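+
+
+# Usage sketch (assumed single-column frames named 'factorVal' and 'ret',
+# both indexed by 'timestamp'; purely illustrative):
+#
+#     rolling_ic = fast_ts_rolling_IC(factor_df, ret_df, 'mtm_24', window=24 * 30)
+#     print(rolling_ic['corr'].mean())   # average 30-day rolling rank IC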
+
+
+def factor_corr_filter(factor_names: list, factor_performance: pd.DataFrame, highest=0.7):
+    # Greedy de-correlation: rank factors by |ICIR|, then repeatedly keep the
+    # best remaining factor and drop every factor whose |corr| with it
+    # exceeds `highest`.
+    _ic_summary = factor_performance[factor_names].mean() / factor_performance[factor_names].std()
+    passed_icir_factor_names = _ic_summary.abs().sort_values(ascending=False).index.tolist()
+    corr_df = factor_performance[passed_icir_factor_names].corr()
+    passed_list = []
+    while not corr_df.empty:
+        passed_list.append(corr_df.index.tolist()[0])
+        corr_df = corr_df[(abs(corr_df.iloc[:, 0]) < highest)]
+        corr_df = corr_df[corr_df.index.tolist()]
+    return passed_list
+
+
+def factor_exposure_hist(df: pd.DataFrame, name: str, bins: int = 50):
+    df.stack().to_frame(name).hist(bins=bins)
+    plt.show()
+
+
+def fast_group_g(factor_name: str, df_factor: pd.DataFrame, df_rtn: pd.DataFrame, group_num: int = 10):
+    group = df_factor.stack().to_frame('factor')
+    group_rtn = df_rtn.stack().to_frame('rtn')
+    group['rtn'] = group_rtn['rtn']
+    group.dropna(inplace=True)
+    group.reset_index(inplace=True)
+    group.columns = ['timestamp', 'symbol', 'factor', 'rtn']
+    datetime_period = group.timestamp.drop_duplicates().tolist()
+    group_return = pd.DataFrame()
+    for i in range(0, len(datetime_period)):
+        single = group[group.timestamp == datetime_period[i]].sort_values(by='factor', ascending=True)
+        # Bucket the cross-section into quantile groups
+        try:
+            single.loc[:, 'group'] = pd.qcut(single.factor, group_num, list(range(1, group_num + 1))).to_list()
+        except Exception:
+            # Skip this timestamp if the binning fails
+            continue
+        _group_result = single.groupby('group')
+        _group_ret_dict = {}
+        for index, _group in _group_result:
+            group_ret = _group['rtn'].mean()
+            group_factor_mean = _group['factor'].mean()
+            _group_ret_dict['timestamp'] = datetime_period[i]
+            _group_ret_dict['G'] = index
+            _group_ret_dict['factor_value_mean'] = group_factor_mean
+            _group_ret_dict['ret_mean'] = group_ret
+            _temp = pd.DataFrame(_group_ret_dict, index=[0])
+            group_return = pd.concat([group_return, _temp], axis=0)
+
+    adjust_times = sorted(set(group_return['timestamp'].tolist()))
+
+    ic_df = pd.DataFrame(index=adjust_times, columns=['ic'], dtype='float64')
+
+    for i in adjust_times:
+        current = group_return[group_return['timestamp'] == i].set_index('timestamp')
+        cur_factor_df = current[['factor_value_mean']]
+        cur_ret_df = current[['ret_mean']]
+
+        _ic = float(spearmanr(cur_factor_df.values.tolist(), cur_ret_df.values.tolist())[0])
+        ic_df.loc[i] = _ic
+
+    t_stat = stats.ttest_1samp(ic_df, 0)[0][0]
+
+    group_return_by_G = group_return.groupby('G')
+    result_temp = {}
+    result = pd.DataFrame()
+    for index, _group in group_return_by_G:
+        _ret = (_group['ret_mean'] + 1).cumprod()
+        result_temp['G'] = f"G{index}"
+        result_temp['ret_mean'] = _ret.iloc[-1] - 1
+        _result_df = pd.DataFrame(result_temp, index=[0])
+        result = pd.concat([result, _result_df], axis=0)
+    result.reset_index(inplace=True, drop=True)
+    # Rank correlation between group order and group return: monotonicity check
+    corr_value = spearmanr(result.index.tolist(), result['ret_mean'].tolist())[0]
+    report = {
+        'name': factor_name,
+        'IC mean': round(float(ic_df.mean().iloc[0]), 4),
+        'IC std': round(float(ic_df.std().iloc[0]), 4),
+        'IR': round(float(ic_df.mean().iloc[0] / ic_df.std().iloc[0]), 4),
+        'IC>0': round(len(ic_df[ic_df > 0].dropna()) / len(ic_df), 4),
+        'ABS_IC>2%': round(len(ic_df[abs(ic_df) > 0.02].dropna()) / len(ic_df), 4),
+        't_stat': round(t_stat, 4),
+        'grouped_corr': round(corr_value, 4),
+    }
+    return pd.DataFrame([report]), ic_df
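+
+
+# Usage sketch (hypothetical wide frames: rows = timestamps, columns = symbols;
+# df_rtn holds forward returns aligned to df_factor):
+#
+#     report, ic_df = fast_group_g('mtm_24', df_factor, df_rtn, group_num=10)
+#     print(report)            # IC mean / IR / t_stat / grouped_corr
+#     ic_df.cumsum().plot()    # cumulative cross-sectional rank IC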
+
+
+def group_g(df_factor: pd.DataFrame, df_rtn: pd.DataFrame, factor_name: str, offset: int, mode: str = None, group_num: int = 10):
+    def check_binning_feasibility(single_series, _group_num):
+        """Check whether the current cross-section can be split into quantile bins."""
+        # Check 1: enough unique values to form the bins
+        unique_count = single_series.nunique()
+        if unique_count < _group_num:
+            return False, f"too few unique values: {unique_count} < {_group_num}"
+        # Check 2: zero standard deviation (all values identical)
+        if single_series.std() == 0:
+            return False, "zero standard deviation"
+        # Check 3: duplicated quantile edges (fast pre-check)
+        quantiles = single_series.quantile(np.linspace(0, 1, _group_num + 1))
+        if quantiles.duplicated().any():
+            return False, "duplicate quantile edges"
+        return True, "passed"
+
+    group = df_factor.stack().to_frame('factor')
+    group_rtn = df_rtn.stack().to_frame('rtn')
+    group['rtn'] = group_rtn['rtn']
+    group.dropna(inplace=True)
+    group.reset_index(inplace=True)
+    group.columns = ['timestamp', 'symbol', 'factor', 'rtn']
+    datetime_period = group.timestamp.drop_duplicates().tolist()
+    group_return = pd.DataFrame()
+    failed_dtypes = {
+        'timestamp': 'datetime64[ns]',
+        'error_type': 'object',
+        'error_msg': 'object'
+    }
+    failed_periods = pd.DataFrame(columns=['timestamp', 'error_type', 'error_msg']).astype(failed_dtypes)
+    last_selected = {}
+    for i in track(range(0, len(datetime_period)), description="[green]Grouping cross-sections, computing turnover and group returns...", total=len(datetime_period)):
+        single = group[group.timestamp == datetime_period[i]].sort_values(by='factor', ascending=True)
+        is_feasible, error_msg = check_binning_feasibility(single['factor'], group_num)
+        current_timestamp = datetime_period[i]
+        if not is_feasible:
+            # Record the failing timestamp and skip it
+            failed_periods = pd.concat(
+                [failed_periods, pd.DataFrame([[current_timestamp, 'BINNING_ERROR', error_msg]], columns=failed_periods.columns)],
+                ignore_index=True)
+            continue
+        try:
+            single.loc[:, 'group'] = pd.qcut(single.factor, group_num, list(range(1, group_num + 1))).to_list()
+        except Exception as e:
+            # Catch any remaining qcut failures
+            failed_periods = pd.concat(
+                [failed_periods, pd.DataFrame([[current_timestamp, 'Q_CUT_ERROR', str(e)]], columns=failed_periods.columns)],
+                ignore_index=True)
+            continue
+        _group_result = single.groupby('group')
+        current_selected = {}
+        for index, _group in _group_result:
+            group_ret = _group['rtn'].mean()
+            _selected = _group['symbol'].tolist()
+            current_selected[index] = _selected
+            if last_selected == {}:
+                _turnover = 1.0
+            else:
+                _turnover = len(set(_selected).difference(set(last_selected[index]))) / len(last_selected[index])
+            _group_ret_dict = {'timestamp': datetime_period[i], 'G': index, 'ret': group_ret, 'turnover_ratio': _turnover}
+            _temp = pd.DataFrame(_group_ret_dict, index=[0])
+            group_return = pd.concat([group_return, _temp], axis=0)
+        last_selected = current_selected
+
+    group_return_by_G = group_return.groupby('G')
+    result_temp = {}
+    result = pd.DataFrame()
+    turnover_ratio_list = []
+    net_curve_list = []
+    for index, _group in group_return_by_G:
+        _ret = (_group['ret'] + 1).cumprod()
+        result_temp['G'] = f"G{index}"
+        result_temp['ret'] = _ret.iloc[-1] - 1
+        _result_df = pd.DataFrame(result_temp, index=[0])
+        _turnover_df = _group[['timestamp', 'turnover_ratio']].set_index('timestamp').sort_index()
+        _turnover_df.rename(columns={'turnover_ratio': f'G{index}_turnover_ratio'}, inplace=True)
+        turnover_ratio_list.append(_turnover_df)
+        result = pd.concat([result, _result_df], axis=0)
+        _curve_df = _group[['timestamp', 'ret']].set_index('timestamp').sort_index()
+        _curve_df['net'] = (_curve_df['ret'] + 1).cumprod()
+        _curve_df.rename(columns={'net': f'G{index}_net'}, inplace=True)
+        del _curve_df['ret']
+        net_curve_list.append(_curve_df)
+    result.reset_index(inplace=True, drop=True)
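+
+    # The next step ranks groups against their total returns (monotonicity) and
+    # annualizes with an hourly compounding convention: with `time_delta` sample
+    # hours and 8760 hours per year, annual_ret = net ** (8760 / time_delta) - 1.
+    # E.g. net = 1.10 over 2190 hours (one quarter) gives 1.10 ** 4 - 1 ≈ 46.4%.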
+    corr_value = spearmanr(result.index.tolist(), result['ret'].tolist())[0]
+    result['net'] = result['ret'] + 1
+    net_curve = pd.concat(net_curve_list, axis=1, ignore_index=False)
+    benchmark_ret = net_curve.mean(axis=1)
+    turnover_ratio = pd.concat(turnover_ratio_list, ignore_index=False, axis=1)
+    turnover_ratio = turnover_ratio.iloc[1:]
+    start_time = df_factor.index[0]
+    end_time = df_factor.index[-1]
+    time_delta = ((end_time - start_time).days + 1) * 24
+    result['annual_ret'] = result['net'] ** (8760 / time_delta) - 1
+    fig, axs = plt.subplots(2, 3, figsize=(16, 8))
+    # Panel 1: annualized group returns (monotonicity)
+    result['annual_ret'].plot(kind='bar', ax=axs[0, 0])
+    axs[0, 0].set_title(f'{factor_name}_OFFSET{offset} annualized group returns (monotonicity {corr_value})')
+    # Panel 2: cumulative net value curves by group
+    if mode == 'long_short':
+        long_short_curve = net_curve[['G1_net', 'G10_net']].copy()
+        long_short_curve['benchmark'] = benchmark_ret
+        long_short_curve.plot(ax=axs[0, 1])
+        axs[0, 1].set_title(f'{factor_name}_OFFSET{offset} long-short test')
+    else:
+        net_curve.plot(ax=axs[0, 1])
+        axs[0, 1].set_title(f'{factor_name}_OFFSET{offset} cumulative net value by group')
+    # Panel 3: final net value per group
+    result['net'].plot(kind='bar', ax=axs[0, 2])
+    axs[0, 2].set_title(f'{factor_name}_OFFSET{offset} net value by group')
+    # Panel 4: year-by-year group returns
+    yby_performance = net_curve.pct_change().resample('YE').apply(lambda x: (1 + x).cumprod().iloc[-1]).T
+    yby_performance = yby_performance.replace(0, np.nan).dropna(how='all')
+    yby_performance = yby_performance - 1
+    yby_performance.plot(kind='bar', ax=axs[1, 0],
+                         color=['powderblue', 'lightskyblue', 'cornflowerblue', 'steelblue', 'royalblue'])
+    axs[1, 0].set_title(f'{factor_name}_OFFSET{offset} year-by-year group returns')
+    # Panel 5: average turnover per group
+    turnover_ratio.mean().plot(kind='bar', ax=axs[1, 1])
+    axs[1, 1].set_title(f'{factor_name}_OFFSET{offset} average turnover by group')
+
+    # Panel 6: where the binning failed
+    axs[1, 2].scatter(failed_periods['timestamp'].tolist(), [1] * len(failed_periods))
+    axs[1, 2].set_title(f'{factor_name}_OFFSET{offset} failed binning timestamps')
+    plt.tight_layout()
+    plt.show()
+    if not failed_periods.empty:
+        print(f"Found {len(failed_periods)} failed period(s):")
+        print(failed_periods)
+
+
+def cta_ts_group_by_ret(df_factor: pd.DataFrame, df_ret: pd.DataFrame, factor_name: str, ic: float, group_nums: int = 10):
+    # Time-series grouping: sort all observations by factor value and split
+    # them into `group_nums` nearly equal buckets
+    df = pd.merge(df_factor, df_ret, on='timestamp', how='left')
+    df = df.sort_values(by='factorVal')
+    sample_nums = df.shape[0]
+    num1, num2 = divmod(sample_nums, group_nums)
+    group_result = {}
+    for i in range(group_nums):
+        start_idx = i * num1 + min(i, num2)
+        end_idx = (i + 1) * num1 + min(i + 1, num2)
+        grouped_df = df.iloc[start_idx:end_idx]
+        group_result[i + 1] = grouped_df.mean()
+    grouped_df = pd.DataFrame(group_result).T
+
+    plt.figure(figsize=(15, 8))
+    plt.title(f"{factor_name} RollingIC:{ic}")
+    bars = plt.bar(grouped_df.index, grouped_df.ret)
+    # Annotate each bar with the bucket's mean factor value
+    for i, bar in enumerate(bars):
+        plt.text(bar.get_x() + bar.get_width() / 2,     # centered on the bar
+                 bar.get_height() + 0.0001,             # slightly above the bar
+                 f"{grouped_df.factorVal[i + 1]:.4f}",  # 4 decimal places
+                 ha='center', va='bottom', fontsize=9)
+
+    plt.grid(ls='--', alpha=0.5)
+    plt.show()
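+
+
+# Usage sketch for the time-series groupers (hypothetical single-column frames
+# 'factorVal' and 'ret' sharing a 'timestamp' index; `ic` is only printed in
+# the figure title):
+#
+#     cta_ts_group_by_ret(factor_df, ret_df, 'mtm_24_BNB', ic=0.06, group_nums=10)
+#     cta_ts_group_v2(factor_df, ret_df, 'mtm_24_BNB', ic=0.06, group_nums=10)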
+
+
+def cta_ts_group_v2(df_factor: pd.DataFrame, df_ret: pd.DataFrame, factor_name: str, ic: float, group_nums: int = 10):
+    df = pd.merge(df_factor, df_ret, on='timestamp', how='inner')
+    grouped, bins = pd.qcut(df.iloc[:, 0], group_nums, labels=list(range(1, group_nums + 1)), retbins=True)
+    df.loc[:, 'group'] = grouped
+    # Mean return per quantile bucket
+    group_ret = df.groupby('group', observed=False).mean().ret
+
+    ax = group_ret.plot(kind='bar')
+    plt.title(f"{factor_name} RollingIC:{ic}")
+    bin_labels = [f"({bins[i]:.2f}~{bins[i + 1]:.2f}]" for i in range(len(bins) - 1)]
+    for i, group_num in enumerate(group_ret.index):
+        ax.text(i, group_ret[group_num] + 0.005,
+                bin_labels[group_num - 1],  # group labels are 1-based; bin_labels is 0-based
+                ha='center', rotation=45, fontsize=8)
+
+    plt.tight_layout()  # keep the rotated labels from overlapping
+    plt.show()
+
+
+def cta_rolling_ts_standard(df_factor: pd.DataFrame, n: int):
+    # Rolling z-score over the past n*5 bars, shifted one bar to avoid lookahead
+    mean = df_factor.rolling(n * 5).mean().shift(1)
+    std = df_factor.rolling(n * 5).std().shift(1)
+    zscore = (df_factor - mean) / std
+    return zscore
\ No newline at end of file
diff --git a/factors/mtm.py b/factors/mtm.py
new file mode 100644
index 0000000..36c277d
--- /dev/null
+++ b/factors/mtm.py
@@ -0,0 +1,16 @@
+import pandas as pd
+
+
+def mtm(df: pd.DataFrame, n: int):
+    # n-bar momentum: percentage change of the close over the last n bars
+    return df['close'] / df['close'].shift(n) - 1
+
+
+def param_traversal():
+    # Lookback grid: 5, 10, ..., 300 (each wrapped as a 1-tuple of positional args)
+    result = []
+    step = 5
+    for i in range(5, 300 + step, step):
+        result.append((i,))
+    return result
\ No newline at end of file
diff --git a/runtime/analysis.py b/runtime/analysis.py
new file mode 100644
index 0000000..60587a4
--- /dev/null
+++ b/runtime/analysis.py
@@ -0,0 +1,64 @@
+import os
+import re
+
+import colorama
+import pandas as pd
+from rich.progress import track
+
+import settings
+import common.helpers as helpers
+
+if __name__ == '__main__':
+    colorama.init(autoreset=True)
+    ret = pd.read_pickle(f"../cache/{settings.factor_name}/ret.pkl")
+    filenames = [f for f in os.listdir(f"../cache/{settings.factor_name}/{settings.symbol}") if
+                 f.endswith('.pkl')]
+    factor_dict = {}
+    rolling_ic_df_collection = pd.DataFrame()
+    ic_performance_collection = pd.DataFrame()
+    for f in track(filenames, description='[green]Computing rolling IC per parameter...', total=len(filenames)):
+        name = re.sub(r'\.pkl$', '', f)
+        factor_value = pd.read_pickle(f"../cache/{settings.factor_name}/{settings.symbol}/{f}")
+        factor_dict[name] = factor_value
+        rolling_ic = helpers.fast_ts_rolling_IC(factor_value, ret.copy(), name, 24 * 30)
+        rolling_ic_df_collection[name] = rolling_ic['corr']
+        report = {'name': name, 'rolling IC mean': round(rolling_ic['corr'].mean(), 4)}
+        ic_performance_collection = pd.concat([ic_performance_collection, pd.DataFrame([report])], ignore_index=True)
+    # if ic_performance_collection.empty:
+    #     print("No parameter passed the factor IC test")
+    #     exit()
+    ic_performance_collection['IC abs'] = ic_performance_collection['rolling IC mean'].abs()
+    ic_performance_collection.sort_values('IC abs', ascending=False, inplace=True)
+    del ic_performance_collection['IC abs']
+    # Keep only parameters whose mean rolling IC clears the 5% bar
+    ic_performance_collection = ic_performance_collection[ic_performance_collection['rolling IC mean'].abs() >= 0.05]
+    print(ic_performance_collection)
+    rolling_ic_df_collection = rolling_ic_df_collection[ic_performance_collection['name'].tolist()]
+    helpers.cum_ic_merge_figure_v2(rolling_ic_df_collection)
+    for i in ic_performance_collection['name'].tolist():
+        _factor_value = factor_dict[i]
+        _ret = ret.copy()
+        helpers.cta_ts_group_v2(_factor_value, _ret, f"{i}_{settings.symbol}", ic_performance_collection[ic_performance_collection['name'] == i]['rolling IC mean'].values[0], 5)
diff --git a/runtime/data_clean.py b/runtime/data_clean.py
new file mode 100644
index 0000000..e6f0933
--- /dev/null
+++ b/runtime/data_clean.py
@@ -0,0 +1,58 @@
+import importlib
+import inspect
+from copy import copy
+
+import pandas as pd
+import colorama
+
+import settings
+
+from rich.progress import track
+from rich import print
+
+from common.helpers import create_dir_not_exist
+
+klines_folder_path = '../data/klines/'
+
+start_date = pd.to_datetime(settings.start_time)
+end_date = pd.to_datetime(settings.end_time)
+
+filename = klines_folder_path + settings.symbol + '-USDT.csv'
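+
+# Note on timestamps: the raw CSV stamps each candle with its *begin* time.
+# The block below moves the stamp to the bar close (begin + 1h - 1s), e.g.
+# 08:00:00 -> 08:59:59, and builds the forward holding-period return as
+# pct_change(h).shift(-h): the return earned over the h hours *after* the bar
+# on which the factor is observed, so factor and label never overlap.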
+colorama.init(autoreset=True)
+df = pd.read_csv(filename, encoding='gbk', skiprows=1, parse_dates=['candle_begin_time'])
+df['candle_begin_time'] = df['candle_begin_time'] + pd.Timedelta(hours=1) - pd.Timedelta(seconds=1)
+df.rename(columns={'candle_begin_time': 'timestamp'}, inplace=True)
+df.set_index('timestamp', inplace=True)
+df.sort_index(inplace=True)
+ret = df['close'].pct_change(settings.signal_hold_hours).shift(-settings.signal_hold_hours).dropna(how='all').to_frame()
+df = df[start_date:end_date]
+ret = ret[start_date:end_date]
+ret.rename(columns={'close': 'ret'}, inplace=True)
+
+# Discover the factor implementation in factors/<factor_name>.py: every
+# function defined in that module except param_traversal is treated as the
+# factor itself
+factor_implement = tuple()
+module = importlib.import_module(f'factors.{settings.factor_name}')
+for name, obj in inspect.getmembers(module):
+    if inspect.isfunction(obj) and obj.__module__ == f"factors.{settings.factor_name}":
+        if name != 'param_traversal':
+            factor_implement = (name, obj)
+
+factor_params_list = getattr(module, 'param_traversal')()
+
+create_dir_not_exist(f'../cache/{settings.factor_name}')
+create_dir_not_exist(f'../cache/{settings.factor_name}/{settings.symbol}')
+ret.to_pickle(f'../cache/{settings.factor_name}/ret.pkl')
+
+print(f"Total parameter sets: {len(factor_params_list)}")
+for i in track(factor_params_list, description='[green]Computing factor exposure per parameter...', total=len(factor_params_list)):
+    factor_name = factor_implement[0]
+    implementation = factor_implement[1]
+    factor_value = implementation(copy(df), *i)
+    factor_value = factor_value.dropna(axis=0, how='all')
+    factor_value = factor_value.to_frame()
+    factor_value.columns = ['factorVal']
+    _file_name = f"{factor_name}_{i}.pkl"
+    factor_value.to_pickle(f'../cache/{settings.factor_name}/{settings.symbol}/{_file_name}')
diff --git a/runtime/settings.py b/runtime/settings.py
new file mode 100644
index 0000000..aa2cae8
--- /dev/null
+++ b/runtime/settings.py
@@ -0,0 +1,13 @@
+# Symbol and factor under test
+symbol = "bnb".upper()
+factor_name = 'mtm'
+
+# Backtest window
+start_time = '2024-01-01'
+end_time = '2025-12-31'
+
+# Forward-return horizon in hours
+signal_hold_hours = 4
\ No newline at end of file