
Conversation

Contributor

@KKcorps KKcorps commented Jan 27, 2025

This PR adds disaster-recovery support for pauseless ingestion via reingestion. These changes address the scenario where real-time segments permanently fail to transition out of the ERROR state, leading to data gaps. With reingestion, Pinot can recover such segments, ensuring availability and correctness of real-time data.

During pauseless ingestion, an ONLINE segment can wind up in an ERROR state if its commit fails due to a server restart and there are no other replicas. Currently there is no way in Pinot to recover from such failures.

Reingestion Flow

Segments that fail to commit or end up in ERROR state can now be re-ingested by calling a new endpoint (/reingestSegment) on the server.

The ReIngestionResource reconstructs the segment from the stream, builds it, and commits it, ensuring that offline peers and the deep store get updated properly.

If successful, the re-ingested segment transitions from ERROR to ONLINE.

New APIs introduced:

Get Running Re-ingestion Jobs (Server)

GET /reingestSegment/jobs

Returns all currently running re-ingestion jobs with their status information.

Response

  • Type: JSON
  • Contains array of jobs with:
    • jobId: Unique identifier
    • tableNameWithType: Table being processed
    • segmentName: Segment being re-ingested
    • startTimeMs: Job start timestamp

Responses

  • 200: Success - List of running jobs
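As a rough illustration of the response shape described above, here is a small sketch that renders a job list into that JSON layout. Only the field names come from the description; the record, the helper, and the sample values are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the response shape for GET /reingestSegment/jobs. The record and the
// hand-rolled JSON rendering are illustrative; only the field names are from the
// endpoint description above.
public class ReingestionJobList {
    record Job(String jobId, String tableNameWithType, String segmentName, long startTimeMs) {
        String toJson() {
            return String.format(
                "{\"jobId\":\"%s\",\"tableNameWithType\":\"%s\",\"segmentName\":\"%s\",\"startTimeMs\":%d}",
                jobId, tableNameWithType, segmentName, startTimeMs);
        }
    }

    static String toJson(List<Job> jobs) {
        return jobs.stream().map(Job::toJson).collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        // Hypothetical job entry, printed as the endpoint might return it.
        System.out.println(toJson(List.of(
            new Job("job-1", "myTable_REALTIME", "myTable__0__42__20250101T0000Z", 1738000000000L))));
    }
}
```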

Re-ingest Segment (Server)

POST /reingestSegment

Asynchronously re-ingests a segment with updated configurations.

Request Body

  • Type: JSON
  • Required fields:
    • tableNameWithType: Table name with type (e.g. "myTable_REALTIME")
    • segmentName: Name of segment to re-ingest

Response

  • Type: JSON
  • Contains:
    • jobId: Unique identifier for tracking progress
    • message: Success confirmation

Responses

  • 200: Success - Job started successfully
  • 429: Too Many Requests - Parallel job limit reached
  • 404: Not Found - Table/segment not found
  • 409: Conflict - Segment already being re-ingested
  • 500: Internal Server Error - Server initialization issues
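For concreteness, the request body for this endpoint could be assembled as below. The two field names are taken from the description above; the helper name and sample segment name are hypothetical.

```java
// Sketch: building the JSON body for POST /reingestSegment by hand.
// Field names come from the endpoint description; everything else is illustrative.
public class ReingestRequestSketch {
    static String requestBody(String tableNameWithType, String segmentName) {
        return String.format(
            "{\"tableNameWithType\":\"%s\",\"segmentName\":\"%s\"}",
            tableNameWithType, segmentName);
    }

    public static void main(String[] args) {
        // A hypothetical LLC segment name, for illustration only.
        System.out.println(requestBody("myTable_REALTIME", "myTable__0__42__20250101T0000Z"));
    }
}
```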

Complete Re-ingest Segment (Controller)

POST /reingestSegment

Uploads the segment to the deep store, updates the ZK metadata status to DONE, and resets the segment.

Same signature as the /v2/segment upload endpoint with multipart form data, except that it does not allow refresh.

Reingestion data flow

```mermaid
sequenceDiagram
    participant Controller
    participant Server
    participant ReIngestionResource
    participant SimpleRealtimeSegmentDataManager

    Controller->>Controller: Finds ERROR segment in validation task
    Controller->>Controller: Picks one alive server from the ideal state for the segment
    Controller->>Server: POST /reIngestSegment (tableName, segmentName)
    Server->>ReIngestionResource: ReIngestionRequest
    ReIngestionResource->>SimpleRealtimeSegmentDataManager: startConsumption()
    SimpleRealtimeSegmentDataManager-->>SimpleRealtimeSegmentDataManager: Consume data & build segment
    SimpleRealtimeSegmentDataManager->>ReIngestionResource: Segment tar file
    ReIngestionResource->>Controller: POST /segment/completeReingestion
    Controller-->>ReIngestionResource: 200 OK (reingestion complete)
    Controller->>Controller: Copy segment to deep store
    Controller->>Controller: Update ZK segment status to DONE
    Controller->>Server: Reset segment to ONLINE
```
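The first two controller steps in the diagram (find ERROR segments in the validation task, pick a live server hosting the segment) can be sketched roughly as follows. The types and names here are simplified stand-ins, not Pinot's actual classes.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for the controller-side selection logic: given a segment's
// replica states (server -> state) and the set of live servers, decide whether the
// segment needs reingestion and pick one live server to receive the POST.
public class ReingestTargetPicker {
    static boolean needsReingestion(Map<String, String> serverStates) {
        // A segment qualifies only if every replica is stuck in ERROR state.
        return serverStates.values().stream().allMatch("ERROR"::equals);
    }

    static Optional<String> pickServer(Map<String, String> serverStates, List<String> liveServers) {
        // Any live server that hosts the segment can rebuild it from the stream.
        return serverStates.keySet().stream().filter(liveServers::contains).findFirst();
    }

    public static void main(String[] args) {
        Map<String, String> states = Map.of("server-1", "ERROR", "server-2", "ERROR");
        if (needsReingestion(states)) {
            System.out.println(pickServer(states, List.of("server-2")).orElseThrow());
        }
    }
}
```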

Reingestion design diagram

```mermaid
flowchart TD
    A[Start reingestion request] --> B[Check concurrency <br> & segment ingestion map]
    B --> C{Already ingesting?}
    C -- Yes --> D[Return 409 Conflict]
    C -- No --> E[Acquire semaphore, set ingesting]
    E --> F[Create & start SimpleRealtimeSegmentDataManager]
    F --> G{Consume from<br>startOffset to endOffset}
    G --> H[Build & tar segment]
    H --> I[Push metadata to controller]
    I --> J[Trigger reingestion completion API]
    J --> K[Release semaphore, mark not ingesting]
    K --> L[Done]
```
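The admission control at the top of the flowchart (a parallel-job semaphore plus a per-segment "already ingesting" marker) can be approximated like this. The class, the job limit, and the integer status codes are illustrative stand-ins, not the PR's actual implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Illustrative version of the admission control in the flowchart: a semaphore caps
// parallel reingestion jobs (429 when exhausted) and a concurrent set rejects a
// second request for a segment that is already being reingested (409).
public class ReingestionGuard {
    private final Semaphore _jobSlots;
    private final Set<String> _ingestingSegments = ConcurrentHashMap.newKeySet();

    ReingestionGuard(int maxParallelJobs) {
        _jobSlots = new Semaphore(maxParallelJobs);
    }

    /** Returns an HTTP-style status: 200 admitted, 409 duplicate, 429 over the limit. */
    int tryAdmit(String segmentName) {
        if (!_ingestingSegments.add(segmentName)) {
            return 409;  // segment already being re-ingested
        }
        if (!_jobSlots.tryAcquire()) {
            _ingestingSegments.remove(segmentName);  // undo the marker before rejecting
            return 429;  // parallel job limit reached
        }
        return 200;
    }

    /** Called when the job finishes: release the slot and clear the marker. */
    void release(String segmentName) {
        _ingestingSegments.remove(segmentName);
        _jobSlots.release();
    }

    public static void main(String[] args) {
        ReingestionGuard guard = new ReingestionGuard(1);
        System.out.println(guard.tryAdmit("seg__0__1"));  // admitted
        System.out.println(guard.tryAdmit("seg__0__1"));  // duplicate request
        System.out.println(guard.tryAdmit("seg__0__2"));  // limit reached
        guard.release("seg__0__1");
        System.out.println(guard.tryAdmit("seg__0__2"));  // admitted after release
    }
}
```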

9aman and others added 30 commits January 2, 2025 16:57
1. Changing FSM
2. Changing the 3 steps performed during the commit protocol to update ZK and Ideal state
1. Changes in the commit protocol to start segment commit before the build
2. Changes in the BaseTableDataManager to ensure that the locally built segment is replaced by a downloaded one
   only when the CRC is present in the ZK Metadata
3. Changes in the download segment method to allow waited download in case of pauseless consumption
…segment commit end metadata call

Refactoring code for readability
… ingestion by moving it out of streamConfigMap
…auseless ingestion in RealtimeSegmentValidationManager
…d by RealtimeSegmentValitdationManager to fix commit protocol failures
Contributor Author

KKcorps commented Feb 4, 2025

> When a segment is re-ingested, it should be DONE instead of UPLOADED. I didn't find the semaphore related logic. Is the PR description up to date?

During reingestion segment metadata upload is triggered after segment building. Hence, the segment status appears as "UPLOADED" rather than "DONE".

Upon upload completion, a segment refresh operation is triggered, which replaces the failed RealtimeSegmentDataManager with an ImmutableSegmentDataManager. During this replacement process, the system automatically releases associated semaphores.

Once the segment is refreshed, we send a reset message so that it starts showing up as ONLINE in the external view (EV).

@KKcorps KKcorps force-pushed the pauseless-reingestion branch from 7432261 to fd79338 Compare February 5, 2025 13:48
@KKcorps KKcorps force-pushed the pauseless-reingestion branch from fd79338 to 5a42c28 Compare February 5, 2025 14:09
@Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName,
long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
throws Exception {
completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile,
Contributor
Let's not couple the reingestion logic into the current regular segment completion handling. Currently the logic is very hard to read, and we are coupling reset with reingestion, which is not correct.
We can add a new method completeReingestedSegmentOperations(). A reingested segment holds different assumptions than a regular uploaded segment.

Contributor Author
done

try {
int partitionGroupId = llcSegmentName.getPartitionGroupId();

Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(tableConfig).get(0);
Contributor
Is there a way to know which stream config to use to generate the segment?

Contributor Author
Not possible. Ideally, though, in a multi-stream setup only the broker and topic names should differ across stream configs; everything else should remain the same.

@Jackie-Jiang Jackie-Jiang merged commit 7fcb82c into apache:master Feb 12, 2025
20 of 21 checks passed
zeronerdzerogeekzerocool pushed a commit to zeronerdzerogeekzerocool/pinot that referenced this pull request Feb 20, 2025
@lnbest0707-uber
Contributor

This is a great feature, and I believe we could expand its usage beyond failure recovery. E.g., we could combine it with proposal #14815 for a stronger ingestion-freshness guarantee.

  • Add a similar ingestAdhoc endpoint that, instead of re-ingesting, ingests a segment based on an input partition and offset.
  • Hook it together with the proposal in Pinot real-time data ingestion freshness guarantee #14815: the controller skips the offset based on freshness status, and also spawns the asynchronous ingestAdhoc job.

LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());

_segmentName = segmentZKMetadata.getSegmentName();
_partitionGroupId = llcSegmentName.getPartitionGroupId();
Contributor
This will be incompatible with multi-stream ingestion. We need to use the streamPartitionGroupId to do the ingestion. Refer to the RealtimeSegmentDataManager.


_logger = LoggerFactory.getLogger(StatelessRealtimeSegmentWriter.class.getName() + "_" + _segmentName);

Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(_tableConfig).get(0);
Contributor
Same here


Labels

feature · ingestion · real-time · release-notes (referenced by PRs that need attention when compiling the next release notes)


6 participants