Skip to content

Commit 2d2403d

Browse files
authored
Merge branch 'main' into ignore-whitelisted-imports
2 parents 4092871 + f292991 commit 2d2403d

File tree

8 files changed

+154
-207
lines changed

8 files changed

+154
-207
lines changed

.github/workflows/no-cheat.sh

Lines changed: 0 additions & 8 deletions
This file was deleted.

.github/workflows/push.yml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,10 @@ jobs:
7272
fetch-depth: 0
7373

7474
- name: Verify no lint disabled in the new code
75-
run: sh .github/workflows/no-cheat.sh
75+
run: |
76+
NEW_CODE=$(git diff origin/main..$(git branch --show-current) | grep -e '^+')
77+
CHEAT=$(echo "${NEW_CODE}" | grep '# pylint: disable' | grep -v "CHEAT" | wc -c)
78+
if [ "${CHEAT}" -ne 0 ]; then
79+
echo "Do not cheat the linter: ${CHEAT}"
80+
exit 1
81+
fi

src/databricks/labs/ucx/framework/tasks.py

Lines changed: 2 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,11 @@
1-
import contextlib
2-
import logging
3-
import os
41
from collections.abc import Callable, Iterable
5-
from contextlib import contextmanager
62
from dataclasses import dataclass
7-
from datetime import timedelta
8-
from logging.handlers import TimedRotatingFileHandler
9-
from pathlib import Path
103

114
from databricks.labs.blueprint.installation import Installation
12-
from databricks.labs.blueprint.logger import install_logger
13-
from databricks.labs.lsql.backends import SqlBackend, RuntimeBackend
5+
from databricks.labs.lsql.backends import SqlBackend
146
from databricks.sdk import WorkspaceClient
157
from databricks.sdk.core import Config
16-
from databricks.sdk.retries import retried
178

18-
from databricks.labs.ucx.__about__ import __version__
199
from databricks.labs.ucx.config import WorkspaceConfig
2010

2111
_TASKS: dict[str, "Task"] = {}
@@ -65,109 +55,6 @@ def remove_extra_indentation(doc: str) -> str:
6555
return "\n".join(stripped)
6656

6757

68-
class TaskLogger(contextlib.AbstractContextManager):
69-
# files are available in the workspace only once their handlers are closed,
70-
# so we rotate log files every 10 minutes.
71-
#
72-
# See https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler
73-
# See https://docs.python.org/3/howto/logging-cookbook.html
74-
75-
def __init__(
76-
self,
77-
install_dir: Path,
78-
workflow: str,
79-
workflow_id: str,
80-
task_name: str,
81-
workflow_run_id: str,
82-
log_level="INFO",
83-
attempt: str = "0",
84-
):
85-
self._log_level = log_level
86-
self._workflow = workflow
87-
self._workflow_id = workflow_id
88-
self._workflow_run_id = workflow_run_id
89-
self._databricks_logger = logging.getLogger("databricks")
90-
self._app_logger = logging.getLogger("databricks.labs.ucx")
91-
self._log_path = self.log_path(install_dir, workflow, workflow_run_id, attempt)
92-
self.log_file = self._log_path / f"{task_name}.log"
93-
self._app_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {self.log_file}")
94-
95-
@classmethod
96-
def log_path(cls, install_dir: Path, workflow: str, workflow_run_id: str | int, attempt: str | int) -> Path:
97-
return install_dir / "logs" / workflow / f"run-{workflow_run_id}-{attempt}"
98-
99-
def __repr__(self):
100-
return self.log_file.as_posix()
101-
102-
def __enter__(self):
103-
self._log_path.mkdir(parents=True, exist_ok=True)
104-
self._init_debug_logfile()
105-
self._init_run_readme()
106-
self._databricks_logger.setLevel(logging.DEBUG)
107-
self._app_logger.setLevel(logging.DEBUG)
108-
console_handler = install_logger(self._log_level)
109-
self._databricks_logger.removeHandler(console_handler)
110-
self._databricks_logger.addHandler(self._file_handler)
111-
return self
112-
113-
def __exit__(self, _t, error, _tb):
114-
if error:
115-
log_file_for_cli = str(self.log_file).removeprefix("/Workspace")
116-
cli_command = f"databricks workspace export /{log_file_for_cli}"
117-
self._app_logger.error(f"Execute `{cli_command}` locally to troubleshoot with more details. {error}")
118-
self._databricks_logger.debug("Task crash details", exc_info=error)
119-
self._file_handler.flush()
120-
self._file_handler.close()
121-
122-
def _init_debug_logfile(self):
123-
log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s"
124-
log_formatter = logging.Formatter(fmt=log_format, datefmt="%H:%M:%S")
125-
self._file_handler = TimedRotatingFileHandler(self.log_file.as_posix(), when="M", interval=1)
126-
self._file_handler.setFormatter(log_formatter)
127-
self._file_handler.setLevel(logging.DEBUG)
128-
129-
def _init_run_readme(self):
130-
log_readme = self._log_path.joinpath("README.md")
131-
if log_readme.exists():
132-
return
133-
# this may race when run from multiple tasks, therefore it must be multiprocess safe
134-
with self._exclusive_open(str(log_readme), mode="w") as f:
135-
f.write(f"# Logs for the UCX {self._workflow} workflow\n")
136-
f.write("This folder contains UCX log files.\n\n")
137-
f.write(f"See the [{self._workflow} job](/#job/{self._workflow_id}) and ")
138-
f.write(f"[run #{self._workflow_run_id}](/#job/{self._workflow_id}/run/{self._workflow_run_id})\n")
139-
140-
@classmethod
141-
@contextmanager
142-
def _exclusive_open(cls, filename: str, **kwargs):
143-
"""Open a file with exclusive access across multiple processes.
144-
Requires write access to the directory containing the file.
145-
146-
Arguments are the same as the built-in open.
147-
148-
Returns a context manager that closes the file and releases the lock.
149-
"""
150-
lockfile_name = filename + ".lock"
151-
lockfile = cls._create_lock(lockfile_name)
152-
153-
try:
154-
with open(filename, encoding="utf8", **kwargs) as f:
155-
yield f
156-
finally:
157-
try:
158-
os.close(lockfile)
159-
finally:
160-
os.unlink(lockfile_name)
161-
162-
@staticmethod
163-
@retried(on=[FileExistsError], timeout=timedelta(seconds=5))
164-
def _create_lock(lockfile_name):
165-
while True: # wait until the lock file can be opened
166-
f = os.open(lockfile_name, os.O_CREAT | os.O_EXCL)
167-
break
168-
return f
169-
170-
17158
def parse_args(*argv) -> dict[str, str]:
17259
args = dict(a[2:].split("=") for a in argv if a[0:2] == "--")
17360
if "config" not in args:
@@ -176,53 +63,6 @@ def parse_args(*argv) -> dict[str, str]:
17663
return args
17764

17865

179-
def run_task(
180-
args: dict[str, str],
181-
install_dir: Path,
182-
cfg: WorkspaceConfig,
183-
workspace_client: WorkspaceClient,
184-
sql_backend: RuntimeBackend,
185-
installation: Installation,
186-
):
187-
# TODO: remove this function
188-
task_name = args.get("task", "not specified")
189-
if task_name not in _TASKS:
190-
msg = f'task "{task_name}" not found. Valid tasks are: {", ".join(_TASKS.keys())}'
191-
raise KeyError(msg)
192-
print(f"UCX v{__version__}")
193-
current_task = _TASKS[task_name]
194-
print(current_task.doc)
195-
196-
# `{{parent_run_id}}` is the run of entire workflow, whereas `{{run_id}}` is the run of a task
197-
workflow_run_id = args.get("parent_run_id", "unknown_run_id")
198-
job_id = args.get("job_id", "unknown_job_id")
199-
200-
with TaskLogger(
201-
install_dir,
202-
workflow=current_task.workflow,
203-
workflow_id=job_id,
204-
task_name=task_name,
205-
workflow_run_id=workflow_run_id,
206-
log_level=cfg.log_level,
207-
) as task_logger:
208-
ucx_logger = logging.getLogger("databricks.labs.ucx")
209-
ucx_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {task_logger}")
210-
current_task.fn(cfg, workspace_client, sql_backend, installation)
211-
212-
213-
def trigger(*argv):
214-
args = parse_args(*argv)
215-
config_path = Path(args["config"])
216-
217-
cfg = Installation.load_local(WorkspaceConfig, config_path)
218-
sql_backend = RuntimeBackend(debug_truncate_bytes=cfg.connect.debug_truncate_bytes)
219-
workspace_client = WorkspaceClient(config=cfg.connect, product='ucx', product_version=__version__)
220-
install_folder = config_path.parent.as_posix().removeprefix("/Workspace")
221-
installation = Installation(workspace_client, "ucx", install_folder=install_folder)
222-
223-
run_task(args, config_path.parent, cfg, workspace_client, sql_backend, installation)
224-
225-
22666
class Workflow:
22767
def __init__(self, name: str):
22868
self._name = name
@@ -245,7 +85,7 @@ def job_task(
24585
fn=None,
24686
*,
24787
depends_on=None,
248-
job_cluster="main",
88+
job_cluster=Task.job_cluster,
24989
notebook: str | None = None,
25090
dashboard: str | None = None,
25191
cloud: str | None = None,

src/databricks/labs/ucx/installer/logs.py

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
1+
import contextlib
12
import datetime as dt
23
import logging
4+
import os
35
import re
46
from collections.abc import Iterator
7+
from contextlib import contextmanager
58
from dataclasses import dataclass
9+
from datetime import timedelta
10+
from logging.handlers import TimedRotatingFileHandler
611
from pathlib import Path
712
from typing import TextIO
813

14+
from databricks.labs.blueprint.logger import install_logger
915
from databricks.labs.lsql.backends import SqlBackend
1016

11-
from databricks.labs.ucx.framework.tasks import TaskLogger
1217
from databricks.sdk.errors import InternalError
18+
from databricks.sdk.retries import retried
19+
20+
from databricks.labs.ucx.__about__ import __version__
1321

1422
logger = logging.getLogger(__name__)
1523

@@ -186,3 +194,106 @@ def snapshot(self) -> list[LogRecord]:
186194
if len(error_messages) > 0:
187195
raise InternalError("\n".join(error_messages))
188196
return log_records
197+
198+
199+
class TaskLogger(contextlib.AbstractContextManager):
200+
# files are available in the workspace only once their handlers are closed,
201+
# so we rotate log files every minute to make them available for download.
202+
#
203+
# See https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler
204+
# See https://docs.python.org/3/howto/logging-cookbook.html
205+
206+
def __init__(
207+
self,
208+
install_dir: Path,
209+
workflow: str,
210+
workflow_id: str,
211+
task_name: str,
212+
workflow_run_id: str,
213+
log_level="INFO",
214+
attempt: str = "0",
215+
):
216+
self._log_level = log_level
217+
self._workflow = workflow
218+
self._workflow_id = workflow_id
219+
self._workflow_run_id = workflow_run_id
220+
self._databricks_logger = logging.getLogger("databricks")
221+
self._app_logger = logging.getLogger("databricks.labs.ucx")
222+
self._log_path = self.log_path(install_dir, workflow, workflow_run_id, attempt)
223+
self.log_file = self._log_path / f"{task_name}.log"
224+
self._app_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {self.log_file}")
225+
226+
@classmethod
227+
def log_path(cls, install_dir: Path, workflow: str, workflow_run_id: str | int, attempt: str | int) -> Path:
228+
return install_dir / "logs" / workflow / f"run-{workflow_run_id}-{attempt}"
229+
230+
def __repr__(self):
231+
return self.log_file.as_posix()
232+
233+
def __enter__(self):
234+
self._log_path.mkdir(parents=True, exist_ok=True)
235+
self._init_debug_logfile()
236+
self._init_run_readme()
237+
self._databricks_logger.setLevel(logging.DEBUG)
238+
self._app_logger.setLevel(logging.DEBUG)
239+
console_handler = install_logger(self._log_level)
240+
self._databricks_logger.removeHandler(console_handler)
241+
self._databricks_logger.addHandler(self._file_handler)
242+
return self
243+
244+
def __exit__(self, _t, error, _tb):
245+
if error:
246+
log_file_for_cli = str(self.log_file).removeprefix("/Workspace")
247+
cli_command = f"databricks workspace export /{log_file_for_cli}"
248+
self._app_logger.error(f"Execute `{cli_command}` locally to troubleshoot with more details. {error}")
249+
self._databricks_logger.debug("Task crash details", exc_info=error)
250+
self._file_handler.flush()
251+
self._file_handler.close()
252+
253+
def _init_debug_logfile(self):
254+
log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s"
255+
log_formatter = logging.Formatter(fmt=log_format, datefmt="%H:%M:%S")
256+
self._file_handler = TimedRotatingFileHandler(self.log_file.as_posix(), when="M", interval=1)
257+
self._file_handler.setFormatter(log_formatter)
258+
self._file_handler.setLevel(logging.DEBUG)
259+
260+
def _init_run_readme(self):
261+
log_readme = self._log_path.joinpath("README.md")
262+
if log_readme.exists():
263+
return
264+
# this may race when run from multiple tasks, therefore it must be multiprocess safe
265+
with self._exclusive_open(str(log_readme), mode="w") as f:
266+
f.write(f"# Logs for the UCX {self._workflow} workflow\n")
267+
f.write("This folder contains UCX log files.\n\n")
268+
f.write(f"See the [{self._workflow} job](/#job/{self._workflow_id}) and ")
269+
f.write(f"[run #{self._workflow_run_id}](/#job/{self._workflow_id}/run/{self._workflow_run_id})\n")
270+
271+
@classmethod
272+
@contextmanager
273+
def _exclusive_open(cls, filename: str, **kwargs):
274+
"""Open a file with exclusive access across multiple processes.
275+
Requires write access to the directory containing the file.
276+
277+
Arguments are the same as the built-in open.
278+
279+
Returns a context manager that closes the file and releases the lock.
280+
"""
281+
lockfile_name = filename + ".lock"
282+
lockfile = cls._create_lock(lockfile_name)
283+
284+
try:
285+
with open(filename, encoding="utf8", **kwargs) as f:
286+
yield f
287+
finally:
288+
try:
289+
os.close(lockfile)
290+
finally:
291+
os.unlink(lockfile_name)
292+
293+
@staticmethod
294+
@retried(on=[FileExistsError], timeout=timedelta(seconds=5))
295+
def _create_lock(lockfile_name):
296+
while True: # wait until the lock file can be opened
297+
f = os.open(lockfile_name, os.O_CREAT | os.O_EXCL)
298+
break
299+
return f

src/databricks/labs/ucx/runtime.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from databricks.labs.ucx.__about__ import __version__
88
from databricks.labs.ucx.assessment.workflows import Assessment, Failing
99
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
10-
from databricks.labs.ucx.framework.tasks import Task, TaskLogger, Workflow, parse_args
10+
from databricks.labs.ucx.framework.tasks import Task, Workflow, parse_args
11+
from databricks.labs.ucx.installer.logs import TaskLogger
1112
from databricks.labs.ucx.hive_metastore.workflows import (
1213
MigrateTablesInMounts,
1314
TableMigration,

0 commit comments

Comments
 (0)