[BUG][connector-file-hadoop] When reading parquet file field value as BINARY type, write downstream data exception #9140

Description

@JeremyXin

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When synchronizing an upstream Hive table to Doris (in practice, reading the HDFS files backing the table), I found that the data written to Doris was abnormal.

The Hive table was created with the following SQL:

CREATE TABLE `xxx`.`xxx`(
  `org_openid` string COMMENT 'xxx', 
  `is_admin` string COMMENT 'xxx', 
  `function_type` string COMMENT 'xxx', 
  `mail_active_day_count` string COMMENT 'xxx', 
  `mail_active_week_count` string COMMENT 'xxx', 
  `mail_active_last_week_count` string COMMENT 'xxx', 
  `mail_next_week_retention` string COMMENT 'xxx', 
  `mail_active_month_count` string COMMENT 'xxx', 
  `create_time` string COMMENT 'Creating Time', 
  `update_time` string COMMENT 'Updating Time', 
  `mail_active_web_day_count` bigint COMMENT 'xxx', 
  `mail_active_notweb_day_count` bigint COMMENT 'xxx', 
  `mail_active_web_week_count` bigint COMMENT 'xxx', 
  `mail_active_notweb_week_count` bigint COMMENT 'xxx')
PARTITIONED BY ( 
  `dt` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://xxx'

An example HDFS file name:

ce4b677d9f44dc4b-421324f20000000a_2037610321_data.0.parq

After local testing and analysis, I found that when ParquetReadStrategy derives the metadata schema from the files, the string fields above (org_openid, is_admin, etc.) are identified as BINARY and their original logical type (OriginalType) is null. As a result, those fields are treated as byte arrays (PrimitiveByteArrayType).
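For reference, the missing annotation can be confirmed straight from the file. A minimal check, assuming the legacy parquet-tools CLI is available (the newer parquet-cli equivalent is "parquet schema"):

# String columns written without a logical type annotation print as plain
# "binary", with no UTF8/STRING annotation, in the schema output.
parquet-tools schema ce4b677d9f44dc4b-421324f20000000a_2037610321_data.0.parq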

While reading the file, those fields come back as byte arrays (HeapByteBuffer). Even after the resolveObject method is applied, they remain byte arrays, so the data written into the Doris table is abnormal.

I'm not sure why the original logical type (OriginalType) of these fields is null in the HDFS files produced by the Hive table above, and I currently cannot change the structure of the upstream Hive table.

For this situation, a possible solution is to let the Parquet source support a user-defined schema, so that these fields can be declared as STRING in the SeaTunnelRowType. Then, in the STRING branch of the resolveObject method, if the field value is a ByteBuffer, convert it to a String (a sketch follows below). I have already implemented these changes locally and verified that the correct data is written.
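A minimal sketch of the proposed STRING-branch conversion (hedged: the helper class name is hypothetical, the real change would live inside ParquetReadStrategy's resolveObject, and UTF-8 is assumed as the encoding):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the proposed handling: a value read from
// an un-annotated BINARY column is decoded as UTF-8 text when the
// user-defined schema declares the field as STRING.
public final class ByteBufferToString {

    public static String resolveString(Object field) {
        if (field == null) {
            return null;
        }
        if (field instanceof ByteBuffer) {
            // duplicate() so decoding does not consume the caller's buffer position
            ByteBuffer buffer = ((ByteBuffer) field).duplicate();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
        return field.toString();
    }
}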

I also noticed that the getSeaTunnelRowTypeInfoWithUserConfigRowType method in the OrcReadStrategy class already supports a user-defined schema, but it isn't implemented in the ParquetReadStrategy class yet. Maybe it is worth implementing there as well?
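For illustration, once ParquetReadStrategy honors a user-defined schema the way OrcReadStrategy does, the source block could declare the expected types explicitly. A sketch using the standard SeaTunnel schema option; whether the Parquet reader applies it is exactly the gap described above:

source {
  HdfsFile {
    path = "hdfs://cluster1/xxx"
    file_format_type = "parquet"
    fs.defaultFS = "hdfs://cluster1"
    # Declare the un-annotated BINARY columns as string so that
    # resolveObject takes the STRING branch for them.
    schema {
      fields {
        org_openid = string
        is_admin = string
        function_type = string
        mail_active_web_day_count = bigint
      }
    }
  }
}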

If this is confirmed as a bug, I can submit a PR and fix it using the design logic above.

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  job.mode = "BATCH"
  parallelism = 10
}
source {
  HdfsFile {
    path = "hdfs://cluster1/xxx"
    file_format_type = "parquet"
    fs.defaultFS = "hdfs://cluster1"
    hdfs_site_path = "xxx/hdfs-site.xml"
    krb5_path = "xxx/krb5.conf"
    kerberos_principal = "xxx"
    kerberos_keytab_path = "xxx/kerberos.keytab"
  }
}

sink {
  Doris {
    fenodes = "xxx:8030"
    username = "xxx"
    password = "xxx"
    database = "database"
    table = "table"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Running Command

sh bin/seatunnel.sh --config config/v2.batch.hdfs.template -m local

Error Exception

Parquet data is written incorrectly to the downstream Doris table. For details, see the screenshots below.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

The schema parsed from the Parquet file:
[screenshot]

[screenshot]

The record value when reading the Parquet file:
[screenshot]

Upstream Hive table data sampling:
[screenshot]

Downstream Doris table data sampling; the written data is abnormal:
[screenshot]

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
