Pauseless Consumption #3: Disaster Recovery with Reingestion #14920
1. Changing FSM 2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state
1. Changes in the commit protocol to start segment commit before the build 2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one only when the CRC is present in the ZK Metadata 3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call Refactoring code for readability
… ingestion by moving it out of streamConfigMap
…auseless ingestion in RealtimeSegmentValidationManager
…d by RealtimeSegmentValidationManager to fix commit protocol failures
…g commit protocol
…ption is enabled or not
…eepstore path with fallbacks
During reingestion, the segment metadata upload is triggered after the segment is built, so the segment status appears as "UPLOADED" rather than "DONE". When the upload completes, a segment refresh is triggered, which replaces the failed RealtimeSegmentDataManager with an ImmutableSegmentDataManager. This replacement automatically releases the associated semaphores. Once the segment is refreshed, we send a reset message so that it shows up as ONLINE in the EV.
pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
      @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName,
      long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
      throws Exception {
    completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile,
Let's not couple the reingestion logic with the regular segment completion handling. The logic is currently very hard to read, and we are coupling reset with reingestion, which is not correct.
We can add a new method completeReingestedSegmentOperations(). A reingested segment holds different assumptions than a regular uploaded segment.
done
...ain/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
...main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ReIngestionResource.java
    try {
      int partitionGroupId = llcSegmentName.getPartitionGroupId();

      Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
Is there a way to know which stream config to use to generate the segment?
Not possible. Ideally, though, in multi-stream ingestion only the broker and topic names should differ across stream configs; everything else should remain the same.
This is a great feature. And I believe we could expand its usage beyond failure recovery.
    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());

    _segmentName = segmentZKMetadata.getSegmentName();
    _partitionGroupId = llcSegmentName.getPartitionGroupId();
This will be incompatible with multi-stream ingestion. We need to use the streamPartitionGroupId to do the ingestion. Refer to the RealtimeSegmentDataManager
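To illustrate the concern: if the partition group ID in the segment name is padded to also encode which stream config it belongs to, it has to be decoded before it is used against the stream. The sketch below assumes a fixed padding offset of 10000 per stream config. The class, the constant, and the method names are hypothetical and are not taken from this PR.

```java
/** Hypothetical helper that decodes a padded partition group ID (assumed scheme). */
final class PartitionIdSketch {
  // Assumed padding between stream configs; not confirmed by this PR.
  static final int PADDING_OFFSET = 10_000;

  /** Index of the stream config that the padded ID belongs to. */
  static int streamConfigIndex(int pinotPartitionGroupId) {
    return pinotPartitionGroupId / PADDING_OFFSET;
  }

  /** Partition ID as seen by the underlying stream. */
  static int streamPartitionGroupId(int pinotPartitionGroupId) {
    return pinotPartitionGroupId % PADDING_OFFSET;
  }
}
```

Under this assumed scheme, ID 10003 maps to stream config 1 and stream partition 3, so always taking the first stream config would only be correct for IDs below the offset.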
    _logger = LoggerFactory.getLogger(StatelessRealtimeSegmentWriter.class.getName() + "_" + _segmentName);

    Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(_tableConfig).get(0);
Same here
This PR adds support for disaster recovery for Pauseless Ingestion through reingestion. These changes address the scenario where real-time segments permanently fail to transition out of the ERROR state, leading to data gaps. With reingestion, Pinot can recover such segments, ensuring availability and correctness of real-time data.
During Pauseless Ingestion, an ONLINE segment can wind up in an ERROR state if its commit fails due to a server restart and there are no other replicas. Currently, Pinot has no way to recover from such failures.
Reingestion Flow
Segments that fail to commit or end up in ERROR state can now be re-ingested by calling a new endpoint (/reingestSegment) on the server.
The ReIngestionResource reconstructs the segment from the stream, builds it, and commits it, ensuring that offline peers and the deep store get updated properly.
If successful, the re-ingested segment transitions from ERROR to ONLINE.
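As a rough sketch of how a caller might trigger this flow, the snippet below builds (but does not send) a POST request to the server endpoint using the JDK HTTP client. The JSON field names `tableNameWithType` and `segmentName`, and the base URL passed in, are assumptions for illustration; the actual request body is not spelled out in this description.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side helper; not part of this PR. */
final class ReingestRequestSketch {
  static HttpRequest build(String serverBaseUrl, String table, String segment) {
    // Field names are assumed; adjust to the real request schema.
    String body = String.format(
        "{\"tableNameWithType\":\"%s\",\"segmentName\":\"%s\"}", table, segment);
    return HttpRequest.newBuilder(URI.create(serverBaseUrl + "/reingestSegment"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }
}
```

The request could then be sent with `java.net.http.HttpClient.newHttpClient().send(...)`. Since the re-ingestion runs asynchronously, a caller would likely poll the jobs endpoint afterwards rather than wait on this call.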
New APIs introduced:
Get Running Re-ingestion Jobs (Server)
GET /reingestSegment/jobs
Returns all currently running re-ingestion jobs with their status information.
Response
Responses
Re-ingest Segment (Server)
POST /reingestSegment
Asynchronously re-ingests a segment with updated configurations.
Request Body
Response
Responses
Complete Re-ingest Segment (Controller)
POST /reingestSegment
Uploads the segment to deep store, updates ZK metadata to DONE and resets the segment
Same signature as /v2/segmentupload with multi-part form data, except that it doesn't allow refresh.
Reingestion data flow
sequenceDiagram
    participant Controller
    participant Server
    participant ReIngestionResource
    participant SimpleRealtimeSegmentDataManager
    Controller->>Controller: Finds ERROR segment in validation task
    Controller->>Controller: Pick one alive server from IS for segment
    Controller->>Server: POST /reIngestSegment (tableName, segmentName)
    Server->>ReIngestionResource: ReIngestionRequest
    ReIngestionResource->>SimpleRealtimeSegmentDataManager: startConsumption()
    SimpleRealtimeSegmentDataManager-->>SimpleRealtimeSegmentDataManager: Consume data & Build Segment
    SimpleRealtimeSegmentDataManager->>ReIngestionResource: Segment Tar File
    ReIngestionResource->>Controller: POST (/segment/completeReingestion)
    Server-->>Controller: 200 OK (Reingestion complete)
    Controller->>Controller: Copy segment to deep store
    Controller->>Controller: Update ZK segment status to DONE
    Controller->>Server: Reset segment to ONLINE
Reingestion design diagram
flowchart TD
    A[Start reingestion request] --> B[Check concurrency <br> & segment ingestion map]
    B --> C{Already ingesting?}
    C -- Yes --> D[Return 409 conflict]
    C -- No --> E[Acquire semaphore, set ingesting]
    E --> F[Create & start SimpleRealtimeSegmentDataManager]
    F --> G{Consume from<br>startOffset to endOffset}
    G --> H[Build & tar segment]
    H --> I[Push metadata to controller]
    I --> J[Trigger reingestion completion API]
    J --> K[Release semaphore, mark not ingesting]
    K --> L[Done]
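The concurrency guard in the flowchart can be sketched roughly as follows. This is a simplified, synchronous illustration rather than the code in this PR: the class name, the method names, and the use of plain integer status codes are all hypothetical, and the real endpoint runs the job asynchronously.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/** Hypothetical guard mirroring the flowchart; not the PR's implementation. */
final class ReingestionGuardSketch {
  private final Set<String> _ingesting = ConcurrentHashMap.newKeySet();
  private final Semaphore _permits;

  ReingestionGuardSketch(int maxParallelJobs) {
    _permits = new Semaphore(maxParallelJobs);
  }

  /** Returns 409 if the segment is already being re-ingested, else runs the job and returns 200. */
  int reingest(String segmentName, Runnable job) {
    // add() is atomic and returns false when the segment is already tracked.
    if (!_ingesting.add(segmentName)) {
      return 409;
    }
    // Blocks until a permit is free, bounding the number of parallel jobs.
    _permits.acquireUninterruptibly();
    try {
      job.run(); // stands in for consume, build, tar, push, and completion
      return 200;
    } finally {
      // Always release the permit and clear the flag, even if the job throws.
      _permits.release();
      _ingesting.remove(segmentName);
    }
  }

  boolean isIngesting(String segmentName) {
    return _ingesting.contains(segmentName);
  }
}
```

Marking the segment before acquiring the permit lets a duplicate request fail fast with a conflict instead of queueing behind the semaphore, and the `finally` block covers the "release semaphore, mark not ingesting" step on both success and failure.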