
[Java API] When using the Java Iceberg API to write data into a partitioned table, an error occurs: the type of a timestamp partition field cannot be parsed correctly. #6510


Description

@youngxinler

Apache Iceberg version

1.1.0 (latest release)

Query engine

Other

Please describe the bug 🐞

When using the Java Iceberg API to write data into a partitioned table, an error occurs: the type of a timestamp partition field cannot be parsed correctly.

        Configuration configuration = new Configuration();
        // this is a local file catalog
        HadoopCatalog hadoopCatalog = new HadoopCatalog(configuration, icebergWareHousePath);
        TableIdentifier name = TableIdentifier.of("logging", "logs");
        Schema schema = new Schema(
                Types.NestedField.required(1, "level", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
                Types.NestedField.required(3, "message", Types.StringType.get()),
                Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
        );
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .hour("event_time")
                .identity("level")
                .build();
        Table table = hadoopCatalog.createTable(name, schema, spec);

        GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());

        int partitionId = 1, taskId = 1;
        OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET).build();
        final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());

        // PartitionedFanoutWriter automatically routes each record to its partition and creates a writer per partition
        PartitionedFanoutWriter<Record> partitionedFanoutWriter = new PartitionedFanoutWriter<Record>(table.spec(), FileFormat.PARQUET, appenderFactory, outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) {
            @Override
            protected PartitionKey partition(Record record) {
                partitionKey.partition(record);
                return partitionKey;
            }
        };

        Random random = new Random();
        List<String> levels = Arrays.asList("info", "debug", "error", "warn");
        GenericRecord genericRecord = GenericRecord.create(table.schema());

        // write 1000 sample records
        for (int i = 0; i < 1000; i++) {
            GenericRecord record = genericRecord.copy();
            record.setField("level",  levels.get(random.nextInt(levels.size())));
//            record.setField("event_time", System.currentTimeMillis());
            record.setField("event_time", OffsetDateTime.now());
            record.setField("message", "Iceberg is a great table format");
            record.setField("call_stack", Arrays.asList("NullPointerException"));
            partitionedFanoutWriter.write(record);
        }


        AppendFiles appendFiles = table.newAppend();

        // append the generated data files to the table
        Arrays.stream(partitionedFanoutWriter.dataFiles()).forEach(appendFiles::appendFile);

        // create the new snapshot and commit it
        Snapshot newSnapshot = appendFiles.apply();
        appendFiles.commit();

When I use a Long to set event_time and write, it reports this error:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.OffsetDateTime

	at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestamptzWriter.write(BaseParquetWriter.java:281)
	at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
	at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:138)
	at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
	at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:63)

When I use java.time.OffsetDateTime to set event_time and write, it also reports an error:

java.lang.IllegalStateException: Not an instance of java.lang.Long: 2023-01-02T11:20:20.746+08:00

	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
	at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)

This problem does not occur if the table is non-partitioned. I looked at the internal code of PartitionKey, and it seems related to the transform logic applied to the partition source field?
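In case it helps narrow this down, here is a minimal sketch of a possible workaround (untested, and based on my assumption that PartitionKey's accessors read Iceberg's internal representation, i.e. a long of microseconds for timestamptz, while the Parquet writer expects java.time.OffsetDateTime). It wraps the record with org.apache.iceberg.data.InternalRecordWrapper before computing the partition key and keeps OffsetDateTime in the record itself; the wrapper variable and its placement are my own addition, not part of the snippet above:

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.PartitionKey;
    import org.apache.iceberg.data.InternalRecordWrapper;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.PartitionedFanoutWriter;

    // reuses table, appenderFactory, outputFileFactory and TARGET_FILE_SIZE_IN_BYTES from the code above
    final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());
    // assumption: InternalRecordWrapper converts java.time values (e.g. OffsetDateTime)
    // into the internal representation (long microseconds) that the partition accessors expect
    final InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());

    PartitionedFanoutWriter<Record> partitionedFanoutWriter =
            new PartitionedFanoutWriter<Record>(table.spec(), FileFormat.PARQUET, appenderFactory,
                    outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) {
                @Override
                protected PartitionKey partition(Record record) {
                    // partition on the wrapped view; the record itself still carries OffsetDateTime,
                    // which is what the Parquet timestamptz writer expects
                    partitionKey.partition(wrapper.wrap(record));
                    return partitionKey;
                }
            };

With this, record.setField("event_time", OffsetDateTime.now()) should satisfy both the partition key and the Parquet writer, if the assumption about the internal representation holds.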
