App Test
App Test
py
from binascii import Error
import csv
import io
import math
import requests
from flask import Flask, Response, json, jsonify, request
from flask_cors import CORS
import pymysql.cursors
from datetime import datetime, timedelta, time, timezone
from collections import defaultdict
from typing import Optional, Dict, Any, List, Union
import logging
from werkzeug.utils import secure_filename
from flask import session
# Added import
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
import os
from dotenv import load_dotenv
import uuid
import boto3
def get_db_connection():
host=os.getenv('DB_HOST', '139.59.84.157')
port=int(os.getenv('DB_PORT', 3306))
user=os.getenv('DB_USERNAME', 'root')
password=os.getenv('DB_PASSWORD', 'LIFELAB@1server')
database=os.getenv('DB_DATABASE', 'lifeapp')
return pymysql.connect(
host = host,
port = port,
user = user,
password=password,
database=database,
cursorclass=pymysql.cursors.DictCursor
)
@app.route('/api/login', methods=['POST'])
def admin_login():
data = request.json
email = data.get('email')
password = data.get('password')
cursor = get_db_connection().cursor(pymysql.cursors.DictCursor)
cursor.execute("SELECT id, email, password, type FROM users WHERE email=%s AND
type=1", (email,))
user = cursor.fetchone()
if not user:
return jsonify({
'errors': {
'title': 'Login Failed!',
'icon': 'AlertCircleIcon',
'type': 'warning',
'message': 'Admin not exist'
}
}), 401
if password != user['password']:
return jsonify({
'errors': {
'title': 'Login Failed!',
'icon': 'EyeOffIcon',
'type': 'warning',
'message': 'Admin wrong password'
}
}), 401
return jsonify({
'accessToken': 'test-token', # Optional, just for frontend compatibility
'message': 'Login Success Full',
'res': True,
'admin': admin
}), 200
@app.route('/api/logout', methods=['POST'])
def admin_logout():
# For stateless token-based login, logout is usually handled on frontend
# But we return a success response for frontend compatibility
return jsonify({
'message': 'Logout successful',
'res': True
}), 200
@app.route('/debug-env', methods = ['GET'])
def debug_env():
host = os.getenv('DB_HOST')
user = os.getenv('DB_USERNAME')
password = os.getenv('DB_PASSWORD')
return jsonify({
'host': host,
'user': user,
'password': password
})
# @app.route('/')
# def backup():
# return "Heya, thanks for checking"
def upload_media(file):
original_filename = file.filename
ext = os.path.splitext(original_filename)[1]
unique_filename = str(uuid.uuid4()) + ext
key = f"media/{unique_filename}"
s3 = boto3.client(
's3',
region_name=DO_SPACES_REGION,
endpoint_url=DO_SPACES_ENDPOINT,
aws_access_key_id=DO_SPACES_KEY,
aws_secret_access_key=DO_SPACES_SECRET
)
sql = """
INSERT INTO media (name, path)
VALUES (%s, %s)
"""
cur.execute(sql, (original_filename, key))
conn.commit()
cur.close()
conn.close()
BASE_URL = os.getenv('BASE_URL', '')
# Return Media Object
return {
'id': media_id,
'name': original_filename,
'path': key,
'url': BASE_URL + key
}
###################################################################################
###################################################################################
######################## HOME DASHBOARD APIs ######################################
###################################################################################
###################################################################################
Args:
query (str): SQL query to execute
params (tuple, optional): Query parameters
Returns:
List[Dict[str, Any]]: Query results
"""
connection = None
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(query, params or ())
return cursor.fetchall()
except Exception as e:
logger.error(f"Query execution error: {str(e)}")
raise
finally:
if connection:
connection.close()
@app.route('/api/signing-user', methods=['POST'])
def get_user_signups2():
"""
Get user signup statistics grouped by different time periods.
Query parameters:
grouping: str - Time grouping (daily, weekly, monthly, quarterly, yearly,
lifetime)
start_date: str - Start date for filtering (YYYY-MM-DD)
end_date: str - End date for filtering (YYYY-MM-DD)
"""
try:
filters = request.get_json() or {}
grouping = filters.get('grouping', 'monthly')
user_type = filters.get('user_type', 'All') # Get user_type from request
start_date = filters.get('start_date')
end_date = filters.get('end_date')
return jsonify(response)
except Exception as e:
logger.error(f"Error in get_user_signups: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/api/user-signups', methods=['GET'])
def get_user_signups():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Execute the SQL query
sql = """
SELECT
DATE_FORMAT(created_at, '%Y-%m') AS month,
COUNT(*) AS count
FROM lifeapp.users
GROUP BY month
HAVING month is not null
ORDER BY month
"""
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/user-count', methods=['GET'])
def get_user_count():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Execute the SQL query
sql = """
SELECT count(*) as count from lifeapp.users
"""
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/new-signups', methods=['GET'])
def get_new_signups():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Execute the SQL query
sql = """
SELECT
DATE_FORMAT(created_at, '%Y-%m') AS month,
COUNT(*) AS count
FROM lifeapp.users
GROUP BY month
HAVING month is not null
ORDER BY month DESC
LIMIT 1
"""
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
"""
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/coupons-used-count', methods=['GET'])
def get_coupons_used_count():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
select -amount as amount, count(*) as coupon_count from
lifeapp.coin_transactions group by coinable_type,amount having amount < 0 order by
amount asc ;
"""
cursor.execute(sql)
result = cursor.fetchall()
print("Query Result:", result) # Debugging statement
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/user-type-chart',methods = ['GET'])
def get_user_type_fetch():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Execute the SQL query
sql = """
select count(*) as count,
case
when type = 1
then 'Admin'
when type = 3
then 'Student'
when type = 5
then 'Teacher'
when type = 4
then 'Mentor'
else 'Default'
end as
userType from lifeapp.users group by type;
"""
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/students-by-grade-over-time', methods=['POST'])
def get_students_by_grade_over_time():
req = request.get_json()
grouping = req.get('grouping', 'monthly')
sql = f"""
SELECT {period_expr} AS period,
IFNULL(grade, 'Unspecified') AS grade,
COUNT(*) AS count
FROM lifeapp.users
WHERE `type` = 3
GROUP BY period, grade
ORDER BY period, grade;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/total-student-count', methods=['GET'])
def get_total_student_count():
connection = None # Initialize connection to None
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
SELECT count(*) as count from lifeapp.users where `type` = 3;
"""
cursor.execute(sql)
result = cursor.fetchall()
print("Query Result:", result) # Debugging statement
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
if connection: # Only close connection if it was established
connection.close()
@app.route('/api/teachers-by-grade-over-time', methods=['POST'])
def get_teachers_by_grade_over_time():
req = request.get_json()
grouping = req.get('grouping', 'monthly')
sql = f"""
SELECT {period_expr} AS period,
la_grade_id AS grade,
COUNT(DISTINCT user_id) AS count
FROM lifeapp.la_teacher_grades
GROUP BY period, grade
ORDER BY period, grade;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/demograph-students', methods=['POST'])
def get_demograph_students():
"""
Returns Count of students in each state with normalized state names.
Ensures unique state entries and consistent naming.
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Normalize state names and aggregate counts
sql = """
SELECT
CASE
WHEN state IN ('Gujrat', 'Gujarat') THEN 'Gujarat'
WHEN state IN ('Tamilnadu', 'Tamil Nadu') THEN 'Tamil Nadu'
ELSE state
END AS normalized_state,
SUM(count) as total_count
FROM (
SELECT state, COUNT(*) as count
FROM lifeapp.users
WHERE `type` = 3 AND state != 2
GROUP BY state
) AS subquery
GROUP BY normalized_state
ORDER BY total_count DESC
"""
cursor.execute(sql)
result = cursor.fetchall()
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/total-points-earned', methods=['POST'])
def get_total_points_earned():
sql1 = """
SELECT COALESCE(SUM(points), 0) AS total_points
FROM lifeapp.la_mission_completes;
"""
sql2 = """
SELECT COALESCE(SUM(coins), 0) AS total_coins
FROM lifeapp.la_quiz_game_results;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Execute first query to get points sum
cursor.execute(sql1)
result_points = cursor.fetchone()
total_points = result_points['total_points']
@app.route('/api/histogram_level_subject_challenges_complete', methods=['POST'])
def get_histogram_data_level_subject_challenges_complete():
try:
data = request.get_json() or {}
grouping = data.get('grouping', 'monthly').lower()
status_filter = data.get('status', 'all').lower()
subject_filter = data.get('subject') # New subject filter
# Validate grouping
allowed_groupings = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly',
'lifetime']
grouping = grouping if grouping in allowed_groupings else 'monthly'
# Status conditions
status_conditions = {
'submitted': "lamc.approved_at IS NULL AND lamc.rejected_at IS NULL",
'rejected': "lamc.approved_at IS NULL AND lamc.rejected_at IS NOT
NULL",
'approved': "lamc.approved_at IS NOT NULL",
'all': "1=1"
}
status_condition = status_conditions.get(status_filter, "1=1")
# Period expression
period_expressions = {
'daily': "DATE(lamc.created_at)",
'weekly': "CONCAT(YEAR(lamc.created_at), '-W', WEEK(lamc.created_at,
1))",
'monthly': "DATE_FORMAT(lamc.created_at, '%%Y-%%m')",
'quarterly': "CONCAT(YEAR(lamc.created_at), '-Q',
QUARTER(lamc.created_at))",
'yearly': "CAST(YEAR(lamc.created_at) AS CHAR)",
'lifetime': "'lifetime'"
}
period_expr = period_expressions[grouping]
connection = get_db_connection()
with connection.cursor() as cursor:
sql = f"""
SELECT
{period_expr} AS period,
COUNT(*) AS count,
las.title AS subject_title,
lal.title AS level_title
FROM lifeapp.la_mission_completes lamc
INNER JOIN lifeapp.la_missions lam ON lam.id = lamc.la_mission_id
INNER JOIN lifeapp.la_subjects las ON lam.la_subject_id = las.id
INNER JOIN lifeapp.la_levels lal ON lam.la_level_id = lal.id
WHERE lam.type = 1
AND {status_condition}
{"AND JSON_CONTAINS(las.title, %s, '$')" if subject_filter else ""}
GROUP BY period, lam.la_subject_id, lam.la_level_id
ORDER BY period, lam.la_subject_id, lam.la_level_id;
"""
params = ()
if subject_filter:
# Create JSON string for subject filter
subject_json = json.dumps({"en": subject_filter})
params = (subject_json,)
cursor.execute(sql, params)
results = cursor.fetchall()
return jsonify(results), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if connection:
connection.close()
@app.route('/api/histogram_level_subject_jigyasa_complete', methods=['POST'])
def get_histogram_data_level_subject_jigyasa_complete():
try:
data = request.get_json() or {}
grouping = data.get('grouping', 'monthly').lower()
status_filter = data.get('status', 'all').lower()
subject_filter = data.get('subject') # New subject filter
allowed_groupings = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly',
'lifetime']
grouping = grouping if grouping in allowed_groupings else 'monthly'
status_conditions = {
'submitted': "lamc.approved_at IS NULL AND lamc.rejected_at IS NULL",
'rejected': "lamc.approved_at IS NULL AND lamc.rejected_at IS NOT
NULL",
'approved': "lamc.approved_at IS NOT NULL",
'all': "1=1"
}
status_condition = status_conditions.get(status_filter, "1=1")
period_expressions = {
'daily': "DATE(lamc.created_at)",
'weekly': "CONCAT(YEAR(lamc.created_at), '-W', WEEK(lamc.created_at,
1))",
'monthly': "DATE_FORMAT(lamc.created_at, '%%Y-%%m')",
'quarterly': "CONCAT(YEAR(lamc.created_at), '-Q',
QUARTER(lamc.created_at))",
'yearly': "CAST(YEAR(lamc.created_at) AS CHAR)",
'lifetime': "'lifetime'"
}
period_expr = period_expressions[grouping]
connection = get_db_connection()
with connection.cursor() as cursor:
sql = f"""
SELECT
{period_expr} AS period,
COUNT(*) AS count,
las.title AS subject_title,
lal.title AS level_title
FROM lifeapp.la_mission_completes lamc
INNER JOIN lifeapp.la_missions lam ON lam.id = lamc.la_mission_id
INNER JOIN lifeapp.la_subjects las ON lam.la_subject_id = las.id
INNER JOIN lifeapp.la_levels lal ON lam.la_level_id = lal.id
WHERE lam.type = 5
AND {status_condition}
{"AND JSON_CONTAINS(las.title, %s, '$')" if subject_filter else ""}
GROUP BY period, lam.la_subject_id, lam.la_level_id
ORDER BY period, lam.la_subject_id, lam.la_level_id;
"""
params = ()
if subject_filter:
subject_json = json.dumps({"en": subject_filter})
params = (subject_json,)
cursor.execute(sql, params)
results = cursor.fetchall()
return jsonify(results), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if connection:
connection.close()
@app.route('/api/histogram_level_subject_pragya_complete', methods=['POST'])
def get_histogram_data_level_subject_pragya_complete():
try:
data = request.get_json() or {}
grouping = data.get('grouping', 'monthly').lower()
status_filter = data.get('status', 'all').lower()
subject_filter = data.get('subject') # New subject filter
status_conditions = {
'submitted': "lamc.approved_at IS NULL AND lamc.rejected_at IS NULL",
'rejected': "lamc.approved_at IS NULL AND lamc.rejected_at IS NOT
NULL",
'approved': "lamc.approved_at IS NOT NULL",
'all': "1=1"
}
status_condition = status_conditions.get(status_filter, "1=1")
period_expressions = {
'daily': "DATE(lamc.created_at)",
'weekly': "CONCAT(YEAR(lamc.created_at), '-W', WEEK(lamc.created_at,
1))",
'monthly': "DATE_FORMAT(lamc.created_at, '%%Y-%%m')",
'quarterly': "CONCAT(YEAR(lamc.created_at), '-Q',
QUARTER(lamc.created_at))",
'yearly': "CAST(YEAR(lamc.created_at) AS CHAR)",
'lifetime': "'lifetime'"
}
period_expr = period_expressions[grouping]
connection = get_db_connection()
with connection.cursor() as cursor:
sql = f"""
SELECT
{period_expr} AS period,
COUNT(*) AS count,
las.title AS subject_title,
lal.title AS level_title
FROM lifeapp.la_mission_completes lamc
INNER JOIN lifeapp.la_missions lam ON lam.id = lamc.la_mission_id
INNER JOIN lifeapp.la_subjects las ON lam.la_subject_id = las.id
INNER JOIN lifeapp.la_levels lal ON lam.la_level_id = lal.id
WHERE lam.type = 6
AND {status_condition}
{"AND JSON_CONTAINS(las.title, %s, '$')" if subject_filter else ""}
GROUP BY period, lam.la_subject_id, lam.la_level_id
ORDER BY period, lam.la_subject_id, lam.la_level_id;
"""
params = ()
if subject_filter:
subject_json = json.dumps({"en": subject_filter})
params = (subject_json,)
cursor.execute(sql, params)
results = cursor.fetchall()
return jsonify(results), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if connection:
connection.close()
@app.route('/api/histogram_topic_level_subject_quizgames_2', methods=['POST'])
def get_histogram_topic_level_subject_quizgames_2():
connection = None
try:
data = request.get_json() or {}
grouping = data.get('grouping', 'monthly').lower()
subject_filter = data.get('subject') # Get subject filter from request
connection = get_db_connection()
with connection.cursor() as cursor:
sql = f"""
SELECT
{period_expr} AS period,
COUNT(*) AS count,
las.title AS subject_title,
lal.title AS level_title
FROM lifeapp.la_quiz_games laqg
INNER JOIN lifeapp.la_subjects las ON laqg.la_subject_id = las.id
INNER JOIN lifeapp.la_topics lat ON lat.id = laqg.la_topic_id
INNER JOIN lifeapp.la_levels lal ON lat.la_level_id = lal.id
WHERE las.status = 1
AND laqg.completed_at IS NOT NULL
{"AND JSON_CONTAINS(las.title, %s, '$')" if subject_filter else
""}
GROUP BY period, laqg.la_subject_id, lat.la_level_id
ORDER BY period, las.title, lal.title;
"""
params = ()
if subject_filter:
# Create JSON string for subject filter
subject_json = json.dumps({"en": subject_filter})
params = (subject_json,)
cursor.execute(sql, params)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
if connection:
connection.close()
@app.route('/api/mission-points-over-time', methods=['POST'])
def mission_points_over_time():
"""
Returns total mission points grouped by the requested time period.
Expects JSON payload: { "grouping": "daily" | "weekly" | "monthly" |
"quarterly" | "yearly" | "lifetime" }
"""
req = request.get_json() or {}
grouping = req.get('grouping', 'monthly')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(query)
rows = cursor.fetchall()
finally:
conn.close()
@app.route('/api/quiz-points-over-time', methods=['POST'])
def quiz_points_over_time():
"""
Returns total mission points grouped by the requested time period.
Expects JSON payload: { "grouping": "daily" | "weekly" | "monthly" |
"quarterly" | "yearly" | "lifetime" }
"""
req = request.get_json() or {}
grouping = req.get('grouping', 'monthly')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(query)
rows = cursor.fetchall()
finally:
conn.close()
@app.route('/api/jigyasa-points-over-time', methods=['POST'])
def jigyasa_points_over_time():
"""
Returns total mission points grouped by the requested time period.
Expects JSON payload: { "grouping": "daily" | "weekly" | "monthly" |
"quarterly" | "yearly" | "lifetime" }
"""
req = request.get_json() or {}
grouping = req.get('grouping', 'monthly')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(query)
rows = cursor.fetchall()
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/api/pragya-points-over-time', methods=['POST'])
def pragya_points_over_time():
"""
Returns total mission points grouped by the requested time period.
Expects JSON payload: { "grouping": "daily" | "weekly" | "monthly" |
"quarterly" | "yearly" | "lifetime" }
"""
req = request.get_json() or {}
grouping = req.get('grouping', 'monthly')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(query)
rows = cursor.fetchall()
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/api/coupon-redeems-over-time', methods=['POST'])
def coupon_redeems_over_time():
req = request.get_json() or {}
grouping = req.get('grouping', 'monthly')
if grouping == 'daily':
expr = "DATE(created_at)"
elif grouping == 'weekly':
expr = "CONCAT(YEAR(created_at), '-', LPAD(WEEK(created_at, 1), 2, '0'))"
elif grouping == 'monthly':
expr = "DATE_FORMAT(created_at, '%Y-%m')"
elif grouping == 'quarterly':
expr = "CONCAT(YEAR(created_at), '-Q', QUARTER(created_at))"
elif grouping == 'yearly':
expr = "CAST(YEAR(created_at) AS CHAR)"
else:
expr = "'Lifetime'"
sql_parts = [
f"SELECT {expr} AS period, SUM(coins) AS coins",
"FROM lifeapp.coupon_redeems"
]
if grouping != 'lifetime':
sql_parts.append(f"GROUP BY {expr}")
sql_parts.append("ORDER BY period ASC")
@app.route('/api/get-all-states', methods=['GET'])
def get_all_states():
"""
Returns a JSON payload with all distinct normalized_state values
for users of type 3 (students).
"""
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute("""
SELECT DISTINCT normalized_state
FROM (
SELECT
CASE
WHEN state IN ('Gujrat','Gujarat') THEN 'Gujarat'
WHEN state IN ('Tamilnadu','Tamil Nadu') THEN 'Tamil Nadu'
ELSE state
END AS normalized_state
FROM lifeapp.users
) AS subquery
WHERE normalized_state IS NOT NULL AND normalized_state !=2
ORDER BY normalized_state;
""")
rows = cursor.fetchall()
# Extract into a simple list of strings
states = [row['normalized_state'] for row in rows]
return jsonify({'states': states}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/demograph-students-2', methods=['POST'])
def get_demograph_students_2():
"""
Returns the count of students (type = 3) in each normalized state
grouped by a time period derived from the created_at column.
Accepts a JSON payload with key "grouping" (daily, weekly, monthly, quarterly,
yearly, lifetime)
"""
try:
data = request.get_json() or {}
grouping = data.get('grouping', 'monthly').lower()
allowed_groupings = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly',
'lifetime']
if grouping not in allowed_groupings:
grouping = 'monthly'
# Build the period expression based on the grouping value using created_at
if grouping == 'daily':
period_expr = "DATE(created_at)"
elif grouping == 'weekly':
period_expr = "CONCAT(YEAR(created_at), '-W', WEEK(created_at, 1))"
elif grouping == 'monthly':
period_expr = "DATE_FORMAT(created_at, '%%Y-%%m')"
elif grouping == 'quarterly':
period_expr = "CONCAT(YEAR(created_at), '-Q', QUARTER(created_at))"
elif grouping == 'yearly':
period_expr = "YEAR(created_at)"
else: # lifetime grouping: all rows in one group
period_expr = "'lifetime'"
connection = get_db_connection()
with connection.cursor() as cursor:
# First, group by state and created_at (so each row represents one
state on a given day, week, etc.)
# Then, in an outer query, group by the computed period and state.
sql = f"""
SELECT
{period_expr} AS period,
normalized_state,
SUM(student_count) AS total_count
FROM (
SELECT
created_at,
CASE
WHEN state IN ('Gujrat', 'Gujarat') THEN 'Gujarat'
WHEN state IN ('Tamilnadu', 'Tamil Nadu') THEN 'Tamil
Nadu'
ELSE state
END AS normalized_state,
COUNT(*) AS student_count
FROM lifeapp.users
WHERE {where_clause}
GROUP BY normalized_state, created_at
) AS subquery
GROUP BY period, normalized_state
ORDER BY period, total_count DESC;
"""
cursor.execute(sql, params)
result = cursor.fetchall()
formatted_result = [
{"period": row['period'], "state": row['normalized_state'],
"count": row['total_count']}
for row in result
]
return jsonify(formatted_result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/demograph-teachers-2', methods=['POST'])
def get_teacher_demograph_2():
"""
Returns count of teachers (type = 5) in each normalized state grouped by time
period,
based on the created_at column. Accepts a JSON payload with the key "grouping",
which
can be daily, weekly, monthly, quarterly, yearly, or lifetime.
"""
try:
data = request.get_json() or {}
grouping = data.get('grouping', 'monthly').lower()
allowed_groupings = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly',
'lifetime']
if grouping not in allowed_groupings:
grouping = 'monthly'
connection = get_db_connection()
with connection.cursor() as cursor:
# First group by state and the individual created_at date, then in an
outer query group by the computed period.
sql = f"""
SELECT
{period_expr} AS period,
normalized_state,
SUM(teacher_count) AS total_count
FROM (
SELECT
created_at,
CASE
WHEN state IN ('Gujrat', 'Gujarat') THEN 'Gujarat'
WHEN state IN ('Tamilnadu', 'Tamil Nadu') THEN 'Tamil
Nadu'
ELSE state
END AS normalized_state,
COUNT(*) AS teacher_count
FROM lifeapp.users
WHERE {where_clause}
GROUP BY normalized_state, created_at
) AS subquery
GROUP BY period, normalized_state
ORDER BY period, total_count DESC;
"""
cursor.execute(sql, params)
result = cursor.fetchall()
formatted_result = [
{"period": row['period'], "state": row['normalized_state'],
"count": row['total_count']}
for row in result
]
@app.route('/api/mission_participation_rate', methods=['POST'])
def get_mission_participation_rate():
sql_total_students = """
SELECT COUNT(*) AS count FROM lifeapp.users WHERE `type` = 3;
"""
sql_mission_complete = """
SELECT COUNT(DISTINCT lamc.user_id) AS count FROM
lifeapp.la_mission_completes lamc inner join lifeapp.la_missions lam on lam.id =
lamc.la_mission_id where lam.type = 1;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql_total_students)
total_students = cursor.fetchone()['count']
cursor.execute(sql_mission_complete)
mission_complete_user = cursor.fetchone()['count']
@app.route('/api/jigyasa_participation_rate', methods=['POST'])
def get_jigyasa_participation_rate():
sql_total_students = """
SELECT COUNT(*) AS count FROM lifeapp.users WHERE `type` = 3;
"""
sql_jigyasa_complete = """
SELECT COUNT(DISTINCT lamc.user_id) AS count FROM
lifeapp.la_mission_completes lamc inner join lifeapp.la_missions lam on lam.id =
lamc.la_mission_id where lam.type = 5;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql_total_students)
total_students = cursor.fetchone()['count']
cursor.execute(sql_jigyasa_complete)
jigyasa_complete_user = cursor.fetchone()['count']
@app.route('/api/pragya_participation_rate', methods=['POST'])
def get_pragya_participation_rate():
sql_total_students = """
SELECT COUNT(*) AS count FROM lifeapp.users WHERE `type` = 3;
"""
sql_pragya_complete = """
SELECT COUNT(DISTINCT lamc.user_id) AS count FROM
lifeapp.la_mission_completes lamc inner join lifeapp.la_missions lam on lam.id =
lamc.la_mission_id where lam.type = 6;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql_total_students)
total_students = cursor.fetchone()['count']
cursor.execute(sql_pragya_complete)
pragya_complete_user = cursor.fetchone()['count']
@app.route('/api/quiz_participation_rate', methods=['POST'])
def get_quiz_participation_rate():
sql_total_students = """
SELECT COUNT(*) AS count FROM lifeapp.users WHERE `type` = 3;
"""
sql_quiz_complete = """
select count(distinct user_id) as count from lifeapp.la_quiz_game_results;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql_total_students)
total_students = cursor.fetchone()['count']
cursor.execute(sql_quiz_complete)
quiz_complete_user = cursor.fetchone()['count']
@app.route('/api/student-count-by-level-over-time', methods=['POST'])
def student_count_by_level_over_time():
req = request.get_json() or {}
grouping = req.get('grouping', 'monthly')
selected_levels = req.get('levels', ['level1', 'level2', 'level3', 'level4'])
sql = f"""
SELECT
{period_expr} AS period,
{', '.join(level_columns)}
FROM lifeapp.users
WHERE type = 3 # Students only
GROUP BY period
ORDER BY period;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if connection:
connection.close()
@app.route('/api/signing-user-gender', methods=['POST'])
def signing_user_gender():
# Get grouping and user_type from the request; default grouping is 'monthly'
req = request.get_json()
grouping = req.get('grouping', 'monthly')
user_type = req.get('user_type', 'All') # Add the user_type parameter
# Choose the SQL expression for grouping based on the given filter,
# doubling the percent signs to escape them for the query execution.
if grouping == 'daily':
period_expr = "DATE(created_at)"
elif grouping == 'weekly':
period_expr = "CONCAT(YEAR(created_at), '-', LPAD(WEEK(created_at, 3), 2,
'0'))"
elif grouping == 'monthly':
period_expr = "DATE_FORMAT(created_at, '%%Y-%%m')"
elif grouping == 'quarterly':
period_expr = "CONCAT(YEAR(created_at), '-Q', QUARTER(created_at))"
elif grouping == 'yearly':
period_expr = "YEAR(created_at)"
elif grouping == 'lifetime':
period_expr = "'Lifetime'"
else:
period_expr = "DATE_FORMAT(created_at, '%%Y-%%m')"
# Build the base query with a WHERE clause that always evaluates to true
where_clause = "WHERE 1=1"
params = []
# Build the final SQL query with the user type filter in place.
sql = f"""
SELECT {period_expr} AS period,
CASE
WHEN gender = 0 THEN 'Male'
WHEN gender = 1 THEN 'Female'
ELSE 'Unspecified'
END as gender_label,
COUNT(*) AS count
FROM lifeapp.users
{where_clause}
GROUP BY period, gender_label
ORDER BY period
"""
db = get_db_connection()
try:
with db.cursor() as cursor:
cursor.execute(sql, tuple(params))
results = cursor.fetchall()
finally:
db.close()
# Transform the result into an array of objects where each record is:
# { period: ..., Male: <count>, Female: <count>, Unspecified: <count> }
data_by_period = {}
for row in results:
period = row['period']
gender = row['gender_label']
count = row['count']
if period not in data_by_period:
data_by_period[period] = {'period': period, 'Male': 0, 'Female': 0,
'Unspecified': 0}
data_by_period[period][gender] = count
@app.route('/api/PBLsubmissions', methods=['POST'])
def get_PBLsubmissions():
payload = request.get_json() or {}
grouping = payload.get('grouping', 'monthly')
status = payload.get('status', 'all')
# Status filters
STATUS_CONDITIONS = {
'submitted': "lamc.approved_at IS NULL AND lamc.rejected_at IS NULL",
'approved': "lamc.approved_at IS NOT NULL",
'rejected': "lamc.rejected_at IS NOT NULL",
'all': "1"
}
# Validate inputs
if grouping not in GROUPING_SQL or status not in STATUS_CONDITIONS:
return jsonify(error='Invalid grouping or status'), 400
period_expr = GROUPING_SQL[grouping]
status_where = STATUS_CONDITIONS[status]
sql = f"""
SELECT
{period_expr} AS period,
COUNT(*) AS count
FROM lifeapp.la_mission_assigns lama
INNER JOIN lifeapp.la_missions lam
ON lam.id = lama.la_mission_id
INNER JOIN lifeapp.la_mission_completes lamc
ON lama.user_id = lamc.user_id
AND lama.la_mission_id = lamc.la_mission_id
WHERE lam.allow_for = 2
AND {status_where}
GROUP BY period
ORDER BY period;
"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(sql)
results = cursor.fetchall()
finally:
conn.close()
return jsonify(data=results)
@app.route('/api/PBLsubmissions/total', methods=['GET'])
def get_total_PBLsubmissions():
# Returns the total count for a given status (default 'all')
# status = request.args.get('status', 'all')
# if status not in STATUS_CONDITIONS:
# return jsonify(error='Invalid status'), 400
# status_where = STATUS_CONDITIONS[status]
sql = f"""
SELECT
COUNT(*) AS total
FROM lifeapp.la_mission_assigns lama
INNER JOIN lifeapp.la_missions lam
ON lam.id = lama.la_mission_id
INNER JOIN lifeapp.la_mission_completes lamc
ON lama.user_id = lamc.user_id
AND lama.la_mission_id = lamc.la_mission_id
WHERE lam.allow_for = 2;
"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchone()
total = result.get('total', 0) if result else 0
finally:
conn.close()
return jsonify(total=total)
@app.route('/api/vision-completion-stats', methods=['GET'])
def vision_completion_stats():
# Query params
grouping = request.args.get('grouping', 'daily')
subject_id = request.args.get('subject_id', type=int)
assigned_by = request.args.get('assigned_by') # 'teacher' or 'self'
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# Fetch counts grouped by period, level, subject
sql = f"""
SELECT
{period_expr} AS period,
JSON_UNQUOTE(JSON_EXTRACT(l.title, '$.en')) AS level_title,
JSON_UNQUOTE(JSON_EXTRACT(s.title, '$.en')) AS subject_title,
COUNT(DISTINCT a.user_id) AS user_count
FROM vision_question_answers a
JOIN visions v ON v.id = a.vision_id
JOIN la_levels l ON l.id = v.la_level_id
JOIN la_subjects s ON s.id = v.la_subject_id
LEFT JOIN vision_assigns vs
ON vs.vision_id = a.vision_id AND vs.student_id = a.user_id
WHERE 1=1
"""
params = []
if subject_id:
sql += " AND v.la_subject_id = %s"; params.append(subject_id)
if assigned_by == 'teacher':
sql += " AND vs.teacher_id IS NOT NULL"
elif assigned_by == 'self':
sql += " AND vs.teacher_id IS NULL"
# Format array
formatted = []
for period, levels in data.items():
formatted.append({
'period': period,
'levels': [
{
'level': lvl,
'count': info['count'],
'subjects': [{'subject': sub, 'count': c} for sub, c in
info['subjects'].items()]
}
for lvl, info in levels.items()
]
})
return jsonify({'data': formatted}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/vision-score-stats', methods=['GET'])
def vision_score_stats():
# Query params
grouping = request.args.get('grouping', 'daily') # daily, weekly, monthly,
quarterly, yearly, lifetime
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = f"""
SELECT
{period_expr} AS period,
COALESCE(SUM(a.score), 0) AS total_score
FROM vision_question_answers a
WHERE a.score IS NOT NULL
GROUP BY period
ORDER BY period
"""
cursor.execute(sql)
rows = cursor.fetchall()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/vision-answer-summary', methods=['GET'])
def vision_answer_summary():
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# Total sum of scores (ignore NULLs)
cursor.execute("SELECT COALESCE(SUM(score),0) AS total_score FROM
vision_question_answers")
total_score = cursor.fetchone()['total_score']
return jsonify({
'total_score': int(total_score),
'total_vision_answers': total_answers
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/vision-teacher-completions-summary', methods=['GET'])
def vision_teacher_completions_summary():
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# Count only answers where there was a teacher assignment
sql = '''
SELECT COUNT(*) AS teacher_assigned_completions
FROM lifeapp.vision_question_answers a
JOIN lifeapp.vision_assigns vs
ON vs.vision_id = a.vision_id
AND vs.student_id = a.user_id
WHERE vs.teacher_id IS NOT NULL
'''
cursor.execute(sql)
total = cursor.fetchone()['teacher_assigned_completions']
finally:
conn.close()
###################################################################################
###################################################################################
######################## STUDENT/ DASHBOARD APIs ##################################
###################################################################################
###################################################################################
@app.route('/api/state_list', methods=['GET'])
def get_state_list():
connection = get_db_connection()
try:
@app.route('/api/city_list_teachers', methods=['POST'])
def get_city_list_teachers():
filters = request.get_json() or {}
state = filters.get('state')
if not state:
return jsonify({"error": "Query param 'state' is required"}), 400
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
SELECT DISTINCT city
FROM lifeapp.users
WHERE state = %s
AND city IS NOT NULL AND city != ''
"""
cursor.execute(sql, (state))
result = cursor.fetchall()
cities = [row['city'] for row in result]
return jsonify(cities), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/school_list', methods=['GET'])
def get_school_list():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
select distinct(name) from lifeapp.schools;
"""
cursor.execute(sql)
result = cursor.fetchall()
@app.route('/api/new_school_list', methods=['GET'])
def get_new_school_list():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
select distinct(name), id, code from lifeapp.schools;
"""
cursor.execute(sql)
result = cursor.fetchall()
if school:
if isinstance(school, list) and len(school) > 0:
if len(school) == 1:
sql += " AND ls.name = %s"
params.append(school[0])
else:
sql += " AND ls.name IN %s"
params.append(tuple(school))
elif school:
sql += " AND ls.name = %s"
params.append(school)
if city:
sql += " AND u.city = %s"
params.append(city)
if grade:
sql += " AND u.grade = %s"
params.append(grade)
if earn_coins:
if earn_coins == "0-100":
sql += " AND u.earn_coins BETWEEN 0 AND 100"
elif earn_coins == "101-500":
sql += " AND u.earn_coins BETWEEN 101 AND 500"
elif earn_coins == "501-1000":
sql += " AND u.earn_coins BETWEEN 501 AND 1000"
elif earn_coins == "1000+":
sql += " AND u.earn_coins > 1000"
if mobile_no:
sql += " AND u.mobile_no = %s"
params.append(mobile_no)
if from_date:
sql += " AND u.created_at >= %s"
params.append(from_date)
if to_date:
sql += " AND u.created_at <= %s"
params.append(to_date)
sql += " ) SELECT * FROM cte ORDER BY registered_at DESC;"
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql, tuple(params))
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/add_student', methods=['POST'])
def add_student():
data = request.get_json() or {}
# Validate required
if not all([name, email, username, mobile_no]):
return jsonify({'error': 'Missing required fields'}), 400
conn = get_db_connection()
try:
# If user passed school_name instead of ID, look it up
if not school_id and school_name:
with conn.cursor() as cur:
cur.execute("SELECT id FROM lifeapp.schools WHERE name = %s",
(school_name,))
row = cur.fetchone()
if not row:
return jsonify({'error': f"Unknown school '{school_name}'"}),
400
school_id = row['id']
sql = """
INSERT INTO lifeapp.users
(name, guardian_name, email, username, mobile_no, dob,
grade, city, state, school_id, school_code, type, created_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,3,NOW())
"""
params = (
name, guardian, email, username, mobile_no, dob,
grade, city, state, school_id, school_code
)
with conn.cursor() as cur:
cur.execute(sql, params)
conn.commit()
new_id = cur.lastrowid
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/edit_student/<int:user_id>', methods=['PUT'])
def edit_student(user_id):
data = request.get_json() or {}
# Fields we allow editing
name = data.get('name')
guardian = data.get('guardian_name')
email = data.get('email')
username = data.get('username')
mobile_no = data.get('mobile_no')
dob = data.get('dob')
grade = data.get('grade')
city = data.get('city')
state = data.get('state')
school_id = data.get('school_id')
school_name = data.get('school_name')
school_code = data.get('school_code')
conn = get_db_connection()
try:
# resolve school_name → school_id if needed
if not school_id and school_name:
with conn.cursor() as cur:
cur.execute("SELECT id FROM lifeapp.schools WHERE name = %s",
(school_name,))
row = cur.fetchone()
if not row:
return jsonify({'error': f"Unknown school '{school_name}'"}),
400
school_id = row['id']
sql = f"""
UPDATE lifeapp.users
SET {', '.join(updates)}
WHERE id = %s
"""
params.append(user_id)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/delete_student', methods=['POST'])
def delete_student():
data = request.get_json() or {}
user_id = data.get('id')
if not user_id:
return jsonify({'error': 'Missing student ID'}), 400
try:
conn = get_db_connection()
with conn.cursor() as cur:
cur.execute("DELETE FROM lifeapp.users WHERE id = %s", (user_id,))
conn.commit()
return jsonify({'success': True}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/mission-status-graph', methods=['POST'])
def get_mission_status_graph():
"""
Get mission completion statistics grouped by period and status.
"""
try:
filters = request.get_json() or {}
grouping = filters.get('grouping', 'monthly')
start_date = filters.get('start_date')
end_date = filters.get('end_date')
date_format = date_formats[grouping]
query += """
GROUP BY period, mission_status
HAVING period IS NOT NULL
ORDER BY period
"""
result = execute_query(query)
@app.route('/api/histogram_topic_level_subject_quizgames', methods=['POST'])
def get_histogram_topic_level_subject_quizgames():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
SELECT
COUNT(*) AS count,
las.title AS subject_title,
lal.title AS level_title
FROM lifeapp.la_quiz_games laqg
INNER JOIN lifeapp.la_subjects las ON laqg.la_subject_id = las.id
INNER JOIN lifeapp.la_topics lat ON lat.id = laqg.la_topic_id
INNER JOIN lifeapp.la_levels lal ON lat.la_level_id = lal.id --
use topic’s level
WHERE las.status = 1
GROUP BY laqg.la_subject_id, lat.la_level_id
ORDER BY las.title, lal.title;
"""
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
######################## STUDENT/ COUPON REDEEMED APIs ############################
###################################################################################
###################################################################################
@app.route('/api/coupon_titles', methods=['GET'])
def get_coupon_titles():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute("SELECT DISTINCT title FROM lifeapp.coupons ORDER BY
title")
result = cursor.fetchall()
titles = [item['title'] for item in result]
return jsonify(titles)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/coupon_redeem_search', methods=['POST'])
def fetch_coupon_redeem_list():
data = request.get_json() or {}
search = data.get('search', '')
state = data.get('state', '')
city = data.get('city', '')
school = data.get('school', '')
grade = data.get('grade', '')
coupon_title = data.get('coupon_title', '')
mobile = data.get('mobile', '')
start_date = data.get('start_date', '')
end_date = data.get('end_date', '')
# Base SQL query - note the aliasing for coupon title if needed.
sql = """
SELECT
u.name AS 'Student Name',
ls.name AS 'School Name',
u.mobile_no AS 'Mobile Number',
u.state,
u.city,
u.grade,
lc.title as 'Coupon Title',
cr.coins AS 'Coins Redeemed',
cr.user_id,
cr.created_at AS 'Coupon Redeemed Date'
FROM lifeapp.coupon_redeems cr
INNER JOIN lifeapp.users u ON u.id = cr.user_id
INNER JOIN lifeapp.schools ls ON ls.id = u.school_id
INNER JOIN lifeapp.coupons lc ON lc.id = cr.coupon_id
"""
filters = []
params = []
if search:
search_terms = search.strip().split()
if search_terms:
name_conditions = []
for term in search_terms:
name_conditions.append("u.name LIKE %s")
params.append(f"%{term}%")
filters.append(f"({' AND '.join(name_conditions)})")
# Additional filters – add only if parameter exists so that NULL or empty
values do not interfere.
if state:
filters.append("u.state = %s")
params.append(state)
if city:
filters.append("u.city = %s")
params.append(city)
if school:
filters.append("ls.name = %s")
params.append(school)
if grade:
filters.append("u.grade = %s")
params.append(grade)
if sanitized_mobile:
# Use exact match if full length (adjust 10 to your mobile number length)
if len(sanitized_mobile) == 10:
filters.append("u.mobile_no = %s")
params.append(sanitized_mobile)
else:
filters.append("u.mobile_no LIKE %s")
params.append(f"%{sanitized_mobile}%")
# Date validation
if start_date and end_date and start_date > end_date:
return jsonify({'error': 'Start date cannot be after end date'}), 400
###################################################################################
###################################################################################
######################## STUDENT/ MISSION APIs ####################################
###################################################################################
###################################################################################
@app.route('/api/student_mission_search', methods=['POST'])
def mission_search():
print("📥 Received request for /api/student_mission_search")
filters = request.get_json() or {}
mission_acceptance = filters.get('mission_acceptance')
assigned_by = filters.get('assigned_by')
from_date = filters.get('from_date')
to_date = filters.get('to_date')
page = int(filters.get('page', 1))
per_page = int(filters.get('per_page', 50))
offset = (page - 1) * per_page
sql = """
WITH cte AS (
SELECT
mc.id AS Row_ID,
m.id AS Mission_ID,
m.title AS Mission_Title,
CASE
WHEN ma.teacher_id IS NULL THEN 'Self'
ELSE t.name
END AS Assigned_By,
u.id AS Student_ID,
u.name AS Student_Name,
u.school_code AS school_code,
s.name AS School_Name,
CASE
WHEN mc.approved_at IS NOT NULL THEN 'Approved'
WHEN mc.rejected_at IS NOT NULL THEN 'Rejected'
ELSE 'Requested'
END AS Status,
mc.created_at AS Requested_At,
mc.points AS Total_Points,
mc.timing AS Each_Mission_Timing,
u.mobile_no,
u.dob,
u.grade,
u.city,
u.state,
u.address,
u.earn_coins,
u.heart_coins,
u.brain_coins,
mc.media_id,
mia.path AS media_path
FROM lifeapp.la_mission_completes mc
JOIN lifeapp.users u ON mc.user_id = u.id
JOIN lifeapp.la_missions m
ON m.id = mc.la_mission_id
LEFT JOIN lifeapp.la_mission_assigns ma
ON ma.la_mission_id = m.id AND ma.user_id = u.id
LEFT JOIN lifeapp.users t ON ma.teacher_id = t.id
LEFT JOIN lifeapp.schools s ON u.school_id = s.id
LEFT JOIN lifeapp.media mia ON mia.id = mc.media_id
)
SELECT * FROM cte
WHERE 1=1
"""
params = []
if mission_acceptance and mission_acceptance in ("Approved", "Requested",
"Rejected"):
sql += " AND cte.Status = %s"
params.append(mission_acceptance)
print(f"🟡 Filter: mission_acceptance={mission_acceptance}")
if assigned_by:
if assigned_by.lower() == "self":
sql += " AND cte.Assigned_By = 'Self'"
print(f"🟡 Filter: assigned_by=Self")
elif assigned_by.lower() == "teacher":
sql += " AND cte.Assigned_By <> 'Self'"
print(f"🟡 Filter: assigned_by=Teacher")
if from_date:
sql += " AND cte.Requested_At >= %s"
params.append(from_date)
print(f"🟡 Filter: from_date={from_date}")
if to_date:
sql += " AND cte.Requested_At <= %s"
params.append(to_date)
print(f"🟡 Filter: to_date={to_date}")
schoolCodes = filters.get('school_code')
if schoolCodes:
codes = schoolCodes if isinstance(schoolCodes, list) else [schoolCodes]
placeholders = ",".join(["%s"] * len(codes))
sql += f" AND cte.school_code IN ({placeholders})"
params.extend([int(c) for c in codes])
print(f"🟡 Filter: school_codes={codes}")
mobile_no = filters.get('mobile_no')
if mobile_no:
sql += " AND cte.mobile_no = %s"
params.append(mobile_no)
print(f"🟡 Filter: mobile_no={mobile_no}")
count_sql = f"SELECT COUNT(*) AS total FROM ({sql}) AS sub"
print(f" Count SQL: {count_sql}")
print(f" Count Params: {params}")
try:
connection = get_db_connection()
with connection.cursor() as cursor:
print(" Executing count query...")
cursor.execute(count_sql, tuple(params))
total = cursor.fetchone()['total']
print(f" Total rows: {total}")
return jsonify({
'data': rows,
'pagination': {
'total': total,
'page': page,
'per_page': per_page,
'total_pages': math.ceil(total / per_page)
}
})
except Exception as e:
print(f" Error occurred: {e}")
return jsonify({'error': str(e)}), 500
finally:
connection.close()
print(" Connection closed.")
@app.route('/api/update_mission_status', methods=['POST'])
def update_mission_status():
data = request.get_json()
row_id = data.get('row_id')
mission_id = data.get('mission_id')
student_id = data.get('student_id')
action = data.get('action')
if action == 'approve':
# 1 mark approved
print(" Marking mission as approved...")
cursor.execute("""
UPDATE la_mission_completes
SET approved_at = %s, rejected_at = NULL
WHERE id = %s AND user_id = %s
""", (now, row_id, student_id))
# 2️
fetch mission details
cursor.execute("""
SELECT
lm.la_level_id,
lm.type,
ll.mission_points,
ll.jigyasa_points,
ll.pragya_points
FROM la_missions lm
JOIN la_levels ll ON ll.id = lm.la_level_id
WHERE lm.id = %s
""", (mission_id,))
result = cursor.fetchone()
if result:
mission_type = result["type"]
if mission_type == 1:
mission_points = result["mission_points"]
elif mission_type == 5:
mission_points = result["jigyasa_points"]
elif mission_type == 6:
mission_points = result["pragya_points"]
else:
mission_points = 0
conn.commit()
print("✅ Transaction committed successfully")
return jsonify({'success': True}), 200
except Exception as e:
print(f"❌ Exception: {str(e)}")
return jsonify({'error': str(e)}), 500
finally:
conn.close()
print("🔚 Connection closed")
###################################################################################
###################################################################################
######################## STUDENT/ VISION APIs ####################################
###################################################################################
###################################################################################
@app.route('/api/vision_sessions', methods=['GET'])
def fetch_vision_sessions():
qs = request.args
page = int(qs.get('page', 1))
per_page = int(qs.get('per_page', 25))
offset = (page - 1) * per_page
qtype = qs.get('question_type')
assigned_by = qs.get('assigned_by')
date_start = qs.get('date_start')
date_end = qs.get('date_end')
school_codes = qs.getlist('school_codes')
status_filt = qs.get('status') # 'requested'|'approved'|'rejected'
base_sql = '''
SELECT
a.id AS answer_id,
v.title AS vision_title,
JSON_UNQUOTE(JSON_EXTRACT(q.question, '$.en')) AS question_title,
u.name AS user_name,
COALESCE(t.name,'self') AS teacher_name,
a.answer_text,
a.answer_option,
m.id AS media_id,
m.path AS media_path,
a.score,
a.answer_type,
a.status,
a.approved_at,
a.rejected_at,
a.created_at
FROM vision_question_answers a
JOIN visions v ON v.id = a.vision_id
JOIN vision_questions q ON q.id = a.question_id
JOIN users u ON u.id = a.user_id
LEFT JOIN vision_assigns vs
ON vs.vision_id = a.vision_id
AND vs.student_id = a.user_id
LEFT JOIN users t ON t.id = vs.teacher_id
LEFT JOIN media m ON m.id = a.media_id
LEFT JOIN lifeapp.schools s ON s.id = u.school_id
WHERE 1=1
'''
params = []
if qtype:
base_sql += ' AND a.answer_type = %s'; params.append(qtype)
if assigned_by=='teacher':
base_sql += ' AND vs.teacher_id IS NOT NULL'
elif assigned_by=='self':
base_sql += ' AND vs.teacher_id IS NULL'
if date_start:
base_sql += ' AND DATE(a.created_at) >= %s'; params.append(date_start)
if date_end:
base_sql += ' AND DATE(a.created_at) <= %s'; params.append(date_end)
if school_codes:
ph = ','.join(['%s']*len(school_codes))
base_sql += f' AND u.school_code IN ({ph})'; params += school_codes
if status_filt in ('requested','approved','rejected'):
base_sql += ' AND a.status = %s'; params.append(status_filt)
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute(base_sql, params)
rows = cursor.fetchall()
base_url = os.getenv('BASE_URL','').rstrip('/')
for r in rows:
r['media_url'] = f"{base_url}/{r['media_path']}" if r.get('media_path')
else None
return jsonify({
'page': page,
'per_page': per_page,
'data': rows
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/vision_sessions/<int:answer_id>/score', methods=['PUT'])
def update_vision_session_score(answer_id):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f" Starting score update for answer ID: {answer_id} at {now}")
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT vqa.answer_type, vqa.vision_id, vqa.user_id, v.la_level_id
FROM vision_question_answers vqa
JOIN visions v ON vqa.vision_id = v.id
WHERE vqa.id = %s
""", (answer_id,))
row = cursor.fetchone()
if not row:
print(" Vision answer not found.")
return jsonify({'error': 'Vision answer not found'}), 404
answer_type = row['answer_type']
level_id = row['la_level_id']
user_id = row['user_id']
cursor.execute("""
SELECT vision_text_image_points
FROM la_levels
WHERE id = %s
""", (level_id,))
level = cursor.fetchone()
if not level:
print("Level not found.")
return jsonify({'error': 'Level not found'}), 404
score = level['vision_text_image_points']
print(f" Vision points for level: {score}")
cursor.execute("""
UPDATE vision_question_answers
SET score = %s, updated_at = %s
WHERE id = %s
""", (score, now, answer_id))
print(" Score updated.")
cursor.execute("""
INSERT INTO coin_transactions (user_id, type, amount,
coinable_type, coinable_id, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (
user_id,
7, # TYPE_VISION
score,
'App\\Models\\VisionQuestionAnswer',
answer_id,
now,
now
))
print(" Coin transaction inserted.")
cursor.execute("""
UPDATE users
SET earn_coins = earn_coins + %s,
updated_at = %s
WHERE id = %s
""", (score, now, user_id))
print(" User's coins updated.")
conn.commit()
print("All updates committed successfully.")
return jsonify({'success': True, 'coins_awarded': score}), 200
except Exception as e:
print(f" Exception occurred: {e}")
return jsonify({'error': str(e)}), 500
finally:
conn.close()
print(" Database connection closed.")
@app.route('/api/vision_sessions/<int:answer_id>/status', methods=['PUT'])
def update_vision_session_status(answer_id):
data = request.get_json() or {}
new_status = data.get('status')
print(f"Received status update request for answer_id={answer_id},
new_status={new_status}")
try:
with conn.cursor() as cursor:
# FIRST: Get user_id and vision_id BEFORE updating
cursor.execute(
"SELECT user_id, vision_id FROM vision_question_answers WHERE id =
%s",
(answer_id,)
)
result = cursor.fetchone()
if not result:
print(" Vision answer not found in DB")
return jsonify({'error': 'Vision answer not found'}), 404
user_id = result['user_id']
vision_id = result['vision_id']
print(f"Fetched: user_id={user_id}, vision_id={vision_id}")
# Update vision_question_answers
if new_status == 'approved':
sql = "UPDATE vision_question_answers SET status=%s, approved_at=%s
WHERE id=%s"
params = (new_status, now, answer_id)
vision_user_status = 'completed'
else: # rejected
sql = "UPDATE vision_question_answers SET status=%s, rejected_at=%s
WHERE id=%s"
params = (new_status, now, answer_id)
vision_user_status = 'rejected'
cursor.execute(sql, params)
print(f" vision_question_answers updated for ID {answer_id}")
# Update vision_user_statuses
status_sql = """
INSERT INTO vision_user_statuses (user_id, vision_id, status,
created_at, updated_at)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE status = %s, updated_at = %s
"""
cursor.execute(status_sql, (
user_id, vision_id, vision_user_status, now, now,
vision_user_status, now
))
conn.commit()
print(" Commit successful")
return jsonify({
'success': True,
'status': vision_user_status,
'notification_sent': notification_sent
}), 200
except Exception as e:
print(f" Exception: {e}")
return jsonify({'error': str(e)}), 500
finally:
conn.close()
print("🔚 Connection closed")
###################################################################################
###################################################################################
######################## STUDENT/ QUIZ SESSIONS APIs ##############################
###################################################################################
###################################################################################
@app.route('/api/quiz_sessions', methods=['POST'])
def get_quiz_sessions():
data = request.get_json() or {}
start_date = data.get('start_date')
end_date = data.get('end_date')
la_subject_id = data.get('la_subject_id')
la_level_id = data.get('la_level_id')
la_topic_id = data.get('la_topic_id')
# ── Pagination params ──
try:
page = max(int(data.get('page', 1)), 1)
per_page = min(int(data.get('per_page', 50)), 200) # cap at 200
except (ValueError, TypeError):
page, per_page = 1, 50
offset = (page - 1) * per_page
connection = get_db_connection()
try:
with connection.cursor() as cursor:
# base query with filters
base_sql = """
SELECT
laqg.id,
laqg.user_id,
laqg.game_code AS game_id,
las.title AS subject_title,
lal.title AS level_title,
lat.title AS topic_title,
laqg.time AS time_taken,
laqgr.total_questions,
laqgr.total_correct_answers,
laqgr.created_at,
laqgr.coins,
u.name AS user_name,
ls.name AS school_name,
u.earn_coins,
u.heart_coins,
u.brain_coins
FROM lifeapp.la_quiz_games laqg
INNER JOIN lifeapp.la_quiz_game_results laqgr
ON laqg.game_code = laqgr.la_quiz_game_id
AND laqg.user_id = laqgr.user_id
INNER JOIN lifeapp.users u
ON u.id = laqg.user_id
INNER JOIN lifeapp.la_subjects las
ON las.id = laqg.la_subject_id
INNER JOIN lifeapp.la_levels lal
ON lal.id = laqg.la_level_id
INNER JOIN lifeapp.la_topics lat
ON lat.id = laqg.la_topic_id
INNER JOIN lifeapp.schools ls
ON ls.id = u.school_id
WHERE 1=1
"""
params = []
if start_date:
base_sql += " AND laqgr.created_at >= %s"
params.append(start_date)
if end_date:
base_sql += " AND laqgr.created_at <= %s"
params.append(end_date)
if la_subject_id:
base_sql += " AND laqg.la_subject_id = %s"
params.append(la_subject_id)
if la_level_id:
base_sql += " AND laqg.la_level_id = %s"
params.append(la_level_id)
if la_topic_id:
base_sql += " AND laqg.la_topic_id = %s"
params.append(la_topic_id)
# 1) total count
count_sql = f"SELECT COUNT(*) AS total FROM ({base_sql}) AS sub"
cursor.execute(count_sql, tuple(params))
total = cursor.fetchone()['total']
# 2) unique users
user_sql = f"""
SELECT COUNT(DISTINCT laqg.user_id) AS unique_user_count
FROM lifeapp.la_quiz_games laqg
INNER JOIN lifeapp.la_quiz_game_results laqgr
ON laqg.game_code = laqgr.la_quiz_game_id
AND laqg.user_id = laqgr.user_id
INNER JOIN lifeapp.users u
ON u.id = laqg.user_id
WHERE 1=1
{ ' AND laqgr.created_at >= %s' if start_date else '' }
{ ' AND laqgr.created_at <= %s' if end_date else '' }
{ ' AND laqg.la_subject_id = %s' if la_subject_id else '' }
{ ' AND laqg.la_level_id = %s' if la_level_id else '' }
{ ' AND laqg.la_topic_id = %s' if la_topic_id else '' }
"""
cursor.execute(user_sql, tuple(params))
unique_user_count = cursor.fetchone()['unique_user_count']
# 3) unique schools
school_sql = f"""
SELECT COUNT(DISTINCT u.school_id) AS unique_school_count
FROM lifeapp.la_quiz_games laqg
INNER JOIN lifeapp.la_quiz_game_results laqgr
ON laqg.game_code = laqgr.la_quiz_game_id
AND laqg.user_id = laqgr.user_id
INNER JOIN lifeapp.users u
ON u.id = laqg.user_id
WHERE 1=1
{ ' AND laqgr.created_at >= %s' if start_date else '' }
{ ' AND laqgr.created_at <= %s' if end_date else '' }
{ ' AND laqg.la_subject_id = %s' if la_subject_id else '' }
{ ' AND laqg.la_level_id = %s' if la_level_id else '' }
{ ' AND laqg.la_topic_id = %s' if la_topic_id else '' }
"""
cursor.execute(school_sql, tuple(params))
unique_school_count = cursor.fetchone()['unique_school_count']
# 2) page of results
page_sql = base_sql + " ORDER BY laqgr.created_at DESC LIMIT %s OFFSET
%s"
cursor.execute(page_sql, tuple(params) + (per_page, offset))
rows = cursor.fetchall()
return jsonify({
"page": page,
"per_page": per_page,
"total": total,
"total_pages": math.ceil(total / per_page),
"unique_user_count": unique_user_count,
"unique_school_count": unique_school_count,
"data": rows
}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/game_questions', methods=['POST'])
def get_game_questions():
try:
data = request.get_json()
game_code = data.get("game_code")
connection = get_db_connection()
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
sql = """
WITH RECURSIVE split_questions AS (
SELECT
game_code,
TRIM(BOTH '[]' FROM questions) AS cleaned_questions,
1 AS pos,
SUBSTRING_INDEX(TRIM(BOTH '[]' FROM questions), ',', 1) AS
question_id,
SUBSTRING(TRIM(BOTH '[]' FROM questions),
LENGTH(SUBSTRING_INDEX(TRIM(BOTH '[]' FROM questions), ',', 1)) + 2) AS remaining
FROM lifeapp.la_quiz_games
WHERE questions != '0' AND game_code = %s
UNION ALL
SELECT
game_code,
cleaned_questions,
pos + 1,
SUBSTRING_INDEX(remaining, ',', 1),
SUBSTRING(remaining, LENGTH(SUBSTRING_INDEX(remaining, ',', 1))
+ 2)
FROM split_questions
WHERE remaining != ''
)
SELECT
sq.game_code,
laq.id AS question_id,
laq.title AS question_title,
laq.la_level_id,
laq.la_topic_id,
CASE
WHEN laq.type = 2 THEN 'Quiz'
WHEN laq.type = 3 THEN 'Riddle'
WHEN laq.type = 4 THEN 'Puzzle'
ELSE 'Default'
END AS game_type,
CASE
WHEN laq.question_type = 1 THEN 'Text'
WHEN laq.question_type = 2 THEN 'Image'
ELSE 'Default'
END AS question_type,
CASE
WHEN laq.answer_option_id = laqo.id THEN 1
ELSE 0
END AS is_answer,
laqo.title AS answer_option
FROM split_questions sq
INNER JOIN lifeapp.la_questions laq ON laq.id =
CAST(TRIM(sq.question_id) AS UNSIGNED)
INNER JOIN lifeapp.la_question_options laqo ON laq.id =
laqo.question_id
ORDER BY sq.game_code, sq.pos;
"""
cursor.execute(sql, (game_code,))
questions = cursor.fetchall()
@app.route('/api/game_questions_with_answers', methods=['POST'])
def get_game_questions_with_answers():
try:
data = request.get_json()
game_id = data.get("game_id")
user_id = data.get("user_id")
connection = get_db_connection()
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
sql = """
WITH RECURSIVE split_questions AS (
SELECT
game_code,
TRIM(BOTH '[]' FROM questions) AS cleaned_questions,
1 AS pos,
SUBSTRING_INDEX(TRIM(BOTH '[]' FROM questions), ',', 1) AS
question_id,
SUBSTRING(TRIM(BOTH '[]' FROM questions),
LENGTH(SUBSTRING_INDEX(TRIM(BOTH '[]' FROM questions), ',', 1)) + 2) AS remaining
FROM lifeapp.la_quiz_games
WHERE questions != '0' AND game_code = %s
UNION ALL
SELECT
game_code,
cleaned_questions,
pos + 1,
SUBSTRING_INDEX(remaining, ',', 1),
SUBSTRING(remaining, LENGTH(SUBSTRING_INDEX(remaining, ',', 1))
+ 2)
FROM split_questions
WHERE remaining != ''
)
SELECT
sq.pos AS question_position,
sq.question_id AS question_id,
laq.title AS question_title,
laqo.id AS option_id,
laqo.title AS option_text,
laqo.id = laq.answer_option_id AS is_correct_option,
ans.la_question_option_id = laqo.id AS selected_by_user,
COALESCE(ans.is_correct, 0) AS user_is_correct,
ans.coins AS coins_awarded
FROM split_questions sq
JOIN lifeapp.la_questions laq
ON laq.id = CAST(TRIM(sq.question_id) AS UNSIGNED)
JOIN lifeapp.la_question_options laqo
ON laq.id = laqo.question_id
LEFT JOIN lifeapp.la_quiz_game_question_answers ans
ON ans.la_quiz_game_id = %s
AND ans.user_id = %s
AND ans.la_question_id = laq.id
ORDER BY sq.pos, laqo.id;
"""
cursor.execute(sql, (game_id, game_id, user_id))
rows = cursor.fetchall()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
# @app.route('/api/quiz_sessions', methods=['POST'])
# def get_quiz_sessions():
# try:
# data = request.get_json()
# page = int(data.get('page', 1))
# limit = int(data.get('limit', 10))
# offset = (page - 1) * limit
# connection = get_db_connection()
# with connection.cursor() as cursor:
# # Main paginated data
# cursor.execute(f"""
# WITH sessionized_data AS (
# SELECT
# *,
# @session_group := CASE
# WHEN TIMESTAMPDIFF(SECOND, @prev_created_at,
created_at) > 5
# OR @prev_user_id != user_id
# OR @prev_game_id != la_quiz_game_id
# THEN @session_group + 1
# ELSE @session_group
# END AS session_group,
# @prev_created_at := created_at,
# @prev_user_id := user_id,
# @prev_game_id := la_quiz_game_id
# FROM
# lifeapp.la_quiz_game_results,
# (SELECT @prev_created_at := NULL, @prev_user_id := NULL,
@prev_game_id := NULL, @session_group := 0) vars
# ORDER BY
# user_id, la_quiz_game_id, created_at
# ),
# ranked_entries AS (
# SELECT
# *,
# ROW_NUMBER() OVER (
# PARTITION BY user_id, la_quiz_game_id, session_group
# ORDER BY created_at
# ) AS session_rank
# FROM sessionized_data
# )
# SELECT
# id,
# la_quiz_game_id,
# user_id,
# total_questions,
# total_correct_answers,
# coins,
# created_at,
# updated_at
# FROM ranked_entries
# WHERE session_rank = 1
# ORDER BY user_id, la_quiz_game_id, created_at
# LIMIT %s OFFSET %s;
# """, (limit, offset))
# results = cursor.fetchall()
# except Exception as e:
# return jsonify({'error': str(e)}), 500
# finally:
# connection.close()
###################################################################################
###################################################################################
######################## TEACHER / DASHBOARD APIs #################################
###################################################################################
###################################################################################
@app.route('/api/state_list_teachers', methods=['GET'])
def get_state_list_teachers():
connection = get_db_connection()
try:
@app.route('/api/city_list', methods=['GET'])
def get_city_list():
state = request.args.get('state')
if not state:
return jsonify({"error": "Query param 'state' is required"}), 400
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
SELECT DISTINCT city
FROM lifeapp.schools
WHERE state = %s
AND deleted_at IS NULL
AND city IS NOT NULL AND city != ''
"""
cursor.execute(sql, (state,))
result = cursor.fetchall()
cities = [row['city'] for row in result]
return jsonify(cities), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/teacher_dashboard_search', methods=['POST'])
def fetch_teacher_dashboard():
filters = request.get_json() or {}
state = filters.get('state')
city = filters.get('city')
is_life_lab = filters.get('is_life_lab')
school = filters.get('school')
from_date = filters.get('from_date') # Starting date filter
to_date = filters.get('to_date') # Ending date filter
# New filters for teacher subject and grade:
teacher_subject = filters.get('teacher_subject')
teacher_grade = filters.get('teacher_grade')
board = filters.get('board')
# Start with base SQL. We join to la_teacher_grades (ltg), la_grades (lgr), and
la_sections (lsct)
sql = """
WITH cte AS (
SELECT count(*) as mission_assign_count, teacher_id
FROM lifeapp.la_mission_assigns
GROUP BY teacher_id
)
SELECT
u.id, u.name, u.email,
u.mobile_no, u.state,
u.city, ls.name as school_name, u.school_code,
cte.mission_assign_count,
CASE
WHEN ls.is_life_lab = 1 THEN 'Yes'
ELSE 'No'
END AS is_life_lab,
u.created_at, u.updated_at,
las.title,
lgr.name as grade_name,
lsct.name as section_name,
lab.name as board_name
FROM lifeapp.users u
INNER JOIN lifeapp.schools ls ON ls.id = u.school_id
LEFT JOIN cte ON cte.teacher_id = u.id
LEFT JOIN lifeapp.la_teacher_grades ltg ON ltg.user_id = u.id
LEFT JOIN lifeapp.la_subjects las on las.id = ltg.la_subject_id
LEFT JOIN lifeapp.la_grades lgr ON ltg.la_grade_id = lgr.id
LEFT JOIN lifeapp.la_sections lsct ON ltg.la_section_id = lsct.id
LEFT JOIN lifeapp.la_boards lab on u.la_board_id = lab.id
WHERE u.type = 5
"""
params = []
if state and state.strip():
sql += " AND u.state = %s"
params.append(state)
# NEW: Add filter for School ID and Mobile No.
schoolCodes = filters.get('school_code')
if schoolCodes:
# 1. Make sure it’s a Python list
codes = schoolCodes if isinstance(schoolCodes, list) else [schoolCodes]
)
"""
# OR u.school_id IN ({placeholders})
# 4a. Bind for u.school_code → cast each code to int()
params.extend([int(c) for c in codes])
# 4b. Bind for u.school_id → use the raw codes (or ints if your IDs are
numeric)
# params.extend(codes)
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/teachers-by-grade-subject-section', methods=['POST'])
def teachers_by_grade_subject_section():
"""
Returns teacher counts broken down by grade, subject, section, and board.
"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = """
SELECT
lgr.id AS grade_id,
lgr.name AS grade_name,
las.id AS subject_id,
las.title,
lsct.id AS section_id,
lsct.name AS section_name,
lab.id AS board_id,
lab.name AS board_name,
COUNT(u.id) AS count
FROM lifeapp.users u
LEFT JOIN lifeapp.la_teacher_grades ltg ON ltg.user_id =
u.id
LEFT JOIN lifeapp.la_grades lgr ON ltg.la_grade_id =
lgr.id
LEFT JOIN lifeapp.la_subjects las ON las.id =
ltg.la_subject_id
LEFT JOIN lifeapp.la_sections lsct ON lsct.id =
ltg.la_section_id
LEFT JOIN lifeapp.la_boards lab ON u.la_board_id =
lab.id
WHERE u.type = 5
AND las.status = 1
GROUP BY lgr.id, las.id, lsct.id, lab.id
ORDER BY lgr.id, las.id, lsct.id, lab.id;
"""
cursor.execute(sql)
rows = cursor.fetchall()
result = []
for r in rows:
result.append({
'grade': r['grade_id'],
'subject': r['subject'],
'section': r['section_name'],
'board': r['board_name'] or 'Unspecified',
'count': r['count']
})
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/add_teacher', methods=['POST'])
def add_teacher():
"""
Expects a JSON payload with teacher details.
Example payload:
{
"name": "John Doe",
"email": "[email protected]",
"mobile_no": "9876543210",
"state": "SomeState",
"city": "SomeCity",
"school_id": "1234",
"school_code" : "3467",
"teacher_subject": "1", # subject id (as string or number)
"teacher_grade": "3", # grade number (as string, will be converted)
"teacher_section": "3" # section id
"teacher_board" : "2" # board id
}
Inserts a new teacher record (with u.type = 5) in lifeapp.users using
the current datetime for created_at and updated_at, and if teacher_subject,
teacher_grade,
and teacher_section are provided, inserts an entry into la_teacher_grades.
"""
data = request.get_json() or {}
try:
name = data.get("name")
email = data.get("email")
mobile_no = data.get("mobile_no")
state = data.get("state")
city = data.get("city")
school_id = data.get("school_id")
school_code = data.get('school_code')
connection = get_db_connection()
with connection:
with connection.cursor() as cursor:
# Insert teacher record into lifeapp.users (u.type = 5 for
teachers)
sql = """
INSERT INTO lifeapp.users
(name, email, mobile_no, state, city, school_id, school_code, type,
la_section_id, la_grade_id, la_board_id, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, 5, %s,%s, %s, NOW(), NOW())
"""
params = (name, email, mobile_no, state, city, school_id,
school_code, teacher_section, teacher_grade, teacher_board)
cursor.execute(sql, params)
teacher_id = cursor.lastrowid
@app.route('/api/teacher_update', methods=['POST'])
def update_teacher():
"""
Expects a JSON payload:
{
"id": 43253,
"name": "Updated Name",
"email": "[email protected]",
"mobile_no": "9876543210",
"state": "NewState",
"city": "NewCity",
"school_id": "1234",
"school_code" : "3467",
"teacher_subject": "1", # subject id
"teacher_grade": "3", # grade (will be converted to int)
"teacher_section": "3" # section id
"teacher_board" : "2" # board id
}
teacher_subject = data.get("teacher_subject")
teacher_grade = data.get("teacher_grade")
teacher_section = data.get("teacher_section")
teacher_board = data.get('teacher_board')
connection = get_db_connection()
with connection:
with connection.cursor() as cursor:
# Update the teacher record in users
sql = """
UPDATE lifeapp.users
SET name = %s, email = %s, mobile_no = %s, state = %s, city = %s,
school_id = %s, school_code = %s, la_section_id =%s , la_grade_id = %s, la_board_id
= %s, updated_at = %s
WHERE id = %s AND type = 5
"""
params = (name, email, mobile_no, state, city, school_id,
school_code, teacher_section, teacher_grade, teacher_board, datetime_str,
teacher_id)
cursor.execute(sql, params)
@app.route('/api/teacher_delete', methods=['POST'])
def delete_teacher():
"""
Expects a JSON payload:
{
"id": 43253
}
Deletes the teacher from lifeapp.users (type 5) and optionally its record from
la_teacher_grades.
"""
data = request.get_json() or {}
try:
teacher_id = data.get("id")
if not teacher_id:
return jsonify({"error": "Teacher id is required."}), 400
connection = get_db_connection()
with connection:
with connection.cursor() as cursor:
# Delete from la_teacher_grades first if exists.
sql2 = "DELETE FROM lifeapp.la_teacher_grades WHERE user_id = %s"
cursor.execute(sql2, (teacher_id,))
@app.route('/api/grades_list', methods=['POST'])
def get_grades_list():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Fetch active grades from the la_grades table
sql = """
SELECT id, name, status, created_at, updated_at
FROM lifeapp.la_grades
WHERE status = 1
"""
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
print("Error in get_grades_list:", str(e))
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/vision-teacher-completion-rate', methods=['GET'])
def vision_teacher_completion_rate():
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# Total number of visions assigned by teachers
cursor.execute(
"SELECT COUNT(*) AS total_assigned FROM vision_assigns WHERE
teacher_id IS NOT NULL"
)
total_assigned = cursor.fetchone()['total_assigned'] or 0
# Count how many of these assignments have at least one answer (i.e.,
completed)
cursor.execute(
'''
SELECT COUNT(DISTINCT a.vision_id, a.user_id) AS completed_count
FROM vision_question_answers a
JOIN vision_assigns vs
ON vs.vision_id = a.vision_id
AND vs.student_id = a.user_id
WHERE vs.teacher_id IS NOT NULL
'''
)
completed_count = cursor.fetchone()['completed_count'] or 0
# Calculate percentage
percentage = (completed_count / total_assigned * 100) if total_assigned > 0
else 0
return jsonify({
'total_assigned': total_assigned,
'completed_count': completed_count,
'percentage': round(percentage, 2)
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
######################## TEACHER / CONCEPT CARTOON APIs ###########################
###################################################################################
###################################################################################
@app.route('/api/teacher_concept_cartoons', methods=['POST'])
def fetch_teacher_concept_cartoons():
filters = request.get_json() or {}
subject = filters.get('subject')
status = filters.get('status')
try:
conn = get_db_connection()
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, tuple(params))
rows = cursor.fetchall()
base_url = os.getenv('BASE_URL')
for r in rows:
r['media_url'] = f"{base_url}/{r['media_path']}" if r.get('media_path')
else None
return jsonify(rows), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/update_concept_cartoon', methods=['POST'])
def update_concept_cartoon():
form = request.form
file = request.files.get('media')
media_id = None
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/add_concept_cartoon', methods=['POST'])
def add_concept_cartoon():
form = request.form
file = request.files.get('media')
media_id = None
if file and file.filename:
media = upload_media(file)
media_id = media['id']
sql = '''
INSERT INTO lifeapp.la_concept_cartoons
(la_subject_id, la_level_id, title, document, status, created_at,
updated_at)
VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
'''
params = [
form.get('la_subject_id'),
form.get('la_level_id'),
form.get('title'),
media_id,
int(form.get('status'))
]
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, tuple(params))
new_id = cursor.lastrowid
conn.commit()
return jsonify({'id': new_id}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/delete_concept_cartoon/<int:id>', methods=['DELETE'])
def delete_concept_cartoon(id):
try:
conn = get_db_connection()
if existing['media_path']:
s3 = boto3.client(
's3',
region_name=DO_SPACES_REGION,
endpoint_url=DO_SPACES_ENDPOINT,
aws_access_key_id=DO_SPACES_KEY,
aws_secret_access_key=DO_SPACES_SECRET
)
try:
s3.delete_object(Bucket=DO_SPACES_BUCKET,
Key=existing['media_path'])
except Exception:
pass
conn.commit()
return jsonify({'message': 'Concept cartoon deleted'}), 200
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
################# TEACHER / CONCEPT CARTOON HEADER APIs ###########################
###################################################################################
###################################################################################
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
# delete header
with conn.cursor() as cursor:
cursor.execute("DELETE FROM lifeapp.la_concept_cartoon_headers WHERE id
= %s", (header_id,))
conn.commit()
return jsonify({'message': 'Header and media deleted'}), 200
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
################# TEACHER / LESSON PLAN LANGUAGES APIs ############################
###################################################################################
###################################################################################
@app.route('/api/lesson_plan_language', methods=["GET"])
def fetch_lesson_plan_language():
sql = """
SELECT id, name as title,
CASE WHEN status = 1
THEN 'Publish'
ELSE 'Draft'
END as status
FROM lifeapp.la_lession_plan_languages
ORDER BY created_at DESC;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result if result else [])
except Exception as e:
print("Error in lesson_plan_language:", str(e))
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/update_lesson_plan_language', methods=['POST'])
def update_lesson_plan_language():
data = request.get_json() or {}
try:
# assume the front end now sends status as 1 or 0
status_value = int(data.get("status", 0))
except ValueError:
# fallback if it's still a string
status_value = 1 if data.get("status") == "Publish" else 0
sql = """
UPDATE lifeapp.la_lession_plan_languages
SET name = %s,
status = %s,
updated_at = NOW()
WHERE id = %s
"""
params = (data.get("title"), status_value, data.get("id"))
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
return jsonify({'message': 'Updated successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/add_lesson_plan_language', methods=['POST'])
def add_lesson_plan_language():
data = request.get_json()
status_value = 1 if data["status"] == "Publish" else 0
@app.route('/api/delete_lesson_plan_language/<int:id>', methods=['DELETE'])
def delete_lesson_plan_language(id):
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute("DELETE FROM lifeapp.la_lession_plan_languages WHERE id
= %s", (id,))
connection.commit()
return jsonify({'message': 'Deleted successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
######################## TEACHER / LESSON PLAN APIs ###############################
###################################################################################
###################################################################################
@app.route('/api/lesson_plans_search', methods=['POST'])
def fetch_lesson_plans_search():
filters = request.get_json() or {}
language = filters.get('language')
status = filters.get('status')
title = filters.get('title')
sql = """
SELECT
lalp.id,
lall.name AS language,
CASE
WHEN lalp.type = 1 THEN 'Life Lab - Demo Models'
WHEN lalp.type = 2 THEN 'Jigyasa - Self DIY Activities'
WHEN lalp.type = 3 THEN 'Pragya - DIY Activities With Life Lab
KITS'
WHEN lalp.type = 4 THEN 'Life Lab - Activities Lesson Plans'
ELSE 'Default type (None Mentioned)'
END AS type,
lalp.title AS title,
CASE
WHEN lalp.status = 1 THEN 'Published'
ELSE 'Drafted'
END AS status,
lalp.document AS media_id,
m.path AS media_path
FROM lifeapp.la_lession_plans lalp
INNER JOIN lifeapp.la_lession_plan_languages lall
ON lall.id = lalp.la_lession_plan_language_id
LEFT JOIN lifeapp.media m
on m.id = lalp.document
"""
if where_clauses:
sql += " WHERE " + " AND ".join(where_clauses)
@app.route('/api/update_lesson_plan', methods=['POST'])
def update_lesson_plan():
form = request.form
file = request.files.get('media')
media_id = None
if file and file.filename:
media = upload_media(file)
media_id = media['id']
try:
lp_id = int(form['id'])
language_id= int(form['language_id'])
plan_type = int(form['type'])
status_val = int(form['status'])
except:
return jsonify({'error':'Missing or invalid IDs'}),400
@app.route('/api/add_lesson_plan', methods=['POST'])
def add_lesson_plan():
form = request.form
file = request.files.get('media')
media_id = None
if file and file.filename:
media = upload_media(file)
media_id = media['id']
try:
language_id = int(form['language_id'])
plan_type = int(form['type'])
status_val = int(form['status'])
except:
return jsonify({'error':'Invalid form data'}), 400
sql = """
INSERT INTO lifeapp.la_lession_plans
(la_lession_plan_language_id, title, document, `type`, status, created_at,
updated_at)
VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
"""
params = (
language_id,
form.get('title','').strip(),
media_id,
plan_type,
status_val
)
try:
conn = get_db_connection()
with conn.cursor() as cur:
cur.execute(sql, params)
new_id = cur.lastrowid
conn.commit()
return jsonify({'id': new_id}), 201
finally:
conn.close()
@app.route('/api/delete_lesson_plan/<int:lp_id>', methods=['DELETE'])
def delete_lesson_plan(lp_id):
try:
conn = get_db_connection()
# fetch media record
with conn.cursor(pymysql.cursors.DictCursor) as cur:
cur.execute("""
SELECT document AS media_id, m.path AS media_path
FROM lifeapp.la_lession_plans lp
LEFT JOIN lifeapp.media m ON lp.document=m.id
WHERE lp.id=%s
""", (lp_id,))
row = cur.fetchone()
conn.commit()
return jsonify({'message':'Deleted successfully'}),200
finally:
conn.close()
###################################################################################
###################################################################################
######################## TEACHER / WORKSHEET APIs ###############################$#
###################################################################################
###################################################################################
sql = """
SELECT
w.id,
s.title AS subject_title,
w.la_grade_id AS grade,
w.title,
w.document AS media_id,
m.path AS media_path,
CASE WHEN w.status = 1 THEN 'Published' ELSE 'Drafted' END AS status
FROM lifeapp.la_work_sheets w
INNER JOIN lifeapp.la_subjects s
ON w.la_subject_id = s.id
LEFT JOIN lifeapp.media m
ON w.document = m.id
WHERE 1=1
"""
params = []
if grade_filter:
try:
params.append(int(grade_filter))
sql += " AND w.la_grade_id = %s"
except ValueError:
pass
if status_filter:
sql += " AND w.status = %s"
params.append(1 if status_filter == 'Published' else 0)
if title_filter:
sql += " AND w.title LIKE %s"
params.append(f"%{title_filter}%")
try:
conn = get_db_connection()
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, tuple(params))
rows = cursor.fetchall()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
try:
la_grade_id = int(form.get('grade'))
except:
return jsonify({'error':'Invalid grade'}), 400
title = form.get('title','').strip()
status_val = 1 if form.get('status')=='Published' else 0
sql = """
INSERT INTO lifeapp.la_work_sheets
(la_subject_id, la_grade_id, title, document, status, created_at,
updated_at)
VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
"""
params = (la_subject_id, la_grade_id, title, media_id, status_val)
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
new_id = cursor.lastrowid
conn.commit()
return jsonify({'id': new_id}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
try:
ws_id = int(form.get('id'))
except:
return jsonify({'error':'Missing or invalid ID'}), 400
try:
la_grade_id = int(form.get('grade'))
except:
return jsonify({'error':'Invalid grade'}), 400
title = form.get('title','').strip()
status_val = 1 if form.get('status')=='Published' else 0
if media_id:
sql = """
UPDATE lifeapp.la_work_sheets
SET la_subject_id=%s, la_grade_id=%s, title=%s, document=%s, status=%s,
updated_at=NOW()
WHERE id=%s
"""
params = (la_subject_id, la_grade_id, title, media_id, status_val, ws_id)
else:
sql = """
UPDATE lifeapp.la_work_sheets
SET la_subject_id=%s, la_grade_id=%s, title=%s, status=%s,
updated_at=NOW()
WHERE id=%s
"""
params = (la_subject_id, la_grade_id, title, status_val, ws_id)
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
return jsonify({'message':'Worksheet updated'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
# delete worksheet
with conn.cursor() as cur:
cur.execute("DELETE FROM lifeapp.la_work_sheets WHERE id=%s", (ws_id,))
conn.commit()
return jsonify({'message':'Worksheet deleted'}),200
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}),500
finally:
conn.close()
###################################################################################
###################################################################################
######################## TEACHER / ASSESSMENT APIs ################################
###################################################################################
###################################################################################
@app.route('/api/assessments_search', methods=['POST'])
def assessments_search():
filters = request.get_json() or {}
subject_id = filters.get('subject_id', '').strip()
grade = filters.get('grade', '').strip()
title = filters.get('title', '').strip()
status = filters.get('status', '').strip()
sql = """
SELECT
a.id,
CASE WHEN a.la_subject_id = 1 THEN 'Science' ELSE 'Maths' END AS
subject,
a.la_grade_id AS grade,
a.title,
a.document AS media_id,
m.path AS media_path,
CASE WHEN a.status = 1 THEN 'Published' ELSE 'Drafted' END AS status
FROM lifeapp.la_assessments a
LEFT JOIN lifeapp.media m ON a.document = m.id
WHERE 1=1
"""
params = []
if subject_id:
try:
params.append(int(subject_id))
sql += " AND a.la_subject_id = %s"
except ValueError:
pass
if grade:
try:
params.append(int(grade))
sql += " AND a.la_grade_id = %s"
except ValueError:
pass
if title:
params.append(f"%{title}%")
sql += " AND a.title LIKE %s"
if status:
sql += " AND a.status = %s"
params.append(1 if status == "Published" else 0)
try:
conn = get_db_connection()
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, tuple(params))
rows = cursor.fetchall()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/add_assessment', methods=['POST'])
def add_assessment():
"""
Expects multipart/form-data:
- subject: "Science" or "Maths"
- grade: integer
- title: string
- media: file upload (optional)
- status: "Published" or "Drafted"
"""
form = request.form
file = request.files.get('media')
media_id = None
try:
la_grade_id = int(form.get('grade', '0'))
except ValueError:
return jsonify({'error': 'Invalid grade'}), 400
title = form.get('title', '').strip()
status_str = form.get('status', 'Drafted')
status_val = 1 if status_str == 'Published' else 0
# 3) Insert into DB
sql = """
INSERT INTO lifeapp.la_assessments
(la_subject_id, la_grade_id, title, document, status, created_at,
updated_at)
VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
"""
params = (la_subject_id, la_grade_id, title, media_id, status_val)
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
new_id = cursor.lastrowid
conn.commit()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/update_assessment', methods=['POST'])
def update_assessment():
form = request.form
file = request.files.get('media')
media_id = None
# required
try:
assessment_id = int(form.get('id'))
except (TypeError, ValueError):
return jsonify({'error': 'Invalid or missing assessment ID'}), 400
try:
la_subject_id = int(form.get('subject_id', '0'))
except ValueError:
return jsonify({'error': 'Invalid subject ID'}), 400
try:
la_grade_id = int(form.get('grade', 0))
except ValueError:
return jsonify({'error': 'Invalid grade'}), 400
title = form.get('title', '').strip()
status_val = 1 if form.get('status') == 'Published' else 0
# build SQL
if media_id:
sql = """
UPDATE lifeapp.la_assessments
SET la_subject_id = %s,
la_grade_id = %s,
title = %s,
document = %s,
status = %s,
updated_at = NOW()
WHERE id = %s
"""
params = (la_subject_id, la_grade_id, title, media_id, status_val,
assessment_id)
else:
sql = """
UPDATE lifeapp.la_assessments
SET la_subject_id = %s,
la_grade_id = %s,
title = %s,
status = %s,
updated_at = NOW()
WHERE id = %s
"""
params = (la_subject_id, la_grade_id, title, status_val, assessment_id)
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
return jsonify({'message': 'Assessment updated successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/delete_assessment/<int:assessment_id>', methods=['DELETE'])
def delete_assessment(assessment_id):
try:
conn = get_db_connection()
# 1) Fetch the media ID & path so we can delete the S3 file too
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute("""
SELECT a.document AS media_id, m.path AS media_path
FROM lifeapp.la_assessments a
LEFT JOIN lifeapp.media m ON a.document = m.id
WHERE a.id = %s
""", (assessment_id,))
row = cursor.fetchone()
conn.commit()
return jsonify({'message': 'Assessment and media deleted successfully'}),
200
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
####################### TEACHER/COMPETENCIES APIs #################################
###################################################################################
###################################################################################
@app.route('/admin/competencies', methods=['GET'])
def get_competencies():
"""
Fetch competencies along with subject and level information.
Optional query parameters:
- la_subject_id (filter by subject)
- status (filter by competency status; e.g. 1 for ACTIVE, 0 for DEACTIVE)
- page (for pagination, default: 1)
"""
try:
# Get filters from query string (or use defaults)
la_subject_id = request.args.get('la_subject_id')
status = request.args.get('status') # expected as a string, e.g. "1" or
"0"
page = int(request.args.get('page', 1))
per_page = 25
offset = (page - 1) * per_page
connection = get_db_connection()
with connection.cursor() as cursor:
# Build the SQL with JOINs:
sql = """
SELECT
comp.id,
comp.title AS competency_title,
comp.document,
comp.status,
comp.created_at,
comp.la_subject_id,
comp.la_level_id,
s.title AS subject_title,
l.title AS level_title,
-- raw media id
comp.document AS document_id,
connection.close()
# build full URL
base_url = os.getenv('BASE_URL', '')
for r in competencies:
r['document_url'] = (
f"{base_url}/{r['document_path']}" if r.get('document_path') else
None
)
# Return both competencies and subjects
return jsonify({"competencies": competencies, "subjects": subjects})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/admin/competencies', methods=['POST'])
def create_competency():
"""
Expects a multipart/form-data POST request (for file upload) and other fields
in form data.
Required fields: name (for competency title), la_subject_id, la_level_id,
status
Document is expected as a file input named 'document'
"""
form = request.form
document_file = request.files.get('document')
if not document_file or not document_file.filename:
return jsonify({'error': 'Document file required'}), 400
media = upload_media(document_file)
document_id = media['id']
connection = get_db_connection()
try:
cursor = connection.cursor()
sql = """
INSERT INTO lifeapp.la_competencies
(title, la_subject_id, la_level_id, status, document, created_at,
updated_at)
VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
"""
# Here 'title' is the competency title.
cursor.execute(sql, (
form.get('name'),
form.get('la_subject_id'),
form.get('la_level_id'),
form.get('status', 'ACTIVE'),
document_id
))
connection.commit()
return jsonify({'message': 'Competency created successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
cursor.close()
connection.close()
new_doc_id = None
if document_file and document_file.filename:
# Initialize S3 client
s3 = boto3.client(
's3',
region_name=DO_SPACES_REGION,
endpoint_url=DO_SPACES_ENDPOINT,
aws_access_key_id=DO_SPACES_KEY,
aws_secret_access_key=DO_SPACES_SECRET
)
cursor.execute(update_sql, params)
connection.commit()
except Exception as e:
if connection:
connection.rollback()
return jsonify({'error': str(e)}), 500
finally:
if connection:
connection.close()
# Delete a competency
@app.route('/admin/competencies/<int:competency_id>', methods=['DELETE'])
def delete_competency(competency_id):
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# Get the document ID from the competency
cursor.execute("SELECT document FROM lifeapp.la_competencies WHERE id =
%s", (competency_id,))
result = cursor.fetchone()
document_id = result['document'] if result else None
connection.commit()
return jsonify({'message': 'Competency and associated media deleted
successfully'}), 200
except Exception as e:
connection.rollback()
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
####################### SCHOOLS/SCHOOL-DATA APIs ##################################
###################################################################################
###################################################################################
@app.route('/api/get_schools_data', methods=['POST'])
def get_schools_data():
"""
Returns paginated rows from lifeapp.schools with columns:
id, name, state, city, district, block, cluster, pin_code, code,
app_visible, is_life_lab, status.
Numeric flags are converted to user-friendly text.
Accepts JSON body with optional filters *and* pagination params:
- page: (int) 1-based page number, default=1
- per_page: (int) rows per page, default=50
"""
data = request.get_json() or {}
# --- Filters ---
name = data.get('name')
state = data.get('state')
city = data.get('city')
district = data.get('district')
status = data.get('status')
cluster = data.get('cluster')
block = data.get('block')
codes = data.get('code')
try:
conn = get_db_connection()
with conn.cursor() as cursor:
# Build base WHERE clause
where_clauses = ["deleted_at IS NULL"]
params = []
if status:
where_clauses.append("status = %s")
params.append(1 if status == "Active" else 0)
if district:
where_clauses.append("(district = %s OR district LIKE %s)")
params.extend([district, f"%{district}%"])
if city:
where_clauses.append("city = %s")
params.append(city)
if state:
where_clauses.append("state = %s")
params.append(state)
if name:
where_clauses.append("(name = %s OR name LIKE %s)")
params.extend([name, f"%{name}%"])
if cluster:
where_clauses.append("(cluster = %s OR cluster LIKE %s)")
params.extend([cluster, f"%{cluster}%"])
if block:
where_clauses.append("(block = %s OR block LIKE %s)")
params.extend([block, f"%{block}%"])
if codes:
# ensure list
code_list = codes if isinstance(codes, list) else [codes]
placeholders = ",".join(["%s"] * len(code_list))
where_clauses.append(f"code IN ({placeholders})")
# cast codes to int when needed
params.extend([int(c) for c in code_list])
# convert flags
for row in rows:
row["app_visible"] = "Yes" if row["app_visible"] == 1 else "No"
row["is_life_lab"] = "Yes" if row["is_life_lab"] == 1 else "No"
row["status"] = "Active" if row["status"] == 1 else "Inactive"
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/api/schools_data', methods=['POST'])
def add_school_data():
"""
Adds a new school row.
Expected JSON keys: name, state, city, district, pin_code, app_visible,
is_life_lab, status.
app_visible and is_life_lab are "Yes"/"No", and status "Active"/"Inactive".
"""
data = request.get_json() or {}
try:
connection = get_db_connection()
with connection.cursor() as cursor:
name = data.get("name")
state = data.get("state")
city = data.get("city")
district = data.get("district")
block = data.get('block')
cluster = data.get('cluster')
code = data.get('code')
pin_code = data.get("pin_code")
donor_name = data.get("donor_name") # ← new line
app_visible_val = 1 if data.get("app_visible") == "Yes" else 0
is_life_lab_val = 1 if data.get("is_life_lab") == "Yes" else 0
status_val = 1 if data.get("status") == "Active" else 0
sql = """
INSERT INTO lifeapp.schools
(name, state, city, district, pin_code, donor_name, app_visible,
is_life_lab, status, created_at, updated_at, block, cluster, code)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), %s, %s,
%s)
"""
cursor.execute(sql, (name, state, city, district, pin_code, donor_name,
app_visible_val, is_life_lab_val, status_val, block, cluster, code))
connection.commit()
return jsonify({"message": "School added successfully"}), 201
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/schools_data/<int:school_id>', methods=['PUT'])
def update_school_data(school_id):
"""
Updates an existing school row.
Expected JSON keys: name, state, city, district, pin_code, app_visible,
is_life_lab, status.
"""
data = request.get_json() or {}
try:
connection = get_db_connection()
with connection.cursor() as cursor:
name = data.get("name")
state = data.get("state")
city = data.get("city")
district = data.get("district")
cluster = data.get('cluster')
block = data.get('block')
code = data.get('code')
pin_code = data.get("pin_code")
donor_name = data.get("donor_name") # ← new line
app_visible_val = 1 if data.get("app_visible") == "Yes" else 0
is_life_lab_val = 1 if data.get("is_life_lab") == "Yes" else 0
status_val = 1 if data.get("status") == "Active" else 0
sql = """
UPDATE lifeapp.schools
SET
name = %s,
state = %s,
city = %s,
district = %s,
cluster = %s,
block = %s,
pin_code = %s,
code = %s,
app_visible = %s,
is_life_lab = %s,
status = %s,
donor_name = %s, -- add this line
updated_at = NOW()
WHERE id = %s
"""
cursor.execute(sql, (name, state, city, district, cluster, block,
pin_code, code, app_visible_val, is_life_lab_val, status_val, donor_name,
school_id))
connection.commit()
return jsonify({"message": "School updated successfully"}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/schools_data/<int:school_id>', methods=['DELETE'])
def delete_school_data(school_id):
"""
Soft-deletes a school row by setting deleted_at.
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
UPDATE lifeapp.schools
SET deleted_at = NOW()
WHERE id = %s
"""
cursor.execute(sql, (school_id,))
connection.commit()
return jsonify({"message": "School deleted successfully"}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/upload_schools_csv', methods=['POST'])
def upload_schools_csv():
if 'csv' not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files['csv']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
if not file.filename.endswith('.csv'):
return jsonify({"error": "File must be a CSV"}), 400
try:
# Read CSV file
stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
csv_reader = csv.DictReader(stream)
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
INSERT INTO lifeapp.schools
(name, state, city, district, block, cluster,
pin_code, code, donor_name, app_visible, is_life_lab, status,
created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(),
NOW())
"""
count = 0
for row in csv_reader:
# Map Excel columns to database fields
mapped_data = {
'name': row.get('school_name', '').strip(),
'state': row.get('state_name', '').strip(),
'city': row.get('city_name', '').strip(),
'district': row.get('district_name', '').strip(),
'block': row.get('block_name', '').strip(),
'cluster': row.get('cluster_name', '').strip(),
'pin_code': row.get('pin_code', '').strip(),
'code': row.get('school_code', '').strip(),
'donor_name': row.get('donor_name', '').strip(), # ←
new
'app_visible': row.get('app_visible', 'No').strip().lower(),
'is_life_lab': row.get('is_life_lab', 'No').strip().lower(),
'status': row.get('status', 'Active').strip().lower()
}
# Convert values
app_visible_val = 1 if mapped_data['app_visible'] == 'yes' else 0
is_life_lab_val = 1 if mapped_data['is_life_lab'] == 'yes' else 0
status_val = 1 if mapped_data['status'] == 'active' else 0
cursor.execute(sql, (
mapped_data['name'],
mapped_data['state'],
mapped_data['city'],
mapped_data['district'],
mapped_data['block'],
mapped_data['cluster'],
mapped_data['pin_code'],
mapped_data['code'],
mapped_data['donor_name'], # ← pass donor_name
app_visible_val,
is_life_lab_val,
status_val
))
count += 1
connection.commit()
return jsonify({"message": f"Successfully uploaded {count} schools"}),
201
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if 'connection' in locals():
connection.close()
# @app.route('/api/state_list', methods=['GET'])
# def get_state_list():
# """
# Returns distinct states from the schools table.
# """
# try:
# connection = get_db_connection()
# with connection.cursor() as cursor:
# sql = """
# SELECT DISTINCT(state)
# FROM lifeapp.schools
# WHERE state IS NOT NULL AND state != '' AND state != '2'
# """
# cursor.execute(sql)
# result = cursor.fetchall()
# states = [row['state'] for row in result]
# return jsonify(states)
# except Exception as e:
# return jsonify({"error": str(e)}), 500
# finally:
# connection.close()
@app.route('/api/cities_for_state', methods=['GET'])
def get_cities_for_state():
"""
Returns distinct city values for the given state.
Example: GET /api/cities_for_state?state=Maharashtra
"""
state = request.args.get('state')
if not state:
return jsonify({"error": "Query param 'state' is required"}), 400
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
SELECT DISTINCT city
FROM lifeapp.schools
WHERE state = %s
AND deleted_at IS NULL
AND city IS NOT NULL AND city != ''
"""
cursor.execute(sql, (state,))
result = cursor.fetchall()
cities = [row['city'] for row in result]
return jsonify(cities), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
######################## SCHOOLS/DASHBOARD APIs ###################################
###################################################################################
###################################################################################
@app.route('/api/count_school_state_dashboard', methods= ['POST'])
def get_count_school_rate_dashboard():
connection = get_db_connection()
try:
@app.route('/api/demograph-schools', methods=['POST'])
def get_schools_demograph():
try:
data = request.get_json() or {}
grouping = data.get('grouping', 'monthly').lower()
state_filter = data.get('state', '').strip()
connection = get_db_connection()
with connection.cursor() as cursor:
# Build period expression
period_expr = {
'daily': "DATE(created_at)",
'weekly': "CONCAT(YEAR(created_at), '-W', LPAD(WEEK(created_at, 3),
2, '0'))",
'monthly': "DATE_FORMAT(created_at, '%%Y-%%m')",
'quarterly': "CONCAT(YEAR(created_at), '-Q', QUARTER(created_at))",
'yearly': "YEAR(created_at)",
'lifetime': "'lifetime'"
}[grouping]
sql = f"""
SELECT
period,
normalized_state as state,
SUM(school_count) as count
FROM (
SELECT
{period_expr} AS period,
CASE
WHEN LOWER(state) IN ('gujrat', 'gujarat') THEN
'Gujarat'
WHEN LOWER(state) IN ('tamilnadu', 'tamil nadu') THEN
'Tamil Nadu'
WHEN LOWER(state) = 'nct of delhi' THEN 'Delhi'
ELSE TRIM(LOWER(state))
END AS normalized_state,
COUNT(*) AS school_count
FROM lifeapp.schools
WHERE state IS NOT NULL
AND state != 'null'
AND state != ''
GROUP BY period, state
) AS normalized
WHERE 1=1 {state_condition}
GROUP BY period, normalized_state
ORDER BY period, count DESC
"""
except Exception as e:
app.logger.error(f"Error fetching school demographics: {str(e)}")
return jsonify({"error": "Internal server error"}), 500
finally:
if 'connection' in locals() and connection:
connection.close()
###################################################################################
###################################################################################
######################## MENTORS/DASHBOARD APIs ###################################
###################################################################################
###################################################################################
@app.route('/api/mentors', methods=['POST'])
def get_mentors():
"""Fetch list of mentors with optional filters for state, mobile number, and
mentor_code."""
try:
filters = request.get_json() or {}
state_filter = filters.get('state')
mobile_filter = filters.get('mobile_no')
mentor_code_filter = filters.get('mentor_code')
connection = get_db_connection()
with connection.cursor() as cursor:
# Base query: selecting mentors (type 4)
sql = """
SELECT id, name, email, mobile_no, pin, gender, dob, state, city
FROM lifeapp.users
WHERE `type` = 4
"""
conditions = []
params = []
if state_filter:
conditions.append(" state = %s")
params.append(state_filter)
if mobile_filter:
conditions.append(" mobile_no = %s")
params.append(mobile_filter)
if mentor_code_filter:
conditions.append(" pin = %s")
params.append(mentor_code_filter)
if conditions:
sql += " AND " + " AND ".join(conditions)
sql += " order by id desc"
cursor.execute(sql, tuple(params))
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/add_mentor', methods=['POST'])
def add_mentor():
data = request.get_json() or {}
# Required fields
name = data.get('name')
mobile_no = data.get('mobile_no')
pin = data.get('pin')
# Optional fields
email = data.get('email') or None
state = data.get('state') or None
city = data.get('city') or None
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = """
INSERT INTO lifeapp.users
(name, email, mobile_no, pin,
state, city, gender, dob,
`type`, created_at, updated_at)
VALUES
(%s, %s, %s, %s,
%s, %s, %s, %s,
4, NOW(), NOW())
"""
cursor.execute(sql, (
name, email, mobile_no, pin,
state, city, gender, dob
))
conn.commit()
return jsonify({"message": "Mentor added successfully"}), 201
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/api/update_mentor', methods=['POST'])
def do_update_mentor():
data = request.get_json() or {}
mentor_id = data.get('id')
if not mentor_id:
return jsonify({'error': 'Mentor ID is required.'}), 400
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = """
UPDATE lifeapp.users
SET name = %s,
email = %s,
mobile_no = %s,
pin = %s,
state = %s,
city = %s,
gender = %s,
dob = %s,
updated_at= %s
WHERE id = %s
AND `type` = 4
"""
params = (
name,
email,
mobile_no,
pin,
state,
city,
gender,
dob,
updated_at,
mentor_id
)
cursor.execute(sql, params)
conn.commit()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/delete_mentor', methods=['POST'])
def do_delete_mentor():
"""Delete a mentor from the database."""
try:
data = request.get_json()
mentor_id = data.get('id')
if not mentor_id:
return jsonify({'error': 'Mentor ID is required'}), 400
connection = get_db_connection()
with connection.cursor() as cursor:
sql = "DELETE FROM lifeapp.users WHERE id = %s AND `type` = 4"
cursor.execute(sql, (mentor_id,))
connection.commit()
@app.route('/api/upload_mentors_csv', methods=['POST'])
def upload_mentors_csv():
if 'csv' not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files['csv']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
if not file.filename.lower().endswith('.csv'):
return jsonify({"error": "File must be a CSV"}), 400
try:
stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
reader = csv.DictReader(stream)
required = ['name', 'email', 'mobile_no', 'mentor_code']
conn = get_db_connection()
count = 0
with conn.cursor() as cursor:
sql = """
INSERT INTO lifeapp.users
(name, email, mobile_no, pin,
`type`, created_at, updated_at)
VALUES (%s, %s, %s, %s,
4, NOW(), NOW())
"""
normalized_rows = []
for raw in reader:
# strip anything after a space in the header
norm = { k.split(' ')[0].strip(): v for k, v in raw.items() }
normalized_rows.append(norm)
for row in normalized_rows:
# skip rows missing required fields
if not all(row.get(f) for f in required):
continue
cursor.execute(sql, (
name, email, mobile_no, pin
))
count += 1
conn.commit()
return jsonify({"message": f"Successfully uploaded {count} mentors"}), 201
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if 'conn' in locals():
conn.close()
###################################################################################
###################################################################################
######################## MENTORS/SESSIONS APIs ###################################
###################################################################################
###################################################################################
@app.route('/api/sessions', methods=['POST'])
def get_sessions():
"""Fetch all mentor sessions with user name and status."""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
SELECT
las.id,
u.name,
las.heading,
las.zoom_link,
las.zoom_password,
las.date_time,
las.status
FROM
lifeapp.la_sessions las
INNER JOIN
lifeapp.users u ON las.user_id = u.id
ORDER BY las.date_time DESC
"""
cursor.execute(sql)
sessions = cursor.fetchall()
@app.route('/api/update_session', methods=['POST'])
def update_session():
"""Update an existing session's heading, description, and status."""
try:
data = request.get_json()
session_id = data.get('id')
heading = data.get('heading')
description = data.get('description')
status = data.get('status')
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
UPDATE lifeapp.la_sessions
SET heading = %s, description = %s, status = %s
WHERE id = %s
"""
cursor.execute(sql, (heading, description, status, session_id))
connection.commit()
return jsonify({'message': 'Session updated successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/session_participants', methods=['POST'])
def get_session_participants():
"""Get all participants of a given session ID."""
try:
data = request.get_json()
session_id = data.get('session_id')
connection = get_db_connection()
with connection.cursor() as cursor:
sql = """
SELECT
u.school_id,
u.name,
u.mobile_no,
u.grade,
u.city,
u.state,
lasp.la_session_id
FROM
lifeapp.users u
INNER JOIN
lifeapp.la_session_participants lasp
ON
u.id = lasp.user_id
WHERE
lasp.la_session_id = %s;
"""
cursor.execute(sql, (session_id,))
participants = cursor.fetchall()
return jsonify(participants), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
################# MENTOR DASHBOARD AND SESSION APIs (UNUSED) ######################
###################################################################################
###################################################################################
@app.route('/api/mentor_dashboard_table', methods = ['POST'])
def get_mentor_dashboard_table():
sql = """
select id, name, email, mobile_no, pin as mentor_code from lifeapp.users
where `type` = 4;
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
connection.close()
# @app.route('/api/update_mentor', methods=['POST'])
# def update_mentor():
# filters = request.get_json()
# mentor_id = filters.get('id')
# if not mentor_id:
# return jsonify({'error': 'Mentor ID is required'}), 400
# try:
# connection = get_db_connection()
# cursor = connection.cursor()
# query = """
# UPDATE lifeapp.users
# SET name = %s, email = %s, mobile_no = %s, city = %s, state = %s, pin
= %s, updated_at = %s
# WHERE id = %s
# """
# values = (name, email, mobile_no, city, state, pin, updated_at,
mentor_id)
# cursor.execute(query, values)
# connection.commit()
# cursor.close()
# connection.close()
# @app.route('/api/delete_mentor', methods=['DELETE'])
# def delete_mentor():
# filters = request.get_json()
# mentor_id = filters.get('id')
# if not mentor_id:
# return jsonify({'error': 'Mentor ID is required'}), 400
# try:
# connection = get_db_connection()
# cursor = connection.cursor()
###################################################################################
@app.route('/api/student_count_by_level_778', methods=['POST'])
def student_count_by_level_778():
"""
POST body (JSON) can include an optional "level" filter.
If level is not provided or is 'all', then return counts for all levels:
level1: type=3 and grade >= 1
level2: type=3 and grade >= 6
level3: type=3 and grade >= 7
level4: type=3 and grade >= 8
try:
connection = get_db_connection()
with connection:
with connection.cursor() as cursor:
if level in [None, '', 'all']:
# Return counts for all levels
query = """
SELECT
SUM(CASE WHEN type = 3 AND grade >= 1 THEN 1 ELSE 0 END) AS
level1_count,
SUM(CASE WHEN type = 3 AND grade >= 6 THEN 1 ELSE 0 END) AS
level2_count,
SUM(CASE WHEN type = 3 AND grade >= 7 THEN 1 ELSE 0 END) AS
level3_count,
SUM(CASE WHEN type = 3 AND grade >= 8 THEN 1 ELSE 0 END) AS
level4_count
FROM lifeapp.users;
"""
cursor.execute(query)
result = cursor.fetchone()
return jsonify(result)
else:
# For a specified level, choose condition accordingly.
if level == "1":
cond = "grade >= 1"
elif level == "2":
cond = "grade >= 6"
elif level == "3":
cond = "grade >= 7"
elif level == "4":
cond = "grade >= 8"
else:
return jsonify({"error": "Invalid level parameter"}), 400
query = f"""
SELECT COUNT(*) AS count
FROM lifeapp.users
WHERE type = 3 AND {cond};
"""
cursor.execute(query)
result = cursor.fetchone()
return jsonify({
'level1_count': result[0],
'level2_count': result[1],
'level3_count': result[2],
'level4_count': result[3]
})
except Exception as e:
return jsonify({"error": str(e)}), 500
###################################################################################
###################################################################################
###################################################################################
##################### RESOURCES/STUDENT_RELATED/MISSION APIs ######################
###################################################################################
###################################################################################
@app.route('/api/missions_resource', methods=['POST'])
def get_missions_resource():
try:
filters = request.json
connection = get_db_connection()
with connection.cursor() as cursor:
base_query = """
SELECT
lam.id,
lam.title,
lam.description,
lam.question,
lam.type,
CASE WHEN lam.allow_for=1 THEN 'All' ELSE 'Teacher' END AS
allow_for,
lam.la_subject_id AS subject_id,
las.title AS subject,
lam.la_level_id AS level_id,
lal.title AS level,
lam.status AS status,
lam.index AS mission_index,
JSON_UNQUOTE(JSON_EXTRACT(lam.image, '$.en')) AS
image_id,
mimg.path AS image_path,
lamr.id AS resource_id,
document.id AS media_id,
document.path AS resource_path,
lamr.index AS idx
FROM lifeapp.la_missions lam
JOIN lifeapp.la_subjects las ON las.id = lam.la_subject_id
JOIN lifeapp.la_levels lal ON lal.id = lam.la_level_id
LEFT JOIN lifeapp.media mimg ON mimg.id =
JSON_UNQUOTE(JSON_EXTRACT(lam.image, '$.en'))
LEFT JOIN lifeapp.la_mission_resources lamr ON lamr.la_mission_id =
lam.id
LEFT JOIN lifeapp.media document ON document.id = lamr.media_id
WHERE 1=1
"""
params = []
if filters.get('status'):
base_query += " AND lam.status = %s"
params.append(filters['status'])
if filters.get('type'):
base_query += " AND lam.type = %s"
params.append(filters['type'])
if filters.get('subject'):
base_query += " AND lam.la_subject_id = %s"
params.append(filters['subject'])
if filters.get('level'):
base_query += " AND lam.la_level_id = %s"
params.append(filters['level'])
@app.route('/api/add_mission', methods=['POST'])
def add_mission():
try:
logging.info("===== STARTING ADD MISSION REQUEST =====")
connection = get_db_connection()
with connection.cursor() as cursor:
form = request.form
files = request.files
logging.info(f"Form data received: {form}")
logging.info(f"Files received: {files}")
# Prepare SQL
sql = """
INSERT INTO lifeapp.la_missions
(la_subject_id, la_level_id, type, allow_for,
title, description, question,
image, document, `index`,
created_at, updated_at, status)
VALUES
(%s, %s, %s, %s,
%s, %s, %s,
%s, %s, %s,
NOW(), NOW(), %s)
"""
params = (
subject, level, type_id, allow_for,
title_json, description_json, question_json,
image_json, document_json,
index_value,
status
)
except Exception as e:
logging.error(f"Error in add_mission: {str(e)}", exc_info=True)
return jsonify({"error": str(e)}), 500
finally:
connection.close()
logging.info("Database connection closed")
@app.route('/api/delete_mission', methods=['POST'])
def delete_mission():
try:
connection = get_db_connection()
with connection.cursor() as cursor:
mission_id = request.json.get('id')
if not mission_id:
return jsonify({'error': 'Missing mission id'}), 400
if not mission:
return jsonify({'error': 'Mission not found'}), 404
image_id = mission.get('image_id')
# document_id = mission.get('document_id')
connection.commit()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/update_mission', methods=['POST'])
def update_mission():
try:
logging.info("===== STARTING UPDATE MISSION REQUEST =====")
connection = get_db_connection()
with connection.cursor() as cursor:
form = request.form
files = request.files
logging.info(f"Form data received: {form}")
logging.info(f"Files received: {files}")
if old_img:
logging.info("Deleting old image")
cursor.execute("DELETE FROM lifeapp.media WHERE id =
JSON_UNQUOTE(JSON_EXTRACT(%s, '$.en'))", (old_img,))
update_sql += " WHERE id = %s"
params.append(mission_id)
logging.info(f"Final update query: {update_sql}")
logging.info(f"Query params: {params}")
cursor.execute(update_sql, tuple(params))
logging.info("Mission data updated successfully")
cursor.execute(
"DELETE FROM lifeapp.la_mission_resources WHERE la_mission_id =
%s",
(mission_id,)
)
logging.info("Old resource references deleted")
connection.commit()
logging.info("Transaction committed successfully")
return jsonify({"success": True}), 200
except Exception as e:
logging.error(f"Error in update_mission: {str(e)}", exc_info=True)
return jsonify({"error": str(e)}), 500
finally:
connection.close()
logging.info("Database connection closed")
###################################################################################
###################################################################################
############## RESOURCES/STUDENT_RELATED/QUIZ questions APIs ######################
###################################################################################
###################################################################################
# @app.route('/api/quiz_questions', methods=['POST'])
# def get_quiz_questions():
# data = request.get_json() or {}
# subject_id = data.get('subject_id')
# level_id = data.get('level_id')
# status = data.get('status')
# connection = get_db_connection()
# try:
# with connection.cursor() as cursor:
# base_query = """
# SELECT laq.id, laq.title as question_title, lal.title as
level_title, las.title as subject_title,
# CASE WHEN laq.status = 0 THEN 'Inactive' ELSE 'Active' END as
status,
# laq.la_topic_id,
# CASE
# WHEN laq.type = 2 THEN 'Quiz'
# WHEN laq.type = 3 THEN 'Riddle'
# WHEN laq.type = 4 THEN 'Puzzle'
# ELSE 'Default'
# END as game_type,
# CASE
# WHEN laq.question_type = 1 THEN 'Text'
# WHEN laq.question_type = 2 THEN 'Image'
# ELSE 'Default'
# END as question_type,
# CASE
# WHEN laq.answer_option_id = laqo.id THEN 1 ELSE 0
# END as is_answer,
# laqo.title as answer_option
# FROM lifeapp.la_question_options laqo
# INNER JOIN lifeapp.la_questions laq ON laq.id = laqo.question_id
# INNER JOIN lifeapp.la_levels lal ON lal.id = laq.la_level_id
# INNER JOIN lifeapp.la_subjects las ON las.id = laq.la_subject_id
# WHERE 1 = 1
# """
# filters = []
# if subject_id:
# base_query += " AND laq.la_subject_id = %s"
# filters.append(subject_id)
# if level_id:
# base_query += " AND laq.la_level_id = %s"
# filters.append(level_id)
# return jsonify(results)
# except Exception as e:
# return jsonify({'error': str(e)}), 500
# finally:
# connection.close()
@app.route('/api/quiz_questions', methods=['POST'])
def get_quiz_questions():
data = request.get_json() or {}
subject_id = data.get('subject_id')
level_id = data.get('level_id')
status = data.get('status')
topic_id = data.get('topic_id')
game_type_filter = data.get('type')
connection = get_db_connection()
try:
with connection.cursor() as cursor:
base_query = """
SELECT laq.id, laq.title as question_title, lal.title as
level_title, las.title as subject_title,
CASE WHEN laq.status = 0 THEN 'Inactive' ELSE 'Active' END as
status, laq.index,
laq.la_topic_id,
lat.title as topic_title,
CASE
WHEN laq.type = 2 THEN 'Quiz'
WHEN laq.type = 3 THEN 'Riddle'
WHEN laq.type = 4 THEN 'Puzzle'
ELSE 'Default'
END as game_type,
CASE
WHEN laq.question_type = 1 THEN 'Text'
WHEN laq.question_type = 2 THEN 'Image'
ELSE 'Default'
END as question_type,
CASE
WHEN laq.answer_option_id = laqo.id THEN 1 ELSE 0
END as is_answer,
laqo.title as answer_option
FROM lifeapp.la_question_options laqo
INNER JOIN lifeapp.la_questions laq ON laq.id = laqo.question_id
INNER JOIN lifeapp.la_levels lal ON lal.id = laq.la_level_id
INNER JOIN lifeapp.la_subjects las ON las.id = laq.la_subject_id
LEFT JOIN lifeapp.la_topics lat ON laq.la_topic_id = lat.id
WHERE 1 = 1
"""
filters = []
if subject_id:
base_query += " AND laq.la_subject_id = %s"
filters.append(subject_id)
if level_id:
base_query += " AND laq.la_level_id = %s"
filters.append(level_id)
if status is not None and status != "" and status.lower() != "all":
base_query += " AND laq.status = %s"
filters.append(status)
if topic_id:
base_query += " AND laq.la_topic_id = %s"
filters.append(topic_id)
if game_type_filter:
base_query += " AND laq.type = %s"
filters.append(game_type_filter)
cursor.execute(base_query, filters)
results = cursor.fetchall()
return jsonify(results)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/add_quiz_question', methods=['POST'])
def add_quiz_question():
"""
Expects JSON payload, where:
- question_title is a JSON string (e.g. "{\"en\":\"...\"}")
- options[].title is a JSON string (e.g. "{\"en\":\"Option 1\"}")
"""
data = request.get_json() or {}
# Required
try:
question_json = data['question_title']
subject_id = data['subject_id']
level_id = data['level_id']
options = data['options']
except KeyError as e:
return jsonify({"error": f"Missing field: {e}"}), 400
# Optional / defaults
topic_id = data.get('topic_id', 1)
created_by = data.get('created_by', 1)
question_type = data.get('question_type', 1)
game_type = data.get('type', 2)
status = data.get('status', 1)
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# 1) Insert question
sql_q = """
INSERT INTO lifeapp.la_questions
(title, la_subject_id, la_level_id, la_topic_id,
created_by, question_type, `type`, status,
created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql_q, (
question_json,
subject_id,
level_id,
topic_id,
created_by,
question_type,
game_type,
status,
now,
now
))
question_id = cursor.lastrowid
# 2) Insert options
sql_o = """
INSERT INTO lifeapp.la_question_options
(question_id, title, created_at, updated_at)
VALUES (%s, %s, %s, %s)
"""
answer_option_id = None
for opt in options:
title_json = opt['title']
cursor.execute(sql_o, (question_id, title_json, now, now))
opt_id = cursor.lastrowid
if opt.get('is_correct'):
answer_option_id = opt_id
conn.commit()
return jsonify({"success": True, "question_id": question_id}), 201
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/api/update_quiz_question/<int:question_id>', methods=['PUT'])
def update_quiz_question(question_id):
data = request.get_json() or {}
# required
qt_raw = data.get('question_title', '')
subject = data.get('subject_id')
level = data.get('level_id')
options = data.get('options', [])
# optional
topic = data.get('topic_id', 1)
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# 1) update question row
cursor.execute("""
UPDATE lifeapp.la_questions
SET
title = %s,
la_subject_id = %s,
la_level_id = %s,
la_topic_id = %s,
status = %s,
question_type = %s,
`type` = %s,
updated_at = %s
WHERE id = %s
""", (
question_json,
subject, level, topic,
status, q_type, game_type,
now, question_id
))
conn.commit()
cursor.execute("""
INSERT INTO lifeapp.la_question_options
(question_id, title, created_at, updated_at)
VALUES (%s, %s, %s, %s)
""", (question_id, opt_json, now, now))
oid = cursor.lastrowid
if is_correct:
answer_option_id = oid
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/api/delete_quiz_question/<int:question_id>', methods=['DELETE'])
def delete_quiz_question(question_id):
"""
Deletes the question with the given ID and all its options.
"""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
# 1) Remove all options for this question
cursor.execute(
"DELETE FROM lifeapp.la_question_options WHERE question_id = %s",
(question_id,)
)
connection.commit()
except Exception as e:
# Log the error if you have a logging setup, then return
return jsonify({"error": str(e)}), 500
finally:
connection.close()
@app.route('/api/download_quiz_template', methods=['GET'])
def download_quiz_template():
# a simple 2-row example; you can expand it if you like
csv_content = """questions,option1,option2,option3,option4,answer
how are you?,good,bad,worst,ok,option1
how old are you?,ten,eleven,twelve,thirteen,option3
"""
return Response(
csv_content,
mimetype='text/csv',
headers={'Content-Disposition': 'attachment; filename=quiz_template.csv'}
)
@app.route('/api/import_quiz_questions_csv', methods=['POST'])
def import_quiz_questions_csv():
subject_id = request.form.get('subject_id')
level_id = request.form.get('level_id')
topic_id = request.form.get('topic_id')
f = request.files.get('file')
try:
# read CSV
stream = io.StringIO(f.stream.read().decode('utf-8').replace('\u2011',
'-'))
reader = csv.DictReader(stream)
inserted = 0
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
conn = get_db_connection()
with conn.cursor() as cursor:
for idx, row in enumerate(reader, 1):
q_text = row.get('questions', '').strip()
if not q_text:
print(f"[WARN] Row {idx}: Skipped empty question")
continue
# 1) Insert question
q_json = json.dumps({'en': q_text})
cursor.execute("""
INSERT INTO lifeapp.la_questions
(title, la_subject_id, la_level_id, la_topic_id,
created_by, question_type, `type`, status,
created_at, updated_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", (
q_json,
subject_id,
level_id,
topic_id,
1, # created_by
1, # question_type = Text
2, # game_type = Quiz
1, # status = Active
now,
now
))
qid = cursor.lastrowid
# 2) Insert options
answer_key = row.get('answer', '').strip().lower()
correct_idx = None
if answer_key.startswith('option'):
try:
correct_idx = int(answer_key.replace('option', '')) - 1
except:
print(f"[WARN] Row {idx}: Invalid answer format
'{answer_key}'")
answer_option_id = None
for opt_idx, col in enumerate(['option1', 'option2', 'option3',
'option4']):
txt = row.get(col, '').strip()
opt_json = json.dumps({'en': txt})
cursor.execute("""
INSERT INTO lifeapp.la_question_options
(question_id, title, created_at, updated_at)
VALUES (%s,%s,%s,%s)
""", (qid, opt_json, now, now))
oid = cursor.lastrowid
if correct_idx is not None and opt_idx == correct_idx:
answer_option_id = oid
inserted += 1
conn.commit()
print(f"[SUCCESS] Inserted {inserted} questions successfully.")
return jsonify({'success': True, 'inserted': inserted}), 200
except Exception as e:
print(f"[ERROR] Exception occurred: {str(e)}")
return jsonify({'error': str(e)}), 500
finally:
conn.close()
print("[INFO] Database connection closed.")
subject_id = request.form.get('subject_id')
level_id = request.form.get('level_id')
topic_id = request.form.get('topic_id')
f = request.files.get('file')
if not (subject_id and level_id and topic_id and f):
return jsonify({'error':'Missing subject_id, level_id, topic_id or
file'}),400
# read CSV
stream = io.StringIO(f.stream.read().decode('utf-8'))
reader = csv.DictReader(stream)
inserted = 0
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
for row in reader:
q_text = row.get('questions','').strip()
if not q_text:
continue
# 1) insert question
q_json = json.dumps({'en': q_text})
cursor.execute("""
INSERT INTO lifeapp.la_questions
(title, la_subject_id, la_level_id, la_topic_id,
created_by, question_type, `type`, status,
created_at, updated_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", (
q_json,
subject_id,
level_id,
topic_id,
1, # created_by
1, # question_type = Text
2, # game_type = Quiz
1, # status = Active
now,
now
))
qid = cursor.lastrowid
# 2) insert options
answer_key = row.get('answer','').strip().lower() # e.g. 'option1'
correct_idx = None
if answer_key.startswith('option'):
try:
correct_idx = int(answer_key.replace('option','')) - 1
except:
correct_idx = None
answer_option_id = None
for idx, col in
enumerate(['option1','option2','option3','option4']):
txt = row.get(col,'').strip()
opt_json = json.dumps({'en': txt})
cursor.execute("""
INSERT INTO lifeapp.la_question_options
(question_id, title, created_at, updated_at)
VALUES (%s,%s,%s,%s)
""", (qid, opt_json, now, now))
oid = cursor.lastrowid
if correct_idx is not None and idx == correct_idx:
answer_option_id = oid
inserted += 1
conn.commit()
return jsonify({'success': True, 'inserted': inserted}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/updated_today', methods=['POST'])
def get_questions_updated_today():
"""
Returns a list of question IDs that were updated today.
Optional filters: subject_id, level_id, topic_id, type
"""
data = request.get_json() or {}
subject_id = data.get('subject_id')
level_id = data.get('level_id')
topic_id = data.get('topic_id')
game_type = data.get('type')
today = datetime.now().strftime('%Y-%m-%d')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
query = """
SELECT id, title, updated_at
FROM lifeapp.la_questions
WHERE DATE(updated_at) = %s
"""
filters = [today]
if subject_id:
query += " AND la_subject_id = %s"
filters.append(subject_id)
if level_id:
query += " AND la_level_id = %s"
filters.append(level_id)
if topic_id:
query += " AND la_topic_id = %s"
filters.append(topic_id)
if game_type:
query += " AND `type` = %s"
filters.append(game_type)
cursor.execute(query, filters)
rows = cursor.fetchall()
return jsonify(rows), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@app.route('/api/options_updated_today', methods=['POST'])
def get_options_updated_today():
"""
Returns list of question options that were updated today.
Optional filters: question_id, subject_id, level_id, topic_id, type
"""
data = request.get_json() or {}
question_id = data.get('question_id')
subject_id = data.get('subject_id')
level_id = data.get('level_id')
topic_id = data.get('topic_id')
game_type = data.get('type')
today = datetime.now().strftime('%Y-%m-%d')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
query = """
SELECT laqo.id, laqo.title, laqo.updated_at,
laqo.question_id, laq.title AS question_title
FROM lifeapp.la_question_options laqo
JOIN lifeapp.la_questions laq ON laqo.question_id = laq.id
WHERE DATE(laqo.updated_at) = %s
"""
filters = [today]
if question_id:
query += " AND laqo.question_id = %s"
filters.append(question_id)
if subject_id:
query += " AND laq.la_subject_id = %s"
filters.append(subject_id)
if level_id:
query += " AND laq.la_level_id = %s"
filters.append(level_id)
if topic_id:
query += " AND laq.la_topic_id = %s"
filters.append(topic_id)
if game_type:
query += " AND laq.type = %s"
filters.append(game_type)
cursor.execute(query, filters)
rows = cursor.fetchall()
return jsonify(rows), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/delete_today_questions', methods=['DELETE'])
def delete_today_questions_and_options():
"""
Deletes all questions and their options that were created or updated today.
"""
today = datetime.now().strftime('%Y-%m-%d')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# 1. Get all question IDs updated or created today
cursor.execute("""
SELECT id FROM lifeapp.la_questions
WHERE DATE(created_at) = %s OR DATE(updated_at) = %s
""", (today, today))
question_ids = [row['id'] for row in cursor.fetchall()]
if not question_ids:
return jsonify({"message": "No questions found for today"}), 200
conn.commit()
return jsonify({
"success": True,
"message": f"Deleted {len(question_ids)} question(s) and their
options"
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
############## RESOURCES/STUDENT_RELATED/VISIONS APIs ######################
###################################################################################
###################################################################################
# 1. Fetch Visions + Questions (with filters and pagination)
@app.route('/api/visions', methods=['GET'])
def fetch_visions():
qs = request.args
status = qs.get('status')
subject = qs.get('subject_id')
level = qs.get('level_id')
page = int(qs.get('page', 1))
per_page = int(qs.get('per_page', 30))
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# Build count query
count_sql = "SELECT COUNT(DISTINCT v.id) AS total FROM lifeapp.visions
v WHERE 1=1"
count_params = []
if status in ('0','1'):
count_sql += " AND v.status=%s"
count_params.append(int(status))
if subject:
count_sql += " AND v.la_subject_id=%s"
count_params.append(subject)
if level:
count_sql += " AND v.la_level_id=%s"
count_params.append(level)
cursor.execute(count_sql, count_params)
total = cursor.fetchone()['total']
cursor.execute(sql, params)
rows = cursor.fetchall()
# Group visions
visions = {}
for r in rows:
vid = r['vision_id']
if vid not in visions:
visions[vid] = {
'vision_id': vid,
'title': r['title'],
'description': r['description'],
'youtube_url': r['youtube_url'],
'allow_for': r['allow_for'],
'subject': r['subject'],
'level': r['level'],
'status': r['status'],
'index': r['idx'],
'questions': []
}
# Only add question if it exists
if r['question_id']:
# Safely parse options JSON
options = None
if r['options']:
try:
options = json.loads(r['options'])
except:
options = None
visions[vid]['questions'].append({
'question_id': r['question_id'],
'question_type': r['question_type'],
'question': r['question'],
'options': options,
'correct_answer': r['correct_answer']
})
return jsonify({
'visions': list(visions.values()),
'total': total
}), 200
except Exception as e:
app.logger.error(f"Error fetching visions: {str(e)}", exc_info=True)
return jsonify({'error': str(e)}), 500
finally:
conn.close()
required =
['title','description','allow_for','subject_id','level_id','status','questions']
for f in required:
if f not in data:
print(f"❌ [ADD] Missing field: {f}")
return jsonify({'error': f'Missing field {f}'}), 400
(title,description,youtube_url,allow_for,la_subject_id,la_level_id,status,created_a
t,updated_at,`index`)
VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
vparams = (
json.dumps({'en':data['title']}),
json.dumps({'en':data['description']}),
data.get('youtube_url'),
int(data['allow_for']),
data['subject_id'],
data['level_id'],
int(data['status']),
now, now,
int(data.get('index', 1))
)
conn = get_db_connection()
try:
with conn.cursor() as cur:
print("🚀 [ADD] Inserting Vision...")
cur.execute(vsql, vparams)
vid = cur.lastrowid
print("✅ [ADD] Vision inserted with ID:", vid)
for i, q in enumerate(data['questions']):
print(f"📘 [ADD] Inserting Question {i+1}:
Type={q['question_type']}")
qsql = """INSERT INTO lifeapp.vision_questions
(vision_id,question,question_type,options,correct_answer,created_at,updated_at)
VALUES(%s,%s,%s,%s,%s,%s,%s)"""
qparams = (
vid,
json.dumps({'en': q['question']}),
q['question_type'],
json.dumps(q.get('options')) if q.get('options') else None,
q.get('correct_answer'),
now, now
)
cur.execute(qsql, qparams)
conn.commit()
print("🎉 [ADD] All questions saved")
return jsonify({'vision_id': vid}), 201
except Exception as e:
print("🔥 [ADD] Error:", str(e))
return jsonify({'error': str(e)}), 500
finally:
conn.close()
required = [
'title','description','youtube_url',
'allow_for','subject_id','level_id','status',
'questions'
]
for f in required:
if f not in data:
print(f"❌ [EDIT] Missing field: {f}")
return jsonify({'error': f'Missing field {f}'}), 400
for i, q in enumerate(data['questions']):
print(f"📘 [EDIT] Inserting Question {i+1}:
Type={q['question_type']}")
cursor.execute("""
INSERT INTO lifeapp.vision_questions
(vision_id, question, question_type,
options, correct_answer,
created_at, updated_at)
VALUES (%s,%s,%s,%s,%s,%s,%s)
""", (
vision_id,
json.dumps({'en': q['question']}),
q['question_type'],
json.dumps(q.get('options')) if q.get('options') else None,
q.get('correct_answer'),
now, now
))
conn.commit()
print(f"🎉 [EDIT] Vision ID {vision_id} updated successfully")
return jsonify({'success': True}), 200
except Exception as e:
print("🔥 [EDIT] Error:", str(e))
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
######################## SETTINGS/SUBJECTS APIs ###################################
###################################################################################
###################################################################################
@app.route('/api/subjects_list', methods=['POST'])
def get_subjects():
"""Fetch subjects with an optional status filter.
Expects a JSON payload with key 'status' that can be "1", "0", or "all".
"all" returns all subjects.
"""
connection = get_db_connection()
try:
data = request.get_json() or {}
status = data.get('status', 'all') # Default to "all" if not provided
return jsonify(subjects)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/subjects_new', methods=['POST'])
def create_subject():
"""
Expects multipart/form-data:
- title: valid JSON string, e.g. '{"en":"Science"}'
- heading: valid JSON string
- created_by: integer user ID
- image: file upload (optional)
"""
form = request.form
file = request.files.get('image')
image_id = None
try:
created_by = int(user_id)
except ValueError:
return jsonify({'error': 'created_by must be an integer'}), 400
sql = """
INSERT INTO lifeapp.la_subjects
(created_by, title, heading, image, status, `index`, created_at,
updated_at)
VALUES
(%s,
%s, -- title JSON
%s, -- heading JSON
JSON_OBJECT('en', %s), -- image as JSON {"en": media_id}
%s, -- default active
1, -- default index
NOW(),
NOW()
)
"""
params = (
created_by,
title,
heading,
image_id,
status
)
try:
conn = get_db_connection()
with conn.cursor() as cur:
cur.execute(sql, params)
conn.commit()
return jsonify({'message': 'Subject created successfully'}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/subjects/<int:subject_id>', methods=['PUT'])
def update_subject(subject_id):
"""
Expects multipart/form-data:
- title: valid JSON string
- heading: valid JSON string
- created_by: integer user ID (foreign key)
- image: file upload (optional)
"""
form = request.form
file = request.files.get('image')
new_image_id = None
# ————————————————————————————————————————————
# Map role‐name → user ID (same as in create_subject)
role_to_id = {"Admin": 1, "Mentor": 4, "Teacher": 5}
if created_by in role_to_id:
created_by = role_to_id[created_by]
else:
# if it wasn’t one of the roles, try to parse it as an integer
try:
created_by = int(created_by)
except (ValueError, TypeError):
return jsonify({'error': 'created_by must be an integer or valid role
name'}), 400
# ————————————————————————————————————————————
try:
created_by = int(created_by)
except ValueError:
return jsonify({'error': 'created_by must be an integer'}), 400
# Build an UPDATE that only replaces image if a new file was provided
sql = """
UPDATE lifeapp.la_subjects
SET
created_by = %s,
title = %s,
heading = %s,
status = %s
"""
params = [
created_by,
title,
heading,
status,
]
if new_image_id is not None:
sql += ", image = JSON_OBJECT('en', %s)"
params.append(new_image_id)
try:
conn = get_db_connection()
with conn.cursor() as cur:
cur.execute(sql, params)
conn.commit()
return jsonify({'message': 'Subject updated successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/subjects/<int:subject_id>', methods=['DELETE'])
def delete_subject(subject_id):
try:
conn = get_db_connection()
# 1) Fetch the media ID (from JSON) and S3 key
with conn.cursor(pymysql.cursors.DictCursor) as cur:
cur.execute("""
SELECT
JSON_UNQUOTE(JSON_EXTRACT(s.image, '$.en')) AS media_id,
m.path AS media_path
FROM lifeapp.la_subjects s
LEFT JOIN lifeapp.media m
ON m.id = JSON_UNQUOTE(JSON_EXTRACT(s.image, '$.en'))
WHERE s.id = %s
""", (subject_id,))
row = cur.fetchone()
conn.commit()
return jsonify({'message': 'Subject and its media deleted successfully'}),
200
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/subjects/<int:subject_id>/status', methods=['PATCH'])
def change_subject_status(subject_id):
"""Change the status of a subject."""
try:
filters = request.get_json() or {}
status = filters.get('status')
if status == 'ACTIVE':
status_no = 1
else:
status_no = 0
connection = get_db_connection()
###################################################################################
###################################################################################
######################## SETTINGS/LEVELS APIs ###################################
###################################################################################
###################################################################################
@app.route('/api/levels', methods=['POST'])
def get_levels():
"""Fetch all levels with pagination."""
connection = None
try:
filters = request.get_json() or {}
page = filters.get('page', 1)
per_page = 25 # Default pagination limit
offset = (page - 1) * per_page
connection = get_db_connection()
if connection is None:
raise Exception("Database connection failed")
return jsonify(levels)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
if connection: # Check if connection is not None
connection.close()
@app.route('/api/levels_new', methods=['POST'])
def create_level():
"""Create a new level."""
conn = None
try:
data = request.get_json() or {}
print("[CREATE] Incoming data:", data)
sql = """
INSERT INTO lifeapp.la_levels
(title, description,
jigyasa_points, mission_points, pragya_points,
puzzle_points, puzzle_time,
quiz_points, quiz_time,
riddle_points, riddle_time,
vision_text_image_points, vision_mcq_points,
status, created_at)
VALUES
(%s, %s,
%s, %s, %s,
%s, %s,
%s, %s,
%s, %s,
%s, %s,
%s, NOW())
"""
params = [
json.dumps(title_data),
json.dumps(description_data),
jigyasa_points,
mission_points,
pragya_points,
puzzle_points,
puzzle_time,
quiz_points,
quiz_time,
riddle_points,
riddle_time,
vision_text_image_points,
vision_mcq_points,
status
]
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
print("[CREATE] Level created successfully.")
return jsonify({'message': 'Level Created', 'reload': True}), 201
except Exception as e:
print("[CREATE] Error:", e)
return jsonify({'error': str(e)}), 500
finally:
if conn:
conn.close()
@app.route('/api/levels_update', methods=['POST'])
def update_level():
"""Update an existing level."""
conn = None
try:
data = request.get_json() or {}
print("[UPDATE] Incoming data:", data)
level_id = data.get('id')
if not level_id:
print("[UPDATE] Missing level ID")
return jsonify({'error': 'Missing level ID'}), 400
params = [
json.dumps(title_data),
json.dumps(description_data),
jigyasa_points,
mission_points,
pragya_points,
puzzle_points,
puzzle_time,
quiz_points,
quiz_time,
riddle_points,
riddle_time,
vision_text_image_points,
vision_mcq_points,
status,
level_id
]
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
print("[UPDATE] Level updated successfully.")
return jsonify({'message': 'Level Updated', 'reload': True}), 200
except Exception as e:
print("[UPDATE] Error:", e)
return jsonify({'error': str(e)}), 500
finally:
if conn:
conn.close()
@app.route('/api/levels_delete', methods=['POST'])
def delete_level():
"""Delete a level."""
try:
data = request.get_json() or {}
level_id = data.get('id')
if not level_id:
return jsonify({'error': 'Missing level ID'}), 400
connection = get_db_connection()
with connection.cursor() as cursor:
sql = "DELETE FROM lifeapp.la_levels WHERE id = %s"
cursor.execute(sql, (level_id,))
connection.commit()
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
######################## SETTINGS/LANGUAGES APIs
###################################
###################################################################################
###################################################################################
@app.route('/api/languages', methods=['POST'])
def get_languages():
"""Fetch all languages with pagination."""
try:
filters = request.get_json() or {}
page = filters.get('page', 1)
per_page = 25 # Default pagination limit
offset = (page - 1) * per_page
connection = get_db_connection()
with connection.cursor() as cursor:
sql = "SELECT * FROM lifeapp.languages ORDER BY title LIMIT %s OFFSET
%s"
cursor.execute(sql, (per_page, offset))
languages = cursor.fetchall()
return jsonify(languages)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/languages_new', methods=['POST'])
def create_language():
"""Create a new language with improved error handling."""
try:
# Get request data and validate
data = request.get_json() or {}
print(f"Received data: {data}") # Debugging
# Validate inputs
if not slug:
return jsonify({'error': 'Slug is required'}), 400
if not title:
return jsonify({'error': 'Title is required'}), 400
try:
status = int(status)
if status not in [0, 1]:
return jsonify({'error': 'Status must be 0 or 1'}), 400
except (TypeError, ValueError):
return jsonify({'error': 'Invalid status value'}), 400
# Connect to database
conn = get_db_connection()
if not conn:
return jsonify({'error': 'Database connection failed'}), 500
try:
with conn.cursor() as cursor:
# Check for duplicate slug
cursor.execute(
"SELECT COUNT(*) FROM lifeapp.languages WHERE slug = %s",
(slug,)
)
count = cursor.fetchone()[0]
if count > 0:
return jsonify({'error': 'Slug already exists'}), 400
return jsonify({
'message': 'Language Created Successfully',
'language_id': new_id
}), 201
except Exception as e:
conn.rollback()
print(f"Database error: {str(e)}") # Debugging
return jsonify({'error': f'Database error: {str(e)}'}), 500
except Exception as e:
print(f"Unexpected error: {str(e)}") # Debugging
return jsonify({'error': f'Server error: {str(e)}'}), 500
finally:
if 'conn' in locals() and conn:
conn.close()
@app.route('/api/languages_update', methods=['POST'])
def update_language():
"""Update a language."""
try:
data = request.get_json() or {}
language_id = data.get('id')
title = data.get('title', '').strip()
status = data.get('status')
try:
status = int(status)
except (TypeError, ValueError):
return jsonify({'error': 'Invalid status value'}), 400
connection = get_db_connection()
with connection.cursor() as cursor:
# Check if slug already exists for another language
cursor.execute("SELECT COUNT(*) FROM lifeapp.languages WHERE slug = %s
AND id != %s", (slug, language_id))
exists = cursor.fetchone()
if exists and exists[0] > 0:
return jsonify({'error': 'Slug already exists'}), 400
sql = """
UPDATE lifeapp.languages
SET
slug = %s,
title = %s,
status = %s,
updated_at = NOW()
WHERE id = %s
"""
cursor.execute(sql, (slug, title, status, language_id))
connection.commit()
return jsonify({'message': 'Language Updated'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/languages_delete/<int:language_id>', methods=['DELETE'])
def delete_language(language_id):
"""Delete a language."""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = "DELETE FROM lifeapp.languages WHERE id = %s"
cursor.execute(sql, (language_id,))
connection.commit()
return jsonify({'message': 'Language Deleted'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/languages/<int:language_id>/status', methods=['PATCH'])
def change_language_status(language_id):
"""Change the status of a language."""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
sql = "UPDATE languages SET status = IF(status='ACTIVE', 'DEACTIVE',
'ACTIVE') WHERE id = %s"
cursor.execute(sql, (language_id,))
connection.commit()
return jsonify({'message': 'Language Status Changed'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
######################## SETTINGS/SECTIONS APIs ###################################
###################################################################################
###################################################################################
@app.route('/api/sections', methods=['POST'])
def get_sections():
"""Fetch list of sections."""
try:
filters = request.get_json() or {}
status = filters.get('status')
connection = get_db_connection()
with connection.cursor() as cursor:
sql = "SELECT * FROM lifeapp.la_sections"
if status is not None: # Ensure status=0 is also considered
sql += " WHERE status = %s"
cursor.execute(sql, (status,))
else:
cursor.execute(sql)
result = cursor.fetchall()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/sections_new', methods=['POST'])
def create_section():
"""Create a new section."""
try:
data = request.get_json() or {}
status = data.get('status')
name = data.get('name')
datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if not name:
return jsonify({'error': 'Name is required'}), 400
connection = get_db_connection()
with connection.cursor() as cursor:
# Fixed SQL statement with correct number of placeholders
cursor.execute(
"INSERT INTO lifeapp.la_sections (name, status, created_at,
updated_at) VALUES (%s, %s, %s, %s)",
(name, status, datetime_str, datetime_str)
)
connection.commit()
return jsonify({'message': 'Section created successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/sections_update', methods=['POST'])
def update_section():
"""Update section details."""
try:
data = request.get_json() or {}
section_id = data.get('id') # Getting id from request body
name = data.get('name')
status = data.get('status')
datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if not name:
return jsonify({'error': 'Name is required'}), 400
if not section_id:
return jsonify({'error': 'Section ID is required'}), 400
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute(
"UPDATE lifeapp.la_sections SET name = %s, status = %s, updated_at
= %s WHERE id = %s",
(name, status, datetime_str, section_id)
)
connection.commit()
return jsonify({'message': 'Section updated successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/sections_delete', methods=['POST'])
def delete_section():
"""Delete a section."""
try:
data = request.get_json() or {}
section_id = data.get('id')
if not section_id:
return jsonify({'error': 'Section ID is required'}), 400
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute("DELETE FROM lifeapp.la_sections WHERE id = %s",
(section_id,))
connection.commit()
@app.route('/api/sections/<int:section_id>/status', methods=['PATCH'])
def toggle_section_status(section_id):
"""Toggle the status of a section."""
try:
connection = get_db_connection()
with connection.cursor() as cursor:
cursor.execute("SELECT status FROM la_sections WHERE id = %s",
(section_id,))
section = cursor.fetchone()
if not section:
return jsonify({'error': 'Section not found'}), 404
new_status = 'inactive' if section['status'] == 'active' else 'active'
cursor.execute("UPDATE la_sections SET status = %s WHERE id = %s",
(new_status, section_id))
connection.commit()
return jsonify({'message': 'Section status changed', 'status': new_status})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
####################### SETTINGS/TOPICS APIs ######################################
###################################################################################
###################################################################################
@app.route('/api/topics', methods=['POST'])
def get_topics():
data = request.get_json() or {}
la_subject_id = data.get('la_subject_id')
la_level_id = data.get('la_level_id')
status = data.get('status', '') # Expect "1", "0", or "all"
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = """
SELECT
t.id,
t.title,
t.status,
-- t.created_at,
-- t.updated_at,
t.image,
t.allow_for,
t.type,
t.la_subject_id,
t.la_level_id,
m.id as media_id,
m.path as media_path
from lifeapp.la_topics t left join lifeapp.media m on m.id =
JSON_EXTRACT(t.image,'$.en')
WHERE 1=1
"""
filters = []
if la_subject_id:
sql += " AND t.la_subject_id = %s"
filters.append(la_subject_id)
if la_level_id:
sql += " AND t.la_level_id = %s"
filters.append(la_level_id)
if status and status.lower() != "all":
sql += " AND t.status = %s"
filters.append(status)
sql += " ORDER BY t.id"
cursor.execute(sql, filters)
topics = cursor.fetchall()
base_url = os.getenv('BASE_URL')
for r in topics:
r['media_url'] = f"{base_url}/{r['media_path']}" if
r.get('media_path') else None
return jsonify(topics)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/add_topic', methods=['POST'])
def add_topic():
form = request.form
file = request.files.get('media')
media_id = None
sql = """
INSERT INTO lifeapp.la_topics
(title, status, created_at, updated_at, image, allow_for, type,
la_subject_id, la_level_id)
VALUES (
JSON_OBJECT('en', %s),
%s, NOW(), NOW(),
JSON_OBJECT('en', %s),
%s, %s, %s, %s
)
"""
params = (
title_raw,
status,
media_id,
allow_for,
topic_type,
la_subject,
la_level
)
conn = get_db_connection()
try:
with conn.cursor() as cur:
cur.execute(sql, params)
new_id = cur.lastrowid
conn.commit()
return jsonify({"success": True, "topic_id": new_id}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/update_topic/<int:topic_id>', methods=['POST'])
def update_topic(topic_id):
form = request.form
file = request.files.get('media')
new_media = None
conn = get_db_connection()
try:
with conn.cursor() as cur:
cur.execute(sql, params)
conn.commit()
return jsonify({"success": True, "topic_id": topic_id}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
@app.route('/api/delete_topic/<int:topic_id>', methods=['DELETE'])
def delete_topic(topic_id):
conn = get_db_connection()
try:
# fetch media id & path
with conn.cursor(pymysql.cursors.DictCursor) as cur:
cur.execute("""
SELECT JSON_UNQUOTE(JSON_EXTRACT(image,'$.en')) AS media_id,
m.path AS media_path
FROM lifeapp.la_topics t
LEFT JOIN lifeapp.media m
ON m.id = JSON_UNQUOTE(JSON_EXTRACT(t.image,'$.en'))
WHERE t.id=%s
""", (topic_id,))
row = cur.fetchone()
# delete topic
with conn.cursor() as cur:
cur.execute("DELETE FROM lifeapp.la_topics WHERE id=%s", (topic_id,))
# delete from S3
s3 = boto3.client(
's3',
region_name=DO_SPACES_REGION,
endpoint_url=DO_SPACES_ENDPOINT,
aws_access_key_id=DO_SPACES_KEY,
aws_secret_access_key=DO_SPACES_SECRET
)
try:
s3.delete_object(Bucket=DO_SPACES_BUCKET, Key=key)
except:
pass
conn.commit()
return jsonify({"success": True, "topic_id": topic_id}), 200
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
####################### SETTINGS/BOARDS APIs ######################################
###################################################################################
###################################################################################
@app.route('/api/boards', methods=['POST'])
def get_boards():
"""Return all boards."""
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = "SELECT id, name, status, created_at, updated_at FROM
lifeapp.la_boards ORDER BY id;"
cursor.execute(sql)
boards = cursor.fetchall()
return jsonify(boards)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/add_board', methods=['POST'])
def add_board():
"""Add a new board."""
data = request.get_json() or {}
name = data.get("name")
status = data.get("status", 1)
datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = """
INSERT INTO lifeapp.la_boards (name, status, created_at,
updated_at)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(sql, (name, status, datetime_str, datetime_str))
board_id = cursor.lastrowid
connection.commit()
return jsonify({"success": True, "board_id": board_id})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/update_board/<int:board_id>', methods=['PUT'])
def update_board(board_id):
"""Update an existing board."""
data = request.get_json() or {}
name = data.get("name")
status = data.get("status", 1)
datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = """
UPDATE lifeapp.la_boards
SET name = %s,
status = %s,
updated_at = %s
WHERE id = %s
"""
cursor.execute(sql, (name, status, datetime_str, board_id))
connection.commit()
return jsonify({"success": True, "board_id": board_id})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
@app.route('/api/delete_board/<int:board_id>', methods=['DELETE'])
def delete_board(board_id):
"""Delete a board."""
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM lifeapp.la_boards WHERE id = %s"
cursor.execute(sql, (board_id,))
connection.commit()
return jsonify({"success": True, "board_id": board_id})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
####################### SETTINGS/GAME ENROLLMENTS APIs ############################
###################################################################################
###################################################################################
connection = get_db_connection()
try:
cursor = connection.cursor()
# Insert with temporary enrollment_code
sql = """
INSERT INTO lifeapp.la_game_enrollments
(enrollment_code, type, user_id, unlock_enrollment_at, created_at,
updated_at)
VALUES (%s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (enrollment_code, type_val, user_id, unlock_time,
created_at, updated_at))
enrollment_id = cursor.lastrowid
return jsonify({
"success": True,
"message": "Enrollment created",
"id": enrollment_id,
"enrollment_code": real_enrollment_code
}), 201
finally:
connection.close()
except Exception as e:
logger.error("Error adding enrollment: %s", e)
return jsonify({"error": str(e), "success": False}), 500
# 3. Edit Enrollment - PUT /enrollments/<id>
@app.route('/api/enrollments/<int:enrollment_id>', methods=['PUT'])
def edit_enrollment(enrollment_id):
try:
data = request.get_json()
if not data:
return jsonify({"error": "No input data provided"}), 400
update_fields = []
params = []
if "enrollment_code" in data:
update_fields.append("enrollment_code = %s")
params.append(data["enrollment_code"])
if "type" in data:
update_fields.append("`type` = %s")
params.append(data["type"])
if "user_id" in data:
update_fields.append("user_id = %s")
params.append(data["user_id"])
if "unlock_enrollment_at" in data:
unlock_enrollment_at = data["unlock_enrollment_at"]
if unlock_enrollment_at:
try:
unlock_dt = datetime.fromisoformat(unlock_enrollment_at)
except ValueError:
return jsonify({"error": "Incorrect date format for
unlock_enrollment_at. Use ISO format."}), 400
else:
unlock_dt = None
update_fields.append("unlock_enrollment_at = %s")
params.append(unlock_dt)
if not update_fields:
return jsonify({"error": "No valid fields provided for update"}), 400
except Exception as e:
logger.error("Error editing enrollment: %s", e)
return jsonify({"error": str(e)}), 500
###################################################################################
###################################################################################
####################### SETTINGS/GAME ENROLLMENTS REQUEST APIs ####################
###################################################################################
###################################################################################
# ===============================================
# 1. List Enrollment Requests with Filters
# ===============================================
@app.route('/api/game_enrollment_requests', methods=['POST'])
def list_game_enrollment_requests():
data = request.get_json() or {}
# Filter by status: "approved", "not_approved", "all"
status = data.get('status', 'all').lower()
# Filter by type: "1", "2", "3", "4", "5", "6", or "all"
req_type = data.get('type', 'all')
if status != "all":
if status == "approved":
sql += " AND approved_at IS NOT NULL"
elif status == "not_approved":
sql += " AND approved_at IS NULL"
connection = get_db_connection()
try:
with connection.cursor() as cursor:
cursor.execute(sql, tuple(params))
result = cursor.fetchall()
return jsonify(result), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
# ===============================================
# 2. Add a New Enrollment Request
# ===============================================
@app.route('/api/game_enrollment_requests/add', methods=['POST'])
def add_game_enrollment_request():
data = request.get_json() or {}
user_id = data.get('user_id')
req_type = data.get('type')
la_game_enrollment_id = data.get('la_game_enrollment_id')
# approved_at can be provided (if approved) or None for pending request.
approved_at = data.get('approved_at', None)
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = """
INSERT INTO lifeapp.la_request_game_enrollments
(user_id, type, la_game_enrollment_id, approved_at, created_at,
updated_at)
VALUES (%s, %s, %s, %s, NOW(), NOW())
"""
cursor.execute(sql, (user_id, req_type, la_game_enrollment_id,
approved_at))
connection.commit()
return jsonify({'message': 'Request added successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
# ===============================================
# 3. Edit an Existing Enrollment Request
# ===============================================
@app.route('/api/game_enrollment_requests/<int:request_id>', methods=['PUT'])
def edit_game_enrollment_request(request_id):
data = request.get_json() or {}
user_id = data.get('user_id')
req_type = data.get('type')
la_game_enrollment_id = data.get('la_game_enrollment_id')
approved_at = data.get('approved_at') # may be null if not approved
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = """
UPDATE lifeapp.la_request_game_enrollments
SET user_id = %s, type = %s, la_game_enrollment_id = %s, approved_at =
%s, updated_at = NOW()
WHERE id = %s
"""
cursor.execute(sql, (user_id, req_type, la_game_enrollment_id,
approved_at, request_id))
connection.commit()
return jsonify({'message': 'Request updated successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
# ===============================================
# 4. Delete an Enrollment Request
# ===============================================
@app.route('/api/game_enrollment_requests/<int:request_id>', methods=['DELETE'])
def delete_game_enrollment_request(request_id):
connection = get_db_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM lifeapp.la_request_game_enrollments WHERE id = %s"
cursor.execute(sql, (request_id,))
connection.commit()
return jsonify({'message': 'Request deleted successfully'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
connection.close()
###################################################################################
###################################################################################
####################### SETTINGS/COUPONS APIs #################################
###################################################################################
###################################################################################
def delete_s3_object(key):
s3 = boto3.client(
's3',
region_name=DO_SPACES_REGION,
endpoint_url=DO_SPACES_ENDPOINT,
aws_access_key_id=DO_SPACES_KEY,
aws_secret_access_key=DO_SPACES_SECRET
)
try:
s3.delete_object(Bucket=DO_SPACES_BUCKET, Key=key)
except Exception:
pass
base_query = '''
SELECT
c.id, c.title, c.category_id,
c.coin, c.link, c.details,
c.index, c.coupon_media_id as media_id, c.created_at, c.updated_at,
m.path as media_path
FROM lifeapp.coupons c
left join lifeapp.media m on c.coupon_media_id = m.id
'''
conditions = []
params = []
cursor.execute(base_query, params)
coupons = cursor.fetchall()
base_url = os.getenv('BASE_URL')
for r in coupons:
r['media_url'] = f"{base_url}/{r['media_path']}" if
r.get('media_path') else None
conn = get_db_connection()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO lifeapp.coupons
(title, category_id, coin, link, details, `index`, coupon_media_id,
created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
""",
(title, category_id, coin, link, details, idx, media_id)
)
conn.commit()
return jsonify({'success': True}), 201
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
conn = get_db_connection()
try:
# if replacing media, fetch old key so we can delete from S3/db
if new_media_id is not None:
with conn.cursor(pymysql.cursors.DictCursor) as cur:
cur.execute("SELECT coupon_media_id, m.path AS media_path FROM
coupons c LEFT JOIN media m ON c.coupon_media_id=m.id WHERE c.id=%s", (id,))
old = cur.fetchone()
# delete old media row + S3
if old and old['coupon_media_id']:
with conn.cursor() as cur:
cur.execute("DELETE FROM media WHERE id=%s",
(old['coupon_media_id'],))
delete_s3_object(old['media_path'])
# build update
if new_media_id is not None:
sql = """
UPDATE lifeapp.coupons
SET title=%s, category_id=%s, coin=%s, link=%s,
details=%s, `index`=%s, coupon_media_id=%s,
updated_at=NOW()
WHERE id=%s
"""
params = (title, category_id, coin, link, details, idx, new_media_id,
id)
else:
sql = """
UPDATE lifeapp.coupons
SET title=%s, category_id=%s, coin=%s, link=%s,
details=%s, `index`=%s,
updated_at=NOW()
WHERE id=%s
"""
params = (title, category_id, coin, link, details, idx, id)
# DELETE coupon
@app.route('/api/coupons/<int:id>', methods=['DELETE'])
def delete_coupon(id):
conn = get_db_connection()
try:
# fetch the media id + path
with conn.cursor(pymysql.cursors.DictCursor) as cur:
cur.execute("""
SELECT coupon_media_id, m.path AS media_path
FROM lifeapp.coupons c
LEFT JOIN media m ON c.coupon_media_id=m.id
WHERE c.id=%s
""", (id,))
row = cur.fetchone()
conn.commit()
return jsonify({'success': True})
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
###################################################################################
###################################################################################
####################### CAMPAIGNS APIs ############################################
###################################################################################
###################################################################################
@app.route('/api/campaigns', methods=['GET'])
def list_campaigns():
qs = request.args
page = int(qs.get('page', 1))
per_page = int(qs.get('per_page', 25))
offset = (page - 1) * per_page
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# 1) total count
cursor.execute("SELECT COUNT(*) AS total FROM la_campaigns")
total = cursor.fetchone()['total']
return jsonify({
'page': page,
'per_page': per_page,
'total': total,
'data': camps
}), 200
finally:
conn.close()
@app.route('/api/campaigns', methods=['POST'])
def create_campaign():
logger.info("📥 [POST] Create campaign")
conn = None
try:
if request.content_type.startswith("application/json"):
data = request.get_json()
else:
data = request.form
logger.info("📦 Content-Type: %s", request.content_type)
game_type = data.get('game_type')
reference_id = data.get('reference_id')
title = data.get('title') or data.get('campaign_title')
description = data.get('description')
scheduled_for = data.get('scheduled_for')
button_name = data.get('button_name')
media_id = None
image_file = request.files.get('image')
logger.info("📷 Image file: %s", image_file.filename if image_file else
"None")
if image_file and image_file.filename:
logger.info(f"📷 Uploading image: {image_file.filename}")
media = upload_media(image_file)
media_id = media['id']
logger.info("📷 Uploaded media ID: %s", media_id)
else:
media_id = data.get('media_id') # Handle JSON input with media_id
sql = """
INSERT INTO lifeapp.la_campaigns
(game_type, reference_id, title, description, scheduled_for,
button_name, media_id, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
"""
params = (game_type, reference_id, title, description, scheduled_for,
button_name, media_id)
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
logger.info("✅ Campaign created with ID %s", cursor.lastrowid)
return jsonify({'id': cursor.lastrowid}), 201
except Exception as e:
logger.error("🔥 Error in POST /campaigns: %s", e)
return jsonify({'error': str(e)}), 500
finally:
if conn:
conn.close()
@app.route('/api/campaigns/<int:id>', methods=['PUT'])
def update_campaign(id):
# Handle different content types
if request.content_type.startswith('application/json'):
data = request.get_json()
else:
data = request.form
try:
# Extract fields with proper defaults
game_type = int(data.get('game_type')) if 'game_type' in data else None
reference_id = int(data.get('reference_id')) if 'reference_id' in data else
None
title = data.get('title') or data.get('campaign_title') or ''
description = data.get('description') or ''
scheduled_for = data.get('scheduled_for') or ''
button_name = data.get('button_name') or 'Start'
# Media handling
media_id = None
image_file = request.files.get('image')
if image_file and image_file.filename:
media = upload_media(image_file)
media_id = media['id']
elif 'media_id' in data:
media_id = data.get('media_id')
# Build SQL query
sql = """
UPDATE lifeapp.la_campaigns
SET game_type = %s,
reference_id = %s,
title = %s,
description = %s,
scheduled_for = %s,
button_name = %s,
status = %s,
"""
params = [
game_type,
reference_id,
title,
description,
scheduled_for,
button_name,
status_val
]
# Handle media_id
if media_id:
sql += " media_id = %s,"
params.append(media_id)
# Execute update
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute(sql, tuple(params))
conn.commit()
return jsonify({'success': True}), 200
except Exception as e:
logger.error("🔥 Error in PUT /campaigns/%s: %s", id, e)
return jsonify({'error': str(e)}), 500
finally:
if conn:
conn.close()
@app.route('/api/campaigns/<int:id>', methods=['DELETE'])
def delete_campaign(id):
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM lifeapp.la_campaigns WHERE id=%s", (id,))
conn.commit()
return jsonify({'success': True}), 200
finally:
conn.close()
# ————— Reference Lists —————
@app.route('/api/mission_list', methods=['POST'])
def mission_list():
data = request.get_json() or {}
subject_id = data.get('subject_id')
level_id = data.get('level_id')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = """
SELECT id, JSON_UNQUOTE(JSON_EXTRACT(title, '$.en')) as title
FROM lifeapp.la_missions
WHERE status=1
AND allow_for=1
"""
params = []
if subject_id:
sql += " AND la_subject_id=%s"
params.append(subject_id)
if level_id:
sql += " AND la_level_id=%s"
params.append(level_id)
@app.route('/api/vision_list', methods=['POST'])
def vision_list():
data = request.get_json() or {}
subject_id = data.get('subject_id')
level_id = data.get('level_id')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = """
SELECT id, JSON_UNQUOTE(JSON_EXTRACT(title, '$.en')) as title
FROM lifeapp.visions
WHERE status=1
AND allow_for IN (1, 3)
"""
params = []
if subject_id:
sql += " AND la_subject_id=%s"
params.append(subject_id)
if level_id:
sql += " AND la_level_id=%s"
params.append(level_id)
@app.route('/api/quiz_list', methods=['POST'])
def quiz_list():
data = request.get_json() or {}
subject_id = data.get('subject_id')
level_id = data.get('level_id')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = """
SELECT id, JSON_UNQUOTE(JSON_EXTRACT(title, '$.en')) AS title
FROM lifeapp.la_topics
WHERE status = 1
"""
params = []
if subject_id:
sql += " AND la_subject_id=%s"
params.append(subject_id)
if level_id:
sql += " AND la_level_id=%s"
params.append(level_id)
cursor.execute(sql, params)
items = cursor.fetchall()
return jsonify(items), 200
finally:
conn.close()
@app.route('/api/campaigns/<int:id>/details', methods=['GET'])
def get_campaign_details(id):
"""Fetch statistics for a specific campaign"""
school_code = request.args.get('school_code')
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT id, game_type, reference_id, scheduled_for, status, ended_at
FROM la_campaigns
WHERE id = %s
""", (id,))
campaign = cursor.fetchone()
if not campaign:
return jsonify({'error': 'Campaign not found'}), 404
game_type = campaign['game_type']
reference_id = campaign['reference_id']
start_date = campaign['scheduled_for']
if game_type == 7:
stats = handle_vision_details(conn, cursor, reference_id,
start_date, campaign, school_code)
return jsonify(stats), 200
elif game_type == 1:
stats = handle_mission_details(conn, cursor, reference_id,
start_date, school_code)
return jsonify(stats), 200
elif game_type == 2:
stats = handle_quiz_details(conn, cursor, reference_id, start_date,
campaign, school_code)
return jsonify(stats), 200
else:
return jsonify({
'total_submission': 0,
'total_coins_earned': 0
}), 200
except Exception as e:
logger.error(f"Error fetching campaign details: {str(e)}")
return jsonify({'error': str(e)}), 500
finally:
conn.close()
if school_code:
distinct_user_query += " AND u.school_code = %s"
params.append(school_code)
cursor.execute(distinct_user_query, tuple(params))
user_rows = cursor.fetchall()
# Process results
user_status = {}
user_coins = defaultdict(int)
if status == 'approved':
user_coins[user_id] += score
total_submission = len(user_ids)
total_approved = sum(1 for s in user_status.values() if s == 'approved')
total_rejected = sum(1 for s in user_status.values() if s == 'rejected')
total_requested = sum(1 for s in user_status.values() if s == 'requested')
total_coins_earned = sum(user_coins.values())
return {
'total_submission': total_submission,
'total_approved': total_approved,
'total_rejected': total_rejected,
'total_requested': total_requested,
'total_coins_earned': total_coins_earned
}
mission_query = """
SELECT m.user_id, m.approved_at, m.rejected_at, m.points
FROM la_mission_completes m
INNER JOIN lifeapp.users u ON m.user_id = u.id
INNER JOIN (
SELECT user_id, MIN(created_at) as first_submit
FROM la_mission_completes
WHERE la_mission_id = %s
AND created_at BETWEEN %s AND %s
GROUP BY user_id
) as firsts ON m.user_id = firsts.user_id AND m.created_at =
firsts.first_submit
"""
params = [mission_id, start_datetime, end_datetime]
if school_code:
mission_query += " WHERE u.school_code = %s"
params.append(school_code)
cursor.execute(mission_query, tuple(params))
mission_rows = cursor.fetchall()
total_submission = len(mission_rows)
total_approved = 0
total_rejected = 0
total_requested = 0
total_coins_earned = 0
return {
'total_submission': total_submission,
'total_approved': total_approved,
'total_rejected': total_rejected,
'total_requested': total_requested,
'total_coins_earned': total_coins_earned
}
quiz_query = """
SELECT
COUNT(DISTINCT g.user_id) AS total_submission,
COALESCE(SUM(r.coins), 0) AS total_coins_earned
FROM la_quiz_games g
LEFT JOIN la_quiz_game_results r ON g.id = r.la_quiz_game_id
INNER JOIN lifeapp.users u ON g.user_id = u.id
WHERE g.la_topic_id = %s
AND g.created_at BETWEEN %s AND %s
"""
params = [topic_id, start_datetime, end_datetime]
if school_code:
quiz_query += " AND u.school_code = %s"
params.append(school_code)
cursor.execute(quiz_query, tuple(params))
result = cursor.fetchone()
return {
'total_submission': result['total_submission'] or 0,
'total_coins_earned': result['total_coins_earned'] or 0
}
if __name__ == '__main__':
app.run(debug=True, use_reloader=True)