Skip to content

Commit e8c849e

Browse files
authored
Allow subclass to customize what happens pre/post segment uploading (#8203)
Added preUploadSegments() and postUploadSegments() methods to BaseMultipleSegmentsConversionExecutor so that subclasses can customize what happens before and after segment uploading. For example, the RT2OFF task should use segment lineage against the destination offline table instead of the source realtime table, whereas MergeRollup uses segment lineage against a single table that is both the source and the destination table.
1 parent 8042408 commit e8c849e

File tree

1 file changed

+98
-17
lines changed

1 file changed

+98
-17
lines changed

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java

Lines changed: 98 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import java.io.File;
2323
import java.util.ArrayList;
2424
import java.util.Arrays;
25+
import java.util.HashMap;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.UUID;
2829
import java.util.stream.Collectors;
2930
import org.apache.commons.io.FileUtils;
31+
import org.apache.commons.lang3.StringUtils;
3032
import org.apache.http.Header;
3133
import org.apache.http.NameValuePair;
3234
import org.apache.http.message.BasicHeader;
@@ -57,6 +59,7 @@
5759
*/
5860
public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
5961
private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
62+
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId";
6063

6164
protected MinionConf _minionConf;
6265

@@ -89,6 +92,33 @@ protected void preProcess(PinotTaskConfig pinotTaskConfig) {
8992
protected void postProcess(PinotTaskConfig pinotTaskConfig) {
// Intentionally empty: the default implementation does nothing, and subclasses may override it.
// NOTE(review): the call site is not visible in this hunk - presumably invoked after the task runs; confirm.
9093
}
9194

95+
protected void preUploadSegments(SegmentUploadContext context)
96+
throws Exception {
97+
// Update the segment lineage to indicate that the segment replacement is in progress.
98+
if (context.isReplaceSegmentsEnabled()) {
99+
List<String> segmentsFrom =
100+
Arrays.stream(StringUtils.split(context.getInputSegmentNames(), MinionConstants.SEGMENT_NAME_SEPARATOR))
101+
.map(String::trim).collect(Collectors.toList());
102+
List<String> segmentsTo =
103+
context.getSegmentConversionResults().stream().map(SegmentConversionResult::getSegmentName)
104+
.collect(Collectors.toList());
105+
String lineageEntryId =
106+
SegmentConversionUtils.startSegmentReplace(context.getTableNameWithType(), context.getUploadURL(),
107+
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), context.getAuthToken());
108+
context.setCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID, lineageEntryId);
109+
}
110+
}
111+
112+
protected void postUploadSegments(SegmentUploadContext context)
113+
throws Exception {
114+
// Update the segment lineage to indicate that the segment replacement is done.
115+
if (context.isReplaceSegmentsEnabled()) {
116+
String lineageEntryId = (String) context.getCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID);
117+
SegmentConversionUtils.endSegmentReplace(context.getTableNameWithType(), context.getUploadURL(), lineageEntryId,
118+
_minionConf.getEndReplaceSegmentsTimeoutMs(), context.getAuthToken());
119+
}
120+
}
121+
92122
@Override
93123
public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
94124
throws Exception {
@@ -102,8 +132,6 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
102132
String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
103133
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
104134
String authToken = configs.get(MinionConstants.AUTH_TOKEN);
105-
String replaceSegmentsString = configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY);
106-
boolean replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString);
107135

108136
LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
109137
tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
@@ -168,16 +196,8 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
168196
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
169197
}
170198

171-
// Update the segment lineage to indicate that the segment replacement is in progress.
172-
String lineageEntryId = null;
173-
if (replaceSegmentsEnabled) {
174-
List<String> segmentsFrom =
175-
Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
176-
List<String> segmentsTo =
177-
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
178-
lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
179-
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken);
180-
}
199+
SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults);
200+
preUploadSegments(segmentUploadContext);
181201

182202
// Upload the tarred segments
183203
for (int i = 0; i < numOutputSegments; i++) {
@@ -211,11 +231,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
211231
}
212232
}
213233

214-
// Update the segment lineage to indicate that the segment replacement is done.
215-
if (replaceSegmentsEnabled) {
216-
SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
217-
_minionConf.getEndReplaceSegmentsTimeoutMs(), authToken);
218-
}
234+
postUploadSegments(segmentUploadContext);
219235

220236
String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
221237
.collect(Collectors.joining(","));
@@ -229,4 +245,69 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
229245
FileUtils.deleteQuietly(tempDataDir);
230246
}
231247
}
248+
249+
// SegmentUploadContext holds the info to conduct certain actions
250+
// before and after uploading multiple segments.
251+
protected static class SegmentUploadContext {
252+
private final PinotTaskConfig _pinotTaskConfig;
253+
private final List<SegmentConversionResult> _segmentConversionResults;
254+
255+
private final String _tableNameWithType;
256+
private final String _uploadURL;
257+
private final String _authToken;
258+
private final String _inputSegmentNames;
259+
private final boolean _replaceSegmentsEnabled;
260+
private final Map<String, Object> _customMap;
261+
262+
public SegmentUploadContext(PinotTaskConfig pinotTaskConfig,
263+
List<SegmentConversionResult> segmentConversionResults) {
264+
_pinotTaskConfig = pinotTaskConfig;
265+
_segmentConversionResults = segmentConversionResults;
266+
267+
Map<String, String> configs = pinotTaskConfig.getConfigs();
268+
_tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
269+
_uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
270+
_authToken = configs.get(MinionConstants.AUTH_TOKEN);
271+
_inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
272+
String replaceSegmentsString = configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY);
273+
_replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString);
274+
_customMap = new HashMap<>();
275+
}
276+
277+
public PinotTaskConfig getPinotTaskConfig() {
278+
return _pinotTaskConfig;
279+
}
280+
281+
public List<SegmentConversionResult> getSegmentConversionResults() {
282+
return _segmentConversionResults;
283+
}
284+
285+
public String getTableNameWithType() {
286+
return _tableNameWithType;
287+
}
288+
289+
public String getUploadURL() {
290+
return _uploadURL;
291+
}
292+
293+
public String getAuthToken() {
294+
return _authToken;
295+
}
296+
297+
public String getInputSegmentNames() {
298+
return _inputSegmentNames;
299+
}
300+
301+
public boolean isReplaceSegmentsEnabled() {
302+
return _replaceSegmentsEnabled;
303+
}
304+
305+
public Object getCustomContext(String key) {
306+
return _customMap.get(key);
307+
}
308+
309+
public void setCustomContext(String key, Object value) {
310+
_customMap.put(key, value);
311+
}
312+
}
232313
}

0 commit comments

Comments
 (0)