Support retrying non-finished async tasks on startup and periodically #2003
danielhumanmod wants to merge 3 commits into apache:main
Conversation
> handler.handleTask(task, callContext);
> timeSource.add(Duration.ofMinutes(10));
@adnanhemani continuing the previous comment here:

> Can you explain this further - I'm not sure why the tests need this 10m jump? Is it so that tasks are "recovered" by the Quarkus Scheduled method?

metaStoreManager.loadTasks fetches available tasks from the metastore, meaning tasks that are either not leased by any executor or whose lease has already timed out (after 5 minutes).
In this test, the new tasks are created by the parent task but never executed, so to ensure these tasks are fetched we need to simulate a time gap longer than 5 minutes.
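To illustrate the lease-timeout reasoning above, here is a minimal, self-contained sketch (the `LEASE_TIMEOUT` constant and `isAvailable` helper are illustrative stand-ins, not the actual Polaris API): a task only becomes "available" again once its lease is older than the timeout, which is why the test jumps the clock by 10 minutes before calling `loadTasks`.

```java
import java.time.Duration;
import java.time.Instant;

class LeaseDemo {
    // Hypothetical lease timeout matching the 5-minute window described above.
    static final Duration LEASE_TIMEOUT = Duration.ofMinutes(5);

    // A task leased at 'leasedAt' may be picked up again at 'now' only if
    // the lease has expired.
    static boolean isAvailable(Instant leasedAt, Instant now) {
        return Duration.between(leasedAt, now).compareTo(LEASE_TIMEOUT) > 0;
    }

    public static void main(String[] args) {
        Instant leasedAt = Instant.parse("2024-01-01T00:00:00Z");
        // Immediately after creation the task is still leased:
        System.out.println(isAvailable(leasedAt, leasedAt.plus(Duration.ofMinutes(1))));  // false
        // After the simulated 10-minute jump the lease has timed out:
        System.out.println(isAvailable(leasedAt, leasedAt.plus(Duration.ofMinutes(10)))); // true
    }
}
```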
> @Override
> public Iterator<Map.Entry<String, PolarisMetaStoreManager>> getMetaStoreManagerMap() {
Make it `Iterator<Map.Entry>` as https://github.com/apache/polaris/pull/1585/files#r2121942046 suggests.
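A minimal sketch of the suggested signature (the `ManagerRegistry` class and `register` method below are simplified placeholders, not the real Polaris types): returning an `Iterator<Map.Entry<...>>` lets callers walk the entries without exposing the backing map itself.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class ManagerRegistry<V> {
    private final Map<String, V> managers = new LinkedHashMap<>();

    void register(String realm, V manager) {
        managers.put(realm, manager);
    }

    // Suggested shape: an Iterator over the entries rather than the raw Map.
    Iterator<Map.Entry<String, V>> getMetaStoreManagerMap() {
        return managers.entrySet().iterator();
    }
}
```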
> new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
>     tableEntity.getTableIdentifier(), TaskUtils.encodeManifestFile(mf)))
> .withLastAttemptExecutorId(executorId)
> .withAttemptCount(1)
@adnanhemani continuing the previous discussion here:

> How can we assume this?

This new task (ManifestFileCleanupTask) is created by the current task (TableCleanupTask) and executed immediately at the end of it.
Since that is its first execution, we set the attemptCount to 1 here.
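A hedged sketch of the attempt-count semantics described above (`TaskStub` and its `recordRetry` helper are illustrative stand-ins, not the real Polaris task entity): a child task spawned by its parent records the spawn as attempt #1, and each later retry would increment the counter.

```java
class TaskStub {
    String lastAttemptExecutorId;
    int attemptCount;

    TaskStub withLastAttemptExecutorId(String id) { this.lastAttemptExecutorId = id; return this; }
    TaskStub withAttemptCount(int count) { this.attemptCount = count; return this; }

    // Hypothetical helper: a retry bumps the attempt counter.
    TaskStub recordRetry() { this.attemptCount++; return this; }

    public static void main(String[] args) {
        TaskStub task = new TaskStub()
            .withLastAttemptExecutorId("executor-1")
            .withAttemptCount(1); // first execution happens right after creation
        System.out.println(task.attemptCount);               // 1
        System.out.println(task.recordRetry().attemptCount); // 2 after one retry
    }
}
```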
> configurationStore,
> clock);
> EntitiesResult entitiesResult =
>     metaStoreManager.loadTasks(polarisCallContext, executorId, PageToken.readEverything());
@adnanhemani continuing the previous comment here:

> I'm not sure I'm understanding the logic here: we are asking for 20 tasks here - but what if there are more than 20 tasks that need recovery?

Good catch; we should update this to read all pending tasks.
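To make "read all pending tasks" concrete, here is a self-contained sketch of looping over page tokens until the metastore reports no further pages (the `TaskPage` record and `loadTasks` method below are stand-ins, not the real Polaris API, which instead exposes a `PageToken.readEverything()` helper as quoted above):

```java
import java.util.ArrayList;
import java.util.List;

class RecoveryPagingDemo {
    // Stand-in for one page of results plus an optional continuation token.
    record TaskPage(List<String> tasks, Integer nextToken) {}

    // Fake metastore serving 'total' pending tasks, 'pageSize' at a time.
    static TaskPage loadTasks(Integer token, int pageSize, int total) {
        int start = token == null ? 0 : token;
        int end = Math.min(start + pageSize, total);
        List<String> tasks = new ArrayList<>();
        for (int i = start; i < end; i++) tasks.add("task-" + i);
        return new TaskPage(tasks, end < total ? end : null);
    }

    // Keep requesting pages until there is no continuation token left.
    static List<String> loadAllPendingTasks(int pageSize, int total) {
        List<String> all = new ArrayList<>();
        Integer token = null;
        do {
            TaskPage page = loadTasks(token, pageSize, total);
            all.addAll(page.tasks());
            token = page.nextToken();
        } while (token != null);
        return all;
    }

    public static void main(String[] args) {
        // With a page size of 20 and 45 pending tasks, all 45 are recovered.
        System.out.println(loadAllPendingTasks(20, 45).size()); // 45
    }
}
```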
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.
Fix #774
Context
Polaris uses async tasks to perform operations such as table and manifest file cleanup. These tasks are executed asynchronously in a separate thread within the same JVM, and retries are handled inline within the task execution. However, this mechanism does not guarantee eventual execution in the following cases:
Implementation
Persist failed tasks and introduce a retry mechanism triggered during Polaris startup and via periodic background checks. Changes include:
- `getMetaStoreManagerMap`
- `LAST_ATTEMPT_START_TIME` is set on each task entity creation, which is important for time-out filtering when `loadTasks()` reads from the metastore, so that multiple executors are prevented from picking the same task
- `TaskRecoveryManager`: new class responsible for task recovery logic, including:
  - `PolarisCallContext`
- `QuarkusTaskExecutorImpl`: hooks into the application lifecycle to initiate task recovery.

Recommended Review Order
1. `TaskRecoveryManager`
2. `QuarkusTaskExecutorImpl` and `TaskExecutorImpl`
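The implementation described above (a recovery pass at startup plus periodic background checks) can be sketched as follows. This is a hedged illustration only: `TaskRecoveryManagerSketch`, `recoverPendingTasks`, and the `Supplier`-based task loader are stand-ins for the PR's actual `TaskRecoveryManager` and metastore calls, and the `ScheduledExecutorService` stands in for the Quarkus scheduled method mentioned in the review thread.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class TaskRecoveryManagerSketch {
    private final Supplier<List<String>> pendingTaskLoader; // stands in for loadTasks()
    private int recoveredCount = 0;

    TaskRecoveryManagerSketch(Supplier<List<String>> pendingTaskLoader) {
        this.pendingTaskLoader = pendingTaskLoader;
    }

    // One recovery pass: fetch available (unleased or lease-expired) tasks
    // and hand each one back to an executor.
    void recoverPendingTasks() {
        for (String task : pendingTaskLoader.get()) {
            recoveredCount++; // real code would re-submit 'task' for execution
        }
    }

    int recoveredCount() { return recoveredCount; }

    public static void main(String[] args) {
        TaskRecoveryManagerSketch mgr =
            new TaskRecoveryManagerSketch(() -> List.of("cleanup-1", "cleanup-2"));
        mgr.recoverPendingTasks(); // startup pass
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Periodic pass, analogous to a scheduled background check.
        scheduler.scheduleAtFixedRate(mgr::recoverPendingTasks, 5, 5, TimeUnit.MINUTES);
        scheduler.shutdown(); // demo only: cancel the pending periodic runs
        System.out.println(mgr.recoveredCount()); // 2 (startup pass only)
    }
}
```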