Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions plugins/source/mysql/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v2/plugins/source"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/plugins/source"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/go-sql-driver/mysql"
"github.com/rs/zerolog"
)
Expand Down
13 changes: 6 additions & 7 deletions plugins/source/mysql/client/list_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package client
import (
"context"

"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func Identifier(name string) string {
Expand Down Expand Up @@ -31,13 +31,12 @@ func (c *Client) getTableColumns(ctx context.Context, table *schema.Table) (sche
return nil, err
}

schemaType, err := SchemaType(table.Name, name, dataType, columnType)
if err != nil {
return nil, err
}
schemaType := SchemaType(dataType, columnType)
column := schema.Column{
Name: name, Type: schemaType,
CreationOptions: schema.ColumnCreationOptions{NotNull: nullable == "NO", PrimaryKey: key == "PRI"},
Name: name,
Type: schemaType,
NotNull: nullable == "NO",
PrimaryKey: key == "PRI",
}
tc = append(tc, column)
}
Expand Down
55 changes: 39 additions & 16 deletions plugins/source/mysql/client/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"strings"
"time"

"github.com/cloudquery/plugin-sdk/v2/plugins/source"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/apache/arrow/go/v13/arrow"
"github.com/cloudquery/plugin-sdk/v3/plugins/source"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v3/types"
)

func (c *Client) Sync(ctx context.Context, metrics *source.Metrics, res chan<- *schema.Resource) error {
Expand All @@ -25,25 +27,50 @@ func (c *Client) Sync(ctx context.Context, metrics *source.Metrics, res chan<- *
func (*Client) createResultsArray(table *schema.Table) []any {
results := make([]any, 0, len(table.Columns))
for _, col := range table.Columns {
switch col.Type {
case schema.TypeUUID, schema.TypeByteArray:
// We only support types that we create based on the schema, see SchemaType function
switch col.Type.(type) {
case *types.UUIDType, *arrow.BinaryType:
var r *[]byte
results = append(results, &r)
case schema.TypeBool:
case *arrow.BooleanType:
var r *bool
results = append(results, &r)
case schema.TypeInt:
var r *int
case *arrow.Int8Type:
var r *int8
results = append(results, &r)
case schema.TypeFloat:
case *arrow.Int16Type:
var r *int16
results = append(results, &r)
case *arrow.Int32Type:
var r *int32
results = append(results, &r)
case *arrow.Int64Type:
var r *int64
results = append(results, &r)
case *arrow.Uint8Type:
var r *uint8
results = append(results, &r)
case *arrow.Uint16Type:
var r *uint16
results = append(results, &r)
case *arrow.Uint32Type:
var r *uint32
results = append(results, &r)
case *arrow.Uint64Type:
var r *uint64
results = append(results, &r)
case *arrow.Decimal128Type:
var r *string
results = append(results, &r)
case *arrow.Float32Type:
var r *float32
results = append(results, &r)
case *arrow.Float64Type:
var r *float64
results = append(results, &r)
case schema.TypeTimestamp:
case *arrow.TimestampType:
var r *time.Time
results = append(results, &r)
case schema.TypeJSON:
var r string
results = append(results, &r)
default:
var r *string
results = append(results, &r)
Expand Down Expand Up @@ -97,10 +124,6 @@ func (c *Client) resourceFromValues(tableName string, values []any) (*schema.Res
table := c.Tables.Get(tableName)
resource := schema.NewResourceData(table, nil, values)
for i, col := range table.Columns {
// If the value points to an empty byte array array we set it to nil
if v, ok := values[i].(**[]byte); ok && *v == nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a workaround and no longer needed since the scalar UUID supports **[]byte due to cloudquery/plugin-sdk#922

values[i] = nil
}
if err := resource.Set(col.Name, values[i]); err != nil {
return nil, err
}
Expand Down
207 changes: 76 additions & 131 deletions plugins/source/mysql/client/transformer.go
Original file line number Diff line number Diff line change
@@ -1,137 +1,82 @@
package client

import (
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/plugin-sdk/v3/types"
)

// this is used for tests
type Transformer struct{}

func (*Transformer) TransformBool(v *schema.Bool) any {
if v.Status != schema.Present {
return nil
}
return v.Bool
}

func (*Transformer) TransformBytea(v *schema.Bytea) any {
if v.Status != schema.Present {
return nil
}
return v.Bytes
}

func (*Transformer) TransformFloat8(v *schema.Float8) any {
if v.Status != schema.Present {
return nil
}
return v.Float
}

func (*Transformer) TransformInt8(v *schema.Int8) any {
if v.Status != schema.Present {
return nil
}
return v.Int
}

func (*Transformer) TransformTimestamptz(v *schema.Timestamptz) any {
if v.Status != schema.Present {
return nil
}
return v.Time
}

func (*Transformer) TransformJSON(v *schema.JSON) any {
if v.Status != schema.Present {
return nil
}

return string(v.Bytes)
}

func (*Transformer) TransformUUID(v *schema.UUID) any {
if v.Status != schema.Present {
return nil
}
// We need a slice instead of a fixed sized array
bytes := make([]byte, 16)
copy(bytes, v.Bytes[:])
return bytes
}

func (*Transformer) TransformUUIDArray(v *schema.UUIDArray) any {
if v.Status != schema.Present {
return nil
}

return v.String()
}

func (*Transformer) TransformInt8Array(v *schema.Int8Array) any {
if v.Status != schema.Present {
return nil
}

return v.String()
}

func (*Transformer) TransformCIDR(v *schema.CIDR) any {
if v.Status != schema.Present {
return nil
}
return v.String()
}

func (*Transformer) TransformInet(v *schema.Inet) any {
if v.Status != schema.Present {
return nil
}
return v.String()
}

func (*Transformer) TransformMacaddr(v *schema.Macaddr) any {
if v.Status != schema.Present {
return nil
}
return v.String()
}

func (*Transformer) TransformText(v *schema.Text) any {
if v.Status != schema.Present {
return nil
}
return v.Str
}

func (*Transformer) TransformCIDRArray(v *schema.CIDRArray) any {
if v.Status != schema.Present {
return nil
}

return v.String()
}

func (*Transformer) TransformInetArray(v *schema.InetArray) any {
if v.Status != schema.Present {
return nil
}

return v.String()
}

func (*Transformer) TransformMacaddrArray(v *schema.MacaddrArray) any {
if v.Status != schema.Present {
return nil
}

return v.String()
}

func (*Transformer) TransformTextArray(v *schema.TextArray) any {
if v.Status != schema.Present {
return nil
}

return v.String()
func GetValue(arr arrow.Array, i int) (any, error) {
if arr.IsNull(i) {
return nil, nil
}
switch a := arr.(type) {
case *array.Boolean:
return a.Value(i), nil
case *array.Int8:
return a.Value(i), nil
case *array.Int16:
return a.Value(i), nil
case *array.Int32:
return a.Value(i), nil
case *array.Int64:
return a.Value(i), nil
case *array.Uint8:
return a.Value(i), nil
case *array.Uint16:
return a.Value(i), nil
case *array.Uint32:
return a.Value(i), nil
case *array.Uint64:
return a.Value(i), nil
case *array.Float16:
return a.Value(i), nil
case *array.Float32:
return a.Value(i), nil
case *array.Float64:
return a.Value(i), nil
case *array.String:
return a.Value(i), nil
case *array.LargeString:
return a.Value(i), nil
case *array.Binary:
return a.Value(i), nil
case *array.LargeBinary:
return a.Value(i), nil
case *array.FixedSizeBinary:
return a.Value(i), nil
case *array.Timestamp:
toTime, err := a.DataType().(*arrow.TimestampType).GetToTimeFunc()
if err != nil {
return nil, err
}
return toTime(a.Value(i)), nil
case *types.UUIDArray:
bUUID, err := a.Value(i).MarshalBinary()
if err != nil {
return nil, err
}
return bUUID, nil
default:
return a.ValueStr(i), nil
}
}

// used in the tests to insert the arrow.Record
func TransformRecord(record arrow.Record) ([][]any, error) {
numRows := record.NumRows()
res := make([][]any, numRows)
var err error
for i := int64(0); i < numRows; i++ {
numCols := record.NumCols()
row := make([]any, numCols)
for j := 0; int64(j) < numCols; j++ {
row[j], err = GetValue(record.Column(j), int(i))
if err != nil {
return nil, err
}
}
res[i] = row
}
return res, nil
}
Loading