# --- NECESSARY IMPORTS ---
import pandas as pd
import numpy as np
from scipy import stats as sp_stats
import re
from datetime import datetime, timedelta
import warnings
# Data Acquisition & Feature Engineering
import yfinance as yf
import pandas_ta as ta
# Scikit-learn (Preprocessing, Models, Metrics)
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, MinMaxScaler  # MinMaxScaler is often preferred for LSTM
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, GridSearchCV, TimeSeriesSplit
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error, mean_absolute_percentage_error
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.feature_selection import SelectKBest, f_regression, RFE
# Statsmodels (Time Series Models)
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.exponential_smoothing.ets import ETSModel
import pmdarima as pm # For auto_arima
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
# TensorFlow/Keras (LSTM Model)
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l1_l2
# Plotting
import matplotlib.pyplot as plt
import seaborn as sns
# %matplotlib inline # Uncomment if running in Jupyter for inline plots
warnings.filterwarnings('ignore')
tf.get_logger().setLevel('ERROR') # Suppress TensorFlow INFO messages
# --- CONFIGURATION DICTIONARY ---
CONFIG = {
# --- Run Mode & General ---
"RUN_MODE": 'fetch_and_predict_stock_ultimate',
# Options: 'simulate_and_regress', 'load_and_regress', 'load_and_forecast_arima_ets',
# 'fetch_and_predict_stock_lite', # (Price + TA -> Reg, LSTM)
# 'load_and_predict_stock_lite',
# 'fetch_and_predict_stock_ultimate', # (Price + TA + Fundamentals -> Reg, LSTM)
# 'load_and_predict_stock_ultimate',
"FILE_PATH": 'your_comprehensive_stock_data.csv', # For 'load_*' modes
"VERBOSE": True, # Print detailed step-by-step information
# --- Data Acquisition (Stock Price & yfinance Fundamentals) ---
"STOCK_TICKERS": ['AAPL'], # Single ticker string or list of tickers
"STOCK_START_DATE": '2017-01-01', # Longer period for fundamentals
"STOCK_END_DATE": datetime.now().strftime('%Y-%m-%d'), # Today
"STOCK_INTERVAL": '1d', # '1d', '1wk', '1mo', '1h', '30m', etc.
"PRICE_COLUMN_FOR_TARGET": 'Close', # Typically 'Close' or 'Adj Close'
"FETCH_YFINANCE_FUNDAMENTALS": True, # For ultimate modes, fetch from yfinance
"YFINANCE_FUNDAMENTALS_FREQUENCY": 'quarterly', # 'quarterly' or 'annual'
# --- Date & Frequency Handling ---
"DATE_COLUMN_NAME_HINTS": ['date', 'timestamp', 'time', 'period'], # For auto-detection if not
index
"DEFAULT_DATA_FREQUENCY": None, # E.g., 'B' (business daily). If None, script will try to
infer/handle.
"RESAMPLE_TO_FREQUENCY": None, # E.g., 'B'. If None, uses inferred/original or default.
# --- Feature Engineering ---
"HLOCV_MAP": {'Open':'Open', 'High':'High', 'Low':'Low', 'Close':'Close', 'Volume':'Volume'}, # For
loaded CSVs
"CALCULATE_TECHNICAL_INDICATORS": True,
"CUSTOM_TECHNICAL_INDICATORS": None, # List of dicts for pandas_ta.Strategy, or None for
defaults
"FUNDAMENTALS_MAP": { # For mapping columns in loaded CSV to standard fundamental items
'NetIncome': 'Net Income', 'TotalAssets': 'Total Assets', 'TotalLiabilities': 'Total Liabilities',
'TotalEquity': 'Total Equity', 'Revenue': 'Total Revenue',
'SharesOutstanding': 'Basic Weighted Average Shares Outstanding', # Example name
'EBITDA': 'EBITDA', 'OperatingCashFlow': 'Cash Flow From Operating Activities'
},
"EXPLICIT_SHARES_OUTSTANDING_COL_NAME": None, # If your CSV has a specific name for this
"CALCULATE_FINANCIAL_RATIOS": True,
"NEWS_SENTIMENT_COLUMN_NAME": None, # If you provide a CSV with a pre-calculated
sentiment column
# --- Data Cleaning & Preprocessing ---
"MISSING_NUMERIC_STRATEGY": 'median', # 'mean', 'median', 'knn'
"MISSING_CATEGORICAL_STRATEGY": 'most_frequent',
"OUTLIER_HANDLING_METHOD": 'iqr', # 'iqr', 'zscore', None (IQR threshold is 1.5)
"DROP_COLS_THRESHOLD_MISSING": 0.8, # Drop cols if >80% missing
"DROP_ROWS_THRESHOLD_MISSING_TARGET": True, # Drop rows if target is NaN
# --- Regression Models (Scikit-learn) ---
"REGRESSION_MODELS_TO_RUN": ['LinearRegression', 'RandomForestRegressor',
'GradientBoostingRegressor'],
"REGRESSION_TARGET_SHIFT_PERIODS": -1, # Shift target N periods to predict future (e.g., -1 for
next day)
"PERFORM_HYPERPARAMETER_TUNING_REG": False, # True can be very slow
"FEATURE_SELECTION_REG_METHOD": None, # 'SelectKBest', 'RFE', None
"NUM_FEATURES_TO_SELECT_REG": 20,
# --- ARIMA/ETS Time Series Models ---
"ARIMA_ETS_TARGET_COLUMN": "Close", # For univariate forecasting modes
"ARIMA_ETS_SEASONAL_PERIOD": 0, # 0 or 1 for non-seasonal, >1 for seasonal (e.g. 252 for daily
data, yearly season)
# --- LSTM Model (TensorFlow/Keras) ---
"LSTM_TARGET_COLUMN": "Close", # Column LSTM aims to predict
"LSTM_FEATURE_COLUMNS": ['SMA_20', 'RSI_14', 'MACD_12_26_9', 'BBANDS_20_2.0_BBM',
'PE_calculated', 'ROE_calculated'], # Example features for LSTM
"LSTM_N_STEPS_LOOKBACK": 60, # Sequence length (number of past time steps)
"LSTM_EPOCHS": 5, # Keep low for testing; increase to 50-200 for serious training
"LSTM_BATCH_SIZE": 32,
"LSTM_VALIDATION_SPLIT": 0.1, # Proportion of training data for validation during training
"LSTM_SCALER": "MinMaxScaler", # "MinMaxScaler" or "StandardScaler"
"LSTM_ADD_DROPOUT": True,
"LSTM_DROPOUT_RATE": 0.2,
"LSTM_REGULARIZATION": None, # e.g. {'l1':0.01, 'l2':0.01}, or None
# --- General ML Parameters ---
"TEST_SPLIT_RATIO": 0.2,
"RANDOM_STATE": 42,
# --- Simulation (for 'simulate_and_regress' mode) ---
"SIM_SAMPLES": 300, "SIM_NUM_FEATURES": 5, "SIM_CAT_FEATURES": 2
# --- HELPER FUNCTIONS (Printing, etc.) ---
def print_section_header(title, level=1):
if CONFIG["VERBOSE"]:
hashes = "=" * (80 - level * 4)
print(f"\n{hashes}\n{' '*(level-1)}{title.upper()}\n{hashes}\n")
def print_subsection_header(title):
if CONFIG["VERBOSE"]: print_section_header(title, level=2)
def print_df_info(df, name="DataFrame"):
if CONFIG["VERBOSE"] and df is not None:
print_subsection_header(f"{name} Info")
print(f"Shape: {df.shape}")
print(f"Index type: {type(df.index)}")
if isinstance(df.index, pd.DatetimeIndex):
print(f"Index Freq: {df.index.freqstr if df.index.freq else pd.infer_freq(df.index)}")
print(f"Columns: {df.columns.tolist()}")
print("Head:")
print(df.head(3))
print("Tail:")
print(df.tail(3))
missing_vals = df.isnull().sum()
print(f"Missing values summary (top 5):\n{missing_vals[missing_vals >
0].sort_values(ascending=False).head()}")
# --- 1. DATA ACQUISITION MODULE ---
def find_and_set_date_column(df, date_col_hints):
print_subsection_header("Date Column Identification & Setting Index")
if isinstance(df.index, pd.DatetimeIndex):
print("DataFrame already has a DatetimeIndex.")
return df
potential_date_cols = []
# Prioritize hints
if date_col_hints:
for hint in date_col_hints:
for col in df.columns:
if hint.lower() in col.lower():
potential_date_cols.append(col)
# Add all object/string columns as candidates
potential_date_cols.extend(df.select_dtypes(include=['object', 'string']).columns.tolist())
potential_date_cols = list(dict.fromkeys(potential_date_cols)) # Unique, preserve order
for col_name in potential_date_cols:
try:
# Attempt conversion with robust parsing
converted_col = pd.to_datetime(df[col_name], errors='coerce') # infer_datetime_format is deprecated in recent pandas
# Check if a significant portion converted successfully and dates are reasonable
if converted_col.notnull().sum() / len(df) > 0.8: # More than 80% valid dates
# Check if dates are not all in the distant past/future (heuristic)
min_date, max_date = converted_col.min(), converted_col.max()
if pd.NaT not in [min_date, max_date] and min_date.year > 1950 and max_date.year < datetime.now().year + 5:
print(f"Identified '{col_name}' as the primary date column. Setting as index.")
df[col_name] = converted_col
df = df.set_index(col_name).sort_index()
return df
except Exception:
continue # Try next column
print("Warning: Could not automatically identify a reliable date column or set DatetimeIndex.")
return df
def detect_and_set_frequency(df, default_freq=None, resample_to_freq=None):
print_subsection_header("Frequency Detection & Resampling")
if not isinstance(df.index, pd.DatetimeIndex):
print("Index is not DatetimeIndex. Cannot infer frequency.")
return df
inferred_freq = pd.infer_freq(df.index)
print(f"Inferred frequency: {inferred_freq}")
target_freq = resample_to_freq or inferred_freq or default_freq
if target_freq:
print(f"Attempting to set/resample to frequency: {target_freq}")
try:
# If index is already somewhat regular, asfreq might work.
# For stock data, 'B' (business day) is often a good target if daily.
if df.index.has_duplicates:
print("Warning: Duplicate dates found in index. Aggregating using mean.")
# Group by index and take mean for numeric, first for object (simplistic aggregation)
numeric_cols = df.select_dtypes(include=np.number).columns
object_cols = df.select_dtypes(include='object').columns
agg_dict = {col: 'mean' for col in numeric_cols}
agg_dict.update({col: 'first' for col in object_cols})
df = df.groupby(df.index).agg(agg_dict)
df = df.asfreq(target_freq) # Fills missing dates with NaNs
print(f"Successfully set frequency to {target_freq}. New shape: {df.shape}")
except ValueError as e: # Typically if index is not monotonic or has duplicates
print(f"Could not directly use asfreq due to: {e}. Attempting resample if target_freq
specified.")
if resample_to_freq: # Only resample if user explicitly wants it
try:
# Simplistic resample (mean for numeric, first for object). User might need specific logic.
df_numeric = df.select_dtypes(include=np.number).resample(resample_to_freq).mean()
df_object = df.select_dtypes(exclude=np.number).resample(resample_to_freq).first()
df = pd.concat([df_numeric, df_object], axis=1)
print(f"Resampled to {resample_to_freq}. New shape: {df.shape}")
except Exception as res_e:
print(f"Resampling failed: {res_e}")
else:
print("No target frequency specified or inferable. Using original index.")
# Forward fill after freq setting to handle NaNs from new dates or resampling gaps (common for stock data)
# Group by ticker if present for ffill
group_col = 'Ticker' if 'Ticker' in df.columns else None
if group_col:
df = df.groupby(group_col, group_keys=False).ffill()
else:
df = df.ffill()
print("Forward-filled NaNs after frequency adjustment.")
return df
def fetch_stock_price_data_yf(tickers, start_date, end_date, interval):
print_subsection_header(f"Fetching Stock Prices via yfinance for {tickers}")
try:
if isinstance(tickers, str): tickers = [tickers]
data = yf.download(tickers, start=start_date, end=end_date, interval=interval, progress=CONFIG["VERBOSE"])
if data.empty: print(f"No price data fetched for {tickers}."); return None
if len(tickers) == 1 and not isinstance(data.columns, pd.MultiIndex):
data.columns = [col.capitalize() for col in data.columns]
data['Ticker'] = tickers[0]
elif isinstance(data.columns, pd.MultiIndex): # Multi-ticker download
data = data.stack(level=1).rename_axis(['Date', 'Ticker']).reset_index(level=1)
# Capitalize HLOCV type columns, leave Ticker as is
data.columns = [col.capitalize() if col.lower() in ['open','high','low','close','adj close','volume'] else col for col in data.columns]
print_df_info(data, "Fetched Stock Price Data")
return data
except Exception as e: print(f"Error fetching stock price data: {e}"); return None
def fetch_yfinance_fundamentals_single_ticker(ticker_symbol, freq='quarterly'):
"""Fetches financials, balance sheet, cash flow for a single ticker."""
print_subsection_header(f"Fetching yfinance Fundamentals for {ticker_symbol} ({freq})")
ticker_obj = yf.Ticker(ticker_symbol)
data_frames = {}
try: data_frames['financials'] = ticker_obj.quarterly_financials if freq == 'quarterly' else ticker_obj.financials
except Exception as e: print(f"Could not fetch financials for {ticker_symbol}: {e}")
try: data_frames['balance_sheet'] = ticker_obj.quarterly_balance_sheet if freq == 'quarterly' else ticker_obj.balance_sheet
except Exception as e: print(f"Could not fetch balance sheet for {ticker_symbol}: {e}")
try: data_frames['cash_flow'] = ticker_obj.quarterly_cashflow if freq == 'quarterly' else ticker_obj.cashflow
except Exception as e: print(f"Could not fetch cash flow for {ticker_symbol}: {e}")
all_fundamentals_df = None
for key, df_fund in data_frames.items():
if df_fund is not None and not df_fund.empty:
df_fund_T = df_fund.T # Dates are columns, transpose
df_fund_T.index = pd.to_datetime(df_fund_T.index) # Convert index (was date strings) to DatetimeIndex
df_fund_T.columns = [f"{key.replace('_',' ').title()}_{col.replace(' ','')}" for col in df_fund_T.columns] # Prefix columns
if all_fundamentals_df is None:
all_fundamentals_df = df_fund_T
else: # Merge, some columns might overlap (e.g. 'Net Income' in financials and derived)
all_fundamentals_df = pd.merge(all_fundamentals_df, df_fund_T, left_index=True, right_index=True, how='outer', suffixes=('', f'_{key}_dup'))
if all_fundamentals_df is not None:
all_fundamentals_df['Ticker'] = ticker_symbol
return all_fundamentals_df
def merge_price_and_fundamentals(price_df, fundamentals_df_list):
print_subsection_header("Merging Price and Fundamental Data")
if not fundamentals_df_list or all(df is None for df in fundamentals_df_list):
print("No fundamental data to merge."); return price_df
all_ticker_fundamentals = pd.concat([df for df in fundamentals_df_list if df is not None])
if all_ticker_fundamentals.empty: print("Concatenated fundamentals empty."); return price_df
print_df_info(all_ticker_fundamentals, "Combined Raw Fundamentals (All Tickers)")
# Ensure price_df has 'Ticker' column if fundamentals are per ticker
if 'Ticker' in all_ticker_fundamentals.columns and 'Ticker' not in price_df.columns and all_ticker_fundamentals['Ticker'].nunique() == 1: # Single-ticker price data
    price_df['Ticker'] = all_ticker_fundamentals['Ticker'].unique()[0] # Tag price rows so the per-ticker merge below works
if 'Ticker' in all_ticker_fundamentals.columns and 'Ticker' in price_df.columns:
print("Merging fundamentals per ticker using merge_asof.")
# Sort both dataframes by Ticker and Date index for merge_asof
price_df = price_df.sort_index()
all_ticker_fundamentals = all_ticker_fundamentals.sort_index()
merged_df = pd.merge_asof(
    left=price_df.reset_index().sort_values('Date'), # merge_asof needs a sorted regular column
    right=all_ticker_fundamentals.reset_index().sort_values('Date'),
    on='Date',
    by='Ticker', # Crucial for multi-ticker
    direction='backward', # Use last known fundamental value
    suffixes=('_price', '_fund')
)
merged_df = merged_df.set_index('Date').sort_index() # Restore DatetimeIndex
elif 'Ticker' not in all_ticker_fundamentals.columns and 'Ticker' not in price_df.columns: # Single ticker, no 'Ticker' col
    print("Merging fundamentals (single ticker, no Ticker column) using merge_asof.")
    merged_df = pd.merge_asof(
        left=price_df.sort_index(),
        right=all_ticker_fundamentals.sort_index(),
        left_index=True,
        right_index=True,
        direction='backward',
        suffixes=('_price', '_fund')
    )
else:
print("Warning: Ticker column mismatch between price and fundamentals. Cannot merge
effectively for multi-ticker. Returning price data.")
return price_df
print_df_info(merged_df, "Merged Price and Fundamentals")
return merged_df
def load_data_from_file(file_path): # Wrapper for existing load_data
print_subsection_header(f"Loading Data from File: {file_path}")
df = None
try:
if file_path.endswith('.csv'): df = pd.read_csv(file_path)
elif file_path.endswith(('.xls', '.xlsx')): df = pd.read_excel(file_path)
else: raise ValueError("Unsupported file format.")
print(f"Successfully loaded. Shape: {df.shape}")
except Exception as e: print(f"Error loading file: {e}"); return None
# Attempt to find and set date column right after loading
df = find_and_set_date_column(df, CONFIG["DATE_COLUMN_NAME_HINTS"])
print_df_info(df, "Loaded File Data")
return df
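# NOTE: The 'simulate_and_regress' mode in the main workflow calls simulate_data(), which is not
# defined in this section. The following is a minimal sketch of what such a helper could look like
# (the column names 'num_feat_i', 'cat_feat_i', and 'target' are assumptions, not the original code).
def simulate_data(n_samples, n_numeric_features, n_categorical_features, random_state=CONFIG["RANDOM_STATE"]):
    rng = np.random.default_rng(random_state)
    idx = pd.date_range(end=datetime.now(), periods=n_samples, freq='B')
    df_sim = pd.DataFrame(index=idx)
    for i in range(n_numeric_features):
        df_sim[f'num_feat_{i}'] = rng.normal(loc=0, scale=1, size=n_samples).cumsum()
    for i in range(n_categorical_features):
        df_sim[f'cat_feat_{i}'] = rng.choice(['A', 'B', 'C'], size=n_samples)
    # Target as a noisy linear combination of the numeric features
    numeric_part = df_sim[[f'num_feat_{i}' for i in range(n_numeric_features)]].sum(axis=1)
    df_sim['target'] = numeric_part + rng.normal(scale=0.5, size=n_samples)
    return df_sim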
# --- 2. FEATURE ENGINEERING MODULE ---
def add_technical_indicators_robust(df, hlocv_map, custom_ta_list=None):
print_subsection_header("Adding Technical Indicators")
if not CONFIG["CALCULATE_TECHNICAL_INDICATORS"]: print("Skipping TA calculation by config.");
return df
df_ta = df.copy()
# Standard HLOCV names expected by pandas_ta (lowercase)
std_hlocv_pandas_ta = {'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close', 'volume': 'Volume'}
# Map user-defined HLOCV names (from CONFIG["HLOCV_MAP"]) to pandas_ta standard lowercase names
current_to_pdtaname_map = {}
pdtaname_to_current_map = {} # For renaming back if needed
all_required_present = True
for pta_std_name, user_config_std_name in std_hlocv_pandas_ta.items():
actual_col_name_in_df = hlocv_map.get(user_config_std_name) # Get the name from the user's CSV (e.g. 'MyOpen')
if actual_col_name_in_df and actual_col_name_in_df in df_ta.columns:
current_to_pdtaname_map[actual_col_name_in_df] = pta_std_name
pdtaname_to_current_map[pta_std_name] = actual_col_name_in_df
else:
print(f"Warning: Essential column for TA '{user_config_std_name}' (mapped to
'{actual_col_name_in_df}') not found in DataFrame. Skipping TA.")
all_required_present = False; break
if not all_required_present: return df_ta # Return original if essential HLOCV missing
df_ta.rename(columns=current_to_pdtaname_map, inplace=True) # Rename to lowercase for pandas_ta
default_ta_strategy = ta.Strategy(
name="Default TAs", description="SMA, EMA, RSI, MACD, BBANDS, VWAP, ATR",
ta=[ {"kind": "sma", "length": l} for l in [10, 20, 50] ] + \
[ {"kind": "ema", "length": l} for l in [10, 20, 50] ] + \
[ {"kind": "rsi", "length": 14} ] + \
[ {"kind": "macd", "fast": 12, "slow": 26, "signal": 9} ] + \
[ {"kind": "bbands", "length": 20, "std": 2} ] + \
[ {"kind": "vwap"} ] + \
[ {"kind": "atr", "length": 14} ]
)
ta_strategy_to_run = ta.Strategy(name="Custom TAs", ta=custom_ta_list) if custom_ta_list else default_ta_strategy
try:
group_col = 'Ticker' if 'Ticker' in df_ta.columns else None # pandas_ta uses original column names if not renamed
if group_col and df_ta[group_col].nunique() > 1:
print(f"Calculating TAs per ticker (grouping by '{group_col}').")
# Ensure data is sorted by Date within each Ticker group for TA calculation
# Ensure rows are in date order; groupby preserves within-group order, which TA calculations rely on
df_ta_sorted = df_ta.sort_index()
# ta.strategy(append=True) appends to each group copy and returns None, so return the group explicitly
df_ta = df_ta_sorted.groupby(group_col, group_keys=False).apply(lambda g: g.ta.strategy(ta_strategy_to_run, append=True) or g)
else:
df_ta.ta.strategy(ta_strategy_to_run, append=True)
print("Successfully added technical indicators.")
except Exception as e:
print(f"Error adding technical indicators: {e}. Ensure HLOCV columns are numeric after
mapping.")
df_ta.rename(columns=pdtaname_to_current_map, inplace=True) # Rename back to original
user-mapped HLOCV names
print_df_info(df_ta, "Data with Technical Indicators")
return df_ta
def get_mapped_col_name(df_columns, standard_key, user_map, common_keywords_map_for_key):
"""Finds column in df matching standard_key via user_map or keywords."""
# 1. Check direct user map
if user_map and standard_key in user_map and user_map[standard_key] in df_columns:
return user_map[standard_key]
# 2. Check common keywords for this standard_key
for keyword in common_keywords_map_for_key.get(standard_key, []):
for col in df_columns: # Iterate case-insensitively
if keyword.lower() in col.lower():
return col
return None # Not found
def calculate_financial_ratios_robust(df, price_col_name, fundamentals_map, explicit_shares_col):
print_subsection_header("Calculating Financial Ratios")
if not CONFIG["CALCULATE_FINANCIAL_RATIOS"]: print("Skipping financial ratio calculation by
config."); return df
df_ratios = df.copy()
df_cols = df_ratios.columns.tolist()
# Keywords for auto-detection if mapping is incomplete for yfinance fetched data
# yfinance fundamental columns are often prefixed like "Financials_NetIncome", "BalanceSheet_TotalAssets"
# These keywords will try to match parts of such names.
yf_fund_keywords = {
    'NetIncome': ['NetIncome', 'Net Income', 'NetEarnings'], 'TotalAssets': ['TotalAssets', 'Total Assets'],
    'TotalLiabilities': ['TotalLiabilities', 'Total Liabilities'], 'TotalEquity': ['TotalEquity', 'StockholdersEquity', 'Total Stockholder Equity'],
    'Revenue': ['TotalRevenue', 'Revenue', 'NetSales', 'Sales'], 'EBITDA': ['EBITDA'],
    'OperatingCashFlow': ['OperatingCashFlow', 'CashFlowFromOperatingActivities', 'CashFromOperations'],
    'SharesOutstanding': ['SharesOutstanding', 'DilutedAverageShares', 'BasicAverageShares', 'WeightedAverageShares']
}
# Helper to get column names using map first, then keywords
def find_col(std_key):
return get_mapped_col_name(df_cols, std_key, fundamentals_map, yf_fund_keywords)
ni_col = find_col('NetIncome')
assets_col = find_col('TotalAssets')
liabilities_col = find_col('TotalLiabilities')
equity_col = find_col('TotalEquity')
revenue_col = find_col('Revenue')
ebitda_col = find_col('EBITDA')
shares_col = explicit_shares_col if explicit_shares_col and explicit_shares_col in df_cols else find_col('SharesOutstanding')
calculated_ratios_info = []
# Ensure price_col_name is valid
if price_col_name not in df_cols or df_ratios[price_col_name].isnull().all():
print(f"Warning: Price column '{price_col_name}' for ratios is missing or all NaN. Most ratios
cannot be calculated.")
else:
# P/E Ratio
if ni_col and shares_col and df_ratios[ni_col].notnull().any() and df_ratios[shares_col].notnull().any():
    # Ensure shares are positive before division
    df_ratios['EPS_calculated'] = np.where(df_ratios[shares_col] > 0, df_ratios[ni_col] / df_ratios[shares_col], np.nan)
    df_ratios['PE_calculated'] = np.where(df_ratios['EPS_calculated'] != 0, df_ratios[price_col_name] / df_ratios['EPS_calculated'], np.nan)
    calculated_ratios_info.append("P/E")
else: print("Skipping P/E: Missing NetIncome or SharesOutstanding, or they are all NaN.")
# P/B Ratio
if equity_col and shares_col and df_ratios[equity_col].notnull().any() and df_ratios[shares_col].notnull().any():
    df_ratios['BVPS_calculated'] = np.where(df_ratios[shares_col] > 0, df_ratios[equity_col] / df_ratios[shares_col], np.nan)
    df_ratios['PB_calculated'] = np.where(df_ratios['BVPS_calculated'] != 0, df_ratios[price_col_name] / df_ratios['BVPS_calculated'], np.nan)
    calculated_ratios_info.append("P/B")
else: print("Skipping P/B: Missing TotalEquity or SharesOutstanding, or they are all NaN.")
# P/S Ratio
if revenue_col and shares_col and df_ratios[revenue_col].notnull().any() and df_ratios[shares_col].notnull().any():
    df_ratios['SPS_calculated'] = np.where(df_ratios[shares_col] > 0, df_ratios[revenue_col] / df_ratios[shares_col], np.nan)
    df_ratios['PS_calculated'] = np.where(df_ratios['SPS_calculated'] != 0, df_ratios[price_col_name] / df_ratios['SPS_calculated'], np.nan)
    calculated_ratios_info.append("P/S")
else: print("Skipping P/S: Missing Revenue or SharesOutstanding, or they are all NaN.")
# ROE
if ni_col and equity_col and df_ratios[ni_col].notnull().any() and df_ratios[equity_col].notnull().any():
    df_ratios['ROE_calculated'] = np.where(df_ratios[equity_col] != 0, df_ratios[ni_col] / df_ratios[equity_col], np.nan)
    calculated_ratios_info.append("ROE")
else: print("Skipping ROE: Missing NetIncome or TotalEquity, or they are all NaN.")
# Debt-to-Equity
if liabilities_col and equity_col and df_ratios[liabilities_col].notnull().any() and df_ratios[equity_col].notnull().any():
    df_ratios['DebtToEquity_calculated'] = np.where(df_ratios[equity_col] != 0, df_ratios[liabilities_col] / df_ratios[equity_col], np.nan)
    calculated_ratios_info.append("Debt/Equity")
else: print("Skipping Debt/Equity: Missing TotalLiabilities or TotalEquity, or they are all NaN.")
# Net Profit Margin
if ni_col and revenue_col and df_ratios[ni_col].notnull().any() and df_ratios[revenue_col].notnull().any():
    df_ratios['NetProfitMargin_calculated'] = np.where(df_ratios[revenue_col] != 0, df_ratios[ni_col] / df_ratios[revenue_col], np.nan)
    calculated_ratios_info.append("Net Profit Margin")
else: print("Skipping Net Profit Margin: Missing NetIncome or Revenue, or they are all NaN.")
# EV/EBITDA: Enterprise Value is complex to compute automatically (it needs Market Cap, Total Debt,
# and Cash & Equivalents), so the full EV calculation is skipped here.
# If EBITDA is available, Price/EBITDA-per-share is calculated as a simple proxy instead.
if ebitda_col and price_col_name in df_cols and shares_col and df_ratios[ebitda_col].notnull().any() and df_ratios[shares_col].notnull().any():
    df_ratios['EBITDA_per_Share_calculated'] = np.where(df_ratios[shares_col] > 0, df_ratios[ebitda_col] / df_ratios[shares_col], np.nan)
    df_ratios['Price_to_EBITDA_per_Share_calculated'] = np.where(df_ratios['EBITDA_per_Share_calculated'] != 0, df_ratios[price_col_name] / df_ratios['EBITDA_per_Share_calculated'], np.nan)
    calculated_ratios_info.append("Price/EBITDA_per_Share")
else: print("Skipping Price/EBITDA per Share: Missing EBITDA, Price, or Shares, or they are all NaN.")
# Forward-fill calculated ratios and raw fundamentals used (they are reported less frequently)
raw_fundamental_cols_used = [c for c in [ni_col, assets_col, liabilities_col, equity_col, revenue_col, ebitda_col, shares_col] if c is not None]
newly_calculated_ratio_cols = [col for col in df_ratios.columns if col.endswith('_calculated') and col not in df.columns]
cols_to_ffill_bfill = list(set(raw_fundamental_cols_used + newly_calculated_ratio_cols))
group_col = 'Ticker' if 'Ticker' in df_ratios.columns else None
if cols_to_ffill_bfill:
print(f"Forward/Backward filling fundamental source data and calculated ratios:
{newly_calculated_ratio_cols}")
if group_col:
df_ratios[cols_to_ffill_bfill] = df_ratios.groupby(group_col, group_keys=False)[cols_to_ffill_bfill].ffill().bfill()
else:
df_ratios[cols_to_ffill_bfill] = df_ratios[cols_to_ffill_bfill].ffill().bfill()
if calculated_ratios_info: print(f"Successfully calculated ratios: {', '.join(calculated_ratios_info)}")
else: print("No financial ratios were calculated due to missing underlying data.")
print_df_info(df_ratios, "Data with Financial Ratios")
return df_ratios
def add_news_sentiment_placeholder(df, news_sentiment_col_name=None):
print_subsection_header("News Sentiment Integration (Conceptual)")
if news_sentiment_col_name and news_sentiment_col_name in df.columns:
print(f"Using provided news sentiment column: '{news_sentiment_col_name}'")
# Ensure it's numeric
df[news_sentiment_col_name] = pd.to_numeric(df[news_sentiment_col_name], errors='coerce')
else:
print("Conceptual step: News sentiment analysis would be performed here.")
print("This would involve fetching news (API), NLP processing (e.g., VADER, TextBlob, FinBERT),")
print("and aligning sentiment scores with dates. For now, no action taken if column not
provided.")
# df['News_Sentiment_Placeholder'] = np.random.rand(len(df)) * 2 - 1 # Example if you want to
simulate
return df
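# A minimal sketch of how the conceptual step above could be realized with NLTK's VADER analyzer,
# assuming a hypothetical news DataFrame `news_df` with a DatetimeIndex and a 'headline' column
# (neither is produced by this script; the variable and column names are illustrative only).
def score_headlines_with_vader(news_df, text_col='headline'):
    from nltk.sentiment import SentimentIntensityAnalyzer  # requires nltk.download('vader_lexicon')
    sia = SentimentIntensityAnalyzer()
    scores = news_df[text_col].astype(str).apply(lambda t: sia.polarity_scores(t)['compound'])
    # Average the compound score per calendar day so it can be joined onto daily price data
    daily_sentiment = scores.groupby(news_df.index.normalize()).mean().rename('News_Sentiment')
    return daily_sentiment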
# --- 3. DATA CLEANING & PREPROCESSING MODULE ---
def clean_and_preprocess_data(df, target_variable_name, config):
print_section_header("Data Cleaning & Preprocessing")
df_c = df.copy()
# 1. Handle Duplicates (Index and Rows)
if df_c.index.has_duplicates:
print(f"Warning: Duplicate dates found in index. Pre-aggregation count: {len(df_c)}")
# Keep first for simplicity. For financial data, averaging numerics might be better if appropriate.
df_c = df_c[~df_c.index.duplicated(keep='first')]
print(f"Removed duplicate index entries. Post-aggregation count: {len(df_c)}")
df_c.drop_duplicates(inplace=True) # Drop duplicate rows
# 2. Drop columns with too many missing values
# dropna(thresh=N) keeps columns with at least N non-NaN values, so use (1 - threshold) to drop columns that are >80% missing
df_c.dropna(axis=1, thresh=int((1 - config["DROP_COLS_THRESHOLD_MISSING"]) * len(df_c)), inplace=True)
print(f"Shape after dropping sparse columns: {df_c.shape}")
# 3. Impute Missing Values (Iterative ffill/bfill then specific strategies)
# Iterative ffill/bfill grouped by Ticker is crucial for financial time series
group_col = 'Ticker' if 'Ticker' in df_c.columns else None
if group_col:
df_c = df_c.groupby(group_col, group_keys=False).apply(lambda x: x.ffill().bfill())
else:
df_c = df_c.ffill().bfill()
print("Performed initial group-wise ffill/bfill.")
numeric_cols = df_c.select_dtypes(include=np.number).columns.tolist()
categorical_cols = df_c.select_dtypes(include=['object', 'category']).columns.tolist()
for col in numeric_cols:
    if df_c[col].isnull().any():
        if config["MISSING_NUMERIC_STRATEGY"] == 'median': df_c[col].fillna(df_c[col].median(), inplace=True)
        elif config["MISSING_NUMERIC_STRATEGY"] == 'mean': df_c[col].fillna(df_c[col].mean(), inplace=True)
        elif config["MISSING_NUMERIC_STRATEGY"] == 'knn': pass # KNNImputer needs all numerics at once; handled below
        else: df_c[col].fillna(0, inplace=True) # Fallback to 0 if strategy unknown
if config["MISSING_NUMERIC_STRATEGY"] == 'knn' and numeric_cols:
print("Applying KNNImputer for remaining numeric NaNs...")
knn_imputer = KNNImputer(n_neighbors=5)
# KNNImputer computes distances over the numeric columns, so widespread NaNs degrade it; apply to a subset if needed.
# For simplicity, apply to all numeric columns, assuming the prior ffill/bfill reduced widespread NaNs.
try:
df_c[numeric_cols] = knn_imputer.fit_transform(df_c[numeric_cols])
except Exception as e:
print(f"KNN Imputation failed: {e}. Check for columns that are entirely NaN or other issues.
Falling back to median for remaining NaNs.")
for col in numeric_cols: df_c[col].fillna(df_c[col].median(), inplace=True)
for col in categorical_cols:
if df_c[col].isnull().any():
if config["MISSING_CATEGORICAL_STRATEGY"] == 'most_frequent':
df_c[col].fillna(df_c[col].mode()[0], inplace=True)
else: df_c[col].fillna('Unknown', inplace=True)
# 4. Outlier Handling
if config["OUTLIER_HANDLING_METHOD"] == 'iqr' and numeric_cols:
print("Handling outliers using IQR capping...")
for col in numeric_cols:
# Avoid clipping target if it's highly volatile, or make it configurable
# if col == target_variable_name and "stock" in config["RUN_MODE"]: continue
Q1, Q3 = df_c[col].quantile(0.25), df_c[col].quantile(0.75)
IQR = Q3 - Q1
if IQR > 0: # Avoid issues with constant columns or very low variance
lower_b, upper_b = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
df_c[col] = np.clip(df_c[col], lower_b, upper_b)
# (Add Z-score outlier handling if needed)
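    # A minimal sketch of the z-score alternative mentioned above (assumption: capping at 3 standard
    # deviations; this branch is illustrative and not part of the original logic shown here).
    if config["OUTLIER_HANDLING_METHOD"] == 'zscore' and numeric_cols:
        print("Handling outliers using z-score capping (threshold = 3 std devs)...")
        for col in numeric_cols:
            col_mean, col_std = df_c[col].mean(), df_c[col].std()
            if pd.notna(col_std) and col_std > 0:
                df_c[col] = np.clip(df_c[col], col_mean - 3 * col_std, col_mean + 3 * col_std)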
# 5. Ensure target variable exists and drop rows if still NaN after all imputations (critical)
if target_variable_name and target_variable_name in df_c.columns:
if config["DROP_ROWS_THRESHOLD_MISSING_TARGET"] and
df_c[target_variable_name].isnull().any():
print(f"Dropping rows where target '{target_variable_name}' is NaN.")
df_c.dropna(subset=[target_variable_name], inplace=True)
elif target_variable_name: # Target was specified but not found
print(f"Warning: Specified target variable '{target_variable_name}' not found in columns after
initial cleaning. Models requiring it may fail.")
# 6. Convert data types (e.g. ensure numeric cols are float for scalers)
for col in numeric_cols:
if col in df_c.columns: # Check if col still exists after sparse col drop
df_c[col] = pd.to_numeric(df_c[col], errors='coerce')
df_c = df_c.select_dtypes(exclude=['datetime64', 'timedelta64']) # Remove any stray date/time objects in columns
print_df_info(df_c, "Cleaned and Preprocessed Data")
if df_c.isnull().sum().sum() > 0:
print(f"Warning: {df_c.isnull().sum().sum()} NaNs still present after cleaning. Review data and
imputation.")
return df_c
# --- 4. EXPLORATORY DATA ANALYSIS (EDA) MODULE ---
def perform_full_eda(df, target_variable=None, run_mode="general", fundamental_cols=None, ratio_cols=None):
print_section_header("Exploratory Data Analysis")
if df is None or df.empty: print("EDA: DataFrame is empty."); return
print_subsection_header("Overall Descriptive Statistics")
print(df.describe(include='all').transpose()) # Transposed for better readability
numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
# Distributions of Numeric Features
if numeric_cols:
print_subsection_header("Distributions of Numeric Features (Sample)")
# Plot a sample of numeric columns to avoid too many plots
sample_numeric_cols = numeric_cols[:min(len(numeric_cols), 10)] # Plot up to 10
num_plots = len(sample_numeric_cols)
if num_plots > 0:
cols_per_row = min(3, num_plots)
rows = (num_plots + cols_per_row - 1) // cols_per_row
fig, axes = plt.subplots(rows, cols_per_row, figsize=(cols_per_row * 5, rows * 4))
axes = np.array(axes).flatten()
for i, col in enumerate(sample_numeric_cols):
try: sns.histplot(df[col].dropna(), kde=True, ax=axes[i]); axes[i].set_title(f'Distribution of {col}')
except Exception as e: print(f"Could not plot hist for {col}: {e}")
for j in range(i + 1, len(axes)): fig.delaxes(axes[j])
plt.tight_layout(); plt.show()
# Counts of Categorical Features
if categorical_cols:
print_subsection_header("Counts of Categorical Features (Sample)")
sample_cat_cols = categorical_cols[:min(len(categorical_cols), 5)]
for col in sample_cat_cols:
if df[col].nunique() < 50 and df[col].nunique() > 0 : # Plot if not too many unique values
plt.figure(figsize=(8, max(4, df[col].nunique()*0.3)))
try: sns.countplot(y=df[col], order=df[col].value_counts().index[:20]); plt.title(f'Counts of {col} (Top 20)')
except Exception as e: print(f"Could not plot count for {col}: {e}")
plt.tight_layout(); plt.show()
else: print(f"Skipping count plot for {col} (too many unique values or no variance).")
# Correlation Matrix
if len(numeric_cols) > 1:
print_subsection_header("Correlation Matrix of Numeric Features")
# Select a subset of numeric columns if too many, for readability of heatmap
sample_corr_cols = numeric_cols
if len(numeric_cols) > 30: # Heuristic
print("Too many numeric features for full heatmap, showing sample.")
# Prioritize target, TAs, key ratios if available for sample
priority_cols = [target_variable] if target_variable and target_variable in numeric_cols else []
if "stock" in run_mode:
priority_cols.extend([c for c in df.columns if any(ta_key in c for ta_key in ['SMA','EMA','RSI','MACD']) and c in numeric_cols][:5])
priority_cols.extend([c for c in df.columns if c.endswith('_calculated') and c in numeric_cols][:5])
remaining_cols = [c for c in numeric_cols if c not in priority_cols]
sample_corr_cols = list(dict.fromkeys(priority_cols + remaining_cols[:max(0, 20 - len(priority_cols))]))
plt.figure(figsize=(max(10, len(sample_corr_cols)*0.4), max(8, len(sample_corr_cols)*0.3)))
try:
corr_matrix = df[sample_corr_cols].corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f", linewidths=.5, annot_kws={"size": 8})
plt.title('Correlation Matrix (Sample of Numeric Features)'); plt.show()
except Exception as e: print(f"Could not plot correlation matrix: {e}")
if target_variable and target_variable in numeric_cols:
print(f"\nTop Correlations with Target Variable ({target_variable}):")
try:
print(df[numeric_cols].corr()[target_variable].abs().sort_values(ascending=False).head(15))
except Exception as e: print(f"Could not compute correlations with target: {e}")
# Time Series Plot of Target (if applicable)
if target_variable and target_variable in df.columns and isinstance(df.index, pd.DatetimeIndex):
print_subsection_header(f"Time Series Plot of Target ({target_variable})")
plt.figure(figsize=(14, 7))
# Plot per ticker if 'Ticker' column exists
group_col = 'Ticker' if 'Ticker' in df.columns else None
if group_col and df[group_col].nunique() > 1 and df[group_col].nunique() <= 5: # Plot if few tickers
for ticker, data in df.groupby(group_col):
plt.plot(data.index, data[target_variable], label=f'{target_variable} ({ticker})')
elif group_col and df[group_col].nunique() > 5:
print("Too many tickers to plot target time series individually. Plotting for first ticker.")
first_ticker_data = df[df[group_col] == df[group_col].unique()[0]]
plt.plot(first_ticker_data.index, first_ticker_data[target_variable], label=f'{target_variable} ({df[group_col].unique()[0]})')
else: # Single ticker or no ticker column
plt.plot(df.index, df[target_variable], label=target_variable)
plt.title(f'{target_variable} Over Time'); plt.xlabel('Date'); plt.ylabel(target_variable);
plt.legend(); plt.grid(True); plt.show()
# Plot Key Financial Ratios Over Time (if available)
if ratio_cols:
print_subsection_header("Key Financial Ratios Over Time (Sample)")
sample_ratio_cols = ratio_cols[:min(len(ratio_cols), 4)] # Plot up to 4 ratios
if sample_ratio_cols and isinstance(df.index, pd.DatetimeIndex):
plt.figure(figsize=(14, len(sample_ratio_cols) * 3))
for i, ratio_col in enumerate(sample_ratio_cols):
if ratio_col in df.columns:
ax = plt.subplot(len(sample_ratio_cols), 1, i + 1)
# Plot per ticker if a Ticker column exists (define group_col here in case the target plot above was skipped)
group_col = 'Ticker' if 'Ticker' in df.columns else None
if group_col and df[group_col].nunique() > 1 and df[group_col].nunique() <= 3:
    for ticker, data in df.groupby(group_col): ax.plot(data.index, data[ratio_col], label=f'{ratio_col} ({ticker})')
elif group_col and df[group_col].nunique() > 3:
    first_ticker_data = df[df[group_col] == df[group_col].unique()[0]]
    ax.plot(first_ticker_data.index, first_ticker_data[ratio_col], label=f'{ratio_col} ({df[group_col].unique()[0]})')
else:
ax.plot(df.index, df[ratio_col], label=ratio_col)
ax.set_title(f'{ratio_col} Over Time'); ax.legend(); ax.grid(True)
plt.tight_layout(); plt.show()
print("--- EDA Finished ---")
# --- 5. MODELING MODULE (Regression, ARIMA/ETS, LSTM) ---
def create_regression_pipeline(numeric_features, categorical_features, config_dict):
"""Creates a preprocessing pipeline for regression."""
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy=config_dict["MISSING_NUMERIC_STRATEGY"] if config_dict["MISSING_NUMERIC_STRATEGY"] != 'knn' else 'median')), # KNN is handled globally
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy=config_dict["MISSING_CATEGORICAL_STRATEGY"])),
('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# remainder='drop' will drop any columns not specified as numeric or categorical
# This is important if e.g. date columns or Ticker ID slipped into features_to_use
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)],
remainder='drop')
return preprocessor
def run_regression_models(df_model_data, target_col, features_list, config_dict):
print_section_header(f"Running Regression Models for Target: {target_col}")
if target_col not in df_model_data.columns: print(f"Target '{target_col}' missing."); return None, {}
df_run = df_model_data.copy().dropna(subset=[target_col]) # Drop rows if target is NaN
if df_run.empty: print("No data for regression after target NaN drop."); return None, {}
# Filter features_list to only include columns present in df_run
features_list = [f for f in features_list if f in df_run.columns]
# Remove any features that are all NaN
features_list = [f for f in features_list if not df_run[f].isnull().all()]
if not features_list: print("No valid features for regression."); return None, {}
X = df_run[features_list]; y = df_run[target_col]
numeric_feats = X.select_dtypes(include=np.number).columns.tolist()
categorical_feats = X.select_dtypes(include=['object', 'category']).columns.tolist()
# Ensure numeric_feats and categorical_feats only contain features from the final features_list
numeric_feats = [f for f in numeric_feats if f in features_list]
categorical_feats = [f for f in categorical_feats if f in features_list]
X_train, X_test, y_train, y_test = train_test_split(X, y,
test_size=config_dict["TEST_SPLIT_RATIO"],
random_state=config_dict["RANDOM_STATE"],
shuffle=False) # No shuffle for time-series like data
if X_train.empty or X_test.empty: print("Train or test split resulted in empty set."); return None, {}
preproc_pipeline = create_regression_pipeline(numeric_feats, categorical_feats, config_dict)
try:
X_train_processed = preproc_pipeline.fit_transform(X_train)
X_test_processed = preproc_pipeline.transform(X_test)
except ValueError as e:
print(f"Error during preprocessing: {e}. This might be due to all-NaN columns after split, or
unexpected data types.")
print(f"Numeric features given to preprocessor: {numeric_feats}")
print(f"Categorical features given to preprocessor: {categorical_feats}")
# You might want to inspect X_train here if this error occurs often
return preproc_pipeline, {} # Return fitted preprocessor for inspection
# Get feature names after preprocessing (OneHotEncoding changes names)
try: processed_feature_names = preproc_pipeline.get_feature_names_out()
except: processed_feature_names = [f"feat_{i}" for i in range(X_train_processed.shape[1])] # Fallback
# Feature Selection (Optional)
# ... (Full RFE/SelectKBest logic as in the previous comprehensive script, applied to X_train_processed and X_test_processed; see the sketch below) ...
X_train_final, X_test_final, final_feature_names = X_train_processed, X_test_processed, processed_feature_names # Placeholder if no selection
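    # A minimal sketch of the SelectKBest branch referenced above (not the original full logic;
    # assumes the processed arrays are dense and numeric, which holds with sparse_output=False).
    if config_dict.get("FEATURE_SELECTION_REG_METHOD") == 'SelectKBest':
        k_best = min(config_dict["NUM_FEATURES_TO_SELECT_REG"], X_train_processed.shape[1])
        selector = SelectKBest(score_func=f_regression, k=k_best)
        X_train_final = selector.fit_transform(X_train_processed, y_train)
        X_test_final = selector.transform(X_test_processed)
        final_feature_names = [name for name, keep in zip(processed_feature_names, selector.get_support()) if keep]
        print(f"SelectKBest kept {len(final_feature_names)} of {X_train_processed.shape[1]} features.")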
model_zoo_reg = {
    'LinearRegression': (LinearRegression(), {}),
    'Ridge': (Ridge(random_state=config_dict["RANDOM_STATE"]), {'alpha': [0.1, 1.0, 10.0]}),
    'Lasso': (Lasso(random_state=config_dict["RANDOM_STATE"], max_iter=2000), {'alpha': [0.01, 0.1, 1.0]}),
    'RandomForestRegressor': (RandomForestRegressor(random_state=config_dict["RANDOM_STATE"], n_jobs=-1),
                              {'n_estimators': [50, 100], 'max_depth': [None, 10, 20]}),
    'GradientBoostingRegressor': (GradientBoostingRegressor(random_state=config_dict["RANDOM_STATE"]),
                                  {'n_estimators': [50, 100], 'learning_rate': [0.05, 0.1]}),
    'SVR': (SVR(), {'C': [0.1, 1, 10], 'kernel': ['rbf']}), # SVR can be slow
    'MLPRegressor': (MLPRegressor(random_state=config_dict["RANDOM_STATE"], max_iter=500, early_stopping=True, learning_rate_init=0.01),
                     {'hidden_layer_sizes': [(50,), (100, 50)], 'alpha': [0.0001, 0.001]})
}
results_summary = {}
for model_name_str in config_dict["REGRESSION_MODELS_TO_RUN"]:
if model_name_str not in model_zoo_reg: print(f"Model {model_name_str} not in zoo. Skipping."); continue
print_subsection_header(f"Training {model_name_str}")
model_instance, param_grid_reg = model_zoo_reg[model_name_str]
if config_dict["PERFORM_HYPERPARAMETER_TUNING_REG"] and param_grid_reg:
# ... (Full GridSearchCV logic as before; ensure the CV folds are appropriate for time-series data) ...
search = GridSearchCV(model_instance, param_grid_reg, cv=min(3, len(X_train_final)//10 or 2), scoring='r2', n_jobs=-1)
try: search.fit(X_train_final, y_train); best_model_reg = search.best_estimator_; print(f"Best params: {search.best_params_}")
except: best_model_reg = model_instance.fit(X_train_final, y_train) # Fallback
else:
best_model_reg = model_instance.fit(X_train_final, y_train)
y_pred_train_reg = best_model_reg.predict(X_train_final)
y_pred_test_reg = best_model_reg.predict(X_test_final)
# ... (Store metrics, feature importances as before) ...
results_summary[model_name_str] = {
    'r2_test': r2_score(y_test, y_pred_test_reg), 'mse_test': mean_squared_error(y_test, y_pred_test_reg),
    'model_object': best_model_reg, 'feature_names_processed': final_feature_names,
    'y_test_actual': y_test, 'y_test_pred': y_pred_test_reg
}
if hasattr(best_model_reg, 'feature_importances_'):
    imp = pd.Series(best_model_reg.feature_importances_, index=final_feature_names).sort_values(ascending=False)
    results_summary[model_name_str]['feature_importances'] = imp
    print(f"Top 5 Feature Importances:\n{imp.head()}")
elif hasattr(best_model_reg, 'coef_'):
    coef = pd.Series(best_model_reg.coef_, index=final_feature_names).sort_values(key=abs, ascending=False)
    results_summary[model_name_str]['coefficients'] = coef
    print(f"Top 5 Coefficients (abs value):\n{coef.head()}")
# Plot Actual vs Predicted for each model
plt.figure(figsize=(8,6))
plt.scatter(y_test, y_pred_test_reg, alpha=0.6, label='Actual vs. Predicted')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='Ideal')
plt.xlabel('Actual'); plt.ylabel('Predicted'); plt.title(f'{model_name_str}: Actual vs. Predicted')
plt.legend(); plt.grid(True); plt.show()
return preproc_pipeline, results_summary
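# Hedged usage sketch (not part of the flow shown in this section): CONFIG["REGRESSION_TARGET_SHIFT_PERIODS"]
# implies the regression target should be a *future* value. One way to build such a target before calling
# run_regression_models; the column name 'Target_Future' is an assumption, not the original implementation.
def add_shifted_regression_target(df_in, price_col, shift_periods, group_col='Ticker'):
    df_out = df_in.copy()
    if group_col in df_out.columns:
        # shift(-1) pulls the next period's price onto the current row, computed per ticker
        df_out['Target_Future'] = df_out.groupby(group_col)[price_col].shift(shift_periods)
    else:
        df_out['Target_Future'] = df_out[price_col].shift(shift_periods)
    return df_out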
def run_arima_ets_forecast(df_ts_data, target_col_ts, date_col_ts, model_type_ts, seasonal_period_ts, config_dict_ts):
print_section_header(f"Running {model_type_ts} Forecast for Target: {target_col_ts}")
# ... (Full ARIMA/ETS logic as in previous comprehensive script) ...
# Ensure date_col_ts is used to set index if not already.
# This is a placeholder for the full function.
print(f"Placeholder: {model_type_ts} forecast. Full logic from previous script should be here.")
mock_idx_ts = pd.date_range(start='1/1/2022', periods=100, freq='D')
mock_data_ts = pd.Series(np.random.rand(100), index=mock_idx_ts, name=target_col_ts)
mock_model_obj = "Mock ARIMA/ETS model object"
return {"model_name": model_type_ts, "mae":0.5, "rmse":0.7, "mape":0.05,
"model_details":"mock details",
"forecast_values":mock_data_ts[-10:], "actual_test_values":mock_data_ts[-10:],
"full_historical_data": mock_data_ts, "confidence_interval":None}, mock_model_obj
def prepare_lstm_sequences(df_features_lstm, series_target_lstm, n_steps_lstm, scaler_type="MinMaxScaler"):
"""Scales data and creates sequences for multi-feature LSTM."""
if df_features_lstm.empty or series_target_lstm.empty: return np.array([]), np.array([]), None, None
feature_scaler_lstm = MinMaxScaler() if scaler_type == "MinMaxScaler" else StandardScaler()
target_scaler_lstm = MinMaxScaler() if scaler_type == "MinMaxScaler" else StandardScaler()
# Ensure all feature columns are numeric
df_features_lstm_numeric = df_features_lstm.apply(pd.to_numeric, errors='coerce').fillna(0) # Coerce errors and fill resulting NaNs with 0
scaled_features_lstm = feature_scaler_lstm.fit_transform(df_features_lstm_numeric.values)
scaled_target_lstm = target_scaler_lstm.fit_transform(series_target_lstm.values.reshape(-1,1))
X_lstm, y_lstm = [], []
for i in range(n_steps_lstm, len(scaled_features_lstm)):
X_lstm.append(scaled_features_lstm[i-n_steps_lstm:i, :])
y_lstm.append(scaled_target_lstm[i, 0])
if not X_lstm: return np.array([]), np.array([]), feature_scaler_lstm, target_scaler_lstm # Not enough data
return np.array(X_lstm), np.array(y_lstm), feature_scaler_lstm, target_scaler_lstm
def run_lstm_model(df_lstm_input_data, target_col_lstm_name, feature_cols_lstm_list, config_dict_lstm):
print_section_header(f"Running Multi-Feature LSTM for Target: {target_col_lstm_name}")
if target_col_lstm_name not in df_lstm_input_data.columns: print(f"LSTM Target '{target_col_lstm_name}' missing."); return None
# Filter to ensure all specified LSTM feature columns exist and are not all NaN
valid_feature_cols_lstm = [f for f in feature_cols_lstm_list if f in df_lstm_input_data.columns and not df_lstm_input_data[f].isnull().all()]
if not valid_feature_cols_lstm: print("No valid features for LSTM after checking NaNs. Using only target history if possible (univariate LSTM).")
# Fallback to univariate if no other features are valid (could be implemented as a separate path).
# For now, with no external features the model may fail or perform poorly; proceed with an empty
# feature set and let prepare_lstm_sequences handle it.
df_model_lstm = df_lstm_input_data[[target_col_lstm_name] + valid_feature_cols_lstm].copy()
df_model_lstm.dropna(inplace=True) # Drop rows with any NaNs in the selected subset for LSTM
if len(df_model_lstm) < config_dict_lstm["LSTM_N_STEPS_LOOKBACK"] * 2: # Need enough data for train/test sequences
print(f"Insufficient data ({len(df_model_lstm)} rows) for LSTM. Skipping."); return None
features_for_lstm_df = df_model_lstm[valid_feature_cols_lstm]
target_for_lstm_series = df_model_lstm[target_col_lstm_name]
# Chronological train-test split
train_size_lstm = int(len(df_model_lstm) * (1 - config_dict_lstm["TEST_SPLIT_RATIO"]))
train_features_df = features_for_lstm_df.iloc[:train_size_lstm]
test_features_df = features_for_lstm_df.iloc[train_size_lstm:]
train_target_series = target_for_lstm_series.iloc[:train_size_lstm]
test_target_series = target_for_lstm_series.iloc[train_size_lstm:]
# Create sequences for training (fit scalers here)
X_train_lstm, y_train_lstm, fitted_feature_scaler, fitted_target_scaler = prepare_lstm_sequences(
    train_features_df, train_target_series, config_dict_lstm["LSTM_N_STEPS_LOOKBACK"], config_dict_lstm["LSTM_SCALER"]
)
if X_train_lstm.shape[0] == 0: print("LSTM training data sequence creation failed."); return None
# Create sequences for testing (use fitted scalers)
# Combine last n_steps of train with test data to form sequences for test predictions
full_features_for_test_seq = pd.concat([train_features_df.iloc[-config_dict_lstm["LSTM_N_STEPS_LOOKBACK"]:], test_features_df])
full_target_for_test_seq = pd.concat([train_target_series.iloc[-config_dict_lstm["LSTM_N_STEPS_LOOKBACK"]:], test_target_series])
# Note: prepare_lstm_sequences re-fits scalers on this window; strictly, the scalers fitted on training data should be reused.
X_test_lstm, y_test_scaled_lstm, _, _ = prepare_lstm_sequences(
    full_features_for_test_seq, full_target_for_test_seq,
    config_dict_lstm["LSTM_N_STEPS_LOOKBACK"], config_dict_lstm["LSTM_SCALER"]
)
# y_test_scaled_lstm are the scaled target values that correspond to the X_test_lstm sequences.
# These are the "actuals" we will compare against (after inverse transform).
if X_test_lstm.shape[0] == 0: print("LSTM test data sequence creation failed."); return None
# Build LSTM model
n_features_lstm = X_train_lstm.shape[2] # Number of features used
model_lstm = Sequential()
model_lstm.add(Input(shape=(config_dict_lstm["LSTM_N_STEPS_LOOKBACK"], n_features_lstm)))
reg_cfg = config_dict_lstm.get("LSTM_REGULARIZATION") or {}
kernel_reg = l1_l2(l1=reg_cfg.get('l1', 0.0), l2=reg_cfg.get('l2', 0.0)) if reg_cfg else None
model_lstm.add(LSTM(100, activation='relu', return_sequences=True, kernel_regularizer=kernel_reg))
if config_dict_lstm["LSTM_ADD_DROPOUT"]:
    model_lstm.add(Dropout(config_dict_lstm["LSTM_DROPOUT_RATE"]))
model_lstm.add(LSTM(50, activation='relu', kernel_regularizer=kernel_reg))
if config_dict_lstm["LSTM_ADD_DROPOUT"]:
    model_lstm.add(Dropout(config_dict_lstm["LSTM_DROPOUT_RATE"]))
model_lstm.add(Dense(1))
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model_lstm.compile(optimizer=optimizer, loss='mean_squared_error', metrics=['mae'])
print(model_lstm.summary())
callbacks_list = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1),
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001, verbose=1)
]
print("Training LSTM model...")
history_lstm = model_lstm.fit(X_train_lstm, y_train_lstm,
epochs=config_dict_lstm["LSTM_EPOCHS"],
batch_size=config_dict_lstm["LSTM_BATCH_SIZE"],
validation_split=config_dict_lstm["LSTM_VALIDATION_SPLIT"],
callbacks=callbacks_list, verbose=1)
# Plot training history
plt.figure(figsize=(10,6))
plt.plot(history_lstm.history['loss'], label='Training Loss')
plt.plot(history_lstm.history['val_loss'], label='Validation Loss')
plt.title('LSTM Model Training & Validation Loss')
plt.xlabel('Epoch'); plt.ylabel('Loss (MSE)'); plt.legend(); plt.grid(True); plt.show()
# Make predictions
predicted_scaled_lstm = model_lstm.predict(X_test_lstm)
predicted_unscaled_lstm = fitted_target_scaler.inverse_transform(predicted_scaled_lstm).flatten()
# Actual values for comparison (already prepared as y_test_scaled_lstm; inverse-transform them here)
actual_unscaled_lstm = fitted_target_scaler.inverse_transform(y_test_scaled_lstm.reshape(-1, 1)).flatten()
# Get dates for plotting predictions.
# The test sequences are built from the last n_steps of the training window plus the test window, so
# each y_test_scaled_lstm value lines up one-to-one with test_target_series: the first prediction
# corresponds to df_model_lstm position train_size_lstm, and later predictions follow consecutively.
num_predictions = len(predicted_unscaled_lstm)
if num_predictions > 0:
    prediction_dates = df_model_lstm.index[train_size_lstm:][:num_predictions]
else:
    prediction_dates = pd.Index([])
if len(predicted_unscaled_lstm) != len(actual_unscaled_lstm):
print("Warning: LSTM pred/actual length mismatch. Trimming for eval.")
min_len_lstm = min(len(predicted_unscaled_lstm), len(actual_unscaled_lstm))
predicted_unscaled_lstm = predicted_unscaled_lstm[:min_len_lstm]
actual_unscaled_lstm = actual_unscaled_lstm[:min_len_lstm]
prediction_dates = prediction_dates[:min_len_lstm]
lstm_mse_test = mean_squared_error(actual_unscaled_lstm, predicted_unscaled_lstm)
lstm_mae_test = mean_absolute_error(actual_unscaled_lstm, predicted_unscaled_lstm)
print(f"\nLSTM Test Eval: MSE={lstm_mse_test:.4f}, MAE={lstm_mae_test:.4f},
RMSE={np.sqrt(lstm_mse_test):.4f}")
plt.figure(figsize=(14,7))
plt.plot(prediction_dates, actual_unscaled_lstm, label='Actual Prices', color='blue', marker='.',
markersize=4)
plt.plot(prediction_dates, predicted_unscaled_lstm, label='LSTM Predicted Prices', color='red',
linestyle='--')
plt.title(f'LSTM Prediction: {target_col_lstm_name}'); plt.xlabel('Date');
plt.ylabel(target_col_lstm_name); plt.legend(); plt.grid(True); plt.show()
return {
    'model_name': 'LSTM_MultiFeature', 'target_column': target_col_lstm_name,
    'mse_test': lstm_mse_test, 'mae_test': lstm_mae_test, 'rmse_test': np.sqrt(lstm_mse_test),
    'actual_test_values': pd.Series(actual_unscaled_lstm, index=prediction_dates),
    'predicted_test_values': pd.Series(predicted_unscaled_lstm, index=prediction_dates),
    'feature_scaler': fitted_feature_scaler, 'target_scaler': fitted_target_scaler,
    'n_steps': config_dict_lstm["LSTM_N_STEPS_LOOKBACK"], 'model_object': model_lstm,
    'features_used_for_lstm': valid_feature_cols_lstm, 'training_history': history_lstm.history
}
# --- 6. CONCLUSIONS & AI QUERY MODULE ---
def generate_comprehensive_conclusions(original_shape, cleaned_shape, primary_target, run_mode_str,
                                       reg_res_dict=None, ts_arima_ets_res_dict=None, lstm_res_dict=None,
                                       num_ta_added=0, num_ratios_calc=0):
print_section_header("Comprehensive Analysis Conclusions", level=1)
# ... (Fuller conclusion logic as in previous detailed response, including plots for best models) ...
print(f"Run Mode: {run_mode_str}")
print(f"Data: Original {original_shape} -> Cleaned {cleaned_shape}. Primary Target:
{primary_target}")
print(f"Features Added: {num_ta_added} TAs, {num_ratios_calc} Financial Ratios.")
if reg_res_dict:
best_reg_model = min(reg_res_dict.items(), key=lambda x: x[1]['mse_test'] if 'mse_test' in x[1] else float('inf'))[0] if reg_res_dict else "N/A"
best_reg_r2 = reg_res_dict[best_reg_model]['r2_test'] if best_reg_model != "N/A" and 'r2_test' in reg_res_dict[best_reg_model] else "N/A"
r2_str = f"{best_reg_r2:.3f}" if isinstance(best_reg_r2, float) else best_reg_r2 # Avoid formatting "N/A" as a float
print(f"Regression: Best model (by Test MSE) was '{best_reg_model}' with R2: {r2_str}")
if ts_arima_ets_res_dict: print(f"ARIMA/ETS ({ts_arima_ets_res_dict['model_name']}): MAE={ts_arima_ets_res_dict['mae']:.3f}")
if lstm_res_dict: print(f"LSTM ({lstm_res_dict['model_name']}): MAE={lstm_res_dict['mae_test']:.3f}")
print("\n**Key Considerations for Stock Market Prediction:**")
print(" - Markets are complex & influenced by myriad unpredictable factors (news, global events,
sentiment).")
print(" - Models are based on historical patterns, which may not hold in the future (non-
stationarity).")
print(" - **This script is for educational & experimental purposes. DO NOT use for live trading
without extensive, rigorous backtesting, validation, risk management, and understanding of its
limitations.**")
print(" - Overfitting is a major risk. Performance on unseen data is the true test.")
print(" - Transaction costs, slippage, and bid-ask spreads are not modeled here but impact real
trading returns.")
print(" - For robust fundamental analysis, ensure data quality and accurate alignment of reporting
dates.")
print_section_header("End of Conclusions", level=2)
def ai_query_module(df_final_cleaned, reg_results_ai, ts_results_ai, lstm_results_ai,
                    all_cols_list, fundamental_cols_list_ai, ratio_cols_list_ai, config_ai):
    print_section_header("Interactive AI Query Assistant", level=1)
    # ... (Full AI query module from previous most advanced response, updated to query new feature types) ...
    # This is a placeholder for the full function.
    def print_ai_help_mock():
        print("Mock AI Help: 'list columns', 'describe [col]', 'filter [col] > X', 'results [model_type]', 'exit'")
    print("AI Assistant: Type 'help' for commands, 'exit' to quit.")
    while True:
        q = input("AI Assistant> ").lower().strip()
        if q == 'exit':
            break
        elif q == 'help':
            print_ai_help_mock()
        elif q == 'list columns':
            print("Columns:", all_cols_list[:10], "...")  # Show a sample
        elif q.startswith("describe "):
            print(f"Mock describe for {q.split(' ', 1)[1]}")
        else:
            print("AI: Query not understood by mock assistant.")
    print("--- AI Query Module Exited ---")
# --- 7. MAIN EXECUTION WORKFLOW ---
if __name__ == '__main__':
print_section_header("Comprehensive Financial Analysis & Prediction Engine", level=0)
# --- Initialize Main Variables ---
df_main = None; original_shape_main = (0,0); df_cleaned_final = None
actual_fundamental_cols = []; calculated_ratio_cols = []; technical_indicator_cols = []
regression_results = None; arima_ets_results = None; lstm_results = None; main_preprocessor =
None
primary_analysis_target = None # This will be set based on run mode
# --- Stage 1: Data Acquisition ---
print_section_header("Stage 1: Data Acquisition", level=1)
run_mode = CONFIG["RUN_MODE"]
if "fetch" in run_mode:
df_main = fetch_stock_price_data_yf(CONFIG["STOCK_TICKERS"],
CONFIG["STOCK_START_DATE"],
CONFIG["STOCK_END_DATE"], CONFIG["STOCK_INTERVAL"])
if df_main is not None and CONFIG["FETCH_YFINANCE_FUNDAMENTALS"] and ("ultimate" in
run_mode or "fundamentals" in run_mode):
all_fund_dfs = []
tickers_to_fetch = df_main['Ticker'].unique() if 'Ticker' in df_main.columns else
(CONFIG["STOCK_TICKERS"] if isinstance(CONFIG["STOCK_TICKERS"], list) else
[CONFIG["STOCK_TICKERS"]])
for ticker in tickers_to_fetch:
fund_df_single = fetch_yfinance_fundamentals_single_ticker(ticker,
CONFIG["YFINANCE_FUNDAMENTALS_FREQUENCY"])
if fund_df_single is not None: all_fund_dfs.append(fund_df_single)
if all_fund_dfs:
df_main = merge_price_and_fundamentals(df_main, all_fund_dfs)
# Store names of fundamental columns that came from yfinance
for fund_df in all_fund_dfs: actual_fundamental_cols.extend([c for c in fund_df.columns if c
not in ['Ticker','Date']])
actual_fundamental_cols = list(set(actual_fundamental_cols))
elif "load" in run_mode:
df_main = load_data_from_file(CONFIG["FILE_PATH"])
# For loaded data, user needs to map fundamental columns in CONFIG if they exist
# The get_fundamental_col_name will use this map later.
for key, mapped_name in CONFIG["FUNDAMENTALS_MAP"].items():
if mapped_name in df_main.columns: actual_fundamental_cols.append(mapped_name)
if CONFIG["EXPLICIT_SHARES_OUTSTANDING_COL_NAME"] and
CONFIG["EXPLICIT_SHARES_OUTSTANDING_COL_NAME"] in df_main.columns:
actual_fundamental_cols.append(CONFIG["EXPLICIT_SHARES_OUTSTANDING_COL_NAME"])
actual_fundamental_cols = list(set(actual_fundamental_cols))
elif "simulate" in run_mode:
df_main = simulate_data(CONFIG["SIM_SAMPLES"], CONFIG["SIM_NUM_FEATURES"],
CONFIG["SIM_CAT_FEATURES"])
primary_analysis_target = 'target' # From simulation
else:
print(f"Error: RUN_MODE '{run_mode}' not recognized.")
exit()
if df_main is None or df_main.empty: print("Critical Error: No data loaded or fetched. Exiting.");
exit()
original_shape_main = df_main.shape
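    # At this point df_main is a single price-frequency DataFrame. When fundamentals were fetched,
    # merge_price_and_fundamentals has presumably aligned the lower-frequency (quarterly/annual) statement
    # values onto the price rows (e.g. via an as-of / forward-fill join) -- an assumption about that helper,
    # which is defined earlier in the script.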
    # --- Stage 2: Initial Date & Frequency Processing ---
    print_section_header("Stage 2: Date & Frequency Processing", level=1)
    # If the index is not a DatetimeIndex, apply find_and_set_date_column (already called in load_data_from_file).
    # For yfinance-fetched data the index is usually already datetime.
    if not isinstance(df_main.index, pd.DatetimeIndex):
        df_main = find_and_set_date_column(df_main, CONFIG["DATE_COLUMN_NAME_HINTS"])
    if not isinstance(df_main.index, pd.DatetimeIndex):  # If still not a DatetimeIndex, this is a major issue.
        print("CRITICAL: Could not establish a DatetimeIndex. Further processing may fail.")
    else:  # Only proceed with frequency handling once a DatetimeIndex is in place
        df_main = detect_and_set_frequency(df_main, CONFIG["DEFAULT_DATA_FREQUENCY"], CONFIG["RESAMPLE_TO_FREQUENCY"])
    # --- Stage 3: Feature Engineering ---
    print_section_header("Stage 3: Feature Engineering", level=1)
    df_featured = df_main.copy()
    if CONFIG["CALCULATE_TECHNICAL_INDICATORS"] and ("stock" in run_mode or "ultimate" in run_mode):  # Only for stock modes
        df_featured = add_technical_indicators_robust(df_featured, CONFIG["HLOCV_MAP"], CONFIG["CUSTOM_TECHNICAL_INDICATORS"])
        technical_indicator_cols = [c for c in df_featured.columns if c not in df_main.columns]  # Newly added TA columns
        print(f"Added {len(technical_indicator_cols)} TA columns.")
    if CONFIG["CALCULATE_FINANCIAL_RATIOS"] and ("ultimate" in run_mode or "fundamentals" in run_mode):
        price_col_for_ratios = CONFIG["HLOCV_MAP"].get(CONFIG["PRICE_COLUMN_FOR_TARGET"], CONFIG["PRICE_COLUMN_FOR_TARGET"])
        df_featured = calculate_financial_ratios_robust(df_featured, price_col_for_ratios,
                                                        CONFIG["FUNDAMENTALS_MAP"],
                                                        CONFIG["EXPLICIT_SHARES_OUTSTANDING_COL_NAME"])
        calculated_ratio_cols = [c for c in df_featured.columns
                                 if c.endswith('_calculated') and c not in df_main.columns and c not in technical_indicator_cols]
        print(f"Calculated {len(calculated_ratio_cols)} financial ratio columns.")
        # Update actual_fundamental_cols to include both the raw mapped and the calculated columns for the AI module
        actual_fundamental_cols.extend(calculated_ratio_cols)
        actual_fundamental_cols = list(set(actual_fundamental_cols))
    if CONFIG["NEWS_SENTIMENT_COLUMN_NAME"]:  # Conceptual integration
        df_featured = add_news_sentiment_placeholder(df_featured, CONFIG["NEWS_SENTIMENT_COLUMN_NAME"])
    # --- Stage 4: Data Cleaning & Final Preprocessing ---
    # Determine the primary target for cleaning (rows are dropped where this target is NaN)
    if "stock" in run_mode or "ultimate" in run_mode:
        primary_analysis_target = CONFIG["PRICE_COLUMN_FOR_TARGET"]
    elif "forecast_arima_ets" in run_mode:
        primary_analysis_target = CONFIG["ARIMA_ETS_TARGET_COLUMN"]
    elif "simulate" in run_mode or "regress" in run_mode:
        primary_analysis_target = 'target'  # Default from simulation or general regression
    df_cleaned_final = clean_and_preprocess_data(df_featured, primary_analysis_target, CONFIG)
    if df_cleaned_final.empty:
        print("Critical Error: DataFrame empty after cleaning. Exiting.")
        exit()
    # --- Stage 5: Exploratory Data Analysis ---
    print_section_header("Stage 5: Exploratory Data Analysis", level=1)
    df_eda_sample = df_cleaned_final.copy()  # Use the full cleaned data for EDA
    if 'Ticker' in df_eda_sample.columns and df_eda_sample['Ticker'].nunique() > 1:  # EDA on the first ticker if multi-ticker
        first_ticker = df_eda_sample['Ticker'].unique()[0]
        print(f"Multiple tickers present. Performing EDA on data for: {first_ticker}")
        df_eda_sample = df_eda_sample[df_eda_sample['Ticker'] == first_ticker]
    perform_full_eda(df_eda_sample, primary_analysis_target, run_mode,
                     [c for c in actual_fundamental_cols if c in df_cleaned_final.columns],  # Existing fundamental cols
                     [c for c in calculated_ratio_cols if c in df_cleaned_final.columns])    # Existing ratio cols
    # --- Stage 6: Model Training & Evaluation ---
    print_section_header("Stage 6: Model Training & Evaluation", level=1)
    # A. REGRESSION MODELS
    if any(s in run_mode for s in ["regress", "stock"]):  # Run regression for stock modes or general regression modes
        df_for_regression = df_cleaned_final.copy()
        target_for_regression = primary_analysis_target  # Default
        if "stock" in run_mode:  # For stock prediction, create a shifted target (predict the next period)
            price_col_actual = CONFIG["HLOCV_MAP"].get(CONFIG["PRICE_COLUMN_FOR_TARGET"], CONFIG["PRICE_COLUMN_FOR_TARGET"])
            if price_col_actual in df_for_regression.columns:
                group_col_reg = 'Ticker' if 'Ticker' in df_for_regression.columns else None
                shift_val = CONFIG["REGRESSION_TARGET_SHIFT_PERIODS"]
                shifted_target_name = f'Target_Shifted_{abs(shift_val)}'
                if group_col_reg:
                    df_for_regression[shifted_target_name] = df_for_regression.groupby(group_col_reg)[price_col_actual].shift(shift_val)
                else:
                    df_for_regression[shifted_target_name] = df_for_regression[price_col_actual].shift(shift_val)
                target_for_regression = shifted_target_name
                df_for_regression.dropna(subset=[target_for_regression], inplace=True)  # Critical
            else:
                print(f"Warning: Price column '{price_col_actual}' not found for creating the shifted regression target.")
        # Define regression features: everything except the original target, the shifted target,
        # date/ticker identifiers, and any other potential target columns.
        cols_to_exclude_reg = list(set([CONFIG["PRICE_COLUMN_FOR_TARGET"], target_for_regression,
                                        'Date', 'Ticker', 'index',
                                        CONFIG.get("ARIMA_ETS_TARGET_COLUMN", ""),
                                        CONFIG.get("LSTM_TARGET_COLUMN", "")]))
        cols_to_exclude_reg = [c for c in cols_to_exclude_reg if c in df_for_regression.columns and c is not None and c != '']
        features_for_regression = [col for col in df_for_regression.columns if col not in cols_to_exclude_reg]
        features_for_regression = [f for f in features_for_regression if not df_for_regression[f].isnull().all()]  # Drop all-NaN columns
        # Further refine features to numerics or low-cardinality categoricals (encoding is handled by the pipeline)
        refined_features_for_regression = []
        for f in features_for_regression:
            if pd.api.types.is_numeric_dtype(df_for_regression[f]):
                refined_features_for_regression.append(f)
            elif pd.api.types.is_object_dtype(df_for_regression[f]) or pd.api.types.is_categorical_dtype(df_for_regression[f]):
                if df_for_regression[f].nunique() < 50:  # Heuristic cap so one-hot encoding stays manageable
                    refined_features_for_regression.append(f)
        if refined_features_for_regression and target_for_regression in df_for_regression.columns:
            main_preprocessor, regression_results = run_regression_models(df_for_regression, target_for_regression,
                                                                          refined_features_for_regression, CONFIG)
        else:
            print("Skipping regression: not enough features or target missing after setup.")
    # B. ARIMA/ETS MODELS (univariate, on a specific target column)
    if "forecast_arima_ets" in run_mode:
        target_arima_ets = CONFIG["ARIMA_ETS_TARGET_COLUMN"]
        # The date column needs to be the index for statsmodels
        df_for_arima = df_cleaned_final.copy()
        if not isinstance(df_for_arima.index, pd.DatetimeIndex):
            print("ARIMA/ETS requires a DatetimeIndex. Attempting to use the first valid date column, or skipping.")
            # (Logic to set a date index here if not already done, or skip if impossible)
        # For multi-ticker data, run on the first ticker found (or allow the user to specify one)
        if 'Ticker' in df_for_arima.columns and df_for_arima['Ticker'].nunique() > 1:
            first_ticker_arima = df_for_arima['Ticker'].unique()[0]
            print(f"Running ARIMA/ETS on first ticker: {first_ticker_arima}")
            df_for_arima = df_for_arima[df_for_arima['Ticker'] == first_ticker_arima]
        if target_arima_ets in df_for_arima.columns and isinstance(df_for_arima.index, pd.DatetimeIndex):
            # ARIMA/ETS does not need an explicit date column name when the date is the index
            arima_ets_results, _ = run_arima_ets_forecast(df_for_arima, target_arima_ets,
                                                          date_col_ts=None,  # Date is the index
                                                          model_type_ts="auto_arima",  # or CONFIG["ARIMA_ETS_MODEL_TYPE"]
                                                          seasonal_period_ts=CONFIG["ARIMA_ETS_SEASONAL_PERIOD"],
                                                          config_dict_ts=CONFIG)  # Pass the full config
        else:
            print("Skipping ARIMA/ETS: Target or DatetimeIndex missing.")
    # C. LSTM MODEL
    if "stock" in run_mode:  # LSTM is primarily for the stock prediction modes
        df_for_lstm = df_cleaned_final.copy()
        target_for_lstm = CONFIG["LSTM_TARGET_COLUMN"]
        # For multi-ticker data, run on the first ticker (or allow the user to specify one)
        if 'Ticker' in df_for_lstm.columns and df_for_lstm['Ticker'].nunique() > 1:
            first_ticker_lstm = df_for_lstm['Ticker'].unique()[0]
            print(f"Running LSTM on first ticker: {first_ticker_lstm}")
            df_for_lstm = df_for_lstm[df_for_lstm['Ticker'] == first_ticker_lstm]
        # Ensure the LSTM features are present, numeric, and not entirely NaN
        lstm_feature_input_cols = [f for f in CONFIG["LSTM_FEATURE_COLUMNS"]
                                   if f in df_for_lstm.columns
                                   and pd.api.types.is_numeric_dtype(df_for_lstm[f])
                                   and not df_for_lstm[f].isnull().all()]
        if not lstm_feature_input_cols:
            print("Warning: No valid numeric features from LSTM_FEATURE_COLUMNS found. LSTM may be univariate or perform poorly.")
        if target_for_lstm in df_for_lstm.columns and isinstance(df_for_lstm.index, pd.DatetimeIndex):
            lstm_results = run_lstm_model(df_for_lstm, target_for_lstm, lstm_feature_input_cols, CONFIG)
        else:
            print("Skipping LSTM: Target, DatetimeIndex, or valid features missing.")
    # --- Stage 7: Generate Final Conclusions ---
    print_section_header("Stage 7: Final Conclusions & Summary", level=1)
    # Determine the main target variable shown in the conclusions
    final_conclusion_target = primary_analysis_target
    if "stock" in run_mode and regression_results:
        final_conclusion_target = target_for_regression  # Show the shifted target for regression
    elif "forecast_arima_ets" in run_mode:
        final_conclusion_target = CONFIG["ARIMA_ETS_TARGET_COLUMN"]
    generate_comprehensive_conclusions(original_shape_main, df_cleaned_final.shape,
                                       final_conclusion_target, run_mode,
                                       reg_res_dict=regression_results,
                                       ts_arima_ets_res_dict=arima_ets_results,
                                       lstm_res_dict=lstm_results,
                                       num_ta_added=len(technical_indicator_cols),
                                       num_ratios_calc=len(calculated_ratio_cols))
    # --- Stage 8: Interactive AI Query Module ---
    print_section_header("Stage 8: Interactive AI Query Assistant", level=1)
    if df_cleaned_final is not None and not df_cleaned_final.empty:
        # Consolidate all unique feature columns (original, TAs, fundamentals, ratios) for the AI module
        all_available_features_for_ai = list(df_cleaned_final.columns)
        # Pass fundamental and ratio column names separately so specific queries can target them
        fund_cols_for_ai = [c for c in actual_fundamental_cols if c in df_cleaned_final.columns]  # Raw mapped fundamentals
        ratio_cols_for_ai = [c for c in calculated_ratio_cols if c in df_cleaned_final.columns]   # Calculated ratios
        ai_query_module(df_cleaned_final, regression_results, arima_ets_results, lstm_results,
                        all_available_features_for_ai, fund_cols_for_ai, ratio_cols_for_ai, CONFIG)
    else:
        print("Skipping AI Query Module: No cleaned data available.")
    print_section_header("<<<<< SCRIPT EXECUTION FINISHED >>>>>", level=0)