Fix job not found errors when running with local providers #1397
Conversation
📝 Walkthrough

Introduces team-scoped context propagation across the local provider cluster launch pipeline. The changes add per-job workspace HOME setup, refactor workspace directory injection to eliminate organization-context mutation, add `team_id` parameter forwarding through the sweep and enqueue paths, and enhance the local launch worker with organization context management, provider validation, and critical-section execution.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Router as Compute Provider Router
    participant Queue as Local Provider Queue
    participant Worker as Queue Worker
    participant OrgCtx as Organization Context
    participant Provider as LocalProvider
    Router->>Queue: enqueue_local_launch(..., team_id)
    Queue->>Queue: Create LocalLaunchWorkItem(team_id)
    Worker->>OrgCtx: set_current_org_id(item.team_id)
    Worker->>OrgCtx: lab_dirs.set_organization_id(item.team_id)
    Worker->>Provider: Fetch provider by ID
    alt Provider Exists
        Worker->>Provider: launch_cluster (within critical section)
        Provider-->>Worker: launch_result
        Worker->>Worker: Store provider_launch_result & orchestrator_request_id
    else Provider Missing
        Worker->>Worker: Mark job as FAILED
        Worker->>Worker: Release quota hold if present
    end
    Worker->>OrgCtx: Clear organization context (finally block)
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Codecov Report: ❌ Patch coverage is
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
api/transformerlab/routers/compute_provider.py (2)
2497-2505: ⚠️ Potential issue | 🟠 Major

`resume_from_checkpoint` inconsistently mutates org context and bypasses the serialization queue for the LOCAL provider. Two issues:

1. This block still uses the old `set_organization_id(team_id)`/`try`/`finally`/`set_organization_id(None)` pattern that was deliberately removed from `launch_template_on_provider` (Lines 1508-1515). Since the middleware has already set org context for this request, the mutation is unnecessary and inconsistent.
2. For the LOCAL provider, `resume_from_checkpoint` calls `provider_instance.launch_cluster` directly via `asyncio.to_thread` (Line 2584) instead of going through `enqueue_local_launch`. This bypasses `_worker_lock` and the serialization queue, potentially running multiple local launches concurrently, contradicting the queue's invariant.

♻️ Proposed fix: align with the `launch_template_on_provider` pattern
```diff
-    # For local provider, set TFL_WORKSPACE_DIR so the lab SDK in the subprocess finds the job dir
-    if provider.type == ProviderType.LOCAL.value and team_id:
-        set_organization_id(team_id)
-        try:
-            workspace_dir = await get_workspace_dir()
-            if workspace_dir and not storage.is_remote_path(workspace_dir):
-                env_vars["TFL_WORKSPACE_DIR"] = workspace_dir
-        finally:
-            set_organization_id(None)
+    # For local provider, set TFL_WORKSPACE_DIR so the lab SDK in the subprocess finds the job dir.
+    # Org context is already set by authentication middleware.
+    if provider.type == ProviderType.LOCAL.value and team_id:
+        workspace_dir = await get_workspace_dir()
+        if workspace_dir and not storage.is_remote_path(workspace_dir):
+            env_vars["TFL_WORKSPACE_DIR"] = workspace_dir
```

For the queue bypass, route through `enqueue_local_launch` for the LOCAL provider (similar to `launch_template_on_provider` Lines 1657-1679) instead of calling `launch_cluster` directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/routers/compute_provider.py` around lines 2497 - 2505, The resume_from_checkpoint path mutates org context with set_organization_id(team_id)/try/finally and bypasses the local-launch serialization by calling provider_instance.launch_cluster via asyncio.to_thread; remove the unnecessary set_organization_id(...) calls (the request middleware already sets org context) so resume_from_checkpoint matches launch_template_on_provider, and for ProviderType.LOCAL.value change the direct launch to use the existing enqueue_local_launch flow (instead of asyncio.to_thread(provider_instance.launch_cluster...)) so the call is serialized under _worker_lock/serialization queue just like launch_template_on_provider; update any surrounding logic to pass the same arguments used by enqueue_local_launch.
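The serialization invariant the reviewer cites can be illustrated with a minimal asyncio sketch. The names `enqueue_local_launch` and the worker lock mirror the review's terminology, but the implementation below is hypothetical, not the project's actual code:

```python
import asyncio

launch_order = []

async def enqueue_local_launch(lock: asyncio.Lock, job_id: str) -> None:
    # Only one launch proceeds at a time; the rest queue on the lock.
    # Bypassing this (e.g. via asyncio.to_thread on the provider directly)
    # is exactly what lets local launches run concurrently.
    async with lock:
        launch_order.append(job_id)
        await asyncio.sleep(0)  # stand-in for the real launch work

async def main() -> None:
    worker_lock = asyncio.Lock()
    await asyncio.gather(*(enqueue_local_launch(worker_lock, f"job-{i}") for i in range(3)))

asyncio.run(main())
print(launch_order)
```

Because every caller awaits the same lock, the critical sections never interleave, which is the invariant the review says `resume_from_checkpoint` currently violates.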
1069-1076: ⚠️ Potential issue | 🟠 Major

`_launch_sweep_jobs` redundantly re-sets org context, and LOCAL sweep child jobs bypass the serialization queue.

`set_current_org_id(team_id)` and `lab_set_org_id(team_id)` are already called at Lines 979-981 at the top of `_launch_sweep_jobs`. The `set_organization_id(team_id)` block at Line 1070 redundantly re-sets the same value and then immediately clears it in `finally`, transiently resetting the context to `None` mid-loop if an exception is thrown in `get_workspace_dir`.

More critically, LOCAL provider sweep child jobs call `provider_instance.launch_cluster` directly via `asyncio.to_thread` (Line 1209) rather than `enqueue_local_launch`, bypassing `_worker_lock` and allowing concurrent local launches that the queue was introduced to serialize.

♻️ Proposed fix
```diff
-    if provider.type == ProviderType.LOCAL.value and team_id:
-        set_organization_id(team_id)
-        try:
-            workspace_dir = await get_workspace_dir()
-            if workspace_dir and not storage.is_remote_path(workspace_dir):
-                env_vars["TFL_WORKSPACE_DIR"] = workspace_dir
-        finally:
-            set_organization_id(None)
+    if provider.type == ProviderType.LOCAL.value and team_id:
+        workspace_dir = await get_workspace_dir()
+        if workspace_dir and not storage.is_remote_path(workspace_dir):
+            env_vars["TFL_WORKSPACE_DIR"] = workspace_dir
```

For the queue bypass, LOCAL provider child sweep jobs should be routed through `enqueue_local_launch` to respect the serialization invariant.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/routers/compute_provider.py` around lines 1069 - 1076, Remove the redundant org-context toggling in _launch_sweep_jobs by deleting the set_organization_id(team_id)/finally block around get_workspace_dir (since set_current_org_id(team_id) and lab_set_org_id(team_id) are already applied at the top); instead, ensure get_workspace_dir reads the already-set context. Also fix the LOCAL provider child job path so it uses enqueue_local_launch rather than calling provider_instance.launch_cluster via asyncio.to_thread (which bypasses _worker_lock); replace the direct asyncio.to_thread(provider_instance.launch_cluster, ...) call with a call that enqueues the launch through enqueue_local_launch so local launches are serialized by _worker_lock.

api/transformerlab/compute_providers/local.py (2)
244-259: ⚠️ Potential issue | 🔴 Critical

Pre-existing logic inversion in `get_cluster_status` defeats the PR's fix.

`os.kill(pid, 0)` returns `None` on success (process alive) and raises an exception when the process is gone. The current branch `if os_killed is not None` is therefore always `False`, making `ClusterState.UP` dead code and every running local job appear as `DOWN`. This causes check-status to immediately mark all local jobs as `COMPLETE` regardless of actual process state.

🐛 Proposed fix
```diff
-        try:
-            pid = int(pid_file.read_text().strip())
-            os_killed = os.kill(pid, 0)
-            # Return up only if the process is not running
-            if os_killed is not None:
-                return ClusterStatus(
-                    cluster_name=cluster_name,
-                    state=ClusterState.UP,
-                    status_message="Process running",
-                )
-            else:
-                return ClusterStatus(
-                    cluster_name=cluster_name,
-                    state=ClusterState.DOWN,
-                    status_message="Process not running",
-                )
+        try:
+            pid = int(pid_file.read_text().strip())
+            os.kill(pid, 0)  # raises if process is gone
+            return ClusterStatus(
+                cluster_name=cluster_name,
+                state=ClusterState.UP,
+                status_message="Process running",
+            )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/compute_providers/local.py` around lines 244 - 259, In get_cluster_status, os.kill(pid, 0) returns None on success (process exists) and raises on failure, so the current if os_killed is not None branch is inverted and marks running processes as DOWN; change the logic to call os.kill(pid, 0) inside a try and treat the no-exception path as ClusterState.UP (status_message "Process running"), catch ProcessLookupError (and/or OSError with errno ESRCH) to return ClusterState.DOWN ("Process not running"), treat PermissionError as UP (process exists but not permitted), and handle any other exceptions by logging and returning an appropriate DOWN status; update references to pid_file.read_text(), ClusterStatus and ClusterState.UP/DOWN accordingly.
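The `os.kill(pid, 0)` semantics behind this fix can be sketched as a standalone liveness check. The helper name and the `PermissionError` handling follow the review's suggestion and are illustrative, not code from the repository:

```python
import errno
import os

def process_alive(pid: int) -> bool:
    # os.kill(pid, 0) sends no signal: it returns None when the pid
    # exists and raises when it does not, so liveness must be inferred
    # from the *absence* of an exception -- the inversion the review flags.
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True   # process exists but belongs to another user
    except OSError as exc:
        return exc.errno != errno.ESRCH
    return True

print(process_alive(os.getpid()))  # the current process is always alive
```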
185-192: ⚠️ Potential issue | 🟠 Major

File descriptor leak in `Popen` call.

The `open()` handles for `stdout.log` and `stderr.log` are passed directly and never closed. The parent process will hold these file descriptors open until they are garbage-collected.

🔒 Proposed fix
```diff
-        proc = subprocess.Popen(
-            ["/bin/bash", "-c", config.command or "true"],
-            cwd=str(job_dir),
-            env=env,
-            stdout=open(job_dir / "stdout.log", "w"),
-            stderr=open(job_dir / "stderr.log", "w"),
-            start_new_session=True,
-        )
+        stdout_log = open(job_dir / "stdout.log", "w")
+        stderr_log = open(job_dir / "stderr.log", "w")
+        proc = subprocess.Popen(
+            ["/bin/bash", "-c", config.command or "true"],
+            cwd=str(job_dir),
+            env=env,
+            stdout=stdout_log,
+            stderr=stderr_log,
+            start_new_session=True,
+        )
+        stdout_log.close()
+        stderr_log.close()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/compute_providers/local.py` around lines 185 - 192, The Popen call leaks file descriptors because open(job_dir / "stdout.log", "w") and stderr are passed directly; fix by opening those files into local variables (e.g., stdout_f, stderr_f) before calling subprocess.Popen, call proc = subprocess.Popen(..., stdout=stdout_f, stderr=stderr_f, ...), and then close the parent-side file objects in a finally block (or immediately after Popen returns) to ensure they are closed even on exceptions; reference the subprocess.Popen invocation and the job_dir/stdout.log and stderr.log file opens when making the change.
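A minimal sketch of the descriptor-safe pattern, with illustrative paths and command; a `with` block is an alternative to the explicit `close()` calls in the proposed diff and also covers the case where `Popen` itself raises:

```python
import subprocess
import tempfile
from pathlib import Path

job_dir = Path(tempfile.mkdtemp())

# Opening the log files in a with-block guarantees the parent's
# descriptors are closed even if Popen raises.
with open(job_dir / "stdout.log", "w") as out, open(job_dir / "stderr.log", "w") as err:
    proc = subprocess.Popen(
        ["/bin/sh", "-c", "echo hello"],
        cwd=str(job_dir),
        stdout=out,
        stderr=err,
        start_new_session=True,
    )

# The child holds its own copies of the descriptors, so closing the
# parent's handles does not interfere with the running process.
proc.wait()
print((job_dir / "stdout.log").read_text().strip())  # → hello
```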
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- api/transformerlab/compute_providers/local.py
- api/transformerlab/routers/compute_provider.py
- api/transformerlab/services/local_provider_queue.py
Paragon Summary

This pull request review identified 2 issues across 2 categories in 3 files. The review analyzed code changes, potential bugs, security vulnerabilities, performance issues, and code quality concerns using automated analysis tools. This PR fixes "job not found" errors occurring when running with local providers by correcting job path resolution and setting the home directory to be within the job workspace.
Confidence score: 2/5
3 files reviewed, 2 comments. Severity breakdown: Critical: 1, High: 1.
```python
                job_id=str(job_id),
                experiment_id=str(experiment_id),
                provider_id=str(provider_id),
                team_id=str(team_id),
```
Bug: Stringifying a null team ID creates an invalid organization scope
Stringifying a null team ID creates an invalid organization scope. This completely breaks operations for personal workspaces. Make the team ID optional and pass it without stringification.
View Details
Location: api/transformerlab/services/local_provider_queue.py (lines 49)
Analysis
Stringifying a null team ID creates an invalid organization scope
| | |
| --- | --- |
| What fails | Personal workspaces will be scoped to a literal `'None'` organization. |
| Result | The worker calls `set_organization_id('None')`, forcing all filesystem operations into `orgs/None/workspace`. |
| Expected | `team_id` should be `Optional[str]` and passed without stringification. |
| Impact | Breaks personal workspaces completely by misrouting file operations to `orgs/None/workspace`. |
How to reproduce
Launch a job without a team (`team_id=None`).

Patch Details
```diff
-                team_id=str(team_id),
+                team_id=team_id,
```

AI Fix Prompt
Fix this issue: Stringifying a null team ID creates an invalid organization scope. This completely breaks operations for personal workspaces. Make the team ID optional and pass it without stringification.
Location: api/transformerlab/services/local_provider_queue.py (lines 49)
Problem: Personal workspaces will be scoped to a literal 'None' organization.
Current behavior: The worker calls set_organization_id('None'), forcing all filesystem operations into orgs/None/workspace.
Expected: team_id should be Optional[str] and passed without stringification.
Steps to reproduce: Launch a job without a team (team_id=None).
Provide a code fix.
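The pitfall is easy to demonstrate in isolation; `make_item` and `make_item_fixed` below are hypothetical stand-ins for the `LocalLaunchWorkItem` construction:

```python
from typing import Optional

def make_item(team_id) -> dict:
    # Buggy: str() turns None into the literal string 'None',
    # which then looks like a valid organization id downstream.
    return {"team_id": str(team_id)}

def make_item_fixed(team_id: Optional[str]) -> dict:
    # Fixed: pass the optional value through unchanged.
    return {"team_id": team_id}

print(repr(make_item(None)["team_id"]))        # → 'None'  (a string)
print(repr(make_item_fixed(None)["team_id"]))  # → None    (the real None)
```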
```python
        env = os.environ.copy()
        env.update(config.env_vars or {})
        env["PATH"] = f"{venv_bin}{os.pathsep}{env.get('PATH', '')}"
        env["HOME"] = str(workspace_home)
```
Bug: Overriding HOME breaks shared model and package caches
Overriding HOME breaks shared model and package caches. Ephemeral jobs will waste time redownloading gigabytes of data. Explicitly set host cache paths in the subprocess environment.
View Details
Location: api/transformerlab/compute_providers/local.py (lines 164)
Analysis
Overriding HOME breaks shared model and package caches
| | |
| --- | --- |
| What fails | Shared models and packages will not be reused across jobs. |
| Result | Models are downloaded to the ephemeral workspace `HOME` instead of the shared `~/.cache` directory. |
| Expected | Preserve default cache directories if they aren't explicitly set in the environment. |
| Impact | Local jobs will waste disk space and time re-downloading gigabytes of models from scratch. |
How to reproduce
Launch a local job that downloads Hugging Face models.

Patch Details
```diff
-        env["HOME"] = str(workspace_home)
+        if "HF_HOME" not in env:
+            env["HF_HOME"] = str(Path.home() / ".cache" / "huggingface")
+        env["HOME"] = str(workspace_home)
```

AI Fix Prompt
Fix this issue: Overriding HOME breaks shared model and package caches. Ephemeral jobs will waste time redownloading gigabytes of data. Explicitly set host cache paths in the subprocess environment.
Location: api/transformerlab/compute_providers/local.py (lines 164)
Problem: Shared models and packages will not be reused across jobs.
Current behavior: Models are downloaded to the ephemeral workspace HOME instead of the shared ~/.cache directory.
Expected: Preserve default cache directories if they aren't explicitly set in the environment.
Steps to reproduce: Launch a local job that downloads Hugging Face models.
Provide a code fix.
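A sketch of the suggested mitigation, generalized to pip as well. `build_job_env` is a hypothetical helper; `HF_HOME` and `PIP_CACHE_DIR` are the real environment variables Hugging Face and pip honor, and the workspace path is illustrative:

```python
import os
from pathlib import Path

def build_job_env(workspace_home: Path) -> dict:
    env = os.environ.copy()
    real_home = Path.home()
    # Pin cache locations to the host defaults *before* HOME changes,
    # unless the caller already set them explicitly.
    env.setdefault("HF_HOME", str(real_home / ".cache" / "huggingface"))
    env.setdefault("PIP_CACHE_DIR", str(real_home / ".cache" / "pip"))
    env["HOME"] = str(workspace_home)  # job-scoped HOME, set last
    return env

env = build_job_env(Path("/tmp/job-123"))
print(env["HOME"])  # → /tmp/job-123
```

Tools that derive their cache path from `HOME` keep hitting the shared host cache, so each ephemeral job avoids re-downloading models.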