Skip to content

Commit b1f938d

Browse files
authored
Merge 081f834 into a71ae24
2 parents a71ae24 + 081f834 commit b1f938d

File tree

3 files changed

+16
-14
lines changed
  • dolphinscheduler-task-plugin

3 files changed

+16
-14
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testRunJarInApplicationMode() throws Exception {
6969
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
7070

7171
Assertions.assertEquals(
72-
"flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
72+
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
7373
joinStringListWithSpace(commandLine));
7474
}
7575

@@ -81,23 +81,23 @@ public void testRunJarInClusterMode() throws Exception {
8181
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
8282

8383
Assertions.assertEquals(
84-
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
84+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
8585
joinStringListWithSpace(commandLine1));
8686

8787
flinkParameters.setFlinkVersion("<1.10");
8888
List<String> commandLine2 =
8989
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
9090

9191
Assertions.assertEquals(
92-
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
92+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
9393
joinStringListWithSpace(commandLine2));
9494

9595
flinkParameters.setFlinkVersion(">=1.12");
9696
List<String> commandLine3 =
9797
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
9898

9999
Assertions.assertEquals(
100-
"flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
100+
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
101101
joinStringListWithSpace(commandLine3));
102102
}
103103

@@ -107,7 +107,7 @@ public void testRunJarInLocalMode() throws Exception {
107107
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
108108

109109
Assertions.assertEquals(
110-
"flink run -p 4 -sae -c org.example.Main /opt/job.jar",
110+
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
111111
joinStringListWithSpace(commandLine));
112112
}
113113

@@ -117,7 +117,8 @@ public void testRunSql() throws Exception {
117117
flinkParameters.setProgramType(ProgramType.SQL);
118118
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
119119

120-
Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
120+
Assertions.assertEquals(
121+
"${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
121122
joinStringListWithSpace(commandLine));
122123
}
123124

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ private FlinkConstants() {
2727
* flink command
2828
* usage: flink run [OPTIONS] <jar-file> <arguments>
2929
*/
30-
public static final String FLINK_COMMAND = "flink";
30+
public static final String FLINK_COMMAND = "${FLINK_HOME}/bin/flink";
3131
public static final String FLINK_RUN = "run";
3232

3333
/**
3434
* flink sql command
3535
* usage: sql-client.sh -i <initialization file>, -f <script file>
3636
*/
37-
public static final String FLINK_SQL_COMMAND = "sql-client.sh";
37+
public static final String FLINK_SQL_COMMAND = "${FLINK_HOME}/bin/sql-client.sh";
3838

3939
/**
4040
* flink run options

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testRunJarInApplicationMode() throws Exception {
6969
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
7070

7171
Assertions.assertEquals(
72-
"flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
72+
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
7373
joinStringListWithSpace(commandLine));
7474
}
7575

@@ -81,23 +81,23 @@ public void testRunJarInClusterMode() {
8181
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
8282

8383
Assertions.assertEquals(
84-
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
84+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
8585
joinStringListWithSpace(commandLine1));
8686

8787
flinkParameters.setFlinkVersion("<1.10");
8888
List<String> commandLine2 =
8989
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
9090

9191
Assertions.assertEquals(
92-
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
92+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
9393
joinStringListWithSpace(commandLine2));
9494

9595
flinkParameters.setFlinkVersion(">=1.12");
9696
List<String> commandLine3 =
9797
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
9898

9999
Assertions.assertEquals(
100-
"flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
100+
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
101101
joinStringListWithSpace(commandLine3));
102102
}
103103

@@ -107,7 +107,7 @@ public void testRunJarInLocalMode() {
107107
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
108108

109109
Assertions.assertEquals(
110-
"flink run -p 4 -sae -c org.example.Main /opt/job.jar",
110+
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
111111
joinStringListWithSpace(commandLine));
112112
}
113113

@@ -117,7 +117,8 @@ public void testRunSql() {
117117
flinkParameters.setProgramType(ProgramType.SQL);
118118
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
119119

120-
Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
120+
Assertions.assertEquals(
121+
"${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
121122
joinStringListWithSpace(commandLine));
122123
}
123124

0 commit comments

Comments
 (0)