Add finalization to aggregate functions #10383
KochetovNicolai wants to merge 22 commits into master
alexey-milovidov left a comment:
Awesome! And please add your test with UNION ALL...

Need to adjust the description, because it's a bugfix and it should be in the changelog.

if (current.column)
    current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
{
    auto mut_column = (*std::move(current.column)).mutate();
So we're performing finalization on a transient mutable copy of source column, that we will discard right away? Can we make it more explicit, like, a separate method for finalization of states + the old method that extracts values from states? It would be easier to reason about this code. For now, it's not immediately clear that it is correct -- if the column is not shared, why do any mutations and just not assert ownership and change in place. And if it is shared, we will perform finalization many times instead of one.
> So we're performing finalization on a transient mutable copy of source column, that we will discard right away?

Yes.

> Can we make it more explicit, like, a separate method for finalization of states + the old method that extracts values from states?

I can add a new finalization method to ColumnAggregateFunction, but I don't see the reason. It will make things more complex: ColumnAggregateFunction will have two states, finalized and not finalized, and we will need to check that convertToValues was called after finalization.
Also, finalize won't be const either, so we will mutate the column anyway.

> if the column is not shared, why do any mutations and just not assert ownership and change in place

If the column is not shared, mutation does exactly what you said:
MutablePtr shallowMutate() const
{
    if (this->use_count() > 1)
        return derived()->clone();
    else
        return assumeMutable();
}
> And if it is shared, we will perform finalization many times instead of one.

Yes, and I think it's OK. Here the column is constant, and it is a bad idea to change something in a constant column. It could be done under a mutex, but even then we don't know whether the columns are logically the same or different. Another copy of the column may be in use to merge aggregate states together, so such a mutex would have to protect every read access to the aggregation state data.
But in practice this column is almost never shared (so mutation is free). The only counterexample I found is in the test below (which also causes a race condition on master now).
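
The "copy only if shared" behaviour described in this reply can be sketched in isolation. This is a simplified model built on std::shared_ptr, not ClickHouse's actual COW pointer classes; `ToyColumn` and `mutateOrClone` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a column; not the real ClickHouse class.
struct ToyColumn
{
    std::vector<int> data;
};

// If the caller hands over the only reference, reuse the object in place;
// otherwise make a private copy so other owners never observe the change.
// Assumes the object was originally created as a non-const ToyColumn.
std::shared_ptr<ToyColumn> mutateOrClone(std::shared_ptr<const ToyColumn> column)
{
    if (column.use_count() > 1)
        return std::make_shared<ToyColumn>(*column);
    return std::const_pointer_cast<ToyColumn>(std::move(column));
}
```

In this sketch the caller has to pass the pointer with std::move for the in-place path to be taken; passing a copy raises the reference count and forces a clone, which is the "finalize many times" cost mentioned above.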
> It will make things more complex: ColumnAggregateFunction will have two states: finalized and not finalized, and we will need to check that convertToValues was called after finalization.

It does already have these two states, only they are not explicit. It would be good if we could merge them together, but I'm not sure how to do it. Why do non-finalized columns leave Aggregator at all? Is it for distributed aggregation, so that we can pass around blocks with non-finalized values?
Ah, I know, we serialize non-finalized columns.

A general question: what is finalization? Presumably, an aggregation function has some complex state at run time, and then we finalize it before we can calculate the result.
This should be in the comments and assertions.

In general, if we can't put finalization into the function that calculates the result, it means that finalization is some meaningful stage of computation that is distinct from the stage where we compute the final result. We must describe what data we have at this stage, when it happens as opposed to computation of the result, what its relation is to all other stages (adding more data, merging, serialization), and what transitions between these stages are possible.
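
One way to make the stages and their allowed transitions explicit is to track the stage in the state and reject operations that do not belong to it. The following is only a sketch under assumed names (`Stage`, `ToySumState`, `expectStage` are hypothetical and do not exist in ClickHouse); it uses std::logic_error as a stand-in for a logical-error exception.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

enum class Stage { Accumulating, Finalized };

// Hypothetical toy aggregate state with explicit stage checks.
class ToySumState
{
public:
    void add(long value)
    {
        expectStage(Stage::Accumulating, "add");
        values.push_back(value);
    }

    void merge(const ToySumState & other)
    {
        expectStage(Stage::Accumulating, "merge");
        other.expectStage(Stage::Accumulating, "merge (source)");
        values.insert(values.end(), other.values.begin(), other.values.end());
    }

    // Only non-finalized states may be serialized.
    std::string serialize() const
    {
        expectStage(Stage::Accumulating, "serialize");
        std::string out;
        for (long v : values)
            out += std::to_string(v) + ";";
        return out;
    }

    // The single mutating transition: Accumulating -> Finalized.
    void finalize()
    {
        expectStage(Stage::Accumulating, "finalize");
        for (long v : values)
            total += v;
        values.clear();
        stage = Stage::Finalized;
    }

    // Pure read: valid only after finalize, never modifies the state.
    long result() const
    {
        expectStage(Stage::Finalized, "result");
        return total;
    }

private:
    void expectStage(Stage expected, const char * operation) const
    {
        if (stage != expected)
            throw std::logic_error(std::string("invalid stage for ") + operation);
    }

    std::vector<long> values;
    long total = 0;
    Stage stage = Stage::Accumulating;
};
```

With the stage recorded, add, merge and serialize are valid only before finalize, and result only after it, so every transition is checked in one place.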
create table test_quantile (x AggregateFunction(quantileTiming(0.2), UInt64)) engine = Memory;
insert into test_quantile select medianTimingState(.2)(number) from (select * from numbers(1000) order by number desc);
select y from (
    select finalizeAggregation(x) as y from test_quantile union all
Oh, so the race is between concurrent convertToValues calls that share the same source column? Like, it modifies the column despite the fact that it's constant, and it turns out that it's shared. So the progressive steps to improvement would be:
- assert that the column is not shared
- add column mutation -- this is what you're doing in this PR
But this change would not require adding finalization to the interface?
Yes, this concrete issue may be solved this way.
However, it's weird to me that some aggregate states now change themselves at the time we get the aggregation result. It looks like a read operation, and the race I found is just a consequence of this behavior.
This PR was not made to fix this race. On the contrary, the race was found because I expected it might happen with the current approach.
Finalization is a logical step in itself, and is needed to merge the PR with clusterisation.
OK, so this is a bug fix + interface for new PR. Do you mean this one: #6185 ?
I'll take a look at it, maybe it'll help me understand what's going on.

I will add more comments to

Some other broken aggregate functions that modify data in insertResultInto:

Cool! I will add changes to other functions.

compress();
if (unmerged > 0)
    throw Poco::Exception("QuantileTDigest was not finalized");
Looks like the code should be LOGICAL_ERROR (or assert?). See the one below as well.
And the ones in ReservoirSampler.
I can make it LOGICAL_ERROR.
This class just didn't use our exceptions, so I used Poco::Exception too.
{
    if (sorted)
        return;
    sorted = true;
Do we still need this flag now, or even this function, when we have explicit finalization? The above exceptions could become assert(std::is_sorted(...)).
This is another sign of the problem that I'm talking about -- finalization is not explicit enough, we're not really sure when the state is finalized and when it's not, so in some places we add additional checks, and in some places we don't... So when reading this code in the future, it's going to be even harder to answer this question.
Let's replace it with an assert. I agree that is more reasonable.
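
The suggestion to drop the flag and check the invariant directly could look roughly like this. It is a sketch only: `ToyReservoir` and `quantileNearest` are hypothetical names and this is not the real ReservoirSampler; the point is that the sort moves into an explicit mutating step while the const accessor only verifies it.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical toy sampler without a 'sorted' flag.
struct ToyReservoir
{
    std::vector<int> samples;

    // Explicit, mutating step: expected to run once, by a single owner.
    void finalize()
    {
        std::sort(samples.begin(), samples.end());
    }

    // Read-only accessor: asserts the invariant instead of lazily sorting,
    // so concurrent readers never modify the shared state.
    int quantileNearest(double level) const
    {
        assert(std::is_sorted(samples.begin(), samples.end()));
        if (samples.empty())
            return 0;
        auto index = static_cast<std::size_t>(level * (samples.size() - 1));
        return samples[index];
    }
};
```

The std::is_sorted check is linear, which is acceptable in debug builds and disappears when assertions are compiled out.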
virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;

/// Finalize state. This function is called once after all 'add' and 'merge' calls. Only if isFinalizationNeeded().
/// 'add', 'merge', and 'serialize' can't be performed after 'finalize' call.
If we can't serialize the finalized state, this means it would be a mistake to send a block with a finalized ColumnAggregateFunction over network? Is there some check to guarantee we won't do this?
It would be nice to add, though I don't know how. We can't just check it for every state.
The best way I see is to maintain invariants in the Aggregator and ColumnAggregateFunction interfaces:
- Aggregator can return a column with already calculated values or ColumnAggregateFunction. There is no way to get raw aggregating data.
- ColumnAggregateFunction always stores non-finalized aggregation states.
- We can call ColumnAggregateFunction::convertToValues only once. After that, states will be finalized only once, and ColumnAggregateFunction shouldn't be used any more. Let's make convertToValues rvalue-qualified.
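
The last invariant relies on a standard C++ feature, ref-qualified member functions. A minimal sketch of the idea, with a hypothetical `ToyStateColumn` standing in for ColumnAggregateFunction (the real class and its signatures are not shown here):

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical column of toy "states"; each state is a list of ints to sum.
class ToyStateColumn
{
public:
    explicit ToyStateColumn(std::vector<std::vector<int>> states_)
        : states(std::move(states_))
    {
    }

    // Rvalue-qualified: callable only on a temporary or on std::move(column),
    // so the call site shows that the column is consumed by the conversion.
    std::vector<int> convertToValues() &&
    {
        std::vector<int> values;
        values.reserve(states.size());
        for (const auto & state : states)
        {
            int sum = 0;
            for (int x : state)
                sum += x;
            values.push_back(sum);
        }
        states.clear(); // the states are gone after conversion
        return values;
    }

    std::size_t size() const { return states.size(); }

private:
    std::vector<std::vector<int>> states;
};
```

Calling `column.convertToValues()` on a named object does not compile in this sketch; the caller has to write `std::move(column).convertToValues()`, which documents that the column should not be used afterwards.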

Given the above discussion, I am inclined to think that the finalized aggregation function state is sufficiently distinct to warrant having a separate data type for it. Also, For the time being, maybe we can add a flag

Right now I would rather not make a new type, because it does not actually represent a new way of storing data, new operations, or a performance improvement. The only thing we can do is to throw exception in respect to Adding finalization is initially a clarification of Now I think that we may just change

Closing it in favor of #10890
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Fix a possible race which could happen when you get the result from an aggregate function state from multiple threads for the same column. The only way (which I found) it can happen is when you use the finalizeAggregation function while reading from a table with the Memory engine which stores AggregateFunction state for a quantile* function.
Add finalization step to aggregate function interface. Also update quantile functions.