
Commit 9baefea

rtpsw and icexelloss authored
GH-32884: [C++] Add ordered aggregation (#34311)
This PR adds "segmented aggregation" to the existing aggregation node to improve aggregation over ordered data. A segment group is defined as a contiguous chunk of data that has the same segment-key value. For example, if the input data is

```
[0, 0, 0, 1, 2, 2]
```

then there are three segment groups: `[0, 0, 0]`, `[1]`, and `[2, 2]`. (The "group" in "segment group" differentiates it from a "segment", which is a contiguous chunk of data within one ExecBatch.)

Segmented aggregation can replace the existing hash aggregation when the data are ordered. The benefits are that (1) aggregation results can be output earlier, as soon as a segment group is fully consumed, and (2) only the partial aggregation state for a single segment group needs to be held, reducing memory usage.

See https://issues.apache.org/jira/browse/ARROW-17642

Replaces #14352

* Closes: #32884

Follow ups
=======
* #34475
* #34529

---------

Co-authored-by: Li Jin <[email protected]>
1 parent 4c05a3b commit 9baefea
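To make the segment-group notion concrete, here is a conceptual sketch (not code from this commit; the `Segment` struct and `FindSegments` helper are illustrative) that splits a sorted key column into runs of equal consecutive values:

```cpp
#include <cstdint>
#include <vector>

// Illustrative only: a run of rows sharing one segment-key value.
struct Segment {
  int64_t offset;  // start of the run
  int64_t length;  // number of rows in the run
};

// Scans a sorted key column for runs of equal consecutive values,
// e.g. [0, 0, 0, 1, 2, 2] -> {0,3}, {3,1}, {4,2}.
std::vector<Segment> FindSegments(const std::vector<int32_t>& keys) {
  std::vector<Segment> segments;
  int64_t start = 0;
  const auto n = static_cast<int64_t>(keys.size());
  for (int64_t i = 1; i <= n; ++i) {
    if (i == n || keys[i] != keys[start]) {
      segments.push_back({start, i - start});  // close the current run
      start = i;
    }
  }
  return segments;
}
```

Once a run closes, its partial aggregation state can be finalized and emitted, which is what enables the earlier output and reduced memory footprint described above.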

File tree

11 files changed: +1731 −195 lines


cpp/src/arrow/compute/exec.cc

Lines changed: 12 additions & 0 deletions
```diff
@@ -147,6 +147,18 @@ ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
   return out;
 }
 
+Result<ExecBatch> ExecBatch::SelectValues(const std::vector<int>& ids) const {
+  std::vector<Datum> selected_values;
+  selected_values.reserve(ids.size());
+  for (int id : ids) {
+    if (id < 0 || static_cast<size_t>(id) >= values.size()) {
+      return Status::Invalid("ExecBatch invalid value selection: ", id);
+    }
+    selected_values.push_back(values[id]);
+  }
+  return ExecBatch(std::move(selected_values), length);
+}
+
 namespace {
 
 enum LengthInferenceError {
```
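A minimal usage sketch for the new method (the surrounding setup is illustrative, using scalar columns to avoid array construction):

```cpp
#include <cassert>

#include "arrow/compute/exec.h"
#include "arrow/datum.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status SelectValuesExample() {
  using arrow::compute::ExecBatch;
  // Three scalar "columns"; a batch of only scalars infers length 1.
  ARROW_ASSIGN_OR_RAISE(
      ExecBatch batch,
      ExecBatch::Make({arrow::Datum(int32_t{10}), arrow::Datum(int32_t{20}),
                       arrow::Datum(int32_t{30})}));
  // Project and reorder by index: the result holds values[2] then values[0].
  ARROW_ASSIGN_OR_RAISE(ExecBatch selected, batch.SelectValues({2, 0}));
  assert(selected.num_values() == 2);
  // An id outside [0, values.size()) yields Status::Invalid, per the check above.
  assert(batch.SelectValues({3}).status().IsInvalid());
  return arrow::Status::OK();
}
```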

cpp/src/arrow/compute/exec.h

Lines changed: 8 additions & 0 deletions
```diff
@@ -181,6 +181,12 @@ struct ARROW_EXPORT ExecBatch {
   /// \brief Infer the ExecBatch length from values.
   static Result<int64_t> InferLength(const std::vector<Datum>& values);
 
+  /// Creates an ExecBatch with length-validation.
+  ///
+  /// If any value is given, then all values must have a common length. If the given
+  /// length is negative, then the length of the ExecBatch is set to this common length,
+  /// or to 1 if no values are given. Otherwise, the given length must equal the common
+  /// length, if any value is given.
   static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1);
 
   Result<std::shared_ptr<RecordBatch>> ToRecordBatch(
@@ -240,6 +246,8 @@ struct ARROW_EXPORT ExecBatch {
 
   ExecBatch Slice(int64_t offset, int64_t length) const;
 
+  Result<ExecBatch> SelectValues(const std::vector<int>& ids) const;
+
   /// \brief A convenience for returning the types from the batch.
   std::vector<TypeHolder> GetTypes() const {
     std::vector<TypeHolder> result;
```
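The validation rules documented on `Make()` can be exercised directly; a small sketch (assuming the `ArrayFromJSON` helper from `arrow/testing/gtest_util.h`, which the tests in this commit also use via `ExecBatchFromJSON`):

```cpp
#include <cassert>

#include "arrow/compute/exec.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"

arrow::Status MakeContractExample() {
  using arrow::compute::ExecBatch;
  auto column = arrow::ArrayFromJSON(arrow::int32(), "[1, 2, 3]");
  // Default (negative) length: inferred from the common value length, 3.
  ARROW_ASSIGN_OR_RAISE(ExecBatch inferred, ExecBatch::Make({column, column}));
  assert(inferred.length == 3);
  // No values given and negative length: the batch length defaults to 1.
  ARROW_ASSIGN_OR_RAISE(ExecBatch empty, ExecBatch::Make({}));
  assert(empty.length == 1);
  // An explicit length that disagrees with the common length is rejected.
  assert(!ExecBatch::Make({column}, /*length=*/5).ok());
  return arrow::Status::OK();
}
```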

cpp/src/arrow/compute/exec/aggregate_node.cc

Lines changed: 276 additions & 33 deletions
Large diffs are not rendered by default.
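The node changes themselves are not rendered here, but conceptually (a hedged sketch, not the actual implementation; `GroupByState`, `Consume`, and `EmitResults` are hypothetical stand-ins) consuming a batch that crosses a segment boundary amounts to slicing at the boundary, finalizing the open segment group, and continuing with the remainder:

```cpp
#include "arrow/compute/exec.h"
#include "arrow/status.h"

// Stand-ins for the node's internal accumulate/finalize machinery.
struct GroupByState {
  arrow::Status Consume(const arrow::compute::ExecBatch& batch) {
    return arrow::Status::OK();  // accumulate partial aggregates
  }
  arrow::Status EmitResults() {
    return arrow::Status::OK();  // finalize + output one segment group
  }
};

// Processes one batch whose segment key changes at row `boundary`
// (boundary == batch.length means the batch stays within one segment).
arrow::Status ProcessBatch(GroupByState* state,
                           const arrow::compute::ExecBatch& batch,
                           int64_t boundary) {
  ARROW_RETURN_NOT_OK(state->Consume(batch.Slice(0, boundary)));
  if (boundary < batch.length) {
    ARROW_RETURN_NOT_OK(state->EmitResults());  // segment group complete
    ARROW_RETURN_NOT_OK(
        state->Consume(batch.Slice(boundary, batch.length - boundary)));
  }
  return arrow::Status::OK();
}
```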

cpp/src/arrow/compute/exec/exec_plan.h

Lines changed: 1 addition & 2 deletions
```diff
@@ -241,8 +241,7 @@ class ARROW_EXPORT ExecNode {
   /// concurrently, potentially even before the call to StartProducing
   /// has finished.
   /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
-  ///   by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
-  ///   methods
+  ///   by the downstream nodes' InputReceived(), InputFinished() methods
   ///
   /// StopProducing may be called due to an error, by the user (e.g. cancel), or
   /// because a node has all the data it needs (e.g. limit, top-k on sorted data).
```

cpp/src/arrow/compute/exec/options.h

Lines changed: 20 additions & 4 deletions
```diff
@@ -221,21 +221,37 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
   std::vector<std::string> names;
 };
 
-/// \brief Make a node which aggregates input batches, optionally grouped by keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine the group.
+/// However segment-keys are also used for determining grouping segments, which should be
+/// large, and allow streaming a partial aggregation result after processing each segment.
+/// One common use-case for segment-keys is ordered aggregation, in which the segment-key
+/// attribute specifies a column with non-decreasing values or a lexicographically-ordered
+/// set of such columns.
 ///
 /// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
 /// expected to be a HashAggregate function. If the keys attribute is an empty vector,
 /// then each aggregate is assumed to be a ScalarAggregate function.
+///
+/// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as
+/// described above, applies.
+///
+/// The keys and segment_keys vectors must be disjoint.
 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
  public:
   explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
-                                std::vector<FieldRef> keys = {})
-      : aggregates(std::move(aggregates)), keys(std::move(keys)) {}
+                                std::vector<FieldRef> keys = {},
+                                std::vector<FieldRef> segment_keys = {})
+      : aggregates(std::move(aggregates)),
+        keys(std::move(keys)),
+        segment_keys(std::move(segment_keys)) {}
 
   // aggregations which will be applied to the targetted fields
   std::vector<Aggregate> aggregates;
-  // keys by which aggregations will be grouped
+  // keys by which aggregations will be grouped (optional)
   std::vector<FieldRef> keys;
+  // keys by which aggregations will be segmented (optional)
+  std::vector<FieldRef> segment_keys;
 };
 
 constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
```
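A hedged usage sketch of the extended options (column names and the upstream source declaration are illustrative; the `Aggregate` spelling follows the tests below): group by `b` within segments of the ordered column `a`, keeping the two key sets disjoint as required.

```cpp
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/type.h"

namespace cp = arrow::compute;

// Wraps an upstream declaration in a segmented hash aggregation.
cp::Declaration SegmentedAggregate(cp::Declaration source) {
  cp::AggregateNodeOptions agg_options(
      /*aggregates=*/{{"hash_sum", nullptr, "c", "sum(c)"}},
      /*keys=*/{arrow::FieldRef("b")},
      /*segment_keys=*/{arrow::FieldRef("a")});  // disjoint from keys
  return cp::Declaration::Sequence(
      {std::move(source), {"aggregate", std::move(agg_options)}});
}
```

Because `a` is non-decreasing across the stream, the node can emit each segment's groups as soon as the segment ends instead of holding the entire hash table until end of input.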

cpp/src/arrow/compute/exec/plan_test.cc

Lines changed: 103 additions & 0 deletions
```diff
@@ -1578,5 +1578,108 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) {
   }
 }
 
+TEST(ExecPlanExecution, SegmentedAggregationWithMultiThreading) {
+  BatchesWithSchema data;
+  data.batches = {ExecBatchFromJSON({int32()}, "[[1]]")};
+  data.schema = schema({field("i32", int32())});
+  Declaration plan = Declaration::Sequence(
+      {{"source",
+        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
+       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+                                              {"count", nullptr, "i32", "count(i32)"},
+                                          },
+                                          /*keys=*/{"i32"}, /*segment_keys=*/{"i32"}}}});
+  EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("multi-threaded"),
+                                  DeclarationToExecBatches(std::move(plan)));
+}
+
+TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
+  BatchesWithSchema data;
+  data.batches = {
+      ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 2, 1], [1, 1, 2]]"),
+      ExecBatchFromJSON({int32(), int32(), int32()},
+                        "[[1, 2, 2], [1, 1, 3], [1, 2, 3]]")};
+  data.schema = schema({
+      field("a", int32()),
+      field("b", int32()),
+      field("c", int32()),
+  });
+
+  Declaration plan = Declaration::Sequence(
+      {{"source",
+        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
+       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+                                              {"hash_sum", nullptr, "c", "sum(c)"},
+                                              {"hash_mean", nullptr, "c", "mean(c)"},
+                                          },
+                                          /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
+  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
+                       DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
+
+  auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
+                                    R"([[6, 2, 1, 1], [6, 2, 2, 1]])");
+  AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
+                                      {expected});
+}
+
+TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {
+  BatchesWithSchema data;
+  data.batches = {
+      ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 2, 1], [1, 1, 2]]"),
+      ExecBatchFromJSON({int32(), int32(), int32()},
+                        "[[2, 2, 2], [2, 1, 3], [2, 2, 3]]")};
+  data.schema = schema({
+      field("a", int32()),
+      field("b", int32()),
+      field("c", int32()),
+  });
+
+  Declaration plan = Declaration::Sequence(
+      {{"source",
+        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
+       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+                                              {"hash_sum", nullptr, "c", "sum(c)"},
+                                              {"hash_mean", nullptr, "c", "mean(c)"},
+                                          },
+                                          /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
+  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
+                       DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
+
+  auto expected = ExecBatchFromJSON(
+      {int64(), float64(), int32(), int32()},
+      R"([[3, 1.5, 1, 1], [1, 1, 2, 1], [3, 3, 1, 2], [5, 2.5, 2, 2]])");
+  AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
+                                      {expected});
+}
+
+TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) {
+  BatchesWithSchema data;
+  data.batches = {
+      ExecBatchFromJSON({int32(), int32(), int32()}, "[[1, 1, 1], [1, 1, 1], [2, 2, 2]]"),
+      ExecBatchFromJSON({int32(), int32(), int32()},
+                        "[[2, 2, 2], [3, 3, 3], [3, 3, 3]]")};
+  data.schema = schema({
+      field("a", int32()),
+      field("b", int32()),
+      field("c", int32()),
+  });
+
+  Declaration plan = Declaration::Sequence(
+      {{"source",
+        SourceNodeOptions{data.schema, data.gen(/*parallel=*/false, /*slow=*/false)}},
+       {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+                                              {"hash_sum", nullptr, "c", "sum(c)"},
+                                              {"hash_mean", nullptr, "c", "mean(c)"},
+                                          },
+                                          /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
+  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
+                       DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
+
+  auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
+                                    R"([[2, 1, 1, 1], [4, 2, 2, 2], [6, 3, 3, 3]])");
+  AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
+                                      {expected});
+}
+
 }  // namespace compute
 }  // namespace arrow
```
