Skip to content

Commit 8294e7d

Browse files
authored
feat: Use chunks while preparing batch (#11658)
1 parent 74b64e4 commit 8294e7d

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

plugins/destination/clickhouse/client/write.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, recor
1515
return err
1616
}
1717

18-
if err := values.BatchAddRecords(ctx, batch, table, records); err != nil {
18+
if err := values.BatchAddRecords(ctx, batch, table.ToArrowSchema(), records); err != nil {
1919
_ = batch.Abort()
2020
return err
2121
}

plugins/destination/clickhouse/typeconv/ch/values/batch.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ import (
55

66
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
77
"github.com/apache/arrow/go/v13/arrow"
8-
"github.com/cloudquery/plugin-sdk/v3/schema"
8+
"github.com/apache/arrow/go/v13/arrow/array"
99
"golang.org/x/sync/errgroup"
1010
)
1111

12-
func BatchAddRecords(ctx context.Context, batch driver.Batch, table *schema.Table, records []arrow.Record) error {
12+
func BatchAddRecords(ctx context.Context, batch driver.Batch, sc *arrow.Schema, records []arrow.Record) error {
13+
table := array.NewTableFromRecords(sc, records)
1314
eg, _ := errgroup.WithContext(ctx)
14-
for n := range table.Columns {
15-
n := n
15+
for n := 0; n < int(table.NumCols()); n++ {
16+
column, chunks := batch.Column(n), table.Column(n).Data().Chunks()
1617
eg.Go(func() error {
17-
column := batch.Column(n)
18-
for i := range records {
19-
data, err := FromArray(records[i].Column(n))
18+
for _, chunk := range chunks {
19+
data, err := FromArray(chunk)
2020
if err != nil {
2121
return err
2222
}

0 commit comments

Comments
 (0)