Skip to content

Commit 6fddffe

Browse files
walterddrRong Rong
andauthored
make reset table async (#9203)
* make reset table async * change to use external view resync to check status Co-authored-by: Rong Rong <[email protected]>
1 parent da7d534 commit 6fddffe

File tree

8 files changed

+217
-124
lines changed

8 files changed

+217
-124
lines changed

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -507,14 +507,13 @@ public SuccessResponse resetSegment(
507507
@ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType")
508508
String tableNameWithType,
509509
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
510-
@ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses "
511-
+ "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
510+
@ApiParam(value = "Name of the target instance to reset") @QueryParam("targetInstance") @Nullable
511+
String targetInstance) {
512512
segmentName = URIUtils.decode(segmentName);
513513
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
514514
try {
515515
Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
516-
_pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
517-
maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
516+
_pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, targetInstance);
518517
return new SuccessResponse(
519518
String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType));
520519
} catch (IllegalStateException e) {
@@ -543,14 +542,13 @@ public SuccessResponse resetSegment(
543542
+ " finally enabling the segments", notes = "Resets a segment by disabling and then enabling a segment")
544543
public SuccessResponse resetAllSegments(
545544
@ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType")
546-
String tableNameWithType, @ApiParam(
547-
value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses "
548-
+ "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
545+
String tableNameWithType,
546+
@ApiParam(value = "Name of the target instance to reset") @QueryParam("targetInstance") @Nullable
547+
String targetInstance) {
549548
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
550549
try {
551550
Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
552-
_pinotHelixResourceManager.resetAllSegments(tableNameWithType,
553-
maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
551+
_pinotHelixResourceManager.resetAllSegments(tableNameWithType, targetInstance);
554552
return new SuccessResponse(String.format("Successfully reset all segments of table: %s", tableNameWithType));
555553
} catch (IllegalStateException e) {
556554
throw new ControllerApplicationException(LOGGER,

pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,26 @@ public long getTableSize(String tableName)
140140
}
141141
}
142142

143+
public void resetTable(String tableNameWithType, String targetInstance)
144+
throws IOException {
145+
try {
146+
HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(
147+
_controllerRequestURLBuilder.forTableReset(tableNameWithType, targetInstance)).toURI(), null));
148+
} catch (HttpErrorStatusException | URISyntaxException e) {
149+
throw new IOException(e);
150+
}
151+
}
152+
153+
public void resetSegment(String tableNameWithType, String segmentName, String targetInstance)
154+
throws IOException {
155+
try {
156+
HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(
157+
_controllerRequestURLBuilder.forSegmentReset(tableNameWithType, segmentName, targetInstance)).toURI(), null));
158+
} catch (HttpErrorStatusException | URISyntaxException e) {
159+
throw new IOException(e);
160+
}
161+
}
162+
143163
public void reloadTable(String tableName, TableType tableType, boolean forceDownload)
144164
throws IOException {
145165
try {

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 117 additions & 115 deletions
Large diffs are not rendered by default.

pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.pinot.spi.stream.StreamConfigProperties;
5656
import org.apache.pinot.spi.stream.StreamDataServerStartable;
5757
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
58+
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
5859
import org.apache.pinot.tools.utils.KafkaStarterUtils;
5960
import org.apache.pinot.util.TestUtils;
6061
import org.testng.Assert;
@@ -594,6 +595,15 @@ public Boolean apply(@Nullable Void aVoid) {
594595
}, 100L, timeoutMs, "Failed to load " + countStarResult + " documents", raiseError);
595596
}
596597

598+
/**
599+
* Reset table utils.
600+
*/
601+
protected void resetTable(String tableName, TableType tableType, @Nullable String targetInstance)
602+
throws IOException {
603+
getControllerRequestClient().resetTable(TableNameBuilder.forType(tableType).tableNameWithType(tableName),
604+
targetInstance);
605+
}
606+
597607
/**
598608
* Run equivalent Pinot and H2 query and compare the results.
599609
*/

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,27 @@
2323
import java.io.InputStream;
2424
import java.io.InputStreamReader;
2525
import java.util.List;
26+
import java.util.Map;
2627
import org.apache.commons.lang3.StringUtils;
28+
import org.apache.helix.PropertyKey;
29+
import org.apache.helix.model.ExternalView;
2730
import org.apache.helix.model.InstanceConfig;
31+
import org.apache.helix.model.Message;
2832
import org.apache.pinot.client.ResultSet;
2933
import org.apache.pinot.client.ResultSetGroup;
3034
import org.apache.pinot.common.exception.QueryException;
3135
import org.apache.pinot.core.query.utils.idset.IdSet;
3236
import org.apache.pinot.core.query.utils.idset.IdSets;
3337
import org.apache.pinot.server.starter.helix.BaseServerStarter;
38+
import org.apache.pinot.spi.config.table.TableType;
3439
import org.apache.pinot.spi.data.DimensionFieldSpec;
3540
import org.apache.pinot.spi.data.FieldSpec;
3641
import org.apache.pinot.spi.data.MetricFieldSpec;
3742
import org.apache.pinot.spi.data.Schema;
3843
import org.apache.pinot.spi.utils.CommonConstants;
3944
import org.apache.pinot.spi.utils.InstanceTypeUtils;
4045
import org.apache.pinot.spi.utils.JsonUtils;
46+
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
4147
import org.apache.pinot.util.TestUtils;
4248
import org.slf4j.Logger;
4349
import org.slf4j.LoggerFactory;
@@ -531,6 +537,42 @@ private void checkForEmptyRoutingTable(boolean shouldBeEmpty) {
531537
}, 60_000L, errorMessage);
532538
}
533539

540+
public void testReset(TableType tableType)
541+
throws Exception {
542+
String rawTableName = getTableName();
543+
544+
// reset the table.
545+
resetTable(rawTableName, tableType, null);
546+
547+
// wait for all live messages clear the queue.
548+
List<String> instances = _helixResourceManager.getServerInstancesForTable(rawTableName, tableType);
549+
PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
550+
TestUtils.waitForCondition(aVoid -> {
551+
int liveMessageCount = 0;
552+
for (String instanceName : instances) {
553+
List<Message> messages = _helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true);
554+
liveMessageCount += messages.size();
555+
}
556+
return liveMessageCount == 0;
557+
}, 30_000L, "Failed to wait for all segment reset messages clear helix state transition!");
558+
559+
// Check that all segment states come back to ONLINE.
560+
TestUtils.waitForCondition(aVoid -> {
561+
// check external view and wait for everything to come back online
562+
ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(),
563+
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName));
564+
for (Map<String, String> externalViewStateMap : externalView.getRecord().getMapFields().values()) {
565+
for (String state : externalViewStateMap.values()) {
566+
if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)
567+
&& !CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) {
568+
return false;
569+
}
570+
}
571+
}
572+
return true;
573+
}, 30_000L, "Failed to wait for all segments come back online");
574+
}
575+
534576
/**
535577
* TODO: Support removing new added columns for MutableSegment and remove the new added columns before running the
536578
* next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns()}.

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,12 @@ public void testReload()
277277
testReload(false);
278278
}
279279

280+
@Test
281+
public void testReset()
282+
throws Exception {
283+
super.testReset(TableType.REALTIME);
284+
}
285+
280286
@Test
281287
@Override
282288
public void testHardcodedServerPartitionedSqlQueries()

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2663,6 +2663,11 @@ private void validateMetadataResponse(JsonNode response, int numTotalColumn, int
26632663
assertEquals(response.get(MAX_NUM_MULTI_VALUES_MAP_KEY).size(), numMVColumn);
26642664
}
26652665

2666+
@Test
2667+
public void testReset()
2668+
throws Exception {
2669+
super.testReset(TableType.OFFLINE);
2670+
}
26662671

26672672
@Test
26682673
public void testJDBCClient()

pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ public String forTableReload(String tableName, TableType tableType, boolean forc
223223
return StringUtil.join("/", _baseUrl, "segments", tableName, query);
224224
}
225225

226+
public String forTableReset(String tableNameWithType, @Nullable String targetInstance) {
227+
String query = targetInstance == null ? "reset" : String.format("reset?targetInstance=%s", targetInstance);
228+
return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, query);
229+
}
230+
226231
public String forControllerJobStatus(String jobId) {
227232
return StringUtil.join("/", _baseUrl, "segments", "segmentReloadStatus", jobId);
228233
}
@@ -316,6 +321,11 @@ public String forSegmentReload(String tableName, String segmentName, boolean for
316321
"reload?forceDownload=" + forceDownload);
317322
}
318323

324+
public String forSegmentReset(String tableNameWithType, String segmentName, String targetInstance) {
325+
String query = targetInstance == null ? "reset" : String.format("reset?targetInstance=%s", targetInstance);
326+
return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, encode(segmentName), query);
327+
}
328+
319329
public String forSegmentDownload(String tableName, String segmentName) {
320330
return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName));
321331
}

0 commit comments

Comments
 (0)