ARROW-17381: [C++][Acero] Centralize error handling in ExecPlan#13848
ARROW-17381: [C++][Acero] Centralize error handling in ExecPlan#13848save-buffer wants to merge 1 commit intoapache:mainfrom
Conversation
|
|
9dbb7e2 to
25dbf30
Compare
25dbf30 to
2cdc239
Compare
westonpace
left a comment
There was a problem hiding this comment.
I like the cleanup, this is definitely simplifying ExecNode/ExecPlan. I have some initial thoughts.
There was a problem hiding this comment.
| // COMMIT cd5346e14450d7e5ca156acb4c2f396885c77aa0 |
There was a problem hiding this comment.
Eventually this case will go away
There was a problem hiding this comment.
Why does the order no longer matter here?
There was a problem hiding this comment.
Wouldn't a more appropriate place to trigger EndTaskGroup be when InputFinished is received on all sinks?
There was a problem hiding this comment.
EndTaskGroup has a nice property that it ends when it runs out of tasks to perform, here's the comment:
/// It is allowed for tasks to be added after this call provided the future has not yet
/// completed. This should be safe as long as the tasks being added are added as part
/// of a task that is tracked. As soon as the count of running tasks reaches 0 this
/// future will be marked complete.
So we will end when all of the tasks have finished running and no new tasks have been scheduled.
There was a problem hiding this comment.
Wouldn't we call node->Abort when we transition to aborted_ = true?
There was a problem hiding this comment.
We want to avoid any possible race conditions while aborting/doing cleanup and running tasks, so it's only safe to Abort when we're sure that no other tasks are running.
There was a problem hiding this comment.
Very happy to see this move into the base class.
There was a problem hiding this comment.
At this point maybe we should just move the body of DoProject into this method?
There was a problem hiding this comment.
Can this be a default implementation for ExecNode::InputFinished?
There was a problem hiding this comment.
Yeah it probably can be. Actually this span thing is a bit broken right now in general because we don't enforce that InputFinished is called after all batches have been output. InputFinished is merely to specify the total number of batches that will be output, so e.g. in the case of scalar aggregates that output only one row ever, InputFinished is called in StartProducing, and so a project above a scalar aggregate node would be ended immediately.
There was a problem hiding this comment.
What does Abort execution mean for a node? In theory all "execution" is handled via the scheduler so does a node really need to do anything here? Why ExecNode::Abort instead of doing the cleanup in the ExecNode destructor?
|
@zagto do you mind taking a look at this when you get a chance? |
zagto
left a comment
There was a problem hiding this comment.
Nice work. I love seeing the code becoming cleaner and easier to unterstand.
There was a problem hiding this comment.
I don't think this std::move does anything, given that status is a const reference.
There was a problem hiding this comment.
If we get a non-ok status here, would that mean we just abort while discarding the Status/message? This seems confusing to the user. Maybe we could have an ExecPlan::Abort(Status) that adds the status to ExecPlanImpl::errors_?
There was a problem hiding this comment.
| auto values = batch.values; | |
| auto values = std::move(batch.values); |
There was a problem hiding this comment.
Why do we 3 calls to SleepABit? Probably because one may not be enough on slower systems, but I think a comment would be helpful here
1cc334d to
279bf83
Compare
279bf83 to
1c75db4
Compare
westonpace
left a comment
There was a problem hiding this comment.
Are you interested in dusting this off and rebasing now that the previous cleanup has merged?
| /// \brief Stop producing definitively to a single output | ||
| /// | ||
| /// This call is a hint that an output node has completed and is not willing | ||
| /// to receive any further data. | ||
| virtual void StopProducing(ExecNode* output) = 0; |
There was a problem hiding this comment.
I've since learned that this is still needed. This covers the case where a LIMIT X node is placed on one branch of a query. It is intended to stop part of the plan but not abort the entire plan. Do you think we can leave it in?
|
@save-buffer are you interested in rebasing this? |
|
Closing because it has been untouched for a while, in case it's still relevant feel free to reopen and move it forward 👍 |
No description provided.