Adding persistence for partial job failure information #477

larsgeorge-db wants to merge 12 commits into `main` from
Conversation
```diff
 @property
 def query_key(self):
-    return f"{self.name}:query_id"
+    return f"{self.dashboard_ref}_{self.name}:query_id"
```

> Use `:` as separator, to be consistent.
```python
self._state = json.load(self._ws.workspace.download(self._query_state))
to_remove = []
for k, v in self._state.items():
    _, name = k.split(":")
```

> This may contain 3 elements now, account for it.

> Wait, what? Since when? Where?
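The concern in the thread above is that keys previously had the two-part shape `name:query_id`, but with a dashboard prefix in the key a plain two-way unpack of `split(":")` raises `ValueError`. One tolerant way to handle both shapes, sketched here with an illustrative helper name that is not from the PR, is `rsplit` with `maxsplit=1`:

```python
def split_state_key(key: str) -> tuple[str, str]:
    # Keys used to look like "name:query_id" but may now be
    # "dashboard:name:query_id"; rsplit with maxsplit=1 keeps everything
    # before the final ":" together, so both shapes unpack cleanly.
    name, suffix = key.rsplit(":", 1)
    return name, suffix

# Two-part (legacy) and three-part (new) keys both parse:
assert split_state_key("my_query:query_id") == ("my_query", "query_id")
assert split_state_key("dash_ref:my_query:query_id") == ("dash_ref:my_query", "query_id")
```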
Codecov Report. Attention: Patch coverage is

Additional details and impacted files:

```
@@           Coverage Diff            @@
##            main     #477      +/-  ##
=========================================
+ Coverage   80.25%   80.30%   +0.05%
=========================================
  Files          31       32       +1
  Lines        3206     3316     +110
  Branches      620      629       +9
=========================================
+ Hits         2573     2663      +90
- Misses        486      508      +22
+ Partials      147      145       -2
```

View full report in Codecov by Sentry.
Branch updated: `3fb2e31` to `3099cdd`.
```python
# TODO: https://github.com/databrickslabs/ucx/issues/406
for _e in errors:
    self._failure_reporter.report(ObjectFailure.make(_e))
self._failure_reporter.flush()
```

> Callers will forget to call `flush()`. Suggested change:

```diff
-self._failure_reporter.flush()
+self._failure_reporter.record(errors)
```
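The suggestion above folds the explicit flush into a single call so it cannot be forgotten. A minimal sketch of what such a `record` method might look like; the constructor signature and the `saver` callable are assumptions, not taken from the PR:

```python
class FailureReporter:
    def __init__(self, saver):
        # saver: a callable that persists a list of failure strings,
        # e.g. a thin wrapper around SqlBackend.save_table (assumed).
        self._saver = saver

    def record(self, errors):
        # Convert and persist in one step: there is no separate
        # flush() for callers to forget.
        failures = [str(e) for e in errors]
        if failures:
            self._saver(failures)

# Usage sketch: collect persisted rows in a plain list.
persisted = []
reporter = FailureReporter(persisted.extend)
reporter.record([ValueError("bad table"), KeyError("grant")])
```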
```python
logger = logging.getLogger(__name__)


class ObjectFailureError(Exception):
```

Suggested change:

```diff
-class ObjectFailureError(Exception):
+class ObjectFailure(RuntimeWarning):
```
```python
@dataclass
class ObjectFailure:
    event_time: float = field(default_factory=time.time)
    step_name: str = field(default_factory=partial(os.environ.get, WORKFLOW_NAME_ENVKEY, "n/a"))
```

Suggested change, so that rows have NULL:

```diff
-    step_name: str = field(default_factory=partial(os.environ.get, WORKFLOW_NAME_ENVKEY, "n/a"))
+    step_name: str = field(default_factory=partial(os.environ.get, WORKFLOW_NAME_ENVKEY, None))
```
```python
    step_name: str = field(default_factory=partial(os.environ.get, WORKFLOW_NAME_ENVKEY, "n/a"))
    task_name: str = field(default_factory=partial(os.environ.get, TASK_NAME_ENVKEY, "n/a"))
    parent_run_id: str = field(default_factory=partial(os.environ.get, PARENT_RUN_ID_ENVKEY, "n/a"))
    job_id: str = field(default_factory=partial(os.environ.get, JOB_ID_ENVKEY, "n/a"))
```

> A better design would be to inject the failure reporter, pre-filled with task/run IDs, and pass it down via the constructor. Reading environment variables that are not guaranteed to be set makes for an unclear data flow.
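The reviewer's alternative can be sketched as follows: the run context is resolved once at trigger time and injected through the constructor, instead of each dataclass field reading environment variables on its own. The class and field names here are illustrative, not from the PR:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RunContext:
    step_name: str
    task_name: str
    parent_run_id: str
    job_id: str


class FailureReporter:
    def __init__(self, ctx: RunContext):
        # The context is filled in exactly once, at workflow trigger time,
        # so every reported failure carries consistent task/run ids.
        self._ctx = ctx

    def report(self, object_type: str, object_id: str, message: str) -> dict:
        # Returns the row that would be persisted; persistence is elided here.
        return {
            "step_name": self._ctx.step_name,
            "task_name": self._ctx.task_name,
            "parent_run_id": self._ctx.parent_run_id,
            "job_id": self._ctx.job_id,
            "object_type": object_type,
            "object_id": object_id,
            "message": message,
        }


ctx = RunContext("assessment", "crawl_grants", "123", "456")
reporter = FailureReporter(ctx)
row = reporter.report("TABLE", "hive_metastore.db.tbl", "permission denied")
```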
```python
    job_id: str = field(default_factory=partial(os.environ.get, JOB_ID_ENVKEY, "n/a"))
    object_type: str | None = None
    object_id: str | None = None
    error_info: str | None = None
```

Suggested change, since a message is always there:

```diff
-    error_info: str | None = None
+    message: str
```
```python
        with self._events_lock:
            self._buffer.append(failure)

    def flush(self):
```

> Flush a list of failures at once, don't batch them.
```python
    _buffer: ClassVar[list[ObjectFailure]] = []
    _events_lock = threading.Lock()

    def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str = "failures"):
```

> Having too many things to configure is bad as well. Suggested change:

```diff
-    def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str = "failures"):
+    def __init__(self, backend: SqlBackend, schema: str):
```
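With the suggested signature, the catalog and table name become fixed constants and only the schema varies per deployment. A sketch of that shape; the `hive_metastore` catalog and `failures` table defaults are assumptions for illustration, not confirmed by the PR:

```python
class FailureReporter:
    # Fixed per the review suggestion to keep the constructor surface small;
    # these defaults are assumed, not taken from the PR.
    _CATALOG = "hive_metastore"
    _TABLE = "failures"

    def __init__(self, backend, schema: str):
        self._backend = backend
        self.full_name = f"{self._CATALOG}.{schema}.{self._TABLE}"


class _FakeBackend:
    """Stand-in for SqlBackend exposing only the call surface used above."""

    def save_table(self, full_name, rows):
        pass


reporter = FailureReporter(_FakeBackend(), "ucx")
```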
This pull request adds a new file `failures.py`, which defines two classes, `ObjectFailureError` and `ObjectFailure`, and a class `FailureReporter`. The `ObjectFailureError` class is a custom exception that takes an object type, an object ID, and a root-cause exception as arguments. The `ObjectFailure` class is also a custom exception that takes the same arguments as `ObjectFailureError`, plus several optional arguments for additional metadata. The `FailureReporter` class reports failures by persisting them in a table through a provided `SqlBackend` instance: `ObjectFailure` instances are converted to strings and persisted with the backend's `save_table` method.

In addition to `failures.py`, several existing files change. In `tasks.py`, new global environment variable keys `WORKFLOW_NAME_ENVKEY`, `TASK_NAME_ENVKEY`, `PARENT_RUN_ID_ENVKEY`, and `JOB_ID_ENVKEY` are added. These environment variables carry the workflow name, task name, parent run ID, and job ID; the `trigger` function is updated to set them before running a task.

In `grants.py`, the `GrantsCrawler` class gains a `FailureReporter` instance stored as an instance variable; in its `snapshot` method, any errors raised while crawling grants are passed to the reporter's `report` method. In `tables.py`, the `TablesCrawler` class gains a `FailureReporter` instance as well; errors raised while crawling tables in `snapshot`, or while migrating tables in `migrate_tables`, are passed to `report`.

In `generic.py`, `redash.py`, and `scim.py`, the `_applier_task` methods now receive a `FailureReporter` instance as an argument, and any errors while applying permissions (or properties, in `scim.py`) are passed to its `report` method. In `secrets.py`, the `get_apply_task` method is changed the same way.

New integration tests in `test_failures.py` create a `FailureReporter`, report a failure through it, and verify the failure was persisted by reading the rows written to the table. New unit tests in `test_failures.py` build a `MockBackend` and a `FailureReporter`, pass a list of `ObjectFailure` instances to `report`, and check the written records via the backend's `rows_written_for` method.

Overall, this pull request adds new classes and methods for reporting and handling failures in the framework, and updates several existing files to use them: new instance variables, methods modified to use the `FailureReporter`, and new tests.

Resolves #406
Resolves #445
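As a rough illustration of the unit-test flow described in the summary, the following sketch uses a hand-rolled stand-in for the mock backend; `MockBackend`, `rows_written_for`, and the `FailureReporter` signature here are simplified assumptions, not the actual ucx implementations:

```python
class MockBackend:
    """Minimal stand-in that records rows per table, mimicking rows_written_for()."""

    def __init__(self):
        self._rows = {}

    def save_table(self, full_name, rows):
        self._rows.setdefault(full_name, []).extend(rows)

    def rows_written_for(self, full_name):
        return self._rows.get(full_name, [])


class FailureReporter:
    def __init__(self, backend, full_name):
        self._backend = backend
        self._full_name = full_name

    def report(self, failures):
        # Failures are stringified and persisted via the backend, as the
        # PR summary describes for ObjectFailure instances.
        self._backend.save_table(self._full_name, [str(f) for f in failures])


backend = MockBackend()
reporter = FailureReporter(backend, "main.ucx.failures")
reporter.report([RuntimeError("grant crawl failed")])
```

The test then asserts on `backend.rows_written_for("main.ucx.failures")`, mirroring the check the summary describes.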