
Error with AggregateFunction quantilesTDigest causing part merges to fail #32107

@jamesmaidment

Description


Describe what's wrong

We had an issue with INSERTs into ClickHouse throwing an error:

2021-11-29 18:26:13.177 EST Error message from worker: ru.yandex.clickhouse.except.ClickHouseUnknownException: ClickHouse exception, code: 1002, host: <clickhouse_host>, port: 8123; Code: 252. DB::Exception: Too many parts (301). Merges are processing significantly slower than inserts: while pushing to view default.metrics_shard_stat_10m_view (aa99702d-bd2c-4aca-aa99-702dbd2c6aca): while pushing to view default.metrics_shard_stat_1m_view (c2e4400c-c18e-46ae-82e4-400cc18ef6ae). (TOO_MANY_PARTS) (version 21.11.3.6 (official build))

Upon investigation, it appeared that there was a large number of unmerged parts for the metrics_shard_stat_10m table. This seems to be caused by the error below, thrown while deserializing an AggregateFunction(quantilesTDigest(...)) column during a part merge. The repeatedly failing merges were also driving very high CPU usage on the node.
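For reference, the part backlog can be confirmed through the `system.parts` system table. This is a diagnostic sketch, not part of the original investigation; table and database names match the schema below:

```sql
-- Diagnostic sketch: count active (unmerged) parts per partition on the
-- affected table to confirm the merge backlog behind TOO_MANY_PARTS.
SELECT partition, count() AS active_parts
FROM system.parts
WHERE database = 'default'
  AND table = 'metrics_shard_stat_10m'
  AND active
GROUP BY partition
ORDER BY active_parts DESC;
```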

Does it reproduce on recent release?

We're using ClickHouse server version 21.11.3.6

Table Schema

metrics_shard_stat_1m:

CREATE TABLE default.metrics_shard_stat_1m ON CLUSTER testcluster
(
    metric_id                  UUID,
    date                       Date DEFAULT toDate(bucket) CODEC (DoubleDelta),
    bucket                     DateTime CODEC (DoubleDelta, LZ4),
    s_avg                      AggregateFunction(avg, Float64),
    s_count                    AggregateFunction(count, Float64),
    sum                        SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    sum2                       SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min                        SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max                        SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first                      SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last                       SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_linear_regression        AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance                 AggregateFunction(varSamp, Float64),
    s_quantiles                AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64),
    s_avg_nozero               AggregateFunction(avg, Float64),
    s_count_nozero             AggregateFunction(count, Float64),
    sum_nozero                 SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min_nozero                 SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max_nozero                 SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first_nozero               SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last_nozero                SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_linear_regression_nozero AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance_nozero          AggregateFunction(varSamp, Float64),
    s_quantiles_nozero         AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64)
) ENGINE = ReplicatedAggregatingMergeTree() PARTITION BY toYYYYMM(bucket)
      ORDER BY (metric_id, date, bucket) SETTINGS index_granularity = 64;

CREATE MATERIALIZED VIEW default.metrics_shard_stat_1m_view ON CLUSTER testcluster TO default.metrics_shard_stat_1m AS
SELECT metric_id,
       date,
       toStartOfMinute(toDateTime64(timestamp, 3))                                         as bucket,
       avgState(value)                                                                     as s_avg,
       countState(value)                                                                   as s_count,
       sum(value)                                                                          as sum,
       sum(value * value)                                                                  as sum2,
       min(value)                                                                          as min,
       max(value)                                                                          as max,
       any(value)                                                                          as first,
       anyLast(value)                                                                      as last,
       quantilesTDigestState(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99)(value)              as s_quantiles,
       simpleLinearRegressionState(toUnixTimestamp(timestamp), value)                      as s_linear_regression,
       varSampState(value)                                                                 as s_variance,
       avgStateIf(value, value > 0)                                                        as s_avg_nozero,
       countStateIf(value, value > 0)                                                      as s_count_nozero,
       sumIf(value, value > 0)                                                             as sum_nozero,
       minIf(value, value > 0)                                                             as min_nozero,
       maxIf(value, value > 0)                                                             as max_nozero,
       anyIf(value, value > 0)                                                             as first_nozero,
       anyLastIf(value, value > 0)                                                         as last_nozero,
       quantilesTDigestStateIf(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99)(value, value > 0) as s_quantiles_nozero,
       simpleLinearRegressionStateIf(toUnixTimestamp(timestamp), value, value > 0)          as s_linear_regression_nozero,
       varSampStateIf(value, value > 0)                                                    as s_variance_nozero
FROM default.metrics_shard
GROUP BY metric_id,
         date,
         bucket;

CREATE TABLE default.metrics_stat_1m ON CLUSTER testcluster
(
    metric_id                  UUID,
    date                       Date DEFAULT toDate(bucket) CODEC (DoubleDelta),
    bucket                     DateTime CODEC (DoubleDelta, LZ4),
    s_avg                      AggregateFunction(avg, Float64),
    s_count                    AggregateFunction(count, Float64),
    sum                        SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    sum2                       SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min                        SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max                        SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first                      SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last                       SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_quantiles                AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64),
    s_linear_regression        AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance                 AggregateFunction(varSamp, Float64),
    s_avg_nozero               AggregateFunction(avg, Float64),
    s_count_nozero             AggregateFunction(count, Float64),
    sum_nozero                 SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min_nozero                 SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max_nozero                 SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first_nozero               SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last_nozero                SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_quantiles_nozero         AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64),
    s_linear_regression_nozero AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance_nozero          AggregateFunction(varSamp, Float64)
) ENGINE = Distributed(
           testcluster,
           default,
           metrics_shard_stat_1m,
           farmFingerprint64(metric_id)
    );

metrics_shard_stat_10m:

CREATE TABLE default.metrics_shard_stat_10m ON CLUSTER testcluster
(
    metric_id                  UUID,
    date                       Date DEFAULT toDate(bucket) CODEC (DoubleDelta),
    bucket                     DateTime CODEC (DoubleDelta, LZ4),
    s_avg                      AggregateFunction(avg, Float64),
    s_count                    AggregateFunction(count, Float64),
    sum                        SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    sum2                       SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min                        SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max                        SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first                      SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last                       SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_linear_regression        AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance                 AggregateFunction(varSamp, Float64),
    s_quantiles                AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64),
    s_avg_nozero               AggregateFunction(avg, Float64),
    s_count_nozero             AggregateFunction(count, Float64),
    sum_nozero                 SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min_nozero                 SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max_nozero                 SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first_nozero               SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last_nozero                SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_linear_regression_nozero AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance_nozero          AggregateFunction(varSamp, Float64),
    s_quantiles_nozero         AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64)
) ENGINE = ReplicatedAggregatingMergeTree() PARTITION BY toYYYYMM(bucket)
      ORDER BY (metric_id, date, bucket) SETTINGS index_granularity = 64;

CREATE MATERIALIZED VIEW default.metrics_shard_stat_10m_view ON CLUSTER testcluster TO default.metrics_shard_stat_10m AS
SELECT metric_id,
       date,
       toStartOfTenMinutes(toDateTime64(bucket, 3))                                             as bucket,
       avgMergeState(s_avg)                                                                     as s_avg,
       countMergeState(s_count)                                                                 as s_count,
       sum(sum)                                                                                 as sum,
       sum(sum2)                                                                                as sum2,
       min(min)                                                                                 as min,
       max(max)                                                                                 as max,
       any(first)                                                                               as first,
       anyLast(last)                                                                            as last,
       quantilesTDigestMergeState(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99)(s_quantiles)        as s_quantiles,
       simpleLinearRegressionMergeState(s_linear_regression)                                    as s_linear_regression,
       varSampMergeState(s_variance)                                                            as s_variance,
       avgMergeState(s_avg_nozero)                                                              as s_avg_nozero,
       countMergeState(s_count_nozero)                                                          as s_count_nozero,
       sum(sum_nozero)                                                                          as sum_nozero,
       min(min_nozero)                                                                          as min_nozero,
       max(max_nozero)                                                                          as max_nozero,
       any(first_nozero)                                                                        as first_nozero,
       anyLast(last_nozero)                                                                     as last_nozero,
       quantilesTDigestMergeState(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99)(s_quantiles_nozero) as s_quantiles_nozero,
       simpleLinearRegressionMergeState(s_linear_regression_nozero)                             as s_linear_regression_nozero,
       varSampMergeState(s_variance_nozero)                                                     as s_variance_nozero
FROM default.metrics_shard_stat_1m
GROUP BY metric_id,
         date,
         bucket;

CREATE TABLE default.metrics_stat_10m ON CLUSTER testcluster
(
    metric_id                  UUID,
    date                       Date DEFAULT toDate(bucket) CODEC (DoubleDelta),
    bucket                     DateTime CODEC (DoubleDelta, LZ4),
    s_avg                      AggregateFunction(avg, Float64),
    s_count                    AggregateFunction(count, Float64),
    sum                        SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    sum2                       SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min                        SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max                        SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first                      SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last                       SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_quantiles                AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64),
    s_linear_regression        AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance                 AggregateFunction(varSamp, Float64),
    s_avg_nozero               AggregateFunction(avg, Float64),
    s_count_nozero             AggregateFunction(count, Float64),
    sum_nozero                 SimpleAggregateFunction(sum, Float64) CODEC (Gorilla),
    min_nozero                 SimpleAggregateFunction(min, Float64) CODEC (Gorilla),
    max_nozero                 SimpleAggregateFunction(max, Float64) CODEC (Gorilla),
    first_nozero               SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    last_nozero                SimpleAggregateFunction(any, Float64) CODEC (Gorilla),
    s_quantiles_nozero         AggregateFunction(quantilesTDigest(0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
                                   Float64),
    s_linear_regression_nozero AggregateFunction(simpleLinearRegression, UInt32, Float64),
    s_variance_nozero          AggregateFunction(varSamp, Float64)
) ENGINE = Distributed(
           testcluster,
           default,
           metrics_shard_stat_10m,
           farmFingerprint64(metric_id)
    );

Expected behavior

The quantilesTDigest() function should either work or throw a meaningful error, without causing part merges to be blocked forever.
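As a purely speculative probe (we have not confirmed that NaN inputs are the trigger for the invalid centroid), one can feed a NaN value through the state function and inspect the result:

```sql
-- Speculative repro sketch: pass a NaN input into quantilesTDigestState to
-- check whether the aggregate state can end up carrying a NaN centroid.
-- Illustrative only; not a confirmed reproduction of this bug.
SELECT finalizeAggregation(quantilesTDigestState(0.5)(x)) AS q50
FROM (SELECT arrayJoin([1.0, nan]) AS x);
```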

Error message and/or stacktrace

2021.12.01 03:58:30.375438 [ 62 ] {} <Error> void DB::MergeTreeBackgroundExecutor<DB::MergeMutateRuntimeQueue>::routine(DB::TaskRuntimeDataPtr) [Queue = DB::MergeMutateRuntimeQueue]: Code: 27. DB::Exception: Invalid centroid 2.000000:-nan: (while reading column s_quantiles): (while reading from part /var/lib/clickhouse/store/29e/29ec85cb-e183-4946-a9ec-85cbe1837946/202111_5921759_5923975_485/ from mark 5 with max_rows_to_read = 64): While executing MergeTreeSequentialSource. (CANNOT_PARSE_INPUT_ASSERTION_FAILED), Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x9b63054 in /usr/bin/clickhouse
1. DB::QuantileTDigest<double>::deserialize(DB::ReadBuffer&) @ 0xa51b433 in /usr/bin/clickhouse
2. DB::SerializationAggregateFunction::deserializeBinaryBulk(DB::IColumn&, DB::ReadBuffer&, unsigned long, double) const @ 0x11cd71fa in /usr/bin/clickhouse
3. DB::ISerialization::deserializeBinaryBulkWithMultipleStreams(COW<DB::IColumn>::immutable_ptr<DB::IColumn>&, unsigned long, DB::ISerialization::DeserializeBinaryBulkSettings&, std::__1::shared_ptr<DB::ISerialization::DeserializeBinaryBulkState>&, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, COW<DB::IColumn>::immutable_ptr<DB::IColumn>, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, COW<DB::IColumn>::immutable_ptr<DB::IColumn> > > >*) const @ 0x11cd4435 in /usr/bin/clickhouse
4. DB::MergeTreeReaderWide::readData(DB::NameAndTypePair const&, COW<DB::IColumn>::immutable_ptr<DB::IColumn>&, unsigned long, bool, unsigned long, unsigned long, std::__1::unordered_map<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, COW<DB::IColumn>::immutable_ptr<DB::IColumn>, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const, COW<DB::IColumn>::immutable_ptr<DB::IColumn> > > >&, bool) @ 0x12e854a2 in /usr/bin/clickhouse
5. DB::MergeTreeReaderWide::readRows(unsigned long, unsigned long, bool, unsigned long, std::__1::vector<COW<DB::IColumn>::immutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::immutable_ptr<DB::IColumn> > >&) @ 0x12e84279 in /usr/bin/clickhouse
6. DB::MergeTreeSequentialSource::generate() @ 0x12e88377 in /usr/bin/clickhouse
7. DB::ISource::tryGenerate() @ 0x131156b5 in /usr/bin/clickhouse
8. DB::ISource::work() @ 0x1311527a in /usr/bin/clickhouse
9. DB::SourceWithProgress::work() @ 0x13321062 in /usr/bin/clickhouse
10. ? @ 0x13130b1b in /usr/bin/clickhouse
11. DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic<bool>*) @ 0x1312cad1 in /usr/bin/clickhouse
12. DB::PipelineExecutor::executeStep(std::__1::atomic<bool>*) @ 0x1312b2e5 in /usr/bin/clickhouse
13. DB::PullingPipelineExecutor::pull(DB::Chunk&) @ 0x13139d4b in /usr/bin/clickhouse
14. DB::PullingPipelineExecutor::pull(DB::Block&) @ 0x13139fec in /usr/bin/clickhouse
15. DB::MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() @ 0x12d3d52b in /usr/bin/clickhouse
16. DB::MergeTask::ExecuteAndFinalizeHorizontalPart::execute() @ 0x12d3d48b in /usr/bin/clickhouse
17. DB::MergeTask::execute() @ 0x12d4217a in /usr/bin/clickhouse
18. DB::MergePlainMergeTreeTask::executeStep() @ 0x12fbceec in /usr/bin/clickhouse
19. DB::MergeTreeBackgroundExecutor<DB::MergeMutateRuntimeQueue>::routine(std::__1::shared_ptr<DB::TaskRuntimeData>) @ 0x12d50bdd in /usr/bin/clickhouse
20. DB::MergeTreeBackgroundExecutor<DB::MergeMutateRuntimeQueue>::threadFunction() @ 0x12d5167a in /usr/bin/clickhouse
21. ThreadPoolImpl<ThreadFromGlobalPool>::worker(std::__1::__list_iterator<ThreadFromGlobalPool, void*>) @ 0x9ba7d0a in /usr/bin/clickhouse
22. ThreadFromGlobalPool::ThreadFromGlobalPool<void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()>(void&&, void ThreadPoolImpl<ThreadFromGlobalPool>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda0'()&&...)::'lambda'()::operator()() @ 0x9ba9b27 in /usr/bin/clickhouse
23. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x9ba5117 in /usr/bin/clickhouse
24. ? @ 0x9ba8b1d in /usr/bin/clickhouse
25. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so
26. clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so
 (version 21.11.3.6 (official build))
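One possible mitigation sketch, assuming the corrupted part can be sacrificed: detach the part named in the error above so that merges on the remaining parts can proceed. The part name is taken from the log path; rows in the detached part become unavailable until it is repaired or reattached.

```sql
-- Mitigation sketch (causes data loss for the detached part): remove the part
-- with the unreadable s_quantiles column from the active set so merges unblock.
ALTER TABLE default.metrics_shard_stat_10m DETACH PART '202111_5921759_5923975_485';
```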

System Settings

Non-default settings (name = value: description):

connect_timeout_with_failover_ms = 1000: Connection timeout for selecting first healthy replica (milliseconds).
load_balancing = random: Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.
distributed_aggregation_memory_efficient = 1: Is the memory-saving mode of distributed aggregation enabled.
log_queries = 1: Log requests and write the log to the system table.
max_memory_usage = 10000000000: Maximum memory usage for processing of single query. Zero means unlimited.
parallel_view_processing = 1: Enables pushing to attached views concurrently instead of sequentially.
default_database_engine = Ordinary: Default database engine.


Labels

comp-functions (Built-in SQL function implementations + function infrastructure), potential bug (To be reviewed by developers and confirmed/rejected)
