Skip to content

Commit b0ad9d4

Browse files
authored
fix!: Split max index length between PKs (#13569)
#### Summary Fixes #13486. The max length of an index is either `3072` bytes or `767` bytes. Since we use `utf8mb4` which can use up to 4 bytes per char, we max out at either `768` or `191` chars (the number in bytes divided by `4`). The current code allocates `191` chars per PK which is OK if a table has 4 PKs since `191*4= 764`, but fails if a table has more than that. See list of tables [here](#13486 (comment)) The solution is to get the max number of bytes based on the row format, and evenly distribute the max length between PKs. This is still not the best solution as we assign the same number of bytes to a UUID and a `text` column (for example CQId is `UUID`), but I don't think it's very common to have one UUID PK and another `text` PK and enough other PKs so the prefix is too short. <!--
1 parent f22bfa6 commit b0ad9d4

File tree

3 files changed

+32
-19
lines changed

3 files changed

+32
-19
lines changed

plugins/destination/mysql/client/client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type Client struct {
3232
writer *batchwriter.BatchWriter
3333
serverType ServerType
3434
serverVersion string
35+
36+
maxIndexLength int
3537
}
3638

3739
func New(ctx context.Context, logger zerolog.Logger, spec []byte, _ plugin.NewClientOptions) (plugin.Client, error) {
@@ -77,6 +79,8 @@ func New(ctx context.Context, logger zerolog.Logger, spec []byte, _ plugin.NewCl
7779
return nil, err
7880
}
7981

82+
c.setMaxIndexLength(ctx)
83+
8084
return c, nil
8185
}
8286

@@ -118,6 +122,26 @@ func (c *Client) getVersion(ctx context.Context) error {
118122
return nil
119123
}
120124

125+
func (c *Client) setMaxIndexLength(ctx context.Context) {
126+
const maxIndexLengthInBytes = 3072
127+
row := c.db.QueryRowContext(ctx, "show variables like 'innodb_default_row_format'")
128+
var varName sql.NullString
129+
var rowFormat sql.NullString
130+
err := row.Scan(&varName, &rowFormat)
131+
if err != nil {
132+
c.logger.Warn().Err(err).Msgf("failed to detect max index length, using default value of %d bytes", maxIndexLengthInBytes)
133+
}
134+
135+
// In MySQL >= 5.7 the max PK length is 3072 bytes for dynamic or compressed row format, and 767 bytes for redundant or compact row format.
136+
// We need to divide the max length in bytes by 4, since we use utf8mb4 charset, which can take up to 4 bytes per character.
137+
switch rowFormat.String {
138+
case "redundant", "compact":
139+
c.maxIndexLength = 767 / 4
140+
default:
141+
c.maxIndexLength = maxIndexLengthInBytes / 4
142+
}
143+
}
144+
121145
func (c *Client) Close(ctx context.Context) error {
122146
if err := c.writer.Close(ctx); err != nil {
123147
_ = c.db.Close()

plugins/destination/mysql/client/schema.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,13 @@ func identifier(name string) string {
1313
return fmt.Sprintf("`%s`", name)
1414
}
1515

16-
const maxPrefixLength = 191
17-
1816
const columnQuery = `SELECT
1917
cols.COLUMN_NAME,
2018
COLUMN_TYPE,
2119
IS_NULLABLE,
22-
constraint_type,
23-
sub_part
20+
constraint_type
2421
FROM
2522
INFORMATION_SCHEMA.COLUMNS AS cols
26-
LEFT JOIN information_schema.STATISTICS as stats on
27-
cols.table_schema = stats.table_schema and
28-
cols.TABLE_NAME = stats.table_name and
29-
cols.COLUMN_NAME = stats.column_name and index_name = 'PRIMARY'
3023
LEFT JOIN
3124
(SELECT
3225
tc.constraint_schema,
@@ -62,8 +55,7 @@ func (c *Client) getTableColumns(ctx context.Context, tableName string) ([]schem
6255
var typ string
6356
var nullable string
6457
var constraintType *string
65-
var subpart *int
66-
if err := rows.Scan(&name, &typ, &nullable, &constraintType, &subpart); err != nil {
58+
if err := rows.Scan(&name, &typ, &nullable, &constraintType); err != nil {
6759
return nil, err
6860
}
6961

@@ -72,10 +64,6 @@ func (c *Client) getTableColumns(ctx context.Context, tableName string) ([]schem
7264
if constraintType != nil {
7365
primaryKey = strings.Contains(*constraintType, "PRIMARY KEY")
7466
}
75-
// subpart only non nil for pks on blob/text columns
76-
if subpart != nil && *subpart != maxPrefixLength {
77-
primaryKey = false
78-
}
7967
columns = append(columns, schema.Column{
8068
Name: name,
8169
Type: schemaType,
@@ -172,15 +160,15 @@ func (c *Client) createTable(ctx context.Context, table *schema.Table) error {
172160
if len(primaryKeysIndices) > 0 {
173161
builder.WriteString(",\n ")
174162
builder.WriteString(" PRIMARY KEY (")
163+
lengthPerPk := c.maxIndexLength / len(primaryKeysIndices)
175164
for i, pk := range primaryKeysIndices {
176165
column := table.Columns[pk]
177166
builder.WriteString(identifier(column.Name))
178167
sqlType := arrowTypeToMySqlStr(column.Type)
179168
if sqlType == "blob" || sqlType == "text" {
180169
// `blob/text` SQL types require specifying prefix length to use for the primary key
181170
// https://dev.mysql.com/doc/refman/8.0/en/innodb-limits.html
182-
// The index key prefix length limit is 767 bytes for InnoDB tables that use the REDUNDANT or COMPACT row format. For example, you might hit this limit with a column prefix index of more than 191 characters on a TEXT or VARCHAR column, assuming a utf8mb4 character set and the maximum of 4 bytes for each character.
183-
builder.WriteString("(" + strconv.Itoa(maxPrefixLength) + ")")
171+
builder.WriteString("(" + strconv.Itoa(lengthPerPk) + ")")
184172
}
185173
if i < len(primaryKeysIndices)-1 {
186174
builder.WriteString(", ")

plugins/destination/mysql/client/write.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,17 @@ func (c *Client) writeResources(ctx context.Context, query string, table *schema
6060
return err
6161
}
6262

63-
// log a warning that a blob or text field that is a PK has more than 191 characters
6463
for _, record := range transformedRecords {
6564
for _, truncatablePKIndex := range pks {
66-
if len(record[truncatablePKIndex].(string)) > maxPrefixLength {
65+
// log a warning that a blob or text field that is a PK has more than the limit
66+
lengthPerPk := c.maxIndexLength / len(pks)
67+
if len(record[truncatablePKIndex].(string)) > lengthPerPk {
6768
indexes := table.PrimaryKeysIndexes()
6869
pkValues := make(map[string]any, len(indexes))
6970
for i, pkIndex := range indexes {
7071
pkValues[table.Columns[pkIndex].Name] = record[i]
7172
}
72-
c.logger.Debug().Any("pk_values", pkValues).Msgf("record contains a primary key that is longer than MySQL can handle. only the first %d will be included in the index", maxPrefixLength)
73+
c.logger.Debug().Any("pk_values", pkValues).Msgf("record contains a primary key that is longer than MySQL can handle. only the first %d will be included in the index", lengthPerPk)
7374
tablesWithTruncation[table.Name] = true
7475
break
7576
}

0 commit comments

Comments
 (0)