-
Notifications
You must be signed in to change notification settings - Fork 2k
Description
Describe the bug
Aggregations over Utf8View/StringViewArray consume significantly more memory as tracked by the memory pool, compared to Utf8/StringArray. The actual memory usage is within 10% of each other, but the reported memory usage can be more than 2x higher.
To Reproduce
Minimal reproducer:
use std::sync::Arc;
use arrow::array::{ArrayRef, RecordBatch, StringArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::expressions::{Column, Literal};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::test::TestMemoryExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning, collect};
use datafusion::prelude::{SessionConfig, SessionContext};
// --- Knobs ---
// Column type under test: true => Utf8View/StringViewArray, false => Utf8/StringArray.
const USE_UTF8VIEW: bool = true;
// Total input rows; build_plan slices NUM_ROWS / DATA_PARTITIONS rows per partition,
// so any remainder would simply be dropped by the slicing.
const NUM_ROWS: usize = 2_000_000;
// Number of slices the input batch is split into for the in-memory data source.
const DATA_PARTITIONS: usize = 100;
// Number of hash partitions for the RepartitionExec between the two aggregate phases.
const HASH_PARTITIONS: usize = 100;
// Memory limit handed to RuntimeEnvBuilder::with_memory_limit.
const POOL_SIZE: usize = 1024 * 1024 * 1024; // 1GB
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();

    // Build the single-column input data and report its real arrow-side footprint.
    let (schema, batch) = make_data();
    println!(
        "input batch: {} rows, {} bytes",
        batch.num_rows(),
        batch.get_array_memory_size()
    );

    let plan = build_plan(schema, batch)?;
    println!("plan:\n{}", displayable(plan.as_ref()).indent(true));

    // Keep a handle on the memory pool so reservations can be inspected
    // before and after execution.
    let (ctx, pool) = make_session_ctx()?;
    println!("pool reserved before collect: {} bytes", pool.reserved());
    let results = collect(plan, ctx.task_ctx()).await?;
    println!("pool reserved after collect: {} bytes", pool.reserved());

    // Tally rows and bytes across all output batches in a single pass.
    let (total_rows, total_bytes) = results.iter().fold((0usize, 0usize), |(rows, bytes), b| {
        (rows + b.num_rows(), bytes + b.get_array_memory_size())
    });
    println!(
        "{} batches, {total_rows} rows, {total_bytes} bytes in output",
        results.len()
    );
    Ok(())
}
/// Build a one-column ("a", non-nullable) RecordBatch of NUM_ROWS distinct
/// short strings, encoded as Utf8View or Utf8 depending on USE_UTF8VIEW.
fn make_data() -> (SchemaRef, RecordBatch) {
    // Owned backing strings; the arrays are built from &str views into them.
    let owned: Vec<String> = (0..NUM_ROWS).map(|i| format!("value_{i}")).collect();
    let slices: Vec<&str> = owned.iter().map(String::as_str).collect();

    let (data_type, column): (DataType, ArrayRef) = if USE_UTF8VIEW {
        (DataType::Utf8View, Arc::new(StringViewArray::from(slices)))
    } else {
        (DataType::Utf8, Arc::new(StringArray::from(slices)))
    };

    let schema = Arc::new(Schema::new(vec![Field::new("a", data_type, false)]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![column]).unwrap();
    (schema, batch)
}
/// Assemble a two-phase GROUP BY a / COUNT(1) plan over partitioned in-memory data:
/// DataSource -> Partial Aggregate -> Hash Repartition -> Final Aggregate -> Coalesce.
fn build_plan(
    schema: SchemaRef,
    batch: RecordBatch,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    // Split the input batch into DATA_PARTITIONS equal slices, one per source partition.
    let chunk = NUM_ROWS / DATA_PARTITIONS;
    let partitions: Vec<Vec<RecordBatch>> = (0..DATA_PARTITIONS)
        .map(|p| vec![batch.slice(p * chunk, chunk)])
        .collect();
    let source = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;

    // GROUP BY "a", COUNT(*) expressed as COUNT(1).
    let group_by =
        PhysicalGroupBy::new_single(vec![(Arc::new(Column::new("a", 0)), "a".to_string())]);
    let count_expr = Arc::new(
        AggregateExprBuilder::new(count_udaf(), vec![Arc::new(Literal::new(1i8.into()))])
            .schema(Arc::clone(&schema))
            .alias("count")
            .build()?,
    );

    // Phase 1: partial aggregation directly over the source partitions.
    let partial = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        group_by.clone(),
        vec![count_expr.clone()],
        vec![None],
        source,
        schema,
    )?);
    let partial_schema = partial.schema();

    // Shuffle partial results by group key so equal keys meet in one partition.
    let shuffled = Arc::new(RepartitionExec::try_new(
        partial,
        Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], HASH_PARTITIONS),
    )?);

    // Phase 2: final aggregation; its input schema is the partial output schema.
    let final_agg = Arc::new(AggregateExec::try_new(
        AggregateMode::FinalPartitioned,
        group_by,
        vec![count_expr],
        vec![None],
        shuffled,
        partial_schema,
    )?);

    let plan: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(final_agg));
    Ok(plan)
}
/// Create a SessionContext backed by a fixed-size memory pool with the disk
/// manager disabled (so the query fails fast instead of spilling), returning
/// the pool handle so callers can inspect reservations.
fn make_session_ctx() -> Result<(SessionContext, Arc<dyn MemoryPool>), Box<dyn std::error::Error>> {
    let disk = DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(POOL_SIZE, 1.0)
        .with_disk_manager_builder(disk)
        .build_arc()?;
    // Clone the pool handle before the runtime is moved into the context.
    let pool: Arc<dyn MemoryPool> = Arc::clone(&runtime.memory_pool);
    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);
    Ok((ctx, pool))
}
Here's the Cargo.toml for reference:
[package]
name = "datafusion-scratch"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread"] }
datafusion = { path = "../datafusion/datafusion/core" }
arrow = "57.3.0"
rand = "0.9.2"
env_logger = "0.11.9"
(The path in the datafusion dependency points at the latest main, 9660c9874315354ff22245699785f5f77841be80, but any v52 should show the same behavior.)
This fails with:
input batch: 2000000 rows, 46663800 bytes
plan:
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
pool reserved before collect: 0 bytes
Error: Shared(ResourcesExhausted("Memory Exhausted while SpillPool (DiskManager is disabled)"))
On macOS, I can get the actual memory usage using /usr/bin/time -l, which gives me:
277151744 maximum resident set size
(around ~264MB for ~44MB of input data, but the memory tracker thinks we have more than 1GB of memory allocations and fails the program).
The program runs after increasing the pool to POOL_SIZE: usize = 2048 * 1024 * 1024 and gives a 316456960 maximum resident set size (~301MB). This is lower than the Utf8/StringArray case (below), yet datafusion's tracker thinks it's much higher!
Expected behavior
At the top of the script, set USE_UTF8VIEW: bool = false; to use Utf8/StringArray instead. The script runs just fine with this:
input batch: 2000000 rows, 41554616 bytes
plan:
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
pool reserved before collect: 0 bytes
pool reserved after collect: 0 bytes
300 batches, 2000000 rows, 72276490 bytes in output
/usr/bin/time -l gives me 375013376 maximum resident set size (~357MB)
Additional context
Not directly related, but I noticed that there's no peak_mem_used metric in RepartitionExec like there is one in AggregateExec.