Sensor Data Acquisition Testing:
Introduction:
There are 5 main sensors; Bar30 (External pressure sensor), BME680 (Internal pressure
sensor), SOS Leak sensor (Blue robotics), Adafruit INA260 (Current sensor), Sparton AHRS-8
(Altitude heading and reference system). The goal is to collect data from all these 5 sensors
and store it in a CSV file.
Problem and Solution:
1) Problem: There are libraries given by the manufacturer in order to acquire data from
these sensors using raspberry pi for all sensors other than AHRS-8.
Solution: For AHRS-8 a GitHub repository was referred and the ROS code based of
python was converted to raspberry pi data acquisition code. Before directly testing it
in raspberry pi, the AHRS was connected to a laptop and was tested as this made the
testing process much simpler, and this same code was migrated to raspberry pi and
tested. The link to the GitHub repository is this:
https://github.com/RoboticsClubatUCF/ucf_AHRS/blob/master/core_sparton/src/
ahrs8_nmea.py
2) Problem: The sensors were sometime giving random values that made no sense.
Solution: Power the sensor correctly, even though you don’t turn on the sensor, it
doesn’t mean it will give value of 0 it might give random values of large magnitude
causing errors.
3) Problem: The current sensor got damaged many times.
Solution: Even though it is attached to the portion with its rated current and voltage,
the sensor got permanently damaged. Thus, it has currently been removed from the
complete architecture of the system as the underlying reason for this has not yet
been found.
4) Problem: The external and internal pressure sensor failed once.
Solution: Both were replaced with new sensors and everything seemed to be
working fine. The assumption for the cause of failure seems to be natural and it is
assumed that it was the life of the sensor.
5) Problem: The wiring of all the sensors were too messy and it was difficult to remove,
attach and debug problems with the sensors.
Solution: Designed and fabricated a custom PCB with JST connectors so it was easier
to fix and remove the sensor wires. The raspberry pi was connected to this custom
PCB board using a ribbon cable.
Idea iterations:
1) Initially the primary design was to use IPC (Inter process communication) and run 2
programs one for acquiring data from the AHRS-8 and the other for the rest of the
sensors. This data had to be pipelined to a main dashboard in order to display the
sensor data. This system worked fine but there were a few timing issues (this means
the data being recorded was supposed to be poled for each second but it was polling
randomly between 1 and 2 seconds).
2) Instead of having 2 programs and 2 pipelines the program was combined into one
program and only a single pipeline was created to make the architecture simple. This
worked well but there was this new functionality where we had to not only display
the data but create a CSV file to store the acquired data. The data storage worked but
when this was integrated with the actuator control and BMS dashboard there were
issues. Basically, the dashboard was created in such a way that there was a drop-
down list on the top right which was able to switch between the actuator control,
BMS control & monitoring and sensor data acquisition. The problem was that when
we tried to extend the actuator and switched to the other 2 dashboard the actuator
stopped and the other functionality worked, and the same happened when we
switched back to actuator control the data acquisition stopped working as there was
no data being stored in the CSV file. After searching for solutions multiprocessing and
multithreading seemed to be the final solution.
3) Based on this a multiprocessing dashboard was created and the IPC was removed as
it was difficult to debug. Instead of IPC we used shared memory. Basically, the
program writes the data onto the CSV file and shared memory and the dashboard
reads data from shared memory. This is a much cleaner system than IPC and only
minimal and necessary data is being shared between the programs. We could have
also implemented multithreading but threads tend to fail more so multiprocessing
was implemented.
4) A feature called abnormality was added where this is raised when a certain sensor
crosses a certain threshold, this is a failproof mechanism implemented in order to
make the actuator fully and disable all control feature. Eg. If the sensor detects a leak
in the system, then there is something wrong so we extend fully to increase the
buoyancy more than the weight which moves the actuator up.
Sensor Data Acquisition (Custom Module) Code:
import time
import os
import csv
import RPi.GPIO as GPIO
import busio
import board
import serial
from datetime import datetime
import math
import threading
latest_sensor_data = {}
latest_bms_data = {}
# Attempt to import sensor libraries with error handling
try:
from adafruit_bme680 import Adafruit_BME680_I2C
BME680_AVAILABLE = True
except ImportError:
BME680_AVAILABLE = False
try:
import ms5837
MS5837_AVAILABLE = True
except ImportError:
MS5837_AVAILABLE = False
try:
import adafruit_ina260
INA260_AVAILABLE = True
except ImportError:
INA260_AVAILABLE = False
# Function to initialize I2C
def setup_i2c():
"""Initializes I2C communication."""
try:
i2c = busio.I2C(board.SCL, board.SDA)
print("I2C setup complete.")
return i2c
except Exception as e:
print(f"I2C Setup Error: {e}")
return None
# Function to initialize sensors safely
def initialize_sensors():
"""Initialize available sensors with error handling."""
i2c = setup_i2c() # Initialize I2C once
sensors = {}
try:
if BME680_AVAILABLE:
sensors['bme680'] = Adafruit_BME680_I2C(i2c, address=0x77)
except Exception as e:
print(f"Failed to initialize BME680: {e}")
sensors['bme680'] = None
try:
if MS5837_AVAILABLE:
sensors['pressure'] = ms5837.MS5837_30BA()
if sensors['pressure'].init(): # Ensure sensor is initialized
correctly
print("MS5837 sensor initialized successfully.")
else:
print("Failed to initialize MS5837 sensor.")
sensors['pressure'] = None
except Exception as e:
print(f"Failed to initialize MS5837: {e}")
sensors['pressure'] = None
try:
if INA260_AVAILABLE:
sensors['ina260'] = adafruit_ina260.INA260(i2c)
except Exception as e:
print(f"Failed to initialize INA260: {e}")
sensors['ina260'] = None
return sensors
# File paths
SENSOR_LOG_FILE = "1_sensor_data.csv"
BMS_LOG_FILE = "1_bms_data.csv"
# Logging state
is_logging = False
def setup_gpio():
"""Initializes GPIO settings."""
try:
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(17, GPIO.IN) # Example: Leak Sensor Pin
print("GPIO setup complete.")
except Exception as e:
print(f"GPIO Setup Error: {e}")
def setup_serial():
"""Initializes serial communication."""
try:
ser = serial.Serial('/dev/ttyUSB1', baudrate=115200, bytesize=8,
parity='N', stopbits=1, timeout=0.05)
print("Serial communication setup complete.")
return ser
except Exception as e:
print(f"Serial Setup Error: {e}")
return None
########### Abnormality ###########
# Define global abnormality flag
abnormality = False
THRESHOLDS = {
"temperature": 50, # Max temperature in °C
"humidity": 90, # Max humidity in %
"pressure_hpa": 1100, # Max pressure in hPa
"pressure_mbar": 1100, # Max pressure in mbar
"temperature_C_pressure": 50, # Max temperature from pressure sensor
"current": 10, # Max current in A
"voltage": 15 # Max voltage in V
}
# Leak detection is a boolean check (not a numerical threshold)
LEAK_DETECTED_THRESHOLD = True # If True, an abnormality is triggered
# Abnormality is not reset
"""def check_abnormality(sensor_data):
global abnormality
for key, threshold in THRESHOLDS.items():
if key in sensor_data and sensor_data[key] != "Error": # Ignore
errors
try:
if float(sensor_data[key]) > threshold: # Compare numeric
values
abnormality = True # Trigger abnormality
print(f"⚠️ Abnormality detected: {key} exceeded
threshold ({sensor_data[key]} > {threshold})")
return # Stop further checking
except ValueError:
pass # Ignore conversion errors for non-numeric values
# Special case: Leak sensor (boolean check)
if "leak_detected" in sensor_data and
isinstance(sensor_data["leak_detected"], bool):
if sensor_data["leak_detected"] == LEAK_DETECTED_THRESHOLD:
abnormality = True
print(f"⚠️ Abnormality detected: Leak detected!")"""
# Abnormality is reset
def check_abnormality(sensor_data):
global abnormality
# Assume normal state before checking
abnormality_detected = False
for key, threshold in THRESHOLDS.items():
if key in sensor_data and sensor_data[key] != "Error": # Ignore
errors
try:
if float(sensor_data[key]) > threshold: # Compare numeric
values
abnormality_detected = True
print(f"⚠️ Abnormality detected: {key} exceeded
threshold ({sensor_data[key]} > {threshold})")
break # No need to check further
except ValueError:
pass # Ignore conversion errors for non-numeric values
# Special case: Leak sensor (boolean check)
if "leak_detected" in sensor_data and
isinstance(sensor_data["leak_detected"], bool):
if sensor_data["leak_detected"] == LEAK_DETECTED_THRESHOLD:
abnormality_detected = True
print(f"⚠️ Abnormality detected: Leak detected!")
# Update the global abnormality flag based on detection status
abnormality = abnormality_detected
########### Sensor #############
def read_sensors(sensors):
data = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"temperature": None,
"humidity": None,
"pressure_hpa": None,
"pressure_mbar": None,
"temperature_C_pressure": None,
"current": None,
"voltage": None,
"leak_detected": GPIO.input(17) == 1,
}
if 'bme680' in sensors:
try:
sensor = sensors['bme680']
data.update({
"temperature": sensor.temperature,
"humidity": sensor.humidity,
"pressure_hpa": sensor.pressure
})
except Exception:
pass
if 'pressure' in sensors:
try:
sensor = sensors['pressure']
if sensor.read():
data.update({
"pressure_mbar": sensor.pressure(ms5837.UNITS_mbar),
"temperature_C_pressure":
sensor.temperature(ms5837.UNITS_Centigrade)
})
except Exception:
pass
if 'ina260' in sensors:
try:
sensor = sensors['ina260']
data.update({
"current": max(0, sensor.current),
"voltage": sensor.voltage
})
except Exception:
pass
return data
def read_ins_data():
ins_data = {}
try:
with serial.Serial("/dev/ttyUSB0", 115200, timeout=0.9) as ser:
ser.write(b"$PSPA,G\r\n")
gyro_response = ser.readline().decode('utf-8').strip()
ser.write(b"$PSPA,QUAT\r\n")
quat_response = ser.readline().decode('utf-8').strip()
ser.write(b"$PSPA,A\r\n")
accel_response = ser.readline().decode('utf-8').strip()
def parse_G(string):
split_string = string.split(",")
if len(split_string) < 4:
return None
try:
Gx = float(split_string[1].split("=")[1]) * (math.pi /
180.0) / 1000
Gy = float(split_string[2].split("=")[1]) * (math.pi /
180.0) / 1000
Gz = float(split_string[3].split("=")[1].split("*")[0])
* (math.pi / 180.0) / 1000
return {"Gx": Gx, "Gy": Gy, "Gz": Gz}
except ValueError:
return None
def parse_A(string):
split_string = string.split(",")
if len(split_string) < 4:
return None
try:
Ax = float(split_string[1].split("=")[1]) / 1000 * 9.81
Ay = float(split_string[2].split("=")[1]) / 1000 * 9.81
Az = float(split_string[3].split("=")[1].split("*")[0])
/ 1000 * 9.81
return {"Ax": -Ax, "Ay": Ay, "Az": Az}
except ValueError:
return None
def parse_QUAT(string):
split_string = string.split(",")
if len(split_string) < 5:
return None
try:
w = float(split_string[1].split("=")[1])
x = float(split_string[2].split("=")[1])
y = float(split_string[3].split("=")[1])
z = float(split_string[4].split("=")[1].split("*")[0])
return {"orientation_w": w, "orientation_x": x,
"orientation_y": y, "orientation_z": z}
except ValueError:
return None
gyro_data = parse_G(gyro_response)
if gyro_data:
ins_data.update(gyro_data)
accel_data = parse_A(accel_response)
if accel_data:
ins_data.update(accel_data)
quat_data = parse_QUAT(quat_response)
if quat_data:
ins_data.update(quat_data)
except Exception as e:
print(f"INS sensor error: {e}")
return ins_data
#####################################################
##################### BATTERY #######################
def send_command(ser, command, address, data):
packet = [
command,
(address >> 8) & 0xFF,
address & 0xFF,
(data >> 8) & 0xFF,
data & 0xFF,
0x00, 0x00,
0x00
]
ser.write(bytes(packet))
response = ser.read(10)
return list(response)
def get_active_state_description(active_state_value):
state_descriptions = {
100: "System fault",
101: "Temperature trip",
102: "Short circuit trip",
103: "Overload current trip",
104: "Cell voltage fault",
105: "Over-charge trip",
106: "Over-discharge trip",
107: "Pre-charge state",
108: "Normal operation",
109: "Critical over-charge trip",
110: "Critical over-discharge trip",
90: "User disabled state",
91: "Sleep state",
}
return state_descriptions.get(active_state_value, "Unknown state")
def get_rtd(ser):
try:
if ser is None:
raise Exception("Serial connection failed.") # 🔹 Ensure serial
is valid
temp = send_command(ser, 0x30, 0x0F00, 0x0000)
if not temp:
raise Exception("Serial read failed for voltage") # 🔹 Prevent
crashes on bad reads
v = round((temp[1] + temp[2] * 256) * 0.1, 1)
temp = send_command(ser, 0x30, 0x1300, 0x0000)
if not temp:
raise Exception("Serial read failed for current")
i = round((temp[1] + temp[2] * 256) * 0.1, 1)
temp = send_command(ser, 0x30, 0x0100, 0x0000)
status = get_active_state_description(temp[1])
temp = send_command(ser, 0x30, 0x4600, 0x0000)
if not temp:
raise Exception("Serial read failed for BMS temperature")
bms_temp = temp[1] - 128
# ✅ Check and Fix Battery Temperature Readings
bat_temp = []
hex_values = [0x4800, 0x4900, 0x4A00, 0x4B00] # 🔹 Verify these
register addresses
for register in hex_values:
temp = send_command(ser, 0x30, register, 0x0000)
if temp:
bat_temp.append(temp[1] - 128) # Convert raw data
else:
bat_temp.append("Error")
# ✅ Check and Fix Battery Voltage Readings
bat_v = []
hex_values = [0x6500, 0x6600, 0x6700, 0x6800, 0x6900, 0x6A00,
0x6B00] # 🔹 Verify these addresses
for register in hex_values:
temp = send_command(ser, 0x30, register, 0x0000)
if temp:
voltage = temp[2] * 256 + temp[1]
bat_v.append(voltage)
else:
bat_v.append("Error")
return [datetime.now().strftime("%Y-%m-%d %H:%M:%S"), v, i, status,
bms_temp, bat_temp, bat_v]
except Exception as e:
print(f"❌ Error reading BMS data: {e}")
return None
#####################################################
def start_logging():
global is_logging
is_logging = True
print("Logging started...")
def stop_logging():
global is_logging
is_logging = False
print("Logging stopped...")
def log_sensor_data(sensors, data):
"""Reads sensor and INS data, then writes it to a CSV file with
abnormality flag."""
global abnormality
file_exists = os.path.isfile(SENSOR_LOG_FILE)
if not file_exists:
with open(SENSOR_LOG_FILE, mode="w", newline="") as file:
writer = csv.writer(file)
writer.writerow(["temperature", "humidity", "pressure_hpa",
"pressure_mbar",
"temperature_C_pressure", "current",
"voltage", "leak_detected",
"Timestamp", "Gx", "Gy", "Gz", "Ax", "Ay",
"Az",
"orientation_w", "orientation_x",
"orientation_y", "orientation_z",
"abnormality"]) # Added abnormality column
try:
"""data = read_sensors(sensors)
ins_data = read_ins_data()
data.update(ins_data)"""
for key in data:
if data[key] is None:
data[key] = "Error"
# Check for abnormality based on thresholds
check_abnormality(data)
# Add abnormality flag to logged data
data["abnormality"] = abnormality
# print(f"Logging sensor & INS data: {data}")
except Exception as e:
print(f"Sensor Read Error: {e}")
return
with open(SENSOR_LOG_FILE, mode="a", newline="") as file:
writer = csv.writer(file)
writer.writerow(data.values())
file.flush()
def log_bms_data(ser, data):
"""Reads BMS data and writes it to a CSV file, ensuring proper file
creation and error handling."""
# Ensure the file is created before reading BMS data
file_exists = os.path.isfile(BMS_LOG_FILE)
if not file_exists:
with open(BMS_LOG_FILE, mode="w", newline="") as file:
writer = csv.writer(file)
writer.writerow(
["Timestamp", "Voltage", "Current", "State", "BMS Temp",
"Battery Temps", "Battery Voltages"])
# 🔹 Column headers are now added if the file is new
try:
if ser is None:
raise Exception("Serial connection failed.") # 🔹 If serial
setup fails, raise an error
# data = get_rtd(ser) # Get BMS readings
# print(f"Logging BMS data: {data}") # Debugging print
except Exception as e:
print(f"BMS Read Error: {e}")
return # Skip logging if BMS read fails, but the file is already
created
# Write data to the CSV file
with open(BMS_LOG_FILE, mode="a", newline="") as file:
writer = csv.writer(file)
writer.writerow(data) # Write available BMS data
file.flush() # 🔹 Ensure immediate data storage
def monitor_and_log(shared_data):
start_logging()
setup_gpio() # Setup GPIO once
ser = setup_serial() # Setup Serial once
sensors = initialize_sensors() # Setup I2C and Sensors once
try:
while True:
if is_logging:
latest_sensor = read_sensors(sensors)
ins_data = read_ins_data()
latest_sensor.update(ins_data)
log_sensor_data(sensors, latest_sensor) # Log sensor & INS
data
latest_bms = get_rtd(ser)
log_bms_data(ser, latest_bms) # Log BMS data
# Update shared memory for the dashboard
shared_data['sensor'] = latest_sensor
shared_data['bms'] = latest_bms
# print(latest_sensor, "\n", latest_bms)
except KeyboardInterrupt:
print("\nLogging stopped by user (Ctrl+C).")
finally:
stop_logging()
print("Data safely stored before exit.")
if __name__ == "__main__":
monitor_and_log()
Sensor Dashboard Code:
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
from multiprocessing import Manager, Process
import time
import log_draft # Assumes the modified monitor_and_log() is here
# Create a shared memory manager and dictionary
manager = Manager()
shared_data = manager.dict()
shared_data['sensor'] = {}
shared_data['bms'] = {}
# Initialize the Dash app with your existing layout
app = dash.Dash(__name__)
app.layout = html.Div([
html.H1("Real-Time Data Display"),
html.Div([
html.H2("Latest Sensor Data:"),
html.Pre(id='sensor-data-display')
], style={'margin-bottom': '20px'}),
html.Div([
html.H2("Latest BMS Data:"),
html.Pre(id='bms-data-display')
]),
# Fires every 1 second (1000 ms), causing a callback to update data
dcc.Interval(id='interval-component', interval=1000, n_intervals=0)
])
# Callback that reads from the shared dictionary
@app.callback(
Output('sensor-data-display', 'children'),
Output('bms-data-display', 'children'),
Input('interval-component', 'n_intervals')
)
def update_data(n):
sensor_str = str(shared_data.get('sensor', {}))
bms_str = str(shared_data.get('bms', {}))
return sensor_str, bms_str
if __name__ == '__main__':
# Start the logging process with shared memory
p = Process(target=log_draft.monitor_and_log, args=(shared_data,))
p.start()
# Run Dash server
app.run_server(debug=False)
# When the Dash server is stopped, terminate the logging process
p.terminate()
p.join()