# -*- coding: utf-8 -*-
"""Activity_Detection_Code.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1aCkm53svUCwXaD1CK4h4u3DE1sgTrsJI
"""
import pandas as pd
import numpy as np
import math
import seaborn as sns
import matplotlib.pyplot as plt
# Apply seaborn's 'darkgrid' style to the plots drawn later in this file.
sns.set(style='darkgrid')
# Notebook-only shell escape (this file was exported from Colab): installs
# m2cgen, which is imported further down to export the trained models.
# NOTE(review): the leading '!' is IPython syntax and is not valid in a plain
# Python interpreter -- run `pip install m2cgen` from a shell instead.
!pip install m2cgen
def features(lst):
    """Compute summary statistics for one window of sensor samples.

    Args:
        lst: Sized iterable of numeric samples (for example a list or deque).

    Returns:
        A four-element list ``[mean, std_dev, energy, skew]`` where
        ``std_dev`` is the population standard deviation, ``energy`` is the
        mean of the squared samples and ``skew`` is the third central moment
        divided by ``std_dev ** 3``. An empty window yields ``[0, 0, 0, 0]``
        so the result always has the same type and length.
    """
    n = len(lst)
    if n == 0:
        # Same shape as the normal path so callers can concatenate the
        # result with other lists regardless of the window contents.
        return [0, 0, 0, 0]

    avg = sum(lst) / n                                             # 1
    std_dev = (sum([(x - avg) ** 2 for x in lst]) / n) ** 0.5      # 2
    energy = sum([x ** 2 for x in lst]) / n                        # 3
    # The small epsilon keeps the division finite for constant windows,
    # where std_dev is exactly zero.
    skew = sum([(x - avg) ** 3 for x in lst]) * (
        1 / (n * ((std_dev + 0.000001) ** 3))
    )                                                              # 4
    return [avg, std_dev, energy, skew]
def process_data(activity_df, buffer_capacity=100):
    """Turn raw sensor rows into sliding-window feature rows.

    The 'Index' and magnetometer ('M_x', 'M_y', 'M_z') columns are discarded
    if present, the remaining columns are coerced to numeric (unparseable
    values become NaN), and a window of the last ``buffer_capacity`` samples
    is kept per column. Once the window is full, every input row produces one
    output row holding, per sensor column, the latest raw value followed by
    the statistics returned by ``features``.

    The remaining columns are labelled positionally, so they are assumed to
    be, in order: A_x, A_y, A_z, G_x, G_y, G_z.

    Args:
        activity_df: Raw sensor recording. It is not modified.
        buffer_capacity: Number of samples per sliding window (default 100).

    Returns:
        A DataFrame with one row per full window and, for each sensor, the
        columns ``<sensor>``, ``<sensor>_mean``, ``<sensor>_std_dev``,
        ``<sensor>_energy`` and ``<sensor>_skew``.

    Raises:
        ValueError: If ``buffer_capacity`` is smaller than 1, or if the
            number of remaining columns is not the six expected sensor axes.
    """
    from collections import deque

    if buffer_capacity < 1:
        raise ValueError("buffer_capacity must be at least 1")

    # Define sensor types and feature names
    sensor_types = ['A_x', 'A_y', 'A_z', 'G_x', 'G_y', 'G_z']
    feature_names = ['mean', 'std_dev', 'energy', 'skew']

    # After doing EDA, it was found that only A and G features are sufficient
    # for training. ``drop`` without ``inplace`` returns a new frame, so the
    # caller's DataFrame is left untouched.
    sensor_df = activity_df.drop(
        columns=['Index', 'M_x', 'M_y', 'M_z'], errors='ignore'
    )

    if sensor_df.shape[1] != len(sensor_types):
        # Fail early with a clear message instead of an IndexError raised
        # from deep inside the buffering loop.
        raise ValueError(
            f"expected {len(sensor_types)} sensor columns, "
            f"got {sensor_df.shape[1]}: {list(sensor_df.columns)}"
        )

    # Convert all columns to numeric (ensuring data consistency)
    for col in sensor_df.columns:
        sensor_df[col] = pd.to_numeric(sensor_df[col], errors='coerce')

    # One bounded buffer per sensor column; a full deque silently discards
    # its oldest sample on append, which maintains the sliding window.
    sensor_buffers = [deque(maxlen=buffer_capacity) for _ in sensor_types]
    computed_features = []  # Store processed feature values

    # Iterate through dataset rows
    for _, row in sensor_df.iterrows():
        # Append data to respective sensor buffers
        for buf, reading in zip(sensor_buffers, row):
            buf.append(reading)

        # Wait until buffer fills up before processing
        if len(sensor_buffers[0]) < buffer_capacity:
            continue

        # Extract features from sensor data: latest raw value + statistics
        feature_vector = []
        for buf in sensor_buffers:
            feature_vector.extend([buf[-1]] + features(buf))
        computed_features.append(feature_vector)

    # Generate column names dynamically
    column_headers = []
    for sensor in sensor_types:
        column_headers.append(sensor)  # Include raw sensor values
        for feat in feature_names:
            column_headers.append(f"{sensor}_{feat}")  # Feature-specific columns
    print(column_headers)  # Display generated column names

    # Convert processed data into a structured DataFrame
    processed_df = pd.DataFrame(computed_features, columns=column_headers)
    return processed_df  # Return final structured dataset
# Load the recorded sensor logs, one CSV file per activity.
def _read_sensor_log(csv_path):
    """Read one recording, skipping rows pandas cannot parse."""
    return pd.read_csv(csv_path, on_bad_lines='skip')


idle_df = _read_sensor_log('sensor_data_idle_sanjeev.csv')
sweeping_df = _read_sensor_log('sensor_data_sweeping_sanjeev.csv')
vibration_df = _read_sensor_log('sensor_data_vibrating_sanjeev.csv')
walking_df = _read_sensor_log('sensor_data_walking_sanjeev.csv')
jumping_df = _read_sensor_log('sensor_data_jumping_sanjeev.csv')
# Sensor name -> the columns holding that sensor's x/y/z axes.
sensor_groups = {
    'Accelerometer': ['A_x', 'A_y', 'A_z'],
    'Gyroscope': ['G_x', 'G_y', 'G_z'],
    'Magnetometer': ['M_x', 'M_y', 'M_z'],
}

# Activity name -> the raw recording for that activity.
activities = {
    'Idle': idle_df,
    'Jumping': jumping_df,
    'Sweeping': sweeping_df,
    'Vibration': vibration_df,
    'Walking': walking_df,
}

# One figure per sensor axis, with one panel per activity sharing the y-axis
# so the activities can be compared side by side.
for sensor_type, sensor_cols in sensor_groups.items():
    for axis in sensor_cols:
        fig, panels = plt.subplots(1, 5, figsize=(20, 5), sharey=True)

        for panel, (activity, frame) in zip(panels, activities.items()):
            frame[axis].plot(ax=panel, legend=False)
            panel.set_title(f'{activity} - {axis} ({sensor_type})', fontsize=12)
            panel.set_xlabel("Time")

        plt.suptitle(f'{axis} ({sensor_type}) Data Across Activities', fontsize=16)
        plt.show()
# The plots below show the raw data acquired through the sensors. We can see
# multiple unwanted outliers, which we shall remove during pre-processing.
'''Pre-processing of data to eliminate unwanted outliers'''
import numpy as np
import matplotlib.pyplot as plt
# Blank out (set to NaN) A_x readings outside an activity-specific range:
# +/-1 for the idle recording and +/-1000 for every other activity. Only the
# A_x column is touched, and the original frames are left as they are.
cleaned_activities = {}
for activity, df in activities.items():
    limit = 1 if activity == 'Idle' else 1000
    outlier_mask = (df['A_x'] > limit) | (df['A_x'] < -limit)

    df_cleaned = df.copy()  # Work on a copy to preserve the original data
    df_cleaned.loc[outlier_mask, 'A_x'] = np.nan
    print(f"Marked {sum(outlier_mask)} outliers in A_x as NaN in {activity} dataset.")

    cleaned_activities[activity] = df_cleaned
# Sensor name -> the columns holding that sensor's x/y/z axes.
sensor_groups = {
    'Accelerometer': ['A_x', 'A_y', 'A_z'],
    'Gyroscope': ['G_x', 'G_y', 'G_z'],
    'Magnetometer': ['M_x', 'M_y', 'M_z']
}

# Plot Accelerometer, Gyroscope, and Magnetometer data for each activity:
# one figure per sensor, one row per axis, one column per cleaned activity.
n_activities = len(cleaned_activities)
for sensor_type, sensor_cols in sensor_groups.items():
    # squeeze=False keeps `axes` two-dimensional even for a single row or
    # column, so the [row, col] indexing below always works.
    fig, axes = plt.subplots(len(sensor_cols), n_activities, figsize=(20, 15),
                             sharex=True, sharey=True, squeeze=False)

    # Loop through each axis (x, y, z)
    for row_idx, axis in enumerate(sensor_cols):
        # Loop through each activity and plot its sensor data
        for col_idx, (activity, df) in enumerate(cleaned_activities.items()):
            panel = axes[row_idx, col_idx]
            df[axis].plot(ax=panel, legend=False)
            panel.set_title(f'{activity} - {axis} ({sensor_type})', fontsize=10)
            panel.set_xlabel("Time")
            # Set y-axis limits to zoom in
            panel.set_ylim(-3, 3)  # Adjusting the scale

    plt.suptitle(f'{sensor_type} Data Across Activities (Y-Axis Zoomed)',
                 fontsize=16)
    plt.show()
# The raw frames must hold only sensor columns before feature extraction, so
# make sure no 'Target' column is present (errors='ignore' makes this safe to
# re-run). The class labels themselves are attached to the extracted feature
# tables further down, so nothing is written to the raw frames here.
# `drop` returns a new frame: rebinding the names also means that any in-place
# changes made downstream do not affect the frames stored in `activities`.
idle_df = idle_df.drop(columns=['Target'], errors='ignore')
jumping_df = jumping_df.drop(columns=['Target'], errors='ignore')
sweeping_df = sweeping_df.drop(columns=['Target'], errors='ignore')
vibration_df = vibration_df.drop(columns=['Target'], errors='ignore')
walking_df = walking_df.drop(columns=['Target'], errors='ignore')
def _labelled_features(raw_df, label):
    """Extract window features from *raw_df* and tag every row with *label*."""
    feature_table = process_data(raw_df)
    feature_table['Target'] = label
    return feature_table


# Per-activity feature tables, each carrying its class name in 'Target'.
idle = _labelled_features(idle_df, 'Idle')
jumping = _labelled_features(jumping_df, 'Jumping')
sweeping = _labelled_features(sweeping_df, 'Sweeping')
vibration = _labelled_features(vibration_df, 'Vibration')
walking = _labelled_features(walking_df, 'Walking')

# Stack everything into one table and separate the inputs from the label.
df = pd.concat([idle, jumping, sweeping, vibration, walking])
cols = df.columns.drop('Target')
X = df[cols]
y = df['Target']
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics

# Hold out 10% of the feature rows for testing (fixed seed for repeatability).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, random_state=42
)

# A deliberately small forest (5 trees, depth 5), trained on the training split.
model_rf = RandomForestClassifier(max_depth=5, n_estimators=5, random_state=0)
model_rf.fit(X_train, y_train)

training_predict_rf = model_rf.predict(X_train)
test_predict_rf = model_rf.predict(X_test)

# Report per-class metrics and the confusion matrix for both splits.
for split_title, truth, predicted in (
    ("Training", y_train, training_predict_rf),
    ("Test", y_test, test_predict_rf),
):
    print(split_title)
    print(metrics.classification_report(truth, predicted, digits=3))
    print(metrics.confusion_matrix(truth, predicted))

# Export the fitted forest as Python source via m2cgen and print it.
import m2cgen as m2c
model_to_python_rf = m2c.export_to_python(model_rf)
print(model_to_python_rf)
from sklearn.tree import DecisionTreeClassifier
from sklearn import metrics
import m2cgen as m2c

# Fit a single depth-limited decision tree on the training split.
model_dt = DecisionTreeClassifier(max_depth=5, random_state=0)
model_dt.fit(X_train, y_train)

training_predict_dt = model_dt.predict(X_train)
test_predict_dt = model_dt.predict(X_test)

# Report per-class metrics and the confusion matrix for both splits.
for split_title, truth, predicted in (
    ("Training Performance", y_train, training_predict_dt),
    ("\nTesting Performance", y_test, test_predict_dt),
):
    print(split_title)
    print(metrics.classification_report(truth, predicted, digits=3))
    print(metrics.confusion_matrix(truth, predicted))

# Export the fitted tree as Python source via m2cgen and print it.
model_to_python_dt = m2c.export_to_python(model_dt)
print(model_to_python_dt)

# Show which estimator was trained and the class labels it learned.
print(model_dt.__class__)
print(model_dt.classes_)
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn import metrics
import m2cgen as m2c

# Encode the string class labels as integers for this model. The encoder is
# fitted on the full label column so that the train and test splits share one
# mapping even if a class happens to be missing from one of them.
label_encoder = LabelEncoder()
label_encoder.fit(y)
y_train_encoded = label_encoder.transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

# Train Extra Trees Classifier
model_extra_trees = ExtraTreesClassifier(n_estimators=5, max_depth=5,
                                         random_state=0)
model_extra_trees.fit(X_train, y_train_encoded)

# Training Performance
print("Training Performance")
training_predict_et = model_extra_trees.predict(X_train)
print(metrics.classification_report(y_train_encoded, training_predict_et,
                                    digits=3))
print(metrics.confusion_matrix(y_train_encoded, training_predict_et))

# Testing Performance
print("\nTesting Performance")
test_predict_et = model_extra_trees.predict(X_test)
print(metrics.classification_report(y_test_encoded, test_predict_et, digits=3))
print(metrics.confusion_matrix(y_test_encoded, test_predict_et))

# Convert Extra Trees model to Python using m2cgen
model_to_python_et = m2c.export_to_python(model_extra_trees)
print(model_to_python_et)

# Verify classifier type and classes
print(model_extra_trees.__class__)
# Position i in this array is the label that was encoded as the integer i.
print("Encoded Classes: ", label_encoder.classes_)
# Show which estimator the random forest is and the class labels it learned.
# Both values are printed explicitly: a bare expression is only echoed by a
# notebook cell and produces no output when the file runs as a script.
print(model_rf.__class__)
print(model_rf.classes_)