I tested DNN, SVR, and transformer models; the DNN models outperformed both the SVR and the transformer models.
# Script overview
# ---------------
# 1. Load daily price data for a set of symbols (always including `target1`).
# 2. Derive percentage-return and price-ratio features per symbol.
# 3. Build flattened sliding-window samples of length SEQ_LENGTH.
# 4. For a grid of hyper-parameters, load a cached Keras dense network (or
#    train and save one) and append one evaluation row to a dated CSV file.
import datetime
import os
import platform
import random
import sys
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.svm import LinearSVR
from tensorflow import keras
from tensorflow.keras import Input, layers

from config import m_cutoff, m_test_size, pre_selected_str

target1 = 'NQ=F'   # symbol whose return is predicted
m_dir = ""         # directory prefix for the input CSV and the output file
m_select = 2       # number of random columns sampled in read_data(); later
                   # overwritten with the number of selected features
m_add = []
m_features = []

# Probability with which an already-cached model is retrained anyway.
RETRAIN_PROBABILITY = 0.006
# Minimum number of usable rows required after dropping NaNs.
MIN_ROWS = 1000

print("m_test", m_test_size)


def read_data():
    """Load the price table and return the columns selected for modelling.

    Reads the space-separated symbol list from the module-level
    ``pre_selected_str`` and, as a side effect, truncates that global to its
    last five symbols (it is later written to the output file).

    Returns:
        A 3-tuple ``(added, frame, selected)`` where ``added`` is a set of
        ``m_select`` randomly sampled column names, ``frame`` is an
        independent copy of the selected columns, and ``selected`` is the
        sorted list of selected symbols (always containing ``target1``).

    Raises:
        KeyError: if a selected symbol is not a column of the CSV.
        ValueError: if the CSV has fewer than ``m_select`` columns.
    """
    global pre_selected_str
    # split() without an argument ignores repeated/leading/trailing blanks,
    # which would otherwise yield '' entries and a KeyError further down.
    tokens = pre_selected_str.split()
    pre_selected_str = " ".join(tokens[-5:])
    selected = set(tokens)
    print("selected", selected)
    selected.add(target1)
    # Sort so the feature column order is identical on every run. Set
    # iteration order of strings varies between interpreter runs, while the
    # cached model files are keyed only by the feature count, so an unsorted
    # order can feed a reloaded model permuted inputs.
    selected = sorted(selected)
    print("selected", selected)
    df = pd.read_csv(m_dir + "data_1d_update.csv", index_col=0)
    print("df.shape", df.shape)
    print("df.columns", df.columns)
    features = list(df.columns)
    added = set(random.sample(features, m_select))
    print("selected", selected)
    # .copy(): the caller adds columns and drops rows in place; operating on
    # a slice of `df` would trigger pandas' SettingWithCopy behaviour.
    return added, df[selected].copy(), selected


m_add, df1, m_features = read_data()
m_select = len(m_features)
selected_str = " "
print("df1.shape", df1.shape)
print("df1.columns", df1.columns)
print("m_dir", m_dir)
print("m_select", m_select, "m_cutoff", m_cutoff)

today_str = datetime.datetime.now().strftime('%Y-%m-%d')
output_file = m_dir + "Run_" + today_str + "_" + str(m_select) + ".csv"
print("output_file", output_file)
print("today_str", today_str)

for symbol in m_features:
    df1[f'{symbol}_Price'] = df1[symbol]

print("df1.shape", df1.shape)
print("df1.columns", df1.columns)

# -----------------------------------------------------------------
# 2. Compute Daily Returns (Price Changes)
# -----------------------------------------------------------------
# For every symbol: 1-row and 2-row percentage price changes. For every
# symbol other than the target: the price ratio to the target. The ratio is
# not a return, but its column name contains '_Return' so that the feature
# filter below picks it up.
for symbol in m_features:
    price = df1[f'{symbol}_Price']
    df1[f'{symbol}_Price_Return'] = price.pct_change(periods=1) * 100.0
    df1[f'{symbol}_Price2d_Return'] = price.pct_change(periods=2) * 100.0
    if symbol != target1:
        df1[f'{symbol}_Ratio_Return'] = price / df1[f'{target1}_Price']

print("df.index[-1]", df1.index[-1])

# Drop NaN rows from returns calculation
df1.dropna(inplace=True)
print("df1.shape", df1.shape, len(df1))
if len(df1) < MIN_ROWS:
    print("Error! df1.shape", df1.shape)
    # sys.exit instead of the interactive-only exit() helper, and a
    # non-zero status so callers can detect the failure.
    sys.exit(1)

# Target: the target symbol's return on the following row (shifted by -1).
df_target = df1[target1 + '_Price_Return'].shift(-1).to_frame(
    name=target1 + '_NextDay_Return')
# Every column whose name contains '_Return' is used as a feature.
df_features = df1[[col for col in df1.columns if '_Return' in col]]
print("df_features.shape", df_features.shape, len(df_features))
print("df_features.columns", df_features.columns)

df_features_all = df_features
print("df_features_all.shape", df_features_all.shape)
print("df_features.shape", df_features.shape)
# The last row has no known next-row return. It is kept (with a placeholder
# label of 0) so that a prediction is still produced for it.
df_target.iloc[-1] = 0


def build_matrix():
    """Build flattened sliding-window train/test arrays.

    Uses the module-level ``df_features``, ``df_target``, ``SEQ_LENGTH`` and
    ``m_test_size``.

    Returns:
        ``(X_train, X_test, y_train, y_test)``. Each X row is ``SEQ_LENGTH``
        consecutive feature rows flattened into one vector; y arrays are 1-D.
        The last ``m_test_size`` samples form the test set, and the final
        test label is the placeholder 0 written into ``df_target``.
    """
    scaled_features = df_features.values
    scaled_target = df_target.values
    print("scaled_features.shape", scaled_features.shape)
    print("scaled_target.shape", scaled_target.shape)

    n_samples = len(scaled_features) - SEQ_LENGTH
    # NOTE(review): window rows [i, i + SEQ_LENGTH) are paired with
    # target[i + SEQ_LENGTH]. Because df_target is already shifted by -1,
    # that label is the return two rows after the window's last row, and the
    # most recent feature row is never part of any window. This looks like
    # an off-by-one (target[i + SEQ_LENGTH - 1] would be the next-row
    # return), but it is left unchanged because cached models were trained
    # with this alignment - confirm intent before changing.
    X = np.array([scaled_features[i:i + SEQ_LENGTH] for i in range(n_samples)])
    y = np.array([scaled_target[i + SEQ_LENGTH] for i in range(n_samples)])
    print("X.shape", X.shape)
    print("y.shape", y.shape)
    X_flat = X.reshape(X.shape[0], -1)
    print("X_flat.shape", X_flat.shape)

    # Train-test split (last m_test_size samples for testing)
    split = len(X_flat) - m_test_size
    X_train, X_test = X_flat[:split], X_flat[split:]
    y_train, y_test = y[:split], y[split:]

    return X_train, X_test, y_train.flatten(), y_test.flatten()


def build_DNN(hidden_units=64, epochs=10, act1='relu', batch_size=32,
              dropout_rate=0.2, model_name="m1"):
    """Train a two-hidden-layer dense regressor and save it to disk.

    Trains on the module-level ``X_train`` / ``y_train``.

    Args:
        hidden_units: width of the first hidden layer; the second hidden
            layer has ``hidden_units // 2`` units.
        epochs: number of training epochs.
        act1: activation used by both hidden layers.
        batch_size: training batch size.
        dropout_rate: dropout rate applied after each hidden layer.
        model_name: path the trained model is saved to.

    Returns:
        The trained Keras model.
    """
    model = keras.Sequential()
    model.add(Input(shape=(X_train.shape[1],)))
    model.add(layers.Dense(hidden_units, activation=act1))
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(hidden_units // 2, activation=act1))
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(1))  # Output layer: a single neuron

    model.compile(optimizer='adam', loss='mean_absolute_error')

    model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        verbose=0,
        validation_split=0.1,
    )
    model.save(model_name)
    print("Model saved to disk as " + model_name)
    return model


m_pred_count = 0


def run_DNN(hidden_units=64, epochs=10, act1='relu', batch_size=32,
            dropout_rate=0.2):
    """Load or train one model configuration, evaluate it and log a row.

    Uses the module-level ``X_test`` / ``y_test`` and ``SEQ_LENGTH``. A
    cached model file is reused unless it is missing or a random draw falls
    at or below ``RETRAIN_PROBABILITY``. Updates the global ``m_cutoff`` to
    the mean of its previous value and the new score, increments
    ``m_pred_count`` and appends one line to ``output_file``.

    Args:
        hidden_units: width of the first hidden layer.
        epochs: number of training epochs (also part of the cache file name).
        act1: hidden-layer activation.
        batch_size: training batch size.
        dropout_rate: dropout rate after each hidden layer.
    """
    global m_cutoff, m_pred_count
    model_name = (
        "dnn" + str(m_select) + "_h" + str(hidden_units) + "-e" + str(epochs)
        + "-" + act1 + "-S" + str(SEQ_LENGTH) + ".h5"
    )
    m_pred_count = m_pred_count + 1
    random_number = random.random()
    print("random_number", random_number)
    if Path(model_name).exists() and random_number > RETRAIN_PROBABILITY:
        model = keras.models.load_model(model_name)
    else:
        print("Model does not exist. m_pred_count", m_pred_count)
        model = build_DNN(hidden_units, epochs, act1, batch_size,
                          dropout_rate, model_name)

    total_params = model.count_params()
    print(f"Total parameters: {total_params}")

    y_pred_actual = model.predict(X_test).flatten()
    y_test_actual = y_test.flatten()

    # Score = model MAE relative to an always-predict-zero baseline.
    # The final test label is the placeholder 0 (the real value is not known
    # yet), so it is excluded; including it hands the zero baseline a free
    # perfect hit and biases the ratio.
    y_eval = y_test_actual[:-1]
    pred_eval = y_pred_actual[:-1]
    mae = (mean_absolute_error(y_eval, pred_eval)
           / mean_absolute_error(y_eval, np.zeros_like(pred_eval)))
    m_cutoff = (m_cutoff + mae) / 2.0

    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H%M')
    last_price = df1[f'{target1}_Price'].iloc[-1]
    # Field separators are intentionally kept exactly as in earlier output
    # files (some are "," and some are ", ").
    row = (
        f"{mae:.4f}, {y_pred_actual[-1]:.3f}%,{df1.index[-1]!s}"
        f",DNN2-{len(m_features)},{len(df1)}"
        f",H{hidden_units}-e{epochs}-{act1}-S{SEQ_LENGTH}-d{dropout_rate!s}"
        f", {target1},{timestamp},{last_price:.2f}"
        f",{pre_selected_str}, {selected_str}, {total_params}"
        f", {y_pred_actual[-2]:.3f}%\n"
    )
    with open(output_file, mode='a') as out_file:
        out_file.write(row)


# Grid search: window length x layer width x activation x epoch count.
for SEQ_LENGTH in range(2, 8, 2):
    X_train, X_test, y_train, y_test = build_matrix()
    for h1 in [100, 200, 300, 400, 500]:
        for a1 in ['sigmoid', 'elu', 'relu']:
            for n_epochs in (40, 80, 120):
                run_DNN(h1, n_epochs, a1, 32, 0.1)