Skip to content

Fix optimistic locking and add interruption check in ChunkOrientedStep#5165

Closed
KILL9-NO-MERCY wants to merge 3 commits intospring-projects:mainfrom
KILL9-NO-MERCY:kill9/GH-5120
Closed

Fix optimistic locking and add interruption check in ChunkOrientedStep#5165
KILL9-NO-MERCY wants to merge 3 commits intospring-projects:mainfrom
KILL9-NO-MERCY:kill9/GH-5120

Conversation

@KILL9-NO-MERCY
Copy link
Copy Markdown
Contributor

@KILL9-NO-MERCY KILL9-NO-MERCY commented Dec 15, 2025

Problem

As reported in issue #5114, after commit e5fbc2a, StepExecution.terminateOnly is not being set properly after JobOperator.stop().

Additionally, the JobRepository.update(StepExecution) call introduced in e5fbc2a (via JobRepository.stop()) causes OptimisticLockingFailureException during step execution, as reported in #5120.

However, commit e5fbc2a must be retained for d4a7dfd to graceful shutdown function correctly.

Solution

1. Improved JobRepository.update(StepExecution)

Removed:

  • Legacy job stop check (jobExecution.isStopping()) used in older versions (in previous versions, JobOperator.stop() only cause the JobExecution status to STOPPING)

Added:

  • New Batch 6 style stop detection by checking StepExecution state from database
  • Fetch latest StepExecution from database and check if isStopped()
  • If externally stopped, synchronize version and set stepExecution.setTerminateOnly()

This approach prevents OptimisticLockingFailureException by synchronizing the StepExecution version with the latest database state before update.

2. Added JobRepository.update() in ChunkOrientedStep

Unlike TaskletStep which calls JobRepository.update(stepExecution) after processing each chunk, ChunkOrientedStep was missing this call. This prevented it from fetching the latest StepExecution version and JobExecution status (BatchStatus = STOPPED) from the batch metadata repository.

Added JobRepository.update(stepExecution) in the same location as TaskletStep to ensure both step implementations can synchronize with the latest state from the database.

New commit: Updated interruption detection logic

The first implementation of this PR checked latestStepExecution.isStopped(), which failed in multi-step jobs when JobOperator.stop() was called between steps:

  • First step completes
  • JobOperator.stop() called
  • Second step's StepExecution not yet created
  • No way to detect the stop signal

So Changed to check latestStepExecution.getJobExecution().isStopped() instead:

Changes

SimpleJobRepository.update(StepExecution)

  • Fetch latest StepExecution from database before update
  • Check if the fetched JobExecution(of latest StepExecution) is in STOPPED status
  • Synchronize current StepExecution's version with database latest state
  • Set terminateOnly flag when externally stopped

ChunkOrientedStep

  • Added getJobRepository().update(stepExecution) call in finally block after each chunk processing
  • Matches TaskletStep behavior for consistent interruption handling
  • Ensures step can detect job stop signals by fetching latest state from database

Related Issues

Fixes gh-5120
See gh-5114
Related to e5fbc2a, d4a7dfd

@KILL9-NO-MERCY KILL9-NO-MERCY force-pushed the kill9/GH-5120 branch 2 times, most recently from 78ba896 to 0feafa1 Compare December 15, 2025 13:37
public void update(StepExecution stepExecution) {
stepExecution.setLastUpdated(LocalDateTime.now());
if (this.jobExecution != null && this.jobExecution.isStopping()) {
if (stepExecution.isStopped()) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ResourcelessJobRepository (in-memory implementation), when JobOperator.stop() calls Repository.update(stepExecution), the fetched StepExecution is actually the same instance as the one being updated (since everything is in memory). Therefore, checking stepExecution.isStopped() directly reflects the state set by the stop operation.

Please let me know if I'm missing anything or if there's a better approach for this.


stepExecution.setLastUpdated(LocalDateTime.now());

StepExecution latestStepExecution = getStepExecution(stepExecution.getId());
Copy link
Copy Markdown
Contributor Author

@KILL9-NO-MERCY KILL9-NO-MERCY Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fetch latest StepExecution from database and synchronize version to prevent OptimisticLockingFailureException.

In Batch 6, JobOperator.stop() updates StepExecution status to STOPPED (unlike Batch 5 which only set JobExecution to STOPPING). Therefore, instead of calling jobExecutionDao.synchronizeStatus(), we now:

  1. Call getStepExecution() to get the latest state
  2. Synchronize status and version if stopped externally
  3. Set terminateOnly flag

This ensures version consistency between concurrent stop and execution threads.

stepExecution.apply(contribution);
this.compositeItemStream.update(stepExecution.getExecutionContext());
getJobRepository().updateExecutionContext(stepExecution);
getJobRepository().update(stepExecution);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added getJobRepository().update(stepExecution) call to match TaskletStep behavior (line #464).

This ensures ChunkOrientedStep checks for job interruption signals after each chunk processing, just like TaskletStep does. Without this call, ChunkOrientedStep cannot fetch the latest StepExecution state from the database and will miss stop signals from JobOperator.stop().

If the removal of this update() call in ChunkOrientedStep was intentional in the past, then at minimum, there should be alternative logic to synchronize the status and version from the latest StepExecution (required for proper job stop handling).

Please let me know if there's a specific reason this call was omitted or if a different synchronization approach is preferred.

When JobOperator.stop() is called, the StepExecution version could become
out of sync between the stopping thread and the executing step thread,
causing OptimisticLockingFailureException.

This commit synchronizes the StepExecution version by fetching the latest
state from the batch metadata  before update. If the recent StepExecution is in
STOPPED status, it upgrades the current StepExecution's status and version
to match the database state, preventing version conflicts.

Additionally, added JobRepository.update() call in ChunkOrientedStep after
each chunk to match TaskletStep behavior. This ensures both step
implementations properly check for job interruption signals by synchronizing
with the latest StepExecution status from the database.

Fixes spring-projectsgh-5120

Signed-off-by: kill9-no-mercy <[email protected]>
…etection Previously, the interruption check used StepExecution.isStopped() which fails when a job is stopped between steps (after first step completes but before second StepExecution is created). Changed to check JobExecution.isStopped() instead, which correctly detects job stop signals regardless of the current step state. This ensures proper interruption handling for multi-step jobs. See spring-projectsgh-5114 See spring-projectsgh-5120

Signed-off-by: kill9-no-mercy <[email protected]>
@fmbenhassine
Copy link
Copy Markdown
Contributor

Hi @KILL9-NO-MERCY ,

Thank you for the PR 🙏 This change set adds the missing database sync which I mentioned here: #5114 (comment), which is great! I tested the shutdown sample with this PR and in fact it fixes it.

However, there are indeed some side effects / regressions from this PR (see failing tests in ChunkOrientedStepIntegrationTests and ChunkOrientedStepFaultToleranceIntegrationTests when building the project on the latest main 98c10cd). Can you please check? If we manage to have a fix without a regression, it will be good to merge for the upcoming v6.0.1.

Many thanks upfront.

Move stepExecution.update() outside chunk transaction to prevent
version mismatch when transaction rolls back.

Problem:
- Chunk transaction rollback reverts DB version but leaves
  in-memory version incremented
- Next update() causes OptimisticLockingFailureException

Solution:
- Call getJobRepository().update(stepExecution) after transaction
  completes, outside transactionTemplate.executeWithoutResult()
- On success: update() is called with synchronized versions
- On failure: update() is skipped, preventing version conflict

Signed-off-by: kill9-no-mercy <[email protected]>
@KILL9-NO-MERCY
Copy link
Copy Markdown
Contributor Author

@fmbenhassine
Hi Mahmoud

Thanks for catching the integration-test oversight!

I investigated the root cause and found that JobRepository.update() should NOT be called when chunk processing fails. However, it was placed in the finally block, causing it to execute even on failures. This led to a version mismatch between the in-memory StepExecution (version incremented) and the database StepExecution (version rolled back due to transaction rollback).

The fix moves the update() call outside the transaction block, ensuring it's only called when no exceptions occur.

Changes:

  • Moved getJobRepository().update(stepExecution) to doExecute() method, outside transactionTemplate.executeWithoutResult()
  • Removed update() from processNextChunk() finally block
  • Update is now only called on successful chunk processing

@fmbenhassine
Copy link
Copy Markdown
Contributor

Thank you very much for the update! That was it, two birds with one stone 🎉

Rebased and merged as 644d7e6. Thank you for your contribution, much appreciated 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

StepExecution Update in SimpleJobOperator.stop() Causes JobExecution.BatchStatus.UNKNOWN after graceful stop

2 participants