@sajjad-moradi sajjad-moradi commented Apr 22, 2022

Description

Currently, the upload endpoint on the controller accepts segments only for offline tables and upsert-enabled realtime tables. This PR extends it to all realtime tables.
Corresponding issue: #8283

Testing Done

  • Extended LLC realtime integration tests to upload segments to verify both refresh and new segment upload paths
  • Extended current unit tests for both segment assignment and rebalance
  • Also verified segment assignment for completed segments by locally modifying an integration test to run multiple server instances and adding COMPLETED instance partitions for two of those servers to the property store

Release Notes

Segment upload is now enabled for all realtime tables. Previously, segments could be uploaded only to upsert-enabled realtime tables. This feature unblocks the following for realtime tables:

  • Bootstrap a realtime table
  • Backfill
  • Segment merge & rollup
  • Segment purge & conversion

With segment upload support, we can remove most of the restrictions on realtime tables and make long-retention realtime tables much more manageable.

@sajjad-moradi sajjad-moradi added the feature and release-notes labels Apr 22, 2022
@sajjad-moradi sajjad-moradi changed the title from "Enable uploading segments for realtime tables" to "Enable uploading segments to realtime tables" Apr 22, 2022

codecov-commenter commented Apr 22, 2022

Codecov Report

Merging #8584 (85db9b6) into master (09bae15) will decrease coverage by 5.17%.
The diff coverage is 56.60%.

@@             Coverage Diff              @@
##             master    #8584      +/-   ##
============================================
- Coverage     68.00%   62.82%   -5.18%     
+ Complexity     4573     4558      -15     
============================================
  Files          1725     1679      -46     
  Lines         90085    88121    -1964     
  Branches      13411    13194     -217     
============================================
- Hits          61260    55365    -5895     
- Misses        24548    28790    +4242     
+ Partials       4277     3966     -311     
Flag | Coverage Δ
--- | ---
integration2 | ?
unittests1 | 66.22% <18.18%> (+0.02%) ⬆️
unittests2 | 14.26% <52.83%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files | Coverage Δ
--- | ---
...va/org/apache/pinot/common/utils/SegmentUtils.java | 0.00% <0.00%> (-71.43%) ⬇️
.../data/manager/offline/OfflineTableDataManager.java | 100.00% <ø> (ø)
...ata/manager/realtime/RealtimeTableDataManager.java | 11.55% <0.00%> (-55.52%) ⬇️
...ntroller/helix/core/PinotHelixResourceManager.java | 64.03% <38.88%> (-3.75%) ⬇️
...ces/PinotSegmentUploadDownloadRestletResource.java | 40.41% <66.66%> (-7.94%) ⬇️
.../assignment/segment/RealtimeSegmentAssignment.java | 94.30% <90.47%> (+0.22%) ⬆️
.../pinot/core/data/manager/BaseTableDataManager.java | 82.60% <100.00%> (+0.13%) ⬆️
...va/org/apache/pinot/core/routing/RoutingTable.java | 0.00% <0.00%> (-100.00%) ⬇️
...va/org/apache/pinot/common/config/NettyConfig.java | 0.00% <0.00%> (-100.00%) ⬇️
...a/org/apache/pinot/common/metrics/MinionMeter.java | 0.00% <0.00%> (-100.00%) ⬇️

... and 345 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 09bae15...85db9b6.

Contributor

Suggest returning Integer (annotated with @Nullable) instead of Optional. We usually use null to represent an unavailable value, and several Optional methods are not supported in JDK 8.

Contributor

To be more robust, let's remove the checks and return the partition id when it is available, return null otherwise. The caller can then decide how to proceed

Suggested change
- if (segmentPartitionMetadata == null
+ if (segmentPartitionMetadata != null) {
+   ColumnPartitionMetadata columnPartitionMetadata =
+       segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
+   if (columnPartitionMetadata != null && columnPartitionMetadata.getPartitions().size() == 1) {
+     return columnPartitionMetadata.getPartitions().iterator().next();
+   }
+ }
+ return null;
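
For readers following the thread, here is a minimal, self-contained sketch of the null-returning lookup suggested above. It is not Pinot's code: the `Map<String, Set<Integer>>` argument is a hypothetical stand-in for the segment partition metadata and its column partition map, and `PartitionIdLookup` is an invented class name. Only the control flow mirrors the suggestion.

```java
import java.util.Map;
import java.util.Set;

final class PartitionIdLookup {
  private PartitionIdLookup() {
  }

  /**
   * Returns the partition id of the column when exactly one partition is recorded, null otherwise.
   * columnToPartitions is a hypothetical stand-in for the segment partition metadata and may be null.
   */
  static Integer getPartitionId(Map<String, Set<Integer>> columnToPartitions, String partitionColumn) {
    if (columnToPartitions != null) {
      Set<Integer> partitions = columnToPartitions.get(partitionColumn);
      if (partitions != null && partitions.size() == 1) {
        return partitions.iterator().next();
      }
    }
    // No metadata, unknown column, or several partitions: the caller decides how to proceed.
    return null;
  }

  public static void main(String[] args) {
    Map<String, Set<Integer>> metadata = Map.of("memberId", Set.of(3), "region", Set.of(0, 1));
    System.out.println(getPartitionId(metadata, "memberId")); // single partition
    System.out.println(getPartitionId(metadata, "region"));   // two partitions, so null
    System.out.println(getPartitionId(null, "memberId"));     // no metadata, so null
  }
}
```

Returning a boxed `Integer` keeps the "not available" case explicit for the caller without relying on `Optional`.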

Contributor

(minor) Remove one extra empty line

Contributor

Don't pass upsert info to the segment assignment engine; set instancePartitionsType to CONSUMING instead (same as the current code).
Also, don't call isUpsertTable() because it fetches the table config again; use tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE instead.

Comment on lines 120 to 123
Contributor

Suggested change
- Preconditions
-     .checkState(instancePartitionsMap.keySet().size() == 1, "One instance partition type should be provided");
- InstancePartitionsType instancePartitionsType = instancePartitionsMap.keySet().stream().findFirst().get();
- InstancePartitions instancePartitions = instancePartitionsMap.get(instancePartitionsType);
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
+ Map.Entry<InstancePartitionsType, InstancePartitions> entry = instancePartitionsMap.entrySet().iterator().next();
+ InstancePartitionsType instancePartitionsType = entry.getKey();
+ InstancePartitions instancePartitions = entry.getValue();
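
The single-entry extraction in this suggestion can be tried in isolation. The sketch below is an illustration under assumptions: `PartitionsType` is a hypothetical stand-in for `InstancePartitionsType`, a plain `String` stands in for `InstancePartitions`, and an `IllegalStateException` replaces Guava's `Preconditions.checkState` so the example needs only the JDK.

```java
import java.util.Collections;
import java.util.Map;

final class SingleEntryExample {
  /** Hypothetical stand-in for Pinot's InstancePartitionsType. */
  enum PartitionsType { OFFLINE, CONSUMING, COMPLETED }

  private SingleEntryExample() {
  }

  /** Returns the only entry of the map, or throws if the map does not hold exactly one entry. */
  static <K, V> Map.Entry<K, V> onlyEntry(Map<K, V> map) {
    if (map.size() != 1) {
      throw new IllegalStateException("One instance partition type should be provided, got: " + map.size());
    }
    // One iterator step yields key and value together: no stream, no second lookup by key.
    return map.entrySet().iterator().next();
  }

  public static void main(String[] args) {
    Map<PartitionsType, String> instancePartitionsMap =
        Collections.singletonMap(PartitionsType.CONSUMING, "myTable_CONSUMING");
    Map.Entry<PartitionsType, String> entry = onlyEntry(instancePartitionsMap);
    System.out.println(entry.getKey() + " -> " + entry.getValue());
  }
}
```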

Contributor

We should not assign one by one here. We may create a new map unpartitionedCompletedSegmentAssignment with only the unpartitioned segments, then use reassignSegments() to get minimum data movement

Contributor

(minor) Rename to unpartitionedSegments

Contributor

Suggest not using functional APIs here. Even though this method is not very performance critical, it is still called very frequently when rebalancing the table.

Contributor

This method is quite expensive, especially for large tables. Calling it numSegments * numPartitions times can cause performance issues. Instead, we can use the segment name's hash code, as suggested in the other comment.

Contributor

We should combine the unpartitioned segments into the partitionGroupIdToSegmentsMap (assign a partition id to them). I'd suggest simply using Math.abs(segmentName.hashCode() % numPartitions) as the partition group id because it is deterministic. It has to stay deterministic so that a segment keeps the same partition across multiple rebalances; otherwise it would switch partitions and be moved between servers each time.
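
A minimal sketch of the hash-based grouping described in this comment, assuming segments are identified only by name. The class name, method names, and the sample segment names are hypothetical; the map merely plays the role of `partitionGroupIdToSegmentsMap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class UnpartitionedSegmentGrouping {
  private UnpartitionedSegmentGrouping() {
  }

  /** Deterministic partition group id in [0, numPartitions), derived only from the segment name. */
  static int partitionGroupId(String segmentName, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    return Math.abs(segmentName.hashCode() % numPartitions);
  }

  /** Groups segment names by their derived partition group id. */
  static Map<Integer, List<String>> group(List<String> segmentNames, int numPartitions) {
    Map<Integer, List<String>> partitionGroupIdToSegments = new TreeMap<>();
    for (String segmentName : segmentNames) {
      int id = partitionGroupId(segmentName, numPartitions);
      List<String> segments = partitionGroupIdToSegments.get(id);
      if (segments == null) {
        segments = new ArrayList<>();
        partitionGroupIdToSegments.put(id, segments);
      }
      segments.add(segmentName);
    }
    return partitionGroupIdToSegments;
  }

  public static void main(String[] args) {
    // Hypothetical names of uploaded segments that carry no partition metadata.
    List<String> uploaded = Arrays.asList("uploaded_2022_04_01", "uploaded_2022_04_02", "uploaded_2022_04_03");
    System.out.println(group(uploaded, 4));
  }
}
```

Because the id depends only on the name and `numPartitions`, repeated rebalances place a segment in the same group as long as the partition count does not change.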

Member

Probably need some comments on why the count star result needs to be double?

@sajjad-moradi sajjad-moradi force-pushed the feature/upload.realtime.segment branch from 8c0cf76 to 83c8318 Compare April 27, 2022 22:01
@sajjad-moradi sajjad-moradi force-pushed the feature/upload.realtime.segment branch from d5a3361 to bc1a07e Compare April 28, 2022 03:28
  List<String> instancesAssigned = new ArrayList<>(_replication);
  for (int replicaId = 0; replicaId < _replication; replicaId++) {
-   instancesAssigned.add(instances.get((partitionGroupId * _replication + replicaId) % numInstances));
+   int instanceIndex = Math.abs(partitionGroupId * _replication + replicaId) % numInstances;
Contributor

Does getPartitionGroupId() aim to return a non-negative value? If so, we don't need Math.abs() here.

Contributor Author

If partitionGroupId is a large number, partitionGroupId * _replication + replicaId will roll over to negative numbers. That's why I put the Math.abs() around it. Having said that, if we bound the partitionGroupId to a smaller number - like % 1000 that Jackie is suggesting - we can remove the Math.abs() here.
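
The rollover described in this reply is easy to reproduce. The sketch below is an illustration, not the PR's code: `naiveIndex` mirrors the original expression, while `safeIndex` is one alternative (an assumption of this note, not something proposed in the thread) that widens to `long` and uses `Math.floorMod` so the result always lands in `[0, numInstances)`.

```java
final class ReplicaIndexOverflow {
  private ReplicaIndexOverflow() {
  }

  /** Index as in the original expression; the int multiplication may wrap to a negative value. */
  static int naiveIndex(int partitionGroupId, int replication, int replicaId, int numInstances) {
    return (partitionGroupId * replication + replicaId) % numInstances;
  }

  /** Same formula in long arithmetic with floorMod, so the result is always in [0, numInstances). */
  static int safeIndex(int partitionGroupId, int replication, int replicaId, int numInstances) {
    long position = (long) partitionGroupId * replication + replicaId;
    return (int) Math.floorMod(position, (long) numInstances);
  }

  public static void main(String[] args) {
    // 1_500_000_000 * 2 does not fit in an int and wraps to -1_294_967_296.
    System.out.println(naiveIndex(1_500_000_000, 2, 0, 5)); // -1
    System.out.println(safeIndex(1_500_000_000, 2, 0, 5));  // 0
  }
}
```

Bounding the partition group id, as discussed in the thread, avoids the overflow at its source; the widening shown here is only a second line of defense.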

if (segmentPartitionId == null) {
// This case is for the uploaded segments for which there's no partition information
// Choose a random, but consistent, partition id
segmentPartitionId = Math.abs(segmentName.hashCode());
Contributor

Can segmentName.hashCode() be Integer.MIN_VALUE? If so, Math.abs() returns Integer.MIN_VALUE and the segmentPartitionId is negative. Is this expected?

Contributor

+1. We may bound the partition group id to some large number (e.g. Math.abs(segmentName.hashCode() % 10000)).

Contributor Author

Good catch. I'll update with an upper bound.
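
The edge case raised in this thread can be checked directly. In the sketch below, `BoundedPartitionId` and `fromHash` are hypothetical names; the method takes the hash as an `int` so the `Integer.MIN_VALUE` case can be exercised without hunting for a string that hashes to it.

```java
final class BoundedPartitionId {
  private BoundedPartitionId() {
  }

  /** Maps any int hash to [0, upperBound); upperBound must be positive. */
  static int fromHash(int hash, int upperBound) {
    if (upperBound <= 0) {
      throw new IllegalArgumentException("upperBound must be positive");
    }
    // Take the remainder first: its magnitude is below upperBound, so Math.abs cannot overflow.
    return Math.abs(hash % upperBound);
  }

  public static void main(String[] args) {
    // Math.abs has no positive counterpart for Integer.MIN_VALUE and returns it unchanged.
    System.out.println(Math.abs(Integer.MIN_VALUE));          // -2147483648
    System.out.println(fromHash(Integer.MIN_VALUE, 10_000));  // 3648
  }
}
```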

Comment on lines 38 to 39
public static @Nullable
Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager,
Contributor

(convention)

Suggested change
- public static @Nullable
- Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager,
+ @Nullable
+ public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager,

Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
InstancePartitionsType instancePartitionsType;
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
Contributor

We should use the COMPLETED type only if the COMPLETED instance partitions exist or the tag override is configured. If segment relocation is not configured, we should follow the same assignment as the CONSUMING segments.
Let's keep the comments for the upsert case. For upsert segments, we should always use the CONSUMING type.

Contributor Author

Makes sense. Refactored.
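
The decision rule agreed on in this thread can be summarized as a small function. This is a sketch of the rule as stated in the comment, not the refactored Pinot code: the enum is a hypothetical stand-in for `InstancePartitionsType`, and the four boolean parameters are assumptions standing in for whatever the controller actually inspects (table type, upsert mode, stored COMPLETED instance partitions, tag override configuration).

```java
final class InstancePartitionsTypeChooser {
  /** Hypothetical stand-in for Pinot's InstancePartitionsType. */
  enum PartitionsType { OFFLINE, CONSUMING, COMPLETED }

  private InstancePartitionsTypeChooser() {
  }

  static PartitionsType choose(boolean isRealtimeTable, boolean isUpsertTable,
      boolean hasCompletedInstancePartitions, boolean hasCompletedTagOverride) {
    if (!isRealtimeTable) {
      return PartitionsType.OFFLINE;
    }
    if (isUpsertTable) {
      // Upsert tables keep all segments of a partition on the same servers, so always CONSUMING.
      return PartitionsType.CONSUMING;
    }
    if (hasCompletedInstancePartitions || hasCompletedTagOverride) {
      // Segment relocation is configured: uploaded segments go where completed segments live.
      return PartitionsType.COMPLETED;
    }
    // No relocation configured: follow the same assignment as the consuming segments.
    return PartitionsType.CONSUMING;
  }

  public static void main(String[] args) {
    System.out.println(choose(true, false, true, false));
    System.out.println(choose(true, true, true, true));
    System.out.println(choose(true, false, false, false));
  }
}
```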

Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
_realtimeTableName);
Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
InstancePartitions instancePartitions = instancePartitionsMap.entrySet().iterator().next().getValue();
Contributor

We should check the InstancePartitionsType of the entry to decide how to assign the segment. The current logic is for CONSUMING type only, and we should add one for COMPLETED type, which should be similar to OfflineSegmentAssignment.assignSegment()

* This method is implemented to allow refreshing the segments in realtime tables.
*/
@Override
public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
Contributor

Move this implementation to the BaseTableDataManager. It is common to both offline and realtime tables.

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

LGTM, only minor comments

Comment on lines 2039 to 2052
boolean isOfflineTable = !TableNameBuilder.isRealtimeTableResource(tableNameWithType);
boolean isUpsertTable = !isOfflineTable && tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE;
InstancePartitionsType instancePartitionsType = null;
if (isOfflineTable) {
instancePartitionsType = InstancePartitionsType.OFFLINE;
} else if (isUpsertTable) {
// In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
// -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
instancePartitionsType = InstancePartitionsType.CONSUMING;
}
if (isOfflineTable || isUpsertTable) {
return Collections.singletonMap(instancePartitionsType, InstancePartitionsUtils
.fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType));
}
Contributor

(minor) slightly more readable to me

Suggested change
- boolean isOfflineTable = !TableNameBuilder.isRealtimeTableResource(tableNameWithType);
- boolean isUpsertTable = !isOfflineTable && tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE;
- InstancePartitionsType instancePartitionsType = null;
- if (isOfflineTable) {
-   instancePartitionsType = InstancePartitionsType.OFFLINE;
- } else if (isUpsertTable) {
-   // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
-   // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
-   instancePartitionsType = InstancePartitionsType.CONSUMING;
- }
- if (isOfflineTable || isUpsertTable) {
-   return Collections.singletonMap(instancePartitionsType, InstancePartitionsUtils
-       .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType));
- }
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+   return Collections.singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils
+       .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.OFFLINE));
+ }
+ if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
+   // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
+   // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
+   return Collections.singletonMap(InstancePartitionsType.CONSUMING, InstancePartitionsUtils
+       .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.CONSUMING));
+ }

Integer partitionGroupId = SegmentUtils
.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
Preconditions.checkNotNull(partitionGroupId,
String.format("PartitionGroupId is not available for segment '%s' (upsert-enabled table)", segmentName));
Contributor

(minor) Also include the table name
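
One possible shape for a null check whose message carries both the segment and the table name, as requested here. This sketch uses the JDK's `Objects.requireNonNull` with a message supplier rather than Guava's `Preconditions`, and the class name, method name, and message wording are hypothetical rather than what was merged.

```java
import java.util.Objects;

final class PartitionGroupIdCheck {
  private PartitionGroupIdCheck() {
  }

  /** Returns the partition group id, or throws a NullPointerException naming the segment and the table. */
  static int requirePartitionGroupId(Integer partitionGroupId, String segmentName, String tableNameWithType) {
    // The supplier builds the message only when the check fails.
    return Objects.requireNonNull(partitionGroupId, () -> String.format(
        "PartitionGroupId is not available for segment '%s' (upsert-enabled table '%s')",
        segmentName, tableNameWithType));
  }

  public static void main(String[] args) {
    System.out.println(requirePartitionGroupId(2, "mySegment", "myTable_REALTIME"));
    try {
      requirePartitionGroupId(null, "mySegment", "myTable_REALTIME");
    } catch (NullPointerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Note that the value being checked is passed as the first argument; passing only the message would always succeed, because the message string itself is never null.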

@sajjad-moradi sajjad-moradi merged commit b8af790 into apache:master May 17, 2022
@sajjad-moradi sajjad-moradi deleted the feature/upload.realtime.segment branch May 17, 2022 01:48

5 participants