Adds Disaster Recovery modes for Pauseless #16071
Codecov Report
@@ Coverage Diff @@
## master #16071 +/- ##
============================================
+ Coverage 62.90% 63.21% +0.30%
+ Complexity 1386 1363 -23
============================================
Files 2867 2960 +93
Lines 163354 170879 +7525
Branches 24952 26154 +1202
============================================
+ Hits 102755 108015 +5260
- Misses 52847 54685 +1838
- Partials 7752 8179 +427
} else if (_currentOffset.compareTo(endOffset) == 0) {
  _segmentLogger
      .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
  boolean replaced = buildSegmentAndReplace();
These changes are required because re-ingestion happens in case of a segment build failure. Say the server is successful in uploading the segment during re-ingestion, but when OFFLINE -> ONLINE is triggered during a reset of the segment, the build can still fail on servers.
The changes to this file can go in a separate PR as well.
I see two scenarios:
- Issue with the segment, e.g. incorrect data:
- In this case the re-ingestion will fail as well.
- Deleting the segment and switching to the latest offset will be the way forward.
- Issue on the server, e.g. out of disk/memory:
- In this case both of the above, i.e. build and downloadSegmentAndReplace, might end up failing.
Also, have we come across this scenario in the past?
cc: @Jackie-Jiang
It happened in the integration test due to failure injection during commit: RealtimeSegmentDataManager failed to build the segment but StatelessRealtimeSegmentDataManager succeeded. In this case the segment is present in the deep store, but the server is unable to load the segment and move forward.
Jackie-Jiang left a comment:
cc @KKcorps for review
 * Class representing configurations related to segment assignment strategy.
 * @deprecated Use {@link org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig} instead.
 */
@Deprecated
We cannot fully deprecate this config yet because partitionColumn can only be configured here when we are using instancePartitionsMap. I'd suggest not deprecating it in this PR (as it is a different scope) and addressing it in a separate PR.
public enum DisasterRecoveryMode {
  BEST_EFFORT
  // TODO: Add support for strict recovery mode.
I suggest adding STRICT in the same PR, making it the default mode when not configured.
The default should be null IMO. STRICT is very risky, as segments will be deleted in bulk based on the configured time interval, which might cause data loss.
What I meant is to introduce an enum for the default mode. Using null to represent the default enum is a bit of an anti-pattern.
By default we mean not doing anything, i.e. skipping re-ingestion for dedup and partial upserts.
Are you planning to make re-ingesting missing segments the default behavior?
The default mode will be DisasterRecoveryMode.NONE. In this mode we skip repair for upsert/dedup tables.
Actually NONE will be confusing, since this config is now a stream ingestion config instead of a dedup/upsert config. Let me think of something else.
Hmm, I will change the definition of DisasterRecoveryMode.STRICT: it will mean skipping repair for upsert/dedup tables.
I'm bad with naming; let me know if there is a better name:
public enum DisasterRecoveryMode {
  // ALWAYS means Pinot will always run the Disaster Recovery Job.
  ALWAYS,
  // CONSISTENCY_FIRST means Pinot will skip the Disaster Recovery Job for tables like dedup/upsert
  // where consistency of data is higher in priority than availability.
  CONSISTENCY_FIRST
}
boolean isPartialUpsertEnabled = (tableConfig.getUpsertConfig() != null)
    && (tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL);
if (isPartialUpsertEnabled) {
  // If partial upsert is enabled, do not allow repair.
  return false;
}
We should also honor best effort here (see the sketch below).
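For illustration, a minimal sketch of what honoring the mode in this check could look like; the disasterRecoveryMode variable and its source are assumptions based on this thread, not the PR's actual code:

```java
boolean isPartialUpsertEnabled = (tableConfig.getUpsertConfig() != null)
    && (tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL);
if (isPartialUpsertEnabled) {
  // Sketch: allow repair only when the user explicitly opted in via BEST_EFFORT
  // (later renamed ALWAYS in this thread), instead of unconditionally returning false.
  return disasterRecoveryMode == DisasterRecoveryMode.BEST_EFFORT;
}
```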
@noob-se7en given that this behavior can be kept common for partial upsert and dedup, we can put this config in the stream ingestion config, as suggested by @Jackie-Jiang.
@JsonPropertyDescription("Recovery mode which is used to decide how to recover a segment online in IS but having no"
    + " completed (immutable) replica on any server in pause-less ingestion")
@Nullable
private DisasterRecoveryMode _disasterRecoveryMode;
IMO this property belongs in StreamIngestionConfig.
+1
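For illustration, a minimal sketch of the field relocated into StreamIngestionConfig, assuming the DEFAULT naming that this thread settles on later; the getter is an assumption:

```java
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

// Sketch: the DR mode moved from DedupConfig into StreamIngestionConfig, with a
// non-null default (DEFAULT skips repair for upsert/dedup tables).
public class StreamIngestionConfig extends BaseJsonConfig {
  @JsonPropertyDescription("Recovery mode which is used to decide how to recover a segment online in IS but having no"
      + " completed (immutable) replica on any server in pause-less ingestion")
  private DisasterRecoveryMode _disasterRecoveryMode = DisasterRecoveryMode.DEFAULT;

  public DisasterRecoveryMode getDisasterRecoveryMode() {
    return _disasterRecoveryMode;
  }
}
```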
boolean isDedupEnabled = (dedupConfig != null) && (dedupConfig.isDedupEnabled());
if (isDedupEnabled) {
  DisasterRecoveryMode disasterRecoveryMode = dedupConfig.getDisasterRecoveryMode();
  if (disasterRecoveryMode == null) {
Use the default mode for not recovering here instead of relying on null (see the sketch below).
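A minimal sketch of the null-free version, assuming the config initializes the field to the default enum value:

```java
boolean isDedupEnabled = (dedupConfig != null) && dedupConfig.isDedupEnabled();
if (isDedupEnabled) {
  // Sketch: getDisasterRecoveryMode() is assumed to return DisasterRecoveryMode.DEFAULT
  // when the user has not configured anything, so no null check is needed.
  if (dedupConfig.getDisasterRecoveryMode() == DisasterRecoveryMode.DEFAULT) {
    return false;  // DEFAULT skips repair for dedup tables.
  }
}
```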
    ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size());
LOGGER.error("Skipping repair for errored segments in table: {} because dedup or partial upsert is enabled.",
    realtimeTableName);
LOGGER.error("Skipping repair for errored segments in table: {}.", realtimeTableName);
Can we move these log lines into the function allowRepairOfErrorSegments, since it's only being called from here?
We can then add information like "partial upsert is enabled" or "repairErrorSegmentForPartialUpsertOrDedup is set to true" as the reason for skipping re-ingestion, instead of losing this info.
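For illustration, a sketch of what that could look like; the signature of allowRepairOfErrorSegments is an assumption:

```java
private boolean allowRepairOfErrorSegments(String realtimeTableName, TableConfig tableConfig) {
  // Log the specific reason for skipping re-ingestion at the decision point, so it is not lost.
  if (tableConfig.getUpsertConfig() != null
      && tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL) {
    LOGGER.error("Skipping repair for errored segments in table: {} because partial upsert is enabled.",
        realtimeTableName);
    return false;
  }
  if (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled()) {
    LOGGER.error("Skipping repair for errored segments in table: {} because dedup is enabled.", realtimeTableName);
    return false;
  }
  return true;
}
```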
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DisasterRecoveryMode.java
ALWAYS,
// CONSISTENCY_FIRST means Pinot will skip the Disaster Recovery Job for tables like dedup/upsert where consistency
// of data is higher in priority than availability.
CONSISTENCY_FIRST
We can call this DEFAULT, and use javadoc to explain its behavior (see the sketch below).
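A minimal sketch of the renamed enum with javadoc, based on the behavior agreed in this thread:

```java
/**
 * Decides how to recover a segment that is ONLINE in the ideal state but has no completed
 * (immutable) replica on any server in pause-less ingestion.
 */
public enum DisasterRecoveryMode {
  /**
   * Skip the Disaster Recovery Job for tables like dedup/upsert, where consistency of data
   * is higher in priority than availability.
   */
  DEFAULT,
  /** Always run the Disaster Recovery Job. */
  ALWAYS
}
```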
@JsonPropertyDescription("Recovery mode which is used to decide how to recover a segment online in IS but having no"
    + " completed (immutable) replica on any server in pause-less ingestion")
private DisasterRecoveryMode _disasterRecoveryMode = DisasterRecoveryMode.ALWAYS;
This is a behavior change. Do we want to keep the existing behavior?
On second thought, data correctness makes sense in the default mode.
Thanks, changed to DEFAULT, which skips upsert/dedup.
…sterRecoveryMode.java Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
Problem Statement:
In Pauseless + Dedup, during a disaster scenario (i.e. a segment is ONLINE in the ideal state but has no completed (immutable) replica on any server), re-ingestion is disabled by default in RVM. It is disabled because dedup requires ingestion to happen only in strict order (i.e. a segment ingested in the past cannot be re-ingested once the server has consumed the segments following it).
So in the above disaster scenario, the only fix is a manual one requiring bulk deletion of segments, which is quite operationally heavy and time consuming.
PR:
There can be some use cases where dedup constraints can be relaxed until the disaster/data loss is recovered (the priority is to recover from the disaster as quickly as possible, letting go of dedup metadata correctness for the time being).
This PR adds a new enum config, DisasterRecoveryMode, to StreamIngestionConfig (see the javadoc'd enum sketch earlier in the thread). Please refer to the usage sketch below:
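A hedged usage sketch; setDisasterRecoveryMode is an assumption about the accessor this PR adds, not confirmed code:

```java
// Hypothetical wiring: opt a dedup table in to disaster recovery despite dedup constraints.
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig();
streamIngestionConfig.setDisasterRecoveryMode(DisasterRecoveryMode.ALWAYS);
```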