Apache Doris 源码阅读与解析
第四讲:一条 SQL 的执行过程
缪翎
自我介绍
缪翎
• Apache Doris PPMC
• 百度资深研发工程师
• github id :EmmyMiao87
• 个人博客:https://emmymiao87.github.io/
SELECT * FROM xxx a b c
WHERE xxx
GROUP BY xxx 1 2 3
如果你有如下问题 ORDER BY xxx 2 3 4
• Doris 在接收 SQL 后都做了什么?
• SQL 明明长的都差不多,为啥有的快有的慢?
• 数据是如何一步步从分布式的集群中进行计算,并返回 Doris
结果的?
课程大纲
1. 生成查询计划
• 看懂 Explain 中打印的查询计划
2. 执行查询
• 数据流是如何通过计算一步步变化并返回给 Client
Plan fragment 0
生成查询计划
1. SQL -> PlanNodeTree
2. PlanNodeTree -> PlanFragmentTree Plan fragment 1
SELECT i_category,sum(ss_sales_price)
FROM item JOIN store_sales ON
ss_item_sk=i_item_sk
GROUP BY i_category Plan fragment 2 Plan fragment 3
ORDER BY sum(ss_sales_price)
SortNode
ORDER BY sum(ss_sales_price);
生成查询计划
SELECT i_category,sum(ss_sales_price) AggregationNode
GROUP BY `i_category`
1. SQL -> PlanNodeTree FROM item JOIN store_sales ON sum(`ss_sales_price`)
ss_item_sk=i_item_sk
2. PlanNodeTree -> PlanFragmentTree
GROUP BY i_category
ORDER BY sum(ss_sales_price)
JoinNode
`i_item_sk` = `ss_item_sk`
PlanNode = 逻辑算子
OlapScanNode OlapScanNode
PlanNodeTree = 逻辑执行计划 Table: item Table: store_sales
Plan fragment 1
ResultSink
SortNode
生成查询计划
SortNode AggregationNode
ORDER BY sum(ss_sales_price);
ExchangeNode
1. SQL -> PlanNodeTree
AggregationNode
2. PlanNodeTree -> PlanFragmentTree GROUP BY `i_category` Plan fragment 2
sum(`ss_sales_price`)
DataSink
AggregationNode
JoinNode
`i_item_sk` = `ss_item_sk`
HashJoinNode
ExchangeNode ExchangeNode
OlapScanNode OlapScanNode
Table: item Table: store_sales
Plan fragment 3 Plan fragment 4
DataSink DataSink
OlapScanNode OlapScanNode
生成查询计划 BE 1
1. SQL -> PlanNodeTree
Join Node
2. PlanNodeTree -> PlanFragmentTree JoinNode
`i_item_sk` =
1. 拆分 PlanNodeTree `ss_item_sk`
2. 数据传输 BE 2 BE 3
OlapScanNode OlapScanNode
Table: item Table: store_sales
OlapScanNode OlapScanNode
BE 1
生成查询计划 DataSink
Hash Join Node
1. SQL -> PlanNodeTree Exchange Exchange
Node1 Node2
2. PlanNodeTree -> PlanFragmentTree JoinNode
`i_item_sk` =
1. 拆分 PlanNodeTree `ss_item_sk`
BE 2 BE 3
2. 数据传输
OlapScanNode OlapScanNode
Table: item Table: store_sales
DataSink DataSink
OlapScanNode OlapScanNode
DataSink + ExchangeNode
Plan Fragment 1
生成查询计划 DataSink
Hash Join Node
1. SQL -> PlanNodeTree Exchange Exchange
Node1 Node2
2. PlanNodeTree -> PlanFragmentTree JoinNode
`i_item_sk` =
1. 拆分 PlanNodeTree `ss_item_sk`
2. 数据传输
PlanFragment 2 Plan Fragment 3
OlapScanNode OlapScanNode
Table: item Table: store_sales
DataSink DataSink
OlapScanNode OlapScanNode
Plan Fragment = Plan Node 子树 + Data Sink
Plan Fragment Tree = 分布式查询计划
生成查询计划
SELECT i_category,sum(ss_sales_price)
FROM item JOIN store_sales ON
ss_item_sk=i_item_sk
GROUP BY i_category
ORDER BY sum(ss_sales_price)
SQL
SortNode
ORDER BY sum(ss_sales_price);
生成查询计划
AggregationNode
GROUP BY `i_category`
sum(`ss_sales_price`)
SELECT i_category,sum(ss_sales_price)
FROM item JOIN store_sales ON
ss_item_sk=i_item_sk JoinNode
`i_item_sk` = `ss_item_sk`
GROUP BY i_category
ORDER BY sum(ss_sales_price)
OlapScanNode OlapScanNode
Table: item Table: store_sales
SQL PlanNodeTree
Plan fragment 0
SortNode
ORDER BY sum(ss_sales_price);
生成查询计划
AggregationNode
GROUP BY `i_category`
Plan fragment 1
sum(`ss_sales_price`)
SELECT i_category,sum(ss_sales_price)
FROM item JOIN store_sales ON
ss_item_sk=i_item_sk JoinNode
`i_item_sk` = `ss_item_sk`
GROUP BY i_category
ORDER BY sum(ss_sales_price)
Plan fragment 2 Plan fragment 3
OlapScanNode OlapScanNode
Table: item Table: store_sales
SQL PlanNodeTree PlanFragmentTree
查看查询计划
• Desc graph
• Explain
• Desc verbose
DESC GRAPH
SELECT i_category,sum(ss_sales_price)
FROM item JOIN store_sales ON
ss_item_sk=i_item_sk
GROUP BY i_category
ORDER BY sum(ss_sales_price)
查看查询计划
• Desc graph
• Explain
• Desc verbose
DESC GRAPH
SELECT i_category,sum(ss_sales_price)
FROM item JOIN store_sales ON
ss_item_sk=i_item_sk
GROUP BY i_category
ORDER BY sum(ss_sales_price)
查看查询计划
• Desc graph
• Explain
• Desc verbose
EXPLAIN
SELECT i_category,sum(ss_sales_price)
FROM item JOIN store_sales ON
ss_item_sk=i_item_sk
GROUP BY i_category
ORDER BY sum(ss_sales_price)
课程大纲
1. 生成查询计划
2. 执行查询
• 分配,分发
• 完整的数据流流程
执行查询
1. 分配,分发
Coordinator.java
2. 完整的数据流流程 1. prepare
2. scheduler
• computeScanRange
• assignFragment
3. send
执行查询
FE
1. 分配,分发
2. 完整的数据流流程 Plan fragment 1
数据流
Plan fragment 2 Plan fragment 3
磁盘 磁盘
执行查询 FE
1. 单个 Fragment 执行流程
Plan fragment 1
2. Fragment 和 Fragment 之间的数据交互
3. FE 和 Top Fragment 之间的数据交互
4. FE 将数据返回给前端展示
Plan fragment 2 Plan fragment 3
磁盘 磁盘
Plan fragment 1
DataSink
_sink
PlanFragmentExecutor
执行查询 _plan Hash Join Node
1. prepare
Exchange Exchange
Node1 Node2
2. open
1. 单个 Fragment 执行流程
2. Fragment 之间的数据交互 1. open plan node tree
3. FE 和 Top Fragment 2. open sink
4. FE 将数据返回给前端展示
3. while(true) {
get next batch
send
3. close
执行查询
Plan fragment 1
1. 单个 Fragment 执行流程 DataSink
• _plan->get_next() 自顶向下 get_next
2. Fragment 之间的数据交互
Hash Join Node
3. FE 和 Top Fragment
4. FE 将数据返回给前端展示 Exchange Exchange
Node1 Node2
自底向上 return batch
Plan fragment 1
DataSink
执行查询 _plan Hash Join Node
Exchange Exchange
Node1 Node2
1. 单个 Fragment 执行流程
• HashJoinNode->get_next()
open:准备工作
2. Fragment 之间的数据交互
3. FE 和 Top Fragment 1. 右孩子 get next,构建 hash 表
4. FE 将数据返回给前端展示
get_next: 返回 batch 结果
1. 读取左孩子的一个 batch
2. 根据 hash table 找到 match 的行
3. 左右行拼接成 out_row,放入
out_batch
Exchange Node Exchange Node
receiver receiver
执行查询
分发策略: channel1 channel2 channel1 channel2
1. 单个 Fragment 执行流程
1. HASH_PARTITIONED
2. UNPARTITIONED BE1: DataSink BE2: DataSink
2. Fragment 之间的数据交互
3. …
3. FE 和 Top Fragment
4. FE 将数据返回给前端展示
send 主逻辑
1. 计算 row 的hash值
2. 将 row 放入对应的channel 中
执行查询
FE:Coordinator FE
1. get next batch
from BE
1. 单个 Fragment 执行流程 ResultReceiver
2. 把 batch 放入
2. Fragment 之间的数据交互 mysql channel
3. FE 和 Top Fragment数据交互
4. FE 将数据返回给前端展示
Top Fragment: Result Sink
Row buffer
BE
MysqlResultWriter 1. 把 batch 放
入 row
buffer 中缓
存。
执行查询
1. 单个 Fragment 执行流程
2. Fragment 之间的数据交互 MysqlChannel.java
1. Write buffer
3. FE 和 Top Fragment数据交互 2. 如果达到 capacity 就 send
4. FE 将数据返回给前端展示
Plan fragment 0
总结
1. 生成查询计划
Plan fragment 1
• 逻辑查询计划 PlanNodeTree,每个 PlanNode 代表一种运算。
• 分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由
PlanNodeTree 的子树 和 Sink 节点组成的。
2. 执行查询
Plan fragment 2 Plan fragment 3
FE
总结
Plan fragment 1
1. 生成查询计划
2. 执行查询
• Plan Fragment Tree 一层层处理数据,FE 获取后,最终返回给用户
• 单个 Fragment 执行,递归调用 get_next 计算结果 Plan fragment 2 Plan fragment 3
• Fragment 和 Fragment 之间, sink 通过 channel 分发数据给上层
Exchange Node
• FE coordinator 不断获取 Top Fragment 的 row buffer 中的数据 磁盘 磁盘
• 通过 Mysql Channel 将数据返回给 Client
总结
• FE • BE
• PlanNode 及子类 • PlanNode 及子类
• PlanFragment • PlanFragmentExecutor
• Coordinator • PlanFragmentMgr
• MysqlChannel • DataSink 及子类
• StmtExecutor • MysqlResultWriter
回到最初的问题
• Doris 在接收 SQL 后都做了什么?
• 生成查询计划,执行查询计划
• SQL 明明长的都差不多,为啥有的快有的慢?
• Explain 一下查询计划,不同的查询计划,执行速度不同。
• 数据是如何一步步从分布式的集群中进行计算,并返回结果的?
• Fragment -> 上层 Fragment -> FE -> Client
期待你们的 PR
• 简单的执行算子实现
• 简单的查询规划优化
• 向量化执行引擎
• 查询优化器
进阶课程
• 查询 Profile 分析
• 查询优化
Apache Doris 直播课程群 Apache Doris 微信公众号
Thank You