Fix optimistic locking and add interruption check in ChunkOrientedStep#5165
Fix optimistic locking and add interruption check in ChunkOrientedStep#5165KILL9-NO-MERCY wants to merge 3 commits intospring-projects:mainfrom
Conversation
78ba896 to
0feafa1
Compare
| public void update(StepExecution stepExecution) { | ||
| stepExecution.setLastUpdated(LocalDateTime.now()); | ||
| if (this.jobExecution != null && this.jobExecution.isStopping()) { | ||
| if (stepExecution.isStopped()) { |
There was a problem hiding this comment.
In ResourcelessJobRepository (in-memory implementation), when JobOperator.stop() calls Repository.update(stepExecution), the fetched StepExecution is actually the same instance as the one being updated (since everything is in memory). Therefore, checking stepExecution.isStopped() directly reflects the state set by the stop operation.
Please let me know if I'm missing anything or if there's a better approach for this.
|
|
||
| stepExecution.setLastUpdated(LocalDateTime.now()); | ||
|
|
||
| StepExecution latestStepExecution = getStepExecution(stepExecution.getId()); |
There was a problem hiding this comment.
Fetch latest StepExecution from database and synchronize version to prevent OptimisticLockingFailureException.
In Batch 6, JobOperator.stop() updates StepExecution status to STOPPED (unlike Batch 5 which only set JobExecution to STOPPING). Therefore, instead of calling jobExecutionDao.synchronizeStatus(), we now:
- Call
getStepExecution()to get the latest state - Synchronize status and version if stopped externally
- Set terminateOnly flag
This ensures version consistency between concurrent stop and execution threads.
| stepExecution.apply(contribution); | ||
| this.compositeItemStream.update(stepExecution.getExecutionContext()); | ||
| getJobRepository().updateExecutionContext(stepExecution); | ||
| getJobRepository().update(stepExecution); |
There was a problem hiding this comment.
Added getJobRepository().update(stepExecution) call to match TaskletStep behavior (line #464).
This ensures ChunkOrientedStep checks for job interruption signals after each chunk processing, just like TaskletStep does. Without this call, ChunkOrientedStep cannot fetch the latest StepExecution state from the database and will miss stop signals from JobOperator.stop().
If the removal of this update() call in ChunkOrientedStep was intentional in the past, then at minimum, there should be alternative logic to synchronize the status and version from the latest StepExecution (required for proper job stop handling).
Please let me know if there's a specific reason this call was omitted or if a different synchronization approach is preferred.
When JobOperator.stop() is called, the StepExecution version could become out of sync between the stopping thread and the executing step thread, causing OptimisticLockingFailureException. This commit synchronizes the StepExecution version by fetching the latest state from the batch metadata before update. If the recent StepExecution is in STOPPED status, it upgrades the current StepExecution's status and version to match the database state, preventing version conflicts. Additionally, added JobRepository.update() call in ChunkOrientedStep after each chunk to match TaskletStep behavior. This ensures both step implementations properly check for job interruption signals by synchronizing with the latest StepExecution status from the database. Fixes spring-projectsgh-5120 Signed-off-by: kill9-no-mercy <[email protected]>
0feafa1 to
f62da2b
Compare
…etection Previously, the interruption check used StepExecution.isStopped() which fails when a job is stopped between steps (after first step completes but before second StepExecution is created). Changed to check JobExecution.isStopped() instead, which correctly detects job stop signals regardless of the current step state. This ensures proper interruption handling for multi-step jobs. See spring-projectsgh-5114 See spring-projectsgh-5120 Signed-off-by: kill9-no-mercy <[email protected]>
|
Hi @KILL9-NO-MERCY , Thank you for the PR 🙏 This change set adds the missing database sync which I mentioned here: #5114 (comment), which is great! I tested the shutdown sample with this PR and in fact it fixes it. However, there are indeed some side effects / regressions from this PR (see failing tests in Many thanks upfront. |
d58bdbc to
5a0c284
Compare
Move stepExecution.update() outside chunk transaction to prevent version mismatch when transaction rolls back. Problem: - Chunk transaction rollback reverts DB version but leaves in-memory version incremented - Next update() causes OptimisticLockingFailureException Solution: - Call getJobRepository().update(stepExecution) after transaction completes, outside transactionTemplate.executeWithoutResult() - On success: update() is called with synchronized versions - On failure: update() is skipped, preventing version conflict Signed-off-by: kill9-no-mercy <[email protected]>
5a0c284 to
bd7285f
Compare
|
@fmbenhassine Thanks for catching the integration-test oversight! I investigated the root cause and found that The fix moves the Changes:
|
|
Thank you very much for the update! That was it, two birds with one stone 🎉 Rebased and merged as 644d7e6. Thank you for your contribution, much appreciated 🙏 |
Problem
As reported in issue #5114, after commit e5fbc2a,
StepExecution.terminateOnlyis not being set properly after JobOperator.stop().Additionally, the
JobRepository.update(StepExecution)call introduced in e5fbc2a (viaJobRepository.stop()) causesOptimisticLockingFailureExceptionduring step execution, as reported in #5120.However, commit e5fbc2a must be retained for d4a7dfd to graceful shutdown function correctly.
Solution
1. Improved JobRepository.update(StepExecution)
Removed:
jobExecution.isStopping()) used in older versions (in previous versions,JobOperator.stop()only cause theJobExecutionstatus to STOPPING)Added:
StepExecutionstate from databaseStepExecutionfrom database and check ifisStopped()stepExecution.setTerminateOnly()This approach prevents
OptimisticLockingFailureExceptionby synchronizing theStepExecutionversion with the latest database state before update.2. Added JobRepository.update() in ChunkOrientedStep
Unlike
TaskletStepwhich callsJobRepository.update(stepExecution)after processing each chunk,ChunkOrientedStepwas missing this call. This prevented it from fetching the latestStepExecutionversion and JobExecution status (BatchStatus = STOPPED) from the batch metadata repository.Added
JobRepository.update(stepExecution)in the same location asTaskletStepto ensure both step implementations can synchronize with the latest state from the database.New commit: Updated interruption detection logic
The first implementation of this PR checked
latestStepExecution.isStopped(), which failed in multi-step jobs whenJobOperator.stop()was called between steps:JobOperator.stop()calledSo Changed to check
latestStepExecution.getJobExecution().isStopped()instead:Changes
SimpleJobRepository.update(StepExecution)
StepExecutionfrom database before updateJobExecution(of latest StepExecution)is in STOPPED statusStepExecution's version with database latest stateterminateOnlyflag when externally stoppedChunkOrientedStep
getJobRepository().update(stepExecution)call in finally block after each chunk processingTaskletStepbehavior for consistent interruption handlingRelated Issues
Fixes gh-5120
See gh-5114
Related to e5fbc2a, d4a7dfd