Skip to content

Commit e748c2e

Browse files
LiuCanWu刘阳SbloodyS
authored
fix the #14729 problem (#14902)
Co-authored-by: 刘阳 <[email protected]> Co-authored-by: xiangzihao <[email protected]>
1 parent b303648 commit e748c2e

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,8 @@ private static List<String> buildRunCommandLineForOthers(TaskExecutionContext ta
273273
args.add(others);
274274
}
275275

276+
// determine yarn queue
277+
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
276278
ProgramType programType = flinkParameters.getProgramType();
277279
String mainClass = flinkParameters.getMainClass();
278280
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
@@ -295,8 +297,6 @@ private static List<String> buildRunCommandLineForOthers(TaskExecutionContext ta
295297
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParameterUtils.convert(paramsMap)));
296298
}
297299

298-
// determine yarn queue
299-
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
300300
return args;
301301
}
302302

@@ -310,8 +310,10 @@ private static void determinedYarnQueue(List<String> args, FlinkParameters flink
310310
} else {
311311
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
312312
}
313+
break;
313314
case APPLICATION:
314315
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
316+
break;
315317
}
316318
}
317319

@@ -323,9 +325,11 @@ private static void doAddQueue(List<String> args, FlinkParameters flinkParameter
323325
switch (option) {
324326
case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS:
325327
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue));
328+
break;
326329
case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE:
327330
args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
328331
args.add(yarnQueue);
332+
break;
329333
}
330334
}
331335
}

0 commit comments

Comments
 (0)