Skip to content

Commit 7c6a3e2

Browse files
authored
feat(types)!: Proper support for nested types (#11196)
Closes #11187.

Code pieces are copied from [cloudquery/filetypes](https://github.com/cloudquery/filetypes) (parquet).

BEGIN_COMMIT_OVERRIDE
feat(types)!: Proper support for nested types (#11196)

BREAKING-CHANGE: Support writing Apache Arrow nested types:
* Structs as DuckDB structs
* Maps as DuckDB maps
END_COMMIT_OVERRIDE
1 parent dd6b275 commit 7c6a3e2

File tree

7 files changed: 170 additions and 112 deletions.

plugins/destination/duckdb/client/read.go

Lines changed: 25 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -111,20 +111,38 @@ func reverseTransformArray(dt arrow.DataType, arr arrow.Array) arrow.Array {
111111
switch dt := dt.(type) {
112112
case *types.UUIDType:
113113
return array.NewExtensionArrayWithStorage(dt, arr.(*array.FixedSizeBinary))
114-
case *types.InetType, *types.MACType, *types.JSONType, *arrow.StructType:
114+
case *types.InetType, *types.MACType, *types.JSONType:
115115
return reverseTransformFromString(dt, arr.(*array.String))
116116
case *arrow.Uint16Type:
117117
return reverseTransformUint16(arr.(*array.Uint32))
118118
case *arrow.Uint8Type:
119119
return reverseTransformUint8(arr.(*array.Uint32))
120120
case *arrow.TimestampType:
121121
return transformTimestamp(dt, arr.(*array.Timestamp))
122-
case *arrow.MapType:
123-
child := reverseTransformArray(dt.ValueType(), arr.(*array.List).ListValues()).Data()
124-
return array.NewMapData(array.NewData(dt, arr.Len(), arr.Data().Buffers(), []arrow.ArrayData{child}, arr.NullN(), arr.Data().Offset()))
125-
case listLike:
126-
child := reverseTransformArray(dt.Elem(), arr.(array.ListLike).ListValues()).Data()
127-
return array.NewListData(array.NewData(dt, arr.Len(), arr.Data().Buffers(), []arrow.ArrayData{child}, arr.NullN(), arr.Data().Offset()))
122+
case *arrow.StructType:
123+
arr := arr.(*array.Struct)
124+
children := make([]arrow.ArrayData, arr.NumField())
125+
for i := range children {
126+
// struct fields can be odd when read from parquet, but the data is intact
127+
child := array.MakeFromData(arr.Data().Children()[i])
128+
children[i] = reverseTransformArray(dt.Field(i).Type, child).Data()
129+
}
130+
131+
return array.NewStructData(array.NewData(
132+
dt, arr.Len(),
133+
arr.Data().Buffers(),
134+
children,
135+
arr.NullN(), arr.Data().Offset(),
136+
))
137+
138+
case arrow.ListLikeType: // also handles maps
139+
return array.MakeFromData(array.NewData(
140+
dt, arr.Len(),
141+
arr.Data().Buffers(),
142+
[]arrow.ArrayData{reverseTransformArray(dt.Elem(), arr.(array.ListLike).ListValues()).Data()},
143+
arr.NullN(), arr.Data().Offset(),
144+
))
145+
128146
default:
129147
return reverseTransformFromString(dt, arr.(*array.String))
130148
}

plugins/destination/duckdb/client/transform.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func transformArray(arr arrow.Array) arrow.Array {
2121
}
2222

2323
switch arr := arr.(type) {
24-
case *types.UUIDArray, *types.InetArray, *types.MACArray, *types.JSONArray, *array.Struct:
24+
case *types.UUIDArray, *types.InetArray, *types.MACArray, *types.JSONArray:
2525
return transformToStringArray(arr)
2626
case *array.Uint8:
2727
return transformUint8ToUint32Array(arr)
@@ -30,10 +30,31 @@ func transformArray(arr arrow.Array) arrow.Array {
3030
case *array.Timestamp:
3131
// mismatching unit or tz
3232
return transformTimestamp(duckDBToArrow(arrowToDuckDB(arr.DataType())).(*arrow.TimestampType), arr)
33-
case array.ListLike:
34-
child := transformArray(arr.ListValues()).Data()
35-
newType := arrow.ListOf(child.DataType())
36-
return array.NewListData(array.NewData(newType, arr.Len(), arr.Data().Buffers(), []arrow.ArrayData{child}, arr.NullN(), arr.Data().Offset()))
33+
34+
case *array.Struct:
35+
dt := arr.DataType().(*arrow.StructType)
36+
children := make([]arrow.ArrayData, arr.NumField())
37+
names := make([]string, arr.NumField())
38+
for i := range children {
39+
children[i] = transformArray(arr.Field(i)).Data()
40+
names[i] = dt.Field(i).Name
41+
}
42+
43+
return array.NewStructData(array.NewData(
44+
transformTypeForWriting(dt), arr.Len(),
45+
arr.Data().Buffers(),
46+
children,
47+
arr.NullN(), arr.Data().Offset(),
48+
))
49+
50+
case array.ListLike: // this includes maps, too
51+
return array.MakeFromData(array.NewData(
52+
transformTypeForWriting(arr.DataType()), arr.Len(),
53+
arr.Data().Buffers(),
54+
[]arrow.ArrayData{transformArray(arr.ListValues()).Data()},
55+
arr.NullN(), arr.Data().Offset(),
56+
))
57+
3758
default:
3859
return transformToStringArray(arr)
3960
}

plugins/destination/duckdb/client/types.go

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,44 +5,53 @@ import (
55

66
"github.com/apache/arrow/go/v13/arrow"
77
"github.com/cloudquery/plugin-sdk/v3/types"
8+
"golang.org/x/exp/slices"
89
)
910

10-
type listLike interface {
11-
arrow.DataType
12-
Elem() arrow.DataType
11+
func transformSchemaForWriting(sc *arrow.Schema) *arrow.Schema {
12+
md := arrow.MetadataFrom(sc.Metadata().ToMap())
13+
return arrow.NewSchema(transformFieldsForWriting(sc.Fields()), &md)
1314
}
1415

15-
func transformSchemaForWriting(sc *arrow.Schema) *arrow.Schema {
16-
fields := sc.Fields()
16+
func transformFieldsForWriting(fields []arrow.Field) []arrow.Field {
1717
for i := range fields {
1818
fields[i].Type = transformTypeForWriting(fields[i].Type)
1919
}
20-
md := sc.Metadata()
21-
return arrow.NewSchema(fields, &md)
20+
return fields
2221
}
2322

2423
func transformTypeForWriting(dt arrow.DataType) arrow.DataType {
2524
switch dt := dt.(type) {
26-
case listLike:
27-
return arrow.ListOf(transformTypeForWriting(dt.Elem()))
25+
case *arrow.StructType:
26+
return arrow.StructOf(transformFieldsForWriting(dt.Fields())...)
2827
case *arrow.MapType:
29-
return arrow.ListOf(transformTypeForWriting(dt.ValueType()))
30-
}
31-
32-
switch dt := duckDBToArrow(arrowToDuckDB(dt)).(type) {
28+
return arrow.MapOf(transformTypeForWriting(dt.KeyType()), transformTypeForWriting(dt.ItemType()))
29+
case arrow.ListLikeType:
30+
return arrow.ListOf(transformTypeForWriting(dt.Elem()))
3331
case *types.UUIDType, *types.JSONType:
3432
return arrow.BinaryTypes.String
3533
default:
36-
return dt
34+
return duckDBToArrow(arrowToDuckDB(dt))
3735
}
3836
}
3937

4038
func arrowToDuckDB(dt arrow.DataType) string {
4139
switch dt := dt.(type) {
42-
case listLike:
43-
return arrowToDuckDB(dt.Elem()) + "[]"
40+
case *arrow.StructType:
41+
builder := new(strings.Builder)
42+
builder.WriteString("struct(")
43+
for i, field := range dt.Fields() {
44+
if i > 0 {
45+
builder.WriteString(", ")
46+
}
47+
builder.WriteString(sanitizeID(field.Name) + " " + arrowToDuckDB(field.Type))
48+
}
49+
builder.WriteString(")")
50+
return builder.String()
4451
case *arrow.MapType:
45-
return arrowToDuckDB(arrow.ListOf(dt.ValueType()))
52+
return "map(" + arrowToDuckDB(dt.KeyType()) + ", " + arrowToDuckDB(dt.ItemType()) + ")"
53+
case arrow.ListLikeType:
54+
return arrowToDuckDB(dt.Elem()) + "[]"
4655
case *arrow.BooleanType:
4756
return "boolean"
4857
case *arrow.Int8Type:
@@ -79,20 +88,21 @@ func arrowToDuckDB(dt arrow.DataType) string {
7988
return "date"
8089
case *arrow.DayTimeIntervalType:
8190
return "interval"
82-
case *arrow.StructType:
83-
return "json"
8491
default:
8592
return "varchar"
8693
}
8794
}
8895

8996
func duckDBToArrow(t string) arrow.DataType {
90-
if strings.HasSuffix(t, "[]") {
97+
switch {
98+
case strings.HasSuffix(t, "[]"):
9199
return arrow.ListOf(duckDBToArrow(strings.TrimSuffix(t, "[]")))
100+
case strings.HasPrefix(t, "struct"):
101+
return duckDBStructToArrow(t)
102+
case strings.HasPrefix(t, "map"):
103+
return duckDBMapToArrow(t)
92104
}
93-
if strings.HasPrefix(t, "struct") {
94-
return types.ExtensionTypes.JSON
95-
}
105+
96106
switch t {
97107
case "tinyint", "int1":
98108
return arrow.PrimitiveTypes.Int8
@@ -133,6 +143,74 @@ func duckDBToArrow(t string) arrow.DataType {
133143
}
134144
}
135145

146+
func duckDBStructToArrow(spec string) *arrow.StructType {
147+
params := strings.TrimPrefix(spec, "struct")
148+
params = strings.TrimSpace(params)
149+
params = strings.TrimSuffix(strings.TrimPrefix(params, "("), ")")
150+
151+
fieldsSpec := splitParams(params)
152+
if len(fieldsSpec) == 0 {
153+
panic("unsupported struct spec: " + spec)
154+
}
155+
156+
fields := make([]arrow.Field, len(fieldsSpec))
157+
for i, fieldSpec := range fieldsSpec {
158+
parts := strings.SplitN(fieldSpec, " ", 2)
159+
if len(parts) != 2 {
160+
panic("unsupported field spec: " + fieldSpec)
161+
}
162+
163+
fields[i] = arrow.Field{
164+
Name: strings.Trim(parts[0], `"`),
165+
Type: duckDBToArrow(strings.TrimSpace(parts[1])),
166+
Nullable: true, // all duckdb columns are nullable
167+
}
168+
}
169+
170+
return arrow.StructOf(fields...)
171+
}
172+
173+
func duckDBMapToArrow(spec string) *arrow.MapType {
174+
params := strings.TrimPrefix(spec, "map")
175+
params = strings.TrimSpace(params)
176+
params = strings.TrimSuffix(strings.TrimPrefix(params, "("), ")")
177+
178+
kv := splitParams(params)
179+
if len(kv) != 2 {
180+
panic("unsupported map spec: " + spec)
181+
}
182+
183+
// these should only be types
184+
return arrow.MapOf(duckDBToArrow(kv[0]), duckDBToArrow(kv[1]))
185+
}
186+
187+
func splitParams(params string) []string {
188+
params = strings.TrimSpace(params)
189+
190+
var brackets int
191+
var parts []string
192+
elem := make([]rune, 0, len(params))
193+
194+
for _, r := range params {
195+
switch r {
196+
case '(':
197+
brackets++
198+
case ')':
199+
brackets--
200+
case ',':
201+
if brackets == 0 {
202+
parts = append(parts, strings.TrimSpace(string(elem)))
203+
elem = elem[:0] // cleanup
204+
continue
205+
}
206+
}
207+
elem = append(elem, r)
208+
}
209+
parts = append(parts, strings.TrimSpace(string(elem)))
210+
211+
return slices.Clip(parts)
212+
}
213+
136214
func sanitizeID(id string) string {
137215
return `"` + id + `"`
138216
}

plugins/destination/duckdb/client/write.go

Lines changed: 14 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cloudquery/plugin-pb-go/specs"
1313
"github.com/cloudquery/plugin-sdk/v3/schema"
1414
"github.com/google/uuid"
15+
"golang.org/x/exp/slices"
1516
)
1617

1718
func nonPkIndices(sc *schema.Table) []int {
@@ -29,12 +30,20 @@ func nonPkIndices(sc *schema.Table) []int {
2930
// but this is unavoidable until support is added to duckdb itself.
3031
// See https://github.com/duckdb/duckdb/blob/c5d9afb97bbf0be12216f3b89ae3131afbbc3156/src/storage/table/list_column_data.cpp#L243-L251
3132
func containsList(sc *schema.Table) bool {
32-
for _, f := range sc.Columns {
33-
if arrow.IsListLike(f.Type.ID()) {
34-
return true
35-
}
33+
return slices.ContainsFunc(sc.Columns, func(c schema.Column) bool { return dtContainsList(c.Type) })
34+
}
35+
36+
func dtContainsList(dt arrow.DataType) bool {
37+
switch dt := dt.(type) {
38+
case *arrow.StructType:
39+
return slices.ContainsFunc(dt.Fields(), func(f arrow.Field) bool { return dtContainsList(f.Type) })
40+
case *arrow.MapType:
41+
return dtContainsList(dt.KeyType()) || dtContainsList(dt.ItemType())
42+
case arrow.ListLikeType:
43+
return true
44+
default:
45+
return false
3646
}
37-
return false
3847
}
3948

4049
func (c *Client) upsert(ctx context.Context, tmpTableName string, tableName string, table *schema.Table) error {

plugins/destination/duckdb/go.mod

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,10 +9,11 @@ require (
99
github.com/google/uuid v1.3.0
1010
github.com/marcboeker/go-duckdb v1.3.0
1111
github.com/rs/zerolog v1.29.0
12+
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
1213
)
1314

1415
// TODO: remove once all updates are merged
15-
replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230602001318-a7aad4c5365c
16+
replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230602155531-6d34568d4501
1617

1718
require (
1819
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
@@ -46,7 +47,6 @@ require (
4647
github.com/stretchr/testify v1.8.2 // indirect
4748
github.com/thoas/go-funk v0.9.3 // indirect
4849
github.com/zeebo/xxh3 v1.0.2 // indirect
49-
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
5050
golang.org/x/mod v0.9.0 // indirect
5151
golang.org/x/net v0.9.0 // indirect
5252
golang.org/x/sync v0.1.0 // indirect

plugins/destination/duckdb/go.sum

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,8 +45,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
4545
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
4646
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
4747
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
48-
github.com/cloudquery/arrow/go/v13 v13.0.0-20230602001318-a7aad4c5365c h1:z/inZxI0DTuu4DELOZQN5CjlSx0jwdVcn4XILXWDB+o=
49-
github.com/cloudquery/arrow/go/v13 v13.0.0-20230602001318-a7aad4c5365c/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
48+
github.com/cloudquery/arrow/go/v13 v13.0.0-20230602155531-6d34568d4501 h1:vZJyF3/vy6nhP7guV0I5aB1eK3CGXZb7vg7UQPPnqqo=
49+
github.com/cloudquery/arrow/go/v13 v13.0.0-20230602155531-6d34568d4501/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
5050
github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ=
5151
github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
5252
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=

plugins/destination/duckdb/json/writer.go

Lines changed: 0 additions & 68 deletions
This file was deleted.

0 commit comments

Comments
 (0)