Closed
Description
Apache Iceberg version
1.1.0 (latest release)
Query engine
Other
Please describe the bug 🐞
When using the Java Iceberg API to write data into a partitioned table, an error occurs: for a timestamp partition field, the value type cannot be handled correctly.
```java
Configuration configuration = new Configuration();
// this is a local file catalog
HadoopCatalog hadoopCatalog = new HadoopCatalog(configuration, icebergWareHousePath);
TableIdentifier name = TableIdentifier.of("logging", "logs");
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get()),
    Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
);
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .identity("level")
    .build();
Table table = hadoopCatalog.createTable(name, schema, spec);

GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());
int partitionId = 1, taskId = 1;
OutputFileFactory outputFileFactory =
    OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET).build();
final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());

// PartitionedFanoutWriter automatically partitions each record and creates a writer per partition
PartitionedFanoutWriter<Record> partitionedFanoutWriter =
    new PartitionedFanoutWriter<Record>(table.spec(), FileFormat.PARQUET, appenderFactory,
        outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) {
      @Override
      protected PartitionKey partition(Record record) {
        partitionKey.partition(record);
        return partitionKey;
      }
    };

Random random = new Random();
List<String> levels = Arrays.asList("info", "debug", "error", "warn");
GenericRecord genericRecord = GenericRecord.create(table.schema());
// write 1000 sample records
for (int i = 0; i < 1000; i++) {
    GenericRecord record = genericRecord.copy();
    record.setField("level", levels.get(random.nextInt(levels.size())));
    // record.setField("event_time", System.currentTimeMillis());
    record.setField("event_time", OffsetDateTime.now());
    record.setField("message", "Iceberg is a great table format");
    record.setField("call_stack", Arrays.asList("NullPointerException"));
    partitionedFanoutWriter.write(record);
}

AppendFiles appendFiles = table.newAppend();
// append the data files to the table
Arrays.stream(partitionedFanoutWriter.dataFiles()).forEach(appendFiles::appendFile);
// commit the new snapshot
Snapshot newSnapshot = appendFiles.apply();
appendFiles.commit();
```
When I use a Long to set event_time and write, it reports this error:
```
java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.OffsetDateTime
	at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestamptzWriter.write(BaseParquetWriter.java:281)
	at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
	at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:138)
	at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
	at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
	at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:63)
```
When I use java.time.OffsetDateTime to set event_time and write, it also reports an error:
```
java.lang.IllegalStateException: Not an instance of java.lang.Long: 2023-01-02T11:20:20.746+08:00
	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
	at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
```
This problem does not occur if the table is unpartitioned. I looked at the internal code of PartitionKey, and it seems related to the transform logic for the internal partition field.
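The two stack traces seem to be two sides of the same representation mismatch: the generic Parquet writer expects `event_time` to be a `java.time.OffsetDateTime`, while the partition-key accessor reads the field as Iceberg's internal representation of timestamptz, a `Long` holding microseconds since the Unix epoch. A minimal sketch of that internal encoding (the `toMicros` helper is hypothetical, only for illustration):

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class TimestamptzRepr {
    // Iceberg stores timestamptz internally as microseconds since the Unix epoch (UTC)
    static long toMicros(OffsetDateTime ts) {
        return ts.toInstant().getEpochSecond() * 1_000_000L
                + ts.toInstant().getNano() / 1_000L;
    }

    public static void main(String[] args) {
        // the value from the IllegalStateException message above
        OffsetDateTime ts =
                OffsetDateTime.of(2023, 1, 2, 11, 20, 20, 746_000_000, ZoneOffset.ofHours(8));
        System.out.println(toMicros(ts)); // 1672629620746000
    }
}
```

Assuming that is the cause, one possible workaround is to wrap each record in `org.apache.iceberg.data.InternalRecordWrapper` (which converts java.time values to the internal `Long` form) before calling `partitionKey.partition(...)`, but I have not verified this.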