feat(pd): add build index task in pd#23
Conversation
Walkthrough

Introduces end-to-end support for a "build index" task: extends the protobuf definitions (task type and parameters, PD service RPCs, heartbeat response); adds three new RPCs to PDService (submit/query/retry) with matching wrapper methods on the client-side PDClient; and adds task routing and persistence on the core side (PartitionService, TaskScheduleService, TaskInfoMeta, MetadataKeyHelper). Heartbeat notifications carry the build-index payload.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Client
    participant PDClient
    participant PDService
    participant TaskStore as TaskInfoMeta
    participant Pulse as PDPulse
    participant Partition as PartitionNodes
    rect rgba(220,240,255,0.6)
    Note over Client,PDService: Submit build-index task
    Client->>PDClient: submitBuildIndexTask(param)
    PDClient->>PDService: submitTask(request)
    PDService->>PDService: leader check
    PDService->>TaskStore: write per-partition MetaTask.BuildIndex
    TaskStore-->>PDService: ok
    PDService->>Pulse: publish PartitionHeartbeat(build_index)
    Pulse-->>Partition: notify execution
    PDService-->>PDClient: IndexTaskCreateResponse(task_id)
    PDClient-->>Client: task_id
    end
    rect rgba(235,255,235,0.6)
    Note over Client,PDService: Query task status
    Client->>PDClient: queryBuildIndexTaskStatus(task_id)
    PDClient->>PDService: queryTaskState(task_id)
    PDService->>TaskStore: scan task prefix
    TaskStore-->>PDService: per-partition task set
    PDService-->>PDClient: IndexTaskQueryResponse(state, message)
    PDClient-->>Client: state/message
    end
    rect rgba(255,240,220,0.6)
    Note over Client,PDService: Retry failed/in-progress tasks
    Client->>PDClient: retryBuildIndexTask(task_id)
    PDClient->>PDService: retryIndexTask(task_id)
    PDService->>TaskStore: fetch task set
    TaskStore-->>PDService: per-partition task set
    PDService->>Pulse: publish retry for failed/in-progress partitions
    PDService-->>PDClient: IndexTaskQueryResponse(state, message)
    PDClient-->>Client: state/message
    end
```
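The submit → poll → retry lifecycle in the diagrams above can be simulated end to end with a tiny in-memory stand-in. Every class, method, and key format below is hypothetical and only mirrors the described behavior; the real entry points are PDClient.submitBuildIndexTask / queryBuildIndexTaskStatus / retryBuildIndexTask backed by TaskInfoMeta:

```java
import java.util.HashMap;
import java.util.Map;

public class BuildIndexFlowSketch {
    enum State { DOING, SUCCESS, FAILURE }

    // Stand-in for TaskInfoMeta: one entry per (taskId, partitionId)
    static final Map<String, State> store = new HashMap<>();
    static long nextTaskId = 1;

    // Submit: create one per-partition task, all starting in DOING
    static long submit(int partitionCount) {
        long taskId = nextTaskId++;
        for (int p = 0; p < partitionCount; p++) {
            store.put(taskId + "/" + p, State.DOING);
        }
        return taskId;
    }

    // Query: failure dominates, then doing, then success
    static State query(long taskId) {
        State overall = State.SUCCESS;
        for (var e : store.entrySet()) {
            if (!e.getKey().startsWith(taskId + "/")) continue;
            if (e.getValue() == State.FAILURE) return State.FAILURE;
            if (e.getValue() == State.DOING) overall = State.DOING;
        }
        return overall;
    }

    // Retry: flip failed/in-progress partitions back to DOING
    static void retry(long taskId) {
        store.replaceAll((k, v) ->
                k.startsWith(taskId + "/") && v != State.SUCCESS ? State.DOING : v);
    }

    public static void main(String[] args) {
        long id = submit(3);
        store.put(id + "/1", State.FAILURE);      // partition 1 reports failure
        System.out.println(query(id));            // FAILURE
        retry(id);
        System.out.println(query(id));            // DOING
        store.replaceAll((k, v) -> State.SUCCESS);
        System.out.println(query(id));            // SUCCESS
    }
}
```

This mirrors the design choice visible in the diagrams: the PD side only tracks per-partition state, and the overall task state is derived at query time rather than stored.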
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
@codecov-ai-reviewer review
```java
public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
    Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
                                                                     .setHeader(header)
                                                                     .setParam(param)
                                                                     .build();
    var response = getStub().submitTask(request);
    handleResponseError(response.getHeader());
    return response.getTaskId();
}

public Pdpb.IndexTaskQueryResponse queryBuildIndexTaskStatus(long taskId) throws PDException {
    Pdpb.IndexTaskQueryRequest request = Pdpb.IndexTaskQueryRequest.newBuilder()
                                                                   .setHeader(header)
                                                                   .setTaskId(taskId)
                                                                   .build();
    var response = getStub().queryTaskState(request);
    handleResponseError(response.getHeader());
    return response;
}

public Pdpb.IndexTaskQueryResponse retryBuildIndexTask(long taskId) throws PDException {
    Pdpb.IndexTaskQueryRequest request = Pdpb.IndexTaskQueryRequest.newBuilder()
                                                                   .setHeader(header)
                                                                   .setTaskId(taskId)
                                                                   .build();
    var response = getStub().retryIndexTask(request);
    handleResponseError(response.getHeader());
    return response;
}
```
The gRPC stub wrapper methods lack error logging and null checks. Consider logging errors before throwing exceptions to aid debugging, and adding null checks on the response object before accessing its fields.
Suggested change:

```java
public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
    try {
        Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
                                                                         .setHeader(header)
                                                                         .setParam(param)
                                                                         .build();
        var response = getStub().submitTask(request);
        if (response == null) {
            throw new PDException(-1, "Received null response from server");
        }
        handleResponseError(response.getHeader());
        return response.getTaskId();
    } catch (Exception e) {
        log.error("Failed to submit build index task", e);
        throw e;
    }
}
```
```java
public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
    log.info("build index task {} -{} , report state: {}", task.getPartition().getGraphName(),
             task.getPartition().getId(), task.getState());
    storeService.getTaskInfoMeta().updateBuildIndexTask(task);
}
```
The handleBuildIndexTask method only logs and updates the task but doesn't implement proper error handling or validation. Consider adding validation for the task parameter and implementing proper error handling similar to other task handlers in this class.
Suggested change:

```java
public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
    if (task == null || task.getPartition() == null) {
        throw new PDException(-1, "Invalid build index task: task or partition is null");
    }
    log.info("build index task {} -{} , report state: {}",
             task.getPartition().getGraphName(),
             task.getPartition().getId(),
             task.getState());
    try {
        storeService.getTaskInfoMeta().updateBuildIndexTask(task);
    } catch (Exception e) {
        log.error("Failed to update build index task {}", task.getId(), e);
        throw new PDException(-1, "Failed to update build index task: " + e.getMessage());
    }
}
```
```java
var partitions = partitionService.getPartitions(param.getGraph());

if (partitions.size() == 0) {
    throw new PDException(-1, "graph has no partition");
}

var newTaskId = idService.getId(BUILD_INDEX_TASK_ID_KEY, 1);
// log.info("build index task id: {}", newTaskId);
```
The method creates tasks for all partitions without checking if the graph exists or if the user has permissions. Consider adding proper validation and authorization checks before processing the request.
Suggested change:

```java
// Add validation
if (param.getGraph() == null || param.getGraph().isEmpty()) {
    throw new PDException(-1, "Graph name cannot be null or empty");
}
// Validate graph exists
if (!graphExists(param.getGraph())) {
    throw new PDException(-1, "Graph does not exist: " + param.getGraph());
}
var partitions = partitionService.getPartitions(param.getGraph());
```
```java
        } else if (state0 == MetaTask.TaskState.Task_Success) {
            countOfSuccess++;
        }
    }

    if (state == MetaTask.TaskState.Task_Doing) {
        message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
    }

    builder.setHeader(okHeader).setState(state).setMessage(message);
}
```
The queryTaskState method lacks proper error handling when accessing task properties. The method should validate task existence and handle potential null pointer exceptions when accessing task fields.
Suggested change:

```java
for (var task : tasks) {
    if (task == null) {
        log.warn("Found null task in build index task list for taskId: {}", request.getTaskId());
        continue;
    }
    var state0 = task.getState();
    if (state0 == MetaTask.TaskState.Task_Failure) {
        state = MetaTask.TaskState.Task_Failure;
        message = task.getMessage() != null ? task.getMessage() : "Task failed with no message";
        break;
    }
    // rest of the logic
}
```
```java
            log.info("notify client retry build index task: {}", buildIndex);

            PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
                                                                  .setPartition(partition)
                                                                  .setId(task.getId())
                                                                  .setBuildIndex(buildIndex));
            }
        }
        builder.setHeader(okHeader).setState(state).setMessage(message);
    }
} catch (PDException e) {
    builder.setHeader(newErrorHeader(e));
}

observer.onNext(builder.build());
```
The retryIndexTask method doesn't update the task state when retrying. Consider setting the task state back to 'Task_Doing' when retrying failed tasks to maintain proper state tracking.
Suggested change:

```java
if (state0 == MetaTask.TaskState.Task_Failure ||
    state0 == MetaTask.TaskState.Task_Doing) {
    // Update task state to doing when retrying
    var updatedTask = task.toBuilder()
                          .setState(MetaTask.TaskState.Task_Doing)
                          .setStartTimestamp(System.currentTimeMillis())
                          .build();
    taskInfo.updateBuildIndexTask(updatedTask);
    var partition = task.getPartition();
    var buildIndex = task.getBuildIndex();
    log.info("notify client retry build index task: {}", buildIndex);
    // notify client with updated task
}
```
Pull Request Overview
This PR adds build index task functionality to the PD (Placement Driver) service in Apache HugeGraph. The implementation enables distributed index building by creating tasks that are distributed across partitions and tracked through their lifecycle.
- Adds three new gRPC endpoints for index task management: submitTask, queryTaskState, and retryIndexTask
- Implements task creation, status tracking, and retry mechanisms for build index operations
- Extends the existing task framework to support build index tasks alongside existing partition tasks
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| PDService.java | Implements the three main gRPC endpoints for build index task management |
| pdpb.proto | Defines gRPC service methods and message types for index task operations |
| pd_pulse.proto | Adds build_index field to PartitionHeartbeatResponse for task notification |
| metapb.proto | Defines BuildIndex and BuildIndexParam message structures |
| metaTask.proto | Adds Build_Index task type and buildIndex field to Task message |
| TaskInfoMeta.java | Implements data access methods for build index task persistence |
| MetadataKeyHelper.java | Adds key generation methods for build index task storage |
| TaskScheduleService.java | Adds build index task handling to the task reporting switch |
| PartitionService.java | Implements build index task handling logic |
| PDClient.java | Provides client-side API methods for build index task operations |
```java
}

public static byte[] getBuildIndexTaskPrefix(long taskId) {
    // TASK_MOVE/{GraphName}/to PartitionID/{source partitionID}
```
The comment is incorrect and appears to be copy-pasted from another method. It should describe the build index task prefix format instead of the move task format.
```diff
-    // TASK_MOVE/{GraphName}/to PartitionID/{source partitionID}
+    // TASK_BI/{taskId}
```
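The corrected key layout (TASK_BI/{taskId}/{partitionId} per partition task, TASK_BI/{taskId} as the scan prefix) can be sanity-checked in isolation. The constants below are assumed stand-ins for MetadataKeyHelper's (the real delimiter may differ), so treat this purely as an illustration:

```java
public class BuildIndexKeys {
    // Assumed stand-ins for MetadataKeyHelper's constants (illustrative only)
    static final String TASK_BUILD_INDEX = "TASK_BI";
    static final char DELIMITER = '/';

    // Per-partition task key: TASK_BI/{taskId}/{partitionId}
    static String taskKey(long taskId, int partitionId) {
        return TASK_BUILD_INDEX + DELIMITER + taskId + DELIMITER + partitionId;
    }

    // Prefix used to scan all partition tasks of one build-index task: TASK_BI/{taskId}
    static String taskPrefix(long taskId) {
        return TASK_BUILD_INDEX + DELIMITER + taskId;
    }

    public static void main(String[] args) {
        System.out.println(taskKey(42, 3));                            // TASK_BI/42/3
        System.out.println(taskKey(42, 3).startsWith(taskPrefix(42))); // true
    }
}
```

One subtlety worth checking in the real helper: a prefix without a trailing delimiter also matches longer task IDs (TASK_BI/421/0 starts with TASK_BI/42), so appending a trailing delimiter to the scan prefix may be safer.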
@coderabbitai review
Actionable comments posted: 4
♻️ Duplicate comments (4)
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (1)

286-292: Incorrect comment (carried over from the Move task); correct it to the build-index prefix format (duplicate feedback)

Consistent with the earlier review comment: it should be `TASK_BI/{taskId}`, otherwise it misleads maintainers.

```diff
 public static byte[] getBuildIndexTaskPrefix(long taskId) {
-    // TASK_MOVE/{GraphName}/to PartitionID/{source partitionID}
+    // TASK_BI/{taskId}
     StringBuilder builder = StringBuilderHelper.get()
                                                .append(TASK_BUILD_INDEX).append(DELIMITER)
                                                .append(taskId);
     return builder.toString().getBytes(Charset.defaultCharset());
 }
```

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java (1)
1330-1334: Add null/type validation and exception handling to the build-index task handler (avoids NPEs, improves robustness)

The current code accesses task.getPartition() and writes metadata directly, without:

- null checks for task / partition / buildIndex
- a task-type check (must be Build_Index)
- catching persistence exceptions and converting them into a well-defined PD error code

Consider hardening it as follows to avoid NPEs and dirty state writes:

```diff
-public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
-    log.info("build index task {} -{} , report state: {}", task.getPartition().getGraphName(),
-             task.getPartition().getId(), task.getState());
-    storeService.getTaskInfoMeta().updateBuildIndexTask(task);
-}
+public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
+    if (task == null || !task.hasPartition() || task.getPartition() == null) {
+        throw new PDException(Pdpb.ErrorType.UNKNOWN_VALUE, "invalid build index task: partition is null");
+    }
+    if (task.getType() != MetaTask.TaskType.Build_Index || !task.hasBuildIndex()) {
+        throw new PDException(Pdpb.ErrorType.UNKNOWN_VALUE, "invalid task type: expected Build_Index with buildIndex payload");
+    }
+    log.info("build index task {} -{} , report state: {}",
+             task.getPartition().getGraphName(), task.getPartition().getId(), task.getState());
+    try {
+        storeService.getTaskInfoMeta().updateBuildIndexTask(task);
+    } catch (Exception e) {
+        log.error("Failed to update build index task {}", task.getId(), e);
+        throw new PDException(Pdpb.ErrorType.ROCKSDB_WRITE_ERROR_VALUE, "update build index task failed: " + e.getMessage());
+    }
+}
```

hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java (1)
1345-1353: In submitBuildIndexTask, add a null-response guard and unified exception wrapping (so callers never hit a null pointer)

For consistency with the robustness of the other RPC calls in this class, add a null-response check plus exception capture and logging:

```diff
 public long submitBuildIndexTask(Metapb.BuildIndexParam param) throws PDException {
     Pdpb.IndexTaskCreateRequest request = Pdpb.IndexTaskCreateRequest.newBuilder()
                                                                      .setHeader(header)
                                                                      .setParam(param)
                                                                      .build();
-    var response = getStub().submitTask(request);
-    handleResponseError(response.getHeader());
-    return response.getTaskId();
+    try {
+        var response = getStub().submitTask(request);
+        if (response == null) {
+            throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE, "submitTask returned null response");
+        }
+        handleResponseError(response.getHeader());
+        return response.getTaskId();
+    } catch (Exception e) {
+        log.error("submitBuildIndexTask failed", e);
+        if (e instanceof PDException) throw (PDException) e;
+        throw new PDException(Pdpb.ErrorType.UNKNOWN_VALUE, e.getMessage());
+    }
 }
```

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java (1)
127-131: Add argument and type validation to updateBuildIndexTask to prevent writing invalid/empty tasks

To avoid NPEs and dirty data, add null, type, and payload checks:

```diff
 public void updateBuildIndexTask(MetaTask.Task task) throws PDException {
+    if (task == null) {
+        throw new PDException(-1, "Task cannot be null");
+    }
+    if (task.getType() != MetaTask.TaskType.Build_Index || !task.hasBuildIndex() || !task.hasPartition()) {
+        throw new PDException(-1, "Task must be Build_Index with buildIndex and partition");
+    }
     var bt = task.getBuildIndex();
     byte[] key = MetadataKeyHelper.getBuildIndexTaskKey(bt.getTaskId(), bt.getPartitionId());
     put(key, task.toByteArray());
 }
```
🧹 Nitpick comments (8)
hugegraph-pd/hg-pd-grpc/src/main/proto/metapb.proto (1)

393-397: On field types: prefer one scalar type for IDs to reduce client complexity

Currently `label_id`/`index_label` use bytes; if the IDs are in fact numeric, consider standardizing on `uint64` (or consistently keeping bytes, but then the encoding must be documented explicitly). Mixing the two increases parsing complexity for multi-language clients.

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (1)
277-284: The key structure is reasonable, but the comment should be normalized

The current comment has stray spaces, and "task id / partition id" would be clearer as an explicit format example.

Apply this small comment tweak:

```diff
 public static byte[] getBuildIndexTaskKey(long taskId, int partitionId) {
-    // TASK_BI/ task id / partition id
+    // TASK_BI/{taskId}/{partitionId}
     StringBuilder builder = StringBuilderHelper.get()
                                                .append(TASK_BUILD_INDEX).append(DELIMITER)
                                                .append(taskId).append(DELIMITER)
                                                .append(partitionId);
     return builder.toString().getBytes(Charset.defaultCharset());
 }
```

hugegraph-pd/hg-pd-grpc/src/main/proto/pd_pulse.proto (1)
96-97: Carrying BuildIndex in the heartbeat response: document the field's semantics and compatibility

- Add a comment clarifying when the field is sent, whether it targets only the partition leader, and whether it can coexist with other change directives.
- Confirm the compatibility policy of older Store/PD versions toward unknown fields (unknown field 11 should be ignored).

A short comment could be added to the field:

```diff
-metapb.BuildIndex build_index = 11;
+// Build-index directive (set only when PD needs to trigger it;
+// cross-version compatibility: older Stores ignore unknown fields)
+metapb.BuildIndex build_index = 11;
```

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java (1)
673-676: Adding the Build_Index branch is reasonable; consider its concurrency impact on balancing/migration tasks

- The report path now routes to `partitionService.handleBuildIndexTask(task)`, consistent with the other task types.
- In key flows such as `balancePartitionLeader` / `balancePartitionShard`, consider the load build-index tasks impose; if necessary, add a short-circuit guard such as `hasBuildIndexTaskDoing()`, modeled on `hasSplitTaskDoing()` / `hasMoveTaskDoing()`, to avoid resource contention.

hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java (2)
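A guard of the kind suggested above could look like the following standalone sketch; the enum and record stand in for the real MetaTask types, so all names here are illustrative only:

```java
import java.util.List;

public class BuildIndexGuard {
    // Illustrative stand-ins for MetaTask.TaskType / MetaTask.TaskState
    enum TaskType { SPLIT, MOVE, BUILD_INDEX }
    enum TaskState { DOING, SUCCESS, FAILURE }

    record Task(TaskType type, TaskState state) {}

    // Mirrors hasSplitTaskDoing()/hasMoveTaskDoing():
    // true if any build-index task is still in flight
    static boolean hasBuildIndexTaskDoing(List<Task> tasks) {
        return tasks.stream()
                    .anyMatch(t -> t.type() == TaskType.BUILD_INDEX && t.state() == TaskState.DOING);
    }

    public static void main(String[] args) {
        var tasks = List.of(new Task(TaskType.MOVE, TaskState.DOING),
                            new Task(TaskType.BUILD_INDEX, TaskState.SUCCESS));
        System.out.println(hasBuildIndexTaskDoing(tasks)); // false
        var busy = List.of(new Task(TaskType.BUILD_INDEX, TaskState.DOING));
        System.out.println(hasBuildIndexTaskDoing(busy));  // true
    }
}
```

A balance routine would then bail out early while such a task is running, the same way it already does for split/move tasks.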
1355-1363: queryBuildIndexTaskStatus: add a null-response guard

As with submit, add a null-response check to avoid a later NPE:

```diff
-var response = getStub().queryTaskState(request);
+var response = getStub().queryTaskState(request);
+if (response == null) {
+    throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE, "queryTaskState returned null response");
+}
 handleResponseError(response.getHeader());
 return response;
```
1365-1373: retryBuildIndexTask: add a null-response guard

Same as above, add a null check:

```diff
-var response = getStub().retryIndexTask(request);
+var response = getStub().retryIndexTask(request);
+if (response == null) {
+    throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE, "retryIndexTask returned null response");
+}
 handleResponseError(response.getHeader());
 return response;
```

hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto (2)
109-115: The RPC names could be more specific (optional)

submitTask/queryTaskState/retryIndexTask are generic and easy to confuse with other future task types. Optionally rename them to make the build-index semantics explicit (requires updating client/server code and regenerating the stubs):

```diff
-// Submit rebuild index task
-rpc submitTask(IndexTaskCreateRequest) returns (IndexTaskCreateResponse) {}
-// Check task status
-rpc queryTaskState(IndexTaskQueryRequest) returns (IndexTaskQueryResponse) {}
-// Retry index
-rpc retryIndexTask(IndexTaskQueryRequest) returns (IndexTaskQueryResponse){}
+// Submit rebuild index task
+rpc SubmitBuildIndexTask(IndexTaskCreateRequest) returns (IndexTaskCreateResponse) {}
+// Check build index task status
+rpc QueryBuildIndexTaskState(IndexTaskQueryRequest) returns (IndexTaskQueryResponse) {}
+// Retry build index task
+rpc RetryBuildIndexTask(IndexTaskQueryRequest) returns (IndexTaskQueryResponse) {}
```
628-632: Consider adding count fields to IndexTaskQueryResponse so clients can display progress directly (backward-compatible enhancement)

To save clients from re-aggregating, return the Doing/Success counts directly (new fields are transparent to old clients):

```diff
 message IndexTaskQueryResponse{
     ResponseHeader header = 1;
     metaTask.TaskState state = 2;
     string message = 3;
+    uint32 doing_count = 4;
+    uint32 success_count = 5;
 }
```
📒 Files selected for processing (10)

- hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java (1 hunks)
- hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java (1 hunks)
- hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java (1 hunks)
- hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (2 hunks)
- hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java (1 hunks)
- hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto (2 hunks)
- hugegraph-pd/hg-pd-grpc/src/main/proto/metapb.proto (1 hunks)
- hugegraph-pd/hg-pd-grpc/src/main/proto/pd_pulse.proto (1 hunks)
- hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto (2 hunks)
- hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java (3 hunks)
🔇 Additional comments (4)

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java (1)

43-43: New namespace constant looks good

`TASK_BUILD_INDEX = "TASK_BI"` is concise and consistent with the existing style.

hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto (1)

31-32: Confirmed that the Build_Index task is implemented across the full path with nothing missing; mergeable.

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java (2)

117-120: Prefix-scanning build-index tasks looks correct. The interface naming and the use of scanPrefix are consistent with the existing Split/Move tasks, LGTM.

122-125: Fetching a build-index task by key looks correct. Key generation matches the parser and a single task object is returned, LGTM.
```proto
message BuildIndex {
    uint64 taskId = 1;
    uint32 partition_id = 2;
    BuildIndexParam param = 11;
}

message BuildIndexParam {
    string graph = 1;
    bytes label_id = 2;
    bool is_vertex_label = 3;
    bytes prefix = 4;   // query prefix

    oneof request_param_union {
        bytes index_label = 11; // label id
        bool all_index = 12;    // rebuild all index
        bool label_index = 13;  // ??
    }
}
```
🛠️ Refactor suggestion
The BuildIndex/BuildIndexParam protocol semantics and naming should be finalized before release; remove the uncertain comment and declare reserved fields to prevent future conflicts

- Inconsistent field naming: `taskId` is camelCase while most other fields in this file use snake_case (e.g. `partition_id`). For consistency, rename it to `task_id` (or, if it has already been published, at least add a comment explaining the deviation).
- In `BuildIndexParam`, the relationship between `label_id` and `index_label` is unclear, and a `// ??` comment must not ship in a protocol definition; clarify the relationship between the "base label ID" and the "index label ID" in comments, or rename the fields to `base_label_id` / `index_label_id`.
- Using bools (`all_index`, `label_index`) as branch markers in `oneof request_param_union` is unintuitive; consider empty messages or an enum to describe the rebuild scope, or at minimum document strictly that the three branches are mutually exclusive.
- If fields 3-10 are intentionally skipped, declare them `reserved` explicitly in the message to avoid tag conflicts in future extensions.
- The purpose of `prefix` should be more specific (e.g. "KV scan prefix" or "data scan prefix").

[Suggested change]
This adjusts only comments and reserved fields, so it should not break the existing generated code:

```diff
 message BuildIndex {
-    uint64 taskId = 1;
+    // Task ID (for consistency with other fields, consider: task_id)
+    uint64 taskId = 1;
     uint32 partition_id = 2;
+    // Reserve 3-10 to avoid future conflicts
+    reserved 3 to 10;
     BuildIndexParam param = 11;
 }

 message BuildIndexParam {
-    string graph = 1;
-    bytes label_id = 2;
-    bool is_vertex_label = 3;
-    bytes prefix = 4; // query prefix
+    string graph = 1;          // graph space name
+    bytes label_id = 2;        // base label ID (vertex/edge)
+    bool is_vertex_label = 3;  // true = vertex label, false = edge label
+    bytes prefix = 4;          // prefix used for scans/queries

     oneof request_param_union {
-        bytes index_label = 11; // label id
-        bool all_index = 12;    // rebuild all index
-        bool label_index = 13;  // ??
+        bytes index_label = 11; // index label ID (build one specific index label)
+        bool all_index = 12;    // rebuild all indexes under this label
+        bool label_index = 13;  // rebuild only the label-level index (not a specific index label)
     }
 }
```

If needed, I can provide a version of the oneof that replaces the bools with an enum or empty messages to improve readability and forward compatibility.
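The empty-message alternative offered above could look roughly like this; the nested message names and their tags are illustrative, not part of the PR:

```proto
message BuildIndexParam {
    string graph = 1;
    bytes label_id = 2;
    bool is_vertex_label = 3;
    bytes prefix = 4;

    // Empty marker messages make each branch self-describing and extensible.
    message AllIndex {}
    message LabelIndex {}

    // Scope of the rebuild; exactly one branch is set.
    oneof request_param_union {
        bytes index_label = 11;      // build one specific index label
        AllIndex all_index = 12;     // rebuild all indexes under this label
        LabelIndex label_index = 13; // rebuild only the label-level index
    }
}
```

Unlike a bool branch (where `all_index = false` is representable but meaningless), an empty message carries no value at all, and new per-branch options can later be added inside AllIndex/LabelIndex without changing the oneof.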
```java
public void submitTask(Pdpb.IndexTaskCreateRequest request,
                       StreamObserver<Pdpb.IndexTaskCreateResponse> observer) {
    if (!isLeader()) {
        redirectToLeader(PDGrpc.getSubmitTaskMethod(), request, observer);
        return;
    }

    var builder = Pdpb.IndexTaskCreateResponse.newBuilder();
    var param = request.getParam();
    try {
        var partitions = partitionService.getPartitions(param.getGraph());

        if (partitions.size() == 0) {
            throw new PDException(-1, "graph has no partition");
        }

        var newTaskId = idService.getId(BUILD_INDEX_TASK_ID_KEY, 1);
        // log.info("build index task id: {}", newTaskId);

        var taskInfo = storeNodeService.getTaskInfoMeta();
        for (var partition : partitions) {
            var buildIndex = Metapb.BuildIndex.newBuilder()
                                              .setPartitionId(partition.getId())
                                              .setTaskId(newTaskId)
                                              .setParam(param)
                                              .build();

            var task = MetaTask.Task.newBuilder()
                                    .setId(newTaskId)
                                    .setType(MetaTask.TaskType.Build_Index)
                                    .setState(MetaTask.TaskState.Task_Doing)
                                    .setStartTimestamp(System.currentTimeMillis())
                                    .setPartition(partition)
                                    .setBuildIndex(buildIndex)
                                    .build();

            taskInfo.updateBuildIndexTask(task);

            log.info("notify client build index task: {}", buildIndex);

            PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
                                                                  .setPartition(partition)
                                                                  .setId(idService.getId(TASK_ID_KEY, 1))
                                                                  .setBuildIndex(buildIndex));
        }
        observer.onNext(builder.setHeader(okHeader).setTaskId(newTaskId).build());
    } catch (PDException e) {
        log.error("IndexTaskGrpcService.submitTask", e);
        observer.onNext(builder.setHeader(newErrorHeader(e)).build());
    }
    observer.onCompleted();
}
```
🛠️ Refactor suggestion
submitTask: add input validation, use isEmpty() for the empty check, and catch generic exceptions; avoid submitting against an empty/missing graph and avoid partial failures

Issues and suggestions:

- request/param/graph are never validated; an empty graph name leads to unexpected behavior.
- Only PDException is caught; any other exception leaves the gRPC stream unterminated, violating the protocol.
- Prefer isEmpty() over size() == 0 for the empty check.
- When some partition updates fail, this should be logged and surfaced as an error instead of leaving a "half-submitted" task.

Suggested change (core fragment):

```diff
 var builder = Pdpb.IndexTaskCreateResponse.newBuilder();
 var param = request.getParam();
 try {
+    if (param == null || param.getGraph() == null || param.getGraph().isEmpty()) {
+        observer.onNext(builder.setHeader(newErrorHeader(Pdpb.ErrorType.NOT_FOUND_VALUE, "graph is null or empty")).build());
+        observer.onCompleted();
+        return;
+    }
+    // Verify the graph exists
+    var g = partitionService.getGraph(param.getGraph());
+    if (g == null) {
+        observer.onNext(builder.setHeader(newErrorHeader(Pdpb.ErrorType.NOT_FOUND_VALUE, "graph not found: " + param.getGraph())).build());
+        observer.onCompleted();
+        return;
+    }
     var partitions = partitionService.getPartitions(param.getGraph());
-    if (partitions.size() == 0) {
+    if (partitions.isEmpty()) {
         throw new PDException(-1, "graph has no partition");
     }
 @@
-    PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
+    PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
                                                           .setPartition(partition)
-                                                          .setId(idService.getId(
-                                                                  TASK_ID_KEY, 1))
+                                                          .setId(idService.getId(TASK_ID_KEY, 1))
                                                           .setBuildIndex(buildIndex));
     }
     observer.onNext(builder.setHeader(okHeader).setTaskId(newTaskId).build());
-} catch (PDException e) {
+} catch (PDException e) {
     log.error("IndexTaskGrpcService.submitTask", e);
     observer.onNext(builder.setHeader(newErrorHeader(e)).build());
+} catch (Exception e) {
+    log.error("IndexTaskGrpcService.submitTask (unexpected)", e);
+    observer.onNext(builder.setHeader(newErrorHeader(Pdpb.ErrorType.UNKNOWN_VALUE, e.getMessage())).build());
 }
 observer.onCompleted();
```
observer.onCompleted();Committable suggestion skipped: line range outside the PR's diff.
```java
@Override
public void queryTaskState(org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryRequest request,
                           StreamObserver<org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryResponse> observer) {

    if (!isLeader()) {
        redirectToLeader(PDGrpc.getQueryTaskStateMethod(), request, observer);
        return;
    }

    var taskInfo = storeNodeService.getTaskInfoMeta();
    var builder = Pdpb.IndexTaskQueryResponse.newBuilder();

    try {
        var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());

        if (tasks.size() == 0) {
            builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Unknown)
                   .setMessage("task not found");
        } else {
            var state = MetaTask.TaskState.Task_Success;
            String message = "OK";
            int countOfSuccess = 0;
            int countOfDoing = 0;

            for (var task : tasks) {
                var state0 = task.getState();
                if (state0 == MetaTask.TaskState.Task_Failure) {
                    state = MetaTask.TaskState.Task_Failure;
                    message = task.getMessage();
                    break;
                } else if (state0 == MetaTask.TaskState.Task_Doing) {
                    state = MetaTask.TaskState.Task_Doing;
                    countOfDoing++;
                } else if (state0 == MetaTask.TaskState.Task_Success) {
                    countOfSuccess++;
                }
            }

            if (state == MetaTask.TaskState.Task_Doing) {
                message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
            }

            builder.setHeader(okHeader).setState(state).setMessage(message);
        }
    } catch (PDException e) {
        builder.setHeader(newErrorHeader(e));
    }

    observer.onNext(builder.build());
    observer.onCompleted();
}
```
🛠️ Refactor suggestion
queryTaskState: program defensively to avoid NPEs on the task list and its fields, and report the counts directly

Issues and suggestions:
- Use isEmpty() to check whether tasks is empty.
- task.getMessage() may be null and needs safe handling.
- Skip null tasks and log them.

Suggested change to the core loop and the response:
```diff
-var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());
-
-if (tasks.size() == 0) {
+var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());
+if (tasks == null || tasks.isEmpty()) {
     builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Unknown)
            .setMessage("task not found");
 } else {
     var state = MetaTask.TaskState.Task_Success;
     String message = "OK";
     int countOfSuccess = 0;
     int countOfDoing = 0;
-    for (var task : tasks) {
+    for (var task : tasks) {
+        if (task == null) {
+            log.warn("null task in build index task list for taskId: {}", request.getTaskId());
+            continue;
+        }
         var state0 = task.getState();
         if (state0 == MetaTask.TaskState.Task_Failure) {
             state = MetaTask.TaskState.Task_Failure;
-            message = task.getMessage();
+            message = (task.getMessage() == null || task.getMessage().isEmpty())
+                      ? "Task failed" : task.getMessage();
             break;
         } else if (state0 == MetaTask.TaskState.Task_Doing) {
             state = MetaTask.TaskState.Task_Doing;
             countOfDoing++;
         } else if (state0 == MetaTask.TaskState.Task_Success) {
             countOfSuccess++;
         }
     }
@@
-    builder.setHeader(okHeader).setState(state).setMessage(message);
+    builder.setHeader(okHeader).setState(state).setMessage(message)
+           .setDoingCount(countOfDoing).setSuccessCount(countOfSuccess);
 }
-} catch (PDException e) {
+} catch (PDException e) {
     builder.setHeader(newErrorHeader(e));
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```java
@Override
public void queryTaskState(org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryRequest request,
                           StreamObserver<org.apache.hugegraph.pd.grpc.Pdpb.IndexTaskQueryResponse> observer) {
    if (!isLeader()) {
        redirectToLeader(PDGrpc.getQueryTaskStateMethod(), request, observer);
        return;
    }
    var taskInfo = storeNodeService.getTaskInfoMeta();
    var builder = Pdpb.IndexTaskQueryResponse.newBuilder();
    try {
        var tasks = taskInfo.scanBuildIndexTask(request.getTaskId());
        if (tasks == null || tasks.isEmpty()) {
            builder.setHeader(okHeader)
                   .setState(MetaTask.TaskState.Task_Unknown)
                   .setMessage("task not found");
        } else {
            var state = MetaTask.TaskState.Task_Success;
            String message = "OK";
            int countOfSuccess = 0;
            int countOfDoing = 0;
            for (var task : tasks) {
                if (task == null) {
                    log.warn("null task in build index task list for taskId: {}", request.getTaskId());
                    continue;
                }
                var state0 = task.getState();
                if (state0 == MetaTask.TaskState.Task_Failure) {
                    state = MetaTask.TaskState.Task_Failure;
                    message = (task.getMessage() == null || task.getMessage().isEmpty())
                              ? "Task failed" : task.getMessage();
                    break;
                } else if (state0 == MetaTask.TaskState.Task_Doing) {
                    state = MetaTask.TaskState.Task_Doing;
                    countOfDoing++;
                } else if (state0 == MetaTask.TaskState.Task_Success) {
                    countOfSuccess++;
                }
            }
            if (state == MetaTask.TaskState.Task_Doing) {
                message = "Doing/" + countOfDoing + ", Success/" + countOfSuccess;
            }
            builder.setHeader(okHeader)
                   .setState(state)
                   .setMessage(message)
                   .setDoingCount(countOfDoing)
                   .setSuccessCount(countOfSuccess);
        }
    } catch (PDException e) {
        builder.setHeader(newErrorHeader(e));
    }
    observer.onNext(builder.build());
    observer.onCompleted();
}
```
🤖 Prompt for AI Agents
hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java
around lines 1852 to 1902: the current implementation uses tasks.size() == 0 and
directly accesses task and task.getMessage() which can be null; change the empty
check to tasks.isEmpty(), and in the for-loop skip any null task (and optionally
log a debug/warn), treat a null task.getMessage() as an empty string or a safe
default when setting message, and ensure countOfDoing/countOfSuccess only
increment for non-null tasks; also when encountering Task_Failure use a
null-safe message (e.g., task.getMessage() != null ? task.getMessage() : "");
make these defensive checks so the response never NPEs and counts reflect only
valid tasks.
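The null-safe aggregation rule the prompt describes can be sketched in isolation, away from the gRPC and storage plumbing. `BuildIndexAggregator`, `aggregate`, and the standalone `TaskState` enum below are hypothetical names for illustration; only the precedence rule (Failure over Doing over Success, empty list means unknown) comes from the handler under review:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the per-partition state aggregation suggested above.
// TaskState mirrors the names of the PD MetaTask.TaskState enum, but the
// class and method here are illustrative, not part of the PD codebase.
public class BuildIndexAggregator {

    enum TaskState { Task_Unknown, Task_Doing, Task_Success, Task_Failure }

    // Failure dominates, then Doing, then Success; null entries are skipped.
    static TaskState aggregate(List<TaskState> partitionStates) {
        if (partitionStates == null || partitionStates.isEmpty()) {
            return TaskState.Task_Unknown;   // no persisted tasks: "task not found"
        }
        TaskState result = TaskState.Task_Success;
        for (TaskState s : partitionStates) {
            if (s == null) {
                continue;                    // defensive: ignore missing entries
            }
            if (s == TaskState.Task_Failure) {
                return TaskState.Task_Failure;  // short-circuit, like the break above
            }
            if (s == TaskState.Task_Doing) {
                result = TaskState.Task_Doing;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList(
                TaskState.Task_Success, TaskState.Task_Doing, null))); // prints Task_Doing
    }
}
```

Keeping the precedence rule in one pure function like this makes the NPE cases (null list, null element) trivial to unit-test, independent of the response builder.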
```java
@Override
public void retryIndexTask(Pdpb.IndexTaskQueryRequest request,
                           StreamObserver<Pdpb.IndexTaskQueryResponse> observer) {
    if (!isLeader()) {
        redirectToLeader(PDGrpc.getRetryIndexTaskMethod(), request, observer);
        return;
    }

    var taskInfo = storeNodeService.getTaskInfoMeta();
    var builder = Pdpb.IndexTaskQueryResponse.newBuilder();
    var taskId = request.getTaskId();

    try {
        var tasks = taskInfo.scanBuildIndexTask(taskId);

        if (tasks.size() == 0) {
            builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Failure)
                   .setMessage("task not found");
        } else {
            var state = MetaTask.TaskState.Task_Success;
            String message = "OK";
            for (var task : tasks) {
                var state0 = task.getState();
                if (state0 == MetaTask.TaskState.Task_Failure ||
                    state0 == MetaTask.TaskState.Task_Doing) {
                    var partition = task.getPartition();
                    var buildIndex = task.getBuildIndex();

                    log.info("notify client retry build index task: {}", buildIndex);

                    PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
                                                                          .setPartition(partition)
                                                                          .setId(task.getId())
                                                                          .setBuildIndex(buildIndex));
                }
            }
            builder.setHeader(okHeader).setState(state).setMessage(message);
        }
    } catch (PDException e) {
        builder.setHeader(newErrorHeader(e));
    }

    observer.onNext(builder.build());
    observer.onCompleted();
}
```
🛠️ Refactor suggestion
retryIndexTask: on retry, failed tasks should be explicitly set back to Doing with a refreshed timestamp; send the retry signal only for failed tasks and use a new event ID

The current code neither updates task state nor distinguishes in-flight tasks: it re-sends the retry signal for tasks that are still Doing, which can leave state and behavior inconsistent. Suggestions:
- Retry only failed tasks: set their state to Task_Doing, refresh startTimestamp, and persist the change;
- When publishing the heartbeat notification, use a newly allocated task event ID (as at creation time) instead of reusing the taskId.

Suggested change:
```diff
-var tasks = taskInfo.scanBuildIndexTask(taskId);
-
-if (tasks.size() == 0) {
+var tasks = taskInfo.scanBuildIndexTask(taskId);
+if (tasks == null || tasks.isEmpty()) {
     builder.setHeader(okHeader).setState(MetaTask.TaskState.Task_Failure)
            .setMessage("task not found");
 } else {
-    var state = MetaTask.TaskState.Task_Success;
-    String message = "OK";
-    for (var task : tasks) {
-        var state0 = task.getState();
-        if (state0 == MetaTask.TaskState.Task_Failure ||
-            state0 == MetaTask.TaskState.Task_Doing) {
-            var partition = task.getPartition();
-            var buildIndex = task.getBuildIndex();
-
-            log.info("notify client retry build index task: {}", buildIndex);
-
-            PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
-                                                                  .setPartition(partition)
-                                                                  .setId(task.getId())
-                                                                  .setBuildIndex(buildIndex));
-        }
-    }
-    builder.setHeader(okHeader).setState(state).setMessage(message);
+    int retried = 0;
+    for (var task : tasks) {
+        if (task == null) continue;
+        if (task.getState() == MetaTask.TaskState.Task_Failure) {
+            var updatedTask = task.toBuilder()
+                                  .setState(MetaTask.TaskState.Task_Doing)
+                                  .setStartTimestamp(System.currentTimeMillis())
+                                  .build();
+            taskInfo.updateBuildIndexTask(updatedTask);
+            var partition = updatedTask.getPartition();
+            var buildIndex = updatedTask.getBuildIndex();
+            log.info("notify client retry build index task: {}", buildIndex);
+            PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder()
+                                                                  .setPartition(partition)
+                                                                  .setId(idService.getId(TASK_ID_KEY, 1))
+                                                                  .setBuildIndex(buildIndex));
+            retried++;
+        }
+    }
+    builder.setHeader(okHeader)
+           .setState(retried > 0 ? MetaTask.TaskState.Task_Doing : MetaTask.TaskState.Task_Success)
+           .setMessage(retried > 0 ? ("retry started for " + retried + " partitions") : "no failed partitions to retry");
 }
-} catch (PDException e) {
+} catch (PDException e) {
     builder.setHeader(newErrorHeader(e));
 }
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java
lines 1904-1951: currently retryIndexTask iterates all matching tasks and
re-sends retry heartbeats for both FAILED and DOING tasks without persisting
state changes or updating timestamps, and it reuses the taskId as the event id;
change logic to only target tasks whose state == Task_Failure, set their state
to Task_Doing, refresh startTimestamp (and any other relevant timestamp fields),
persist the updated task record via the storeNodeService (or equivalent update
API) before sending notifications, and when building the
PartitionHeartbeatResponse generate and use a new event id (not the original
taskId) for the heartbeat so the retry is represented as a new event; ensure
observer still returns the correct response header/state after updates and
catch/persist failures appropriately.
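The retry semantics the prompt describes (reset only failed partitions, refresh their timestamp, and leave in-flight ones alone) can be sketched without the PD classes. `RetryPlanner` and `PartitionTask` below are hypothetical stand-ins for MetaTask types; persistence and heartbeat publication are represented only by the returned list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the suggested retry rule: only Task_Failure partitions flip back
// to Task_Doing with a fresh start timestamp; Doing and Success partitions are
// untouched. All names here are illustrative, not the real PD types.
public class RetryPlanner {

    enum State { Task_Doing, Task_Success, Task_Failure }

    static class PartitionTask {
        State state;
        long startTimestamp;

        PartitionTask(State state, long startTimestamp) {
            this.state = state;
            this.startTimestamp = startTimestamp;
        }
    }

    // Returns the tasks that were reset; the caller would persist each one and
    // then publish a heartbeat carrying a freshly allocated event id.
    static List<PartitionTask> planRetry(List<PartitionTask> tasks, long now) {
        List<PartitionTask> retried = new ArrayList<>();
        for (PartitionTask t : tasks) {
            if (t == null || t.state != State.Task_Failure) {
                continue; // skip missing, in-flight, and finished partitions
            }
            t.state = State.Task_Doing;
            t.startTimestamp = now;
            retried.add(t);
        }
        return retried;
    }

    public static void main(String[] args) {
        List<PartitionTask> tasks = Arrays.asList(
                new PartitionTask(State.Task_Failure, 0L),
                new PartitionTask(State.Task_Doing, 0L),
                new PartitionTask(State.Task_Success, 0L));
        System.out.println(planRetry(tasks, 1000L).size()); // prints 1
    }
}
```

Separating "which partitions to retry" from "how to notify them" keeps the state transition testable and makes it obvious that Doing partitions never receive a duplicate signal.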
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java
```java
public void handleBuildIndexTask(MetaTask.Task task) throws PDException {
    log.info("build index task {} -{} , report state: {}", task.getPartition().getGraphName(),
             task.getPartition().getId(), task.getState());
    storeService.getTaskInfoMeta().updateBuildIndexTask(task);
}
```
```protobuf
oneof request_param_union {
    bytes index_label = 11;  // label id
    bool all_index = 12;     // rebuild all index
    bool label_index = 13;   // ??
```
specify this comment or remove it
hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java
```java
}

var newTaskId = idService.getId(BUILD_INDEX_TASK_ID_KEY, 1);
// log.info("build index task id: {}", newTaskId);
```
* refactor(pd): added validation and refactor code
* refactor(pd): added validation and refactor code
* refactor(pd): added validation and refactor code
* refactor(pd): added validation and refactor code
Purpose of the PR
Main Changes
Verifying these changes
Does this PR potentially affect the following parts?
Documentation Status
Doc - TODO
Doc - Done
Doc - No Need

Summary by CodeRabbit