Make compute providers calls async to not block thread and handle connection errors in SkyPilot requests#1362

Merged
aliasaria merged 5 commits into main from fix/handle-connection-errors-in-skypilot
Feb 19, 2026
Conversation

@aliasaria
Member

@aliasaria aliasaria commented Feb 19, 2026

Implement error handling for connection issues in SkyPilot requests, providing clear error messages when the server is unreachable. This enhances user feedback during network failures.

How to test:

  • Edit an existing SkyPilot server configuration and change its IP to point at an unreachable host
  • Requests to that server will then time out
  • Without this fix, the timed-out calls pile up and the API server eventually becomes unresponsive

Summary by CodeRabbit

  • Bug Fixes

    • Connectivity to external compute providers now fails gracefully: network errors produce clear, structured error responses or safe defaults (e.g., UNKNOWN status, empty lists) instead of crashing.
    • Provider operations log concise errors and return predictable results on timeouts/connection failures.
  • Refactor

    • Blocking provider calls moved to worker threads to keep the event loop responsive and prevent request hangs.
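The thread-offloading pattern this summary describes can be sketched as follows; `FakeProvider` and the route function are illustrative stand-ins, not the PR's actual code:

```python
import asyncio
import time

class FakeProvider:
    """Stand-in for a compute provider whose SDK calls block on network I/O."""

    def get_cluster_status(self, cluster_name: str) -> dict:
        time.sleep(0.1)  # simulates a blocking HTTP/SDK call
        return {"cluster": cluster_name, "status": "UP"}

async def cluster_status_route(provider, cluster_name: str) -> dict:
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop keeps serving other requests meanwhile
    return await asyncio.to_thread(provider.get_cluster_status, cluster_name)

result = asyncio.run(cluster_status_route(FakeProvider(), "demo"))
print(result)  # {'cluster': 'demo', 'status': 'UP'}
```

Calling `provider.get_cluster_status` directly inside the coroutine would stall the whole event loop for the duration of the call, which is exactly how piled-up timeouts made the server unresponsive.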

@coderabbitai
Contributor

coderabbitai bot commented Feb 19, 2026

📝 Walkthrough

Walkthrough

Adds resilient SkyPilot provider error handling (standardized ConnectionError conversion, structured fallbacks) and converts multiple blocking provider calls to asynchronous thread-offloaded executions via asyncio.to_thread across router and job endpoints to avoid event-loop blocking.

Changes

Cohort / File(s) Summary
SkyPilot Error Handling & Resilience
api/transformerlab/compute_providers/skypilot.py
Standardize and centralize ConnectionError/Timeout handling inside _make_authenticated_request; wrap provider methods (launch_cluster, stop_cluster, get_cluster_status, list_clusters, submit_job, get_job_logs, cancel_job, list_jobs, etc.) to catch ConnectionError and return structured error responses or safe defaults (e.g., {"status":"error",...}, UNKNOWN, empty lists). No public signatures changed.
Async thread-offloading of provider calls
api/transformerlab/routers/compute_provider.py
Replaced synchronous provider calls with asyncio.to_thread(...) wrappers across many routes (launch, stop, status, list, cancel, resources, quota handling, background sweep tasks). Internal control flow adjusted to await threaded results while preserving external route signatures.
Async provider log/job lookups
api/transformerlab/routers/experiment/jobs.py
Offloaded blocking provider calls (list_jobs, get_job_logs) to asyncio.to_thread(...), preserved NotImplementedError handling and fallback logic (use cluster_name as provider_job_id when listing not implemented).
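The two behaviors summarized above — centralized conversion to `ConnectionError` inside the request helper, and safe defaults at the call sites — can be sketched like this (a simplified sketch using `urllib`; the real client, method signatures, and messages differ):

```python
import json
import socket
import urllib.error
import urllib.request

class SkyPilotClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

    def _make_authenticated_request(self, path: str) -> bytes:
        try:
            with urllib.request.urlopen(self.base_url + path, timeout=5) as resp:
                return resp.read()
        except (urllib.error.URLError, socket.timeout) as e:
            # Centralized conversion: callers only ever see ConnectionError
            raise ConnectionError(f"SkyPilot server unreachable at {self.base_url}: {e}") from e

    def list_clusters(self) -> list:
        try:
            return json.loads(self._make_authenticated_request("/clusters"))
        except ConnectionError:
            return []  # safe default instead of propagating a crash

client = SkyPilotClient("http://127.0.0.1:1")  # nothing listens on port 1
print(client.list_clusters())  # []
```

Every provider method routes its I/O through the one helper, so connection failures surface as a single, predictable exception type.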

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Router as API Router (async)
    participant Thread as Worker Thread
    participant Provider as ComputeProvider (SkyPilot)
    participant SkyPilot as SkyPilot Server

    Client->>Router: API request (e.g., get_cluster_status / launch)
    Router->>Router: wrap blocking call with asyncio.to_thread
    Router->>Thread: invoke provider.method(...)
    Thread->>Provider: provider.method(...)
    Provider->>SkyPilot: HTTP / SDK call
    alt SkyPilot responds
        SkyPilot-->>Provider: success/response
        Provider-->>Thread: parsed result
    else ConnectionError / Timeout
        SkyPilot-->>Provider: connection failure
        Provider-->>Thread: raise standardized ConnectionError
    end
    Thread-->>Router: return result or structured error/fallback
    Router-->>Client: respond with result or stable error structure

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I hopped through threads and tiny waits,
Caught lost connections by gentle gates,
No crashing thump — just tidy fallbacks,
Logs hum softly, no frantic flaps.
A rabbit's patch keeps services straight. 🥕

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and concisely summarizes the two main changes: making compute provider calls async and handling connection errors in SkyPilot requests, matching the substantial changes across three files.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.


@sentry

sentry bot commented Feb 19, 2026

Codecov Report

❌ Patch coverage is 0% with 64 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
api/transformerlab/compute_providers/skypilot.py 0.00% 48 Missing ⚠️
api/transformerlab/routers/compute_provider.py 0.00% 12 Missing ⚠️
api/transformerlab/routers/experiment/jobs.py 0.00% 4 Missing ⚠️


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
api/transformerlab/compute_providers/skypilot.py (1)

429-433: Consider using the logging module instead of print().

The file imports logging at line 6, but error messages use print(). For consistency and better observability (log levels, formatting, handlers), consider using logging.error() or logging.warning() throughout.

This applies to all similar error handling blocks in this file (lines 432, 477, 557, 753, 1039, 1105, 1185, 1230).

Example fix
         except ConnectionError as e:
-            print(f"Failed to launch cluster {cluster_name}: {e}")
+            logging.error(f"Failed to launch cluster {cluster_name}: {e}")
             return {"status": "error", "message": str(e)}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/compute_providers/skypilot.py` around lines 429 - 433,
Replace print() calls in the exception handlers with the logging module (e.g.,
logging.error or logging.exception) so messages use proper log levels and
handlers; specifically in the try/except around the call to
self._make_authenticated_request("POST", "/launch", ...) (and the other similar
blocks mentioned) change print(f"Failed to launch cluster {cluster_name}: {e}")
to logging.error or logging.exception including context (cluster_name) and the
exception details (use exc_info=True or logging.exception for stack trace) so
all error paths in the methods that call _make_authenticated_request produce
structured logs instead of printing to stdout.
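The fix the bot suggests, shown with a module-level logger (the `logger` setup and the stand-in `raise` are illustrative; the real method performs an HTTP request):

```python
import logging

logger = logging.getLogger(__name__)

def launch_cluster(cluster_name: str) -> dict:
    try:
        # stand-in for self._make_authenticated_request("POST", "/launch", ...)
        raise ConnectionError("server unreachable")
    except ConnectionError as e:
        # logger.error goes through handlers and levels; exc_info=True keeps the traceback
        logger.error("Failed to launch cluster %s: %s", cluster_name, e, exc_info=True)
        return {"status": "error", "message": str(e)}

print(launch_cluster("demo"))  # {'status': 'error', 'message': 'server unreachable'}
```

Unlike `print()`, this respects configured log levels, formatting, and handlers, so operators can route or silence these messages without code changes.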
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/transformerlab/compute_providers/skypilot.py`:
- Around line 1102-1106: The f-string print in the except block after calling
self._make_authenticated_request is over the 120-char Ruff limit; run `ruff
format` or manually reflow the line so it stays under 120 characters (e.g.,
assign the formatted message to a local variable or split the string before
calling print/log), preserving the same exception handling and return ""
behavior and keeping references to job_id, cluster_name and exception e.


@aliasaria aliasaria changed the title Handle connection errors in SkyPilot requests Make compute providers calls async to not block thread and handle connection errors in SkyPilot requests Feb 19, 2026
Contributor

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
api/transformerlab/routers/experiment/jobs.py (2)

298-323: ⚠️ Potential issue | 🟠 Major

Initialize provider_jobs before list_jobs to avoid UnboundLocalError.

Line 309 reads provider_jobs even when Line 299 raises NotImplementedError, which leaves the variable undefined and will crash the request (e.g., Runpod path). Initialize to an empty list before the try block.

🛠️ Proposed fix
     if provider_job_id is None:
+        provider_jobs = []
         try:
             provider_jobs = await asyncio.to_thread(provider_instance.list_jobs, cluster_name)
         except NotImplementedError:
             # Provider doesn't support listing jobs (e.g., Runpod)
             # For Runpod, we can't determine a job_id, so we'll use the cluster_name as a fallback
             # or return a message that logs aren't available via job_id
             provider_job_id = cluster_name  # Use cluster_name as fallback identifier
             provider_job_candidates = []
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/routers/experiment/jobs.py` around lines 298 - 323, The
variable provider_jobs is read after the try/except but may be undefined if
provider_instance.list_jobs raises NotImplementedError; initialize provider_jobs
= [] before the try so the NotImplementedError branch and subsequent logic (the
provider_job_id and provider_job_candidates handling and the provider_jobs list
comprehension) operate safely; ensure provider_job_id and
provider_job_candidates assignments remain in the NotImplementedError branch as
fallbacks and that the later block that inspects provider_jobs (the
running_states/chosen_job logic and list comprehension) can handle an empty
provider_jobs list.
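The failure mode and the one-line fix can be reproduced in isolation (variable names follow the review comment; the surrounding route logic is elided):

```python
def get_jobs_broken(listing_supported: bool) -> list:
    try:
        if not listing_supported:
            raise NotImplementedError  # e.g., the Runpod provider
        provider_jobs = ["job-1"]
    except NotImplementedError:
        pass  # fallback sets provider_job_id elsewhere, but not provider_jobs
    return provider_jobs  # UnboundLocalError when listing is unsupported

def get_jobs_fixed(listing_supported: bool) -> list:
    provider_jobs = []  # initialized before the try block
    try:
        if not listing_supported:
            raise NotImplementedError
        provider_jobs = ["job-1"]
    except NotImplementedError:
        pass
    return provider_jobs  # safely empty on the fallback path

print(get_jobs_fixed(False))  # []
```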

334-340: 🛠️ Refactor suggestion | 🟠 Major

Move provider log retrieval into a service layer.

Line 334 performs provider I/O inside the router. Extract this logic into api/transformerlab/services/ and keep the router focused on request/response handling. As per coding guidelines: "Place business logic in api/transformerlab/services/ using the Service pattern; routers should only handle HTTP request/response validation and call services".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/routers/experiment/jobs.py` around lines 334 - 340, The
router is doing provider I/O by calling
asyncio.to_thread(provider_instance.get_job_logs...) and assigning raw_logs;
move that call into a new service function (e.g., create a method like
get_provider_job_logs in api/transformerlab/services/experiment_jobs_service or
similar) that accepts (provider_instance, cluster_name, provider_job_id,
tail_lines, follow=False), performs the
asyncio.to_thread(provider_instance.get_job_logs, ...) and returns raw_logs or
raises errors; then replace the inline asyncio.to_thread call in the router with
a simple call to this service method (await
ExperimentJobsService.get_provider_job_logs(...)) so the router only handles
request/response and the I/O lives in the services layer.
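The extraction the bot proposes would look roughly like this — the service owns the thread-offloaded provider I/O and the router only shapes the response (function and module names are the bot's suggestions, not existing code):

```python
import asyncio

# services layer (e.g., under api/transformerlab/services/): owns provider I/O
async def get_provider_job_logs(provider_instance, cluster_name, provider_job_id,
                                tail_lines, follow=False):
    return await asyncio.to_thread(
        provider_instance.get_job_logs, cluster_name, provider_job_id, tail_lines, follow
    )

# router layer: validates the request and delegates to the service
async def job_logs_route(provider_instance, cluster_name, provider_job_id, tail_lines=100):
    raw_logs = await get_provider_job_logs(
        provider_instance, cluster_name, provider_job_id, tail_lines
    )
    return {"logs": raw_logs}
```

The router stays I/O-free and the service function can be unit-tested with a fake provider.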
api/transformerlab/routers/compute_provider.py (1)

731-737: 🛠️ Refactor suggestion | 🟠 Major

Move provider checks into a service layer.

Line 735 calls provider_instance.check() directly in the router. Per the service pattern, extract provider interactions into api/transformerlab/services/ and have the router call that service instead. As per coding guidelines: "Place business logic in api/transformerlab/services/ using the Service pattern; routers should only handle HTTP request/response validation and call services".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/routers/compute_provider.py` around lines 731 - 737,
Extract the provider check logic out of the router into a service function
(e.g., create a new function check_provider_active in
api/transformerlab/services/compute_provider_service) that takes (provider,
user_id, team_id), calls get_provider_instance(provider, user_id=user_id,
team_id=team_id), invokes provider_instance.check via asyncio.to_thread and
returns the boolean result; then update the router to call this service function
instead of calling provider_instance.check directly (replace the direct call to
provider_instance.check and await asyncio.to_thread with a call to
check_provider_active).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@api/transformerlab/routers/compute_provider.py`:
- Around line 1565-1567: The router is directly calling provider logic via
provider_instance.launch_cluster with formatted_cluster_name and cluster_config;
move this business logic into a service under api/transformerlab/services (e.g.,
add a ComputeProviderService.launch_cluster method or similar) and have the
router only validate inputs and call that service. Create a service function
that accepts the same parameters (provider_instance, formatted_cluster_name,
cluster_config), performs the
asyncio.to_thread(provider_instance.launch_cluster, ...) call and returns the
result, then replace the direct call in the router with a call to the new
service method.
- Around line 2518-2522: The router is calling
provider_instance.get_clusters_detailed directly (via asyncio.to_thread) which
mixes business logic into the HTTP layer; move this logic into a new service
under api/transformerlab/services (e.g., add a ComputeProviderService with a
method like get_clusters_detailed or list_detailed_clusters) that accepts the
provider identifier/instance, calls provider_instance.get_clusters_detailed
(using asyncio.to_thread if it’s blocking) and returns the clusters; then update
the router to only validate input and await the new service method to get and
return the clusters.
- Around line 1742-1748: The router contains business logic that checks Runpod
provider status (the if provider.type == ProviderType.RUNPOD.value branch, the
asyncio.to_thread(provider_instance.get_cluster_status, cluster_name) call and
the terminal_pod_states / pod_finished evaluation using ClusterState); move this
logic into a new or existing service under api/transformerlab/services/ (e.g., a
method like ComputeProviderService.get_runpod_cluster_status or
ComputeProviderService.is_runpod_pod_finished that encapsulates
get_cluster_status and the terminal state set), return a simple status/boolean
from that service, and update the router to only call that service method and
act on its result so the router has no provider-status business logic.
- Around line 1175-1177: The router currently performs provider launch business
logic by calling asyncio.to_thread(provider_instance.launch_cluster, ...) inside
compute_provider.py; move this logic into a new or existing service under
api/transformerlab/services (e.g., ComputeProviderService or ProviderService) so
the router only validates input and calls the service method; implement a
service method (e.g., launch_cluster or launch_cluster_async) that accepts
formatted_cluster_name and cluster_config, performs the
asyncio.to_thread(provider_instance.launch_cluster, ...) call and returns
launch_result, and update the router to call that service method instead of
invoking provider_instance.launch_cluster directly.
- Around line 2459-2463: The router currently calls
provider_instance.get_cluster_status (passing cluster_name) which mixes business
logic into the HTTP layer; move this call into a service in
api/transformerlab/services (e.g., add a function like get_cluster_status or
get_provider_cluster_status that accepts provider_instance and cluster_name and
returns the status), update the router to call that service function instead,
and ensure the service uses
asyncio.to_thread(provider_instance.get_cluster_status, cluster_name) so the
router only handles request/response validation and delegates business logic to
the new service.
- Around line 2789-2793: The router is calling provider_instance.cancel_job
directly (provider_instance.cancel_job(cluster_name, job_id)); move this
business logic into a service under api/transformerlab/services (e.g., add a
method like ComputeProviderService.cancel_job or cancel_provider_job that
accepts provider_instance, cluster_name, job_id and handles setting
provider_instance.extra_config["workspace_dir"] and invoking
provider_instance.cancel_job via asyncio.to_thread), then update the router to
only validate inputs and call that service method and return its result; ensure
the service encapsulates the asyncio.to_thread call and any error handling so
the router contains no provider cancellation logic.
- Around line 1698-1702: The router contains business logic: directly calling
provider_instance.get_cluster_status and evaluating terminal_states_local
({ClusterState.DOWN, ClusterState.FAILED, ClusterState.STOPPED}) when
provider.type == ProviderType.LOCAL.value; move this logic into a service under
api/transformerlab/services/ (e.g., ComputeProviderService or
ProviderStatusService) by creating a method like
get_local_cluster_status_or_is_terminal(provider_instance, cluster_name) that
performs the asyncio.to_thread(provider_instance.get_cluster_status,
cluster_name) call and returns the status or a boolean indicating terminal
state; then update the router to only call the new service method (referencing
ProviderType.LOCAL, provider_instance.get_cluster_status, and ClusterState) and
handle HTTP request/response without embedding the status-check logic.
- Around line 1791-1794: The router currently calls provider_instance.list_jobs
(in the handler around the try/except block) which is business logic and must be
moved into a service; create a service method (e.g.,
ComputeProviderService.list_provider_jobs or list_jobs_for_cluster in
api/transformerlab/services/) that accepts provider_instance and cluster_name,
calls provider_instance.list_jobs inside asyncio.to_thread, handles
NotImplementedError and returns a normalized result or raises a service-specific
exception, then replace the direct call in the router with a call to that new
service method and let the router only translate service responses into HTTP
responses.
- Around line 2429-2433: The router currently invokes
provider_instance.stop_cluster directly; move that business logic into a service
method (e.g., add a stop_cluster method on the compute provider service such as
ComputeProviderService.stop_cluster or ProviderService.stop_cluster) and have
the router call that service method instead. Implement the service method to
accept the same inputs (cluster_name and provider identifier/context), perform
the provider_instance.stop_cluster call (using asyncio.to_thread if needed) and
return the result or raise service-level errors; update the router to only
validate inputs and call the new service method, returning its response.
- Around line 2489-2493: The router is calling
provider_instance.get_cluster_resources (business logic) directly; move that
call into a service under api/transformerlab/services (e.g., add a method
ComputeProviderService.get_cluster_resources that accepts provider_instance and
cluster_name and returns the resources), update the router handler to only
validate inputs and call ComputeProviderService.get_cluster_resources, and
remove any direct business logic from the router so the router only handles HTTP
request/response and delegates to the new service method.
- Around line 2395-2399: The router is invoking provider_instance.launch_cluster
directly (via await asyncio.to_thread(...))—move this business logic into a
service under api/transformerlab/services: add a method (e.g.,
ComputeProviderService.launch_cluster) that accepts provider_instance,
formatted_cluster_name and cluster_config and performs the
asyncio.to_thread(provider_instance.launch_cluster, ...) call and returns or
raises as appropriate; then update the router to call await
ComputeProviderService.launch_cluster(...) and keep the existing error
handling/response logic there. Ensure the service is unit-testable and the
router only handles validation and the service call.

In `@api/transformerlab/routers/experiment/jobs.py`:
- Around line 422-436: The tunnel-info path can still reference provider_jobs
after a NotImplementedError, so initialize provider_jobs = [] immediately before
the try block that calls provider_instance.list_jobs (the same place where
provider_job_id is checked), then keep the existing except NotImplementedError
handler that sets provider_job_id = cluster_name as the fallback; this ensures
the subsequent logic that computes running_states, chosen_job and
provider_job_id (using JobState, chosen_job.job_id) never accesses an
uninitialized variable.
- Around line 441-447: The router is performing provider I/O by calling
provider_instance.get_job_logs to populate raw_logs; move this logic into a new
or existing service under api/transformerlab/services/ (e.g.,
JobLogService.get_job_logs) and have the router call that service instead.
Specifically, create a service method that accepts cluster_name,
provider_job_id, tail_lines (and follow=False) and performs the
asyncio.to_thread call to provider_instance.get_job_logs, returning the
raw_logs; then replace the direct call in the router (the raw_logs assignment
using provider_instance.get_job_logs) with a call to the service method and use
its result. Ensure the service receives any needed provider_instance via
dependency injection or lookup so the router remains I/O-free.

@aliasaria aliasaria merged commit 546b53c into main Feb 19, 2026
8 of 9 checks passed
Contributor

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
api/transformerlab/routers/compute_provider.py (1)

1789-1802: ⚠️ Potential issue | 🟠 Major

Await the job_data update in the Runpod branch.

job_update_job_data_insert_key_value is async elsewhere in this file; without await, end_time may not be persisted and you’ll get an un-awaited coroutine.

🔧 Proposed fix
-                    job_service.job_update_job_data_insert_key_value(job_id, "end_time", end_time_str, experiment_id)
+                    await job_service.job_update_job_data_insert_key_value(
+                        job_id, "end_time", end_time_str, experiment_id
+                    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/routers/compute_provider.py` around lines 1789 - 1802, The
code calls job_service.job_update_job_data_insert_key_value without awaiting it
in the Runpod branch, which yields an un-awaited coroutine and may not persist
end_time; change the call in the pod_finished block to await
job_service.job_update_job_data_insert_key_value(...) before calling
job_service.job_update_status so the end_time is persisted reliably (keep the
surrounding try/except and the subsequent await job_update_status as-is, and
ensure you reference the same job_id and experiment_id parameters).
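The bug class behind this finding — calling an async function without `await` returns a coroutine object and never executes its body — reproduces in a few lines (a minimal sketch, not the project's code):

```python
import asyncio

persisted = {}

async def job_update_job_data_insert_key_value(job_id, key, value):
    persisted[(job_id, key)] = value  # simulated DB write

async def broken(job_id):
    # Coroutine is created but never scheduled: the write silently never happens
    job_update_job_data_insert_key_value(job_id, "end_time", "t0")

async def fixed(job_id):
    await job_update_job_data_insert_key_value(job_id, "end_time", "t1")

asyncio.run(broken(1))   # also emits a "was never awaited" RuntimeWarning
print(persisted)         # {}
asyncio.run(fixed(1))
print(persisted)         # {(1, 'end_time'): 't1'}
```

The RuntimeWarning is the only symptom, which is why the missing persistence of `end_time` is easy to miss in review.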
