Support retrying non-finished async tasks on startup and periodically #1585
danielhumanmod wants to merge 8 commits into apache:main
Conversation
```java
handler.handleTask(task, callContext);
// ...
timeSource.add(Duration.ofMinutes(10));
```
Previously, a task entity might be missing the LAST_ATTEMPT_START_TIME property, so loading tasks could succeed without any time-out check. Now that every task entity is created with this property, the tests need to manipulate time to make loadTasks return the tasks.
Can you explain this further - I'm not sure why the tests need this 10m jump? Is it so that tasks are "recovered" by the Quarkus Scheduled method?
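The 10-minute jump makes sense if task loading filters out tasks whose last attempt started within the lease timeout window: advancing a mutable test clock past that window makes the tasks eligible again without real waiting. A minimal sketch of such a test time source (the class name `MutableClock` is illustrative, not the Polaris test utility):

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative mutable clock: tests call add(...) to move "now" forward,
// so time-based filters (e.g. a task lease timeout) can be exercised
// without sleeping.
final class MutableClock extends Clock {
  private final AtomicReference<Instant> now;

  MutableClock(Instant start) {
    this.now = new AtomicReference<>(start);
  }

  void add(Duration d) {
    now.updateAndGet(t -> t.plus(d));
  }

  @Override
  public Instant instant() { return now.get(); }

  @Override
  public ZoneId getZone() { return ZoneOffset.UTC; }

  @Override
  public Clock withZone(ZoneId zone) { return this; }
}
```

A test would inject this clock, run the task once, then `add(Duration.ofMinutes(10))` before asserting that the task is picked up again.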
```java
@Override
public Map<String, PolarisMetaStoreManager> getMetaStoreManagerMap() {
```
To make this a bit more defensively coded, I might recommend turning this into an iterator of Map.Entry objects, given that this is a public method and we wouldn't want any code path to be able to modify this mapping.
Good catch, will fix this!
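One common fix for this kind of concern is to return a read-only view of the internal map instead of the map itself. A hedged sketch under simplified names (`ManagerRegistry` and the generic value type stand in for the Polaris classes; this is not the actual fix applied):

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the defensive-coding suggestion: expose the manager map
// read-only so no caller can mutate internal state through the getter.
final class ManagerRegistry<M> {
  private final Map<String, M> managers = new ConcurrentHashMap<>();

  void register(String realm, M manager) {
    managers.put(realm, manager);
  }

  // Callers get a read-only view; put/remove on it throws
  // UnsupportedOperationException.
  Map<String, M> getMetaStoreManagerMap() {
    return Collections.unmodifiableMap(managers);
  }
}
```

An unmodifiable view has the same effect as handing out entry iterators: readers can enumerate the mapping, but any mutation attempt fails fast.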
```java
private void addTaskLocation(TaskEntity task) {
  Map<String, String> internalPropertiesAsMap = new HashMap<>(task.getInternalPropertiesAsMap());
```
```java
    dataFileDeletes.size(),
    manifestFile.path());
try {
  ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, fileIO);
```
What's the reason behind this change?
```java
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
        tableEntity.getTableIdentifier(), TaskUtils.encodeManifestFile(mf)))
    .withLastAttemptExecutorId(executorId)
    .withAttemptCount(1)
```
How can we assume this?
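The reviewer's concern about hard-coding `withAttemptCount(1)` could be addressed by deriving the next attempt number from state persisted on the task entity rather than assuming a first attempt. A hedged sketch (the property name `attempt-count` and the helper are illustrative, not the Polaris schema):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: instead of assuming withAttemptCount(1), read the attempt
// count previously persisted in the task's properties and increment it.
// Missing property means the task has never been attempted.
final class AttemptCounter {
  static final String ATTEMPT_COUNT = "attempt-count"; // hypothetical key

  static int nextAttempt(Map<String, String> taskProperties) {
    int previous = Integer.parseInt(taskProperties.getOrDefault(ATTEMPT_COUNT, "0"));
    return previous + 1;
  }
}
```

With such a helper, a recovered task resumes at its real attempt number, which matters for retry limits and backoff decisions.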
```java
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
        tableEntity.getTableIdentifier(), metadataBatch))
    .withLastAttemptExecutorId(executorId)
    .withAttemptCount(1)
```
```java
PolarisCallContext polarisCallContext =
    new PolarisCallContext(
        metastore, new PolarisDefaultDiagServiceImpl(), configurationStore, clock);
EntitiesResult entitiesResult =
```
I'm not sure I'm understanding the logic here: we are asking for 20 tasks here - but what if there are more than 20 tasks that need recovery?
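One common answer to the "more than 20 tasks" concern is to call the loading method in a loop, draining fixed-size batches until an empty batch comes back. A hedged sketch (the `loader` function stands in for the metastore's bounded task-loading call; this is not the actual Polaris implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Sketch: recover tasks in fixed-size batches instead of a single
// bounded fetch. `loader` abstracts a call like loadTasks(executorId,
// limit) that leases up to `limit` pending tasks per invocation.
final class TaskDrainer {
  static <T> List<T> drainAll(IntFunction<List<T>> loader, int batchSize) {
    List<T> all = new ArrayList<>();
    while (true) {
      List<T> batch = loader.apply(batchSize);
      if (batch.isEmpty()) {
        break; // nothing left to recover
      }
      all.addAll(batch);
    }
    return all;
  }
}
```

Because each batch is leased transactionally, looping like this stays safe even with several executors draining concurrently: each iteration only receives tasks it successfully leased.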
```java
tableCleanupTaskHandler.handleTask(task, callCtx);
// ...
// Step 3: Verify that the generated child tasks were registered, ATTEMPT_COUNT = 2
timeSource.add(Duration.ofMinutes(10));
```
I, personally, found this very hard to follow - even with the comments. I would highly recommend making the comments much more verbose here to allow the full flow of logic (what is happening with which task and why) to be communicated to a reader who may not be an expert at this particular type of task or tasks in general.
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.
@adnanhemani Since the branch has been outdated for too long and is hard to rebase on main, I created a new PR #2003 to introduce these changes; I will migrate and address your comments there.
Fix #774
Context
Polaris uses async tasks to perform operations such as table and manifest file cleanup. These tasks are executed asynchronously in a separate thread within the same JVM, and retries are handled inline within the task execution. However, this mechanism does not guarantee eventual execution in the following cases:
Implementation Plan
Stage 1: Potential improvement - #1523
Introduce per-task transactional leasing in the metastore layer via loadTasks(...)
Stage 2 (Current PR):
Persist failed tasks and introduce a retry mechanism triggered during Polaris startup and via periodic background checks, changes included:
- `getMetaStoreManagerMap`
- `LAST_ATTEMPT_START_TIME` is set on each task entity at creation, which is important for time-out filtering when `loadTasks()` reads from the metastore, so that multiple executors are prevented from picking up the same task
- `TaskRecoveryManager`: new class responsible for task recovery logic, including constructing the `PolarisCallContext`
- `QuarkusTaskExecutorImpl`: hook into the application lifecycle to initiate task recovery.

Recommended Review Order
- `TaskRecoveryManager`
- `QuarkusTaskExecutorImpl` and `TaskExecutorImpl`
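The `LAST_ATTEMPT_START_TIME` point above reduces to a simple eligibility check: a task is recoverable only when its last attempt started longer ago than the lease timeout, otherwise it is assumed to still be owned by a live executor. A hedged sketch of that filter (class name and timeout handling are illustrative, not the Polaris implementation):

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Illustrative timeout filter for task recovery: a task whose last
// attempt started within `leaseTimeout` is assumed to still be held
// by another executor and is skipped; older tasks are eligible for
// pickup. This mirrors the described use of LAST_ATTEMPT_START_TIME.
final class TaskLeaseFilter {
  private final Clock clock;
  private final Duration leaseTimeout;

  TaskLeaseFilter(Clock clock, Duration leaseTimeout) {
    this.clock = clock;
    this.leaseTimeout = leaseTimeout;
  }

  boolean isEligibleForRecovery(Instant lastAttemptStartTime) {
    Instant cutoff = clock.instant().minus(leaseTimeout);
    return lastAttemptStartTime.isBefore(cutoff);
  }
}
```

This also explains the test-side clock jumps discussed in the review thread: after a task runs once, the test clock must advance past the lease timeout before the task becomes visible to recovery again.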