Contributor

@vvivekiyer vvivekiyer commented Feb 2, 2022

The current implementation doesn't protect against concurrent UploadSegment calls for a non-existing segment.

The current implementation of processNewSegment in ZKOperator is as follows:

  • Move segment to permanent directory
  • Set SegmentZkMetadata
  • AssignTableSegments
  • Update customZkMetadata map

Assume there are two concurrent uploadSegment calls for the same non-existing segment:

  • Calls C1 and C2 will both try to move the segment to the permanent directory. This is a race that either call can win. Additionally, the move() implementation in PinotFS is not atomic.

  • Only one of C1 and C2 can set SegmentZkMetadata. Today's code guarantees this because updates to ZK are made atomic with expectedVersion set to -1, so only one of C1 or C2 ends up creating the ZNode for the non-existing segment. However, there is no guarantee that the call that succeeded in moving the segment to the permanent location is the same call that sets the ZK metadata.

To fix this issue, enableParallelPushProtection can be extended to protect against concurrent calls when uploading non-existent segments as well.

Made the following changes:

  • Added locking to the processNewSegment code path in ZKOperator when enableParallelPushProtection is enabled.
  • Reordered the code so that "MoveSegmentToFinalLocation", AssignTableSegments, and the customZkMetadata map update are all done within the lock.
  • Enhanced exception handling to clean up segmentZkMetadata and stale segments in permanent directory.
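
The three changes above can be sketched roughly as follows. This is a minimal, dependency-free illustration and not the code in this PR: the class name, the in-memory map standing in for the ZK property store, the set standing in for the permanent directory, and the `assignSegment` hook are all hypothetical. It assumes the lock is taken by atomically creating the metadata entry.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Illustrative stand-in for the new-segment upload path; not Pinot code. */
final class NewSegmentUploadSketch {
  // Hypothetical stand-in for the ZK property store: segment name -> metadata.
  final Map<String, String> zkMetadata = new ConcurrentHashMap<>();
  // Hypothetical stand-in for the permanent segment directory.
  final Set<String> permanentDir = ConcurrentHashMap.newKeySet();

  void processNewSegment(String segmentName, String caller, Consumer<String> assignSegment) {
    // Creating the metadata entry doubles as taking the lock:
    // putIfAbsent lets exactly one concurrent caller through.
    if (zkMetadata.putIfAbsent(segmentName, "IN_PROGRESS:" + caller) != null) {
      throw new IllegalStateException(
          "Segment " + segmentName + " already exists or is being uploaded");
    }
    try {
      permanentDir.add(segmentName);                      // move segment to final location
      assignSegment.accept(segmentName);                  // assign the segment to the table
      zkMetadata.put(segmentName, "UPLOADED:" + caller);  // final update releases the "lock"
    } catch (RuntimeException e) {
      // Clean up so a failed upload leaves neither a stale file nor stale metadata.
      permanentDir.remove(segmentName);
      zkMetadata.remove(segmentName);
      throw e;  // re-throw the original exception to the caller
    }
  }
}
```

Because the move happens only after the metadata entry is created, the caller that wins the metadata race is also the only caller that writes to the permanent location.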

Testing:

  • Unit test verifying that lock is released after upload when enableParallelPushProtection=true for non-existing segments.
  • Unit test verifying that segmentZkMetadata is cleaned up if exception is raised.
  • Integration test uploading concurrent segments with the same name.

Contributor Author

I had to add a new method to FileUploadDownloadClient because the existing methods don't accept enableParallelPushProtection as a parameter. This new method is used by my changes in the integration test.

Member

You might not need this new API as any new parameter can be passed into List<NameValuePair> parameters and then you can leverage the existing API in Line 856.
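
As a rough illustration of this suggestion, the flag can travel as one more name/value pair in the parameter list. In FileUploadDownloadClient the list holds HttpClient `NameValuePair` objects; the sketch below uses `Map.Entry` as a dependency-free stand-in, and the class and helper names are hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: Map.Entry stands in for HttpClient's NameValuePair. */
final class UploadParamsSketch {
  /** Returns a copy of the caller's parameters with the protection flag appended. */
  static List<Map.Entry<String, String>> withParallelPushProtection(
      List<Map.Entry<String, String>> params, boolean enabled) {
    List<Map.Entry<String, String>> result = new ArrayList<>(params);
    result.add(Map.entry("enableParallelPushProtection", Boolean.toString(enabled)));
    return result;
  }

  /** Renders the parameters as a URL query string, in list order. */
  static String toQueryString(List<Map.Entry<String, String>> params) {
    return params.stream()
        .map(p -> URLEncoder.encode(p.getKey(), StandardCharsets.UTF_8)
            + "=" + URLEncoder.encode(p.getValue(), StandardCharsets.UTF_8))
        .collect(Collectors.joining("&"));
  }
}
```

For example, starting from a single `tableName=myTable` pair and enabling the flag would yield `tableName=myTable&enableParallelPushProtection=true`.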

Contributor Author

Agreed that we could use the existing API. But IMO this method provides a convenient wrapper around it. I envision that any new parameters added to the Restlet.UploadSegment() API will also be added here.

@vvivekiyer vvivekiyer force-pushed the enableParallelPushProtection branch 2 times, most recently from f4b3692 to 28d67aa Compare February 2, 2022 07:35

codecov-commenter commented Feb 2, 2022

Codecov Report

Merging #8110 (b7fdf9f) into master (7cd53e4) will increase coverage by 6.75%.
The diff coverage is 78.84%.

@@             Coverage Diff              @@
##             master    #8110      +/-   ##
============================================
+ Coverage     64.67%   71.42%   +6.75%     
- Complexity     4261     4302      +41     
============================================
  Files          1562     1617      +55     
  Lines         81539    83927    +2388     
  Branches      12256    12549     +293     
============================================
+ Hits          52732    59945    +7213     
+ Misses        25048    19898    -5150     
- Partials       3759     4084     +325     
| Flag | Coverage Δ |
| --- | --- |
| integration1 | 29.08% <53.84%> (?) |
| integration2 | 27.78% <57.69%> (?) |
| unittests1 | 67.90% <0.00%> (+0.03%) ⬆️ |
| unittests2 | 14.23% <65.38%> (-0.02%) ⬇️ |

| Impacted Files | Coverage Δ |
| --- | --- |
| ...apache/pinot/controller/api/upload/ZKOperator.java | 74.61% <50.00%> (+16.81%) ⬆️ |
| ...e/pinot/common/utils/FileUploadDownloadClient.java | 63.67% <100.00%> (+46.74%) ⬆️ |
| ...ntroller/helix/core/PinotHelixResourceManager.java | 65.95% <100.00%> (+4.11%) ⬆️ |
| ...ntroller/helix/core/minion/CronJobScheduleJob.java | 0.00% <0.00%> (-59.10%) ⬇️ |
| .../pinot/core/operator/docidsets/BitmapDocIdSet.java | 62.50% <0.00%> (-37.50%) ⬇️ |
| ...apache/pinot/core/operator/blocks/FilterBlock.java | 50.00% <0.00%> (-7.15%) ⬇️ |
| ...he/pinot/segment/local/utils/TableConfigUtils.java | 65.83% <0.00%> (-4.83%) ⬇️ |
| ...oker/routing/segmentpruner/EmptySegmentPruner.java | 87.93% <0.00%> (-3.45%) ⬇️ |
| ...er/routing/segmentpruner/SegmentPrunerFactory.java | 92.53% <0.00%> (-2.63%) ⬇️ |
| ...ata/manager/offline/DimensionTableDataManager.java | 86.27% <0.00%> (-2.62%) ⬇️ |
| ... and 404 more | |


@vvivekiyer vvivekiyer force-pushed the enableParallelPushProtection branch from 28d67aa to b7fdf9f Compare February 2, 2022 11:18
Contributor

@mcvsubbu mcvsubbu left a comment

Should we make this feature enabled via a controller config (as opposed to http header setting)? I would imagine that the installations that need protection against new segments being pushed simultaneously (for whatever reason) will want to do so for all requests and all tables.

@vvivekiyer
Contributor Author

> Should we make this feature enabled via a controller config (as opposed to http header setting)? I would imagine that the installations that need protection against new segments being pushed simultaneously (for whatever reason) will want to do so for all requests and all tables.

I agree that making this a controller config would be ideal. But here are the concerns with doing that in the current implementation:

  1. "enableParallelPushProtection" is an existing parameter (not derived from http header) in the REST API call. But it's being used only for existing segments. This patch extends that to work for new segments as well. That means that there could be some customers (scripts, curl commands, etc) using this API parameter currently.
  2. Removing the existing "enableParallelPushProtection" parameter from the API is not possible (for backward compatibility) without deprecating the API. At the same time, having both controller config and a parameter will get hairy in terms of code-handling, readability and customer usage.

Given these concerns, I feel that it would be better to address this as a TODO in the future as follows:

  1. Deprecate existing UploadSegment APIs. Introduce new API without the parameter.
  2. Introduce a controller config option and extend code to use the config to provide parallel push protection

Thoughts?

Member

@jackjlli jackjlli Feb 2, 2022

Since this is for processing a new segment, why not update the ZK metadata with an expected version such as 0 here? For example, if two controllers try to upload the same new segment, only one controller should succeed and the other should fail.

Contributor

Expected version 0 (current version) won't work if there is no existing ZK record. I don't know whether ZK can create a record only if it does not exist. IIRC expected version -1 means overwrite anyway. If not, then there is still a race condition if two uploads happen at the same time and both of them reach this method.

Contributor Author

Thanks for this comment. As Jackie pointed out, expected version -1 could end up overwriting anyway if there are concurrent calls.
To avoid this we can use ZkCacheBaseDataAccessor.set(). This will only allow one call to succeed and fail the second.
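
The difference between an unconditional write and a create-only-if-absent write can be illustrated without ZooKeeper. In the sketch below, `ConcurrentHashMap.putIfAbsent` is a stand-in for a ZK node creation that fails when the node already exists (the snippet reviewed later in this thread calls `create(...)` on the property store). The class and method names are hypothetical, and the exact behavior of any specific Helix accessor method should be checked against the Helix documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: an in-memory map stands in for the ZK property store. */
final class CreateIfAbsentSketch {
  /** Starts `callers` threads that all try to create the same record; returns how many succeeded. */
  static int countSuccessfulCreates(int callers) {
    ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    AtomicInteger winners = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < callers; i++) {
      String caller = "C" + (i + 1);
      Thread t = new Thread(() -> {
        try {
          start.await();  // release all callers together to provoke the race
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        // Atomic create-if-absent: returns null only for the first caller.
        if (store.putIfAbsent("segment_0", caller) == null) {
          winners.incrementAndGet();
        }
      });
      threads.add(t);
      t.start();
    }
    start.countDown();
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for callers", e);
      }
    }
    return winners.get();
  }
}
```

With an unconditional `put`, every caller would "succeed" and the last writer would silently win, which is the overwrite behavior described for expected version -1.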

Member

Same here.

Contributor Author

Done.

Member

The original exception can be re-thrown to the top.

Contributor Author

Done.

Member

We can use waitForCondition method to validate instead of sleeping a fixed amount of time.
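
A polling helper of this kind can be sketched as below. This is a generic illustration, not the project's own waitForCondition utility; the class name, signature, and parameter names are assumptions.

```java
import java.util.function.BooleanSupplier;

/** Illustrative polling helper; hypothetical, not the project's test utility. */
final class WaitSketch {
  /**
   * Polls the condition until it holds or the timeout elapses.
   * Throws AssertionError with the given message on timeout.
   */
  static void waitForCondition(BooleanSupplier condition, long timeoutMs, long pollIntervalMs,
      String errorMessage) {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (System.nanoTime() < deadlineNanos) {
      if (condition.getAsBoolean()) {
        return;  // done as soon as the condition holds, no fixed sleep
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for condition", e);
      }
    }
    // One last check so a condition that became true during the final sleep still passes.
    if (!condition.getAsBoolean()) {
      throw new AssertionError(errorMessage);
    }
  }
}
```

Compared with a fixed sleep, the test returns as soon as the condition holds and fails with a clear message when it never does.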

Contributor Author

Done. Thanks!

Member

Missing end empty line.

Contributor Author

Done.

Member

Add Assert.fail() right after this line since we should expect the exception to be thrown?

Contributor Author

@vvivekiyer vvivekiyer Feb 3, 2022

We might not always hit an exception here because of timing. That's why I didn't add a fail().
However, if we hit an exception, I make sure that the exception is hit because of concurrent uploads.
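
The pattern described here, with no unconditional fail() but a strict check on any exception that does occur, might look like the sketch below. The class and method names and the expected message fragment are hypothetical; the integration test in this PR may be structured differently.

```java
/** Illustrative only: tolerates a timing-dependent rejection but checks its cause. */
final class TimingTolerantAssertSketch {
  /**
   * Runs one upload attempt. Returns true if it succeeded and false if it was
   * rejected for the expected reason. Any other failure is reported as an error.
   */
  static boolean uploadOrExpectedRejection(Runnable upload, String expectedMessageFragment) {
    try {
      upload.run();
      // No fail() here: if the two uploads do not overlap in time, both may succeed.
      return true;
    } catch (RuntimeException e) {
      String message = e.getMessage();
      if (message == null || !message.contains(expectedMessageFragment)) {
        throw new AssertionError("Upload failed for an unexpected reason: " + e, e);
      }
      return false;
    }
  }
}
```

The trade-off is that such a test cannot prove the rejection path ran in a given execution; it only guarantees that a rejection, when it occurs, has the expected cause.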

Member

Could we test it in a test method instead of testing it in the setUp method?

Contributor Author

I've added a TODO. I will create a new PR to address this.

Member

Add a javadoc for this method.

Contributor Author

Done.

Contributor

We should not read a new ZNRecord because this might not be the original one. We should reuse the newSegmentZKMetadata

Contributor Author

It will be the same ZNRecord because we have acquired a lock, right?
However, I understand that we can do away with the redundant ZNRecord lookup, so I removed it.

Contributor

Can we add some tests to ensure this can properly clean up the ZK entry and the segment file?

Contributor Author

@vvivekiyer vvivekiyer Feb 3, 2022

I have a test already added in ZkOperatorTest to ensure that the clean up is done.

Contributor

This modification should be applied when creating the initial metadata

Contributor Author

Good point. Thanks!

Contributor

Suggest not passing in enableParallelPushProtection but check it in this method and call setSegmentUploadStartTime() within this method to limit the scope of parallel push protection

Contributor Author

Done.

@vvivekiyer vvivekiyer force-pushed the enableParallelPushProtection branch from b7fdf9f to 4e4a140 Compare February 3, 2022 05:48
The current implementation doesn't protect against concurrent
UploadSegment calls for a non-existing segment.

The existing implementation of ProcessNewSegment is as follows:
1. Move segment to permanent directory
2. Set SegmentZkMetadata
3. AssignTableSegments
4. Update customZkMetadata map

Assume there are two concurrent uploadSegment calls for the same
non-existing segment:
1. Calls C1 and C2 will try to move to permanent directory. There
   could be a race here and either C1 or C2 can win. Additionally,
   move() implementation in PinotFS is not atomic.
2. Now, only one of C1 and C2 can set SegmentZkMetadata. This is
   because updates to Zk are made atomic with expectedVersion. But
   there is no guarantee that the call that succeeded to move the
   segment to permanent location sets the Zk metadata.

To fix this issue, enableParallelPushProtection can be extended
to protect against concurrent calls when uploading non-existent
segments as well.

Made the following changes:
1. Added locking to ProcessNewSegment codepath if
   enableParallelPushProtection is enabled.
2. Reordered code so that "MoveSegmentToFinalLocation",
   AssignTableSegments, customZkMetadata map update is done within
   the lock.
3. Enhanced exception handling to clean up segmentZkMetadata and stale
   segments in permanent directory.

Testing:
1. Unit test verifying that lock is released after upload when
   enableParallelPushProtection=true for non-existing segments.
2. Unit test verifying that segmentZkMetadata is cleaned up if
   exception is raised.
3. Integration test uploading concurrent segments with the same
   name.
@vvivekiyer vvivekiyer force-pushed the enableParallelPushProtection branch from 4e4a140 to f3c0eb8 Compare February 3, 2022 06:14
@vvivekiyer
Contributor Author

Addressed review comments. Please take a look @Jackie-Jiang , @jackjlli and @siddharthteotia.

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

LGTM. Good job!

- Removed unnecessary variable for expectedVersionNumber.
@vvivekiyer vvivekiyer force-pushed the enableParallelPushProtection branch from 9e4ee79 to b440780 Compare February 3, 2022 19:58
Member

@jackjlli jackjlli left a comment

Minor but LGTM. Thanks for making the change!

.create(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT);
} catch (Exception e) {
return false;

Member

Could we add a message here when any exception is thrown from the ZK?
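
One possible shape for this, sketched with `java.util.logging` so that it stays dependency-free: the class name, method signature, `zkCreate` hook, and log wording are all assumptions, and the real method would use the project's own logger.

```java
import java.util.function.BooleanSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Illustrative only: logs the ZK failure instead of swallowing it silently. */
final class CreateWithLoggingSketch {
  private static final Logger LOGGER = Logger.getLogger(CreateWithLoggingSketch.class.getName());

  /** `zkCreate` is a hypothetical hook standing in for the property-store create call. */
  static boolean createSegmentZkMetadata(String tableNameWithType, String segmentName,
      BooleanSupplier zkCreate) {
    try {
      return zkCreate.getAsBoolean();
    } catch (Exception e) {
      // Keep the boolean contract, but record why the creation failed.
      LOGGER.log(Level.SEVERE, "Caught exception while creating ZK metadata for segment: "
          + segmentName + " of table: " + tableNameWithType, e);
      return false;
    }
  }
}
```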

Contributor Author

I'm going to send up a followup PR for another change. I'll address this change there.

}

/**
* Construct segmentZkMetadata for the realtime or offline table.

Member

for new segment of realtime or offline table.

Contributor Author

I'm going to send up a followup PR for another change. I'll address this change there.

@siddharthteotia siddharthteotia merged commit 5f813c9 into apache:master Feb 3, 2022
@vvivekiyer vvivekiyer deleted the enableParallelPushProtection branch February 3, 2022 21:26
vvivekiyer pushed a commit to vvivekiyer/pinot that referenced this pull request Feb 7, 2022
- Improve description for a method.
- Add message when exception is thrown.
siddharthteotia pushed a commit that referenced this pull request Feb 7, 2022
* Add overwriteIfExists option to UploadSegment

Currently, when a segment is uploaded, we always overwrite if a
segment with the same name already exists. Having an overwrite
parameter in the UploadSegment API will give clients finer
control during uploading segments.

Note that the default option is set to true to retain existing
behavior.

* Address minor review comments from PR #8110

- Improve description for a method.
- Add message when exception is thrown.

* Rename overwriteIfExists to allowRefresh

Co-authored-by: Vivek Iyer Vaidyanathan <[email protected]>