Introduction
In the multi-stage query engine it is not difficult to write queries that end up reading from the same table, or executing the same join, twice. This is especially easy when writing SQL queries with WITH expressions, but that is not the only case.
For example, a query like:
SELECT *
FROM T1
JOIN T2 as t2first
ON T1.col1 = t2first.col2
JOIN T2 as t2second
ON t2first.col3 = t2second.col3
generates the following plan:
flowchart BT
J2([JOIN 2])
J1([JOIN 1])
S1([Scan T1])
S2([Scan T2])
S22([Scan T2])
J2 --> J1
S1 --> J2
S2 --> J2
S22 --> J1
In this case there are two instances of Scan T2, which means the scan is executed twice and the table is read twice. That may already be expensive, but it gets worse when the duplicated subtree is more complex, for example a join or an aggregation.
To avoid this problem, we can reuse the subtree so that T2 is read only once. In our example, the plan should look like this:
flowchart BT
J2([JOIN 2])
J1([JOIN 1])
S1([Scan T1])
S2([Scan T2])
J2 --> J1
S1 --> J2
S2 --> J2
S2 --> J1
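To make the idea of reusing a subtree concrete, here is a minimal Java 17 sketch that turns a plan tree into a DAG by sharing structurally equal subtrees (a form of hash-consing). The `PlanNode` record and the `dedup` and `countDistinct` helpers are hypothetical and are not Pinot or Calcite classes; the sketch compares only operator labels and inputs, whereas a real check would also have to compare join conditions, projections and so on.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SubtreeReuse {
    // Hypothetical plan node: an operator label plus its inputs.
    // Records give structural equals/hashCode, so two "Scan T2" nodes are equal.
    record PlanNode(String op, List<PlanNode> inputs) {
        static PlanNode of(String op, PlanNode... inputs) {
            return new PlanNode(op, List.of(inputs));
        }
    }

    // Rebuilds the tree bottom-up, returning one canonical instance per
    // structurally distinct subtree, so duplicated subtrees become shared.
    static PlanNode dedup(PlanNode node, Map<PlanNode, PlanNode> canonical) {
        List<PlanNode> inputs = node.inputs().stream().map(i -> dedup(i, canonical)).toList();
        PlanNode rebuilt = new PlanNode(node.op(), inputs);
        return canonical.computeIfAbsent(rebuilt, k -> k);
    }

    // Counts distinct node instances (by reference) reachable from node.
    static int countDistinct(PlanNode node, Set<PlanNode> seen) {
        if (!seen.add(node)) {
            return 0;
        }
        int count = 1;
        for (PlanNode input : node.inputs()) {
            count += countDistinct(input, seen);
        }
        return count;
    }

    public static void main(String[] args) {
        // JOIN 1(JOIN 2(Scan T1, Scan T2), Scan T2), as in the first plan above.
        PlanNode join2 = PlanNode.of("JOIN 2", PlanNode.of("Scan T1"), PlanNode.of("Scan T2"));
        PlanNode join1 = PlanNode.of("JOIN 1", join2, PlanNode.of("Scan T2"));

        PlanNode dag = dedup(join1, new HashMap<>());
        // Both joins now read from the same Scan T2 instance.
        System.out.println(dag.inputs().get(1) == dag.inputs().get(0).inputs().get(1)); // true
        System.out.println(countDistinct(join1, Collections.newSetFromMap(new IdentityHashMap<>()))); // 5
        System.out.println(countDistinct(dag, Collections.newSetFromMap(new IdentityHashMap<>()))); // 4
    }
}
```

The tree has five node instances and the DAG four, matching the two plans: the second Scan T2 disappears and its consumer points at the shared one.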
This feature is supported by some databases; in Calcite it is known as a Spool.
Design
The design document can be found here. It describes some of the issues we can expect and how we could resolve them.
Current proposal
The proposal considers the stage to be the unit of equivalence. Instead of looking for equivalent individual expressions in the tree, we look for equivalent stages. If at least two stages are found to be equivalent, a new stage is created. This stage is identical to the equivalent stages in all operators except the mailbox send operator. The equivalent stages are then replaced by new stages with the same ids, simplified to a pair of mailbox receive and mailbox send operators. The newly created stage broadcasts all rows to the simplified stages.
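The grouping step can be sketched in Java 17 under simplifying assumptions: a stage is modeled as a mailbox send operator on top of a body (everything below the send), and two stages are treated as equivalent when their bodies are structurally equal, regardless of their send operators. All records and `findEquivalentStages` are hypothetical; the actual equivalence rules are the subject of #14296 and are likely stricter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StageEquivalence {
    // Hypothetical plan node with structural equality.
    record PlanNode(String op, List<PlanNode> inputs) {}

    // Hypothetical send operator: how rows are distributed and to which stage.
    record MailboxSend(String distribution, int receiverStageId) {}

    // A stage is a send operator on top of a body.
    record Stage(int id, MailboxSend send, PlanNode body) {}

    // Groups stages whose bodies are equal, ignoring the send operator.
    // Only groups with at least two stages are spool candidates.
    static List<List<Stage>> findEquivalentStages(List<Stage> stages) {
        Map<PlanNode, List<Stage>> byBody = new LinkedHashMap<>();
        for (Stage stage : stages) {
            byBody.computeIfAbsent(stage.body(), k -> new ArrayList<>()).add(stage);
        }
        List<List<Stage>> groups = new ArrayList<>();
        for (List<Stage> group : byBody.values()) {
            if (group.size() >= 2) {
                groups.add(group);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // Leaf stages 4, 5 and 6 of the example plan.
        List<Stage> stages = List.of(
            new Stage(4, new MailboxSend("random", 3), new PlanNode("Scan T1", List.of())),
            new Stage(5, new MailboxSend("random", 2), new PlanNode("Scan T2", List.of())),
            new Stage(6, new MailboxSend("broadcast", 3), new PlanNode("Scan T2", List.of())));
        List<List<Stage>> groups = findEquivalentStages(stages);
        // Stages 5 and 6 differ only in their send operator, so they form one group.
        System.out.println(groups.size()); // 1
        System.out.println(groups.get(0).stream().map(Stage::id).toList()); // [5, 6]
    }
}
```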
For example, the stages of the query above:
flowchart RL
R([Root])
J2([JOIN 2])
J1([JOIN 1])
S1([Scan T1])
S2([Scan T2])
subgraph Stage 1
R
Receiver1([Receive 1])
Receiver1 --> R
end
subgraph Stage 2
J1
Sender2([Send 2])
Receiver21([Receive 2.1])
Receiver22([Receive 2.2])
J1 --> Sender2
Sender2 --> Receiver1
Receiver21 -- hashtable --> J1
Receiver22 -- stream --> J1
end
subgraph Stage 3
J2
Sender3([Send 3])
Receiver31([Receive 3.1])
Receiver32([Receive 3.2])
J2 --> Sender3
Sender3 -- broadcast --> Receiver21
Receiver31 -- stream --> J2
Receiver32 -- hashtable --> J2
end
subgraph Stage 4
S1
Sender4([Send 4])
Sender4 -- random --> Receiver31
S1 --> Sender4
end
subgraph Stage 5
S2 --> Sender5
Sender5([Send 5])
Sender5 -- random --> Receiver22
end
subgraph Stage 6
T22([Scan T2])
Sender6([Send 6])
Sender6 -- broadcast --> Receiver32
T22 --> Sender6
end
Will be transformed into:
flowchart RL
R([Root])
J2([JOIN 2])
J1([JOIN 1])
S1([Scan T1])
S2([Scan T2])
subgraph Stage 1
R
Receiver1([Receive 1])
Receiver1 --> R
end
subgraph Stage 2
J1
Sender2([Send 2])
Receiver21([Receive 2.1])
Receiver22([Receive 2.2])
J1 --> Sender2
Sender2 --> Receiver1
Receiver21 -- hashtable --> J1
Receiver22 -- stream --> J1
end
subgraph Stage 3
J2
Sender3([Send 3])
Receiver31([Receive 3.1])
Receiver32([Receive 3.2])
J2 --> Sender3
Sender3 -- broadcast --> Receiver21
Receiver31 -- stream --> J2
Receiver32 -- hashtable --> J2
end
subgraph Stage 4
S1
Sender4([Send 4])
Sender4 -- random --> Receiver31
S1 --> Sender4
end
subgraph Same Server
subgraph Stage 6
Receiver6([Receive 6])
Sender6([Send 6])
Receiver6 --> Sender6
Sender6 -- broadcast --> Receiver32
end
subgraph Stage 7
Receiver7([Receive 7])
Sender7([Send 7])
Receiver7 --> Sender7
Sender7 -- random --> Receiver22
end
subgraph Stage 5
Sender5([Send 5])
S2 --> Sender5
Sender5 -- broadcast in memory --> Receiver7
Sender5 -- broadcast in memory --> Receiver6
end
end
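The rewrite can be sketched in Java 17 as follows. This is an illustration under stated assumptions, not Pinot's implementation: the records and `rewrite` are hypothetical, the send operator is modeled as targeting a list of stage ids (the multi-stage send listed in the TODO list), the simplified stages keep their ids and their original send operator as the text describes, and the new spool stage receives a fresh id chosen by the caller.

```java
import java.util.ArrayList;
import java.util.List;

public class SpoolRewrite {
    record PlanNode(String op, List<PlanNode> inputs) {}

    // Hypothetical send operator that may target several stages.
    record MailboxSend(String distribution, List<Integer> receiverStageIds) {}

    record Stage(int id, MailboxSend send, PlanNode body) {}

    record Result(Stage spool, List<Stage> simplified) {}

    // Replaces a group of equivalent stages with one spool stage plus
    // simplified receive/send stages that keep the original ids.
    static Result rewrite(List<Stage> equivalent, int newStageId) {
        PlanNode sharedBody = equivalent.get(0).body();
        List<Integer> targets = new ArrayList<>();
        List<Stage> simplified = new ArrayList<>();
        for (Stage stage : equivalent) {
            targets.add(stage.id());
            // Same id and same send operator; the body is just a receive from the spool.
            PlanNode receive = new PlanNode("Receive " + newStageId, List.of());
            simplified.add(new Stage(stage.id(), stage.send(), receive));
        }
        // The spool stage runs the shared body once and broadcasts every row
        // to all simplified stages.
        Stage spool = new Stage(newStageId, new MailboxSend("broadcast", List.copyOf(targets)), sharedBody);
        return new Result(spool, simplified);
    }

    public static void main(String[] args) {
        PlanNode scanT2 = new PlanNode("Scan T2", List.of());
        List<Stage> group = List.of(
            new Stage(5, new MailboxSend("random", List.of(2)), scanT2),
            new Stage(6, new MailboxSend("broadcast", List.of(3)), scanT2));
        Result result = rewrite(group, 7);
        System.out.println(result.spool());
        result.simplified().forEach(System.out::println);
    }
}
```

Keeping the original send operators in the simplified stages means their consumers (the random and broadcast receivers) see no change; only the producer side is shared.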
Challenges
Blocking and buffering
Given the blocking nature of some of our operators, some queries may end up blocking each other (a deadlock) when this feature is used. The design document describes one of these cases. One way to address this problem is to buffer the data, probably off-heap or even on disk. Alternatively, we could simply fail in these scenarios.
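A minimal Java sketch of the buffering idea: the spool keeps one in-memory queue per consumer, so a consumer that is not reading yet (for example, the stream side of a join that waits for its hash table to be built) cannot block the producer and, through it, the other consumers. `BufferedBroadcast` is hypothetical; it has no memory limit, whereas a real version would need one and would spill off-heap or to disk as suggested above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BufferedBroadcast {
    // One unbounded in-memory queue per consumer stage.
    private final List<Queue<Object[]>> buffers = new ArrayList<>();

    BufferedBroadcast(int consumers) {
        for (int i = 0; i < consumers; i++) {
            buffers.add(new ArrayDeque<>());
        }
    }

    // Producer side: never blocks; each row is appended to every consumer's buffer.
    void send(Object[] row) {
        for (Queue<Object[]> buffer : buffers) {
            buffer.add(row);
        }
    }

    // Consumer side: each consumer drains its own buffer at its own pace.
    // Returns null when nothing is buffered for that consumer.
    Object[] poll(int consumer) {
        return buffers.get(consumer).poll();
    }

    int buffered(int consumer) {
        return buffers.get(consumer).size();
    }

    public static void main(String[] args) {
        BufferedBroadcast spool = new BufferedBroadcast(2);
        for (int i = 0; i < 3; i++) {
            spool.send(new Object[] {i});
        }
        // Consumer 1 (e.g. a hash join build side) reads everything first...
        int read = 0;
        while (spool.poll(1) != null) {
            read++;
        }
        System.out.println(read); // 3
        // ...while consumer 0 (e.g. a waiting stream side) has not read anything yet.
        System.out.println(spool.buffered(0)); // 3
    }
}
```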
TODO list
- Define when two stages are equivalent. [spool] Define equivalence between stages #14296
- Modify MailboxSendNode to be able to send to multiple stages. [Spool] Introduce stage replacer and change send nodes to be able to send to more than one stage #14495
- Modify MailboxSendOperator and the plan serialization protocol to support mailboxes that send to multiple stages. In [Spool] Actual implementation #14507
- Introduce a flag that enables/disables this optimization. In [Spool] Actual implementation #14507
- Document this feature
- Optional: Fix stats (right now stats will be counted twice in spools).
- Optional: Study the impact on explain with segment info.
- Optional: Introduce a buffer to solve blocking issues (Spool4 #14672)