Skip to content

Commit 7eef56a

Browse files
committed
fix: Split int, float types
1 parent 19d8907 commit 7eef56a

File tree

2 files changed

+95
-47
lines changed

2 files changed

+95
-47
lines changed

plugins/destination/mysql/client/read.go

Lines changed: 58 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,35 @@ func (*Client) createResultsArray(table *arrow.Schema) []any {
2222
case *arrow.BooleanType:
2323
var r sql.NullBool
2424
results = append(results, &r)
25-
case *arrow.Int8Type, *arrow.Uint8Type, *arrow.Int16Type, *arrow.Uint16Type, *arrow.Int32Type, *arrow.Uint32Type, *arrow.Int64Type, *arrow.Uint64Type:
26-
var r sql.NullInt64
25+
case *arrow.Int8Type:
26+
var r *int8
27+
results = append(results, &r)
28+
case *arrow.Uint8Type:
29+
var r *uint8
30+
results = append(results, &r)
31+
case *arrow.Int16Type:
32+
var r *int16
33+
results = append(results, &r)
34+
case *arrow.Uint16Type:
35+
var r *uint16
36+
results = append(results, &r)
37+
case *arrow.Int32Type:
38+
var r *int32
39+
results = append(results, &r)
40+
case *arrow.Uint32Type:
41+
var r *uint32
42+
results = append(results, &r)
43+
case *arrow.Int64Type:
44+
var r *int64
45+
results = append(results, &r)
46+
case *arrow.Uint64Type:
47+
var r *uint64
48+
results = append(results, &r)
49+
case *arrow.Float32Type:
50+
var r *float32
2751
results = append(results, &r)
28-
case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type:
29-
var r sql.NullFloat64
52+
case *arrow.Float64Type:
53+
var r *float64
3054
results = append(results, &r)
3155
case *arrow.StringType, *arrow.LargeStringType:
3256
var r sql.NullString
@@ -60,74 +84,74 @@ func reverseTransform(table *arrow.Schema, values []any) (arrow.Record, error) {
6084
recordBuilder.Field(i).(*array.BooleanBuilder).AppendNull()
6185
}
6286
case *arrow.Int8Type:
63-
v := val.(*sql.NullInt64)
64-
if !v.Valid {
87+
v := val.(**int8)
88+
if *v == nil {
6589
recordBuilder.Field(i).AppendNull()
6690
} else {
67-
recordBuilder.Field(i).(*array.Int8Builder).Append(int8(v.Int64))
91+
recordBuilder.Field(i).(*array.Int8Builder).Append(**v)
6892
}
6993
case *arrow.Int16Type:
70-
v := val.(*sql.NullInt64)
71-
if !v.Valid {
94+
v := val.(**int16)
95+
if *v == nil {
7296
recordBuilder.Field(i).AppendNull()
7397
} else {
74-
recordBuilder.Field(i).(*array.Int16Builder).Append(int16(v.Int64))
98+
recordBuilder.Field(i).(*array.Int16Builder).Append(**v)
7599
}
76100
case *arrow.Int32Type:
77-
v := val.(*sql.NullInt64)
78-
if !v.Valid {
101+
v := val.(**int32)
102+
if *v == nil {
79103
recordBuilder.Field(i).AppendNull()
80104
} else {
81-
recordBuilder.Field(i).(*array.Int32Builder).Append(int32(v.Int64))
105+
recordBuilder.Field(i).(*array.Int32Builder).Append(**v)
82106
}
83107
case *arrow.Int64Type:
84-
v := val.(*sql.NullInt64)
85-
if !v.Valid {
108+
v := val.(**int64)
109+
if *v == nil {
86110
recordBuilder.Field(i).AppendNull()
87111
} else {
88-
recordBuilder.Field(i).(*array.Int64Builder).Append(v.Int64)
112+
recordBuilder.Field(i).(*array.Int64Builder).Append(**v)
89113
}
90114
case *arrow.Uint8Type:
91-
v := val.(*sql.NullInt64)
92-
if !v.Valid {
115+
v := val.(**uint8)
116+
if *v == nil {
93117
recordBuilder.Field(i).AppendNull()
94118
} else {
95-
recordBuilder.Field(i).(*array.Uint8Builder).Append(uint8(v.Int64))
119+
recordBuilder.Field(i).(*array.Uint8Builder).Append(**v)
96120
}
97121
case *arrow.Uint16Type:
98-
v := val.(*sql.NullInt64)
99-
if !v.Valid {
122+
v := val.(**uint16)
123+
if *v == nil {
100124
recordBuilder.Field(i).AppendNull()
101125
} else {
102-
recordBuilder.Field(i).(*array.Uint16Builder).Append(uint16(v.Int64))
126+
recordBuilder.Field(i).(*array.Uint16Builder).Append(**v)
103127
}
104128
case *arrow.Uint32Type:
105-
v := val.(*sql.NullInt64)
106-
if !v.Valid {
129+
v := val.(**uint32)
130+
if *v == nil {
107131
recordBuilder.Field(i).AppendNull()
108132
} else {
109-
recordBuilder.Field(i).(*array.Uint32Builder).Append(uint32(v.Int64))
133+
recordBuilder.Field(i).(*array.Uint32Builder).Append(**v)
110134
}
111135
case *arrow.Uint64Type:
112-
v := val.(*sql.NullInt64)
113-
if !v.Valid {
136+
v := val.(**uint64)
137+
if *v == nil {
114138
recordBuilder.Field(i).AppendNull()
115139
} else {
116-
recordBuilder.Field(i).(*array.Uint64Builder).Append(uint64(v.Int64))
140+
recordBuilder.Field(i).(*array.Uint64Builder).Append(**v)
117141
}
118142
case *arrow.Float32Type:
119-
v := val.(*sql.NullFloat64)
120-
if !v.Valid {
143+
v := val.(**float32)
144+
if *v == nil {
121145
recordBuilder.Field(i).AppendNull()
122146
} else {
123-
recordBuilder.Field(i).(*array.Float32Builder).Append(float32(val.(*sql.NullFloat64).Float64))
147+
recordBuilder.Field(i).(*array.Float32Builder).Append(**v)
124148
}
125149
case *arrow.Float64Type:
126-
v := val.(*sql.NullFloat64)
127-
if !v.Valid {
150+
v := val.(**float64)
151+
if *v == nil {
128152
recordBuilder.Field(i).AppendNull()
129153
} else {
130-
recordBuilder.Field(i).(*array.Float64Builder).Append(val.(*sql.NullFloat64).Float64)
154+
recordBuilder.Field(i).(*array.Float64Builder).Append(**v)
131155
}
132156
case *arrow.StringType:
133157
v := val.(*sql.NullString)

plugins/destination/mysql/client/types.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,25 @@ func mySQLTypeToArrowType(tableName string, columnName string, sqlType string) (
1616
return arrow.FixedWidthTypes.Timestamp_us, nil
1717
}
1818
sqlTypeToSchemaType := map[string]arrow.DataType{
19-
"bool": arrow.FixedWidthTypes.Boolean,
20-
"tinyint(1)": arrow.FixedWidthTypes.Boolean,
21-
"bigint": arrow.PrimitiveTypes.Int64,
22-
"bigint(20)": arrow.PrimitiveTypes.Int64,
23-
"float": arrow.PrimitiveTypes.Float64,
24-
"binary(16)": types.ExtensionTypes.UUID,
25-
"blob": arrow.BinaryTypes.LargeBinary,
26-
"nvarchar(255)": types.ExtensionTypes.Inet,
27-
"varchar(255)": types.ExtensionTypes.Inet,
28-
"text": arrow.BinaryTypes.LargeString,
29-
"json": types.ExtensionTypes.JSON,
19+
"bool": arrow.FixedWidthTypes.Boolean,
20+
"tinyint(1)": arrow.FixedWidthTypes.Boolean,
21+
"tinyint": arrow.PrimitiveTypes.Int8,
22+
"smallint": arrow.PrimitiveTypes.Int16,
23+
"int": arrow.PrimitiveTypes.Int32,
24+
"bigint": arrow.PrimitiveTypes.Int64,
25+
"bigint(20)": arrow.PrimitiveTypes.Int64,
26+
"tinyint unsigned": arrow.PrimitiveTypes.Uint8,
27+
"smallint unsigned": arrow.PrimitiveTypes.Uint16,
28+
"int unsigned": arrow.PrimitiveTypes.Uint32,
29+
"bigint unsigned": arrow.PrimitiveTypes.Uint64,
30+
"float": arrow.PrimitiveTypes.Float32,
31+
"double": arrow.PrimitiveTypes.Float64,
32+
"binary(16)": types.ExtensionTypes.UUID,
33+
"blob": arrow.BinaryTypes.LargeBinary,
34+
"nvarchar(255)": types.ExtensionTypes.Inet,
35+
"varchar(255)": types.ExtensionTypes.Inet,
36+
"text": arrow.BinaryTypes.LargeString,
37+
"json": types.ExtensionTypes.JSON,
3038
}
3139

3240
if v, ok := sqlTypeToSchemaType[sqlType]; ok {
@@ -43,10 +51,26 @@ func arrowTypeToMySqlStr(t arrow.DataType) string {
4351
switch t.(type) {
4452
case *arrow.BooleanType:
4553
return "bool"
46-
case *arrow.Int8Type, *arrow.Uint8Type, *arrow.Int16Type, *arrow.Uint16Type, *arrow.Int32Type, *arrow.Uint32Type, *arrow.Int64Type, *arrow.Uint64Type:
54+
case *arrow.Int8Type:
55+
return "tinyint"
56+
case *arrow.Int16Type:
57+
return "smallint"
58+
case *arrow.Int32Type:
59+
return "int"
60+
case *arrow.Int64Type:
4761
return "bigint"
48-
case *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type:
62+
case *arrow.Uint8Type:
63+
return "tinyint unsigned"
64+
case *arrow.Uint16Type:
65+
return "smallint unsigned"
66+
case *arrow.Uint32Type:
67+
return "int unsigned"
68+
case *arrow.Uint64Type:
69+
return "bigint unsigned"
70+
case *arrow.Float32Type:
4971
return "float"
72+
case *arrow.Float64Type:
73+
return "double"
5074
case *arrow.StringType, *arrow.LargeStringType:
5175
return "text"
5276
case *arrow.BinaryType, *arrow.LargeBinaryType:

0 commit comments

Comments
 (0)