Skip to content

Commit 7c3c8e8

Browse files
saurabhd336Saurabh Dubey
andauthored
Allow replica group assignment support in tier configs (#10255)
* Allow replica group assignment support in tier configs * Add reassignment logic * Add tier to assignInstances * Make changes to APIs * Consolidate tier assignment configs inside InstanceAssignmentConfigsMap * Add removal logic for tier partitions * Lint fix * Review comments * Review comments * Add tests * Fix java8 quickstart * Fix test --------- Co-authored-by: Saurabh Dubey <[email protected]>
1 parent 3ffa7c3 commit 7c3c8e8

File tree

19 files changed

+594
-211
lines changed

19 files changed

+594
-211
lines changed

pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ private InstanceAssignmentConfigUtils() {
4444
* backward-compatibility) COMPLETED server tag is overridden to be different from the CONSUMING server tag.
4545
*/
4646
public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
47-
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
48-
tableConfig.getInstanceAssignmentConfigMap();
47+
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
4948
return (instanceAssignmentConfigMap != null
50-
&& instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED) != null) || TagNameUtils
51-
.isRelocateCompletedSegments(tableConfig.getTenantConfig());
49+
&& instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED.toString()) != null)
50+
|| TagNameUtils.isRelocateCompletedSegments(tableConfig.getTenantConfig());
5251
}
5352

5453
/**
@@ -60,21 +59,20 @@ public static boolean allowInstanceAssignment(TableConfig tableConfig,
6059
return true;
6160
}
6261
TableType tableType = tableConfig.getTableType();
63-
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
64-
tableConfig.getInstanceAssignmentConfigMap();
62+
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
6563
switch (instancePartitionsType) {
6664
// Allow OFFLINE instance assignment if the offline table has it configured or (for backward-compatibility) is
6765
// using replica-group segment assignment
6866
case OFFLINE:
6967
return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap != null
70-
&& instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE) != null)
68+
&& instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString()) != null)
7169
|| AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
7270
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
7371
// Allow CONSUMING/COMPLETED instance assignment if the real-time table has it configured
7472
case CONSUMING:
7573
case COMPLETED:
7674
return tableType == TableType.REALTIME && (instanceAssignmentConfigMap != null
77-
&& instanceAssignmentConfigMap.get(instancePartitionsType) != null);
75+
&& instanceAssignmentConfigMap.get(instancePartitionsType.toString()) != null);
7876
default:
7977
throw new IllegalStateException();
8078
}
@@ -89,10 +87,10 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t
8987
"Instance assignment is not allowed for the given table config");
9088

9189
// Use the instance assignment config from the table config if it exists
92-
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
93-
tableConfig.getInstanceAssignmentConfigMap();
90+
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
9491
if (instanceAssignmentConfigMap != null) {
95-
InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMap.get(instancePartitionsType);
92+
InstanceAssignmentConfig instanceAssignmentConfig =
93+
instanceAssignmentConfigMap.get(instancePartitionsType.toString());
9694
if (instanceAssignmentConfig != null) {
9795
return instanceAssignmentConfig;
9896
}

pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ private InstancePartitionsUtils() {
4646
}
4747

4848
public static final char TYPE_SUFFIX_SEPARATOR = '_';
49+
public static final String TIER_SUFFIX = "__TIER__";
4950

5051
/**
5152
* Returns the name of the instance partitions for the given table name (with or without type suffix) and instance
@@ -93,6 +94,11 @@ public static InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRe
9394
return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
9495
}
9596

97+
public static String getInstancePartitionsNameForTier(String tableName, String tierName) {
98+
return TableNameBuilder.extractRawTableName(tableName) + TIER_SUFFIX + tierName;
99+
}
100+
101+
96102
/**
97103
* Gets the instance partitions with the given name, and returns a re-named copy of the same.
98104
* This method is useful when we use a table with instancePartitionsMap since in that case
@@ -177,4 +183,14 @@ public static void removeInstancePartitions(HelixPropertyStore<ZNRecord> propert
177183
throw new ZkException("Failed to remove instance partitions: " + instancePartitionsName);
178184
}
179185
}
186+
187+
public static void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
188+
String tableNameWithType) {
189+
List<InstancePartitions> instancePartitions = ZKMetadataProvider.getAllInstancePartitions(propertyStore);
190+
instancePartitions.stream().filter(instancePartition -> instancePartition.getInstancePartitionsName()
191+
.startsWith(TableNameBuilder.extractRawTableName(tableNameWithType) + TIER_SUFFIX))
192+
.forEach(instancePartition -> {
193+
removeInstancePartitions(propertyStore, instancePartition.getInstancePartitionsName());
194+
});
195+
}
180196
}

pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import java.util.stream.Collectors;
2727
import javax.annotation.Nullable;
2828
import org.apache.helix.AccessOption;
29+
import org.apache.helix.store.HelixPropertyStore;
2930
import org.apache.helix.store.zk.ZkHelixPropertyStore;
3031
import org.apache.helix.zookeeper.datamodel.ZNRecord;
3132
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
33+
import org.apache.pinot.common.assignment.InstancePartitions;
3234
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
3335
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
3436
import org.apache.pinot.common.utils.SchemaUtils;
@@ -261,6 +263,20 @@ public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> propertySt
261263
}
262264
}
263265

266+
@Nullable
267+
public static List<InstancePartitions> getAllInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore) {
268+
List<ZNRecord> znRecordss =
269+
propertyStore.getChildren(PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, null, AccessOption.PERSISTENT);
270+
271+
try {
272+
return Optional.ofNullable(znRecordss).orElseGet(ArrayList::new).stream().map(InstancePartitions::fromZNRecord)
273+
.collect(Collectors.toList());
274+
} catch (Exception e) {
275+
LOGGER.error("Caught exception while getting instance partitions", e);
276+
return null;
277+
}
278+
}
279+
264280
@Nullable
265281
public static List<UserConfig> getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
266282
List<ZNRecord> znRecordss =

pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,11 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
113113
queryConfig = JsonUtils.stringToObject(queryConfigString, QueryConfig.class);
114114
}
115115

116-
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
116+
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
117117
String instanceAssignmentConfigMapString = simpleFields.get(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY);
118118
if (instanceAssignmentConfigMapString != null) {
119119
instanceAssignmentConfigMap = JsonUtils.stringToObject(instanceAssignmentConfigMapString,
120-
new TypeReference<Map<InstancePartitionsType, InstanceAssignmentConfig>>() {
120+
new TypeReference<Map<String, InstanceAssignmentConfig>>() {
121121
});
122122
}
123123

@@ -181,9 +181,9 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
181181
}
182182

183183
return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
184-
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap,
185-
fieldConfigList, upsertConfig, dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, isDimTable,
186-
tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap);
184+
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
185+
dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList,
186+
instancePartitionsMap, segmentAssignmentConfigMap);
187187
}
188188

189189
public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -216,8 +216,7 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
216216
if (queryConfig != null) {
217217
simpleFields.put(TableConfig.QUERY_CONFIG_KEY, queryConfig.toJsonString());
218218
}
219-
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
220-
tableConfig.getInstanceAssignmentConfigMap();
219+
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
221220
if (instanceAssignmentConfigMap != null) {
222221
simpleFields
223222
.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, JsonUtils.objectToString(instanceAssignmentConfigMap));

pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public void testSerDe()
212212
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
213213
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false));
214214
TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
215-
Collections.singletonMap(InstancePartitionsType.OFFLINE, instanceAssignmentConfig)).build();
215+
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
216216

217217
checkInstanceAssignmentConfig(tableConfig);
218218

@@ -488,12 +488,12 @@ private void checkTierConfigList(TableConfig tableConfig) {
488488
}
489489

490490
private void checkInstanceAssignmentConfig(TableConfig tableConfig) {
491-
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
492-
tableConfig.getInstanceAssignmentConfigMap();
491+
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
493492
assertNotNull(instanceAssignmentConfigMap);
494493
assertEquals(instanceAssignmentConfigMap.size(), 1);
495-
assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE));
496-
InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE);
494+
assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
495+
InstanceAssignmentConfig instanceAssignmentConfig =
496+
instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString());
497497

498498
InstanceTagPoolConfig tagPoolConfig = instanceAssignmentConfig.getTagPoolConfig();
499499
assertEquals(tagPoolConfig.getTag(), "tenant_OFFLINE");

0 commit comments

Comments
 (0)