Skip to content

Commit 2b2cb13

Browse files
committed
..
1 parent 5aa1587 commit 2b2cb13

File tree

1 file changed

+22
-11
lines changed

1 file changed

+22
-11
lines changed

src/databricks/labs/ucx/install.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import yaml
1717
from databricks.sdk import WorkspaceClient
18-
from databricks.sdk.errors import NotFound, OperationFailed
18+
from databricks.sdk.errors import InvalidParameterValue, NotFound, OperationFailed
1919
from databricks.sdk.mixins.compute import SemVer
2020
from databricks.sdk.service import compute, jobs
2121
from databricks.sdk.service.sql import EndpointInfoWarehouseType, SpotInstancePolicy
@@ -457,24 +457,35 @@ def _create_jobs(self):
457457
settings = self._job_settings(step_name, remote_wheel)
458458
if self._override_clusters:
459459
settings = self._apply_cluster_overrides(settings, self._override_clusters, wheel_runner)
460-
if step_name in self._state.jobs:
461-
job_id = self._state.jobs[step_name]
462-
logger.info(f"Updating configuration for step={step_name} job_id={job_id}")
463-
self._ws.jobs.reset(job_id, jobs.JobSettings(**settings))
464-
else:
465-
logger.info(f"Creating new job configuration for step={step_name}")
466-
job_id = self._ws.jobs.create(**settings).job_id
467-
self._state.jobs[step_name] = job_id
460+
self._deploy_workflow(step_name, settings)
468461

469462
for step_name, job_id in self._state.jobs.items():
470463
if step_name not in desired_steps:
471-
logger.info(f"Removing job_id={job_id}, as it is no longer needed")
472-
self._ws.jobs.delete(job_id)
464+
try:
465+
logger.info(f"Removing job_id={job_id}, as it is no longer needed")
466+
self._ws.jobs.delete(job_id)
467+
except InvalidParameterValue:
468+
logger.warning(f"step={step_name} does not exist anymore for some reason")
469+
continue
473470

474471
self._state.save()
475472
self._create_readme()
476473
self._create_debug(remote_wheel)
477474

475+
def _deploy_workflow(self, step_name: str, settings):
476+
if step_name in self._state.jobs:
477+
try:
478+
job_id = self._state.jobs[step_name]
479+
logger.info(f"Updating configuration for step={step_name} job_id={job_id}")
480+
return self._ws.jobs.reset(job_id, jobs.JobSettings(**settings))
481+
except InvalidParameterValue:
482+
del self._state.jobs[step_name]
483+
logger.warning(f"step={step_name} does not exist anymore for some reason")
484+
return self._deploy_workflow(step_name, settings)
485+
logger.info(f"Creating new job configuration for step={step_name}")
486+
job_id = self._ws.jobs.create(**settings).job_id
487+
self._state.jobs[step_name] = job_id
488+
478489
def _deployed_steps_pre_v06(self):
479490
deployed_steps = {}
480491
logger.debug(f"Fetching all jobs to determine already deployed steps for app={self._app}")

0 commit comments

Comments
 (0)