Enable uploading segments to realtime tables #8584
Codecov Report
@@ Coverage Diff @@
## master #8584 +/- ##
============================================
- Coverage 68.00% 62.82% -5.18%
+ Complexity 4573 4558 -15
============================================
Files 1725 1679 -46
Lines 90085 88121 -1964
Branches 13411 13194 -217
============================================
- Hits 61260 55365 -5895
- Misses 24548 28790 +4242
+ Partials 4277 3966 -311
Suggest returning Integer (annotated with @Nullable) instead of Optional. Currently we usually use null to represent a value that is not available, and several Optional methods are not supported in JDK 8.
To be more robust, let's remove the checks and return the partition id when it is available, and null otherwise. The caller can then decide how to proceed.
| if (segmentPartitionMetadata == null | |
| if (segmentPartitionMetadata != null) { | |
| ColumnPartitionMetadata columnPartitionMetadata = | |
| segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn); | |
| if (columnPartitionMetadata != null && columnPartitionMetadata.getPartitions().size() == 1) { | |
| return columnPartitionMetadata.getPartitions().iterator().next(); | |
| } | |
| } | |
| return null; |
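The null-returning lookup suggested above can be sketched in isolation. This is a minimal sketch, not the PR's code: the class `PartitionIdLookup`, the method `getPartitionIdOrNull`, and the plain `Map<String, Set<Integer>>` standing in for the segment's column partition metadata are all assumptions made so the example is self-contained.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the metadata lookup; a plain map replaces the real metadata types.
final class PartitionIdLookup {
  private PartitionIdLookup() {
  }

  // Returns the partition id only when the column maps to exactly one partition.
  // Returns null in every other case so that the caller decides how to proceed.
  static Integer getPartitionIdOrNull(Map<String, Set<Integer>> columnPartitionMap, String partitionColumn) {
    if (columnPartitionMap != null) {
      Set<Integer> partitions = columnPartitionMap.get(partitionColumn);
      if (partitions != null && partitions.size() == 1) {
        return partitions.iterator().next();
      }
    }
    return null;
  }
}
```

A caller would treat a null result as "no partition information" rather than as an error.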
(minor) Remove one extra empty line
Don't pass upsert info to the segment assignment engine; set instancePartitionsType to CONSUMING instead (same as the current code).
Also, don't call isUpsertTable() because it will fetch another table config; use tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE instead.
| Preconditions | |
| .checkState(instancePartitionsMap.keySet().size() == 1, "One instance partition type should be provided"); | |
| InstancePartitionsType instancePartitionsType = instancePartitionsMap.keySet().stream().findFirst().get(); | |
| InstancePartitions instancePartitions = instancePartitionsMap.get(instancePartitionsType); | |
| Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided"); | |
| Map.Entry<InstancePartitionsType, InstancePartitions> entry = instancePartitionsMap.entrySet().iterator().next(); | |
| InstancePartitionsType instancePartitionsType = entry.getKey(); | |
| InstancePartitions instancePartitions = entry.getValue(); |
We should not assign one by one here. We can create a new map unpartitionedCompletedSegmentAssignment with only the unpartitioned segments, then use reassignSegments() to get minimum data movement.
(minor) Rename to unpartitionedSegments
Suggest not using functional APIs. Even though this method is not very performance critical, it is still called very frequently when rebalancing the table.
This method is quite expensive, especially for large tables. Calling it for each segment * numPartitions can cause performance issues. Instead, we can use the hash code as suggested above.
We should combine the unpartitioned segments into the partitionGroupIdToSegmentsMap (assign a partition id to them). I'd suggest simply using Math.abs(segmentName.hashCode() % numPartitions) as the partition group id because it is deterministic. We need to keep it deterministic so that, across multiple rebalances, a segment does not switch partitions, which would force it to move between servers.
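The grouping described above can be sketched as follows. This is a minimal sketch under stated assumptions, not the PR's implementation: the class `PartitionGrouping`, the method `groupByPartition`, and the input map from segment name to a nullable partition id are hypothetical. It uses plain loops rather than functional APIs, in line with the earlier review comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: folds unpartitioned segments into the per-partition grouping.
final class PartitionGrouping {
  private PartitionGrouping() {
  }

  // segmentToPartitionId maps a segment name to its partition id, or to null when unknown.
  static Map<Integer, List<String>> groupByPartition(Map<String, Integer> segmentToPartitionId, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    Map<Integer, List<String>> partitionGroupIdToSegmentsMap = new TreeMap<>();
    for (Map.Entry<String, Integer> entry : segmentToPartitionId.entrySet()) {
      String segmentName = entry.getKey();
      Integer partitionId = entry.getValue();
      int partitionGroupId;
      if (partitionId != null) {
        partitionGroupId = partitionId;
      } else {
        // Same name always yields the same id, so the segment stays put across rebalances.
        partitionGroupId = Math.abs(segmentName.hashCode() % numPartitions);
      }
      List<String> segments = partitionGroupIdToSegmentsMap.get(partitionGroupId);
      if (segments == null) {
        segments = new ArrayList<>();
        partitionGroupIdToSegmentsMap.put(partitionGroupId, segments);
      }
      segments.add(segmentName);
    }
    return partitionGroupIdToSegmentsMap;
  }
}
```

Because `String.hashCode()` is specified by the Java API, the derived id is stable across JVMs and restarts, which is what keeps repeated rebalances from moving the segment.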
Probably need some comments on why the count star result needs to be double?
8c0cf76 to
83c8318
Compare
d5a3361 to
bc1a07e
Compare
| List<String> instancesAssigned = new ArrayList<>(_replication); | ||
| for (int replicaId = 0; replicaId < _replication; replicaId++) { | ||
| instancesAssigned.add(instances.get((partitionGroupId * _replication + replicaId) % numInstances)); | ||
| int instanceIndex = Math.abs(partitionGroupId * _replication + replicaId) % numInstances; |
Does getPartitionGroupId() aim to return a non-negative value? If so, we don't need Math.abs() here.
If partitionGroupId is a large number, partitionGroupId * _replication + replicaId will roll over to negative numbers. That's why I put the Math.abs() around it. Having said that, if we bound the partitionGroupId to a smaller number - like % 1000 that Jackie is suggesting - we can remove the Math.abs() here.
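To illustrate the overflow concern in this thread, here is a minimal sketch that assumes a plain list of instance names and a hypothetical helper `pick()`; it is not the PR's code. With a small, bounded partitionGroupId the product stays within int range, so the index is non-negative without Math.abs().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the index arithmetic quoted in the diff above.
final class ReplicaInstancePicker {
  private ReplicaInstancePicker() {
  }

  // Assumes 0 <= partitionGroupId and that partitionGroupId * replication fits in an int.
  // For example, 2_000_000_000 * 2 wraps around to -294_967_296 in int arithmetic,
  // which would produce a negative list index here.
  static List<String> pick(List<String> instances, int partitionGroupId, int replication) {
    int numInstances = instances.size();
    List<String> instancesAssigned = new ArrayList<>(replication);
    for (int replicaId = 0; replicaId < replication; replicaId++) {
      int instanceIndex = (partitionGroupId * replication + replicaId) % numInstances;
      instancesAssigned.add(instances.get(instanceIndex));
    }
    return instancesAssigned;
  }
}
```

Bounding the partition group id before it reaches this arithmetic keeps the check in one place instead of guarding every multiplication.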
| if (segmentPartitionId == null) { | ||
| // This case is for the uploaded segments for which there's no partition information | ||
| // Choose a random, but consistent, partition id | ||
| segmentPartitionId = Math.abs(segmentName.hashCode()); |
Can segmentName.hashCode() be Integer.MIN_VALUE? If so, segmentPartitionId can be a negative number. Is this expected?
+1. We may bound the partition group id to some large number (e.g. Math.abs(segmentName.hashCode() % 10000)).
Good catch. I'll update with an upper bound.
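The edge case raised in this thread can be reproduced directly: Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE, because the positive counterpart does not fit in an int. The sketch below contrasts the unbounded and bounded forms; the class and method names are hypothetical, and the bound of 10000 is taken from the comment above.

```java
// Hypothetical helper contrasting the two ways of making a hash code non-negative.
final class HashBounding {
  private HashBounding() {
  }

  // Not safe: for Integer.MIN_VALUE the result is still negative.
  static int unbounded(int hash) {
    return Math.abs(hash);
  }

  // Safe for any bound > 0: the remainder lies strictly between -bound and bound,
  // so it can never be Integer.MIN_VALUE and Math.abs() always yields [0, bound).
  static int bounded(int hash, int bound) {
    return Math.abs(hash % bound);
  }
}
```

Taking the remainder before the absolute value is what removes the edge case; applying Math.abs() first and the remainder second would not.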
| public static @Nullable | ||
| Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager, |
(convention)
| public static @Nullable | |
| Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager, | |
| @Nullable | |
| public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager, |
| Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); | ||
| SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig); | ||
| InstancePartitionsType instancePartitionsType; | ||
| if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { |
We should use the COMPLETED type only if the COMPLETED instance partitions exist or the tag override is configured. If segment relocation is not configured, we should follow the same assignment as the CONSUMING segments.
Let's keep the comments for the upsert case. For upsert segments, we should always use the CONSUMING type.
Makes sense. Refactored.
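The selection rule requested in this thread can be summarized as a small decision function. This is a sketch under stated assumptions rather than the refactored PR code: the local enum, the class `InstancePartitionsTypeChooser`, and the four boolean inputs are hypothetical stand-ins for checks against the table config and the stored instance partitions.

```java
// Hypothetical decision helper; the booleans stand in for lookups against the table config.
final class InstancePartitionsTypeChooser {
  enum Type { OFFLINE, CONSUMING, COMPLETED }

  private InstancePartitionsTypeChooser() {
  }

  static Type choose(boolean isRealtimeTable, boolean isUpsertEnabled, boolean hasCompletedInstancePartitions,
      boolean hasCompletedTagOverride) {
    if (!isRealtimeTable) {
      return Type.OFFLINE;
    }
    if (isUpsertEnabled) {
      // Upsert segments of a partition stay on the same server, consuming or completed.
      return Type.CONSUMING;
    }
    if (hasCompletedInstancePartitions || hasCompletedTagOverride) {
      return Type.COMPLETED;
    }
    // No relocation configured: follow the same assignment as the CONSUMING segments.
    return Type.CONSUMING;
  }
}
```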
| Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s", | ||
| _realtimeTableName); | ||
| Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided"); | ||
| InstancePartitions instancePartitions = instancePartitionsMap.entrySet().iterator().next().getValue(); |
We should check the InstancePartitionsType of the entry to decide how to assign the segment. The current logic is for CONSUMING type only, and we should add one for COMPLETED type, which should be similar to OfflineSegmentAssignment.assignSegment()
| * This method is implemented to allow refreshing the segments in realtime tables. | ||
| */ | ||
| @Override | ||
| public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) |
Move this implementation to the BaseTableDataManager. It is common to both offline and realtime tables.
Jackie-Jiang
left a comment
LGTM, only minor comments
| boolean isOfflineTable = !TableNameBuilder.isRealtimeTableResource(tableNameWithType); | ||
| boolean isUpsertTable = !isOfflineTable && tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE; | ||
| InstancePartitionsType instancePartitionsType = null; | ||
| if (isOfflineTable) { | ||
| instancePartitionsType = InstancePartitionsType.OFFLINE; | ||
| } else if (isUpsertTable) { | ||
| // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server | ||
| // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType. | ||
| instancePartitionsType = InstancePartitionsType.CONSUMING; | ||
| } | ||
| if (isOfflineTable || isUpsertTable) { | ||
| return Collections.singletonMap(instancePartitionsType, InstancePartitionsUtils | ||
| .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType)); | ||
| } |
(minor) slightly more readable to me
| boolean isOfflineTable = !TableNameBuilder.isRealtimeTableResource(tableNameWithType); | |
| boolean isUpsertTable = !isOfflineTable && tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE; | |
| InstancePartitionsType instancePartitionsType = null; | |
| if (isOfflineTable) { | |
| instancePartitionsType = InstancePartitionsType.OFFLINE; | |
| } else if (isUpsertTable) { | |
| // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server | |
| // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType. | |
| instancePartitionsType = InstancePartitionsType.CONSUMING; | |
| } | |
| if (isOfflineTable || isUpsertTable) { | |
| return Collections.singletonMap(instancePartitionsType, InstancePartitionsUtils | |
| .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType)); | |
| } | |
| if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { | |
| return Collections.singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils | |
| .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.OFFLINE)); | |
| } | |
| if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) { | |
| // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server | |
| // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType. | |
| return Collections.singletonMap(InstancePartitionsType.CONSUMING, InstancePartitionsUtils | |
| .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.CONSUMING)); | |
| } |
| Integer partitionGroupId = SegmentUtils | ||
| .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0)); | ||
| Preconditions.checkNotNull( | ||
| String.format("PartitionGroupId is not available for segment '%s' (upsert-enabled table)", segmentName)); |
(minor) Also include the table name
Description
Currently, the upload endpoint on the controller accepts segments for offline tables and for upsert-enabled realtime tables. This PR extends it to all realtime tables.
Corresponding issue: #8283
Testing Done
Release Notes
Uploading segments is now enabled for all realtime tables. Previously, segments could be uploaded only to upsert-enabled realtime tables. This feature unblocks the following for realtime tables:
With segment upload support, we can remove most of the restrictions on realtime tables and make long-retention realtime tables much more manageable.