Get rid of job queue json and run over and over #1385
📝 Walkthrough
The changes remove background polling for job and workflow processing by eliminating a periodic loop that ran every 3 seconds at API startup. Cross-organization queue file management and centralized workflow triggering on job completion are also removed. Quota tracking is retained but refactored as on-demand background task invocation rather than queue-triggered logic.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
api/transformerlab/services/job_service.py (2)
654-657: ⚠️ Potential issue | 🟡 Minor
`None` can be appended to `artifacts` when `format_artifact` fails

`format_artifact` returns `None` on exceptions, but its result is unconditionally appended here, unlike the consistent `if artifact:` guard used in `get_artifacts_from_sdk` (line 616).

🐛 Proposed fix
```diff
-if item:
-    artifact = await format_artifact(file_path, storage)
-    artifacts.append(artifact)
+if item:
+    artifact = await format_artifact(file_path, storage)
+    if artifact:
+        artifacts.append(artifact)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/services/job_service.py` around lines 654 - 657, The code appends the result of format_artifact(file_path, storage) to artifacts even when format_artifact returns None on failure; change the block inside the loop so you call artifact = await format_artifact(file_path, storage) and only artifacts.append(artifact) if artifact is truthy (e.g., if artifact:), mirroring the pattern used in get_artifacts_from_sdk; update the branch that currently checks item to ensure you perform the conditional append to avoid None entries in artifacts.
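The guard pattern the fix recommends can be sketched with a synchronous stand-in for `format_artifact`; all names and paths here are hypothetical, chosen only to show the truthiness check keeping `None` out of the list:

```python
from typing import Any, Dict, List, Optional

def format_artifact_stub(path: str) -> Optional[Dict[str, Any]]:
    # Like the real helper, returns None when formatting fails
    if not path.endswith(".bin"):
        return None
    return {"path": path}

artifacts: List[Dict[str, Any]] = []
for path in ["a.bin", "notes.txt", "b.bin"]:
    artifact = format_artifact_stub(path)
    if artifact:  # the guard keeps None out of the list
        artifacts.append(artifact)

print(artifacts)  # [{'path': 'a.bin'}, {'path': 'b.bin'}]
```

Without the `if artifact:` guard, the failing entry would land in the list as `None` and surface later as an opaque `AttributeError` or bad API payload.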
422-439: ⚠️ Potential issue | 🔴 Critical
`job` is potentially unbound at line 439, silently breaking quota tracking

`job` is assigned inside the `try` block. If `Job.get(job_id)` raises (e.g., job not found), `job` is never created in the local scope. The ternary `if job else {}` at line 439 guards against a falsy value, not an unbound name: Python raises `NameError` here, which is silently swallowed by the inner `except` on line 453. The result is that quota tracking is silently skipped for any REMOTE job whose first status-update call coincides with a transient `Job.get` failure.

Also, line 422 carries a stale comment ("for queue management") left over from the removed queue logic.
🐛 Proposed fix
```diff
-    # Get old status before updating for queue management
+    job = None
     try:
         job = await Job.get(job_id)
         exp_id = await job.get_experiment_id()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/services/job_service.py` around lines 422 - 439, The variable job is created only inside the try/except around Job.get(job_id) so a failure there leaves job unbound and later causes a NameError during quota tracking; update the logic so job is initialized before the try (e.g., job = None), preserve error handling but ensure quota-tracking always checks for job is None (use explicit "if job is None" checks) before calling job.get_json_data(), and move or rework the terminal-state quota tracking into the same scope that guarantees job exists (or early-return if job lookup failed); also remove or update the stale comment "for queue management" to reflect current behavior. Ensure references: Job.get, job, job.get_json_data, job.update_status, job.set_error_message, and the terminal status set ("COMPLETE","STOPPED","FAILED","DELETED") are handled.
🧹 Nitpick comments (3)
api/transformerlab/services/job_service.py (3)
584-584: `Dict[str, any]` uses the Python builtin `any`, not `typing.Any`

All three formatting helpers annotate their return as `Optional[Dict[str, any]]`. Lowercase `any` is the built-in function, not a type; type checkers flag this. It should be `Dict[str, Any]` after adding `Any` to the typing import (noted above in the import comment).

♻️ Proposed fix (all three functions)
```diff
-async def format_artifact(file_path: str, storage) -> Optional[Dict[str, any]]:
+async def format_artifact(file_path: str, storage) -> Optional[Dict[str, Any]]:
```
```diff
-async def format_dataset(dir_path: str, storage) -> Optional[Dict[str, any]]:
+async def format_dataset(dir_path: str, storage) -> Optional[Dict[str, Any]]:
```
```diff
-async def format_model(dir_path: str, storage) -> Optional[Dict[str, any]]:
+async def format_model(dir_path: str, storage) -> Optional[Dict[str, Any]]:
```
Also applies to: 780-780, 798-798
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/services/job_service.py` at line 584, The type annotations for the formatting helper functions (e.g., format_artifact) use the builtin any instead of the typing Any; update their return annotations from Optional[Dict[str, any]] to Optional[Dict[str, Any]] and add Any to the typing imports (e.g., from typing import Any, Dict, Optional) so type checkers recognize the correct type; apply the same change to the other two formatting helpers referenced in the diff.
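The distinction the comment draws can be seen directly: lowercase `any` is the builtin that tests an iterable for truthiness, while `typing.Any` is the type. A hypothetical helper annotated the way the review recommends:

```python
from typing import Any, Dict, Optional

# Lowercase `any` is the builtin truthiness test, not a type:
print(any([0, 1]))  # True

def format_stub(name: str) -> Optional[Dict[str, Any]]:
    # Hypothetical stand-in for the formatting helpers
    return {"name": name} if name else None

print(format_stub("model.bin"))  # {'name': 'model.bin'}
print(format_stub(""))           # None
```

At runtime `Dict[str, any]` happens not to raise, which is why the bug survives; only a type checker (or an attempt to introspect the annotation) exposes it.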
443-443: Redundant `import asyncio` inside function body

`asyncio` is already imported at the module level (line 1); the inner import on line 443 is unnecessary.

♻️ Proposed fix
```diff
-    import asyncio
     asyncio.create_task(
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/services/job_service.py` at line 443, Remove the redundant local "import asyncio" statement inside the function and rely on the module-level asyncio import already present; delete the inner import (the standalone "import asyncio") so the function uses the top-level asyncio symbol (e.g., any uses of asyncio.run, asyncio.create_task, etc.) and ensure there are no shadowing/local redefinitions introduced by the removal.
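The on-demand pattern the walkthrough describes, quota tracking fired as a background task from a status update rather than from a polling queue, can be sketched as follows using only the module-level `asyncio` import. All names here are hypothetical stand-ins, not the module's actual API:

```python
import asyncio

recorded: list = []

async def _record_quota_usage(job_id: int) -> None:
    # Hypothetical stand-in for the quota-tracking coroutine
    recorded.append(job_id)

async def update_status(job_id: int, status: str) -> None:
    # Fire-and-forget: the status update returns without blocking on quota work
    if status in {"COMPLETE", "STOPPED", "FAILED", "DELETED"}:
        asyncio.create_task(_record_quota_usage(job_id))
    await asyncio.sleep(0)  # yield once so the scheduled task can run

asyncio.run(update_status(7, "COMPLETE"))
print(recorded)  # [7]
```

One caveat with fire-and-forget `create_task`: the event loop keeps only a weak reference to tasks, so production code should hold a reference (or add a done-callback) to guarantee the quota task is not garbage-collected mid-flight.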
79-116: Remove unused `_find_org_id_for_job` function

This function has no callers anywhere in the codebase. It was likely used by the queue-management helpers that were removed in this PR and should be deleted.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api/transformerlab/services/job_service.py` around lines 79 - 116, Remove the unused helper function _find_org_id_for_job from job_service.py: delete the entire async def _find_org_id_for_job(...) block (including its docstring and body) and any now-unreferenced imports or helper-only references that become dead code as a result (e.g., local variables or imports used solely by that function); ensure no callers remain and run tests/lint to catch leftover unused symbols.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- api/api.py
- api/transformerlab/routers/experiment/jobs.py
- api/transformerlab/services/job_service.py
💤 Files with no reviewable changes (2)
- api/api.py
- api/transformerlab/routers/experiment/jobs.py
```diff
 import json
 import os
-from typing import List, Dict, Optional, Tuple
+from typing import List, Dict, Optional
```
🛠️ Refactor suggestion | 🟠 Major
Coding guideline: add Any to the import and add missing type hints across public functions
`Any` is absent from the import yet needed to fix the `Dict[str, any]` annotations on lines 584, 780, and 798 (lowercase `any` resolves to the Python built-in, not a type alias). Beyond that, many public functions introduced or touched by this file have no argument or return-type annotations at all, violating the mandatory type-hint guideline for `api/**/*.py`:
- `job_create`, `jobs_get_all`, `jobs_get_by_experiment`, `job_get`, `job_count_running`, `job_delete_all`, `job_delete`, `job_stop`, `job_update_progress`, `job_update_sweep_progress`, `jobs_get_sweep_children`, `job_get_sweep_parent` — all arguments and return type missing.
- `job_update_status`, `job_update`, `job_update_status_sync`, `job_update_sync`, `job_update_type_and_status_sync`, `_track_quota_for_job_status_change`, `_record_quota_usage_internal` — return type `-> None` missing.
- All `storage`-accepting helpers (`format_artifact`, `get_artifacts_from_sdk`, `get_artifacts_from_directory`, etc.) — `storage` parameter untyped.
🔧 Minimal import fix
```diff
-from typing import List, Dict, Optional
+from typing import Any, Dict, List, Optional
```
And replace every `Dict[str, any]` with `Dict[str, Any]`.
As per coding guidelines, api/**/*.py requires mandatory type hints for all Python function arguments and return types.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-from typing import List, Dict, Optional
+from typing import Any, Dict, List, Optional
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@api/transformerlab/services/job_service.py` at line 4, Add Any to the typing
import and replace all occurrences of Dict[str, any] with Dict[str, Any]; then
add missing type annotations for all public functions listed (job_create,
jobs_get_all, jobs_get_by_experiment, job_get, job_count_running,
job_delete_all, job_delete, job_stop, job_update_progress,
job_update_sweep_progress, jobs_get_sweep_children, job_get_sweep_parent)
including argument types and explicit return types, mark internal update
functions (job_update_status, job_update, job_update_status_sync,
job_update_sync, job_update_type_and_status_sync,
_track_quota_for_job_status_change, _record_quota_usage_internal) with -> None,
and annotate all helper functions that accept a storage parameter (e.g.,
format_artifact, get_artifacts_from_sdk, get_artifacts_from_directory) with the
appropriate storage type; ensure all type names reference existing
classes/aliases used in the module so imports resolve.
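What the type-hint guideline asks for can be sketched with fully annotated signatures. The function names come from the review; their parameter lists and bodies below are hypothetical placeholders, not the module's real implementations:

```python
from typing import Any, Dict, List, Optional

def job_get(job_id: int) -> Optional[Dict[str, Any]]:
    # Hypothetical body: return the job record, or None if absent
    return {"id": job_id} if job_id > 0 else None

def jobs_get_all(job_type: str = "", status: str = "") -> List[Dict[str, Any]]:
    # Hypothetical body: filtered job listing
    return []

def job_update_status(job_id: int, status: str) -> None:
    # Internal update helper: explicit -> None return annotation
    return None

print(job_get(1))  # {'id': 1}
```

The point is only the shape: every argument typed, every return type explicit, with `-> None` spelled out on update helpers rather than left implicit.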