Skip to content

Get rid of job queue json and run over and over#1385

Merged
deep1401 merged 4 commits intomainfrom
remove/job-queue-over
Feb 24, 2026
Merged

Get rid of job queue json and run over and over#1385
deep1401 merged 4 commits intomainfrom
remove/job-queue-over

Conversation

@deep1401
Copy link
Copy Markdown
Member

@deep1401 deep1401 commented Feb 24, 2026

  • Also gets rid of old trigger+workflows functions in job service

Summary by CodeRabbit

  • Chores
    • Removed automatic background job queue management and periodic polling for processing jobs and workflows.
    • Disabled automatic workflow triggering on job completion.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Feb 24, 2026

📝 Walkthrough

Walkthrough

The changes remove background polling mechanisms for job and workflow processing by eliminating a periodic loop invoked every 3 seconds at API startup. Cross-organization queue file management and centralized workflow triggering on job completion are also removed. Quota tracking is retained but refactored as on-demand background task invocation rather than queue-triggered logic.

Changes

Cohort / File(s) Summary
Background polling removal
api/api.py
Removed periodic background task scheduling in startup hook and eliminated the run_over_and_over() coroutine that polled for new jobs and workflow steps every 3 seconds. Also removed the import of workflows module.
Job queue handler
api/transformerlab/routers/experiment/jobs.py
Removed the start_next_job() function that performed cross-organization queue polling and immediate job execution within the background loop context.
Queue and workflow management
api/transformerlab/services/job_service.py
Removed persistent queue file operations (_get_queue_file_path, _get_queued_jobs_from_file, _write_queued_jobs_to_file, _add_job_to_queue, _remove_job_from_queue), cross-org queue polling functions, and centralized workflow triggering on job completion. Refactored quota tracking for REMOTE jobs as background task invocation. Public API signatures remain unchanged.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 No more waiting loops that spin,
The rabbit hops with lighter grin!
Queue files vanish, polling ends,
Background tasks now make amends.
Cleaner code with simpler grace,
Jobs find a better resting place! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 72.73% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main changes: removing the job queue JSON persistence and the periodic 'run over and over' background loop that continuously polled for jobs/workflows.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch remove/job-queue-over

Comment @coderabbitai help to get the list of available commands and usage tips.

@sentry
Copy link
Copy Markdown

sentry bot commented Feb 24, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.

📢 Thoughts on this report? Let us know!

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
api/transformerlab/services/job_service.py (2)

654-657: ⚠️ Potential issue | 🟡 Minor

None can be appended to artifacts when format_artifact fails

format_artifact returns None on exceptions but its result is unconditionally appended here, unlike the consistent if artifact: guard used in get_artifacts_from_sdk (line 616).

🐛 Proposed fix
-    if item:
-        artifact = await format_artifact(file_path, storage)
-        artifacts.append(artifact)
+    if item:
+        artifact = await format_artifact(file_path, storage)
+        if artifact:
+            artifacts.append(artifact)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/services/job_service.py` around lines 654 - 657, The code
appends the result of format_artifact(file_path, storage) to artifacts even when
format_artifact returns None on failure; change the block inside the loop so you
call artifact = await format_artifact(file_path, storage) and only
artifacts.append(artifact) if artifact is truthy (e.g., if artifact:), mirroring
the pattern used in get_artifacts_from_sdk; update the branch that currently
checks item to ensure you perform the conditional append to avoid None entries
in artifacts.

422-439: ⚠️ Potential issue | 🔴 Critical

job is potentially unbound at line 439, silently breaking quota tracking

job is assigned inside the try block. If Job.get(job_id) raises (e.g., job not found), job is never created in the local scope. The ternary if job else {} at line 439 guards against a falsy value, not an unbound name — Python raises NameError here, which is silently swallowed by the inner except on line 453. The result is that quota tracking is silently skipped for any REMOTE job whose first status-update call coincides with a transient Job.get failure.

Also, line 422 carries a stale comment ("for queue management") left over from the removed queue logic.

🐛 Proposed fix
-    # Get old status before updating for queue management
+    job = None
     try:
         job = await Job.get(job_id)
         exp_id = await job.get_experiment_id()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/services/job_service.py` around lines 422 - 439, The
variable job is created only inside the try/except around Job.get(job_id) so a
failure there leaves job unbound and later causes a NameError during quota
tracking; update the logic so job is initialized before the try (e.g., job =
None), preserve error handling but ensure quota-tracking always checks for job
is None (use explicit "if job is None" checks) before calling
job.get_json_data(), and move or rework the terminal-state quota tracking into
the same scope that guarantees job exists (or early-return if job lookup
failed); also remove or update the stale comment "for queue management" to
reflect current behavior. Ensure references: Job.get, job, job.get_json_data,
job.update_status, job.set_error_message, and the terminal status set
("COMPLETE","STOPPED","FAILED","DELETED") are handled.
🧹 Nitpick comments (3)
api/transformerlab/services/job_service.py (3)

584-584: Dict[str, any] uses the Python builtin any, not typing.Any

All three formatting helpers annotate their return as Optional[Dict[str, any]]. Lowercase any is the built-in function, not a type; type checkers flag this. It should be Dict[str, Any] after adding Any to the typing import (noted above in the import comment).

♻️ Proposed fix (all three functions)
-async def format_artifact(file_path: str, storage) -> Optional[Dict[str, any]]:
+async def format_artifact(file_path: str, storage) -> Optional[Dict[str, Any]]:
-async def format_dataset(dir_path: str, storage) -> Optional[Dict[str, any]]:
+async def format_dataset(dir_path: str, storage) -> Optional[Dict[str, Any]]:
-async def format_model(dir_path: str, storage) -> Optional[Dict[str, any]]:
+async def format_model(dir_path: str, storage) -> Optional[Dict[str, Any]]:

Also applies to: 780-780, 798-798

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/services/job_service.py` at line 584, The type annotations
for the formatting helper functions (e.g., format_artifact) use the builtin any
instead of the typing Any; update their return annotations from
Optional[Dict[str, any]] to Optional[Dict[str, Any]] and add Any to the typing
imports (e.g., from typing import Any, Dict, Optional) so type checkers
recognize the correct type; apply the same change to the other two formatting
helpers referenced in the diff.

443-443: Redundant import asyncio inside function body

asyncio is already imported at the module level (line 1); the inner import on line 443 is unnecessary.

♻️ Proposed fix
-                import asyncio
-
                 asyncio.create_task(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/services/job_service.py` at line 443, Remove the redundant
local "import asyncio" statement inside the function and rely on the
module-level asyncio import already present; delete the inner import (the
standalone "import asyncio") so the function uses the top-level asyncio symbol
(e.g., any uses of asyncio.run, asyncio.create_task, etc.) and ensure there are
no shadowing/local redefinitions introduced by the removal.

79-116: Remove unused _find_org_id_for_job function

This function has no callers anywhere in the codebase. It was likely used by the queue-management helpers that were removed in this PR and should be deleted.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/services/job_service.py` around lines 79 - 116, Remove the
unused helper function _find_org_id_for_job from job_service.py: delete the
entire async def _find_org_id_for_job(...) block (including its docstring and
body) and any now-unreferenced imports or helper-only references that become
dead code as a result (e.g., local variables or imports used solely by that
function); ensure no callers remain and run tests/lint to catch leftover unused
symbols.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/transformerlab/services/job_service.py`:
- Line 4: Add Any to the typing import and replace all occurrences of Dict[str,
any] with Dict[str, Any]; then add missing type annotations for all public
functions listed (job_create, jobs_get_all, jobs_get_by_experiment, job_get,
job_count_running, job_delete_all, job_delete, job_stop, job_update_progress,
job_update_sweep_progress, jobs_get_sweep_children, job_get_sweep_parent)
including argument types and explicit return types, mark internal update
functions (job_update_status, job_update, job_update_status_sync,
job_update_sync, job_update_type_and_status_sync,
_track_quota_for_job_status_change, _record_quota_usage_internal) with -> None,
and annotate all helper functions that accept a storage parameter (e.g.,
format_artifact, get_artifacts_from_sdk, get_artifacts_from_directory) with the
appropriate storage type; ensure all type names reference existing
classes/aliases used in the module so imports resolve.

---

Outside diff comments:
In `@api/transformerlab/services/job_service.py`:
- Around line 654-657: The code appends the result of format_artifact(file_path,
storage) to artifacts even when format_artifact returns None on failure; change
the block inside the loop so you call artifact = await
format_artifact(file_path, storage) and only artifacts.append(artifact) if
artifact is truthy (e.g., if artifact:), mirroring the pattern used in
get_artifacts_from_sdk; update the branch that currently checks item to ensure
you perform the conditional append to avoid None entries in artifacts.
- Around line 422-439: The variable job is created only inside the try/except
around Job.get(job_id) so a failure there leaves job unbound and later causes a
NameError during quota tracking; update the logic so job is initialized before
the try (e.g., job = None), preserve error handling but ensure quota-tracking
always checks for job is None (use explicit "if job is None" checks) before
calling job.get_json_data(), and move or rework the terminal-state quota
tracking into the same scope that guarantees job exists (or early-return if job
lookup failed); also remove or update the stale comment "for queue management"
to reflect current behavior. Ensure references: Job.get, job, job.get_json_data,
job.update_status, job.set_error_message, and the terminal status set
("COMPLETE","STOPPED","FAILED","DELETED") are handled.

---

Nitpick comments:
In `@api/transformerlab/services/job_service.py`:
- Line 584: The type annotations for the formatting helper functions (e.g.,
format_artifact) use the builtin any instead of the typing Any; update their
return annotations from Optional[Dict[str, any]] to Optional[Dict[str, Any]] and
add Any to the typing imports (e.g., from typing import Any, Dict, Optional) so
type checkers recognize the correct type; apply the same change to the other two
formatting helpers referenced in the diff.
- Line 443: Remove the redundant local "import asyncio" statement inside the
function and rely on the module-level asyncio import already present; delete the
inner import (the standalone "import asyncio") so the function uses the
top-level asyncio symbol (e.g., any uses of asyncio.run, asyncio.create_task,
etc.) and ensure there are no shadowing/local redefinitions introduced by the
removal.
- Around line 79-116: Remove the unused helper function _find_org_id_for_job
from job_service.py: delete the entire async def _find_org_id_for_job(...) block
(including its docstring and body) and any now-unreferenced imports or
helper-only references that become dead code as a result (e.g., local variables
or imports used solely by that function); ensure no callers remain and run
tests/lint to catch leftover unused symbols.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fce1a03 and 59001fa.

📒 Files selected for processing (3)
  • api/api.py
  • api/transformerlab/routers/experiment/jobs.py
  • api/transformerlab/services/job_service.py
💤 Files with no reviewable changes (2)
  • api/api.py
  • api/transformerlab/routers/experiment/jobs.py

import json
import os
from typing import List, Dict, Optional, Tuple
from typing import List, Dict, Optional
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Coding guideline: add Any to the import and add missing type hints across public functions

Any is absent from the import yet needed to fix the Dict[str, any] annotations on lines 584, 780, and 798 (lowercase any resolves to the Python built-in, not a type alias). Beyond that, many public functions introduced or touched by this file have no argument or return-type annotations at all, violating the mandatory type-hint guideline for api/**/*.py:

  • job_create, jobs_get_all, jobs_get_by_experiment, job_get, job_count_running, job_delete_all, job_delete, job_stop, job_update_progress, job_update_sweep_progress, jobs_get_sweep_children, job_get_sweep_parent — all arguments and return type missing.
  • job_update_status, job_update, job_update_status_sync, job_update_sync, job_update_type_and_status_sync, _track_quota_for_job_status_change, _record_quota_usage_internal — return type -> None missing.
  • All storage-accepting helpers (format_artifact, get_artifacts_from_sdk, get_artifacts_from_directory, etc.) — storage parameter untyped.
🔧 Minimal import fix
-from typing import List, Dict, Optional
+from typing import Any, Dict, List, Optional

And replace every Dict[str, any] with Dict[str, Any].

As per coding guidelines, api/**/*.py requires mandatory type hints for all Python function arguments and return types.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from typing import List, Dict, Optional
from typing import Any, Dict, List, Optional
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/transformerlab/services/job_service.py` at line 4, Add Any to the typing
import and replace all occurrences of Dict[str, any] with Dict[str, Any]; then
add missing type annotations for all public functions listed (job_create,
jobs_get_all, jobs_get_by_experiment, job_get, job_count_running,
job_delete_all, job_delete, job_stop, job_update_progress,
job_update_sweep_progress, jobs_get_sweep_children, job_get_sweep_parent)
including argument types and explicit return types, mark internal update
functions (job_update_status, job_update, job_update_status_sync,
job_update_sync, job_update_type_and_status_sync,
_track_quota_for_job_status_change, _record_quota_usage_internal) with -> None,
and annotate all helper functions that accept a storage parameter (e.g.,
format_artifact, get_artifacts_from_sdk, get_artifacts_from_directory) with the
appropriate storage type; ensure all type names reference existing
classes/aliases used in the module so imports resolve.

@deep1401 deep1401 merged commit 99154d3 into main Feb 24, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants