Conversation
📝 Walkthrough

Adds resilient SkyPilot provider error handling (standardized ConnectionError conversion, structured fallbacks) and converts multiple blocking provider calls to asynchronous thread-offloaded executions via asyncio.to_thread across router and job endpoints to avoid event-loop blocking.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Router as API Router (async)
    participant Thread as Worker Thread
    participant Provider as ComputeProvider (SkyPilot)
    participant SkyPilot as SkyPilot Server
    Client->>Router: API request (e.g., get_cluster_status / launch)
    Router->>Router: wrap blocking call with asyncio.to_thread
    Router->>Thread: invoke provider.method(...)
    Thread->>Provider: provider.method(...)
    Provider->>SkyPilot: HTTP / SDK call
    alt SkyPilot responds
        SkyPilot-->>Provider: success/response
        Provider-->>Thread: parsed result
    else ConnectionError / Timeout
        SkyPilot-->>Provider: connection failure
        Provider-->>Thread: raise standardized ConnectionError
    end
    Thread-->>Router: return result or structured error/fallback
    Router-->>Client: respond with result or stable error structure
```
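The offloading flow in the diagram can be sketched in miniature as follows. The names here (`get_cluster_status`, `cluster_status_endpoint`) are illustrative stand-ins, not the actual router or provider code from this PR:

```python
import asyncio
import time

def get_cluster_status(cluster_name: str) -> dict:
    # Stand-in for a blocking provider method (HTTP / SDK call in the real code).
    time.sleep(0.1)  # simulates blocking I/O
    return {"cluster": cluster_name, "state": "UP"}

async def cluster_status_endpoint(cluster_name: str) -> dict:
    # Run the blocking call in a worker thread so the event loop
    # stays free to serve other requests while this one waits.
    return await asyncio.to_thread(get_cluster_status, cluster_name)

result = asyncio.run(cluster_status_endpoint("demo-cluster"))
print(result)
```

`asyncio.to_thread` returns an awaitable, so the router stays `async` end to end while the provider call executes off the loop.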
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
Codecov Report
❌ Patch coverage is
Actionable comments posted: 1
🧹 Nitpick comments (1)
api/transformerlab/compute_providers/skypilot.py (1)
429-433: Consider using the `logging` module instead of `print()`.

The file imports `logging` at line 6, but error messages use `print()`. For consistency and better observability (log levels, formatting, handlers), consider using `logging.error()` or `logging.warning()` throughout. This applies to all similar error handling blocks in this file (lines 432, 477, 557, 753, 1039, 1105, 1185, 1230).
Example fix

```diff
 except ConnectionError as e:
-    print(f"Failed to launch cluster {cluster_name}: {e}")
+    logging.error(f"Failed to launch cluster {cluster_name}: {e}")
     return {"status": "error", "message": str(e)}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/compute_providers/skypilot.py` around lines 429 - 433, Replace print() calls in the exception handlers with the logging module (e.g., logging.error or logging.exception) so messages use proper log levels and handlers; specifically in the try/except around the call to self._make_authenticated_request("POST", "/launch", ...) (and the other similar blocks mentioned) change print(f"Failed to launch cluster {cluster_name}: {e}") to logging.error or logging.exception including context (cluster_name) and the exception details (use exc_info=True or logging.exception for stack trace) so all error paths in the methods that call _make_authenticated_request produce structured logs instead of printing to stdout.
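A minimal sketch of the suggested change, using a module-level logger. The `launch_cluster` wrapper and the raised error are stand-ins for the real `_make_authenticated_request` call path:

```python
import logging

logger = logging.getLogger(__name__)

def launch_cluster(cluster_name: str) -> dict:
    try:
        # Stand-in for the real authenticated request to the SkyPilot server.
        raise ConnectionError("SkyPilot server unreachable")
    except ConnectionError as e:
        # logger.exception logs at ERROR level and attaches the traceback,
        # which print() cannot do.
        logger.exception(f"Failed to launch cluster {cluster_name}: {e}")
        return {"status": "error", "message": str(e)}

result = launch_cluster("demo")
```

Using `logger.exception(...)` (or `logging.error(..., exc_info=True)`) inside an `except` block preserves the stack trace the review asks for.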
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@api/transformerlab/compute_providers/skypilot.py`:
- Around line 1102-1106: The f-string print in the except block after calling
self._make_authenticated_request is over the 120-char Ruff limit; run `ruff
format` or manually reflow the line so it stays under 120 characters (e.g.,
assign the formatted message to a local variable or split the string before
calling print/log), preserving the same exception handling and return ""
behavior and keeping references to job_id, cluster_name and exception e.
---
Nitpick comments:
In `@api/transformerlab/compute_providers/skypilot.py`:
- Around line 429-433: Replace print() calls in the exception handlers with the
logging module (e.g., logging.error or logging.exception) so messages use proper
log levels and handlers; specifically in the try/except around the call to
self._make_authenticated_request("POST", "/launch", ...) (and the other similar
blocks mentioned) change print(f"Failed to launch cluster {cluster_name}: {e}")
to logging.error or logging.exception including context (cluster_name) and the
exception details (use exc_info=True or logging.exception for stack trace) so
all error paths in the methods that call _make_authenticated_request produce
structured logs instead of printing to stdout.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
api/transformerlab/routers/experiment/jobs.py (2)
298-323: ⚠️ Potential issue | 🟠 Major

Initialize `provider_jobs` before `list_jobs` to avoid UnboundLocalError.

Line 309 reads `provider_jobs` even when Line 299 raises `NotImplementedError`, which leaves the variable undefined and will crash the request (e.g., Runpod path). Initialize to an empty list before the try block.

🛠️ Proposed fix
```diff
 if provider_job_id is None:
+    provider_jobs = []
     try:
         provider_jobs = await asyncio.to_thread(provider_instance.list_jobs, cluster_name)
     except NotImplementedError:
         # Provider doesn't support listing jobs (e.g., Runpod)
         # For Runpod, we can't determine a job_id, so we'll use the cluster_name as a fallback
         # or return a message that logs aren't available via job_id
         provider_job_id = cluster_name  # Use cluster_name as fallback identifier
         provider_job_candidates = []
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/routers/experiment/jobs.py` around lines 298 - 323, The variable provider_jobs is read after the try/except but may be undefined if provider_instance.list_jobs raises NotImplementedError; initialize provider_jobs = [] before the try so the NotImplementedError branch and subsequent logic (the provider_job_id and provider_job_candidates handling and the provider_jobs list comprehension) operate safely; ensure provider_job_id and provider_job_candidates assignments remain in the NotImplementedError branch as fallbacks and that the later block that inspects provider_jobs (the running_states/chosen_job logic and list comprehension) can handle an empty provider_jobs list.
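The failure mode above can be shown in a few lines. The functions here are simplified stand-ins for the router code, not the actual implementation:

```python
def list_jobs(cluster_name):
    # Stand-in for provider_instance.list_jobs on a provider that
    # doesn't support job listing (e.g., Runpod).
    raise NotImplementedError

def fetch_jobs_buggy(cluster_name):
    try:
        provider_jobs = list_jobs(cluster_name)
    except NotImplementedError:
        provider_job_id = cluster_name  # fallback id, but provider_jobs is never bound
    return [j for j in provider_jobs]  # UnboundLocalError on the Runpod path

def fetch_jobs_fixed(cluster_name):
    provider_jobs = []  # initialized before the try, as the fix suggests
    try:
        provider_jobs = list_jobs(cluster_name)
    except NotImplementedError:
        provider_job_id = cluster_name
    return [j for j in provider_jobs]  # safely returns []
```

Because `provider_jobs` is assigned somewhere in `fetch_jobs_buggy`, Python treats it as a local variable for the whole function, so reading it after the skipped assignment raises `UnboundLocalError` rather than `NameError`.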
334-340: 🛠️ Refactor suggestion | 🟠 Major

Move provider log retrieval into a service layer.

Line 334 performs provider I/O inside the router. Extract this logic into `api/transformerlab/services/` and keep the router focused on request/response handling. As per coding guidelines: "Place business logic in `api/transformerlab/services/` using the Service pattern; routers should only handle HTTP request/response validation and call services".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/routers/experiment/jobs.py` around lines 334 - 340, The router is doing provider I/O by calling asyncio.to_thread(provider_instance.get_job_logs...) and assigning raw_logs; move that call into a new service function (e.g., create a method like get_provider_job_logs in api/transformerlab/services/experiment_jobs_service or similar) that accepts (provider_instance, cluster_name, provider_job_id, tail_lines, follow=False), performs the asyncio.to_thread(provider_instance.get_job_logs, ...) and returns raw_logs or raises errors; then replace the inline asyncio.to_thread call in the router with a simple call to this service method (await ExperimentJobsService.get_provider_job_logs(...)) so the router only handles request/response and the I/O lives in the services layer.

api/transformerlab/routers/compute_provider.py (1)
731-737: 🛠️ Refactor suggestion | 🟠 Major

Move provider checks into a service layer.

Line 735 calls `provider_instance.check()` directly in the router. Per the service pattern, extract provider interactions into `api/transformerlab/services/` and have the router call that service instead. As per coding guidelines: "Place business logic in `api/transformerlab/services/` using the Service pattern; routers should only handle HTTP request/response validation and call services".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/routers/compute_provider.py` around lines 731 - 737, Extract the provider check logic out of the router into a service function (e.g., create a new function check_provider_active in api/transformerlab/services/compute_provider_service) that takes (provider, user_id, team_id), calls get_provider_instance(provider, user_id=user_id, team_id=team_id), invokes provider_instance.check via asyncio.to_thread and returns the boolean result; then update the router to call this service function instead of calling provider_instance.check directly (replace the direct call to provider_instance.check and await asyncio.to_thread with a call to check_provider_active).
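A sketch of the extraction the prompt describes. The service function name (`check_provider_active`) comes from the prompt; the provider class and `get_provider_instance` here are simplified stand-ins:

```python
import asyncio

# --- stand-in provider (the real one comes from get_provider_instance) ---
class _Provider:
    def check(self) -> bool:
        return True  # blocking network check in the real provider

def get_provider_instance(provider, user_id=None, team_id=None):
    return _Provider()

# --- service layer: e.g. api/transformerlab/services/compute_provider_service.py ---
async def check_provider_active(provider, user_id, team_id) -> bool:
    provider_instance = get_provider_instance(provider, user_id=user_id, team_id=team_id)
    # Offload the blocking check so the event loop isn't blocked.
    return await asyncio.to_thread(provider_instance.check)

# --- router: only request/response handling, delegates to the service ---
async def provider_check_endpoint(provider, user_id, team_id):
    active = await check_provider_active(provider, user_id, team_id)
    return {"active": active}

response = asyncio.run(provider_check_endpoint("skypilot", "u1", "t1"))
```

The same shape applies to the other extractions suggested below (launch, status, logs, cancel): the service owns the provider call and the thread offload; the router owns validation and the HTTP response.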
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@api/transformerlab/routers/compute_provider.py`:
- Around line 731-737: Extract the provider check logic out of the router into a
service function (e.g., create a new function check_provider_active in
api/transformerlab/services/compute_provider_service) that takes (provider,
user_id, team_id), calls get_provider_instance(provider, user_id=user_id,
team_id=team_id), invokes provider_instance.check via asyncio.to_thread and
returns the boolean result; then update the router to call this service function
instead of calling provider_instance.check directly (replace the direct call to
provider_instance.check and await asyncio.to_thread with a call to
check_provider_active).
In `@api/transformerlab/routers/experiment/jobs.py`:
- Around line 298-323: The variable provider_jobs is read after the try/except
but may be undefined if provider_instance.list_jobs raises NotImplementedError;
initialize provider_jobs = [] before the try so the NotImplementedError branch
and subsequent logic (the provider_job_id and provider_job_candidates handling
and the provider_jobs list comprehension) operate safely; ensure provider_job_id
and provider_job_candidates assignments remain in the NotImplementedError branch
as fallbacks and that the later block that inspects provider_jobs (the
running_states/chosen_job logic and list comprehension) can handle an empty
provider_jobs list.
- Around line 334-340: The router is doing provider I/O by calling
asyncio.to_thread(provider_instance.get_job_logs...) and assigning raw_logs;
move that call into a new service function (e.g., create a method like
get_provider_job_logs in api/transformerlab/services/experiment_jobs_service or
similar) that accepts (provider_instance, cluster_name, provider_job_id,
tail_lines, follow=False), performs the
asyncio.to_thread(provider_instance.get_job_logs, ...) and returns raw_logs or
raises errors; then replace the inline asyncio.to_thread call in the router with
a simple call to this service method (await
ExperimentJobsService.get_provider_job_logs(...)) so the router only handles
request/response and the I/O lives in the services layer.
---
Duplicate comments:
In `@api/transformerlab/routers/compute_provider.py`:
- Around line 1565-1567: The router is directly calling provider logic via
provider_instance.launch_cluster with formatted_cluster_name and cluster_config;
move this business logic into a service under api/transformerlab/services (e.g.,
add a ComputeProviderService.launch_cluster method or similar) and have the
router only validate inputs and call that service. Create a service function
that accepts the same parameters (provider_instance, formatted_cluster_name,
cluster_config), performs the
asyncio.to_thread(provider_instance.launch_cluster, ...) call and returns the
result, then replace the direct call in the router with a call to the new
service method.
- Around line 2518-2522: The router is calling
provider_instance.get_clusters_detailed directly (via asyncio.to_thread) which
mixes business logic into the HTTP layer; move this logic into a new service
under api/transformerlab/services (e.g., add a ComputeProviderService with a
method like get_clusters_detailed or list_detailed_clusters) that accepts the
provider identifier/instance, calls provider_instance.get_clusters_detailed
(using asyncio.to_thread if it’s blocking) and returns the clusters; then update
the router to only validate input and await the new service method to get and
return the clusters.
- Around line 1742-1748: The router contains business logic that checks Runpod
provider status (the if provider.type == ProviderType.RUNPOD.value branch, the
asyncio.to_thread(provider_instance.get_cluster_status, cluster_name) call and
the terminal_pod_states / pod_finished evaluation using ClusterState); move this
logic into a new or existing service under api/transformerlab/services/ (e.g., a
method like ComputeProviderService.get_runpod_cluster_status or
ComputeProviderService.is_runpod_pod_finished that encapsulates
get_cluster_status and the terminal state set), return a simple status/boolean
from that service, and update the router to only call that service method and
act on its result so the router has no provider-status business logic.
- Around line 1175-1177: The router currently performs provider launch business
logic by calling asyncio.to_thread(provider_instance.launch_cluster, ...) inside
compute_provider.py; move this logic into a new or existing service under
api/transformerlab/services (e.g., ComputeProviderService or ProviderService) so
the router only validates input and calls the service method; implement a
service method (e.g., launch_cluster or launch_cluster_async) that accepts
formatted_cluster_name and cluster_config, performs the
asyncio.to_thread(provider_instance.launch_cluster, ...) call and returns
launch_result, and update the router to call that service method instead of
invoking provider_instance.launch_cluster directly.
- Around line 2459-2463: The router currently calls
provider_instance.get_cluster_status (passing cluster_name) which mixes business
logic into the HTTP layer; move this call into a service in
api/transformerlab/services (e.g., add a function like get_cluster_status or
get_provider_cluster_status that accepts provider_instance and cluster_name and
returns the status), update the router to call that service function instead,
and ensure the service uses
asyncio.to_thread(provider_instance.get_cluster_status, cluster_name) so the
router only handles request/response validation and delegates business logic to
the new service.
- Around line 2789-2793: The router is calling provider_instance.cancel_job
directly (provider_instance.cancel_job(cluster_name, job_id)); move this
business logic into a service under api/transformerlab/services (e.g., add a
method like ComputeProviderService.cancel_job or cancel_provider_job that
accepts provider_instance, cluster_name, job_id and handles setting
provider_instance.extra_config["workspace_dir"] and invoking
provider_instance.cancel_job via asyncio.to_thread), then update the router to
only validate inputs and call that service method and return its result; ensure
the service encapsulates the asyncio.to_thread call and any error handling so
the router contains no provider cancellation logic.
- Around line 1698-1702: The router contains business logic: directly calling
provider_instance.get_cluster_status and evaluating terminal_states_local
({ClusterState.DOWN, ClusterState.FAILED, ClusterState.STOPPED}) when
provider.type == ProviderType.LOCAL.value; move this logic into a service under
api/transformerlab/services/ (e.g., ComputeProviderService or
ProviderStatusService) by creating a method like
get_local_cluster_status_or_is_terminal(provider_instance, cluster_name) that
performs the asyncio.to_thread(provider_instance.get_cluster_status,
cluster_name) call and returns the status or a boolean indicating terminal
state; then update the router to only call the new service method (referencing
ProviderType.LOCAL, provider_instance.get_cluster_status, and ClusterState) and
handle HTTP request/response without embedding the status-check logic.
- Around line 1791-1794: The router currently calls provider_instance.list_jobs
(in the handler around the try/except block) which is business logic and must be
moved into a service; create a service method (e.g.,
ComputeProviderService.list_provider_jobs or list_jobs_for_cluster in
api/transformerlab/services/) that accepts provider_instance and cluster_name,
calls provider_instance.list_jobs inside asyncio.to_thread, handles
NotImplementedError and returns a normalized result or raises a service-specific
exception, then replace the direct call in the router with a call to that new
service method and let the router only translate service responses into HTTP
responses.
- Around line 2429-2433: The router currently invokes
provider_instance.stop_cluster directly; move that business logic into a service
method (e.g., add a stop_cluster method on the compute provider service such as
ComputeProviderService.stop_cluster or ProviderService.stop_cluster) and have
the router call that service method instead. Implement the service method to
accept the same inputs (cluster_name and provider identifier/context), perform
the provider_instance.stop_cluster call (using asyncio.to_thread if needed) and
return the result or raise service-level errors; update the router to only
validate inputs and call the new service method, returning its response.
- Around line 2489-2493: The router is calling
provider_instance.get_cluster_resources (business logic) directly; move that
call into a service under api/transformerlab/services (e.g., add a method
ComputeProviderService.get_cluster_resources that accepts provider_instance and
cluster_name and returns the resources), update the router handler to only
validate inputs and call ComputeProviderService.get_cluster_resources, and
remove any direct business logic from the router so the router only handles HTTP
request/response and delegates to the new service method.
- Around line 2395-2399: The router is invoking provider_instance.launch_cluster
directly (via await asyncio.to_thread(...))—move this business logic into a
service under api/transformerlab/services: add a method (e.g.,
ComputeProviderService.launch_cluster) that accepts provider_instance,
formatted_cluster_name and cluster_config and performs the
asyncio.to_thread(provider_instance.launch_cluster, ...) call and returns or
raises as appropriate; then update the router to call await
ComputeProviderService.launch_cluster(...) and keep the existing error
handling/response logic there. Ensure the service is unit-testable and the
router only handles validation and the service call.
In `@api/transformerlab/routers/experiment/jobs.py`:
- Around line 422-436: The tunnel-info path can still reference provider_jobs
after a NotImplementedError, so initialize provider_jobs = [] immediately before
the try block that calls provider_instance.list_jobs (the same place where
provider_job_id is checked), then keep the existing except NotImplementedError
handler that sets provider_job_id = cluster_name as the fallback; this ensures
the subsequent logic that computes running_states, chosen_job and
provider_job_id (using JobState, chosen_job.job_id) never accesses an
uninitialized variable.
- Around line 441-447: The router is performing provider I/O by calling
provider_instance.get_job_logs to populate raw_logs; move this logic into a new
or existing service under api/transformerlab/services/ (e.g.,
JobLogService.get_job_logs) and have the router call that service instead.
Specifically, create a service method that accepts cluster_name,
provider_job_id, tail_lines (and follow=False) and performs the
asyncio.to_thread call to provider_instance.get_job_logs, returning the
raw_logs; then replace the direct call in the router (the raw_logs assignment
using provider_instance.get_job_logs) with a call to the service method and use
its result. Ensure the service receives any needed provider_instance via
dependency injection or lookup so the router remains I/O-free.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
api/transformerlab/routers/compute_provider.py (1)
1789-1802: ⚠️ Potential issue | 🟠 Major

Await the job_data update in the Runpod branch.

`job_update_job_data_insert_key_value` is async elsewhere in this file; without `await`, end_time may not be persisted and you'll get an un-awaited coroutine.

🔧 Proposed fix
```diff
-            job_service.job_update_job_data_insert_key_value(job_id, "end_time", end_time_str, experiment_id)
+            await job_service.job_update_job_data_insert_key_value(
+                job_id, "end_time", end_time_str, experiment_id
+            )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/routers/compute_provider.py` around lines 1789 - 1802, The code calls job_service.job_update_job_data_insert_key_value without awaiting it in the Runpod branch, which yields an un-awaited coroutine and may not persist end_time; change the call in the pod_finished block to await job_service.job_update_job_data_insert_key_value(...) before calling job_service.job_update_status so the end_time is persisted reliably (keep the surrounding try/except and the subsequent await job_update_status as-is, and ensure you reference the same job_id and experiment_id parameters).
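Why the missing `await` matters, in miniature. The function names mirror the ones above but the bodies are simplified stand-ins using an in-memory dict instead of the database:

```python
import asyncio

store = {}

async def job_update_job_data_insert_key_value(job_id, key, value, experiment_id):
    # Persists only if the coroutine is actually run (i.e., awaited).
    store[(job_id, key)] = value

async def runpod_branch_without_await():
    # Calling without await only creates a coroutine object; the body never runs.
    coro = job_update_job_data_insert_key_value("job-1", "end_time", "t0", "exp-1")
    coro.close()  # silence the "coroutine was never awaited" warning for this demo

async def runpod_branch_with_await():
    await job_update_job_data_insert_key_value("job-2", "end_time", "t1", "exp-1")

asyncio.run(runpod_branch_without_await())
asyncio.run(runpod_branch_with_await())
print(("job-1", "end_time") in store)  # False — update silently lost
print(("job-2", "end_time") in store)  # True — persisted
```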
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@api/transformerlab/routers/compute_provider.py`:
- Around line 1789-1802: The code calls
job_service.job_update_job_data_insert_key_value without awaiting it in the
Runpod branch, which yields an un-awaited coroutine and may not persist
end_time; change the call in the pod_finished block to await
job_service.job_update_job_data_insert_key_value(...) before calling
job_service.job_update_status so the end_time is persisted reliably (keep the
surrounding try/except and the subsequent await job_update_status as-is, and
ensure you reference the same job_id and experiment_id parameters).
Implement error handling for connection issues in SkyPilot requests, providing clear error messages when the server is unreachable. This enhances user feedback during network failures.
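One way such a standardized conversion could look. The helper name `_make_authenticated_request` is taken from the review comments above, but this body is a simplified sketch — the real provider may use the SkyPilot SDK or `requests`; stdlib `urllib` is used here to keep it dependency-free:

```python
import urllib.error
import urllib.request

class SkyPilotProviderSketch:
    def __init__(self, base_url: str):
        self.base_url = base_url

    def _make_authenticated_request(self, method: str, path: str, data=None):
        # Auth headers omitted; this sketch focuses on the error conversion.
        req = urllib.request.Request(self.base_url + path, data=data, method=method)
        try:
            return urllib.request.urlopen(req, timeout=10)
        except (urllib.error.URLError, TimeoutError) as e:
            # Convert transport-level failures into one standardized error
            # with a clear, user-facing message about the unreachable server.
            raise ConnectionError(
                f"Could not reach SkyPilot server at {self.base_url}: {e}"
            ) from e
```

Callers can then catch a single `ConnectionError` and return a structured `{"status": "error", ...}` fallback instead of leaking library-specific exceptions to the API layer.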
How to test:
Summary by CodeRabbit
Bug Fixes
Refactor