1- import contextlib
2- import logging
3- import os
41from collections .abc import Callable , Iterable
5- from contextlib import contextmanager
62from dataclasses import dataclass
7- from datetime import timedelta
8- from logging .handlers import TimedRotatingFileHandler
9- from pathlib import Path
103
114from databricks .labs .blueprint .installation import Installation
12- from databricks .labs .blueprint .logger import install_logger
13- from databricks .labs .lsql .backends import SqlBackend , RuntimeBackend
5+ from databricks .labs .lsql .backends import SqlBackend
146from databricks .sdk import WorkspaceClient
157from databricks .sdk .core import Config
16- from databricks .sdk .retries import retried
178
18- from databricks .labs .ucx .__about__ import __version__
199from databricks .labs .ucx .config import WorkspaceConfig
2010
2111_TASKS : dict [str , "Task" ] = {}
@@ -65,109 +55,6 @@ def remove_extra_indentation(doc: str) -> str:
6555 return "\n " .join (stripped )
6656
6757
class TaskLogger(contextlib.AbstractContextManager):
    """Route logging for a single workflow task into a per-run log file.

    Workspace files only become readable once their handlers are closed, so
    the debug log is written through a timed rotating handler (rotated once
    per minute) instead of staying open for the whole task.

    See https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler
    See https://docs.python.org/3/howto/logging-cookbook.html
    """

    def __init__(
        self,
        install_dir: Path,
        workflow: str,
        workflow_id: str,
        task_name: str,
        workflow_run_id: str,
        log_level="INFO",
        attempt: str = "0",
    ):
        """Prepare the per-run log location; nothing is written until __enter__."""
        self._log_level = log_level
        self._workflow = workflow
        self._workflow_id = workflow_id
        self._workflow_run_id = workflow_run_id
        # "databricks" is the root SDK logger; the app logger is a child of it.
        self._databricks_logger = logging.getLogger("databricks")
        self._app_logger = logging.getLogger("databricks.labs.ucx")
        self._log_path = self.log_path(install_dir, workflow, workflow_run_id, attempt)
        self.log_file = self._log_path / f"{task_name}.log"
        self._app_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {self.log_file}")

    @classmethod
    def log_path(cls, install_dir: Path, workflow: str, workflow_run_id: str | int, attempt: str | int) -> Path:
        """Return the directory holding all task logs for one workflow run attempt."""
        return install_dir / "logs" / workflow / f"run-{workflow_run_id}-{attempt}"

    def __repr__(self):
        return self.log_file.as_posix()

    def __enter__(self):
        self._log_path.mkdir(parents=True, exist_ok=True)
        self._init_debug_logfile()
        self._init_run_readme()
        # Capture everything at DEBUG; the console handler installed below
        # still filters at the configured level.
        self._databricks_logger.setLevel(logging.DEBUG)
        self._app_logger.setLevel(logging.DEBUG)
        console_handler = install_logger(self._log_level)
        # install_logger attaches a console handler we don't want on the SDK
        # logger — swap it for the rotating file handler.
        self._databricks_logger.removeHandler(console_handler)
        self._databricks_logger.addHandler(self._file_handler)
        return self

    def __exit__(self, _t, error, _tb):
        if error:
            # Point the operator at a CLI command that downloads this log file.
            log_file_for_cli = str(self.log_file).removeprefix("/Workspace")
            cli_command = f"databricks workspace export /{log_file_for_cli}"
            self._app_logger.error(f"Execute `{cli_command}` locally to troubleshoot with more details. {error}")
            self._databricks_logger.debug("Task crash details", exc_info=error)
        # Flush and close so the file becomes visible in the workspace.
        self._file_handler.flush()
        self._file_handler.close()

    def _init_debug_logfile(self):
        """Create the DEBUG-level rotating file handler for this task."""
        log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s"
        log_formatter = logging.Formatter(fmt=log_format, datefmt="%H:%M:%S")
        self._file_handler = TimedRotatingFileHandler(self.log_file.as_posix(), when="M", interval=1)
        self._file_handler.setFormatter(log_formatter)
        self._file_handler.setLevel(logging.DEBUG)

    def _init_run_readme(self):
        """Write a README.md into the run folder exactly once per run."""
        log_readme = self._log_path.joinpath("README.md")
        if log_readme.exists():
            return
        # this may race when run from multiple tasks, therefore it must be multiprocess safe
        with self._exclusive_open(str(log_readme), mode="w") as f:
            contents = (
                f"# Logs for the UCX {self._workflow} workflow\n"
                "This folder contains UCX log files.\n\n"
                f"See the [{self._workflow} job](/#job/{self._workflow_id}) and "
                f"[run #{self._workflow_run_id}](/#job/{self._workflow_id}/run/{self._workflow_run_id})\n"
            )
            f.write(contents)

    @classmethod
    @contextmanager
    def _exclusive_open(cls, filename: str, **kwargs):
        """Open a file with exclusive access across multiple processes.
        Requires write access to the directory containing the file.

        Arguments are the same as the built-in open.

        Returns a context manager that closes the file and releases the lock.
        """
        lockfile_name = filename + ".lock"
        lock_fd = cls._create_lock(lockfile_name)
        try:
            with open(filename, encoding="utf8", **kwargs) as handle:
                yield handle
        finally:
            try:
                os.close(lock_fd)
            finally:
                os.unlink(lockfile_name)

    @staticmethod
    @retried(on=[FileExistsError], timeout=timedelta(seconds=5))
    def _create_lock(lockfile_name):
        # O_EXCL makes creation fail with FileExistsError while another
        # process holds the lock; @retried keeps trying until it succeeds
        # or the 5-second timeout expires.
        return os.open(lockfile_name, os.O_CREAT | os.O_EXCL)
170-
17158def parse_args (* argv ) -> dict [str , str ]:
17259 args = dict (a [2 :].split ("=" ) for a in argv if a [0 :2 ] == "--" )
17360 if "config" not in args :
@@ -176,53 +63,6 @@ def parse_args(*argv) -> dict[str, str]:
17663 return args
17764
17865
def run_task(
    args: dict[str, str],
    install_dir: Path,
    cfg: WorkspaceConfig,
    workspace_client: WorkspaceClient,
    sql_backend: RuntimeBackend,
    installation: Installation,
):
    """Look up the requested task in the registry and execute it under a TaskLogger.

    Raises KeyError when the ``task`` argument names an unknown task.
    """
    # TODO: remove this function
    task_name = args.get("task", "not specified")
    if task_name not in _TASKS:
        msg = f'task "{task_name}" not found. Valid tasks are: {", ".join(_TASKS.keys())}'
        raise KeyError(msg)

    current_task = _TASKS[task_name]
    print(f"UCX v{__version__}")
    print(current_task.doc)

    # `{{parent_run_id}}` is the run of entire workflow, whereas `{{run_id}}` is the run of a task
    workflow_run_id = args.get("parent_run_id", "unknown_run_id")
    job_id = args.get("job_id", "unknown_job_id")

    task_logger = TaskLogger(
        install_dir,
        workflow=current_task.workflow,
        workflow_id=job_id,
        task_name=task_name,
        workflow_run_id=workflow_run_id,
        log_level=cfg.log_level,
    )
    with task_logger:
        ucx_logger = logging.getLogger("databricks.labs.ucx")
        ucx_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {task_logger}")
        current_task.fn(cfg, workspace_client, sql_backend, installation)
212-
def trigger(*argv):
    """Entry point for a workflow task: parse CLI args, build runtime clients, and run the task.

    Expects a ``--config=<path>`` argument pointing at the installed workspace
    config; parse_args raises when it is missing.
    """
    args = parse_args(*argv)
    config_path = Path(args["config"])

    cfg = Installation.load_local(WorkspaceConfig, config_path)
    sql_backend = RuntimeBackend(debug_truncate_bytes=cfg.connect.debug_truncate_bytes)
    workspace_client = WorkspaceClient(config=cfg.connect, product='ucx', product_version=__version__)
    # Installation expects a workspace-relative folder, not the FUSE mount path.
    install_folder = config_path.parent.as_posix().removeprefix("/Workspace")
    installation = Installation(workspace_client, "ucx", install_folder=install_folder)

    run_task(args, config_path.parent, cfg, workspace_client, sql_backend, installation)
225-
22666class Workflow :
22767 def __init__ (self , name : str ):
22868 self ._name = name
@@ -245,7 +85,7 @@ def job_task(
24585 fn = None ,
24686 * ,
24787 depends_on = None ,
248- job_cluster = "main" ,
88+ job_cluster = Task . job_cluster ,
24989 notebook : str | None = None ,
25090 dashboard : str | None = None ,
25191 cloud : str | None = None ,
0 commit comments