Add finalization to aggregate functions #10383

Closed
KochetovNicolai wants to merge 22 commits into master from finalization-of-aggregate-functions

Conversation

@KochetovNicolai
Member

@KochetovNicolai KochetovNicolai commented Apr 20, 2020

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • Bug Fix

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Fix a possible race that could occur when the result of an aggregate function state is read from multiple threads for the same column. The only way I found to trigger it is to use the finalizeAggregation function while reading from a table with the Memory engine that stores AggregateFunction states for quantile* functions.

Add a finalization step to the aggregate function interface. Also update the quantile functions.

@blinkov blinkov added the pr-not-for-changelog This PR should not be mentioned in the changelog label Apr 20, 2020
Member

@alexey-milovidov alexey-milovidov left a comment

Awesome! And please add your test with UNION ALL...

@KochetovNicolai KochetovNicolai marked this pull request as ready for review April 21, 2020 20:10
@alexey-milovidov alexey-milovidov added pr-bugfix Pull request with bugfix, not backported by default pr-no-backport and removed pr-not-for-changelog This PR should not be mentioned in the changelog labels Apr 22, 2020
@alexey-milovidov
Member

Need to adjust description, because it's a bugfix and it should be in changelog.

@blinkov blinkov added the pr-not-for-changelog This PR should not be mentioned in the changelog label Apr 22, 2020
@KochetovNicolai KochetovNicolai removed the pr-not-for-changelog This PR should not be mentioned in the changelog label Apr 22, 2020
@alexey-milovidov
Member

2020-04-23 09:21:26 /build/obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:984:10: error: 'auto &function' can be declared as 'const auto &function' [readability-qualified-auto,-warnings-as-errors]
2020-04-23 09:21:26     for (auto & function : aggregate_functions)
2020-04-23 09:21:26          ^~~~~~
2020-04-23 09:21:26          const auto &

if (current.column)
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
{
auto mut_column = (*std::move(current.column)).mutate();
Contributor

So we're performing finalization on a transient mutable copy of source column, that we will discard right away? Can we make it more explicit, like, a separate method for finalization of states + the old method that extracts values from states? It would be easier to reason about this code. For now, it's not immediately clear that it is correct -- if the column is not shared, why do any mutations and just not assert ownership and change in place. And if it is shared, we will perform finalization many times instead of one.

Member Author

So we're performing finalization on a transient mutable copy of source column, that we will discard right away?

Yes.

Can we make it more explicit, like, a separate method for finalization of states + the old method that extracts values from states?

I can add a new finalization method to ColumnAggregateFunction, but I don't see the reason. It will make things more complex: ColumnAggregateFunction will have two states: finalized and not finalized, and we will need to check that convertToValues was called after finalization.
Also, finalize won't be const either, so we will mutate the column anyway.

if the column is not shared, why do any mutations and just not assert ownership and change in place

If the column is not shared, mutation does exactly what you said:

    MutablePtr shallowMutate() const
    {
        if (this->use_count() > 1)
            return derived()->clone();
        else
            return assumeMutable();
    }

And if it is shared, we will perform finalization many times instead of one.

Yes, and I think it's ok. Here the column is constant, and it is a bad idea to change something in a constant column. It could be done under a mutex, but even then we don't know whether the columns are logically the same or different. It is possible that another copy of the column is being used to merge aggregate states together. So such a mutex would have to protect every read access to the aggregation state data.

But actually, this column is almost never shared (so mutation is free). The only counterexample I found is in the test below (which also causes a race condition on master now).
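
The copy-on-write behavior described in this comment can be sketched outside ClickHouse. The block below is a minimal illustration built on std::shared_ptr, not the actual COW implementation: ToyColumn, ToyColumnPtr, ToyMutableColumnPtr, mutate and the two demo functions are hypothetical names introduced only for this example. It assumes single-threaded ownership checks and objects that were created non-const.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a column; not ClickHouse's COW<T>.
struct ToyColumn
{
    std::vector<int> values;
};

using ToyColumnPtr = std::shared_ptr<const ToyColumn>;
using ToyMutableColumnPtr = std::shared_ptr<ToyColumn>;

// The pointer is taken by value, so the caller hands over its reference
// with std::move. A sole owner gets the same object back without a copy;
// a shared column is cloned so the other owners never see the mutation.
ToyMutableColumnPtr mutate(ToyColumnPtr column)
{
    if (column.use_count() > 1)
        return std::make_shared<ToyColumn>(*column); // shared: deep copy
    // Sole owner: dropping const is safe here only because every ToyColumn
    // in this sketch was originally created as a non-const object.
    return std::const_pointer_cast<ToyColumn>(column);
}

bool mutateReusesUniqueColumn()
{
    ToyColumnPtr column = std::make_shared<ToyColumn>(ToyColumn{{3, 1, 2}});
    const ToyColumn * before = column.get();
    ToyMutableColumnPtr mut = mutate(std::move(column));
    return mut.get() == before; // same object, no copy was made
}

bool mutateClonesSharedColumn()
{
    ToyColumnPtr column = std::make_shared<ToyColumn>(ToyColumn{{3, 1, 2}});
    ToyColumnPtr other_owner = column; // second reference: column is shared
    ToyMutableColumnPtr mut = mutate(std::move(column));
    mut->values.push_back(4);
    // The other owner still sees the original three values.
    return mut.get() != other_owner.get() && other_owner->values.size() == 3;
}
```

In this sketch the cost of finalizing a shared column is one clone per caller, which matches the trade-off accepted in the comment: repeated work in the rare shared case in exchange for never writing to a column that another reader may hold.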

Contributor

It will make things more complex: ColumnAggregateFunction will have two states: finalized and not finalized, and we will need to check that convertToValues was called after finalization.

It does already have these two states, only they are not explicit. It would be good if we could merge them together, but I'm not sure how to do it. Why do non-finalized columns leave Aggregator at all? Is it for distributed aggregation, so that we can pass around blocks with non-finalized values?

Contributor

Ah, I know, we serialize non-finalized columns.

@akuzm
Contributor

akuzm commented Apr 29, 2020

A general question: what is finalization? Presumably, an aggregation function has some complex state in run time, and then we finalize it before we can calculate the result.

  • can we add more values to finalized state?
  • can we merge finalized states? finalized to unfinalized?
  • what state we serialize in aggregate function state columns?

This should be in the comments and assertions.

@akuzm
Contributor

akuzm commented Apr 29, 2020

In general, if we can't put finalization into the function that calculates the result -- it means that finalization is some meaningful stage of computation that is distinct from the stage where we compute the final result. We must describe what data we have on this stage, when does it happen as opposed to computation of result, what is its relation to all other stages (adding more data, merging, serialization), and what transitions between these stages are possible.
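
One way to make these stages and transitions concrete is a toy state that rejects operations issued in the wrong stage. This is only a sketch under assumptions: ToyQuantileState and its methods are hypothetical and do not mirror IAggregateFunction. It encodes one possible answer to the questions above (add, merge and serialize are refused after finalize, and getResult is read-only) and guards only the destination of a merge.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical aggregate state with an explicit finalization stage.
class ToyQuantileState
{
public:
    void add(int value)
    {
        requireNotFinalized("add");
        samples.push_back(value);
    }

    // Only the destination is guarded in this sketch.
    void merge(const ToyQuantileState & other)
    {
        requireNotFinalized("merge");
        samples.insert(samples.end(), other.samples.begin(), other.samples.end());
    }

    std::vector<int> serialize() const
    {
        requireNotFinalized("serialize");
        return samples;
    }

    // The only mutating step between accumulation and reading the result.
    void finalize()
    {
        requireNotFinalized("finalize");
        std::sort(samples.begin(), samples.end());
        finalized = true;
    }

    // Read-only: safe to call from several readers once finalized.
    int getResult(double level) const
    {
        if (!finalized)
            throw std::logic_error("getResult called before finalize");
        if (samples.empty())
            return 0;
        const auto index = static_cast<std::size_t>(level * (samples.size() - 1));
        return samples[index];
    }

private:
    void requireNotFinalized(const char * operation) const
    {
        if (finalized)
            throw std::logic_error(std::string(operation) + " called after finalize");
    }

    std::vector<int> samples;
    bool finalized = false;
};

int demoLifecycle()
{
    ToyQuantileState a;
    ToyQuantileState b;
    for (int v : {5, 1, 4})
        a.add(v);
    for (int v : {3, 2})
        b.add(v);
    a.merge(b);
    a.finalize();
    return a.getResult(0.5); // median of 1 2 3 4 5
}

bool addAfterFinalizeThrows()
{
    ToyQuantileState state;
    state.add(1);
    state.finalize();
    try
    {
        state.add(2);
    }
    catch (const std::logic_error &)
    {
        return true;
    }
    return false;
}
```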

create table test_quantile (x AggregateFunction(quantileTiming(0.2), UInt64)) engine = Memory;
insert into test_quantile select medianTimingState(.2)(number) from (select * from numbers(1000) order by number desc);
select y from (
select finalizeAggregation(x) as y from test_quantile union all
Contributor

@akuzm akuzm Apr 29, 2020

Oh, so the race is between concurrent convertToValues calls that share the same source column? Like, it modifies the column despite the fact that it's constant, and it turns out that it's shared. So the progressive steps to improvement would be:

  1. assert that the column is not shared
  2. add column mutation -- this is what you're doing in this PR

But this change would not require adding finalization to interface?

Member Author

Yes, this concrete issue may be solved this way.
However, it's weird to me that some aggregate states now change themselves at the time we get the aggregation result. It looks like a read operation, and the race I found is just a consequence of this behavior.
This PR was not made to fix this race. On the contrary, this race was found because I expected it could happen with the current approach.
Finalization is a logical step in itself, and is needed to merge the PR with clusterisation.

Contributor

OK, so this is a bug fix + interface for new PR. Do you mean this one: #6185 ?
I'll take a look at it, maybe it'll help me understand what's going on.

@KochetovNicolai
Member Author

I will add more comments to the IAggregateFunction interface to answer your questions.

@KochetovNicolai
Member Author

Cool! I will add changes to other functions.


compress();
if (unmerged > 0)
throw Poco::Exception("QuantileTDigest was not finalized");
Contributor

Looks like the code should be LOGICAL_ERROR (or assert?). See the one below as well.

Contributor

And the ones in ReservoirSampler.

Member Author

I can make it LOGICAL_ERROR.
It's just that this class didn't use our exceptions, so I used Poco::Exception too.

{
if (sorted)
return;
sorted = true;
Contributor

@akuzm akuzm May 12, 2020

Do we still need this flag now, or even this function, when we have explicit finalization? The above exceptions could become assert(std::is_sorted(...)).

This is another sign of the problem that I'm talking about -- finalization is not explicit enough, we're not really sure when the state is finalized and when it's not, so in some places we add additional checks, and in some places we don't... So when reading this code in the future, it's going to be even harder to answer this question.

Member Author

Let's replace it with an assert. I agree that is more reasonable.
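
A rough sketch of what that replacement could look like, assuming sorting happens only in an explicit finalize step. ToySampler and quantileNearest are hypothetical names for this example and are not the ReservoirSampler code under review.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sampler: no 'sorted' flag, sorting happens only in finalize().
class ToySampler
{
public:
    void insert(double value) { samples.push_back(value); }

    // Explicit finalization: the single place where the data is reordered.
    void finalize() { std::sort(samples.begin(), samples.end()); }

    // Read-only accessor. The precondition is checked with an assert
    // instead of lazily sorting behind a flag. 'level' is assumed to be in [0, 1].
    double quantileNearest(double level) const
    {
        assert(std::is_sorted(samples.begin(), samples.end()));
        if (samples.empty())
            return 0.0;
        const auto index = static_cast<std::size_t>(level * (samples.size() - 1) + 0.5);
        return samples[index];
    }

private:
    std::vector<double> samples;
};

double demoSampler()
{
    ToySampler sampler;
    for (double v : {9.0, 7.0, 8.0})
        sampler.insert(v);
    sampler.finalize();
    return sampler.quantileNearest(0.5); // middle of 7 8 9
}
```

The assert costs a linear scan in debug builds and disappears when NDEBUG is defined, so it documents the invariant without changing release behavior. Whether that is strict enough is the open question in this thread.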

virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;

/// Finalize state. This function is called once after all 'add' and 'merge' calls. Only if isFinalizationNeeded().
/// 'add', 'merge', and 'serialize' can't be performed after 'finalize' call.
Contributor

If we can't serialize the finalized state, this means it would be a mistake to send a block with a finalized ColumnAggregateFunction over network? Is there some check to guarantee we won't do this?

Member Author

It would be nice to add, though I don't know how. We can't just check it for every state.
The best way I see is to maintain invariants in the Aggregator and ColumnAggregateFunction interfaces:

  • Aggregator can return a column with already calculated values, or a ColumnAggregateFunction. There is no way to get raw aggregation data.
  • ColumnAggregateFunction always stores non-finalized aggregation states.
  • ColumnAggregateFunction::convertToValues can be called only once. The states are then finalized exactly once, and the ColumnAggregateFunction shouldn't be used any more. Let's make convertToValues rvalue-qualified.
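
The last invariant can be illustrated with a ref-qualified member function. The sketch below is an assumption about how such an interface might look, not the ClickHouse code: ToyStateColumn is hypothetical, each "state" is just a vector of integers, and the result is taken to be the element at index size / 2 after sorting.

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical column of non-finalized states.
class ToyStateColumn
{
public:
    explicit ToyStateColumn(std::vector<std::vector<int>> states_)
        : states(std::move(states_))
    {
    }

    // The '&&' qualifier means this can be called only on an rvalue, so the
    // caller must visibly give up the column (std::move) before converting.
    std::vector<int> convertToValues() &&
    {
        std::vector<int> result;
        result.reserve(states.size());
        for (auto & state : states)
        {
            std::sort(state.begin(), state.end()); // finalization may modify the state
            result.push_back(state.empty() ? 0 : state[state.size() / 2]);
        }
        states.clear(); // the column must not be used after conversion
        return result;
    }

private:
    std::vector<std::vector<int>> states;
};

std::vector<int> demoConvert()
{
    std::vector<std::vector<int>> states = {{3, 1, 2}, {10, 30, 20, 40}};
    ToyStateColumn column(std::move(states));
    // column.convertToValues();               // would not compile: 'column' is an lvalue
    return std::move(column).convertToValues(); // explicit hand-over
}
```

The qualifier does not stop someone from touching the moved-from object later, but it turns the "call once, then discard" rule into something the compiler checks at each call site.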

@akuzm
Contributor

akuzm commented May 12, 2020

Given the above discussion, I am inclined to think that the finalized aggregation function state is sufficiently distinct to warrant having a separate data type for it. Also, finalizeAggregation(state) should be rewritten by the planner to something like getResult(finalize(state)), which would enable proper CSE that avoids multiple finalization.

For the time being, maybe we can add a flag isFinalized to ColumnAggregateFunction, and check it when we finalize, serialize and convert to result? I can't see a way to confine the non-finalized (or, instead, the finalized) state to some tiny portion of code, so we have to embrace these states and make them explicit.
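
As a sketch of the flag-based variant suggested here, the block below checks a flag on finalize, serialize and conversion to values. ToyFlaggedColumn and its members are hypothetical; in particular, representing each state as a vector and taking the maximum as the "result" are assumptions made only to keep the example short.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical column that tracks whether its states were finalized.
class ToyFlaggedColumn
{
public:
    void insertState(std::vector<int> state)
    {
        if (is_finalized)
            throw std::logic_error("cannot insert into a finalized column");
        states.push_back(std::move(state));
    }

    void finalize()
    {
        if (is_finalized)
            throw std::logic_error("column is already finalized");
        for (auto & state : states)
            std::sort(state.begin(), state.end());
        is_finalized = true;
    }

    std::vector<std::vector<int>> serialize() const
    {
        if (is_finalized)
            throw std::logic_error("cannot serialize a finalized column");
        return states;
    }

    // Read-only once the flag is set; here the result is each state's maximum.
    std::vector<int> convertToValues() const
    {
        if (!is_finalized)
            throw std::logic_error("convertToValues requires a finalized column");
        std::vector<int> result;
        result.reserve(states.size());
        for (const auto & state : states)
            result.push_back(state.empty() ? 0 : state.back());
        return result;
    }

private:
    std::vector<std::vector<int>> states;
    bool is_finalized = false;
};

std::vector<int> demoFlaggedColumn()
{
    ToyFlaggedColumn column;
    column.insertState({2, 1});
    column.insertState({7, 9, 8});
    column.finalize();
    return column.convertToValues();
}

bool serializeAfterFinalizeThrows()
{
    ToyFlaggedColumn column;
    column.insertState({2, 1});
    column.finalize();
    try
    {
        column.serialize();
    }
    catch (const std::logic_error &)
    {
        return true;
    }
    return false;
}
```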

@KochetovNicolai
Member Author

Right now I don't want to make a new type, because it does not actually represent a new way of storing data, new operations, or a performance improvement. The only thing we can do is throw an exception depending on the isFinalized flag.

Adding finalization was initially a clarification of the convertToValues method: we separate out the part of convertToValues that can modify data.

Now I think that we may just change the convertToValues guarantee. Maybe let it modify data, but expect that it is called only once, and that after that the aggregation state can't be used at all.

@KochetovNicolai
Copy link
Copy Markdown
Member Author

Closing it in favor of #10890

Labels

pr-bugfix Pull request with bugfix, not backported by default pr-no-backport

4 participants