Skip to content

Commit d03e09e

Browse files
authored
feat: Bulk all messages in a batch into a single bulk write (#13588)
Extracted from #13553
1 parent a3779e8 commit d03e09e

File tree

2 files changed

+34
-52
lines changed

2 files changed

+34
-52
lines changed

plugins/destination/elasticsearch/client/deletestale.go renamed to plugins/destination/elasticsearch/client/delete_stale.go

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,39 +33,27 @@ func (c *Client) DeleteStale(ctx context.Context, msgs message.WriteDeleteStales
3333
for _, msg := range msgs {
3434
msg := msg
3535
g.Go(func() error {
36-
syncTime := msg.SyncTime
37-
source := msg.SourceName
38-
syncTimeStr := syncTime.Format(time.RFC3339)
39-
dateRange := types.NewDateRangeQuery()
40-
dateRange.Lt = &syncTimeStr
41-
q := types.Query{
42-
Bool: &types.BoolQuery{
43-
Must: []types.Query{
44-
{
45-
MatchPhrase: map[string]types.MatchPhraseQuery{
46-
schema.CqSourceNameColumn.Name: {
47-
Query: source,
48-
},
49-
},
50-
},
51-
{
52-
Range: map[string]types.RangeQuery{
53-
schema.CqSyncTimeColumn.Name: dateRange,
36+
syncTimeStr := msg.SyncTime.Format(time.RFC3339)
37+
return c.deleteStaleIndex(gctx,
38+
msg.TableName,
39+
&deletebyquery.Request{
40+
Query: &types.Query{
41+
Bool: &types.BoolQuery{
42+
Filter: []types.Query{
43+
{MatchPhrase: map[string]types.MatchPhraseQuery{schema.CqSourceNameColumn.Name: {Query: msg.SourceName}}},
44+
{Range: map[string]types.RangeQuery{schema.CqSyncTimeColumn.Name: &types.DateRangeQuery{Lt: &syncTimeStr}}},
5445
},
5546
},
5647
},
5748
},
58-
}
59-
req := deletebyquery.NewRequest()
60-
req.Query = &q
61-
return c.deleteStaleIndex(gctx, msg.TableName, req)
49+
)
6250
})
6351
}
6452
return g.Wait()
6553
}
6654

6755
func (c *Client) deleteStaleIndex(ctx context.Context, index string, req *deletebyquery.Request) error {
68-
resp, err := c.typedClient.DeleteByQuery(index).Request(req).Do(ctx)
56+
resp, err := c.typedClient.DeleteByQuery(index).Request(req).WaitForCompletion(true).Do(ctx)
6957
if err != nil {
7058
return fmt.Errorf("failed to delete stale entries: %w", err)
7159
}

plugins/destination/elasticsearch/client/write.go

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,37 @@ func (c *Client) Write(ctx context.Context, msgs <-chan message.WriteMessage) er
3232
return nil
3333
}
3434

35-
func (c *Client) WriteTableBatch(ctx context.Context, name string, msgs message.WriteInserts) error {
35+
func (c *Client) WriteTableBatch(ctx context.Context, _ string, msgs message.WriteInserts) error {
36+
if len(msgs) == 0 {
37+
return nil
38+
}
39+
40+
// all messages correspond to the same table
41+
table := msgs[0].GetTable()
42+
data := new(bytes.Buffer)
3643
for _, msg := range msgs {
37-
table := msg.GetTable()
38-
record := msg.Record
39-
err := c.writeRecord(ctx, table, record)
40-
if err != nil {
44+
if err := c.appendToWriteBuffer(table, msg.Record, data); err != nil {
4145
return err
4246
}
4347
}
44-
return nil
48+
49+
return c.writeData(ctx, table, data)
4550
}
4651

47-
func (c *Client) writeRecord(ctx context.Context, table *schema.Table, record arrow.Record) error {
48-
var buf bytes.Buffer
49-
pks := pkIndexes(table) // do some work up front to avoid doing it for every resource
50-
// get the sync time from the first resource in the batch (here we assume that all resources in the batch
51-
// have the same sync time. At the moment this assumption holds.)
52-
syncTime := time.Now()
52+
func (c *Client) appendToWriteBuffer(table *schema.Table, record arrow.Record, buf *bytes.Buffer) error {
53+
pks := table.PrimaryKeysIndexes() // do some work up front to avoid doing it for every resource
5354
for r := 0; r < int(record.NumRows()); r++ {
5455
doc := map[string]any{}
5556
for i, col := range record.Columns() {
56-
doc[table.Columns[i].Name] = c.getValueForElasticsearch(col, r)
57+
doc[record.ColumnName(i)] = c.getValueForElasticsearch(col, r)
5758
}
5859
data, err := json.Marshal(doc)
5960
if err != nil {
6061
return fmt.Errorf("failed to marshal JSON: %w", err)
6162
}
6263

6364
var meta []byte
64-
hasPrimaryKeys := len(table.PrimaryKeys()) > 0
65-
66-
if hasPrimaryKeys {
65+
if len(pks) > 0 {
6766
docID := fmt.Sprint(resourceID(record, r, pks))
6867
meta = []byte(fmt.Sprintf(`{"index":{"_id":"%s"}}%s`, docID, "\n"))
6968
} else {
@@ -74,10 +73,18 @@ func (c *Client) writeRecord(ctx context.Context, table *schema.Table, record ar
7473
buf.Write(meta)
7574
buf.Write(data)
7675
}
76+
return nil
77+
}
78+
79+
func (c *Client) writeData(ctx context.Context, table *schema.Table, buf *bytes.Buffer) error {
80+
// use the current time as the sync time passed to getIndexName; it is not read from the records
81+
// (the batch is treated as having a single sync time, and the whole buffer goes out in one bulk request for that index).
82+
syncTime := time.Now()
7783
index := c.getIndexName(table, syncTime)
7884
resp, err := c.client.Bulk(bytes.NewReader(buf.Bytes()),
7985
c.client.Bulk.WithContext(ctx),
8086
c.client.Bulk.WithIndex(index),
87+
c.client.Bulk.WithRefresh("wait_for"), // returns only once the data is written
8188
)
8289
if err != nil {
8390
return fmt.Errorf("failed to create bulk request: %w", err)
@@ -161,19 +168,6 @@ func (c *Client) getValueForElasticsearch(col arrow.Array, i int) any {
161168
return col.GetOneForMarshal(i)
162169
}
163170

164-
func pkIndexes(table *schema.Table) []int {
165-
pks := table.PrimaryKeys()
166-
if len(pks) == 0 {
167-
// if no PK is defined, use all columns for the ID which is based on the indices returned by this function
168-
pks = table.Columns.Names()
169-
}
170-
inds := make([]int, 0, len(pks))
171-
for _, col := range pks {
172-
inds = append(inds, table.Columns.Index(col))
173-
}
174-
return inds
175-
}
176-
177171
// elasticsearch IDs are limited to 512 bytes, so we hash the resource PK to make sure it's within the limit
178172
func resourceID(record arrow.Record, i int, pkIndexes []int) uint64 {
179173
parts := make([]string, 0, len(pkIndexes))

0 commit comments

Comments
 (0)