2222import java .io .File ;
2323import java .util .ArrayList ;
2424import java .util .Arrays ;
25+ import java .util .HashMap ;
2526import java .util .List ;
2627import java .util .Map ;
2728import java .util .UUID ;
2829import java .util .stream .Collectors ;
2930import org .apache .commons .io .FileUtils ;
31+ import org .apache .commons .lang3 .StringUtils ;
3032import org .apache .http .Header ;
3133import org .apache .http .NameValuePair ;
3234import org .apache .http .message .BasicHeader ;
5759 */
5860public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
5961 private static final Logger LOGGER = LoggerFactory .getLogger (BaseMultipleSegmentsConversionExecutor .class );
62+ private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId" ;
6063
6164 protected MinionConf _minionConf ;
6265
@@ -89,6 +92,33 @@ protected void preProcess(PinotTaskConfig pinotTaskConfig) {
8992 protected void postProcess (PinotTaskConfig pinotTaskConfig ) {
9093 }
9194
95+ protected void preUploadSegments (SegmentUploadContext context )
96+ throws Exception {
97+ // Update the segment lineage to indicate that the segment replacement is in progress.
98+ if (context .isReplaceSegmentsEnabled ()) {
99+ List <String > segmentsFrom =
100+ Arrays .stream (StringUtils .split (context .getInputSegmentNames (), MinionConstants .SEGMENT_NAME_SEPARATOR ))
101+ .map (String ::trim ).collect (Collectors .toList ());
102+ List <String > segmentsTo =
103+ context .getSegmentConversionResults ().stream ().map (SegmentConversionResult ::getSegmentName )
104+ .collect (Collectors .toList ());
105+ String lineageEntryId =
106+ SegmentConversionUtils .startSegmentReplace (context .getTableNameWithType (), context .getUploadURL (),
107+ new StartReplaceSegmentsRequest (segmentsFrom , segmentsTo ), context .getAuthToken ());
108+ context .setCustomContext (CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID , lineageEntryId );
109+ }
110+ }
111+
112+ protected void postUploadSegments (SegmentUploadContext context )
113+ throws Exception {
114+ // Update the segment lineage to indicate that the segment replacement is done.
115+ if (context .isReplaceSegmentsEnabled ()) {
116+ String lineageEntryId = (String ) context .getCustomContext (CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID );
117+ SegmentConversionUtils .endSegmentReplace (context .getTableNameWithType (), context .getUploadURL (), lineageEntryId ,
118+ _minionConf .getEndReplaceSegmentsTimeoutMs (), context .getAuthToken ());
119+ }
120+ }
121+
92122 @ Override
93123 public List <SegmentConversionResult > executeTask (PinotTaskConfig pinotTaskConfig )
94124 throws Exception {
@@ -102,8 +132,6 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
102132 String [] downloadURLs = downloadURLString .split (MinionConstants .URL_SEPARATOR );
103133 String uploadURL = configs .get (MinionConstants .UPLOAD_URL_KEY );
104134 String authToken = configs .get (MinionConstants .AUTH_TOKEN );
105- String replaceSegmentsString = configs .get (MinionConstants .ENABLE_REPLACE_SEGMENTS_KEY );
106- boolean replaceSegmentsEnabled = Boolean .parseBoolean (replaceSegmentsString );
107135
108136 LOGGER .info ("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}" , taskType ,
109137 tableNameWithType , inputSegmentNames , downloadURLString , uploadURL );
@@ -168,16 +196,8 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
168196 taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled" );
169197 }
170198
171- // Update the segment lineage to indicate that the segment replacement is in progress.
172- String lineageEntryId = null ;
173- if (replaceSegmentsEnabled ) {
174- List <String > segmentsFrom =
175- Arrays .stream (inputSegmentNames .split ("," )).map (String ::trim ).collect (Collectors .toList ());
176- List <String > segmentsTo =
177- segmentConversionResults .stream ().map (SegmentConversionResult ::getSegmentName ).collect (Collectors .toList ());
178- lineageEntryId = SegmentConversionUtils .startSegmentReplace (tableNameWithType , uploadURL ,
179- new StartReplaceSegmentsRequest (segmentsFrom , segmentsTo ), authToken );
180- }
199+ SegmentUploadContext segmentUploadContext = new SegmentUploadContext (pinotTaskConfig , segmentConversionResults );
200+ preUploadSegments (segmentUploadContext );
181201
182202 // Upload the tarred segments
183203 for (int i = 0 ; i < numOutputSegments ; i ++) {
@@ -211,11 +231,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
211231 }
212232 }
213233
214- // Update the segment lineage to indicate that the segment replacement is done.
215- if (replaceSegmentsEnabled ) {
216- SegmentConversionUtils .endSegmentReplace (tableNameWithType , uploadURL , lineageEntryId ,
217- _minionConf .getEndReplaceSegmentsTimeoutMs (), authToken );
218- }
234+ postUploadSegments (segmentUploadContext );
219235
220236 String outputSegmentNames = segmentConversionResults .stream ().map (SegmentConversionResult ::getSegmentName )
221237 .collect (Collectors .joining ("," ));
@@ -229,4 +245,69 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
229245 FileUtils .deleteQuietly (tempDataDir );
230246 }
231247 }
248+
249+ // SegmentUploadContext holds the info to conduct certain actions
250+ // before and after uploading multiple segments.
251+ protected static class SegmentUploadContext {
252+ private final PinotTaskConfig _pinotTaskConfig ;
253+ private final List <SegmentConversionResult > _segmentConversionResults ;
254+
255+ private final String _tableNameWithType ;
256+ private final String _uploadURL ;
257+ private final String _authToken ;
258+ private final String _inputSegmentNames ;
259+ private final boolean _replaceSegmentsEnabled ;
260+ private final Map <String , Object > _customMap ;
261+
262+ public SegmentUploadContext (PinotTaskConfig pinotTaskConfig ,
263+ List <SegmentConversionResult > segmentConversionResults ) {
264+ _pinotTaskConfig = pinotTaskConfig ;
265+ _segmentConversionResults = segmentConversionResults ;
266+
267+ Map <String , String > configs = pinotTaskConfig .getConfigs ();
268+ _tableNameWithType = configs .get (MinionConstants .TABLE_NAME_KEY );
269+ _uploadURL = configs .get (MinionConstants .UPLOAD_URL_KEY );
270+ _authToken = configs .get (MinionConstants .AUTH_TOKEN );
271+ _inputSegmentNames = configs .get (MinionConstants .SEGMENT_NAME_KEY );
272+ String replaceSegmentsString = configs .get (MinionConstants .ENABLE_REPLACE_SEGMENTS_KEY );
273+ _replaceSegmentsEnabled = Boolean .parseBoolean (replaceSegmentsString );
274+ _customMap = new HashMap <>();
275+ }
276+
277+ public PinotTaskConfig getPinotTaskConfig () {
278+ return _pinotTaskConfig ;
279+ }
280+
281+ public List <SegmentConversionResult > getSegmentConversionResults () {
282+ return _segmentConversionResults ;
283+ }
284+
285+ public String getTableNameWithType () {
286+ return _tableNameWithType ;
287+ }
288+
289+ public String getUploadURL () {
290+ return _uploadURL ;
291+ }
292+
293+ public String getAuthToken () {
294+ return _authToken ;
295+ }
296+
297+ public String getInputSegmentNames () {
298+ return _inputSegmentNames ;
299+ }
300+
301+ public boolean isReplaceSegmentsEnabled () {
302+ return _replaceSegmentsEnabled ;
303+ }
304+
305+ public Object getCustomContext (String key ) {
306+ return _customMap .get (key );
307+ }
308+
309+ public void setCustomContext (String key , Object value ) {
310+ _customMap .put (key , value );
311+ }
312+ }
232313}
0 commit comments