
Over-counting of memory in aggregation + repartition over Utf8View/StringViewArray #20491

@Samyak2


Describe the bug

Aggregations over Utf8View/StringViewArray consume significantly more memory as tracked by the memory pool than the same aggregations over Utf8/StringArray. Actual memory usage differs by less than 10% between the two, but the reported usage for Utf8View can be more than 2x higher.
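As a plain-Rust sketch of one plausible mechanism (this models shared buffers with `Arc` and is illustrative only; `SharedBuffer` and `reported_size` are hypothetical stand-ins, not arrow APIs): zero-copy slices all keep the same underlying buffer alive, so any accounting that charges each holder the buffer's full capacity over-counts relative to the single real allocation.

```rust
use std::sync::Arc;

// Model an Arrow data buffer as a shared, reference-counted allocation.
// Zero-copy slicing hands out clones that all point at the same bytes.
#[derive(Clone)]
struct SharedBuffer(Arc<Vec<u8>>);

impl SharedBuffer {
    // Naive accounting: charge each holder the full buffer capacity,
    // even though the allocation exists only once on the heap.
    fn reported_size(&self) -> usize {
        self.0.capacity()
    }
}

fn main() {
    let parent = SharedBuffer(Arc::new(vec![0u8; 1_000_000]));

    // 100 zero-copy "slices" of the same underlying buffer.
    let slices: Vec<SharedBuffer> = (0..100).map(|_| parent.clone()).collect();

    // Actual heap usage: one ~1MB allocation.
    let actual = parent.reported_size();
    // Summed per-holder accounting: ~100MB "reserved".
    let reported: usize = slices.iter().map(|s| s.reported_size()).sum();

    println!("actual: {actual} bytes, reported: {reported} bytes");
    assert_eq!(reported, 100 * actual);
}
```

If the memory pool sums per-batch sizes after a repartition while the batches still share view buffers, the reservation can grow with the number of batches even though the heap does not.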

To Reproduce

Minimal reproducer:

use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, StringArray, StringViewArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::expressions::{Column, Literal};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::test::TestMemoryExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning, collect};
use datafusion::prelude::{SessionConfig, SessionContext};

// --- Knobs ---
const USE_UTF8VIEW: bool = true;
const NUM_ROWS: usize = 2_000_000;
const DATA_PARTITIONS: usize = 100;
const HASH_PARTITIONS: usize = 100;
const POOL_SIZE: usize = 1024 * 1024 * 1024; // 1GB

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();

    let (schema, batch) = make_data();
    println!(
        "input batch: {} rows, {} bytes",
        batch.num_rows(),
        batch.get_array_memory_size()
    );

    let plan = build_plan(schema, batch)?;
    println!("plan:\n{}", displayable(plan.as_ref()).indent(true));

    let (ctx, pool) = make_session_ctx()?;

    println!("pool reserved before collect: {} bytes", pool.reserved());
    let results = collect(plan, ctx.task_ctx()).await?;
    println!("pool reserved after collect:  {} bytes", pool.reserved());

    let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
    let total_bytes: usize = results.iter().map(|b| b.get_array_memory_size()).sum();
    println!(
        "{} batches, {total_rows} rows, {total_bytes} bytes in output",
        results.len()
    );

    Ok(())
}

fn make_data() -> (SchemaRef, RecordBatch) {
    let values: Vec<String> = (0..NUM_ROWS).map(|i| format!("value_{i}")).collect();
    let string_refs: Vec<&str> = values.iter().map(|s| s.as_str()).collect();

    let (dt, col): (DataType, ArrayRef) = if USE_UTF8VIEW {
        (
            DataType::Utf8View,
            Arc::new(StringViewArray::from(string_refs)),
        )
    } else {
        (DataType::Utf8, Arc::new(StringArray::from(string_refs)))
    };

    let schema = Arc::new(Schema::new(vec![Field::new("a", dt, false)]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![col]).unwrap();
    (schema, batch)
}

fn build_plan(
    schema: SchemaRef,
    batch: RecordBatch,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    let rows_per_partition = NUM_ROWS / DATA_PARTITIONS;
    let partitions: Vec<Vec<RecordBatch>> = (0..DATA_PARTITIONS)
        .map(|i| vec![batch.slice(i * rows_per_partition, rows_per_partition)])
        .collect();
    let plan = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;

    // GROUP BY "a", COUNT(*)
    let group_by =
        PhysicalGroupBy::new_single(vec![(Arc::new(Column::new("a", 0)), "a".to_string())]);
    let count_expr = Arc::new(
        AggregateExprBuilder::new(count_udaf(), vec![Arc::new(Literal::new(1i8.into()))])
            .schema(Arc::clone(&schema))
            .alias("count")
            .build()?,
    );

    let plan = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        group_by.clone(),
        vec![count_expr.clone()],
        vec![None],
        plan,
        schema,
    )?);
    let partial_schema = plan.schema();

    let plan = Arc::new(RepartitionExec::try_new(
        plan,
        Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], HASH_PARTITIONS),
    )?);

    let plan = Arc::new(AggregateExec::try_new(
        AggregateMode::FinalPartitioned,
        group_by,
        vec![count_expr],
        vec![None],
        plan,
        partial_schema,
    )?);

    let plan: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(plan));
    Ok(plan)
}

fn make_session_ctx() -> Result<(SessionContext, Arc<dyn MemoryPool>), Box<dyn std::error::Error>> {
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(POOL_SIZE, 1.0)
        .with_disk_manager_builder(
            DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled),
        )
        .build_arc()?;
    let pool: Arc<dyn MemoryPool> = Arc::clone(&runtime.memory_pool);

    let config = SessionConfig::new();
    let ctx = SessionContext::new_with_config_rt(config, runtime);
    Ok((ctx, pool))
}

Here's the Cargo.toml for reference:

[package]
name = "datafusion-scratch"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread"] }
datafusion = { path = "../datafusion/datafusion/core" }
arrow = "57.3.0"
rand = "0.9.2"
env_logger = "0.11.9"

(the `datafusion` path points at the latest main, commit 9660c9874315354ff22245699785f5f77841be80, but any v52 release should show the same behavior).

This fails with:

input batch: 2000000 rows, 46663800 bytes
plan:
CoalescePartitionsExec
  RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
      DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

pool reserved before collect: 0 bytes
Error: Shared(ResourcesExhausted("Memory Exhausted while SpillPool (DiskManager is disabled)"))

On macOS, I can get the actual memory usage using /usr/bin/time -l, which gives me:

277151744  maximum resident set size

(around ~264MB of resident memory for ~44MB of input data, yet the memory tracker believes more than 1GB has been allocated and fails the program).

The program runs after increasing the pool size to POOL_SIZE: usize = 2048 * 1024 * 1024 (2GB), and gives a maximum resident set size of 316456960 bytes (~301MB). This is lower than the Utf8/StringArray case (below), but DataFusion's tracker thinks it's much higher!

Expected behavior

Setting USE_UTF8VIEW: bool = false at the top of the script uses Utf8/StringArray instead. The script then runs just fine:

input batch: 2000000 rows, 41554616 bytes
plan:
CoalescePartitionsExec
  RepartitionExec: partitioning=Hash([a@0], 100), input_partitions=100
    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
      DataSourceExec: partitions=100, partition_sizes=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

pool reserved before collect: 0 bytes
pool reserved after collect:  0 bytes
300 batches, 2000000 rows, 72276490 bytes in output

/usr/bin/time -l gives a maximum resident set size of 375013376 bytes (~357MB).

Additional context

Not directly related, but I noticed that there is no peak_mem_used metric in RepartitionExec like there is in AggregateExec.
