Skip to content

Commit 9c182ee

Browse files
authored
Merge branch 'dev' into resource-center-obs
2 parents 1888d89 + dde45db commit 9c182ee

File tree

7 files changed

+55
-43
lines changed

7 files changed

+55
-43
lines changed

.github/workflows/unit-test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ jobs:
7979
- name: Upload coverage report to codecov
8080
run: CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)
8181

82-
# Set up JDK 11 for SonarCloud.
83-
- name: Set up JDK 11
82+
# Set up JDK 17 for SonarCloud.
83+
- name: Set up JDK 17
8484
uses: actions/setup-java@v2
8585
with:
86-
java-version: 11
86+
java-version: 17
8787
distribution: 'adopt'
8888
- name: Run SonarCloud Analysis
8989
run: >

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import static org.apache.dolphinscheduler.common.constants.Constants.UTF_8;
2626
import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYYMMDDHHMMSS;
2727

28+
import org.apache.dolphinscheduler.common.constants.TenantConstants;
29+
2830
import org.apache.commons.io.IOUtils;
2931
import org.apache.commons.lang3.SystemUtils;
3032

@@ -35,12 +37,15 @@
3537
import java.io.IOException;
3638
import java.io.InputStream;
3739
import java.nio.charset.StandardCharsets;
40+
import java.nio.file.FileSystems;
3841
import java.nio.file.Files;
3942
import java.nio.file.NoSuchFileException;
4043
import java.nio.file.Path;
4144
import java.nio.file.attribute.FileAttribute;
4245
import java.nio.file.attribute.PosixFilePermission;
4346
import java.nio.file.attribute.PosixFilePermissions;
47+
import java.nio.file.attribute.UserPrincipal;
48+
import java.nio.file.attribute.UserPrincipalLookupService;
4449
import java.util.Set;
4550
import java.util.zip.CRC32;
4651
import java.util.zip.CheckedInputStream;
@@ -323,6 +328,29 @@ public static String getFileChecksum(String pathName) throws IOException {
323328
return crcString;
324329
}
325330

331+
public static void setFileOwner(Path path, String tenant) {
332+
try {
333+
if (TenantConstants.DEFAULT_TENANT_CODE.equals(tenant)) {
334+
log.debug("The current tenant: {} is the default tenant, no need to set the owner for file: {}", tenant,
335+
path);
336+
return;
337+
}
338+
UserPrincipalLookupService userPrincipalLookupService =
339+
FileSystems.getDefault().getUserPrincipalLookupService();
340+
UserPrincipal tenantPrincipal = userPrincipalLookupService.lookupPrincipalByName(tenant);
341+
Files.setOwner(path, tenantPrincipal);
342+
} catch (IOException e) {
343+
log.error("Set file: {} owner to: {} failed", path, tenant, e);
344+
}
345+
}
346+
347+
public static void createDirectoryIfNotPresent(Path path) throws IOException {
348+
if (Files.exists(path)) {
349+
return;
350+
}
351+
Files.createDirectories(path);
352+
}
353+
326354
/**
327355
* Create a file with '755'.
328356
*/

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,11 @@ public class Command {
6161
private String commandParam;
6262

6363
@TableField("task_depend_type")
64+
@Builder.Default
6465
private TaskDependType taskDependType = TaskDependType.TASK_POST;
6566

6667
@TableField("failure_strategy")
68+
@Builder.Default
6769
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
6870

6971
@TableField("warning_type")
@@ -82,6 +84,7 @@ public class Command {
8284
private Priority processInstancePriority;
8385

8486
@TableField("update_time")
87+
@Builder.Default
8588
private Date updateTime = new Date();
8689

8790
@TableField("worker_group")

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,10 +1292,13 @@ private void submitPostNode(Long parentNodeCode) throws StateEventHandleExceptio
12921292
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode());
12931293
if (existTaskInstanceOptional.isPresent()) {
12941294
TaskInstance existTaskInstance = existTaskInstanceOptional.get();
1295-
if (existTaskInstance.getState() == TaskExecutionStatus.RUNNING_EXECUTION
1296-
|| existTaskInstance.getState() == TaskExecutionStatus.DISPATCH) {
1295+
TaskExecutionStatus state = existTaskInstance.getState();
1296+
if (state == TaskExecutionStatus.RUNNING_EXECUTION
1297+
|| state == TaskExecutionStatus.DISPATCH
1298+
|| state == TaskExecutionStatus.SUBMITTED_SUCCESS) {
12971299
// try to take over task instance
1298-
if (tryToTakeOverTaskInstance(existTaskInstance)) {
1300+
if (state != TaskExecutionStatus.SUBMITTED_SUCCESS
1301+
&& tryToTakeOverTaskInstance(existTaskInstance)) {
12991302
log.info("Success take over task {}", existTaskInstance.getName());
13001303
continue;
13011304
} else {

dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/LocalStorageOperatorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
@AutoService(StorageOperateFactory.class)
2727
public class LocalStorageOperatorFactory implements StorageOperateFactory {
2828

29-
private static final String LOCAL_DEFAULT_FS = "file:///";
29+
private static final String LOCAL_DEFAULT_FS = "file:/";
3030

3131
@Override
3232
public StorageOperate createStorageOperate() {

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,11 @@ public void eventReceived(Action action, Job job) {
198198
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext =
199199
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath())) {
200200
log.info("event received : job:{} action:{}", job.getMetadata().getName(), action);
201-
if (action != Action.ADDED) {
201+
if (action == Action.DELETED) {
202+
log.error("[K8sJobExecutor-{}] fail in k8s", job.getMetadata().getName());
203+
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
204+
countDownLatch.countDown();
205+
} else if (action != Action.ADDED) {
202206
int jobStatus = getK8sJobStatus(job);
203207
log.info("job {} status {}", job.getMetadata().getName(), jobStatus);
204208
if (jobStatus == TaskConstants.RUNNING_CODE) {

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,9 @@
3434
import org.apache.commons.lang3.tuple.Pair;
3535

3636
import java.io.File;
37-
import java.io.IOException;
38-
import java.nio.file.FileSystems;
3937
import java.nio.file.Files;
4038
import java.nio.file.Path;
4139
import java.nio.file.Paths;
42-
import java.nio.file.attribute.UserPrincipal;
43-
import java.nio.file.attribute.UserPrincipalLookupService;
4440
import java.util.ArrayList;
4541
import java.util.List;
4642
import java.util.Map;
@@ -97,9 +93,9 @@ public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecu
9793
taskExecutionContext.setExecutePath(execLocalPath);
9894
taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(execLocalPath));
9995
Path executePath = Paths.get(taskExecutionContext.getExecutePath());
100-
createDirectory(executePath);
101-
if (!TenantConstants.DEFAULT_TENANT_CODE.equals(taskExecutionContext.getTenantCode())) {
102-
setOwner(executePath, taskExecutionContext.getTenantCode());
96+
FileUtils.createDirectoryIfNotPresent(executePath);
97+
if (OSUtils.isSudoEnable()) {
98+
FileUtils.setFileOwner(executePath, taskExecutionContext.getTenantCode());
10399
}
104100
} catch (Throwable ex) {
105101
throw new TaskException("Cannot create process execute dir", ex);
@@ -126,7 +122,7 @@ public static void downloadResourcesIfNeeded(StorageOperate storageOperate,
126122
if (notExist) {
127123
downloadFiles.add(Pair.of(fullName, fileName));
128124
} else {
129-
log.info("file : {} exists ", resFile.getName());
125+
log.warn("Resource file : {} already exists will not download again ", resFile.getName());
130126
}
131127
});
132128
if (!downloadFiles.isEmpty() && !PropertyUtils.isResourceStorageStartup()) {
@@ -141,8 +137,11 @@ public static void downloadResourcesIfNeeded(StorageOperate storageOperate,
141137
log.info("get resource file from path:{}", fullName);
142138

143139
long resourceDownloadStartTime = System.currentTimeMillis();
144-
storageOperate.download(actualTenant, fullName,
145-
execLocalPath + File.separator + fileName, true);
140+
storageOperate.download(actualTenant, fullName, execLocalPath + File.separator + fileName, true);
141+
if (OSUtils.isSudoEnable()) {
142+
FileUtils.setFileOwner(Paths.get(execLocalPath, fileName),
143+
taskExecutionContext.getTenantCode());
144+
}
146145
WorkerServerMetrics
147146
.recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
148147
WorkerServerMetrics.recordWorkerResourceDownloadSize(
@@ -156,29 +155,4 @@ public static void downloadResourcesIfNeeded(StorageOperate storageOperate,
156155
}
157156
}
158157

159-
private static void createDirectory(Path filePath) {
160-
if (Files.exists(filePath)) {
161-
return;
162-
}
163-
try {
164-
Files.createDirectories(filePath);
165-
} catch (IOException e) {
166-
throw new TaskException("Create directory " + filePath + " failed ", e);
167-
}
168-
}
169-
170-
private static void setOwner(Path filePath, String tenant) {
171-
try {
172-
if (!OSUtils.isSudoEnable()) {
173-
// we need to open sudo, then we can change the owner.
174-
return;
175-
}
176-
UserPrincipalLookupService userPrincipalLookupService =
177-
FileSystems.getDefault().getUserPrincipalLookupService();
178-
UserPrincipal tenantPrincipal = userPrincipalLookupService.lookupPrincipalByName(tenant);
179-
Files.setOwner(filePath, tenantPrincipal);
180-
} catch (IOException e) {
181-
throw new TaskException("Set tenant directory " + filePath + " permission failed, tenant: " + tenant, e);
182-
}
183-
}
184158
}

0 commit comments

Comments
 (0)