Speed up GROUP BY for large states #79428

@alexey-milovidov

Description

Consider the following algorithm:

Every thread aggregates into a local hash table, as usual.

But once the local hash table grows past a certain threshold (checked on every block), the thread stops inserting new keys into it: it only looks up and updates entries that already exist there.

For entries that are not found in the local hash table, we will use a single shared two-level hash table, consisting of 256 buckets guarded by 256 mutexes (one per bucket). We can say that the local hash table contains frequent keys and the shared hash table contains rare keys.

For every record that did not fit into its local hash table, each thread takes the already-computed hash of the key, derives a bucket number from it, and appends a reference to the record into an array corresponding to that bucket. These arrays can be called per-bucket records for delayed processing, and the operation of collecting records by bucket can be called "shuffling" or "partitioning". The thread also keeps the current block alive until its delayed records are processed. There is a shared state holding the lists of these arrays and the corresponding blocks; the lists are protected with a mutex, so each thread can add a new array of records there or delete a processed one.

Then every thread will try to lock the mutex of an arbitrary bucket of the shared hash table; if it succeeds, it processes all delayed records for that bucket, iterating over the list of delayed records and inserting them into the shared hash table. This is repeated until there are no delayed records left or every bucket is locked by another thread. Each thread performs this step after processing every block.

After all the data is processed, we will merge every local hash table into the shared hash table, which represents the final result.

Advantages:

  • almost ideal parallelization for typical key distributions;
  • uses less memory when most keys occur only a few times;
  • not worse than the default algorithm when the number of keys is small;
  • all locking is coarse, so little time is spent on locks or atomics, and threads never wait on locks (they skip busy buckets).

Disadvantages:

  • degrades if the frequent keys do not appear within the first few million processed records;
  • storing hashes and record references makes it slightly more computationally expensive.

It resembles the "splitting aggregator" (2016) and "flat combining" (2022) approaches.

Describe the situation

I want it faster.

How to reproduce

I always want it faster.

Expected performance

Faster.
