# I tested DNN, SVR, and transformer models; the DNN models outperformed both the SVR and the transformer models.
import sys, os
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.svm import LinearSVR
import datetime
import random, platform
from tensorflow import keras
from tensorflow.keras import layers, Input
from pathlib import Path
from config import m_test_size, pre_selected_str, m_cutoff
# Report the configured hold-out size as soon as the script starts.
print("m_test", m_test_size)

# Symbol whose next-row return the models predict.
target1: str = "NQ=F"
# Prefix prepended to the input CSV name and to the results file name.
m_dir: str = ""
# Number of columns read_data() samples at random; overwritten with the
# feature count once the data has been loaded.
m_select: int = 2
# Placeholders that are replaced by the values read_data() returns.
m_add, m_features = [], []
def read_data():
    """Load the price table and select the configured feature columns.

    Reads ``data_1d_update.csv`` from ``m_dir`` (first CSV column is the
    index), keeps the symbols listed in the space-separated
    ``pre_selected_str`` plus ``target1``, and trims the global
    ``pre_selected_str`` to its last five tokens.

    The selected symbols keep the order in which they appear in
    ``pre_selected_str`` (duplicates removed, ``target1`` appended when it is
    not listed).  A plain ``set`` must not be used for this: the iteration
    order of a set of strings changes from one interpreter run to the next,
    which would shuffle the feature columns between runs while models cached
    on disk still expect the column order they were trained with.

    Returns:
        tuple: ``(added, frame, selected)`` where ``added`` is a set of
        ``m_select`` randomly sampled column names, ``frame`` is an
        independent copy of the selected columns, and ``selected`` is the
        ordered list of selected symbols.

    Raises:
        KeyError: if a selected symbol is not a column of the CSV.
        ValueError: if the CSV has fewer than ``m_select`` columns.
    """
    global pre_selected_str
    # Repeated, leading or trailing spaces produce empty tokens; those are
    # not column names and would make the column lookup below fail.
    tokens = [tok for tok in pre_selected_str.split(" ") if tok]
    pre_selected_str = " ".join(pre_selected_str.split(" ")[-5:])
    print("selected", tokens)
    # dict.fromkeys removes duplicates while preserving first-seen order.
    selected = list(dict.fromkeys(tokens + [target1]))
    print("selected", selected)
    df = pd.read_csv(m_dir + "data_1d_update.csv", index_col=0)
    print("df.shape", df.shape)
    print("df.columns", df.columns)
    features = list(df.columns)
    added = set(random.sample(features, m_select))
    print("selected", selected)
    # Return a real copy so that columns can be added to the result later
    # without pandas warning about writing to a slice of ``df``.
    return added, df[selected].copy(), selected
# Load the data once: random extra picks, the selected price columns and the
# list of selected symbols.
m_add, df1, m_features = read_data()
m_select = len(m_features)
# Written verbatim into every results row by run_DNN().
selected_str = " "

for _label, _value in (
    ("df1.shape", df1.shape),
    ("df1.columns", df1.columns),
    ("m_dir", m_dir),
):
    print(_label, _value)
print("m_select", m_select, "m_cutoff", m_cutoff)

# Results file: one per calendar day and feature count.
output_file = "{}Run_{:%Y-%m-%d}_{}.csv".format(
    m_dir, datetime.datetime.now(), m_select
)
print("output_file", output_file)

today_str = f"{datetime.datetime.now():%Y-%m-%d}"
print("today_str", today_str)

# Duplicate every selected column under a "<symbol>_Price" name; the return
# calculations read these columns.
for symbol in m_features:
    df1[symbol + "_Price"] = df1[symbol]

print("df1.shape", df1.shape)
print("df1.columns", df1.columns)
# -----------------------------------------------------------------
# 2. Compute return features and the next-row target
# -----------------------------------------------------------------
# For every selected symbol add the 1-row and 2-row price changes in percent.
# Every symbol except the target also gets its price divided by the target's
# price; that column is named "*_Ratio_Return" so the "_Return" filter below
# selects it as a feature as well.
for symbol in m_features:
    price = df1[f'{symbol}_Price']
    df1[f'{symbol}_Price_Return'] = price.pct_change(periods=1) * 100.0
    df1[f'{symbol}_Price2d_Return'] = price.pct_change(periods=2) * 100.0
    if symbol != target1:
        df1[f'{symbol}_Ratio_Return'] = price / df1[f'{target1}_Price']
print("df.index[-1]", df1.index[-1])

# pct_change() leaves NaN in the leading rows; drop every row with a NaN.
# Rebinding (instead of inplace=True) avoids mutating a frame that may be a
# slice of the raw CSV frame.
df1 = df1.dropna()
print("df1.shape", df1.shape, len(df1))
if len(df1) < 1000:
    # Fewer than 1000 usable rows is treated as an error.  sys.exit(1) is used
    # rather than the interactive helper exit(): that helper is only present
    # when the `site` module is loaded and would report success (status 0).
    print("Error! df1.shape", df1.shape)
    sys.exit(1)

# Target: the target symbol's 1-row return of the *following* row (shift -1).
df_target = df1[target1 + '_Price_Return'].shift(-1).to_frame(name=target1 + '_NextDay_Return')
# Features: every column whose name contains "_Return".
df_features = df1[[col for col in df1.columns if '_Return' in col]]
print("df_features.shape", df_features.shape, len(df_features))
print("df_features.columns", df_features.columns)
df_features_all = df_features
print("df_features_all.shape", df_features_all.shape)
print("df_features.shape", df_features.shape)
# The last row has no following row, so its shifted target is NaN.  The row is
# kept (not dropped) and its target is filled with a placeholder 0.
df_target.iloc[-1] = 0
def build_matrix():
    """Turn the feature/target frames into flattened sliding-window samples.

    Reads the module-level ``df_features``, ``df_target``, ``SEQ_LENGTH`` and
    ``m_test_size``.  Each sample is ``SEQ_LENGTH`` consecutive feature rows
    flattened row by row into one vector; the last ``m_test_size`` samples
    form the test set.

    Target alignment: ``df_target`` is already shifted one row ahead where it
    is built, so the target that belongs to a window is the one stored on the
    window's *last* row.  Indexing it at ``start + SEQ_LENGTH`` (one row past
    the window) paired every window with the return two rows ahead and left
    the most recent feature row out of the final sample.
    NOTE: models cached on disk that were trained with the old pairing should
    be retrained.

    Returns:
        tuple: ``(X_train, X_test, y_train, y_test)``; the ``X`` arrays are
        2-D ``(samples, SEQ_LENGTH * n_features)`` and the ``y`` arrays 1-D.

    Raises:
        ValueError: if there are fewer rows than ``SEQ_LENGTH`` or if
            ``m_test_size`` is not between 1 and the number of samples - 1.
    """
    feature_rows = df_features.values
    target_rows = df_target.values
    print("scaled_features.shape", feature_rows.shape)
    print("scaled_target.shape", target_rows.shape)

    n_windows = len(feature_rows) - SEQ_LENGTH + 1
    if n_windows < 1:
        raise ValueError(
            f"need at least SEQ_LENGTH={SEQ_LENGTH} rows, got {len(feature_rows)}"
        )

    # One window per possible end row; the target sits on that end row.
    X = np.array([feature_rows[start:start + SEQ_LENGTH] for start in range(n_windows)])
    y = np.array([target_rows[start + SEQ_LENGTH - 1] for start in range(n_windows)])
    print("X.shape", X.shape)
    print("y.shape", y.shape)

    X_flat = X.reshape(X.shape[0], -1)
    print("X_flat.shape", X_flat.shape)

    # Hold out the last m_test_size samples.  A split index <= 0 would make
    # the slices below silently return the wrong rows, so reject it.
    if not 0 < m_test_size < len(X_flat):
        raise ValueError(
            f"m_test_size={m_test_size} must be between 1 and {len(X_flat) - 1}"
        )
    split = len(X_flat) - m_test_size
    X_train, X_test = X_flat[:split], X_flat[split:]
    # Targets come out as (samples, 1); flatten them to 1-D.
    y_train = y[:split].flatten()
    y_test = y[split:].flatten()
    return X_train, X_test, y_train, y_test
def build_DNN(hidden_units=64, epochs=10, act1='relu', batch_size=32, dropout_rate=0.2, model_name="m1"):
    """Train a small dense regressor on the global training set and save it.

    The network has two hidden Dense layers (``hidden_units`` and
    ``hidden_units // 2`` neurons, both using ``act1``), each followed by
    Dropout, and a single linear output neuron.  It is compiled with Adam and
    mean-absolute-error loss and fitted on the module-level ``X_train`` /
    ``y_train`` with 10% of the samples held out for validation.

    Args:
        hidden_units: width of the first hidden layer.
        epochs: number of training epochs.
        act1: activation used by both hidden layers.
        batch_size: mini-batch size.
        dropout_rate: dropout rate applied after each hidden layer.
        model_name: path the trained model is saved to.

    Returns:
        The trained Keras model.
    """
    input_width = X_train.shape[1]
    network = keras.Sequential([
        Input(shape=(input_width,)),
        layers.Dense(hidden_units, activation=act1),
        layers.Dropout(dropout_rate),
        layers.Dense(hidden_units // 2, activation=act1),
        layers.Dropout(dropout_rate),
        layers.Dense(1),  # single regression output
    ])
    network.compile(optimizer='adam', loss='mean_absolute_error')

    fit_options = dict(
        epochs=epochs,
        batch_size=batch_size,
        verbose=0,
        validation_split=0.1,
    )
    network.fit(X_train, y_train, **fit_options)

    network.save(model_name)
    print("Model saved to disk as " + model_name)
    return network
# Number of run_DNN() calls made so far; run_DNN() increments it on every call.
m_pred_count = 0
# NOTE(review): `random` is already imported at the top of the file, so this
# repeated import is redundant.
import random
def run_DNN(hidden_units=64, epochs=10, act1='relu', batch_size=32, dropout_rate=0.2):
    """Load (or train) one DNN configuration, score it and log a CSV row.

    The model file name is derived from ``m_select``, ``hidden_units``,
    ``epochs``, ``act1`` and ``SEQ_LENGTH``.  An existing file is reused
    unless a random draw falls at or below 0.006, in which case the model is
    retrained through ``build_DNN``.

    The score is the model's mean absolute error on the test set divided by
    the error of an all-zero prediction.  The final test sample is left out
    of the score: its target is the placeholder 0 written for the most recent
    row (whose real next-row return is not known yet), not an observation.
    The prediction for that sample is still written to the results row.

    Side effects: updates the globals ``m_pred_count`` and ``m_cutoff``
    (running average with the new score) and appends one line to
    ``output_file``.

    Args:
        hidden_units: width of the first hidden layer.
        epochs: number of training epochs.
        act1: activation of the hidden layers.
        batch_size: mini-batch size used when training.
        dropout_rate: dropout rate used when training.

    Raises:
        ValueError: if the test set holds fewer than two samples.
    """
    global m_cutoff, m_pred_count
    # Checked up front so no time is spent training when scoring (which needs
    # at least one real target plus the placeholder sample) cannot work.
    if len(y_test) < 2:
        raise ValueError("run_DNN needs at least two test samples")

    model_name = f"dnn{m_select}_h{hidden_units}-e{epochs}-{act1}-S{SEQ_LENGTH}.h5"
    m_pred_count += 1
    random_number = random.random()
    print("random_number", random_number)

    cached = Path(model_name).exists()
    if cached and random_number > 0.006:
        model = keras.models.load_model(model_name)
    else:
        if cached:
            # The file exists; this is the occasional forced refresh.
            print("Retraining existing model. m_pred_count", m_pred_count)
        else:
            print("Model does not exist. m_pred_count", m_pred_count)
        model = build_DNN(hidden_units, epochs, act1, batch_size, dropout_rate, model_name)

    total_params = model.count_params()
    print(f"Total parameters: {total_params}")

    y_pred_actual = model.predict(X_test).flatten()
    y_test_actual = y_test.flatten()

    # Score only samples that have a real target (everything but the last).
    scored_true = y_test_actual[:-1]
    scored_pred = y_pred_actual[:-1]
    mae = mean_absolute_error(scored_true, scored_pred) / mean_absolute_error(scored_true, 0 * scored_pred)
    m_cutoff = (m_cutoff + mae) / 2.0

    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H%M')
    last_price = df1[f'{target1}_Price'].iloc[-1]
    # The separators (some "," and some ", ") match the existing file layout.
    row = (
        f"{mae:.4f}"
        f", {y_pred_actual[-1]:.3f}%"
        f",{df1.index[-1]!s}"
        f",DNN2-{len(m_features)}"
        f",{len(df1)}"
        f",H{hidden_units}-e{epochs}-{act1}-S{SEQ_LENGTH}-d{dropout_rate!s}"
        f", {target1}"
        f",{timestamp}"
        f",{last_price:.2f}"
        f",{pre_selected_str}, {selected_str}"
        f", {total_params}"
        f", {y_pred_actual[-2]:.3f}%"
        "\n"
    )
    with open(output_file, mode='a') as out_file:
        out_file.write(row)
# Grid search: for each window length rebuild the train/test matrices (the
# functions above read them as globals), then run every combination of layer
# width, activation and epoch count with batch size 32 and dropout 0.1.
for SEQ_LENGTH in (2, 4, 6):
    X_train, X_test, y_train, y_test = build_matrix()
    for layer_width in (100, 200, 300, 400, 500):
        for activation in ('sigmoid', 'elu', 'relu'):
            for n_epochs in (40, 80, 120):
                run_DNN(layer_width, n_epochs, activation, 32, 0.1)
