Skip to content

feat(pd): add build index task in pd#23

Merged
imbajin merged 3 commits intohugegraph:3.7.1-pd-storefrom
koi2000:build_index
Sep 8, 2025
Merged

feat(pd): add build index task in pd#23
imbajin merged 3 commits intohugegraph:3.7.1-pd-storefrom
koi2000:build_index

Conversation

@koi2000
Copy link
Copy Markdown

@koi2000 koi2000 commented Aug 18, 2025

Purpose of the PR

  • close #xxx

Main Changes

Verifying these changes

  • Trivial rework / code cleanup without any test coverage. (No Need)
  • Already covered by existing tests, such as (please modify tests here).
  • Need tests and can be verified as follows:
    • xxx

Does this PR potentially affect the following parts?

Documentation Status

  • Doc - TODO
  • Doc - Done
  • Doc - No Need

Summary by CodeRabbit

  • 新功能
    • 新增索引构建任务管理:支持提交任务、查询任务状态与消息、失败任务重试,并返回任务ID。
    • 服务端按分区下发索引构建指令,分区心跳携带任务信息;系统自动汇总整体任务状态(成功/进行中/失败)。
  • 兼容性
    • 需同时升级 PD 服务与客户端以使用上述能力与接口。

Copilot AI review requested due to automatic review settings August 18, 2025 14:58
@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Aug 18, 2025
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Aug 18, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

引入“构建索引”任务的端到端能力:扩展 protobuf(任务类型与参数、PD 服务 RPC、心跳响应),新增 PDService 三个 RPC(提交/查询/重试),客户端 PDClient 提供对应封装方法;核心侧增加任务路由与持久化(PartitionService、TaskScheduleService、TaskInfoMeta、MetadataKeyHelper),心跳通知携带构建索引信息。

Changes

Cohort / File(s) Summary
Client API: BuildIndex
hugegraph-pd/hg-pd-client/src/main/java/.../PDClient.java
新增提交/查询/重试构建索引任务的 3 个公开方法;封装 Pdpb 请求并调用 stub(submitTask/queryTaskState/retryIndexTask);沿用 header 与错误处理。
Core Services: Task handling
hugegraph-pd/hg-pd-core/src/main/java/.../PartitionService.java, .../TaskScheduleService.java
PartitionService 增加 handleBuildIndexTask 持久化处理;TaskScheduleService 在 reportTask 中新增 Build_Index 分支委派至上述处理。
Metadata & Storage
hugegraph-pd/hg-pd-core/src/main/java/.../meta/MetadataKeyHelper.java, .../meta/TaskInfoMeta.java
新增构建索引任务命名空间/Key 前缀与 Key 生成;提供扫描、获取、更新 BuildIndex 任务的 3 个方法。
gRPC: Protos (Tasks/Params)
hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto, .../metapb.proto
TaskType 新增 Build_Index;Task 增加 buildIndex 字段;定义 BuildIndex 与 BuildIndexParam 消息(含 oneof)。
gRPC: PD API & Pulse
hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto, .../pd_pulse.proto
PD 服务新增 submitTask/queryTaskState/retryIndexTask 及对应请求/响应消息;PartitionHeartbeatResponse 增加 build_index 字段。
Service Implementation
hugegraph-pd/hg-pd-service/src/main/java/.../PDService.java
实现提交/查询/重试构建索引任务:领导者校验、分区枚举、任务登记与持久化、心跳通知、聚合任务状态与消息、错误返回。

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant PDClient
  participant PDService
  participant TaskInfoMeta as TaskStore
  participant PDPulse as Pulse
  participant Partition as PartitionNodes

  rect rgba(220,240,255,0.6)
  Note over Client,PDService: 提交构建索引任务
  Client->>PDClient: submitBuildIndexTask(param)
  PDClient->>PDService: submitTask(request)
  PDService->>PDService: 领导者校验
  PDService->>TaskStore: 写入每分区 MetaTask.BuildIndex
  TaskStore-->>PDService: ok
  PDService->>Pulse: 发布 PartitionHeartbeat(build_index)
  Pulse-->>Partition: 通知执行
  PDService-->>PDClient: IndexTaskCreateResponse(task_id)
  PDClient-->>Client: task_id
  end

  rect rgba(235,255,235,0.6)
  Note over Client,PDService: 查询任务状态
  Client->>PDClient: queryBuildIndexTaskStatus(task_id)
  PDClient->>PDService: queryTaskState(task_id)
  PDService->>TaskStore: 扫描 task 前缀
  TaskStore-->>PDService: 分区任务集合
  PDService-->>PDClient: IndexTaskQueryResponse(state,message)
  PDClient-->>Client: state/message
  end

  rect rgba(255,240,220,0.6)
  Note over Client,PDService: 失败/进行中任务重试
  Client->>PDClient: retryBuildIndexTask(task_id)
  PDClient->>PDService: retryIndexTask(task_id)
  PDService->>TaskStore: 获取任务集合
  TaskStore-->>PDService: 分区任务集合
  PDService->>Pulse: 对失败/进行中分区发布重试
  PDService-->>PDClient: IndexTaskQueryResponse(state,message)
  PDClient-->>Client: state/message
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

代码草间风乍起,
兔爪敲定索引旗。
任务如星分区落,
心跳传令起云梯。
查询回响明与否,
重试再跃一段崎。
嗖——新流已成齐。 🐰✨

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link
Copy Markdown

@codecov-ai-reviewer review

@codecov-ai

This comment has been minimized.

@dosubot dosubot bot added the enhancement New feature or request label Aug 18, 2025
Comment on lines +1345 to +1373
public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
.setHeader(header)
.setParam(param)
.build();
var response = getStub().submitTask(request);
handleResponseError(response.getHeader());
return response.getTaskId();
}

public Pdpb.IndexTaskQueryResponse queryBuildIndexTaskStatus(long taskId) throws PDException {
Pdpb.IndexTaskQueryRequest request = Pdpb.IndexTaskQueryRequest.newBuilder()
.setHeader(header)
.setTaskId(taskId)
.build();
var response = getStub().queryTaskState(request);
handleResponseError(response.getHeader());
return response;
}

public Pdpb.IndexTaskQueryResponse retryBuildIndexTask(long taskId) throws PDException {
Pdpb.IndexTaskQueryRequest request = Pdpb.IndexTaskQueryRequest.newBuilder()
.setHeader(header)
.setTaskId(taskId)
.build();
var response = getStub().retryIndexTask(request);
handleResponseError(response.getHeader());
return response;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gRPC stub methods lack proper exception handling and logging. Consider adding error logging before throwing exceptions to aid in debugging. Also, consider adding null checks for the response object before accessing its fields.

Suggested change
public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
.setHeader(header)
.setParam(param)
.build();
var response = getStub().submitTask(request);
handleResponseError(response.getHeader());
return response.getTaskId();
}
public Pdpb.IndexTaskQueryResponse queryBuildIndexTaskStatus(long taskId) throws PDException {
Pdpb.IndexTaskQueryRequest request = Pdpb.IndexTaskQueryRequest.newBuilder()
.setHeader(header)
.setTaskId(taskId)
.build();
var response = getStub().queryTaskState(request);
handleResponseError(response.getHeader());
return response;
}
public Pdpb.IndexTaskQueryResponse retryBuildIndexTask(long taskId) throws PDException {
Pdpb.IndexTaskQueryRequest request = Pdpb.IndexTaskQueryRequest.newBuilder()
.setHeader(header)
.setTaskId(taskId)
.build();
var response = getStub().retryIndexTask(request);
handleResponseError(response.getHeader());
return response;
}
public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
try {
Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
.setHeader(header)
.setParam(param)
.build();
var response = getStub().submitTask(request);
if (response == null) {
throw new PDException(-1, "Received null response from server");
}
handleResponseError(response.getHeader());
return response.getTaskId();
} catch (Exception e) {
log.error("Failed to submit build index task", e);
throw e;
}
}

Did we get this right? 👍 / 👎 to inform future reviews.

Comment on lines +1330 to +1334
public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
log.info("build index task {} -{} , report state: {}", task.getPartition().getGraphName(),
task.getPartition().getId(), task.getState());
storeService.getTaskInfoMeta().updateBuildIndexTask(task);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handleBuildIndexTask method only logs and updates the task but doesn't implement proper error handling or validation. Consider adding validation for the task parameter and implementing proper error handling similar to other task handlers in this class.

Suggested change
public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
log.info("build index task {} -{} , report state: {}", task.getPartition().getGraphName(),
task.getPartition().getId(), task.getState());
storeService.getTaskInfoMeta().updateBuildIndexTask(task);
}
public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
if (task == null || task.getPartition() == null) {
throw new PDException(-1, "Invalid build index task: task or partition is null");
}
log.info("build index task {} -{} , report state: {}",
task.getPartition().getGraphName(),
task.getPartition().getId(),
task.getState());
try {
storeService.getTaskInfoMeta().updateBuildIndexTask(task);
} catch (Exception e) {
log.error("Failed to update build index task {}", task.getId(), e);
throw new PDException(-1, "Failed to update build index task: " + e.getMessage());
}
}

Did we get this right? 👍 / 👎 to inform future reviews.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Comment on lines +1808 to +1815
var partitions = partitionService.getPartitions(param.getGraph());

if (partitions.size() == 0) {
throw new PDException(-1, "graph has no partition");
}

var newTaskId = idService.getId(BUILD_INDEX_TASK_ID_KEY, 1);
// log.info("build index task id: {}", newTaskId);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method creates tasks for all partitions without checking if the graph exists or if the user has permissions. Consider adding proper validation and authorization checks before processing the request.

Suggested change
var partitions = partitionService.getPartitions(param.getGraph());
if (partitions.size() == 0) {
throw new PDException(-1, "graph has no partition");
}
var newTaskId = idService.getId(BUILD_INDEX_TASK_ID_KEY, 1);
// log.info("build index task id: {}", newTaskId);
// Add validation
if (param.getGraph() == null || param.getGraph().isEmpty()) {
throw new PDException(-1, "Graph name cannot be null or empty");
}
// Validate graph exists
if (!graphExists(param.getGraph())) {
throw new PDException(-1, "Graph does not exist: " + param.getGraph());
}
var partitions = partitionService.getPartitions(param.getGraph());

Did we get this right? 👍 / 👎 to inform future reviews.

Comment on lines +1885 to +1895
} else if (state0 == MetaTask.TaskState.Task_Success) {
countOfSuccess++;
}
}

if (state == MetaTask.TaskState.Task_Doing) {
message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
}

builder.setHeader(okHeader).setState(state).setMessage(message);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queryTaskState method lacks proper error handling when accessing task properties. The method should validate task existence and handle potential null pointer exceptions when accessing task fields.

Suggested change
} else if (state0 == MetaTask.TaskState.Task_Success) {
countOfSuccess++;
}
}
if (state == MetaTask.TaskState.Task_Doing) {
message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
}
builder.setHeader(okHeader).setState(state).setMessage(message);
}
for (var task : tasks) {
if (task == null) {
log.warn("Found null task in build index task list for taskId: {}", request.getTaskId());
continue;
}
var state0 = task.getState();
if (state0 == MetaTask.TaskState.Task_Failure) {
state = MetaTask.TaskState.Task_Failure;
message = task.getMessage() != null ? task.getMessage() : "Task failed with no message";
break;
}
// rest of the logic
}

Did we get this right? 👍 / 👎 to inform future reviews.

Comment on lines +1933 to +1949
log.info("notify client retry build index task: {}", buildIndex);

PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
.setPartition(
partition)
.setId(task.getId())
.setBuildIndex(
buildIndex));
}
}
builder.setHeader(okHeader).setState(state).setMessage(message);
}
} catch (PDException e) {
builder.setHeader(newErrorHeader(e));
}

observer.onNext(builder.build());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retryIndexTask method doesn't update the task state when retrying. Consider setting the task state back to 'Task_Doing' when retrying failed tasks to maintain proper state tracking.

Suggested change
log.info("notify client retry build index task: {}", buildIndex);
PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
.setPartition(
partition)
.setId(task.getId())
.setBuildIndex(
buildIndex));
}
}
builder.setHeader(okHeader).setState(state).setMessage(message);
}
} catch (PDException e) {
builder.setHeader(newErrorHeader(e));
}
observer.onNext(builder.build());
if (state0 == MetaTask.TaskState.Task_Failure ||
state0 == MetaTask.TaskState.Task_Doing) {
// Update task state to doing when retrying
var updatedTask = task.toBuilder()
.setState(MetaTask.TaskState.Task_Doing)
.setStartTimestamp(System.currentTimeMillis())
.build();
taskInfo.updateBuildIndexTask(updatedTask);
var partition = task.getPartition();
var buildIndex = task.getBuildIndex();
log.info("notify client retry build index task: {}", buildIndex);
// notify client with updated task
}

Did we get this right? 👍 / 👎 to inform future reviews.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds build index task functionality to the PD (Placement Driver) service in Apache HugeGraph. The implementation enables distributed index building by creating tasks that are distributed across partitions and tracked through their lifecycle.

  • Adds three new gRPC endpoints for index task management: submitTask, queryTaskState, and retryIndexTask
  • Implements task creation, status tracking, and retry mechanisms for build index operations
  • Extends the existing task framework to support build index tasks alongside existing partition tasks

Reviewed Changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
PDService.java Implements the three main gRPC endpoints for build index task management
pdpb.proto Defines gRPC service methods and message types for index task operations
pd_pulse.proto Adds build_index field to PartitionHeartbeatResponse for task notification
metapb.proto Defines BuildIndex and BuildIndexParam message structures
metaTask.proto Adds Build_Index task type and buildIndex field to Task message
TaskInfoMeta.java Implements data access methods for build index task persistence
MetadataKeyHelper.java Adds key generation methods for build index task storage
TaskScheduleService.java Adds build index task handling to the task reporting switch
PartitionService.java Implements build index task handling logic
PDClient.java Provides client-side API methods for build index task operations

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

}

public static byte[] getBuildIndexTaskPrefix(long taskId) {
// TASK_MOVE/{GraphName}/to PartitionID/{source partitionID}
Copy link

Copilot AI Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is incorrect and appears to be copy-pasted from another method. It should describe the build index task prefix format instead of the move task format.

Suggested change
// TASK_MOVE/{GraphName}/to PartitionID/{source partitionID}
// TASK_BI/{taskId}

Copilot uses AI. Check for mistakes.
@koi2000
Copy link
Copy Markdown
Author

koi2000 commented Aug 31, 2025

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Aug 31, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (4)
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (1)

286-292: 注释错误(沿用 Move 的注释),请更正为 Build Index 前缀格式(重复反馈)

与历史评论一致:应为 TASK_BI/{taskId},否则误导维护者。

     public static byte[] getBuildIndexTaskPrefix(long taskId) {
-        // TASK_MOVE/{GraphName}/to PartitionID/{source partitionID}
+        // TASK_BI/{taskId}
         StringBuilder builder = StringBuilderHelper.get()
                                                    .append(TASK_BUILD_INDEX).append(DELIMITER)
                                                    .append(taskId);
         return builder.toString().getBytes(Charset.defaultCharset());
     }
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java (1)

1330-1334: 为构建索引任务增加空指针/类型校验与异常处理(避免NPE并提升鲁棒性)

当前直接访问 task.getPartition() 与写入元数据,缺少:

  • task / partition / buildIndex 为空校验
  • 任务类型校验(必须是 Build_Index)
  • 持久化异常捕获与转换为明确的 PD 错误码

建议按下述方式增强,避免 NPE 和脏状态写入:

-    public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
-        log.info("build index task {} -{} , report state: {}", task.getPartition().getGraphName(),
-                 task.getPartition().getId(), task.getState());
-        storeService.getTaskInfoMeta().updateBuildIndexTask(task);
-    }
+    public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
+        if (task == null || !task.hasPartition() || task.getPartition() == null) {
+            throw new PDException(Pdpb.ErrorType.UNKNOWN_VALUE, "invalid build index task: partition is null");
+        }
+        if (task.getType() != MetaTask.TaskType.Build_Index || !task.hasBuildIndex()) {
+            throw new PDException(Pdpb.ErrorType.UNKNOWN_VALUE, "invalid task type: expected Build_Index with buildIndex payload");
+        }
+        log.info("build index task {} -{} , report state: {}", 
+                 task.getPartition().getGraphName(), task.getPartition().getId(), task.getState());
+        try {
+            storeService.getTaskInfoMeta().updateBuildIndexTask(task);
+        } catch (Exception e) {
+            log.error("Failed to update build index task {}", task.getId(), e);
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_WRITE_ERROR_VALUE, "update build index task failed: " + e.getMessage());
+        }
+    }
hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java (1)

1345-1353: submitBuildIndexTask 增加响应空值防护与统一异常包装(避免调用方拿到空指针)

为与类内其他 RPC 调用的健壮性一致,建议补充 null 响应检查与异常捕获/记录:

-    public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
-        Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
-                                                                         .setHeader(header)
-                                                                         .setParam(param)
-                                                                         .build();
-        var response = getStub().submitTask(request);
-        handleResponseError(response.getHeader());
-        return response.getTaskId();
-    }
+    public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
+        Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
+                                                                         .setHeader(header)
+                                                                         .setParam(param)
+                                                                         .build();
+        try {
+            var response = getStub().submitTask(request);
+            if (response == null) {
+                throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE, "submitTask returned null response");
+            }
+            handleResponseError(response.getHeader());
+            return response.getTaskId();
+        } catch (Exception e) {
+            log.error("submitBuildIndexTask failed", e);
+            if (e instanceof PDException) throw (PDException) e;
+            throw new PDException(Pdpb.ErrorType.UNKNOWN_VALUE, e.getMessage());
+        }
+    }
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java (1)

127-131: updateBuildIndexTask 增加入参与类型校验,防止写入非法/空任务

为避免 NPE 与脏数据,建议加入 null、类型与 payload 校验:

-    public void updateBuildIndexTask(MetaTask.Task task) throws PDException {
-        var bt = task.getBuildIndex();
-        byte[] key = MetadataKeyHelper.getBuildIndexTaskKey(bt.getTaskId(), bt.getPartitionId());
-        put(key, task.toByteArray());
-    }
+    public void updateBuildIndexTask(MetaTask.Task task) throws PDException {
+        if (task == null) {
+            throw new PDException(-1, "Task cannot be null");
+        }
+        if (task.getType() != MetaTask.TaskType.Build_Index || !task.hasBuildIndex() || !task.hasPartition()) {
+            throw new PDException(-1, "Task must be Build_Index with buildIndex and partition");
+        }
+        var bt = task.getBuildIndex();
+        byte[] key = MetadataKeyHelper.getBuildIndexTaskKey(bt.getTaskId(), bt.getPartitionId());
+        put(key, task.toByteArray());
+    }
🧹 Nitpick comments (8)
hugegraph-pd/hg-pd-grpc/src/main/proto/metapb.proto (1)

393-397: 关于字段类型:优先统一 ID 标量类型以降低客户端复杂度

当前 label_id / index_label 使用 bytes;若 ID 实际为数值,建议统一为 uint64(或统一保持 bytes,但需在文档明确编码方式)。混用会增加多语言客户端解析复杂度。

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (1)

277-284: Key 结构合理,但注释建议规范化

当前注释存在多余空格,“task id / partition id” 建议统一为更明确的格式样例。

应用以下注释微调:

     public static byte[] getBuildIndexTaskKey(long taskId, int partitionId) {
-        // TASK_BI/ task id / partition id
+        // TASK_BI/{taskId}/{partitionId}
         StringBuilder builder = StringBuilderHelper.get()
                                                    .append(TASK_BUILD_INDEX).append(DELIMITER)
                                                    .append(taskId).append(DELIMITER)
                                                    .append(partitionId);
         return builder.toString().getBytes(Charset.defaultCharset());
     }
hugegraph-pd/hg-pd-grpc/src/main/proto/pd_pulse.proto (1)

96-97: 在心跳响应中携带 BuildIndex:请补充字段语义与兼容性说明

  • 建议补注释说明:该字段何时下发、是否仅面向分区 Leader、是否可能与其他变更指令并存。
  • 确认旧版本 Store/PD 对未知字段的兼容性策略(应忽略 11 号未知字段)。

可在字段上添加简短注释:

-  metapb.BuildIndex build_index = 11;
+  // 下发构建索引指令(仅当 PD 需要促发时设置;多版本兼容:旧 Store 忽略未知字段)
+  metapb.BuildIndex build_index = 11;
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java (1)

673-676: 增加 Build_Index 分支合理;建议评估与均衡/迁移任务的并发影响

  • 报告路径已接入 partitionService.handleBuildIndexTask(task),对齐其他任务类型。
  • 建议在均衡/迁移等关键流程(如 balancePartitionLeaderbalancePartitionShard)考虑 Build_Index 的占用影响,必要时参考 hasSplitTaskDoing()/hasMoveTaskDoing() 增加 hasBuildIndexTaskDoing() 之类短路保护,避免资源争用。
hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java (2)

1355-1363: queryBuildIndexTaskStatus:补充空响应防护

建议与 submit 对齐,增加空响应检查,避免后续 NPE:

-        var response = getStub().queryTaskState(request);
+        var response = getStub().queryTaskState(request);
+        if (response == null) {
+            throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE, "queryTaskState returned null response");
+        }
         handleResponseError(response.getHeader());
         return response;

1365-1373: retryBuildIndexTask:补充空响应防护

同上,建议增加 null 检查:

-        var response = getStub().retryIndexTask(request);
+        var response = getStub().retryIndexTask(request);
+        if (response == null) {
+            throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE, "retryIndexTask returned null response");
+        }
         handleResponseError(response.getHeader());
         return response;
hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto (2)

109-115: RPC 命名建议更具体(可选)

submitTask/queryTaskState/retryIndexTask 语义泛化,易与未来其它类型任务混淆。可选地重命名为 BuildIndex 语义更清晰(需同步客户端/服务端和再生成 stub):

-  // Submit rebuild index task
-  rpc submitTask(IndexTaskCreateRequest) returns (IndexTaskCreateResponse) {}
-  // Check task status
-  rpc queryTaskState(IndexTaskQueryRequest) returns (IndexTaskQueryResponse) {}
-  // Retry index
-  rpc retryIndexTask(IndexTaskQueryRequest) returns (IndexTaskQueryResponse){}
+  // Submit rebuild index task
+  rpc SubmitBuildIndexTask(IndexTaskCreateRequest) returns (IndexTaskCreateResponse) {}
+  // Check build index task status
+  rpc QueryBuildIndexTaskState(IndexTaskQueryRequest) returns (IndexTaskQueryResponse) {}
+  // Retry build index task
+  rpc RetryBuildIndexTask(IndexTaskQueryRequest) returns (IndexTaskQueryResponse) {}

628-632: IndexTaskQueryResponse 建议补充统计字段以便客户端直观展示(兼容性增强,向后兼容)

为减少客户端重复聚合,建议直接返回 Doing/Success 计数(新增字段对旧客户端透明):

 message IndexTaskQueryResponse{
   ResponseHeader header = 1;
   metaTask.TaskState state = 2;
   string message = 3;
+  uint32 doing_count = 4;
+  uint32 success_count = 5;
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 26b91e4 and 7e15674.

📒 Files selected for processing (10)
  • hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java (1 hunks)
  • hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java (1 hunks)
  • hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java (1 hunks)
  • hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (2 hunks)
  • hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java (1 hunks)
  • hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto (2 hunks)
  • hugegraph-pd/hg-pd-grpc/src/main/proto/metapb.proto (1 hunks)
  • hugegraph-pd/hg-pd-grpc/src/main/proto/pd_pulse.proto (1 hunks)
  • hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto (2 hunks)
  • hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java (3 hunks)
🔇 Additional comments (4)
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (1)

43-43: 新增命名空间常量 OK

TASK_BUILD_INDEX = "TASK_BI" 命名简洁,与现有风格一致。

hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto (1)

31-32: 确认 Build_Index 任务全链路已实现,无缺失,可合并。

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java (2)

117-120: 按前缀扫描构建索引任务的实现看起来正确

接口命名与使用 scanPrefix 的方式与现有 Split/Move 任务保持一致,LGTM。


122-125: 按键获取构建索引任务的实现看起来正确

键生成与解析器一致,返回单任务对象,LGTM。

Comment on lines +393 to +410
message BuildIndex {
uint64 taskId = 1;
uint32 partition_id = 2;
BuildIndexParam param = 11;
}

message BuildIndexParam {
string graph = 1;
bytes label_id = 2;
bool is_vertex_label = 3;
bytes prefix = 4; // query prefix

oneof request_param_union {
bytes index_label = 11; // label id
bool all_index = 12; // rebuild all index
bool label_index = 13; // ??
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

BuildIndex/BuildIndexParam 协议语义与命名需在发布前定型;移除不确定注释并补充保留字段以防将来冲突

  • 字段命名风格不一致:taskId 使用驼峰而本文件其他字段多为下划线(如 partition_id)。为统一性建议改为 task_id(若已对外发布则至少补充注释说明)。
  • BuildIndexParamlabel_idindex_label 概念重叠不清,// ?? 注释不可出现在协议;请明确“基础标签ID”与“索引标签ID”的关系并补充注释,或重命名为 base_label_id / index_label_id
  • oneof request_param_union 用 bool(all_indexlabel_index)作为分支标记不直观,建议改为空消息或使用枚举描述范围;至少需严格说明三者互斥的语义。
  • 若有意跳过 3~10 号字段,请在消息内显式 reserved,避免未来扩展时产生 Tag 冲突。
  • prefix 的用途应更具体(例如“KV 扫描前缀/数据扫描前缀”)。

[建议变更]
仅调整注释与保留字段,尽量不破坏现有生成代码:

 message BuildIndex {
-  uint64 taskId = 1;
+  // 任务ID(建议与其他字段风格统一:task_id)
+  uint64 taskId = 1;
   uint32 partition_id = 2;
+  // 预留 3~10,避免未来冲突
+  reserved 3 to 10;
   BuildIndexParam param = 11;
 }
 
 message BuildIndexParam {
-  string graph = 1;
-  bytes label_id = 2;
-  bool is_vertex_label = 3;
-  bytes prefix = 4; // query prefix
+  string graph = 1;                // 图空间名称
+  bytes label_id = 2;              // 基础标签ID(顶点/边)
+  bool is_vertex_label = 3;        // true=顶点标签,false=边标签
+  bytes prefix = 4;                // 扫描/查询使用的前缀
 
   oneof request_param_union {
-    bytes index_label = 11; // label id
-    bool all_index = 12; // rebuild all index
-    bool label_index = 13; // ??
+    bytes index_label = 11;  // 索引标签ID(指定构建某一索引标签)
+    bool all_index = 12;     // 重建该 label 下的全部索引
+    bool label_index = 13;   // 仅重建 label 级索引(非具体索引标签)
   }
 }

如需要,我可以给出使用枚举/空消息替代 bool 的 oneof 版本,以提升可读性与前向兼容性。

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
message BuildIndex {
uint64 taskId = 1;
uint32 partition_id = 2;
BuildIndexParam param = 11;
}
message BuildIndexParam {
string graph = 1;
bytes label_id = 2;
bool is_vertex_label = 3;
bytes prefix = 4; // query prefix
oneof request_param_union {
bytes index_label = 11; // label id
bool all_index = 12; // rebuild all index
bool label_index = 13; // ??
}
}
message BuildIndex {
// 任务ID(建议与其他字段风格统一:task_id)
uint64 taskId = 1;
uint32 partition_id = 2;
// 预留 3~10,避免未来冲突
reserved 3 to 10;
BuildIndexParam param = 11;
}
message BuildIndexParam {
string graph = 1; // 图空间名称
bytes label_id = 2; // 基础标签ID(顶点/边)
bool is_vertex_label = 3; // true=顶点标签,false=边标签
bytes prefix = 4; // 扫描/查询使用的前缀
oneof request_param_union {
bytes index_label = 11; // 索引标签ID(指定构建某一索引标签)
bool all_index = 12; // 重建该 label 下的全部索引
bool label_index = 13; // 仅重建 label 级索引(非具体索引标签)
}
}

Comment on lines +1798 to +1850
public void submitTask(Pdpb.IndexTaskCreateRequest request,
StreamObserver<Pdpb.IndexTaskCreateResponse> observer) {
if (!isLeader()) {
redirectToLeader(PDGrpc.getSubmitTaskMethod(), request, observer);
return;
}

var builder = Pdpb.IndexTaskCreateResponse.newBuilder();
var param = request.getParam();
try {
var partitions = partitionService.getPartitions(param.getGraph());

if (partitions.size() == 0) {
throw new PDException(-1, "graph has no partition");
}

var newTaskId = idService.getId(BUILD_INDEX_TASK_ID_KEY, 1);
// log.info("build index task id: {}", newTaskId);

var taskInfo = storeNodeService.getTaskInfoMeta();
for (var partition : partitions) {
var buildIndex = Metapb.BuildIndex.newBuilder()
.setPartitionId(partition.getId())
.setTaskId(newTaskId)
.setParam(param)
.build();

var task = MetaTask.Task.newBuilder()
.setId(newTaskId)
.setType(MetaTask.TaskType.Build_Index)
.setState(MetaTask.TaskState.Task_Doing)
.setStartTimestamp(System.currentTimeMillis())
.setPartition(partition)
.setBuildIndex(buildIndex)
.build();

taskInfo.updateBuildIndexTask(task);

log.info("notify client build index task: {}", buildIndex);

PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
.setPartition(partition)
.setId(idService.getId(
TASK_ID_KEY, 1))
.setBuildIndex(buildIndex));
}
observer.onNext(builder.setHeader(okHeader).setTaskId(newTaskId).build());
} catch (PDException e) {
log.error("IndexTaskGrpcService.submitTask", e);
observer.onNext(builder.setHeader(newErrorHeader(e)).build());
}
observer.onCompleted();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

submitTask:补充入参校验、空列表判断与通用异常捕获;避免提交空图/空分区与部分失败

问题与建议:

  • 未校验 request/param/graph 是否为空;空图名会导致意外行为。
  • 仅捕获 PDException,其他异常将导致 gRPC 流未按协议结束。
  • 判空建议使用 isEmpty()。
  • 当存在部分分区更新失败时应记录并返回错误,避免“半提交”。

建议修改如下(核心片段):

         var builder = Pdpb.IndexTaskCreateResponse.newBuilder();
         var param = request.getParam();
         try {
+            if (param == null || param.getGraph() == null || param.getGraph().isEmpty()) {
+                observer.onNext(builder.setHeader(newErrorHeader(Pdpb.ErrorType.NOT_FOUND_VALUE, "graph is null or empty")).build());
+                observer.onCompleted();
+                return;
+            }
+            // 校验图是否存在
+            var g = partitionService.getGraph(param.getGraph());
+            if (g == null) {
+                observer.onNext(builder.setHeader(newErrorHeader(Pdpb.ErrorType.NOT_FOUND_VALUE, "graph not found: " + param.getGraph())).build());
+                observer.onCompleted();
+                return;
+            }
             var partitions = partitionService.getPartitions(param.getGraph());

-            if (partitions.size() == 0) {
+            if (partitions.isEmpty()) {
                 throw new PDException(-1, "graph has no partition");
             }
@@
-                PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
+                PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
                                                                       .setPartition(partition)
-                                                                      .setId(idService.getId(
-                                                                              TASK_ID_KEY, 1))
+                                                                      .setId(idService.getId(TASK_ID_KEY, 1))
                                                                       .setBuildIndex(buildIndex));
             }
             observer.onNext(builder.setHeader(okHeader).setTaskId(newTaskId).build());
-        } catch (PDException e) {
+        } catch (PDException e) {
             log.error("IndexTaskGrpcService.submitTask", e);
             observer.onNext(builder.setHeader(newErrorHeader(e)).build());
+        } catch (Exception e) {
+            log.error("IndexTaskGrpcService.submitTask (unexpected)", e);
+            observer.onNext(builder.setHeader(newErrorHeader(Pdpb.ErrorType.UNKNOWN_VALUE, e.getMessage())).build());
         }
         observer.onCompleted();

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +1852 to +1902
@Override
public void queryTaskState(org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryRequest request,
StreamObserver<org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryResponse> observer) {

if (!isLeader()) {
redirectToLeader(PDGrpc.getQueryTaskStateMethod(), request, observer);
return;
}

var taskInfo = storeNodeService.getTaskInfoMeta();
var builder = Pdpb.IndexTaskQueryResponse.newBuilder();

try {
var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());

if (tasks.size() == 0) {
builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Unknown)
.setMessage("task not found");
} else {
var state = MetaTask.TaskState.Task_Success;
String message = "OK";
int countOfSuccess = 0;
int countOfDoing = 0;

for (var task : tasks) {
var state0 = task.getState();
if (state0 == MetaTask.TaskState.Task_Failure) {
state = MetaTask.TaskState.Task_Failure;
message = task.getMessage();
break;
} else if (state0 == MetaTask.TaskState.Task_Doing) {
state = MetaTask.TaskState.Task_Doing;
countOfDoing++;
} else if (state0 == MetaTask.TaskState.Task_Success) {
countOfSuccess++;
}
}

if (state == MetaTask.TaskState.Task_Doing) {
message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
}

builder.setHeader(okHeader).setState(state).setMessage(message);
}
} catch (PDException e) {
builder.setHeader(newErrorHeader(e));
}

observer.onNext(builder.build());
observer.onCompleted();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

queryTaskState:防御式编程,避免任务列表/字段空指针并直出计数

问题与建议:

  • tasks 为空使用 isEmpty()。
  • 访问 task.getMessage() 可能为 null,需安全处理。
  • 遇到空 task 应跳过并记录。

建议修改核心循环与返回:

-            var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());
-
-            if (tasks.size() == 0) {
+            var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());
+            if (tasks == null || tasks.isEmpty()) {
                 builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Unknown)
                        .setMessage("task not found");
             } else {
                 var state = MetaTask.TaskState.Task_Success;
                 String message = "OK";
                 int countOfSuccess = 0;
                 int countOfDoing = 0;

-                for (var task : tasks) {
+                for (var task : tasks) {
+                    if (task == null) {
+                        log.warn("null task in build index task list for taskId: {}", request.getTaskId());
+                        continue;
+                    }
                     var state0 = task.getState();
                     if (state0 == MetaTask.TaskState.Task_Failure) {
                         state = MetaTask.TaskState.Task_Failure;
-                        message = task.getMessage();
+                        message = (task.getMessage() == null || task.getMessage().isEmpty())
+                                  ? "Task failed" : task.getMessage();
                         break;
                     } else if (state0 == MetaTask.TaskState.Task_Doing) {
                         state = MetaTask.TaskState.Task_Doing;
                         countOfDoing++;
                     } else if (state0 == MetaTask.TaskState.Task_Success) {
                         countOfSuccess++;
                     }
                 }
@@
-                builder.setHeader(okHeader).setState(state).setMessage(message);
+                builder.setHeader(okHeader).setState(state).setMessage(message)
+                       .setDoingCount(countOfDoing).setSuccessCount(countOfSuccess);
             }
-        } catch (PDException e) {
+        } catch (PDException e) {
             builder.setHeader(newErrorHeader(e));
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@Override
public void queryTaskState(org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryRequest request,
StreamObserver<org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryResponse> observer) {
if (!isLeader()) {
redirectToLeader(PDGrpc.getQueryTaskStateMethod(), request, observer);
return;
}
var taskInfo = storeNodeService.getTaskInfoMeta();
var builder = Pdpb.IndexTaskQueryResponse.newBuilder();
try {
var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());
if (tasks.size() == 0) {
builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Unknown)
.setMessage("task not found");
} else {
var state = MetaTask.TaskState.Task_Success;
String message = "OK";
int countOfSuccess = 0;
int countOfDoing = 0;
for (var task : tasks) {
var state0 = task.getState();
if (state0 == MetaTask.TaskState.Task_Failure) {
state = MetaTask.TaskState.Task_Failure;
message = task.getMessage();
break;
} else if (state0 == MetaTask.TaskState.Task_Doing) {
state = MetaTask.TaskState.Task_Doing;
countOfDoing++;
} else if (state0 == MetaTask.TaskState.Task_Success) {
countOfSuccess++;
}
}
if (state == MetaTask.TaskState.Task_Doing) {
message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
}
builder.setHeader(okHeader).setState(state).setMessage(message);
}
} catch (PDException e) {
builder.setHeader(newErrorHeader(e));
}
observer.onNext(builder.build());
observer.onCompleted();
}
@Override
public void queryTaskState(org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryRequest request,
StreamObserver<org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryResponse> observer) {
if (!isLeader()) {
redirectToLeader(PDGrpc.getQueryTaskStateMethod(), request, observer);
return;
}
var taskInfo = storeNodeService.getTaskInfoMeta();
var builder = Pdpb.IndexTaskQueryResponse.newBuilder();
try {
var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());
if (tasks == null || tasks.isEmpty()) {
builder.setHeader(okHeader)
.setState(MetaTask.TaskState.Task_Unknown)
.setMessage("task not found");
} else {
var state = MetaTask.TaskState.Task_Success;
String message = "OK";
int countOfSuccess = 0;
int countOfDoing = 0;
for (var task : tasks) {
if (task == null) {
log.warn("null task in build index task list for taskId: {}", request.getTaskId());
continue;
}
var state0 = task.getState();
if (state0 == MetaTask.TaskState.Task_Failure) {
state = MetaTask.TaskState.Task_Failure;
message = (task.getMessage() == null || task.getMessage().isEmpty())
? "Task failed" : task.getMessage();
break;
} else if (state0 == MetaTask.TaskState.Task_Doing) {
state = MetaTask.TaskState.Task_Doing;
countOfDoing++;
} else if (state0 == MetaTask.TaskState.Task_Success) {
countOfSuccess++;
}
}
if (state == MetaTask.TaskState.Task_Doing) {
message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
}
builder.setHeader(okHeader)
.setState(state)
.setMessage(message)
.setDoingCount(countOfDoing)
.setSuccessCount(countOfSuccess);
}
} catch (PDException e) {
builder.setHeader(newErrorHeader(e));
}
observer.onNext(builder.build());
observer.onCompleted();
}
🤖 Prompt for AI Agents
hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java
around lines 1852 to 1902: the current implementation uses tasks.size() == 0 and
directly accesses task and task.getMessage() which can be null; change the empty
check to tasks.isEmpty(), and in the for-loop skip any null task (and optionally
log a debug/warn), treat a null task.getMessage() as an empty string or a safe
default when setting message, and ensure countOfDoing/countOfSuccess only
increment for non-null tasks; also when encountering Task_Failure use a
null-safe message (e.g., task.getMessage() != null ? task.getMessage() : "");
make these defensive checks so the response never NPEs and counts reflect only
valid tasks.

Comment on lines +1904 to +1951
@Override
public void retryIndexTask(Pdpb.IndexTaskQueryRequest request,
StreamObserver<Pdpb.IndexTaskQueryResponse> observer) {

if (!isLeader()) {
redirectToLeader(PDGrpc.getRetryIndexTaskMethod(), request, observer);
return;
}

var taskInfo = storeNodeService.getTaskInfoMeta();
var builder = Pdpb.IndexTaskQueryResponse.newBuilder();
var taskId = request.getTaskId();

try {
var tasks = taskInfo.scanBuildIndexTask(taskId);

if (tasks.size() == 0) {
builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Failure)
.setMessage("task not found");
} else {
var state = MetaTask.TaskState.Task_Success;
String message = "OK";
for (var task : tasks) {
var state0 = task.getState();
if (state0 == MetaTask.TaskState.Task_Failure ||
state0 == MetaTask.TaskState.Task_Doing) {
var partition = task.getPartition();
var buildIndex = task.getBuildIndex();

log.info("notify client retry build index task: {}", buildIndex);

PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
.setPartition(
partition)
.setId(task.getId())
.setBuildIndex(
buildIndex));
}
}
builder.setHeader(okHeader).setState(state).setMessage(message);
}
} catch (PDException e) {
builder.setHeader(newErrorHeader(e));
}

observer.onNext(builder.build());
observer.onCompleted();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

retryIndexTask:重试时应将失败任务显式置回 Doing 并更新时间戳;建议仅对失败任务下发重试信号且使用新的事件ID

当前未更新任务状态,且对 Doing 任务也重复下发重试信号,可能导致状态与行为不一致。建议:

  • 仅对失败任务重试,状态置为 Task_Doing,刷新 startTimestamp 并持久化;
  • 发送心跳通知时使用新的任务事件 ID(与创建时一致),避免复用 taskId。

参考修改:

-            var tasks = taskInfo.scanBuildIndexTask(taskId);
-
-            if (tasks.size() == 0) {
+            var tasks = taskInfo.scanBuildIndexTask(taskId);
+            if (tasks == null || tasks.isEmpty()) {
                 builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Failure)
                        .setMessage("task not found");
             } else {
-                var state = MetaTask.TaskState.Task_Success;
-                String message = "OK";
-                for (var task : tasks) {
-                    var state0 = task.getState();
-                    if (state0 == MetaTask.TaskState.Task_Failure ||
-                        state0 == MetaTask.TaskState.Task_Doing) {
-                        var partition = task.getPartition();
-                        var buildIndex = task.getBuildIndex();
-
-                        log.info("notify client retry build index task: {}", buildIndex);
-
-                        PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
-                                                                              .setPartition(
-                                                                                      partition)
-                                                                              .setId(task.getId())
-                                                                              .setBuildIndex(
-                                                                                      buildIndex));
-                    }
-                }
-                builder.setHeader(okHeader).setState(state).setMessage(message);
+                int retried = 0;
+                for (var task : tasks) {
+                    if (task == null) continue;
+                    if (task.getState() == MetaTask.TaskState.Task_Failure) {
+                        var updatedTask = task.toBuilder()
+                                              .setState(MetaTask.TaskState.Task_Doing)
+                                              .setStartTimestamp(System.currentTimeMillis())
+                                              .build();
+                        taskInfo.updateBuildIndexTask(updatedTask);
+                        var partition = updatedTask.getPartition();
+                        var buildIndex = updatedTask.getBuildIndex();
+                        log.info("notify client retry build index task: {}", buildIndex);
+                        PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
+                                                                              .setPartition(partition)
+                                                                              .setId(idService.getId(TASK_ID_KEY, 1))
+                                                                              .setBuildIndex(buildIndex));
+                        retried++;
+                    }
+                }
+                builder.setHeader(okHeader)
+                       .setState(retried > 0 ? MetaTask.TaskState.Task_Doing : MetaTask.TaskState.Task_Success)
+                       .setMessage(retried > 0 ? ("retry started for " + retried + " partitions") : "no failed partitions to retry");
             }
-        } catch (PDException e) {
+        } catch (PDException e) {
             builder.setHeader(newErrorHeader(e));
         }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java
lines 1904-1951: currently retryIndexTask iterates all matching tasks and
re-sends retry heartbeats for both FAILED and DOING tasks without persisting
state changes or updating timestamps, and it reuses the taskId as the event id;
change logic to only target tasks whose state == Task_Failure, set their state
to Task_Doing, refresh startTimestamp (and any other relevant timestamp fields),
persist the updated task record via the storeNodeService (or equivalent update
API) before sending notifications, and when building the
PartitionHeartbeatResponse generate and use a new event id (not the original
taskId) for the heartbeat so the retry is represented as a new event; ensure
observer still returns the correct response header/state after updates and
catch/persist failures appropriately.

Comment on lines +1330 to +1334
public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
log.info("build index task {} -{} , report state: {}", task.getPartition().getGraphName(),
task.getPartition().getId(), task.getState());
storeService.getTaskInfoMeta().updateBuildIndexTask(task);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

oneof request_param_union {
bytes index_label = 11; // label id
bool all_index = 12; // rebuild all index
bool label_index = 13; // ??
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

specify this comment or remove it

}

var newTaskId = idService.getId(BUILD_INDEX_TASK_ID_KEY, 1);
// log.info("build index task id: {}", newTaskId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove useless log

@imbajin imbajin merged commit 1fa42df into hugegraph:3.7.1-pd-store Sep 8, 2025
4 of 14 checks passed
Tsukilc pushed a commit that referenced this pull request Sep 23, 2025
* refactor(pd): added validation and refactor code
Tsukilc pushed a commit that referenced this pull request Sep 23, 2025
* refactor(pd): added validation and refactor code
imbajin pushed a commit that referenced this pull request Oct 26, 2025
* refactor(pd): added validation and refactor code
imbajin pushed a commit that referenced this pull request Oct 29, 2025
* refactor(pd): added validation and refactor code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants