feat: preliminary support for distributed task scheduler#2319
feat: preliminary support for distributed task scheduler#2319zyxxoo merged 11 commits intoapache:pd-storefrom
Conversation
...-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java
Outdated
Show resolved
Hide resolved
| @@ -136,11 +136,12 @@ public Map<String, Object> get(@Context GraphManager manager, | |||
| @RedirectFilter.RedirectMasterRole | |||
There was a problem hiding this comment.
看 DistributedTaskScheduler 实现,我理解就是所有的节点都会处理任务,通过分布式锁来保证任务只被一个节点处理,原来的方式任务只能被 master 提交,由 master 分配给其他节点处理,所以现在这个逻辑应该不需要了,对吧,硬刺这里RedirectFilter这里的拦截器实现可以加下开关,使用 DistributedTaskScheduler 的时候不生效
There was a problem hiding this comment.
在 org.apache.hugegraph.api.filter.RedirectFilter#filter 处进行了修改,当匹配到 TaskAPI 调用且对应 graph 的 scheduler 类型为 DistributedTaskScheduler 时不进行重定向
| } | ||
| } | ||
|
|
||
| public void cronSchedule() { |
There was a problem hiding this comment.
这里个人有个想法可以讨论下哈:这段代码看着应该是每隔一段时间,查询所有的任务,然后过滤出没有被锁的任务,最后驱动这些没有被锁的任务的状态,我理解没错的话,那么其实所有节点都会查询一次所有的任务,再上层会做filter,这里看着似乎可以将这个逻辑下层到store,store 层每次返回给server的节点的批量数据都需要先锁定,然后在返回给server, 如果已经被锁定了,就在store 层过滤掉
There was a problem hiding this comment.
另外,看文档时说有根据负载来执行任务的,但是这里貌似没看到,是在其他地方还是暂时没实现哈=。=?原来的调度策略是 master 根据各个节点的负载来指定任务的
There was a problem hiding this comment.
另外,看文档时说有根据负载来执行任务的,但是这里貌似没看到,是在其他地方还是暂时没实现哈=。=?原来的调度策略是 master 根据各个节点的负载来指定任务的
check 了下内部版代码,应该是暂时未实现 😵💫
There was a problem hiding this comment.
这里个人有个想法可以讨论下哈:这段代码看着应该是每隔一段时间,查询所有的任务,然后过滤出没有被锁的任务,最后驱动这些没有被锁的任务的状态,我理解没错的话,那么其实所有节点都会查询一次所有的任务,再上层会做filter,这里看着似乎可以将这个逻辑下层到store,store 层每次返回给server的节点的批量数据都需要先锁定,然后在返回给server, 如果已经被锁定了,就在store 层过滤掉
我的理解是,这里过滤出没有被锁 (无节点在执行) 的任务主要出于三个目的
- 将处于 RUNNING 状态且无节点在执行的任务状态转移到 FAILED
- 将处于 CANCELLING 状态且无节点在执行的任务状态转移到 CANCELLED
- 将处于 DELETING 状态且无节点在执行的任务删除
如果在 store 层过滤掉已经被锁定的任务,server 层就无法对这些任务进行相应的处理了 (例如对处于 RUNNING 状态的任务初始化环境变量,此时这些任务应当是被锁定的)
| this.serverInfoDbExecutor); | ||
| this.schedulers.put(graph, scheduler); | ||
| LOG.info("Use {} as the scheduler of graph ({})", | ||
| graph.schedulerType(), graph.name()); |
There was a problem hiding this comment.
调度类型似乎全局的更合适,因为如果使用了自研的分布式存储,那其实就不会用原来的调度了,而是使用新的
There was a problem hiding this comment.
@zyxxoo 意思是将 task.scheduler_type 这个选项从 CoreOptions 移动到 ServerOptions 中吗?我理解的是每个图实例都可以单独设定调度类型
There was a problem hiding this comment.
可以先这样吧,没个图实例单独设置不同的调度类型,意义不大,基本都是一样的
| // Do close-tx for current thread | ||
| graph.closeTx(); | ||
| // Let other threads run | ||
| Thread.yield(); |
There was a problem hiding this comment.
这里为啥会有这段代码哈? 为什么要让其他线程run呢
参考的是 close schedulerExecutor 的逻辑 org.apache.hugegraph.task.TaskManager#closeSchedulerTx
| } | ||
| } | ||
|
|
||
| public void cronSchedule() { |
There was a problem hiding this comment.
另外,看文档时说有根据负载来执行任务的,但是这里貌似没看到,是在其他地方还是暂时没实现哈=。=?原来的调度策略是 master 根据各个节点的负载来指定任务的
duplicated commit in hstore for clusterRole
Purpose of the PR
subtask of #2265
add
DistributedTaskSchedulerand its correspondingTaskAndResultTransaction:hugegraph.propertiesconfiguration file (default to local).forceparameter to theTaskAPI.delete.Main Changes
For detailed adaptation documentation, refer to https://hugegraph.feishu.cn/wiki/HLIPwe8zei3mTvkWjoJchB0dn3g.
TODO: Currently, there exists duplicated codes in both the
DistributedTaskSchedulerandStandardTaskScheduler. We need to explore options for reuse without compromising existing functionality.Verifying these changes
Does this PR potentially affect the following parts?
Documentation Status
Doc - TODODoc - DoneDoc - No Need