
feat: preliminary support for distributed task scheduler #2319

Merged

zyxxoo merged 11 commits into apache:pd-store from VGalaxies:dev-task-result on Oct 16, 2023

Conversation

@VGalaxies
Contributor

Purpose of the PR

Subtask of #2265

Add DistributedTaskScheduler and its corresponding TaskAndResultTransaction:

  • The scheduler type can be configured via the hugegraph.properties configuration file (defaults to local).
  • Add a force parameter to TaskAPI.delete.
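A minimal hugegraph.properties fragment might look like the following. The option key `task.scheduler_type` is mentioned later in this thread; the exact value accepted for the distributed scheduler is an assumption:

```properties
# Hypothetical sketch: select the task scheduler for a graph instance.
# Defaults to local; the value "distributed" is an assumed spelling.
task.scheduler_type=distributed
```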

Main Changes

For detailed adaptation documentation, refer to https://hugegraph.feishu.cn/wiki/HLIPwe8zei3mTvkWjoJchB0dn3g.

TODO: Currently, there is duplicated code in both the DistributedTaskScheduler and StandardTaskScheduler. We need to explore options for reuse without compromising existing functionality.

Verifying these changes

  • Trivial rework / code cleanup without any test coverage. (No Need)
  • Already covered by existing tests, such as (please modify tests here).
  • Need tests and can be verified as follows:
    • Validate that the functionality of the local task scheduler remains unaffected
    • Verify the basic functionality of the distributed task scheduler

| backend | task scheduler | test status |
|---------|----------------|-------------|
| hstore  | local          | basic graph import |
| hstore  | local          | ldbc validation (without query 4 and 5) |
| hstore  | distributed    | basic graph import |
| hstore  | distributed    | ldbc validation (without query 4 and 5) |

Does this PR potentially affect the following parts?

  • Nope
  • Dependencies (add/update license info)
  • Modify configurations
  • The public API
  • Other effects (maybe breaking changes)

Documentation Status

  • Doc - TODO
  • Doc - Done
  • Doc - No Need

@@ -136,11 +136,12 @@ public Map<String, Object> get(@Context GraphManager manager,
@RedirectFilter.RedirectMasterRole
Contributor

Looking at the DistributedTaskScheduler implementation, my understanding is that every node processes tasks, and a distributed lock guarantees that each task is handled by only one node. In the original approach, tasks could only be submitted by the master, which assigned them to other nodes, so that logic should no longer be needed, right? If so, the RedirectFilter interceptor here could get a switch so that it does not take effect when DistributedTaskScheduler is used.

Contributor Author

Modified org.apache.hugegraph.api.filter.RedirectFilter#filter: when a TaskAPI call is matched and the corresponding graph's scheduler type is DistributedTaskScheduler, no redirect is performed.
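The redirect decision described in this thread (skip the master redirect for TaskAPI calls when the graph uses DistributedTaskScheduler) can be sketched as follows. The helper below is an illustrative stand-in, not the actual org.apache.hugegraph.api.filter.RedirectFilter implementation:

```java
// Minimal sketch of the redirect decision; stand-in types only.
public class RedirectDecisionSketch {

    enum SchedulerType { LOCAL, DISTRIBUTED }

    // With DistributedTaskScheduler, every node can process tasks under a
    // distributed lock, so TaskAPI calls no longer need to be redirected
    // to the master node; all other requests keep the original behavior.
    static boolean shouldRedirectToMaster(boolean isTaskApiRequest,
                                          SchedulerType type) {
        return !(isTaskApiRequest && type == SchedulerType.DISTRIBUTED);
    }
}
```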

}
}

public void cronSchedule() {
Contributor

Here is an idea worth discussing: this code appears to periodically query all tasks, filter out the ones that are not locked, and then drive the state of those unlocked tasks. If my understanding is correct, every node queries all the tasks, and the filtering is done at the upper layer. It seems this logic could be pushed down to the store: each batch of data the store returns to the server would be locked first before being returned, and tasks that are already locked would be filtered out at the store layer.

Contributor

Also, the doc mentions executing tasks based on load, but I don't see that here. Is it somewhere else, or not implemented yet? The original scheduling policy had the master assign tasks based on each node's load.

Contributor Author

> Also, the doc mentions executing tasks based on load, but I don't see that here. Is it somewhere else, or not implemented yet? The original scheduling policy had the master assign tasks based on each node's load.

Checked the internal version of the code; it seems this is not implemented yet 😵‍💫

Contributor Author

> Here is an idea worth discussing: this code appears to periodically query all tasks, filter out the ones that are not locked, and then drive the state of those unlocked tasks. If my understanding is correct, every node queries all the tasks, and the filtering is done at the upper layer. It seems this logic could be pushed down to the store: each batch of data the store returns to the server would be locked first before being returned, and tasks that are already locked would be filtered out at the store layer.

My understanding is that filtering out the tasks that are not locked (i.e. no node is executing them) serves three purposes:

  1. Move tasks in the RUNNING state with no executing node to FAILED
  2. Move tasks in the CANCELLING state with no executing node to CANCELLED
  3. Delete tasks in the DELETING state with no executing node

If the store layer filtered out already-locked tasks, the server layer could no longer handle those tasks accordingly (for example, initializing environment variables for tasks in the RUNNING state, which should be locked at that point).
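The three state-driving rules above can be sketched for tasks that no node holds the lock on. The types below are stand-ins, not the real DistributedTaskScheduler#cronSchedule code:

```java
import java.util.Optional;

// Sketch of the cron-driven state transitions for unlocked tasks.
public class CronDriveSketch {

    enum Status { RUNNING, CANCELLING, DELETING, FAILED, CANCELLED }

    // Returns the next status for a task with no executing node; an empty
    // result means the task is deleted outright.
    static Optional<Status> driveUnlocked(Status current) {
        switch (current) {
            case RUNNING:    return Optional.of(Status.FAILED);    // rule 1
            case CANCELLING: return Optional.of(Status.CANCELLED); // rule 2
            case DELETING:   return Optional.empty();              // rule 3
            default:         return Optional.of(current);          // unchanged
        }
    }
}
```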

this.serverInfoDbExecutor);
this.schedulers.put(graph, scheduler);
LOG.info("Use {} as the scheduler of graph ({})",
graph.schedulerType(), graph.name());
Contributor

A global scheduler type seems more appropriate: if the self-developed distributed storage is used, the original scheduler would not be used at all, only the new one.

Contributor Author

@zyxxoo Do you mean moving the task.scheduler_type option from CoreOptions to ServerOptions? My understanding is that each graph instance can set its scheduler type independently.

Contributor

Let's keep it this way for now; setting a different scheduler type per graph instance is of little value, since they are basically all the same.
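The per-graph selection discussed above, matching the schedulers.put(graph, scheduler) snippet earlier, could be sketched as a small factory. The method name, the stub classes, and the "distributed" string value are assumptions, not the actual TaskManager code:

```java
// Illustrative sketch of per-graph scheduler selection; stand-in types.
public class SchedulerSelectionSketch {

    interface TaskScheduler {}
    static class StandardTaskSchedulerStub implements TaskScheduler {}
    static class DistributedTaskSchedulerStub implements TaskScheduler {}

    // Each graph instance reads its own task.scheduler_type option and
    // instantiates the matching scheduler; the local one is the default.
    static TaskScheduler newScheduler(String schedulerType) {
        if ("distributed".equalsIgnoreCase(schedulerType)) {
            return new DistributedTaskSchedulerStub();
        }
        return new StandardTaskSchedulerStub();
    }
}
```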

// Do close-tx for current thread
graph.closeTx();
// Let other threads run
Thread.yield();
Contributor

Why is this code here?
Why do we need to let other threads run?

Contributor Author

> Why is this code here? Why do we need to let other threads run?

This follows the logic for closing the schedulerExecutor in org.apache.hugegraph.task.TaskManager#closeSchedulerTx.

zyxxoo merged commit f34743b into apache:pd-store on Oct 16, 2023
VGalaxies deleted the dev-task-result branch on October 16, 2023 08:04