Skip to content

Commit 5a34e0f

Browse files
author
Xiaobing Li
committed
add auth token for segment replace rest APIs
1 parent 8bbf93a commit 5a34e0f

File tree

3 files changed

+25
-14
lines changed

3 files changed

+25
-14
lines changed

pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -435,17 +435,24 @@ private static HttpUriRequest getSendSegmentJsonRequest(URI uri, String jsonStri
435435
return requestBuilder.build();
436436
}
437437

438-
private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs) {
438+
private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs,
439+
@Nullable String authToken) {
439440
RequestBuilder requestBuilder =
440441
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1).setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE)
441442
.setEntity(new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON));
443+
if (StringUtils.isNotBlank(authToken)) {
444+
requestBuilder.addHeader("Authorization", authToken);
445+
}
442446
setTimeout(requestBuilder, socketTimeoutMs);
443447
return requestBuilder.build();
444448
}
445449

446-
private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs) {
450+
private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, @Nullable String authToken) {
447451
RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
448452
.setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
453+
if (StringUtils.isNotBlank(authToken)) {
454+
requestBuilder.addHeader("Authorization", authToken);
455+
}
449456
setTimeout(requestBuilder, socketTimeoutMs);
450457
return requestBuilder.build();
451458
}
@@ -1018,28 +1025,31 @@ public SimpleHttpResponse sendSegmentJson(URI uri, String jsonString)
10181025
*
10191026
* @param uri URI
10201027
* @param startReplaceSegmentsRequest request
1028+
* @param authToken auth token
10211029
* @return Response
10221030
* @throws IOException
10231031
* @throws HttpErrorStatusException
10241032
*/
1025-
public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest)
1033+
public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest,
1034+
@Nullable String authToken)
10261035
throws IOException, HttpErrorStatusException {
10271036
return sendRequest(getStartReplaceSegmentsRequest(uri, JsonUtils.objectToString(startReplaceSegmentsRequest),
1028-
DEFAULT_SOCKET_TIMEOUT_MS));
1037+
DEFAULT_SOCKET_TIMEOUT_MS, authToken));
10291038
}
10301039

10311040
/**
10321041
* End replace segments with default settings.
10331042
*
10341043
* @param uri URI
10351044
* @oaram socketTimeoutMs Socket timeout in milliseconds
1045+
* @param authToken auth token
10361046
* @return Response
10371047
* @throws IOException
10381048
* @throws HttpErrorStatusException
10391049
*/
1040-
public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs)
1050+
public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable String authToken)
10411051
throws IOException, HttpErrorStatusException {
1042-
return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs));
1052+
return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authToken));
10431053
}
10441054

10451055
/**

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
176176
List<String> segmentsTo =
177177
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
178178
lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
179-
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
179+
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken);
180180
}
181181

182182
// Upload the tarred segments
@@ -213,9 +213,8 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
213213

214214
// Update the segment lineage to indicate that the segment replacement is done.
215215
if (replaceSegmentsEnabled) {
216-
SegmentConversionUtils
217-
.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
218-
_minionConf.getEndReplaceSegmentsTimeoutMs());
216+
SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
217+
_minionConf.getEndReplaceSegmentsTimeoutMs(), authToken);
219218
}
220219

221220
String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.URI;
2424
import java.util.List;
2525
import java.util.Map;
26+
import javax.annotation.Nullable;
2627
import javax.net.ssl.SSLContext;
2728
import org.apache.http.Header;
2829
import org.apache.http.HttpHeaders;
@@ -122,15 +123,16 @@ public static void uploadSegment(Map<String, String> configs, List<Header> httpH
122123
}
123124

124125
public static String startSegmentReplace(String tableNameWithType, String uploadURL,
125-
StartReplaceSegmentsRequest startReplaceSegmentsRequest)
126+
StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken)
126127
throws Exception {
127128
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
128129
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
129130
SSLContext sslContext = MinionContext.getInstance().getSSLContext();
130131
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
131132
URI uri =
132133
FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true);
133-
SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest);
134+
SimpleHttpResponse response =
135+
fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken);
134136
String responseString = response.getResponse();
135137
LOGGER.info(
136138
"Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}",
@@ -140,15 +142,15 @@ public static String startSegmentReplace(String tableNameWithType, String upload
140142
}
141143

142144
public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId,
143-
int socketTimeoutMs)
145+
int socketTimeoutMs, @Nullable String authToken)
144146
throws Exception {
145147
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
146148
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
147149
SSLContext sslContext = MinionContext.getInstance().getSSLContext();
148150
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
149151
URI uri = FileUploadDownloadClient
150152
.getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId);
151-
SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
153+
SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken);
152154
LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}",
153155
response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL);
154156
}

0 commit comments

Comments
 (0)