Glue catalog concurrent write issue #1868

@jembishop

Description

Apache Iceberg Rust version

0.7.0

Describe the bug

It seems that if I use fast append on an Iceberg table backed by the Glue catalog, optimistic concurrency control doesn't work: files registered by some of my processes get clobbered by other concurrent processes.

My setup is an Iceberg table using the Glue catalog in AWS. I create Parquet files (not with iceberg-rust), then use iceberg-rust to register the uploaded files with Iceberg, retrying so that optimistic concurrency errors are handled:

```rust
// Imports needed by this excerpt; IcebergRegistrationRequest,
// create_glue_catalog, and region_to_name are our own helpers.
use anyhow::{bail, Context};
use chrono::NaiveDate;
use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};
use iceberg::transaction::Transaction;
use iceberg::{Catalog, ErrorKind, NamespaceIdent, TableIdent};

async fn register_files_to_iceberg_batch(
    bucket_name: &str, region: &str, event_name: &str, is_test: bool, requests: Vec<IcebergRegistrationRequest>,
    credentials: &aws_credential_types::Credentials,
) -> anyhow::Result<()> {
    const RETRY_DURATION: std::time::Duration = std::time::Duration::from_secs(10);
    const MAX_ATTEMPTS: u32 = 100;

    for attempt in 0..MAX_ATTEMPTS {
        let catalog = create_glue_catalog(bucket_name, credentials)
            .await
            .with_context(|| format!("Failed to create Glue catalog on attempt {}", attempt))?;

        match register_files_to_iceberg_batch_once(bucket_name, region, event_name, is_test, &requests, &catalog).await
        {
            Ok(()) => return Ok(()),
            Err(err) => {
                let is_commit_conflict = err
                    .downcast_ref::<iceberg::Error>()
                    .map(|e| e.kind() == ErrorKind::CatalogCommitConflicts)
                    .unwrap_or(false);

                if attempt < MAX_ATTEMPTS - 1 {
                    if is_commit_conflict {
                        tracing::info!(
                            attempt,
                            error = &*err,
                            retry_duration = ?RETRY_DURATION,
                            "Iceberg commit conflict, retrying batch with fresh catalog"
                        );
                    } else {
                        tracing::error!(
                            attempt,
                            error = &*err,
                            retry_duration = ?RETRY_DURATION,
                            "Error registering files to Iceberg, retrying batch with fresh catalog"
                        );
                    }
                    tokio::time::sleep(RETRY_DURATION).await;
                } else {
                    bail!("Max retry attempts reached for Iceberg batch registration: {}", err);
                }
            }
        }
    }
    Ok(())
}
async fn register_files_to_iceberg_batch_once(
    bucket_name: &str, region: &str, event_name: &str, is_test: bool, requests: &[IcebergRegistrationRequest],
    catalog: &iceberg_catalog_glue::GlueCatalog,
) -> anyhow::Result<()> {
    let start = std::time::Instant::now();
    let region_name = region_to_name(region);
    let table_name = format!("{}_{}_iceberg", region_name, event_name);
    let database_name = if is_test { "test_market_data" } else { "market_data" };

    tracing::info!(
        table_name,
        file_count = requests.len(),
        "Registering files to Iceberg table"
    );

    let table_ident = TableIdent::new(NamespaceIdent::new(database_name.to_string()), table_name.clone());
    let table = catalog.load_table(&table_ident).await?;

    let partition_spec = table.metadata().default_partition_spec();

    // Build data files for all requests
    let mut data_files = Vec::new();
    for request in requests {
        let full_path = format!(
            "s3://{}/{}",
            bucket_name,
            request.s3_key.to_string_with_id(&request.file_id)
        );

        let mut partition_values = Vec::new();
        for field in partition_spec.fields() {
            let source_field = table
                .metadata()
                .current_schema()
                .field_by_id(field.source_id)
                .context("Failed to get source field")?;
            let field_name = source_field.name.as_str();

            let literal = match field_name {
                "InstrumentKind" => Some(Literal::string(request.s3_key.partition_info.instrument_kind.clone())),
                "Exchange" => Some(Literal::string(request.s3_key.partition_info.exchange.clone())),
                "Date" => {
                    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
                    let days_since_epoch = request
                        .s3_key
                        .partition_info
                        .date
                        .signed_duration_since(epoch)
                        .num_days() as i32;
                    Some(Literal::date(days_since_epoch))
                }
                _ => None,
            };
            partition_values.push(literal);
        }

        let partition = Struct::from_iter(partition_values);

        let data_file = DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(full_path.clone())
            .file_format(DataFileFormat::Parquet)
            .partition(partition)
            .record_count(request.metadata.record_count)
            .file_size_in_bytes(request.metadata.file_size)
            .partition_spec_id(table.metadata().default_partition_spec_id())
            .column_sizes(request.metadata.stats.column_sizes.clone())
            .value_counts(request.metadata.stats.value_counts.clone())
            .null_value_counts(request.metadata.stats.null_value_counts.clone())
            .lower_bounds(request.metadata.stats.lower_bounds.clone())
            .upper_bounds(request.metadata.stats.upper_bounds.clone())
            .build()?;

        data_files.push(data_file);
    }

    // Single commit with all data files
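    // Expectation: commit() re-reads the catalog state and fails with
    // ErrorKind::CatalogCommitConflicts if another writer updated the table
    // after load_table() above, so the retry loop in the caller can kick in.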
    let tx = Transaction::new(&table);
    let append_action = tx.fast_append().with_check_duplicate(false).add_data_files(data_files);
    let tx = append_action.apply(tx)?;
    tx.commit(catalog).await?;

    let committed_files: Vec<String> = requests
        .iter()
        .map(|r| r.s3_key.to_string_with_id(&r.file_id))
        .collect();

    tracing::info!(
        table = table_name,
        file_count = requests.len(),
        elapsed_ms = start.elapsed().as_millis(),
        files = ?committed_files,
        "Successfully registered files to Iceberg in single commit"
    );

    Ok(())
}
```

When running this, I don't actually observe any logs about Iceberg commit conflicts, and with concurrent writers I end up with a bunch of orphaned, unregistered files, which I thought was due to optimistic concurrency conflicts never being raised.

I have a branch here that sets the Glue table version id, and that seems to solve the problem for me.
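
The branch itself isn't reproduced here, but the idea is roughly the following (a minimal sketch against aws-sdk-glue, not the actual patch; `commit_with_cas` and `new_metadata_location` are illustrative names):

```rust
// Sketch only: pass the Glue table's current version id to UpdateTable so
// Glue performs a compare-and-swap and rejects updates based on a stale read.
use aws_sdk_glue::{types::TableInput, Client};

async fn commit_with_cas(
    client: &Client,
    db: &str,
    table: &str,
    new_metadata_location: &str, // illustrative: the freshly written metadata file
) -> anyhow::Result<()> {
    // Read the table and remember which version this update is based on.
    let current = client.get_table().database_name(db).name(table).send().await?;
    let version_id = current
        .table()
        .and_then(|t| t.version_id())
        .map(str::to_string);

    let table_input = TableInput::builder()
        .name(table)
        .parameters("metadata_location", new_metadata_location)
        .build()?;

    // With version_id set, Glue rejects the update with
    // ConcurrentModificationException if another writer committed in between,
    // which the catalog can surface as ErrorKind::CatalogCommitConflicts.
    client
        .update_table()
        .database_name(db)
        .table_input(table_input)
        .set_version_id(version_id)
        .send()
        .await?;
    Ok(())
}
```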

Is this a bug or am I misusing the library?

Thanks

To Reproduce

Set up an Iceberg table in S3 with AWS Glue, then run multiple writers that attempt to register Parquet files with the fast-append action at the same time, each with optimistic retrying for concurrency conflicts.
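
Concretely, a harness along these lines triggers it (a hypothetical sketch; `register` would wrap `register_files_to_iceberg_batch` above with a one-file request per writer):

```rust
// Hypothetical repro harness: spawn N writers that each fast-append to the
// same table at the same time.
async fn reproduce<F, Fut>(num_writers: usize, register: F) -> anyhow::Result<()>
where
    F: Fn(usize) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let handles: Vec<_> = (0..num_writers)
        .map(|i| tokio::spawn(register(i))) // each task is an independent writer
        .collect();
    for handle in handles {
        handle.await??; // propagate panics and registration errors
    }
    // Afterwards, diff the table's current snapshot file list against the
    // Parquet files actually uploaded: with the bug, many are missing.
    Ok(())
}
```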

Expected behavior

Concurrent commits should raise conflict errors and be retried. Instead, concurrent modification errors never occur and large numbers of orphaned files result.

Willingness to contribute

I have a branch that fixes it, though I'm not sure it's the best-practice approach.
