-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Pauseless Ingestion #2: Handle Failure scenarios without DR #14798
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pauseless Ingestion #2: Handle Failure scenarios without DR #14798
Conversation
1. Changing FSM 2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state
1. Changes in the commit protocol to start segment commit before the build 2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one only when the CRC is present in the ZK Metadata 3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call Refactoing code for redability
… ingestion by moving it out of streamConfigMap
…auseless ingestion in RealtimeSegmentValidationManager
…d by RealtimeSegmentValitdationManager to fix commit protocol failures
…g commit protocol
...r/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
Outdated
Show resolved
Hide resolved
| return; | ||
| TableConfig tableConfig = indexLoadingConfig.getTableConfig(); | ||
| // For pauseless tables, we should replace the segment if download url is missing even if crc is same | ||
| if (!PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should also LOG that since pauseless is enabled we are not going directly for segment download but waiting for catchup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@KKcorps I don't understand why this is different for pauseless. I see you have added these along with reingestion tests.
.../org/apache/pinot/integration/tests/realtime/FailureInjectingRealtimeSegmentDataManager.java
Outdated
Show resolved
Hide resolved
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
Outdated
Show resolved
Hide resolved
| private static final ConcurrentHashMap<String, AtomicBoolean> SEGMENT_INGESTION_MAP = new ConcurrentHashMap<>(); | ||
|
|
||
| // Semaphore to enforce global concurrency limit | ||
| private static final Semaphore REINGESTION_SEMAPHORE = new Semaphore(MAX_PARALLEL_REINGESTIONS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this count configurable
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
Outdated
Show resolved
Hide resolved
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java
Outdated
Show resolved
Hide resolved
5782928 to
aee514c
Compare
…jor fields of SegmentZKMetadata
| * SimpleSegmentMetadata deserialized = objectMapper.readValue(json, SimpleSegmentMetadata.class); | ||
| */ | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| public class SimpleSegmentMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Jackie-Jiang have added this to simplify ser/deser of SegmentZkMetadata. Added few getters for easy access from simpleFields for important keys like crc.
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
Outdated
Show resolved
Hide resolved
| ImmutableSegmentDataManager immutableSegmentDataManager = (ImmutableSegmentDataManager) segmentDataManager; | ||
| SegmentMetadataImpl segmentMetadata = | ||
| (SegmentMetadataImpl) immutableSegmentDataManager.getSegment().getSegmentMetadata(); | ||
| SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(segmentMetadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be able to reuse the methods in ZKMetadataUtils. You can move ZKMetadataUtils into pinot-common
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| // 1. DONE for normal consumption | ||
| // 2. COMMITTING for pauseless consumption | ||
| Status statusPostSegmentMetadataUpdate = | ||
| PauselessConsumptionUtils.isPauselessEnabled(tableConfig) ? Status.COMMITTING : Status.DONE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check this upfront, instead of on each partition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| // Define upload methods in order of preference | ||
| List<UploadAttempt> uploadAttempts = Arrays.asList( | ||
| // Primary method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the primary method failed, we should abort the committing segment fix because fallbacks won't be able to handle that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been added for backward compatibility.
I initially relied only on the primary function but that introduced Backward compatibility issues.
| if (segmentZKMetadata.getStatus() == Status.COMMITTING) { | ||
| LOGGER.info("Updating additional metadata in ZK for segment {} as pauseless is enabled", | ||
| segmentZKMetadata.getSegmentName()); | ||
| segmentZKMetadata.setStartTime(uploadedSegmentZKMetadata.getStartTimeMs()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we simply put all simple fields from the uploaded ZK metadata into current?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have update the function to copy all the simpleFields. The SegementZkMetadata returned by the uploadLLSegmentToDeepStore now also the previously missing fields like creationTime and numReplicas
c0bb465 to
3b4c9de
Compare
Jackie-Jiang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new added tests are quite slow. PauselessRealtimeIngestionCommitEndMetadataFailureTest itself is taking more than 5 minutes, and we are adding 4 similar tests. Are we able to merge them into one and reduce the overall testing time?
…can only be updated after a fixed time has elapsed Reduced the time requirements by creating a FakePauselessLLCRealtimeSegmentManager.
@Jackie-Jiang I have updated the tests. Each test now runs in under a minute. |
Pauseless Ingestion Failure Resolution
Please refer to PR: #14741 for happy path. This PR aims to only cover the failure scenarios. Once the above one is merged a better diff covering only failures will be visible.
To view only diff covering failure scenarios, for the time being, refer to:
Summary
This PR aims to provide ways to resolve the failure scenarios that we can encounter during pauseless ingestion. The detailed list of failure scenarios can be found here: link along with the failure handling strategies: link
Following sequence diagrams summarizes the failure scenarios and the resolution.


Failure Scenarios & Resolution Approaches
Failures encountered during the commit protocol can be categorized into two types: recoverable and unrecoverable failures.
Recoverable failures are those in which at least one of the servers retains the segment on disk.
Unrecoverable failures occur when none of the servers have the segment on disk.
Recoverable Failures
Recoverable failures will be addressed through RealtimeSegmentValidationManager. This approach will handle scenarios such as upload failures and incomplete commit protocol executions.
The controller or server can run into issues in between any of the steps of the commit protocol as listed below:
Request Type: COMMIT_START
Request Type: COMMIT_END_METADATA
4. Update Segment ZK metadata for the committing segment (seg__0__0):
- Change status to DONE.
- Update deepstore url.
- Any additional metadata.
The RealtimeSegmentValidationManager figures out which step of the commit protocol failed and how can it be fixed. This is very similar to how commit protocol failures were handled before with some minor changes.
Non-recoverable Failures
These failures require ingesting the segment again from upstream, followed by build, upload and ZK metadata update.