Skip to content

Commit 5828b80

Browse files
alxtkr77 (Alex Toker) and gtopper
authored
[Model Monitoring] Fix TimescaleDB endpoint count to query correct tables (ML-11624) (#9164)
## Summary Fixes bug where Model Monitoring App shows "Endpoints: 0" when using TimescaleDB but correctly shows endpoints with V3IO. **Root cause**: `count_processed_model_endpoints` was querying the PREDICTIONS table, which doesn't have an `application_name` column, instead of METRICS and APP_RESULTS tables which have this column. ## Changes Made - Move `count_processed_model_endpoints` from `TimescaleDBPredictionsQueries` to `TimescaleDBConnector` - Use SQL UNION to efficiently count endpoints from both METRICS and APP_RESULTS tables in a single query - Store `_tables` and `_pre_aggregate_manager` as instance variables in connector for cross-table operations - Remove unused `_count_with_application_join` and `_count_simple` methods - Update tests to use `connector` fixture directly ## Testing - All 14 TimescaleDB aggregation tests pass - Tested endpoint counting with: - No data (returns empty dict) - Data in METRICS only - Data in APP_RESULTS only - Data in BOTH tables (UNION behavior - counts unique endpoints) - Time range filtering ## Checklist - [x] Code formatted with `ruff format` - [x] Code passes `ruff check` linting - [x] Unit tests pass - [x] No secrets in diff ## Reference - Jira: [ML-11624](https://iguazio.atlassian.net/browse/ML-11624) [ML-11624]: https://iguazio.atlassian.net/browse/ML-11624?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ --------- Co-authored-by: Alex Toker <[email protected]> Co-authored-by: Gal Topper <[email protected]>
1 parent 67ec873 commit 5828b80

File tree

3 files changed

+193
-213
lines changed

3 files changed

+193
-213
lines changed

mlrun/model_monitoring/db/tsdb/timescaledb/queries/timescaledb_predictions_queries.py

Lines changed: 0 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
import mlrun.model_monitoring.db.tsdb.timescaledb.timescaledb_schema as timescaledb_schema
2525
import mlrun.utils
2626
from mlrun.common.schemas.model_monitoring.model_endpoints import _MetricPoint
27-
from mlrun.model_monitoring.db.tsdb.timescaledb.timescaledb_connection import (
28-
Statement,
29-
)
3027
from mlrun.model_monitoring.db.tsdb.timescaledb.utils.timescaledb_dataframe_processor import (
3128
TimescaleDBDataFrameProcessor,
3229
)
@@ -377,113 +374,3 @@ def build_raw_query():
377374
column_mapping_rules=column_mapping_rules,
378375
debug_name="avg_latency",
379376
)
380-
381-
def count_processed_model_endpoints(
382-
self,
383-
start: Optional[datetime] = None,
384-
end: Optional[datetime] = None,
385-
application_names: Optional[Union[str, list[str]]] = None,
386-
) -> dict[str, int]:
387-
"""
388-
Optimized count with application filtering using JOIN approach.
389-
390-
This implementation:
391-
1. Uses JOIN when application filtering is needed (most performant)
392-
2. Falls back to simple query when no filtering (fastest for that case)
393-
3. Leverages TimescaleDB's chunk exclusion and parallel processing
394-
4. Can utilize pre-aggregates when available
395-
"""
396-
start = start or (mlrun.utils.datetime_now() - timedelta(hours=24))
397-
start, end = self._pre_aggregate_manager.get_start_end(start, end)
398-
399-
predictions_table = self.tables[mm_schemas.TimescaleDBTables.PREDICTIONS]
400-
401-
if application_names:
402-
# Ensure application_names is a list
403-
if isinstance(application_names, str):
404-
application_names = [application_names]
405-
406-
result = {}
407-
408-
# For each application, call the existing JOIN method and wrap result in dict
409-
for app_name in application_names:
410-
# Use existing _count_with_application_join but extract count for single app
411-
count = self._count_with_application_join(
412-
predictions_table,
413-
start,
414-
end,
415-
[app_name], # Pass as list to existing method
416-
)
417-
result[app_name] = count
418-
419-
return result
420-
else:
421-
# Use existing simple count method and wrap result
422-
total_count = self._count_simple(predictions_table, start, end)
423-
return {"total": total_count} if total_count > 0 else {}
424-
425-
def _count_with_application_join(
426-
self,
427-
predictions_table,
428-
start: datetime,
429-
end: datetime,
430-
application_names: Union[str, list[str]],
431-
) -> int:
432-
"""
433-
Use JOIN with metrics table for application filtering.
434-
435-
Performance characteristics:
436-
- Leverages indexes on both tables
437-
- TimescaleDB optimizes time-based JOINs
438-
- Chunk exclusion works on both sides
439-
- DISTINCT applied after filtering
440-
"""
441-
metrics_table = self.tables[mm_schemas.TimescaleDBTables.METRICS]
442-
443-
# Normalize application_names to list for consistent handling
444-
if isinstance(application_names, str):
445-
app_names_list = [application_names]
446-
else:
447-
app_names_list = list(application_names)
448-
449-
# Build parameterized query with proper placeholders
450-
app_placeholders = ", ".join(["%s"] * len(app_names_list))
451-
452-
query_sql = f"""
453-
SELECT COUNT(DISTINCT p.{mm_schemas.WriterEvent.ENDPOINT_ID}) AS endpoint_count
454-
FROM {predictions_table.full_name()} p
455-
INNER JOIN {metrics_table.full_name()} m
456-
ON p.{mm_schemas.WriterEvent.ENDPOINT_ID} = m.{mm_schemas.WriterEvent.ENDPOINT_ID}
457-
AND m.{metrics_table.time_column} >= %s
458-
AND m.{metrics_table.time_column} <= %s
459-
WHERE p.{predictions_table.time_column} >= %s
460-
AND p.{predictions_table.time_column} <= %s
461-
AND m.{mm_schemas.WriterEvent.APPLICATION_NAME} IN ({app_placeholders})
462-
"""
463-
464-
# Parameters: [start, end, start, end] + application_names_list
465-
params = [start, end, start, end] + app_names_list
466-
467-
stmt = Statement(query_sql, params)
468-
result = self._connection.run(query=stmt)
469-
470-
return result.data[0][0] if result and result.data else 0
471-
472-
def _count_simple(self, predictions_table, start: datetime, end: datetime) -> int:
473-
"""
474-
Simple count without application filtering.
475-
476-
Uses the schema's query builder for consistency and potential pre-aggregate usage.
477-
"""
478-
columns = [
479-
f"COUNT(DISTINCT {mm_schemas.WriterEvent.ENDPOINT_ID}) AS endpoint_count"
480-
]
481-
482-
query = predictions_table._get_records_query(
483-
start=start,
484-
end=end,
485-
columns_to_filter=columns,
486-
)
487-
488-
result = self._connection.run(query=query)
489-
return result.data[0][0] if result and result.data else 0

mlrun/model_monitoring/db/tsdb/timescaledb/timescaledb_connector.py

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
TimescaleDBResultsQueries,
3939
)
4040
from mlrun.model_monitoring.db.tsdb.timescaledb.timescaledb_connection import (
41+
Statement,
4142
TimescaleDBConnection,
4243
)
4344
from mlrun.model_monitoring.db.tsdb.timescaledb.timescaledb_operations import (
@@ -109,27 +110,27 @@ def __init__(
109110
)
110111

111112
# Create shared components needed by query classes
112-
tables = timescaledb_schema.create_table_schemas(project)
113-
pre_aggregate_manager = PreAggregateManager(pre_aggregate_config)
113+
self._tables = timescaledb_schema.create_table_schemas(project)
114+
self._pre_aggregate_manager = PreAggregateManager(pre_aggregate_config)
114115

115116
# Create specialized query handlers with proper initialization
116117
self._metrics_queries = TimescaleDBMetricsQueries(
117118
project=project,
118119
connection=self._connection,
119-
pre_aggregate_manager=pre_aggregate_manager,
120-
tables=tables,
120+
pre_aggregate_manager=self._pre_aggregate_manager,
121+
tables=self._tables,
121122
)
122123
self._predictions_queries = TimescaleDBPredictionsQueries(
123124
project=project,
124125
connection=self._connection,
125-
pre_aggregate_manager=pre_aggregate_manager,
126-
tables=tables,
126+
pre_aggregate_manager=self._pre_aggregate_manager,
127+
tables=self._tables,
127128
)
128129
self._results_queries = TimescaleDBResultsQueries(
129130
connection=self._connection,
130131
project=project,
131-
pre_aggregate_manager=pre_aggregate_manager,
132-
tables=tables,
132+
pre_aggregate_manager=self._pre_aggregate_manager,
133+
tables=self._tables,
133134
)
134135

135136
# Create operations and stream handlers
@@ -396,10 +397,75 @@ def get_last_request(self, *args, **kwargs):
396397
def get_avg_latency(self, *args, **kwargs):
397398
return self._predictions_queries.get_avg_latency(*args, **kwargs)
398399

399-
def count_processed_model_endpoints(self, *args, **kwargs):
400-
return self._predictions_queries.count_processed_model_endpoints(
401-
*args, **kwargs
402-
)
400+
def count_processed_model_endpoints(
401+
self,
402+
start: Optional[datetime.datetime] = None,
403+
end: Optional[datetime.datetime] = None,
404+
application_names: Optional[list[str] | str] = None,
405+
) -> dict[str, int]:
406+
"""
407+
Count unique endpoints per application from METRICS and APP_RESULTS tables.
408+
409+
Uses SQL UNION to efficiently count endpoints that have data in EITHER table.
410+
411+
:param start: Start time for the query (default: last 24 hours)
412+
:param end: End time for the query (default: current time)
413+
:param application_names: Filter by specific application names
414+
:return: Dictionary mapping application_name to endpoint count
415+
"""
416+
# Set default time range
417+
start = start or (mlrun.utils.datetime_now() - datetime.timedelta(hours=24))
418+
start, end = self._pre_aggregate_manager.get_start_end(start, end)
419+
420+
metrics_table = self._tables[mm_schemas.TimescaleDBTables.METRICS]
421+
app_results_table = self._tables[mm_schemas.TimescaleDBTables.APP_RESULTS]
422+
time_column = mm_schemas.WriterEvent.END_INFER_TIME
423+
app_column = mm_schemas.WriterEvent.APPLICATION_NAME
424+
endpoint_column = mm_schemas.WriterEvent.ENDPOINT_ID
425+
426+
# Build application filter and params
427+
app_filter_metrics = ""
428+
app_filter_results = ""
429+
430+
if application_names:
431+
if isinstance(application_names, str):
432+
application_names = [application_names]
433+
app_names_list = list(application_names)
434+
app_placeholders = ", ".join(["%s"] * len(app_names_list))
435+
app_filter_metrics = f"AND {app_column} IN ({app_placeholders})"
436+
app_filter_results = f"AND {app_column} IN ({app_placeholders})"
437+
# Params: metrics (start, end, apps), app_results (start, end, apps)
438+
params = [start, end] + app_names_list + [start, end] + app_names_list
439+
else:
440+
params = [start, end, start, end]
441+
442+
# Use UNION to combine endpoints from both METRICS and APP_RESULTS tables
443+
query_sql = f"""
444+
SELECT {app_column}, COUNT(DISTINCT {endpoint_column}) as endpoint_count
445+
FROM (
446+
SELECT DISTINCT {app_column}, {endpoint_column}
447+
FROM {metrics_table.full_name()}
448+
WHERE {time_column} >= %s AND {time_column} <= %s
449+
{app_filter_metrics}
450+
451+
UNION
452+
453+
SELECT DISTINCT {app_column}, {endpoint_column}
454+
FROM {app_results_table.full_name()}
455+
WHERE {time_column} >= %s AND {time_column} <= %s
456+
{app_filter_results}
457+
) combined
458+
GROUP BY {app_column}
459+
"""
460+
461+
stmt = Statement(query_sql, params)
462+
result = self._connection.run(query=stmt)
463+
464+
if not result or not result.data:
465+
return {}
466+
467+
# Convert result to dict: {application_name: count}
468+
return {row[0]: row[1] for row in result.data}
403469

404470
def get_drift_status(self, *args, **kwargs):
405471
return self._results_queries.get_drift_status(*args, **kwargs)

0 commit comments

Comments (0)