[proposal] Reuse common expressions in a query #14196

@gortiz

Introduction

In the multi-stage query engine, it is easy to write queries that end up reading from the same table or executing the same join twice. This is especially easy when writing a SQL query using WITH expressions, but that is not the only case.

For example, a query like:

SELECT * 
FROM T1 
JOIN T2 as t2first 
    ON T1.col1 = t2first.col2 
JOIN T2 as t2second 
    ON t2first.col3 = t2second.col3

Generates the following plan:

flowchart BT
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])
    S22([Scan T2])
    
    J2 --> J1
    S1 --> J2
    S2 --> J2
    S22 --> J1

In this case, the plan contains two instances of Scan T2, so the table is scanned twice. That may already be expensive, and it gets worse when the repeated subtree is more complex, for example a join or an aggregation.
To avoid this problem, we can reuse the subtree and therefore read T2 only once. In our example, the plan should look something like:

flowchart BT
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])
    
    J2 --> J1
    S1 --> J2
    S2 --> J2
    S2 --> J1

This feature is supported in some databases and it is known as Spool in Calcite.
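
As a rough illustration of what "reusing the subtree" means, the sketch below models the first plan as an immutable tree and reports the subtrees that appear more than once. The `Node` class and `find_repeated_subtrees` function are hypothetical names invented for this example. They are not part of Pinot or Calcite, and the real planner works on much richer plan nodes.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """Hypothetical plan node; frozen dataclasses compare and hash structurally."""
    op: str
    inputs: tuple = ()


def find_repeated_subtrees(root):
    """Return the set of subtrees that occur more than once under root."""
    counts = defaultdict(int)

    def visit(node):
        counts[node] += 1
        for child in node.inputs:
            visit(child)

    visit(root)
    return {node for node, count in counts.items() if count > 1}


# The plan from the first diagram: both joins read their own copy of Scan T2.
join2 = Node("JOIN 2", (Node("Scan T1"), Node("Scan T2")))
join1 = Node("JOIN 1", (join2, Node("Scan T2")))

repeated = find_repeated_subtrees(join1)
```

Here `repeated` contains only the `Scan T2` node, which is the subtree the second diagram shares between both joins.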

Design

The design document can be found here. This document includes some issues we can expect and how we could resolve them.

Current proposal

The proposal considers the stage to be the unit of equivalence. We are not going to look for equivalent individual expressions in the tree, but for equivalent stages. If at least two stages are found to be equivalent, a new stage is created. This new stage has the same operators as the equivalent stages, except for the mailbox send operator. The equivalent stages are then replaced by new stages that keep the same id but are simplified to a pair of receive and send mailboxes. The newly created stage broadcasts all rows to the simplified stages.
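
A minimal sketch of the equivalence check, assuming each stage can be summarized by the operators below its mailbox send plus the distribution used by that send. The `Stage` class and `group_equivalent` function are hypothetical and only illustrate the idea that the send operator is ignored when comparing stages.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Stage:
    """Hypothetical stage summary used only for this illustration."""
    stage_id: int
    body: tuple  # operators below the mailbox send, e.g. ("Scan T2",)
    send: str    # distribution used by the mailbox send


def group_equivalent(stages):
    """Group stage ids whose bodies match, ignoring the mailbox send operator."""
    groups = defaultdict(list)
    for stage in stages:
        groups[stage.body].append(stage.stage_id)
    return [ids for ids in groups.values() if len(ids) > 1]


leaf_stages = [
    Stage(4, ("Scan T1",), "random"),
    Stage(5, ("Scan T2",), "random"),
    Stage(6, ("Scan T2",), "broadcast"),
]

equivalent = group_equivalent(leaf_stages)
```

With these leaf stages, stages 5 and 6 end up in the same group even though one sends with a random distribution and the other with a broadcast.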

For example, this plan:

flowchart RL
    R([Root])
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])

    subgraph Stage 1
        R
        Receiver1([Receive 1])
        Receiver1 --> R
    end

    subgraph Stage 2
        J1
        Sender2([Send 2])
        Receiver21([Receive 2.1])
        Receiver22([Receive 2.2])
        J1 --> Sender2
        Sender2 --> Receiver1
        Receiver21 -- hashtable --> J1
        Receiver22 -- stream --> J1
    end

    subgraph Stage 3
        J2
        Sender3([Send 3])
        Receiver31([Receive 3.1])
        Receiver32([Receive 3.2])

        J2 --> Sender3
        Sender3 -- broadcast --> Receiver21
        Receiver31 -- stream --> J2
        Receiver32 -- hashtable --> J2
    end

    subgraph Stage 4
        S1
        Sender4([Send 4])
        Sender4 -- random --> Receiver31
        S1 --> Sender4
    end

    subgraph Stage 5
        S2 --> Sender5
        Sender5([Send 5])
        
        Sender5 -- random --> Receiver22
    end
    
    subgraph Stage 6
        T22([Scan T2])
        Sender6([Send 6])
        Sender6 -- broadcast --> Receiver32
        T22 --> Sender6
    end

Will be transformed into:

flowchart RL
    R([Root])
    J2([JOIN 2])
    J1([JOIN 1])
    S1([Scan T1])
    S2([Scan T2])

    subgraph Stage 1
        R
        Receiver1([Receive 1])
        Receiver1 --> R
    end

    subgraph Stage 2
        J1
        Sender2([Send 2])
        Receiver21([Receive 2.1])
        Receiver22([Receive 2.2])
        J1 --> Sender2
        Sender2 --> Receiver1
        Receiver21 -- hashtable --> J1
        Receiver22 -- stream --> J1
    end
    
    subgraph Stage 3
        J2
        Sender3([Send 3])
        Receiver31([Receive 3.1])
        Receiver32([Receive 3.2])

        J2 --> Sender3
        Sender3 -- broadcast --> Receiver21
        Receiver31 -- stream --> J2
        Receiver32 -- hashtable --> J2
    end

    subgraph Stage 4
        S1
        Sender4([Send 4])
        Sender4 -- random --> Receiver31
        S1 --> Sender4
    end

    subgraph Same Server

    subgraph Stage 6
        Receiver6([Receiver 6])
        Sender6([Send 6])

        Receiver6 --> Sender6
        Sender6 -- broadcast --> Receiver32
    end

    subgraph Stage 7
        Receiver7([Receiver 7])
        Sender7([Send 7])

        Receiver7 --> Sender7
        Sender7 -- random --> Receiver22
    end

    subgraph Stage 5
        Sender5([Send 5])
        
        S2 --> Sender5
        Sender5 -- broadcast in memory --> Receiver7
        Sender5 -- broadcast in memory --> Receiver6
    end
    end
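
The rewrite can be sketched as follows, under the assumptions that a stage is described by its operators, its send distribution and the mailboxes it sends to, and that the shared stage receives a fresh id. The `Stage` class and `share_stage` function are hypothetical, and the way ids are assigned here follows the textual description, so the numbering may differ from the diagram.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Stage:
    """Hypothetical stage summary used only for this illustration."""
    stage_id: int
    body: tuple     # operators below the mailbox send
    send: str       # distribution used by the mailbox send
    targets: tuple  # where the mailbox send delivers its rows


def share_stage(stages, equivalent_ids, new_id):
    """Replace equivalent stages with receive/send pairs fed by one new stage."""
    by_id = {stage.stage_id: stage for stage in stages}
    shared_body = by_id[equivalent_ids[0]].body
    rewritten = []
    for stage in stages:
        if stage.stage_id in equivalent_ids:
            # Keep the id and the original send, but read from the shared stage.
            rewritten.append(replace(stage, body=(f"Receive {new_id}",)))
        else:
            rewritten.append(stage)
    # The new stage runs the shared operators once and broadcasts the rows.
    shared_targets = tuple(f"Stage {i}" for i in equivalent_ids)
    rewritten.append(Stage(new_id, shared_body, "broadcast in memory", shared_targets))
    return rewritten


before = [
    Stage(5, ("Scan T2",), "random", ("Receive 2.2",)),
    Stage(6, ("Scan T2",), "broadcast", ("Receive 3.2",)),
]
after = share_stage(before, [5, 6], new_id=7)
```

After the rewrite, T2 is scanned by a single stage, while the two simplified stages keep their original distributions towards Receive 2.2 and Receive 3.2.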

Challenges

Blocking and buffering

Given the blocking nature of some of our operators, some queries may end up in an interlock (that is, a deadlock between stages) when this feature is used. The design document describes one of these cases. One way to address this problem is to buffer the data, probably off-heap or even on disk. Alternatively, we could just fail in these scenarios.
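
The toy simulation below shows one way such a stall could arise. It is an assumption made for this sketch and not necessarily the case covered in the design document. A shared stage broadcasts every row to two bounded mailboxes, while the consumer, like a hash join, drains the build side completely before it touches the probe side. The `simulate` function and its parameters are hypothetical.

```python
from collections import deque


def simulate(rows, capacity):
    """Return True if every row is consumed, False if the pipeline stalls.

    capacity=None models an unbounded buffer (for example off-heap or on disk).
    """
    build = deque()  # read first, until the sender has finished
    probe = deque()  # read only after the build side is complete
    sent = 0
    probed = 0
    build_done = False
    while True:
        progressed = False
        # Sender: a broadcast needs free space in both mailboxes.
        has_space = capacity is None or (
            len(build) < capacity and len(probe) < capacity
        )
        if sent < rows and has_space:
            build.append(sent)
            probe.append(sent)
            sent += 1
            progressed = True
        # Consumer: blocking operator, build side first, then probe side.
        if not build_done:
            if build:
                build.popleft()
                progressed = True
            elif sent == rows:
                build_done = True
                progressed = True
        elif probe:
            probe.popleft()
            probed += 1
            progressed = True
        if build_done and probed == rows:
            return True
        if not progressed:
            return False


stalls = not simulate(rows=10, capacity=4)
completes_when_buffered = simulate(rows=10, capacity=None)
```

With a mailbox capacity of 4, the probe mailbox fills up while the consumer is still waiting for the build side to end, so the sender can no longer broadcast and nothing makes progress. With an unbounded buffer the same workload completes, which is the trade-off behind the buffering option.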

TODO list
