-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Extend enableParallePushProtection support in UploadSegment API #8110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extend enableParallePushProtection support in UploadSegment API #8110
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to add a new method to FileUploadDownloadClient because the existing methods don't accept enableParallelPushProtection as a parameter. This new method is used by my changes in the integration test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might not need this new API as any new parameter can be passed into List<NameValuePair> parameters and then you can leverage the existing API in Line 856.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed that we could use existing API. But IMO having this method provides a convenient wrapper around the existing API. I envision that any new parameters added to Restlet.UploadSegment() API will also be added here.
f4b3692 to
28d67aa
Compare
Codecov Report
@@ Coverage Diff @@
## master #8110 +/- ##
============================================
+ Coverage 64.67% 71.42% +6.75%
- Complexity 4261 4302 +41
============================================
Files 1562 1617 +55
Lines 81539 83927 +2388
Branches 12256 12549 +293
============================================
+ Hits 52732 59945 +7213
+ Misses 25048 19898 -5150
- Partials 3759 4084 +325
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
28d67aa to
b7fdf9f
Compare
mcvsubbu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make this feature enabled via a controller config (as opposed to http header setting)? I would imagine that the installations that need protection against new segments being pushed simultaneously (for whatever reason) will want to do so for all requests and all tables.
I agree that making this a controller config would be ideal. But here are the concerns with doing that in the current implementation:
Given these concerns, I feel that it would be better to address this as a TODO in the future as follows:
Thoughts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might not need this new API as any new parameter can be passed into List<NameValuePair> parameters and then you can leverage the existing API in Line 856.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it's to process new segment, why not update the Zk metadata with an expected version like 0 here? E.g., if two controllers are trying to upload for the same new segment, then only 1 controller should succeed with the segment and the other controller should fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expected version 0 (current version) won't work if there is no existing ZK record. I don't know if ZK can create record only if it does not exist. IIRC expected version -1 means override anyway. If not, then there is still a race condition if 2 uploads happen at the same time and both of them run into this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this comment. As Jackie pointed out, expected version -1 could end up overwriting anyway if there are concurrent calls.
To avoid this we can use ZkCacheBaseDataAccessor.set(). This will only allow one call to succeed and fail the second.
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original exception can be re-thrown to the top.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use waitForCondition method to validate instead of sleeping a fixed amount of time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing end empty line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add Assert.fail() right after this line since we should expect the exception to be thrown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might not always hit an exception here because of timing. That's why I didn't add a fail().
However, if we hit an exception, I make sure that the exception is hit because of concurrent uploads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we test it in a test method instead of testing it in the setUp method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a TODO. I will create a new PR to address this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a javadoc for this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expected version 0 (current version) won't work if there is no existing ZK record. I don't know if ZK can create record only if it does not exist. IIRC expected version -1 means override anyway. If not, then there is still a race condition if 2 uploads happen at the same time and both of them run into this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not read a new ZNRecord because this might not be the original one. We should reuse the newSegmentZKMetadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be the same ZnRecord because we have acquired a lock, right?
However, I understand that we could do away with a redundant lookup for ZNRecord. So removed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some tests to ensure this can properly clean up the ZK entry and the segment file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a test already added in ZkOperatorTest to ensure that the clean up is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This modification should be applied when creating the initial metadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest not passing in enableParallelPushProtection but check it in this method and call setSegmentUploadStartTime() within this method to limit the scope of parallel push protection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
b7fdf9f to
4e4a140
Compare
The current implementation doesn't protect against concurrent UploadSegment calls for a non-existing segment. The existing implementation of ProcessNewSegment is as follows: 1. Move segment to permanent directory 2. Set SegmentZkMetadata 3. AssignTableSegments 4. Update customZkMetadata map Assume there are two concurrent uploadSegment calls for the same non-existing segment: 1. Calls C1 and C2 will try to move to permanent directory. There could be a race here and either C1 or C2 can win. Additionally, move() implementation in PinotFS is not atomic. 2. Now, only one of C1 and C2 can set SegmentZkMetadata. This is because updates to Zk are made atomic with expectedVersion. But there is no guarantee that the call that succeeded to move the segment to permanent location sets the Zk metadata. To fix this issue, enableParallelPushProtection can be extended to protect against concurrent calls when uploading non-existent segments as well. Made the following changes: 1. Added locking to ProcessNewSegment codepath if enableParallelPushProtection is enabled. 2. Reordered code so that "MoveSegmentToFinalLocation", AssignTableSegments, customZkMetadata map update is done within the lock. 3. Enhanced exception handling to clean up segmentZkMetadata and stale segments in permanent directory. Testing: 1. Unit test verifying that lock is released after upload when enableParallelPushProtection=true for non-existing segments. 2. Unit test verifying that segmentZkMetadata is cleaned up if exception is raised. 3. Integration test uploading concurrent segments with the same name.
4e4a140 to
f3c0eb8
Compare
|
Addressed review comments. Please take a look @Jackie-Jiang , @jackjlli and @siddharthteotia. |
Jackie-Jiang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Good job!
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
Outdated
Show resolved
Hide resolved
- Removed unnecessary variable for expectedVersionNumber.
9e4ee79 to
b440780
Compare
jackjlli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor but LGTM. Thanks for making the change!
| .create(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()), | ||
| segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT); | ||
| } catch (Exception e) { | ||
| return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a message here when any exception is thrown from the ZK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to send up a followup PR for another change. I'll address this change there.
| } | ||
|
|
||
| /** | ||
| * Construct segmentZkMetadata for the realtime or offline table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for new segment of realtime or offline table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to send up a followup PR for another change. I'll address this change there.
- Improve description for a method. - Add message when exception is thrown.
* Add overwriteIfExists option to UploadSegment Currently, when a segment is uploaded, we always overwrite if a segment with the same name already exists. Having an overwrite parameter in the UploadSegment API will give clients finer control during uploading segments. Note that the default option is set to true to retain existing behavior. * Address minor review comments from PR #8110 - Improve description for a method. - Add message when exception is thrown. * Rename overwriteIfExists to allowRefresh Co-authored-by: Vivek Iyer Vaidyanathan <[email protected]>
The current implementation doesn't protect against concurrent
UploadSegmentcalls for a non-existing segment.The current implementation of
processNewSegmentinZKOperatoris as follows:Assume there are two concurrent uploadSegment calls for the same non-existing segment:
Calls C1 and C2 will try to move to permanent directory. There could be a race here and either C1 or C2 can win. Additionally, move() implementation in PinotFS is not atomic.
Now, only one of C1 and C2 can set SegmentZkMetadata. This is guaranteed in today's code because updates to Zk are made atomic with expectedVersion as -1 so only one of C1 or C2 will end up creating the ZNode for non-existing segment. But there is no guarantee that the call that succeeded to move the segment to permanent location sets the Zk metadata.
To fix this issue,
enableParallelPushProtectioncan be extended to protect against concurrent calls when uploading non-existent segments as well.Made the following changes:
processNewSegmentinZKOperatorcodepath if enableParallelPushProtection is enabled.Testing: