Skip to content

Commit 0ef3e12

Browse files
authored
feat!: Migrate Elasticsearch to Plugin SDK v3 for native Arrow support (#10967)
Closes #10716.

BEGIN_COMMIT_OVERRIDE
feat: Update to use [Apache Arrow](https://arrow.apache.org/) type system

BREAKING-CHANGE: This release introduces an internal change to our type system to use [Apache Arrow](https://arrow.apache.org/). This should have only one visible breaking change: JSON columns are now stored as strings, due to limitations of Elasticsearch when storing JSON lists with mixed types. If you encounter an issue during the upgrade, please submit a [bug report](https://github.com/cloudquery/cloudquery/issues/new/choose).
END_COMMIT_OVERRIDE
1 parent 612d3ea commit 0ef3e12

File tree

13 files changed

+451
-212
lines changed

13 files changed

+451
-212
lines changed

plugins/destination/elasticsearch/client/client.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@ import (
77
"time"
88

99
"github.com/cenkalti/backoff/v4"
10-
"github.com/cloudquery/plugin-sdk/plugins/destination"
11-
"github.com/cloudquery/plugin-sdk/specs"
10+
"github.com/cloudquery/plugin-pb-go/specs"
11+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1212
"github.com/elastic/go-elasticsearch/v8"
1313
"github.com/rs/zerolog"
1414
)
1515

1616
type Client struct {
1717
destination.UnimplementedUnmanagedWriter
18-
destination.DefaultReverseTransformer
1918
logger zerolog.Logger
2019
spec specs.Destination
2120
metrics destination.Metrics

plugins/destination/elasticsearch/client/client_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"os"
55
"testing"
66

7-
"github.com/cloudquery/plugin-sdk/plugins/destination"
8-
"github.com/cloudquery/plugin-sdk/specs"
7+
"github.com/cloudquery/plugin-pb-go/specs"
8+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
99
)
1010

1111
var migrateStrategy = destination.MigrateStrategy{
@@ -36,5 +36,6 @@ func TestPlugin(t *testing.T) {
3636

3737
MigrateStrategyOverwrite: migrateStrategy,
3838
MigrateStrategyAppend: migrateStrategy,
39-
})
39+
},
40+
)
4041
}

plugins/destination/elasticsearch/client/deletestale.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"io"
88
"time"
99

10-
"github.com/cloudquery/plugin-sdk/schema"
10+
"github.com/cloudquery/plugin-sdk/v3/schema"
1111
"github.com/elastic/go-elasticsearch/v8/typedapi/core/deletebyquery"
1212
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
1313
"golang.org/x/sync/errgroup"

plugins/destination/elasticsearch/client/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package client
22

33
import (
4-
"github.com/cloudquery/plugin-sdk/plugins/destination"
4+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
55
)
66

77
func (c *Client) Metrics() destination.Metrics {

plugins/destination/elasticsearch/client/migrate.go

Lines changed: 131 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"fmt"
77
"strings"
88

9-
"github.com/cloudquery/plugin-sdk/schema"
9+
"github.com/apache/arrow/go/v13/arrow"
10+
"github.com/cloudquery/plugin-sdk/v3/schema"
11+
cqtypes "github.com/cloudquery/plugin-sdk/v3/types"
1012
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
1113
)
1214

@@ -31,45 +33,7 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
3133
func (c *Client) getIndexTemplate(table *schema.Table) (string, error) {
3234
properties := map[string]types.Property{}
3335
for _, col := range table.Columns {
34-
switch col.Type {
35-
case schema.TypeBool:
36-
properties[col.Name] = types.NewBooleanProperty()
37-
case schema.TypeInt:
38-
properties[col.Name] = types.NewIntegerNumberProperty()
39-
case schema.TypeFloat:
40-
properties[col.Name] = types.NewFloatNumberProperty()
41-
case schema.TypeUUID:
42-
properties[col.Name] = types.NewTextProperty()
43-
case schema.TypeString:
44-
properties[col.Name] = types.NewTextProperty()
45-
case schema.TypeByteArray:
46-
properties[col.Name] = types.NewBinaryProperty()
47-
case schema.TypeStringArray:
48-
properties[col.Name] = types.NewTextProperty()
49-
case schema.TypeTimestamp:
50-
d := types.NewDateProperty()
51-
f := "strict_date_optional_time||epoch_millis"
52-
d.Format = &f
53-
properties[col.Name] = d
54-
case schema.TypeJSON:
55-
properties[col.Name] = types.NewObjectProperty()
56-
case schema.TypeUUIDArray:
57-
properties[col.Name] = types.NewTextProperty()
58-
case schema.TypeCIDR:
59-
properties[col.Name] = types.NewIpRangeProperty()
60-
case schema.TypeCIDRArray:
61-
properties[col.Name] = types.NewIpRangeProperty()
62-
case schema.TypeMacAddr:
63-
properties[col.Name] = types.NewTextProperty()
64-
case schema.TypeMacAddrArray:
65-
properties[col.Name] = types.NewTextProperty()
66-
case schema.TypeInet:
67-
properties[col.Name] = types.NewIpRangeProperty()
68-
case schema.TypeInetArray:
69-
properties[col.Name] = types.NewIpRangeProperty()
70-
case schema.TypeIntArray:
71-
properties[col.Name] = types.NewIntegerNumberProperty()
72-
}
36+
properties[col.Name] = arrowTypeToElasticsearchProperty(col.Type)
7337
}
7438
tmp := types.IndexTemplate{
7539
AllowAutoCreate: nil,
@@ -88,3 +52,130 @@ func (c *Client) getIndexTemplate(table *schema.Table) (string, error) {
8852
b, err := json.Marshal(tmp)
8953
return string(b), err
9054
}
55+
56+
func arrowTypeToElasticsearchProperty(dataType arrow.DataType) types.Property {
57+
if dataType == nil {
58+
return types.NewTextProperty()
59+
}
60+
switch {
61+
// handle known extensions
62+
case typeOneOf(dataType,
63+
cqtypes.ExtensionTypes.UUID,
64+
cqtypes.ExtensionTypes.MAC,
65+
cqtypes.ExtensionTypes.Inet):
66+
return types.NewTextProperty()
67+
case typeOneOf(dataType,
68+
cqtypes.ExtensionTypes.JSON):
69+
return types.NewTextProperty()
70+
71+
// handle nested types
72+
case dataType.ID() == arrow.LIST:
73+
return arrowTypeToElasticsearchProperty(dataType.(*arrow.ListType).Elem())
74+
case dataType.ID() == arrow.LARGE_LIST:
75+
return arrowTypeToElasticsearchProperty(dataType.(*arrow.LargeListType).Elem())
76+
case dataType.ID() == arrow.FIXED_SIZE_LIST:
77+
return arrowTypeToElasticsearchProperty(dataType.(*arrow.FixedSizeListType).Elem())
78+
case dataType.ID() == arrow.STRUCT:
79+
p := types.NewObjectProperty()
80+
for _, field := range dataType.(*arrow.StructType).Fields() {
81+
p.Properties[field.Name] = arrowTypeToElasticsearchProperty(field.Type)
82+
}
83+
return p
84+
case dataType.ID() == arrow.MAP:
85+
p := types.NewObjectProperty()
86+
p.Properties["key"] = arrowTypeToElasticsearchProperty(dataType.(*arrow.MapType).KeyType())
87+
p.Properties["value"] = arrowTypeToElasticsearchProperty(dataType.(*arrow.MapType).ItemType())
88+
return p
89+
// handle primitive types
90+
case typeOneOf(dataType,
91+
arrow.FixedWidthTypes.Boolean):
92+
return types.NewBooleanProperty()
93+
case typeOneOf(dataType,
94+
arrow.PrimitiveTypes.Int8):
95+
return types.NewByteNumberProperty()
96+
case typeOneOf(dataType,
97+
arrow.PrimitiveTypes.Int16):
98+
return types.NewShortNumberProperty()
99+
case typeOneOf(dataType,
100+
arrow.PrimitiveTypes.Int32):
101+
return types.NewIntegerNumberProperty()
102+
case typeOneOf(dataType,
103+
arrow.PrimitiveTypes.Int64):
104+
return types.NewLongNumberProperty()
105+
case typeOneOf(dataType,
106+
arrow.PrimitiveTypes.Uint8,
107+
arrow.PrimitiveTypes.Uint16,
108+
arrow.PrimitiveTypes.Uint32,
109+
arrow.PrimitiveTypes.Uint64):
110+
return types.NewUnsignedLongNumberProperty()
111+
case typeOneOf(dataType,
112+
arrow.FixedWidthTypes.Float16):
113+
return types.NewHalfFloatNumberProperty()
114+
case typeOneOf(dataType,
115+
arrow.PrimitiveTypes.Float32):
116+
return types.NewFloatNumberProperty()
117+
case typeOneOf(dataType,
118+
arrow.PrimitiveTypes.Float64):
119+
return types.NewDoubleNumberProperty()
120+
case typeOneOf(dataType,
121+
arrow.BinaryTypes.String,
122+
arrow.BinaryTypes.LargeString):
123+
return types.NewTextProperty()
124+
case typeOneOf(dataType,
125+
arrow.BinaryTypes.Binary,
126+
arrow.BinaryTypes.LargeBinary):
127+
return types.NewBinaryProperty()
128+
case typeOneOf(dataType,
129+
arrow.FixedWidthTypes.Date32,
130+
arrow.FixedWidthTypes.Date64):
131+
d := types.NewDateProperty()
132+
f := "yyyy-MM-dd"
133+
d.Format = &f
134+
return d
135+
case typeOneOf(dataType,
136+
arrow.FixedWidthTypes.Time32s):
137+
d := types.NewDateProperty()
138+
f := "HH:mm:ss"
139+
d.Format = &f
140+
return d
141+
case typeOneOf(dataType,
142+
arrow.FixedWidthTypes.Time32ms):
143+
d := types.NewDateProperty()
144+
f := "HH:mm:ss.SSS"
145+
d.Format = &f
146+
return d
147+
case typeOneOf(dataType,
148+
arrow.FixedWidthTypes.Time64us,
149+
arrow.FixedWidthTypes.Time64ns):
150+
return types.NewTextProperty()
151+
case typeOneOf(dataType,
152+
arrow.FixedWidthTypes.Timestamp_s,
153+
arrow.FixedWidthTypes.Timestamp_ms,
154+
arrow.FixedWidthTypes.Timestamp_us):
155+
d := types.NewDateProperty()
156+
f := "strict_date_optional_time"
157+
d.Format = &f
158+
return d
159+
case typeOneOf(dataType,
160+
arrow.FixedWidthTypes.Timestamp_ns):
161+
d := types.NewDateNanosProperty()
162+
f := "strict_date_optional_time_nanos"
163+
d.Format = &f
164+
return d
165+
case typeOneOf(dataType,
166+
arrow.FixedWidthTypes.DayTimeInterval,
167+
arrow.FixedWidthTypes.MonthInterval,
168+
arrow.FixedWidthTypes.MonthDayNanoInterval):
169+
return types.NewObjectProperty()
170+
}
171+
return types.NewTextProperty()
172+
}
173+
174+
func typeOneOf(left arrow.DataType, dt ...arrow.DataType) bool {
175+
for _, t := range dt {
176+
if arrow.TypeEqual(left, t) {
177+
return true
178+
}
179+
}
180+
return false
181+
}

plugins/destination/elasticsearch/client/read.go

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package client
22

33
import (
4+
"bytes"
45
"context"
5-
"encoding/base64"
6-
"encoding/json"
76
"fmt"
87
"io"
98

10-
"github.com/cloudquery/plugin-sdk/schema"
9+
"github.com/goccy/go-json"
10+
11+
"github.com/apache/arrow/go/v13/arrow"
12+
"github.com/apache/arrow/go/v13/arrow/array"
13+
"github.com/apache/arrow/go/v13/arrow/memory"
14+
"github.com/cloudquery/plugin-sdk/v3/schema"
1115
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
1216
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
1317
)
1418

15-
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error {
19+
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
1620
index := c.getIndexNamePattern(table.Name)
1721

1822
// refresh index before read, to ensure all written data is available
@@ -49,21 +53,63 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin
4953
return fmt.Errorf("failed to decode response body: %w", err)
5054
}
5155

56+
sm := table.ToArrowSchema()
5257
for _, hit := range result.Hits.Hits {
53-
values := make([]any, len(table.Columns))
54-
for i, col := range table.Columns {
55-
switch col.Type {
56-
case schema.TypeByteArray:
57-
ba, err := base64.StdEncoding.DecodeString(hit.Source[col.Name].(string))
58-
if err != nil {
59-
return fmt.Errorf("failed to decode base64 string: %w", err)
60-
}
61-
values[i] = ba
62-
default:
63-
values[i] = hit.Source[col.Name]
58+
rb := array.NewRecordBuilder(memory.DefaultAllocator, sm)
59+
for i, field := range rb.Fields() {
60+
err := appendValue(field, hit.Source[sm.Field(i).Name])
61+
if err != nil {
62+
return fmt.Errorf("failed to read from table %s: %w", table.Name, err)
6463
}
6564
}
66-
res <- values
65+
res <- rb.NewRecord()
6766
}
6867
return nil
6968
}
69+
70+
func appendValue(builder array.Builder, value any) error {
71+
if value == nil {
72+
builder.AppendNull()
73+
return nil
74+
}
75+
switch bldr := builder.(type) {
76+
case array.ListLikeBuilder:
77+
lst := value.([]any)
78+
if lst == nil {
79+
bldr.AppendNull()
80+
return nil
81+
}
82+
bldr.Append(true)
83+
valBuilder := bldr.ValueBuilder()
84+
for _, v := range lst {
85+
if err := appendValue(valBuilder, v); err != nil {
86+
return err
87+
}
88+
}
89+
return nil
90+
case *array.StructBuilder:
91+
m := value.(map[string]any)
92+
bldr.Append(true)
93+
bldrType := bldr.Type().(*arrow.StructType)
94+
for k, v := range m {
95+
idx, _ := bldrType.FieldIdx(k)
96+
fieldBldr := bldr.FieldBuilder(idx)
97+
if err := appendValue(fieldBldr, v); err != nil {
98+
return err
99+
}
100+
}
101+
return nil
102+
case *array.MonthIntervalBuilder, *array.DayTimeIntervalBuilder, *array.MonthDayNanoIntervalBuilder:
103+
b, err := json.Marshal(value)
104+
if err != nil {
105+
return err
106+
}
107+
dec := json.NewDecoder(bytes.NewReader(b))
108+
return bldr.UnmarshalOne(dec)
109+
case *array.Int8Builder, *array.Int16Builder, *array.Int32Builder, *array.Int64Builder:
110+
return bldr.AppendValueFromString(fmt.Sprintf("%d", int64(value.(float64))))
111+
case *array.Uint8Builder, *array.Uint16Builder, *array.Uint32Builder, *array.Uint64Builder:
112+
return bldr.AppendValueFromString(fmt.Sprintf("%d", uint64(value.(float64))))
113+
}
114+
return builder.AppendValueFromString(fmt.Sprintf("%v", value))
115+
}

0 commit comments

Comments
 (0)