"""
simple_swing.py
Simple swing-trading strategy:
- Signals: 20-EMA crossing 50-EMA (fast above slow -> buy; reverse -> sell)
- Filter: RSI(14) must be below rsi_entry_max to enter (avoids buying extreme
overbought)
- Exit: EMA cross down OR stop-loss / take-profit
- Backtest: fixed-fraction position sizing, daily close execution
Dependencies:
pip install yfinance pandas numpy matplotlib ta
(ta is optional; RSI implemented manually below if you prefer)
"""
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
# ---- PARAMETERS ----
symbol = "AAPL" # ticker to test
start = "2019-01-01"
end = datetime.today().strftime("%Y-%m-%d")
fast_ema_len = 20
slow_ema_len = 50
rsi_len = 14
rsi_entry_max = 70 # only enter if RSI < this (avoid buying at extreme)
stop_loss_pct = 0.08 # 8% stop loss from entry
take_profit_pct = 0.25 # 25% take profit from entry
initial_capital = 100000
risk_per_trade = 0.02 # fraction of capital risked per trade (position sizing)
commission_per_trade = 0 # flat commission per trade (set >0 if desired)
verbose = True
# ----------------------
def compute_ema(df, length, col="Close"):
return df[col].ewm(span=length, adjust=False).mean()
def compute_rsi(df, length=14, col="Close"):
# classic RSI
delta = df[col].diff(1)
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.rolling(window=length, min_periods=length).mean()
avg_loss = loss.rolling(window=length, min_periods=length).mean()
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def backtest(df):
cash = initial_capital
equity = initial_capital
position = 0 # number of shares held
entry_price = 0
entry_equity = 0
trade_log = []
equity_curve = []
for i in range(1, len(df)):
row = df.iloc[i]
prev = df.iloc[i-1]
date = row.name
close = row["Close"]
high = row["High"]
low = row["Low"]
# Signals
ema_fast = row["EMA_fast"]
ema_slow = row["EMA_slow"]
prev_ema_fast = prev["EMA_fast"]
prev_ema_slow = prev["EMA_slow"]
rsi = row["RSI"]
# Detect cross up/down
cross_up = (prev_ema_fast <= prev_ema_slow) and (ema_fast > ema_slow)
cross_down = (prev_ema_fast >= prev_ema_slow) and (ema_fast < ema_slow)
# --- Entry ---
if position == 0 and cross_up and (rsi < rsi_entry_max):
# position sizing: risk-based
# compute stop price and risk per share
stop_price = close * (1 - stop_loss_pct)
risk_per_share = close - stop_price
if risk_per_share <= 0:
continue
max_risk_amount = cash * risk_per_trade
shares = int(max_risk_amount / risk_per_share)
# if shares 0, buy at least 1 share if enough cash
if shares <= 0 and cash >= close:
shares = int(cash // close)
if shares > 0:
cost = shares * close + commission_per_trade
if cost <= cash:
position = shares
entry_price = close
entry_equity = cash + position * close
cash -= cost
trade_log.append({
"EntryDate": date, "Type": "Buy", "Price": close, "Shares":
shares, "CashAfter": cash
})
if verbose:
print(f"{date.date()} BUY {shares} @ {close:.2f}, cash
remaining {cash:.2f}")
# --- Manage open position ---
if position > 0:
# stop loss / take profit check (assume executed at close if hit
intraday)
if close <= entry_price * (1 - stop_loss_pct):
exit_price = close
cash += position * exit_price - commission_per_trade
trade_log[-1].update({"ExitDate": date, "ExitPrice": exit_price,
"ExitReason": "StopLoss"})
if verbose:
print(f"{date.date()} STOP LOSS exit {position} @
{exit_price:.2f}")
position = 0
entry_price = 0
elif close >= entry_price * (1 + take_profit_pct):
exit_price = close
cash += position * exit_price - commission_per_trade
trade_log[-1].update({"ExitDate": date, "ExitPrice": exit_price,
"ExitReason": "TakeProfit"})
if verbose:
print(f"{date.date()} TAKE PROFIT exit {position} @
{exit_price:.2f}")
position = 0
entry_price = 0
elif cross_down:
# exit on EMA cross down
exit_price = close
cash += position * exit_price - commission_per_trade
trade_log[-1].update({"ExitDate": date, "ExitPrice": exit_price,
"ExitReason": "EMA_CrossDown"})
if verbose:
print(f"{date.date()} EMA cross exit {position} @
{exit_price:.2f}")
position = 0
entry_price = 0
# equity snapshot
equity = cash + position * close
equity_curve.append({"Date": date, "Equity": equity})
equity_df = pd.DataFrame(equity_curve).set_index("Date")
trades = pd.DataFrame(trade_log)
return equity_df, trades
def performance_stats(equity_df):
returns = equity_df["Equity"].pct_change().fillna(0)
total_return = equity_df["Equity"].iloc[-1] / equity_df["Equity"].iloc[0] - 1
annualized_return = (1 + total_return) ** (252 / len(equity_df)) - 1
annualized_vol = returns.std() * np.sqrt(252)
sharpe = (annualized_return / annualized_vol) if annualized_vol != 0 else
np.nan
# max drawdown
cummax = equity_df["Equity"].cummax()
drawdown = (equity_df["Equity"] - cummax) / cummax
maxdd = drawdown.min()
return {
"Total Return": total_return,
"Annualized Return": annualized_return,
"Annualized Vol": annualized_vol,
"Sharpe": sharpe,
"Max Drawdown": maxdd
}
# ---- main ----
if __name__ == "__main__":
# Download data
df = yf.download(symbol, start=start, end=end, progress=False)
if df.empty:
raise SystemExit("No data downloaded; check ticker or internet
connection.")
# Indicators
df["EMA_fast"] = compute_ema(df, fast_ema_len)
df["EMA_slow"] = compute_ema(df, slow_ema_len)
df["RSI"] = compute_rsi(df, rsi_len)
df = df.dropna()
equity_df, trades = backtest(df)
stats = performance_stats(equity_df)
print("\nPerformance summary:")
for k, v in stats.items():
if isinstance(v, float):
print(f" {k}: {v:.4f}")
else:
print(f" {k}: {v}")
print(f"\nNumber of trades: {len(trades)}")
if not trades.empty:
print(trades[['EntryDate','ExitDate','Price' if 'Price' in trades.columns
else 'EntryDate']].head())
# Plot equity curve
plt.figure(figsize=(10,5))
plt.plot(equity_df.index, equity_df["Equity"])
plt.title(f"Equity Curve - {symbol}")
plt.ylabel("Portfolio Value")
plt.grid(True)
plt.tight_layout()
plt.show()