Projection Order Propagation #7364
ozankabak merged 21 commits into apache:main from synnada-ai:feature/projection-order-propagation
--Projection: multiple_ordered_table.b + multiple_ordered_table.a + multiple_ordered_table.c AS result
----TableScan: multiple_ordered_table projection=[a, b, c]
physical_plan
SortPreservingMergeExec: [result@0 ASC NULLS LAST]
The SortPreservingMergeExec seems still rather "nonoptimal" as the batches themselves are already sorted (but the info is thrown away by RepartitionExec). Any plans for addressing this as well in the future?
Yes, if I'm not mistaken one of the WIP PRs in our pipeline will address this 🙂
RepartitionExec does not break the order of partitions in this case as it partitions 1 to 4. Because these partitions are already ordered, the presence of SortPreservingMerge is correct. For general cases, SortPreservingRepartitionExec implementation is on the way, which will have the capability to preserve order for all kinds of partitioning.
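A toy sketch of the 1-to-4 case described above, assuming rows are plain `i64` values and the repartitioning is round-robin. The function names `round_robin` and `is_sorted_asc` are made up for illustration; the real RepartitionExec works on record batches and asynchronous streams, so this only models the ordering argument.

```rust
/// Distribute the items of a single input stream over `n` output
/// partitions in round-robin fashion (hypothetical helper, not DataFusion code).
fn round_robin<T>(input: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let mut parts: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, item) in input.into_iter().enumerate() {
        // Items reach each partition in the same relative order
        // in which they appeared in the input.
        parts[i % n].push(item);
    }
    parts
}

/// Check that a slice is sorted in ascending order.
fn is_sorted_asc(values: &[i64]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}
```

Because every output partition is a subsequence of one sorted input, each partition stays sorted, which is why a sort-preserving merge on top of the four partitions is valid. With several input partitions this argument no longer holds, since items from different inputs interleave.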
What I essentially mean is this:
The batches of the table multiple_ordered_table are ordered, and that order is preserved by RepartitionExec and ProjectionExec. If SortPreservingMergeExec knew the index of each batch (batch 0, batch 1, batch 2, etc.) as it was returned from the table, it would only need to wait for batch 0, batch 1, batch 2, etc. to appear from the partition streams rather than compare the rows themselves, which would be much faster (i.e. no merging required).
Is that something you plan as well?
Maintaining batch indices to preserve order is actually a good idea. The current sort-preserving algorithm, designed for the hash repartition case, tends to overfit to row-level sorting. We could potentially collaborate on implementing this optimization in future work.
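A minimal sketch of the batch-index idea discussed in this thread, under the assumption that every batch carries the position it had when it left the table scan. `IndexedBatch` and `reassemble` are hypothetical names, rows are modeled as `i64`, and nothing here reflects an existing DataFusion API.

```rust
use std::collections::BTreeMap;

/// A batch tagged with its position in the original scan (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct IndexedBatch {
    index: usize,
    rows: Vec<i64>,
}

/// Restore the original batch order from batches that arrive in
/// arbitrary order, without comparing any row values.
fn reassemble(arrivals: Vec<IndexedBatch>) -> Vec<i64> {
    // Batches that arrived before their turn, keyed by batch index.
    let mut pending: BTreeMap<usize, Vec<i64>> = BTreeMap::new();
    // Index of the next batch that may be emitted.
    let mut next = 0usize;
    let mut out = Vec::new();
    for batch in arrivals {
        pending.insert(batch.index, batch.rows);
        // Emit every buffered batch that is now contiguous with the output.
        while let Some(rows) = pending.remove(&next) {
            out.extend(rows);
            next += 1;
        }
    }
    out
}
```

The work per batch is a map insert and lookup instead of a row-by-row comparison. The trade-off is that out-of-turn batches must be buffered until their predecessors arrive, and the approach only applies when the repartitioned batches all come from one ordered source.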
Any objections to this? This is gating some other improvements so I would like to go ahead and merge this if there are no objections.
I have no objections (I haven't had a chance to review it either, but I can do so after the merge)
I am checking this one out now...
Thanks @alamb, feel free to merge when you are done 👍
alamb left a comment
Thank you for this PR @berkaysynnada -- the tests and the documentation really help.
I don't fully understand all the parts of this PR but what I did read made reasonable sense to me. My key piece of feedback about all the sort order code in general is that it is getting quite complicated and spread out (and I wonder therefore how much duplication or inconsistency there is)
Maybe we could start to consolidate / pull more of this logic together and make it easier to work with and find (to start, just pulling it into individual modules might help)
I left some small structural / naming suggestions. Let me know if you want to address them prior to merge.
/// Expressions' normalized orderings (as given by the output ordering API
/// and normalized with respect to equivalence classes of input plan). The
/// projected expressions are mapped by their indices to this vector.
orderings: Vec<Option<PhysicalSortExpr>>,
I don't fully understand how this is different from output_ordering (which can also be a Vec of PhysicalSortExpr). Would it make sense to always normalize the output ordering in terms of the input's equivalence classes?
I thought the notion of multiple equivalent orderings was expressed using https://docs.rs/datafusion-physical-expr/28.0.0/datafusion_physical_expr/equivalence/type.OrderingEquivalentClass.html
Or maybe the key difference is this can track the ordering of each expression independently?
You are correct in your last sentence. We calculate the order information for each expression that is projected. Since we use this information when determining ordering equivalences, I thought it would be wise to keep it in the executor's state. I would appreciate it if you have any ideas on how to handle this more effectively.
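To illustrate the distinction raised here, the sketch below keeps one optional ordering per projected expression, indexed by projection position, rather than a single lexicographic output ordering. All types (`SortOptions`, `SortExpr`, `ProjectionOrderings`) are simplified stand-ins defined for this example, with expressions represented as strings; they are assumptions and not the PR's actual structs.

```rust
/// Simplified stand-in for sort options (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Simplified stand-in for a sort expression (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct SortExpr {
    expr: String,
    options: SortOptions,
}

/// One optional ordering per projected expression, stored at the
/// same index as the expression itself.
struct ProjectionOrderings {
    exprs: Vec<String>,
    orderings: Vec<Option<SortExpr>>,
}

impl ProjectionOrderings {
    /// `known` lists the expressions whose ordering has been established.
    fn new(exprs: Vec<String>, known: &[(&str, SortOptions)]) -> Self {
        let orderings: Vec<Option<SortExpr>> = exprs
            .iter()
            .map(|e| {
                known
                    .iter()
                    .find(|(name, _)| *name == e.as_str())
                    .map(|(_, options)| SortExpr { expr: e.clone(), options: *options })
            })
            .collect();
        Self { exprs, orderings }
    }

    /// Ordering of the projected expression at `index`, if it has one.
    fn ordering_of(&self, index: usize) -> Option<&SortExpr> {
        self.orderings.get(index).and_then(|o| o.as_ref())
    }
}
```

Each entry is computed independently of the others, so an unordered expression at one index does not hide an ordered expression at a later index, which a single prefix-based output ordering cannot express.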
Thanks for the review @alamb. @berkaysynnada will finalize the PR according to your comments and take notes for any follow-on work. I will merge this after he finalizes it.
@alamb, the code organization should now be in line with what you had in mind. I will merge after CI passes, but it'd be great if you can take a quick look in the meantime to make sure we got your suggestions right. AFAICT, there is only one potential area for improvement, which we will address in a follow-on.
Which issue does this PR close?
Closes #7363.
Rationale for this change
ProjectionExec cannot propagate the orderings of non-column expressions. However, we can find the PhysicalSortExpr of the expressions which are subject to projection, and use them to preserve ordering information.
This is the second step of a three-step improvement process. The first one addressed some bug fixes, and the third one will focus on ScalarFunctionExpr's. With the assistance of these PRs, we can propagate order information over projections for various types of PhysicalExpr's.
What changes are included in this PR?
ProjectionExec holds the information of orderings of its projections.
PhysicalExpr's have a new method get_ordering(), which returns the SortProperties of the expression.
We need to be able to differentiate a non-ordered column and a literal value, so such a struct is added.
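A rough sketch of how ordering can be derived bottom-up through an expression tree, and of why a literal needs a state distinct from an unordered column. The enum below is a simplified model written for this example: the variant names, the `Expr` type, and the free functions `add` and `get_ordering` are assumptions, null ordering and arithmetic overflow are ignored, and none of it is the PR's actual code.

```rust
/// Simplified ordering state of an expression (hypothetical model).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SortProperties {
    /// Values are monotonic in the given direction.
    Ordered { descending: bool },
    /// Nothing is known about the order.
    Unordered,
    /// A constant, such as a literal: compatible with any ordering.
    Singleton,
}

/// A tiny expression tree (hypothetical).
#[derive(Debug, Clone)]
enum Expr {
    Column(usize),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
    Negate(Box<Expr>),
}

/// Ordering of `l + r` given the orderings of both operands.
fn add(l: SortProperties, r: SortProperties) -> SortProperties {
    use SortProperties::*;
    match (l, r) {
        // Adding a constant does not disturb the other side's ordering.
        (Singleton, other) | (other, Singleton) => other,
        // Two operands ordered in the same direction keep that direction.
        (Ordered { descending: a }, Ordered { descending: b }) if a == b => l,
        _ => Unordered,
    }
}

/// Derive the ordering of an expression from its children's orderings.
fn get_ordering(expr: &Expr, columns: &[SortProperties]) -> SortProperties {
    match expr {
        Expr::Column(i) => columns[*i],
        Expr::Literal(_) => SortProperties::Singleton,
        Expr::Add(l, r) => add(get_ordering(l, columns), get_ordering(r, columns)),
        Expr::Negate(inner) => match get_ordering(inner, columns) {
            // Negation flips the direction of an ordered input.
            SortProperties::Ordered { descending } => {
                SortProperties::Ordered { descending: !descending }
            }
            other => other,
        },
    }
}
```

If a literal were reported as unordered, `a + 1` would lose the ordering of `a`; treating it as a separate constant state lets the sum inherit the ordering of the column operand.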
Are these changes tested?
Yes.
Are there any user-facing changes?