
Conversation

@noob-se7en (Contributor) commented Jun 11, 2025

Problem Statement:
In pauseless + dedup, during a disaster scenario (i.e. a segment is ONLINE in the IS but has no completed (immutable) replica on any server), re-ingestion is disabled by default in the RVM. It is disabled because dedup requires ingestion to happen in strict order (i.e. a segment ingested in the past cannot be re-ingested once a server has consumed the segments following it).
So in the above disaster scenario, the only fix is a manual bulk delete of segments, which is operationally heavy and time consuming.

PR:
There are use cases where the dedup constraint can be relaxed until the disaster/data loss is recovered (the priority is to recover from the disaster as quickly as possible, letting go of dedup metadata correctness for the time being).
This PR adds a new enum config, DisasterRecoveryMode, to StreamIngestionConfig:

public enum DisasterRecoveryMode {
  // ALWAYS means Pinot will always run the Disaster Recovery Job
  ALWAYS,
  // DEFAULT means Pinot will skip the Disaster Recovery Job for tables like dedup/upsert where consistency
  // of data is higher in priority than availability.
  DEFAULT
}
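To make the intended decision concrete, here is a minimal, self-contained sketch of how a validation job could consult the mode. Only DisasterRecoveryMode and its ALWAYS/DEFAULT values come from this PR; the class name, allowRepair helper, and its parameters are illustrative, not Pinot APIs:

```java
// Hypothetical sketch; not the actual Pinot implementation.
public class DisasterRecoverySketch {

  public enum DisasterRecoveryMode {
    // ALWAYS means Pinot will always run the Disaster Recovery Job.
    ALWAYS,
    // DEFAULT means Pinot will skip the Disaster Recovery Job for tables like
    // dedup/upsert where consistency of data is higher in priority than availability.
    DEFAULT
  }

  /** Decide whether re-ingestion repair is allowed for a table. */
  public static boolean allowRepair(DisasterRecoveryMode mode, boolean dedupEnabled,
      boolean partialUpsertEnabled) {
    if (mode == DisasterRecoveryMode.ALWAYS) {
      // Operator opted in: relax the dedup/upsert ordering constraint to recover quickly.
      return true;
    }
    // DEFAULT: keep the existing behavior and skip repair for dedup and
    // partial-upsert tables.
    return !dedupEnabled && !partialUpsertEnabled;
  }

  public static void main(String[] args) {
    System.out.println(allowRepair(DisasterRecoveryMode.DEFAULT, true, false));  // false
    System.out.println(allowRepair(DisasterRecoveryMode.ALWAYS, true, false));   // true
  }
}
```

Under DEFAULT, a dedup or partial-upsert table is never repaired; ALWAYS trades dedup metadata correctness for availability during the outage.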

@codecov-commenter commented Jun 11, 2025

Codecov Report

Attention: Patch coverage is 28.57143% with 25 lines in your changes missing coverage. Please review.

Project coverage is 63.21%. Comparing base (1a476de) to head (4b804df).
Report is 371 commits behind head on master.

Files with missing lines Patch % Lines
.../core/realtime/PinotLLCRealtimeSegmentManager.java 0.00% 21 Missing ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 42.85% 3 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16071      +/-   ##
============================================
+ Coverage     62.90%   63.21%   +0.30%     
+ Complexity     1386     1363      -23     
============================================
  Files          2867     2960      +93     
  Lines        163354   170879    +7525     
  Branches      24952    26154    +1202     
============================================
+ Hits         102755   108015    +5260     
- Misses        52847    54685    +1838     
- Partials       7752     8179     +427     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.15% <28.57%> (+0.28%) ⬆️
java-21 63.17% <28.57%> (+0.34%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.21% <28.57%> (+0.30%) ⬆️
unittests 63.20% <28.57%> (+0.30%) ⬆️
unittests1 64.74% <71.42%> (+8.92%) ⬆️
unittests2 33.44% <11.42%> (-0.13%) ⬇️

Flags with carried forward coverage won't be shown.

@noob-se7en noob-se7en marked this pull request as ready for review June 12, 2025 20:10
} else if (_currentOffset.compareTo(endOffset) == 0) {
_segmentLogger
.info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
boolean replaced = buildSegmentAndReplace();
@noob-se7en (author) commented:

These changes are required because re-ingestion happens in case of a segment build failure. Say the server successfully uploads the segment during re-ingestion; when the OFFLINE -> ONLINE transition is then triggered during a segment reset, the build can still fail on servers.

@noob-se7en (author) commented Jun 12, 2025:

These changes to this file could also go into a separate PR.

Contributor:

I see two scenarios:

  1. Issue with the segment, e.g. incorrect data:
    • In this case the re-ingestion will fail as well.
    • Deleting the segment and switching to the latest offset will be the way forward.
  2. Issue on the server, e.g. out of disk/memory:
    • In this case both operations, i.e. build and downloadSegmentAndReplace, might end up failing.

Also, have we come across this scenario in the past?

cc: @Jackie-Jiang

@noob-se7en (author) commented:

It happened in the integration test due to failure injection during commit: RealtimeSegmentDataManager failed to build, but StatelessRealtimeSegmentDataManager succeeded. In this case the segment is present in the deep store, but the server is unable to load it and move forward.

@Jackie-Jiang Jackie-Jiang added feature ingestion real-time dedup changes related to realtime ingestion dedup handling labels Jun 12, 2025
@Jackie-Jiang (Contributor) left a comment:
The reason will be displayed to describe this comment to others. Learn more.

cc @KKcorps for review

* Class representing configurations related to segment assignment strategy.
* @deprecated Use {@link org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig} instead.
*/
@Deprecated
Contributor:

We cannot fully deprecate this config yet because partitionColumn can only be configured here when we are using instancePartitionsMap. I'd suggest not deprecating it in this PR (as it is a different scope) and addressing it in a separate PR


public enum DisasterRecoveryMode {
BEST_EFFORT
// TODO: Add support for strict recovery mode.
Contributor:

I suggest adding STRICT in the same PR, as the default mode when not configured.

@noob-se7en (author) commented:

Default should be null IMO. STRICT is very risky, as segments will be deleted in bulk based on the configured time interval, and it might cause data loss.

Contributor:

What I meant is to introduce an enum value for the default mode. Using null to represent the default is a bit of an anti-pattern.

Contributor:

By default we mean not doing anything, i.e. skipping re-ingestion for dedup and partial upsert.
Are you planning to make re-ingesting missing segments the default behavior?

@noob-se7en (author) commented:

The default mode will be DisasterRecoveryMode.NONE. In this mode we skip repair for upsert/dedup tables.

@noob-se7en (author) commented:

Actually, NONE will be confusing since this config is now a stream ingestion config instead of a dedup/upsert config.
Let me think of something else.

@noob-se7en (author) commented Jun 30, 2025:

Hmm, I will change the definition of DisasterRecoveryMode.STRICT: it will mean skipping upsert/dedup tables.

@noob-se7en (author) commented:

I'm bad with naming; let me know if there's a better name:

public enum DisasterRecoveryMode {
  // ALWAYS means Pinot will always run the Disaster Recovery Job
  ALWAYS,
  // CONSISTENCY_FIRST means Pinot will skip the Disaster Recovery Job for tables like dedup/upsert where consistency of data is
  // higher in priority than availability.
  CONSISTENCY_FIRST
}

Comment on lines 2575 to 2580
boolean isPartialUpsertEnabled = (tableConfig.getUpsertConfig() != null)
    && (tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL);
if (isPartialUpsertEnabled) {
  // If partial upsert is enabled, do not allow repair.
  return false;
}
Contributor:

We should also honor best effort here

Contributor:

@noob-se7en Given that this behavior can be kept common for partial upsert and dedup, we can put this config in the stream ingestion config, as suggested by @Jackie-Jiang.

@JsonPropertyDescription("Recovery mode which is used to decide how to recover a segment online in IS but having no"
+ " completed (immutable) replica on any server in pause-less ingestion")
@Nullable
private DisasterRecoveryMode _disasterRecoveryMode;
Contributor:

IMO this property belongs to StreamIngestionConfig

Contributor:

+1

boolean isDedupEnabled = (dedupConfig != null) && dedupConfig.isDedupEnabled();
if (isDedupEnabled) {
  DisasterRecoveryMode disasterRecoveryMode = dedupConfig.getDisasterRecoveryMode();
  if (disasterRecoveryMode == null) {
Contributor:

Use the default mode for not recovering here, instead of relying on null.

ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size());
LOGGER.error("Skipping repair for errored segments in table: {} because dedup or partial upsert is enabled.",
realtimeTableName);
LOGGER.error("Skipping repair for errored segments in table: {}.", realtimeTableName);
Contributor:

Can we move these log lines into allowRepairOfErrorSegments, since it's only called from here?

We can then add information like "partial upsert enabled" or "repairErrorSegmentForPartialUpsertOrDedup is set to true" as the reason for skipping re-ingestion, instead of losing this info.

@noob-se7en noob-se7en changed the title from "Adds BEST_EFFORT disaster recovery mode in dedup" to "Adds Disaster Recovery mode config for Pauseless" Jun 30, 2025
@noob-se7en noob-se7en requested a review from Jackie-Jiang June 30, 2025 19:46
@noob-se7en noob-se7en requested a review from 9aman June 30, 2025 19:46
@noob-se7en noob-se7en changed the title from "Adds Disaster Recovery mode config for Pauseless" to "Adds Disaster Recovery modes for Pauseless" Jun 30, 2025
ALWAYS,
// CONSISTENCY_FIRST means Pinot will skip the Disaster Recovery Job for tables like dedup/upsert where consistency
// of data is higher in priority than availability.
CONSISTENCY_FIRST
Contributor:

We can call this DEFAULT, and use javadoc to explain its behavior


@JsonPropertyDescription("Recovery mode which is used to decide how to recover a segment online in IS but having no"
+ " completed (immutable) replica on any server in pause-less ingestion")
private DisasterRecoveryMode _disasterRecoveryMode = DisasterRecoveryMode.ALWAYS;
Contributor:

This is a behavior change. Do we want to keep the existing behavior?

@noob-se7en (author) commented Jun 30, 2025:

On second thought, data correctness makes sense as the default mode.
Thanks, changed to DEFAULT, which skips upsert/dedup.
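With the final naming, an operator who accepts relaxed dedup/upsert correctness during recovery would opt in via the table config. A sketch follows; only the field name disasterRecoveryMode and the value ALWAYS come from this PR, while the table name and the exact JSON path under ingestionConfig are assumptions:

```json
{
  "tableName": "myTable_REALTIME",
  "ingestionConfig": {
    "streamIngestionConfig": {
      "disasterRecoveryMode": "ALWAYS"
    }
  }
}
```

Leaving the field unset (or setting DEFAULT) keeps the existing behavior of skipping repair for dedup and partial-upsert tables.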

@noob-se7en noob-se7en requested a review from Jackie-Jiang June 30, 2025 20:48
@Jackie-Jiang Jackie-Jiang merged commit a1b37cf into apache:master Jun 30, 2025
18 checks passed
4 participants