@@ -32,38 +32,37 @@ func (c *Client) Write(ctx context.Context, msgs <-chan message.WriteMessage) er
3232 return nil
3333}
3434
// WriteTableBatch writes one batch of insert messages. In the added (+)
// version the table-name argument is ignored, an empty batch returns nil
// without doing any work, the table is taken from the first message, every
// message's record is appended to one shared buffer via appendToWriteBuffer,
// and that buffer is handed to writeData in a single call. The removed (-)
// version called writeRecord once per message instead.
35- func (c * Client ) WriteTableBatch (ctx context.Context , name string , msgs message.WriteInserts ) error {
35+ func (c * Client ) WriteTableBatch (ctx context.Context , _ string , msgs message.WriteInserts ) error {
36+ if len (msgs ) == 0 { // guard: msgs[0] is indexed below
37+ return nil
38+ }
39+
40+ // every message in the batch is assumed to belong to the same table, so the first message's table is used for the whole batch
41+ table := msgs [0 ].GetTable ()
42+ data := new (bytes.Buffer ) // accumulates the payload for all messages in the batch
3643 for _ , msg := range msgs {
37- table := msg .GetTable ()
38- record := msg .Record
39- err := c .writeRecord (ctx , table , record )
40- if err != nil {
44+ if err := c .appendToWriteBuffer (table , msg .Record , data ); err != nil {
4145 return err
4246 }
4347 }
44- return nil
48+
49+ return c .writeData (ctx , table , data ) // one write for the whole batch
4550}
4651
// appendToWriteBuffer (named writeRecord before this change) serializes every
// row of record into buf: each row becomes a JSON document keyed by column
// name, and a metadata entry is written to buf immediately before it. In the
// visible code the only error path is a JSON marshaling failure. The body of
// the else branch is elided by the diff hunk boundary below, so the metadata
// used when the table has no primary keys is not visible here.
47- func (c * Client ) writeRecord (ctx context.Context , table * schema.Table , record arrow.Record ) error {
48- var buf bytes.Buffer
49- pks := pkIndexes (table ) // do some work up front to avoid doing it for every resource
50- // get the sync time from the first resource in the batch (here we assume that all resources in the batch
51- // have the same sync time. At the moment this assumption holds.)
52- syncTime := time .Now ()
52+ func (c * Client ) appendToWriteBuffer (table * schema.Table , record arrow.Record , buf * bytes.Buffer ) error {
53+ pks := table .PrimaryKeysIndexes () // resolved once before the row loop instead of once per row
5354 for r := 0 ; r < int (record .NumRows ()); r ++ { // one document per row
5455 doc := map [string ]any {}
5556 for i , col := range record .Columns () {
56- doc [table . Columns [ i ]. Name ] = c .getValueForElasticsearch (col , r )
57+ doc [record . ColumnName ( i ) ] = c .getValueForElasticsearch (col , r ) // keyed by the record's own column name
5758 }
5859 data , err := json .Marshal (doc )
5960 if err != nil {
6061 return fmt .Errorf ("failed to marshal JSON: %w" , err )
6162 }
6263
6364 var meta []byte
64- hasPrimaryKeys := len (table .PrimaryKeys ()) > 0
65-
66- if hasPrimaryKeys {
65+ if len (pks ) > 0 {
6766 docID := fmt .Sprint (resourceID (record , r , pks )) // explicit document ID computed by resourceID from this row and the pks indexes
6867 meta = []byte (fmt .Sprintf (`{"index":{"_id":"%s"}}%s` , docID , "\n " ))
6968 } else {
@@ -74,10 +73,18 @@ func (c *Client) writeRecord(ctx context.Context, table *schema.Table, record ar
7473 buf .Write (meta ) // metadata entry first ...
7574 buf .Write (data ) // ... then the document itself
7675 }
76+ return nil
77+ }
78+
79+ func (c * Client ) writeData (ctx context.Context , table * schema.Table , buf * bytes.Buffer ) error {
80+ // the sync time is taken from the current wall-clock time rather than read from the batch; all resources in a batch
81+ // are assumed to share the same sync time (at the moment this assumption holds)
82+ syncTime := time .Now ()
7783 index := c .getIndexName (table , syncTime )
7884 resp , err := c .client .Bulk (bytes .NewReader (buf .Bytes ()),
7985 c .client .Bulk .WithContext (ctx ),
8086 c .client .Bulk .WithIndex (index ),
87+ c .client .Bulk .WithRefresh ("wait_for" ), // returns only once the data is written
8188 )
8289 if err != nil {
8390 return fmt .Errorf ("failed to create bulk request: %w" , err )
@@ -161,19 +168,6 @@ func (c *Client) getValueForElasticsearch(col arrow.Array, i int) any {
161168 return col .GetOneForMarshal (i )
162169}
163170
// pkIndexes (removed by this change) returns, for each primary-key column of
// table, its index within table.Columns. When the table defines no primary
// keys it returns an index for every column instead.
164- func pkIndexes (table * schema.Table ) []int {
165- pks := table .PrimaryKeys ()
166- if len (pks ) == 0 {
167- // no primary key is defined: fall back to all columns, since the document ID is based on the indices returned by this function
168- pks = table .Columns .Names ()
169- }
170- inds := make ([]int , 0 , len (pks ))
171- for _ , col := range pks {
172- inds = append (inds , table .Columns .Index (col ))
173- }
174- return inds
175- }
176-
177171// Elasticsearch document IDs are limited to 512 bytes, so we hash the resource's primary key to make sure the ID stays within the limit
178172func resourceID (record arrow.Record , i int , pkIndexes []int ) uint64 {
179173 parts := make ([]string , 0 , len (pkIndexes ))
0 commit comments