# Tableflow Setup

## Introduction

Tableflow automates the tedious process of transforming a topic in an Apache Kafka-compatible data streaming system into an Apache Iceberg table. Instead of writing custom code and manually configuring a data pipeline for each table you want to build, Tableflow allows you to declaratively specify which topics to build tables from and what schema and data format to expect. When schemas inevitably need to change, you can update the schema in Tableflow's editor and WarpStream will handle the schema migration automatically.

Compaction and table maintenance are included out of the box with no tuning required. Tableflow continuously compacts the table in the background using intelligent heuristics to ensure readers get the best performance.

Tableflow is available as Bring-Your-Own-Cloud (BYOC) where the compute and storage live inside your cloud account inside your VPC. The raw data for your table is only ever stored inside your object storage bucket and never leaves your VPC during the table ingestion and maintenance process. Tableflow maintains a metadata store inside WarpStream Cloud as the Iceberg metadata layer that is periodically synced into your object storage bucket.

## Getting Started

To get started with Tableflow, you first need to create a Tableflow cluster from the WarpStream Console, or by using one of our [infrastructure-as-code](https://docs.warpstream.com/warpstream/agent-setup/infrastucture-as-code) deployment options. The WarpStream Agents that join this cluster will only perform Tableflow operations and do not expose the Apache Kafka protocol. Please refer to our [other documentation](https://docs.warpstream.com/warpstream/agent-setup/deploy) for how to install and configure the WarpStream Agents in your environment, as the process does not differ for Tableflow.

As part of deploying the Agents, you'll also need to set up and configure an object storage bucket and/or provide the Agents with access to one of your existing buckets. See our [object storage configuration documentation](https://docs.warpstream.com/warpstream/agent-setup/different-object-stores) for more details on that.

Once the Agents are running, you can open the Configuration tab and start defining your source clusters, topics, tables, and schemas. Currently, the only method for defining schemas is `inline` mode, which doesn't require an external Schema Registry. Schema Registry support will be available in a future release.

## Managed Tables

Tableflow tables are fully managed by WarpStream, or what we call "managed tables". You cannot use another system for performing writes, compactions, or other table maintenance operations. This is in contrast to a connector-based approach where you would be forced to combine multiple distinct systems or operations together to implement all of these functions.

## Configuration

Tableflow is configured and fully controlled through a single YAML file, which can be edited through the WarpStream Console or the [Pipelines API](https://docs.warpstream.com/warpstream/reference/api-reference/pipelines).

### Overview

```yaml
source_clusters:
  - name: tableflow_cluster_1
    bootstrap_brokers:
      - hostname: localhost
        port: 9092
  - name: tableflow_cluster_2
    bootstrap_brokers:
      - hostname: broker1.kafkaserver.com
        port: 9092
      - hostname: broker2.kafkaserver.com
        port: 9092
    credentials:
      sasl_username_env: SASL_USERNAME_ENV_VAR
      sasl_password_env: SASL_PASSWORD_ENV_VAR
      use_tls: true
      sasl_mechanism: plain
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_json_logs_topic
    source_format: json
    schema_mode: inline
    input_schema: |
      {
        "type": "object",
        "properties": {
          "environment": { "type": "string" },
          "service": { "type": "string" },
          "status": { "type": "string" },
          "message": { "type": "string" }
        },
        "required": ["environment", "service", "status", "message"]
      }
  - source_cluster_name: tableflow_cluster_2
    source_topic: example_avro_events_topic
    source_format: avro
    schema_mode: inline
    input_schema: |
      {
        "type": "record",
        "name": "ExampleAvroEvent",
        "fields": [
          { "name": "event_id", "type": "string" },
          { "name": "user_id", "type": "long" },
          { "name": "session_id", "type": "string" }
        ]
      }
destination_bucket_url: s3://my-bucket-name
```

The YAML specifies:

* The source clusters Tableflow should connect to.
* For each cluster, the topics that Tableflow should create Iceberg tables from.
* For each topic, the schema to deserialize the Kafka records with.
* The destination bucket in which to store the tables.

### Configure Source Clusters

Source clusters are the Apache Kafka-compatible systems like WarpStream that store the topics you'd like to convert to tables. You define clusters by giving them a name, a list of brokers, and credentials if they are needed. You define source clusters at the root of the configuration YAML.

```yaml
source_clusters:
  - name: tableflow_cluster
    bootstrap_brokers:
      - hostname: broker.kafkaserver.com
        port: 9092
```

You can define multiple source clusters so a single Tableflow cluster can centralize data from multiple clusters into one unified place.

### Configure Connection and Credentials

If credentials are needed to connect to the Kafka cluster, the connection information can be provided under the `credentials` block for each cluster.

{% hint style="info" %}
Note that if the source Kafka cluster is a WarpStream cluster, credentials still need to be provided if authentication is required. This is different from the Managed Data Pipelines setup where credentials are injected automatically.
{% endhint %}

#### **TLS**

```yaml
source_clusters:
  - name: tableflow_cluster 
    ...
    credentials:
      use_tls: true
      tls_insecure_skip_verify: false
```

* `use_tls` specifies whether the Agents should use TLS when connecting to your source clusters.
* `tls_insecure_skip_verify` specifies whether a client verifies the server's certificate chain and host name.

#### **SASL**

```yaml
source_clusters:
  - name: tableflow_cluster 
    ...
    credentials:
      sasl_username_env: SASL_USERNAME_ENV_VAR
      sasl_password_env: SASL_PASSWORD_ENV_VAR
      sasl_mechanism: plain
      use_tls: true
```

* Both the `sasl_username_env` and the `sasl_password_env` fields refer to environment variable names. The Agents will prepend a `TABLEFLOW_` prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as `TABLEFLOW_SASL_USERNAME_ENV_VAR` and `TABLEFLOW_SASL_PASSWORD_ENV_VAR` respectively.
* The default value of `sasl_mechanism` is `plain`. Supported mechanisms include: `plain`, `scram-256`, and `scram-512`.
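For example, if the Agents run on Kubernetes, the credentials could be wired in through the container spec. The snippet below is a hypothetical deployment fragment; the secret name and keys are illustrative:

```yaml
# Note the TABLEFLOW_ prefix on the environment variable names, matching
# the sasl_username_env / sasl_password_env values in the Tableflow config.
env:
  - name: TABLEFLOW_SASL_USERNAME_ENV_VAR
    valueFrom:
      secretKeyRef: { name: kafka-credentials, key: username }
  - name: TABLEFLOW_SASL_PASSWORD_ENV_VAR
    valueFrom:
      secretKeyRef: { name: kafka-credentials, key: password }
```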

#### **mTLS PEM encoded certs**

```yaml
source_clusters:
  - name: tableflow_cluster 
    ...
    credentials:
      mtls_client_cert_env: MTLS_CERT_PATH_ENV_VAR
      mtls_client_key_env: MTLS_KEY_PATH_ENV_VAR
      mtls_server_ca_cert_env: MTLS_SERVER_CA_CERT_PATH_ENV_VAR
      use_tls: true
```

* The `mtls_client_cert_env`, `mtls_client_key_env`, and `mtls_server_ca_cert_env` fields refer to environment variable names. The Agents will prepend a `TABLEFLOW_` prefix to the values of these fields before reading the environment variables, so the environment variables in the Agent should be configured as `TABLEFLOW_MTLS_CERT_PATH_ENV_VAR`, `TABLEFLOW_MTLS_KEY_PATH_ENV_VAR`, and `TABLEFLOW_MTLS_SERVER_CA_CERT_PATH_ENV_VAR` respectively.
* `mtls_client_cert_env` specifies the environment variable that contains the path to the X.509 certificate file in PEM format.
* `mtls_client_key_env` specifies the environment variable that contains the path to the X.509 private key file in PEM format.
* `mtls_server_ca_cert_env` is optional and specifies the environment variable that contains the path to the X.509 certificate file in PEM format for the client certificate authority. If not specified, the host's root certificate pool will be used for client certificate verification.

### Configure the Destination Bucket URL

To specify where your table data should be stored, use the `destination_bucket_url` field at the root of the configuration YAML. This configures the default destination bucket URL for all tables.

```yaml
destination_bucket_url: s3://bucket-name?region=us-east-1
```

Alternatively, you can specify per-table bucket URL overrides within each table's configuration:

```yaml
tables:
    - source_cluster_name: tableflow_cluster_1
      source_topic: example_json_logs_topic
      destination_bucket_url: s3://bucket-name-for-this-table?region=us-east-1
```

{% hint style="warning" %}
The destination bucket URL for a table can only be changed if a table is completely empty. This means that once your table has started ingesting data successfully, the destination bucket **cannot** be changed.
{% endhint %}

Check our [object storage configuration documentation](https://docs.warpstream.com/warpstream/agent-setup/different-object-stores) for more details on how to configure this URL for the various cloud providers, as well as for a complete list of permissions that the Agents will require.

Note that tables will be created under the `<bucket-name>/warpstream/_tableflow` path. Optionally, [a prefix can be specified in the bucket URL](https://docs.warpstream.com/warpstream/agent-setup/different-object-stores#using-a-bucket-prefix), which will result in tables being created under the `<bucket-name>/prefix/warpstream/_tableflow` path.
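For example, assuming the prefix is specified as the URL path (see the linked object storage documentation for the exact prefix syntax), a configuration like the following illustrative one would place tables under `my-bucket-name/tableflow-data/warpstream/_tableflow`:

```yaml
# Illustrative bucket name and prefix; tables would be created under
# my-bucket-name/tableflow-data/warpstream/_tableflow
destination_bucket_url: s3://my-bucket-name/tableflow-data?region=us-east-1
```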

### Configure Topics and Tables

The next step is defining your tables and topics. Each table you define has exactly one source topic, and the table will be named `<cluster-name>+<topic-name>`. For example, the table built from topic `example_json_logs_topic` in cluster `tableflow_cluster_1` would be named `tableflow_cluster_1+example_json_logs_topic`.

{% hint style="info" %}
Tableflow currently supports append-only tables. If you ingest data from a compacted topic in the source cluster, rows will not be deduplicated and any tombstones may not comply with the schema. Support for compacted topics is coming soon.
{% endhint %}

```yaml
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_json_logs_topic
    source_format: json
    schema_mode: inline
    input_schema: |
      {
        "type": "object",
        "required": ["entry_id", "date", "lines"],
        "properties": {
          "entry_id": { "type": "string" },
          "date": { "type": "string", "format": "date" },
          "description": { "type": "string" },
          "reference": { "type": "string" },
          "entry_type": {
            "type": "string",
            "enum": ["standard", "adjusting", "closing", "reversing"]
          },
          "lines": {
            "type": "array",
            "items": {
              "type": "object",
              "required": ["account_code", "account_name", "debit", "credit"],
              "properties": {
                "account_code": { "type": "string" },
                "account_name": { "type": "string" },
                "debit": { "type": "number" },
                "credit": { "type": "number" },
                "department": { "type": "string" },
                "cost_center": { "type": "string" }
              }
            }
          }
        }
      }

  - source_cluster_name: tableflow_cluster_2
    source_topic: example_avro_events_topic
    source_format: avro
    schema_mode: inline
    input_schema: |
      {
        "type": "record",
        "name": "ExampleAvroEventsTopic",
        "fields": [
          { "name": "event_id", "type": "string" },
          { "name": "user_id", "type": "long" },
          { "name": "session_id", "type": "string" },
          {
            "name": "profile",
            "type": {
              "type": "record",
              "name": "Profile",
              "fields": [
                { "name": "country", "type": "string" },
                { "name": "language", "type": "string" }
              ]
            }
          },
          {
            "name": "device",
            "type": {
              "type": "record",
              "name": "Device",
              "fields": [
                { "name": "type", "type": "string" },
                {
                  "name": "os",
                  "type": {
                    "type": "record",
                    "name": "DeviceOS",
                    "fields": [
                      { "name": "name", "type": "string" },
                      { "name": "version", "type": "string" },
                      { "name": "vendor", "type": ["null", "string"] }
                    ]
                  }
                },
                {
                  "name": "browser",
                  "type": {
                    "type": "record",
                    "name": "DeviceBrowser",
                    "fields": [
                      { "name": "name", "type": "string" },
                      { "name": "version", "type": "string" }
                    ]
                  }
                },
                {
                  "name": "screen",
                  "type": {
                    "type": "record",
                    "name": "DeviceScreen",
                    "fields": [
                      { "name": "width", "type": "int" },
                      { "name": "height", "type": "int" },
                      { "name": "pixel_ratio", "type": "double" }
                    ]
                  }
                },
                { "name": "model", "type": ["null", "string"] }
              ]
            }
          },
          {
            "name": "cookies",
            "type": {
              "type": "array",
              "items": "string"
            }
          },
          {
            "name": "event_attributes",
            "type": {
              "type": "map",
              "values": "string"
            }
          }
        ]
      }
```

### Schema Definitions

`schema_mode` controls where schema definitions come from:

* `inline`: the schema is declared directly in the Tableflow config

The guidance in this section applies to `schema_mode: inline`. When using inline schemas, provide the schema in the same format as your input records:

* If your records are JSON, provide a JSON Schema.
* If your records are Avro, provide an Avro schema.
* If your records are Protobuf, provide a `.proto` schema.

#### Recommended Pattern: `input_schema`-first (Agents v771+)

Use `input_schema` as the default way to declare schemas.\
If you are **not** using transforms, define only `input_schema`.

```yaml
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_events_topic
    source_format: json
    schema_mode: inline
    input_schema: |
      {
        "type": "object",
        "properties": {
          "event_id": { "type": "string" },
          "user_id": { "type": "integer" },
          "created_at": { "type": "string", "format": "date-time" }
        },
        "required": ["event_id"]
      }
```

This is now the preferred pattern because it keeps schema declaration aligned with producer payload format and avoids manually maintaining Iceberg field IDs for simple pipelines.

#### Example: JSON Input Schema

Supported types are:

`boolean`, `int`, `long`, `float`, `double`, `decimal`, `date`, `time`, `timestamp`, `timestamptz`, `string`, `uuid`, `fixed`, and `binary`.

Note that map keys can only be `string`.

```yaml
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_events_topic
    source_format: json
    schema_mode: inline
    input_schema: |
      {
        "type": "object",
        "properties": {
          "event_id": { "type": "string" },
          "user_id": { "type": "integer" },
          "created_at": { "type": "string", "format": "date-time" }
        },
        "required": ["event_id"]
      }
```

#### Example: Avro Input Schema

Supported types are:

`boolean`, `int`, `long`, `float`, `double`, `decimal`, `date`, `time`, `timestamp`, `timestamptz`, `string`, `uuid`, `fixed`, and `binary`.

Note that map keys can only be `string`.

```yaml
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_avro_events_topic
    source_format: avro
    schema_mode: inline
    input_schema: |
      {
        "type": "record",
        "name": "Event",
        "fields": [
          { "name": "event_id", "type": "string" },
          { "name": "user_id", "type": "long" },
          { "name": "amount", "type": ["null", "double"] }
        ]
      }
```

#### Example: Protobuf Input Schema

When `source_format` is `protobuf`, set `wire_format` based on the payload encoding:

* `raw`: protobuf binary payload with no prefix
* `confluent`: Confluent wire format (magic byte + schema ID prefix)

The list of supported types are:

`boolean`, `int32`, `sint32`, `uint32`, `fixed32`, `sfixed32`, `int64`, `sint64`, `uint64`, `fixed64`, `sfixed64`, `float`, `double`, `string`, and `bytes`.

```yaml
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_protobuf_events_topic
    source_format: protobuf
    wire_format: raw
    schema_mode: inline
    input_schema: |
      syntax = "proto3";

      message Event {
        string event_id = 1;
        int64 user_id = 2;
        Status status = 3;
      }
      
      enum Status {
        UNKNOWN = 0;
        ACTIVE = 1;
        INACTIVE = 2;
      }
```

{% hint style="warning" %}
Enum values are stored by name in Iceberg but identified by number on the wire. This has implications for schema evolution:

* adding new enum values is safe
* renaming enum values is **forbidden** in WarpStream's TableFlow because renaming would cause inconsistent data (old records would have old names, new records would have new names)
* removing old enum values is safe. But note that if we decode a record whose number is not in the current schema, it will be stored in Iceberg as the number in string form (e.g. `"99"`)

**Note:** WarpStream's TableFlow also validates that sibling enum fields have identical sets if they share any value name. This prevents accidental inconsistencies between different fields using the same enum type.
{% endhint %}

The Protobuf types are converted as follows to the Iceberg types:

| Protobuf Type               | Iceberg Type    | Notes                                                                    |
| --------------------------- | --------------- | ------------------------------------------------------------------------ |
| `boolean`                   | `boolean`       |                                                                          |
| `int32`                     | `integer`       |                                                                          |
| `sint32`                    | `integer`       |                                                                          |
| `sfixed32`                  | `integer`       |                                                                          |
| `uint32`                    | `decimal(10,0)` | Stored as decimal(10,0) to prevent overflow (max `uint32` > max `int32`) |
| `fixed32`                   | `decimal(10,0)` | Stored as decimal(10,0) to prevent overflow (max `uint32` > max `int32`) |
| `int64`                     | `long`          |                                                                          |
| `sint64`                    | `long`          |                                                                          |
| `sfixed64`                  | `long`          |                                                                          |
| `uint64`                    | `decimal(20,0)` | Stored as decimal(20,0) to prevent overflow (max `uint64` > max `int64`) |
| `fixed64`                   | `decimal(20,0)` | Stored as decimal(20,0) to prevent overflow (max `uint64` > max `int64`) |
| `float`                     | `float`         |                                                                          |
| `double`                    | `double`        |                                                                          |
| `enum`                      | `string`        | Stored as the enum value name                                            |
| `string`                    | `string`        |                                                                          |
| `bytes`                     | `binary`        |                                                                          |
| `message`                   | `struct`        |                                                                          |
| `map`                       | `map`           |                                                                          |
| `repeated`                  | `list`          |                                                                          |
| `oneof`                     | `struct`        | Converted to a struct where each option is an optional field             |
| `google.protobuf.Timestamp` | `timestamptz`   | Nanosecond precision is truncated to microseconds (Iceberg limitation)   |

**Note:** Iceberg does not have unsigned integer types. To prevent overflow when storing large unsigned values:

* `uint32` and `fixed32` are stored as `decimal(10,0)` (the max `uint32` number `4,294,967,295` has 10 digits)
* `uint64` and `fixed64` are stored as `decimal(20,0)` (the max `uint64` number `18,446,744,073,709,551,615` has 20 digits)

#### JSON Schema Features Currently Not Supported

When using JSON `input_schema`, Tableflow currently rejects JSON Schema documents that use the following keywords:

* `prefixItems`
* `contains`
* `patternProperties`
* `dependentRequired`
* `dependentSchemas`
* `if`
* `then`
* `else`
* `$defs`
* `$ref`
* `allOf`
* `oneOf`
* `anyOf`

If any of these are present, schema conversion fails with an unsupported-feature validation error.
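For instance, a table configured with the following illustrative schema would fail validation because it uses `$defs` and `$ref`:

```yaml
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_json_logs_topic
    source_format: json
    schema_mode: inline
    # Rejected: $defs and $ref are not supported JSON Schema keywords.
    input_schema: |
      {
        "type": "object",
        "$defs": {
          "tag": { "type": "string" }
        },
        "properties": {
          "event_id": { "type": "string" },
          "tag": { "$ref": "#/$defs/tag" }
        }
      }
```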

### When to Declare `schema` (Output Schema)

Define `schema` only when you use `transforms` and the post-transform output shape differs from the input shape.

* `input_schema` describes how to decode source records (pre-transform).
* `schema` describes the Iceberg table shape (post-transform).
* When declared as raw schemas (string blocks), both `input_schema` and `schema` must use the same format as `source_format`.

```yaml
tables:
  - source_cluster_name: tableflow_cluster_1
    source_topic: example_cdc_topic
    source_format: avro
    schema_mode: inline
    input_schema: |
      {
        "type": "record",
        "name": "Envelope",
        "fields": [
          {
            "name": "payload",
            "type": {
              "type": "record",
              "name": "Payload",
              "fields": [
                {
                  "name": "after",
                  "type": {
                    "type": "record",
                    "name": "After",
                    "fields": [
                      { "name": "field1", "type": "string" },
                      { "name": "field2", "type": "string" }
                    ]
                  }
                }
              ]
            }
          }
        ]
      }
    transforms:
      - transform_type: bento
        transform: |
          root.field1 = this.payload.after.field1
          root.field2 = this.payload.after.field2
    schema: |
      {
        "type": "record",
        "name": "FlattenedEvent",
        "fields": [
          { "name": "field1", "type": "string" },
          { "name": "field2", "type": "string" }
        ]
      }
```

<details>

<summary>Deprecated schema definitions</summary>

This section contains documentation for the old way of declaring schema definitions in Tableflow. This was cumbersome because it required you to manually specify field IDs and used a schema definition that didn't necessarily map one-to-one with the type used to store your data. We still support it for backward compatibility, but you shouldn't use it.

As shown in the above example, schemas specified with the `inline` mode contain a list of fields. Each field is named and has a unique integer ID that will be mapped to the field ID for your Iceberg table, as well as a type that will be used as the Iceberg data type for the corresponding column.

**Primitive Types**

The syntax for defining a primitive field looks like the following:

```yaml
- { name: <field-name>, id: <field-id>, type: <field-type>, optional: { true | false } }
```

where `field-type` is one of the supported fields for your input record type (refer to the Protobuf / Avro / Json sections above).
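For instance, a pair of primitive fields might be declared like this (names and IDs are illustrative):

```yaml
- { name: event_id, id: 1, type: string, optional: false }
- { name: user_id, id: 2, type: long, optional: true }
```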

**Nested Types**

**For Avro and JSON only**

A `struct` is specified as a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. Fields can be of any type.

```yaml
- name: <struct-field-name>
  id: <struct-field-id>
  type: struct
  fields:
    - { name: <struct-field-1>, id: <struct-field-id-1>, type: <struct-field-type-1>, optional: { true | false} }
    - { name: <struct-field-2>, id: <struct-field-id-2>, type: <struct-field-type-2>, optional: { true | false} }
    - { name: <struct-field-3>, id: <struct-field-id-3>, type: <struct-field-type-3>, optional: { true | false} }
```

**For Avro, JSON and Protobuf**

A `map` is specified as a key/value pair. Both the key field and value field have an integer id that is unique in the table schema. While map values can be either optional or required, map keys are required. For both Avro and JSON schemas, map keys can only be of type `string`, but values can be of any type. For Protobuf, map keys can be of any map key type allowed by the Protobuf specs, namely: `string`, `int32`, `int64`, `uint32`, `uint64`, `sint32`, `sint64`, `sfixed32`, `sfixed64`, `fixed32`, `fixed64`, `boolean`.

```yaml
- name: <map-field-name>
  id: <map-field-id>
  type: map
  fields:
    - { name: <key-name>, id: <key-field-id>, type: <key-field-type> }
    - { name: <value-name>, id: <value-field-id>, type: <value-field-type>, optional: { true | false} }
```

Note that for Protobuf, the `proto_field_number` must be added to the map field but should not be set for either the key or the value. So a full example for a Protobuf map is:

```yaml
- name: <map-field-name>
  id: <map-field-id>
  proto_field_number: <map-proto-field-number>
  type: map
  fields:
    - { name: <key-name>, id: <key-field-id>, type: <key-field-type> }
    - { name: <value-name>, id: <value-field-id>, type: <value-field-type>, optional: { true | false} }
```

A `list` (in JSON and Avro) or `repeated` (in Protobuf) is specified with a single element field. The element field is named `element` and has an integer id that is unique in the table schema. Elements can be either optional\* or required and can be of any type.

```yaml
- name: <list-field-name>
  id: <list-field-id>
  type: list
  fields:
    - { name: element, id: <element-field-id>, type: <element-field-type> }
```

Note that for Protobuf, the `proto_field_number` must be added to the repeated field but should not be set for the element. So a full example for a Protobuf repeated is:

```yaml
- name: <repeated-field-name>
  id: <repeated-field-id>
  proto_field_number: <repeated-proto-field-number>
  type: repeated
  fields:
    - { name: element, id: <element-field-id>, type: <element-field-type> }
```

\*Support for optional list elements will be coming soon.

**For Avro only**

Avro's binary encoding includes neither field names nor type information; instead, values are concatenated strictly based on the schema. Consequently, the decoder interprets the stream of bytes strictly according to the sequence defined in the schema.

**You must list fields in your table schema in the same order as in the producer’s AVRO schema.** Reordering fields (e.g. by id or by logical group) can cause decode failures such as `avro: ReadBool: invalid bool` or `unexpected EOF`. If you see these errors, compare your schema field order to the producer’s (e.g. the `.avsc` file or the schema in your schema registry) and align the order.

**For Protobuf only**

A `message` is defined using `type: message`. A `proto_field_number` must be provided for every nested field.

```yaml
- name: <message-field-name>
  id: <message-field-id>
  proto_field_number: <message-proto-field-number>
  type: message
  fields:
    - { name: <message-field-1>, id: <message-field-id-1>, proto_field_number: <message-field-id-1-proto-field-number>, type: <message-field-type-1>, optional: { true | false} }
    - { name: <message-field-2>, id: <message-field-id-2>, proto_field_number: <message-field-id-2-proto-field-number>, type: <message-field-type-2>, optional: { true | false} }
    - { name: <message-field-3>, id: <message-field-id-3>, proto_field_number: <message-field-id-3-proto-field-number>, type: <message-field-type-3>, optional: { true | false} }
```

An `enum` is stored as a string in the Iceberg table using the enum value name. You must define all enum values with their corresponding numbers.

```yaml
- name: <enum-name>
  id: <enum-id>
  proto_field_number: <enum-proto-field-number>
  type: enum
  enum_values:
    - { name: <enum-value-0>, number: <0> }
    - { name: <enum-value-1>, number: <1> }
    - { name: <enum-value-2>, number: <2> }
```

{% hint style="warning" %}
Enum values are stored by name in Iceberg but identified by number on the wire. This has implications for schema evolution:

* adding new enum values is safe
* renaming enum values is **forbidden** in WarpStream's TableFlow because renaming would cause inconsistent data (old records would have old names, new records would have new names)
* removing old enum values is safe. But note that if we decode a record whose number is not in the current schema, it will be stored in Iceberg as the number in string form (e.g. `"99"`)

**Note:** WarpStream's TableFlow also validates that sibling enum fields have identical sets if they share any value name. This prevents accidental inconsistencies between different fields using the same enum type.
{% endhint %}

A `oneof` field is defined with `type: oneof`. Each option must be explicitly marked optional and given a `proto_field_number`, but the oneof field itself must be defined as required and without any `proto_field_number`.

```yaml
- name: <oneof-field-name>
  id: <oneof-field-id>
  type: oneof
  fields:
    - { name: <oneof-field-1>, id: <oneof-field-id-1>, proto_field_number: <oneof-field-id-1-proto-field-number>, type: <oneof-field-type-1>, optional: true }
    - { name: <oneof-field-2>, id: <oneof-field-id-2>, proto_field_number: <oneof-field-id-2-proto-field-number>, type: <oneof-field-type-2>, optional: true }
    - { name: <oneof-field-3>, id: <oneof-field-id-3>, proto_field_number: <oneof-field-id-3-proto-field-number>, type: <oneof-field-type-3>, optional: true }
```

</details>

### Partitioning

Tableflow supports unpartitioned tables, tables partitioned by the record timestamp, and custom partitioning. Tables are unpartitioned by default, and migrating between partitioning schemes is currently not supported.

The partitioning scheme is specified on a per-table basis. For convenience, the `partitioning_scheme` option can be used to define unpartitioned tables and timestamp-partitioned tables using the record timestamp. Supported values for `partitioning_scheme` are `unpartitioned`, `hour`, `day`, `month`, and `year`.

```yaml
tables:
  - source_topic: example_json_logs_topic
    ...
    partitioning_scheme: hour
    ...
```

If a custom partitioning scheme is needed, then the `custom_partitioning` option can be used as follows:

```yaml
tables:
    - source_topic: example_json_logs_topic
      ...
      custom_partitioning:
        - source_field_path: <field_path_1>
          name: <field_name_1>
          transform: { name: <transform_name_1> }
        - source_field_path: <field_path_2>
          name: <field_name_2>
          transform: { name: <transform_name_2> }
      ...
```

| Field             | Description                                                                                                                                                                                          |
| ----------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `source_field_path` | **The Input Column**. The path of the field in your schema (or a [default field](#table-schema)) that you want to partition on.                                                                      |
| `name`            | **The Partition Alias**. This is the name given to the partition itself. It does not have to match the source field name. It must start with a letter and contain only alphanumerics or underscores. |
| `transform`       | An object defining the transform to be applied to the source column to produce a partition value.                                                                                                    |

The `transform` object requires a `name` and, depending on the type, additional parameters:

* Time-based: `year`, `month`, `day`, `hour`.
* `bucket`: Requires an `n` field (e.g., `{ name: "bucket", n: 16 }`) to specify the number of buckets.
* `truncate`: Requires a `w` field (e.g., `{ name: "truncate", w: 10 }`) to specify the width of the truncation.
* `identity`: Uses the source value as-is.

{% hint style="info" %}
**Requires Agent v748 or higher.**
{% endhint %}

For example, to create hourly partitions on the Kafka timestamp and bucket the Kafka partitions into four bins, the configuration would look like this:

```yaml
tables:
    - source_topic: example_json_logs_topic
      ...
      custom_partitioning:
        - source_field_path: warpstream.timestamp
          name: timestamp_hour
          transform: { name: hour }
        - source_field_path: warpstream.bucket
          name: partition_bucket
          transform: { name: bucket, n: 4 }
      ...
```
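
The `truncate` and `identity` transforms can be applied the same way. As a sketch, assuming a table whose schema contains hypothetical `tenant_id` and `region` string fields:

```yaml
tables:
    - source_topic: example_json_logs_topic
      ...
      custom_partitioning:
        # Partition on the first 10 characters of a hypothetical tenant_id field.
        - source_field_path: tenant_id
          name: tenant_prefix
          transform: { name: truncate, w: 10 }
        # Partition on a hypothetical low-cardinality region field as-is.
        - source_field_path: region
          name: region
          transform: { name: identity }
      ...
```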

### Transforms (Agent v730+)

Tableflow supports applying stateless transformations to ingested records. This can be helpful to massage the data into the desired shape before inserting it into the table without having to reprocess the data into a completely different topic first.

For example, imagine the topic contains a [CDC change stream from debezium](https://debezium.io/documentation/reference/stable/integrations/serdes.html) that looks like the following:

```json
{
    "schema": {...},
    "payload": {
        "op": "u",
        "source": {
            ...
        },
        "ts_ms": "...",
        "ts_us": "...",
        "ts_ns": "...",
        "before": {
            "field1": "oldvalue1",
            "field2": "oldvalue2"
        },
        "after": {
            "field1": "newvalue1",
            "field2": "newvalue2"
        }
    }
}
```

Without a transform, the table schema would have to be defined as follows:

```yaml
source_format: json
input_schema: |
  {
    "type": "object",
    "properties": {
      "payload": {
        "type": "object",
        "properties": {
          "after": {
            "type": "object",
            "properties": {
              "field1": { "type": "string" },
              "field2": { "type": "string" }
            }
          }
        }
      }
    }
  }
```

This is unfortunate because users querying the data would always have to write their queries in the form: `SELECT payload.after.field1` instead of simply `SELECT field1`.

Transforms solve this problem by rewriting the structure of the record before applying the table schema. For example, we can rewrite the Tableflow configuration above as follows:

```yaml
source_format: json
transforms:
  - transform_type: bento
    transform: |
      root.field1 = this.payload.after.field1
      root.field2 = this.payload.after.field2
input_schema: |
  {
    "type": "object",
    "properties": {
      "field1": { "type": "string" },
      "field2": { "type": "string" }
    }
  }
```
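
With this transform in place, the Debezium record shown earlier is flattened before the table schema is applied, so each ingested record looks like:

```json
{
    "field1": "newvalue1",
    "field2": "newvalue2"
}
```

Queries can then reference `field1` and `field2` directly.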

Note that while the example above works for a JSON workload, it would not work for an Avro or Protobuf workload. The reason for this is that JSON is self-describing, while Avro and Protobuf records can only be deserialized if the schema is known ahead of time.

However, when transformations are used, records may only match the schema defined in the `schema` field **after** the transformation is applied. As a result, Avro and Protobuf workloads may require a separate `input_schema` which defines how to deserialize the input records pre-transformation.

For example, if the previous use-case were Avro encoded our Tableflow configuration would look like this:

```yaml
source_format: avro
input_schema: |
  {
    "type": "record",
    "name": "InputRecord",
    "fields": [
      {
        "name": "payload",
        "type": {
          "type": "record",
          "name": "Payload",
          "fields": [
            {
              "name": "after",
              "type": {
                "type": "record",
                "name": "After",
                "fields": [
                  { "name": "field1", "type": "string" },
                  { "name": "field2", "type": "string" }
                ]
              }
            }
          ]
        }
      }
    ]
  }
transforms:
  - transform_type: bento
    transform: |
      root.field1 = this.payload.after.field1
      root.field2 = this.payload.after.field2
schema: |
  {
    "type": "record",
    "name": "OutputRecord",
    "fields": [
      { "name": "field1", "type": "string" },
      { "name": "field2", "type": "string" }
    ]
  }
```

Note that field IDs are not required for `input_schema`. Also, the `optional` property is not enforced by `input_schema`; all fields are treated as optional during deserialization, so whether a field is optional or required is still ultimately determined by the value of `schema`, even when `input_schema` and `transforms` are being used.

In summary, pre-transformation records must match `input_schema`, and post-transformation records must match `schema`. If `input_schema` is not defined, it defaults to the same value as `schema`. `input_schema` is never required when processing JSON payloads, but may be required when processing Avro and Protobuf payloads.

Separately, keep in mind that transforms can be chained:

```yaml
transforms:
  - transform_type: bento
    transform: |
      root.field1 = this.payload.after.field1
  - transform_type: bento
    transform: |
      root.field2 = this.payload.after.field2
```

Tableflow transformations are executed by running arbitrary Bento Bloblang programs, but are limited to "pure" Bloblang functions that have no external side-effects.

Bloblang is a rich, Turing-complete programming language with many [features](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/about), [functions](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/functions), [methods](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/methods), [conditionals](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/walkthrough#conditionals), and even [error-handling](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/walkthrough#error-handling). You can read more about Bloblang and its capabilities in the [Bento Bloblang documentation](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/about), but the basics are quite straightforward and can be grasped with a few examples.

The key thing to understand about Bloblang transformations is that they're mapping functions that mutate the input record into the desired shape. Within the context of a Tableflow Bloblang mapping, the `this` keyword refers to the input record and the `root` keyword refers to the output record. See the examples below to learn how to perform the most common transformations.

{% hint style="success" %}
The Bento website has a [powerful and interactive Bloblang playground](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/playground) that can be used to experiment with Bloblang mapping programs.
{% endhint %}

#### Rename a field

```python
root.new_field = this.old_field
```

#### Delete a field

```python
root.unwanted_field = deleted()
```

#### Add a field

```python
root.uppercase_name = this.name.uppercase()
```

#### Drop / filter out an entire record

```python
if this.name == "foo" {
    root = deleted()
} else {
    root.name = this.name.uppercase()
}
```

#### Type Conversions

```python
root.user_age = this.user_age.string()
```
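
These operations can be combined into a single mapping in a table's `transforms` configuration. The sketch below (with hypothetical field names) filters out records from an internal source, renames a field, and converts a numeric field to a string:

```yaml
transforms:
  - transform_type: bento
    transform: |
      if this.source == "internal" {
        # Filter out internal records entirely.
        root = deleted()
      } else {
        # Rename and convert the remaining fields.
        root.user_name = this.name
        root.user_age = this.age.string()
      }
```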

### Data Retention and TTL

By default, data is retained in the table indefinitely. Optionally, a retention period can be specified using the `retention_ttl` field. Retention must be expressed in hours (`h`).

<pre class="language-yaml"><code class="lang-yaml"><strong>tables:
</strong>    - source_topic: example_json_logs_topic
      retention_ttl: 720h # 30 days
      ...
</code></pre>

### Starting ingestion at the latest offsets

By default, data is ingested from the start of the topic. You can optionally override this setting to instead start at the latest offsets (skipping any data previously stored in your input topic) via the `start_ingestion_at` field:

<pre class="language-yaml"><code class="lang-yaml"><strong>tables:
</strong>    - source_topic: example_json_logs_topic
      start_ingestion_at: latest
      ...
</code></pre>

The supported values are:

* `latest`
* `earliest`

### Dead Letter Queue (DLQ) Mode

{% hint style="info" %}
**Requires Agent v737 or higher.**
{% endhint %}

By default, Tableflow stops ingestion when it encounters records that are incompatible with the provided schema. To avoid head-of-line blocking on invalid records, this behavior can be overridden using the `dlq_mode` field. Supported values include:

* `stop`, which blocks ingestion upon encountering an invalid record (*this is the default*)
* `skip`, which skips invalid records during ingestion

<pre class="language-yaml"><code class="lang-yaml"><strong>tables:
</strong>    - source_topic: example_json_logs_topic
      dlq_mode: stop
      ...
</code></pre>

### Compression codecs

{% hint style="info" %}
**Requires Agent v748 or higher.**
{% endhint %}

Tableflow supports several compression codecs for the stored data files. The default is `snappy`.

This codec can be overridden using the `compression` field. Supported values include:

* `snappy`
* `gzip`
* `lz4`
* `zstd`
* `brotli`
* `none`, which disables compression

<pre class="language-yaml"><code class="lang-yaml"><strong>tables:
</strong>    - source_topic: example_json_logs_topic
      compression: zstd
      ...
</code></pre>

### Skipping raw record values

{% hint style="info" %}
**Requires Agent v749 or higher.**
{% endhint %}

If you don't need access to the raw record values (the raw bytes from the Kafka topic), you can set `skip_raw_record_values` to `true` in your config. This results in smaller data files.

<pre class="language-yaml"><code class="lang-yaml"><strong>tables:
</strong>    - source_topic: example_json_logs_topic
      skip_raw_record_values: true
      ...
</code></pre>

### Handling topic re-creation

To define the table ingestion behavior when the source topic is recreated, use the `topic_recreation_policy` setting.\
\
Currently, the only supported policy is `recreate_table`. This ensures data integrity by creating a new table (with a different identifier) whenever the system detects that the source topic has been re-created.

<pre class="language-yaml"><code class="lang-yaml"><strong>tables:
</strong>    - source_topic: example_json_logs_topic
      topic_recreation_policy: recreate_table
      ...
</code></pre>

### Pausing a Table

To temporarily stop ingestion for a specific table without removing it from your configuration, set `paused` to `true`:

```yaml
tables:
    - source_topic: example_json_logs_topic
      paused: true
      ...
```

To resume, set `paused: false` (or remove the field) and deploy the updated configuration. Ingestion will pick up from where it left off.

## Table Schema

Tableflow creates an Iceberg table with a struct schema, containing all the fields from the configured schema as well as the following default fields:

| Name                 | Field ID | Type      |
| -------------------- | -------- | --------- |
| warpstream           | 10000000 | struct    |
| warpstream.partition | 10000001 | int       |
| warpstream.offset    | 10000002 | long      |
| warpstream.key       | 10000003 | binary    |
| warpstream.value     | 10000004 | binary    |
| warpstream.timestamp | 10000005 | timestamp |

The `warpstream.value` field can be omitted with the `skip_raw_record_values` [option](#skipping-raw-record-values).
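
For example, a table configured with the inline schema below would produce an Iceberg table whose columns are the two user-defined fields (`environment` and `service`) plus the default `warpstream` struct listed above:

```yaml
schema_mode: inline
schema:
  fields:
    - { name: environment, type: string, id: 1 }
    - { name: service, type: string, id: 2, optional: true }
```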

## Schema Migrations

Tableflow follows the [Apache Iceberg schema evolution rules](https://iceberg.apache.org/spec/#schema-evolution). Schema migrations are supported for adding columns, changing fields from required to optional, and widening integer and floating point types. To perform any of these operations, update the schema in the Configuration editor and deploy it. The schema change will be reflected within the next few syncs of the table metadata into your bucket.

{% hint style="warning" %}
Ensure that you deploy a schema change before attempting to send data with the new schema. Otherwise, data written with the newer schema may be lost.
{% endhint %}
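
For example, starting from the inline schema below, marking an existing field optional and adding a new optional column are both compatible, in-place migrations:

```yaml
# Original schema:
schema:
  fields:
    - { name: environment, type: string, id: 1 }
    - { name: service, type: string, id: 2 }

# After a compatible migration (relax `service` to optional, add a new
# optional `severity` column with a fresh field ID):
schema:
  fields:
    - { name: environment, type: string, id: 1 }
    - { name: service, type: string, id: 2, optional: true }
    - { name: severity, type: int, id: 3, optional: true }
```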

### Breaking Changes

Any schema change that is not one of the compatible operations listed above is considered a breaking change and requires [Table Recreation](#table-recreation). Specifically, the following changes are breaking:

* **Make an optional field required** — Existing data may contain nulls for that field, so the constraint cannot be applied retroactively.
* **Change a type in a non-widening way** — For example, `long` → `int`, `double` → `float`, or `string` → `int`. This includes any data type change that is not a supported numeric widening.
* **Drop a column** — Removing a column from the schema is not supported as an in-place migration.
* **Reorder columns** — Changing the order of columns is not supported as an in-place migration.
* **Rename a column** — Renaming a column is not currently supported as an in-place migration.

To apply breaking changes, use the `recreation_key` mechanism described in [Table Recreation](#table-recreation) below.

## Table Recreation

Certain operations require an existing table to be completely deleted and recreated. While this is most commonly necessary to apply breaking schema changes (such as converting an optional field to required, performing non-widening type conversions, or dropping columns), a full rebuild may also be required for other operational reasons. The `recreation_key` parameter is designed to automate this workflow.

To utilize this feature, assign an initial string value to the `recreation_key` in your table configuration. Any change to this value acts as a direct trigger for table recreation. When you need to rebuild a table, simply update the `recreation_key` to a new string and deploy the new configuration. Tableflow will detect the change, automatically drop the existing table, and provision a new one with the updated config.

{% hint style="danger" %}
Note that adding a `recreation_key` to a table that did not previously specify this option will also trigger a recreation.
{% endhint %}

The following example demonstrates how to use the `recreation_key` to apply backward-incompatible schema changes:

```yaml
tables:
    - source_cluster_name: tableflow_cluster_1
      source_topic: example_json_logs_topic
      source_format: json
      recreation_key: "v1"
      schema_mode: inline
      schema:
        fields:
          - { name: environment, type: string, id: 1 }
          - { name: service, type: string, id: 2, optional: true }
```

To make the `service` field required and add a required `severity` field, bump the value (e.g. `"v1"` to `"v2"`) alongside your schema change:

```yaml
      recreation_key: "v2"
      schema:
        fields:
          - { name: environment, type: string, id: 1 }
          - { name: service, type: string, id: 2 }
          - { name: severity, type: int, id: 3 }
```

After recreation, Tableflow will re-ingest data from the earliest available offset in the source topic (subject to the topic's retention policy) into the new table.

{% hint style="warning" %}
Changing `recreation_key` will hard-delete the existing table's metadata. Data already written to the object store is not removed. The new table will only contain data that is still available in the source topic based on its retention settings.
{% endhint %}

## Table Deletion

Tableflow does not delete any tables from the object storage bucket when they are removed from the Configuration in order to prevent accidental data deletion. To delete a table, first delete it from the Configuration and then use your cloud provider's UI or CLI to delete the directory containing your table from within the `warpstream/_tableflow` directory. To programmatically delete and recreate a table (e.g. for incompatible schema changes), see [Table Recreation](#table-recreation).

## Object Storage Path Layout

Tableflow writes all table data and metadata into your object storage bucket under a predictable directory structure. Understanding this layout is useful for debugging, removing an entire table directory, or integrating with query engines that need direct file paths.

Given a bucket URL of `s3://my-bucket` (or `s3://my-bucket?prefix=my-prefix`), the structure is:

```
s3://my-bucket/[my-prefix/]warpstream/_tableflow/
└── <table_name>-<table_uuid>/
    ├── data/
    │   ├── 00000000000000000001.parquet
    │   ├── 00000000000000000002.parquet
    │   └── ...
    └── metadata/
        ├── v1.metadata.json
        ├── v2.metadata.json
        ├── ...
        ├── version-hint.text
        ├── snap-<id>.avro
        └── mani-<id>.avro
```

| Path                         | Contents                                                                                                                                                                                                                   |
| ---------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `warpstream/_tableflow/`     | Root directory for all Tableflow tables.                                                                                                                                                                                   |
| `<table_name>-<table_uuid>/` | One directory per table.                                                                                                                                                                                                   |
| `data/`                      | Parquet data files.                                                                                                                                                                                                        |
| `metadata/`                  | Iceberg metadata: JSON metadata files (`v1.metadata.json`, `v2.metadata.json`, ...), manifest files (`mani-*.avro`), manifest lists (`snap-*.avro`), and a `version-hint.text` that points to the latest metadata version. |

{% hint style="info" %}
When a table is deleted from the configuration or via the API, Tableflow only removes the control plane metadata. The data and metadata files in object storage are **not** deleted. You must clean them up manually using your cloud provider's UI or CLI.
{% endhint %}

## Terraform / Infrastructure as Code

Tableflow Agents are deployed using the standard [WarpStream Agent chart](https://github.com/warpstreamlabs/charts/tree/main/charts/warpstream-agent), and there is full support for Tableflow clusters in the [WarpStream Terraform provider](https://registry.terraform.io/providers/warpstreamlabs/warpstream/latest/docs/resources/tableflow_cluster).

[Click here](https://github.com/warpstreamlabs/terraform-provider-warpstream/blob/main/examples/tableflow/main.tf) for a complete example of creating a Tableflow cluster and configuring it to ingest a single topic into an Iceberg table.

## Observability

Ingestion lag for tables is emitted as a metric from the WarpStream Tableflow Agents. This metric is also available visually within the WarpStream Console. This metric is called `warpstream_tableflow_partition_time_lag_seconds`. Offset lag will be available at a later date.

This metric is tagged by `table.id`, `table.name`, `topic`, and `partition`. `partition`-level tagging can be disabled with the `WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS=partition` environment variable.
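
For example, in a Kubernetes-based Agent deployment, the tag can be disabled with a standard container environment variable (the surrounding manifest structure below is illustrative):

```yaml
containers:
  - name: warpstream-agent
    env:
      - name: WARPSTREAM_DISABLE_CONSUMER_GROUP_METRICS_TAGS
        value: "partition"
```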

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-61c43a069986c768ed1d8da7420fbb2b6619dbea%2FScreenshot%202025-07-29%20at%208.54.20%E2%80%AFPM.png?alt=media" alt=""><figcaption></figcaption></figure>

## Agent Version Requirements

Different Tableflow features require different minimum Agent versions. The table below summarizes the requirements:

| Feature                                               | Minimum Agent Version |
| ----------------------------------------------------- | --------------------- |
| Tableflow (core ingestion, metadata sync, compaction) | v710                  |
| AWS Glue integration                                  | v710                  |
| Transforms (Bento/Bloblang)                           | v730                  |
| Dead Letter Queue (`dlq_mode: stop` / `skip`)         | v737                  |
| BigQuery integration                                  | v737                  |
| Custom partitioning                                   | v748                  |
| Compression codecs                                    | v748                  |
| `skip_raw_record_values`                              | v749                  |
| `google.protobuf.Timestamp` as partitioning field     | v764                  |
| BigLake Metastore integration                         | v769                  |
| Hive Metadata Store integration                       | v769                  |
| `input_schema` without manual Iceberg field IDs       | v771                  |

{% hint style="info" %}
Always check the [Change Log](https://docs.warpstream.com/warpstream/overview/change-log) for the latest feature additions and bug fixes. When in doubt, use the latest stable Agent version.
{% endhint %}

## Tableflow UI

The Tableflow UI available in the WarpStream console allows editing the Configuration.

<figure><img src="https://77315434-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FjB7FxO8ty4EXO4HsQP4E%2Fuploads%2Fgit-blob-e762508abae500664cbee3218509e739041886c8%2FScreenshot%202025-08-01%20at%209.55.18%E2%80%AFAM.png?alt=media" alt=""><figcaption></figcaption></figure>
