Skip to content

Commit 0211c95

Browse files
authored
feat(bigtable): Add MergeToCell support to the bigtable emulator and client (#10366)
* feat(bigtable): Add MergeToCell support to the bigtable emulator and client * feat(bigtable): Add MergeToCell support to the bigtable emulator and client * add integration tests * add integration tests
1 parent a49ab59 commit 0211c95

File tree

5 files changed

+276
-2
lines changed

5 files changed

+276
-2
lines changed

bigtable/bigtable.go

+14
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,20 @@ func (m *Mutation) addToCell(family, column string, ts Timestamp, value *btpb.Va
11671167
}}})
11681168
}
11691169

1170+
// MergeBytesToCell merges a bytes accumulator value to a cell in an aggregate column family.
1171+
func (m *Mutation) MergeBytesToCell(family, column string, ts Timestamp, value []byte) {
1172+
m.mergeToCell(family, column, ts, &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: value}})
1173+
}
1174+
1175+
func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb.Value) {
1176+
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{
1177+
FamilyName: family,
1178+
ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte(column)}},
1179+
Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: int64(ts.TruncateToMilliseconds())}},
1180+
Input: value,
1181+
}}})
1182+
}
1183+
11701184
// entryErr is a container that combines an entry with the error that was returned for it.
11711185
// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
11721186
type entryErr struct {

bigtable/bigtable_test.go

+62-1
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ func TestHeaderPopulatedWithAppProfile(t *testing.T) {
754754
}
755755
}
756756

757-
func TestMutateRowsWithAggregates(t *testing.T) {
757+
func TestMutateRowsWithAggregates_AddToCell(t *testing.T) {
758758
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
759759
if err != nil {
760760
t.Fatalf("NewEmulatedEnv failed: %v", err)
@@ -814,3 +814,64 @@ func TestMutateRowsWithAggregates(t *testing.T) {
814814
t.Error()
815815
}
816816
}
817+
818+
func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) {
819+
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
820+
if err != nil {
821+
t.Fatalf("NewEmulatedEnv failed: %v", err)
822+
}
823+
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
824+
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
825+
)
826+
if err != nil {
827+
t.Fatalf("grpc.Dial failed: %v", err)
828+
}
829+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
830+
defer cancel()
831+
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
832+
if err != nil {
833+
t.Fatalf("NewClient failed: %v", err)
834+
}
835+
defer adminClient.Close()
836+
837+
tableConf := &TableConf{
838+
TableID: testEnv.config.Table,
839+
ColumnFamilies: map[string]Family{
840+
"f": {
841+
ValueType: AggregateType{
842+
Input: Int64Type{},
843+
Aggregator: SumAggregator{},
844+
},
845+
},
846+
},
847+
}
848+
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
849+
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
850+
}
851+
852+
client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
853+
if err != nil {
854+
t.Fatalf("NewClient failed: %v", err)
855+
}
856+
defer client.Close()
857+
table := client.Open(testEnv.config.Table)
858+
859+
m := NewMutation()
860+
m.MergeBytesToCell("f", "q", 0, binary.BigEndian.AppendUint64([]byte{}, 1000))
861+
err = table.Apply(ctx, "row1", m)
862+
if err != nil {
863+
t.Fatalf("Apply failed: %v", err)
864+
}
865+
866+
m = NewMutation()
867+
m.MergeBytesToCell("f", "q", 0, binary.BigEndian.AppendUint64([]byte{}, 2000))
868+
err = table.Apply(ctx, "row1", m)
869+
if err != nil {
870+
t.Fatalf("Apply failed: %v", err)
871+
}
872+
873+
row, err := table.ReadRow(ctx, "row1")
874+
if !bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) {
875+
t.Error()
876+
}
877+
}

bigtable/bttest/inmem.go

+29
Original file line numberDiff line numberDiff line change
@@ -1147,6 +1147,35 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co
11471147
f := r.getOrCreateFamily(fam, fs[fam].order)
11481148
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf)
11491149

1150+
case *btpb.Mutation_MergeToCell_:
1151+
add := mut.MergeToCell
1152+
var cf, ok = fs[add.FamilyName]
1153+
if !ok {
1154+
return fmt.Errorf("unknown family %q", add.FamilyName)
1155+
}
1156+
if cf.valueType == nil || cf.valueType.GetAggregateType() == nil {
1157+
return fmt.Errorf("illegal attempt to use MergeToCell on non-aggregate cell")
1158+
}
1159+
ts := add.Timestamp.GetRawTimestampMicros()
1160+
if ts < 0 {
1161+
return fmt.Errorf("MergeToCell must set timestamp >= 0")
1162+
}
1163+
1164+
fam := add.FamilyName
1165+
col := string(add.GetColumnQualifier().GetRawValue())
1166+
1167+
var value []byte
1168+
switch v := add.Input.Kind.(type) {
1169+
case *btpb.Value_RawValue:
1170+
value = v.RawValue
1171+
default:
1172+
return fmt.Errorf("only []bytes values are supported")
1173+
}
1174+
1175+
newCell := cell{ts: ts, value: value}
1176+
f := r.getOrCreateFamily(fam, fs[fam].order)
1177+
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf)
1178+
11501179
case *btpb.Mutation_DeleteFromColumn_:
11511180
del := mut.DeleteFromColumn
11521181
if _, ok := fs[del.FamilyName]; !ok {

bigtable/bttest/inmem_test.go

+91-1
Original file line numberDiff line numberDiff line change
@@ -1794,7 +1794,7 @@ func TestFilters(t *testing.T) {
17941794
}
17951795
}
17961796

1797-
func TestMutateRowsAggregate(t *testing.T) {
1797+
func TestMutateRowsAggregate_AddToCell(t *testing.T) {
17981798
ctx := context.Background()
17991799

18001800
s := &server{
@@ -1884,6 +1884,96 @@ func TestMutateRowsAggregate(t *testing.T) {
18841884
}
18851885
}
18861886

1887+
func TestMutateRowsAggregate_MergeToCell(t *testing.T) {
1888+
ctx := context.Background()
1889+
1890+
s := &server{
1891+
tables: make(map[string]*table),
1892+
}
1893+
1894+
tblInfo, err := populateTable(ctx, s)
1895+
if err != nil {
1896+
t.Fatal(err)
1897+
}
1898+
1899+
_, err = s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{
1900+
Name: tblInfo.Name,
1901+
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
1902+
Id: "sum",
1903+
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{
1904+
Create: &btapb.ColumnFamily{
1905+
ValueType: &btapb.Type{
1906+
Kind: &btapb.Type_AggregateType{
1907+
AggregateType: &btapb.Type_Aggregate{
1908+
InputType: &btapb.Type{
1909+
Kind: &btapb.Type_Int64Type{},
1910+
},
1911+
Aggregator: &btapb.Type_Aggregate_Sum_{
1912+
Sum: &btapb.Type_Aggregate_Sum{},
1913+
},
1914+
},
1915+
},
1916+
},
1917+
},
1918+
}},
1919+
}})
1920+
1921+
if err != nil {
1922+
t.Fatal(err)
1923+
}
1924+
1925+
_, err = s.MutateRow(ctx, &btpb.MutateRowRequest{
1926+
TableName: tblInfo.GetName(),
1927+
RowKey: []byte("row1"),
1928+
Mutations: []*btpb.Mutation{{
1929+
Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{
1930+
FamilyName: "sum",
1931+
ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte("col1")}},
1932+
Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}},
1933+
Input: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: binary.BigEndian.AppendUint64([]byte{}, 1)}},
1934+
}},
1935+
}},
1936+
})
1937+
1938+
if err != nil {
1939+
t.Fatal(err)
1940+
}
1941+
1942+
_, err = s.MutateRow(ctx, &btpb.MutateRowRequest{
1943+
TableName: tblInfo.GetName(),
1944+
RowKey: []byte("row1"),
1945+
Mutations: []*btpb.Mutation{{
1946+
Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{
1947+
FamilyName: "sum",
1948+
ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte("col1")}},
1949+
Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}},
1950+
Input: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: binary.BigEndian.AppendUint64([]byte{}, 2)}},
1951+
}},
1952+
}},
1953+
})
1954+
1955+
if err != nil {
1956+
t.Fatal(err)
1957+
}
1958+
1959+
mock := &MockReadRowsServer{}
1960+
err = s.ReadRows(&btpb.ReadRowsRequest{
1961+
TableName: tblInfo.GetName(),
1962+
Rows: &btpb.RowSet{
1963+
RowKeys: [][]byte{
1964+
[]byte("row1"),
1965+
},
1966+
}}, mock)
1967+
if err != nil {
1968+
t.Fatal(err)
1969+
}
1970+
got := mock.responses[0]
1971+
1972+
if !bytes.Equal(got.Chunks[0].Value, binary.BigEndian.AppendUint64([]byte{}, 3)) {
1973+
t.Error()
1974+
}
1975+
}
1976+
18871977
func Test_Mutation_DeleteFromColumn(t *testing.T) {
18881978
ctx := context.Background()
18891979

bigtable/integration_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package bigtable
1818

1919
import (
2020
"context"
21+
"encoding/binary"
2122
"flag"
2223
"fmt"
2324
"log"
@@ -280,6 +281,73 @@ func TestIntegration_ReadRowList(t *testing.T) {
280281
}
281282
}
282283

284+
func TestIntegration_Aggregates(t *testing.T) {
285+
ctx := context.Background()
286+
_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
287+
if err != nil {
288+
t.Fatal(err)
289+
}
290+
defer cleanup()
291+
key := "some-key"
292+
family := "sum"
293+
column := "col"
294+
mut := NewMutation()
295+
mut.AddIntToCell(family, column, 1000, 5)
296+
297+
// Add 5 to empty cell.
298+
if err := table.Apply(ctx, key, mut); err != nil {
299+
t.Fatalf("Mutating row %q: %v", key, err)
300+
}
301+
row, err := table.ReadRow(ctx, key)
302+
if err != nil {
303+
t.Fatalf("Reading a row: %v", err)
304+
}
305+
wantRow := Row{
306+
family: []ReadItem{
307+
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 5)},
308+
},
309+
}
310+
if !testutil.Equal(row, wantRow) {
311+
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
312+
}
313+
314+
// Add 5 again.
315+
if err := table.Apply(ctx, key, mut); err != nil {
316+
t.Fatalf("Mutating row %q: %v", key, err)
317+
}
318+
row, err = table.ReadRow(ctx, key)
319+
if err != nil {
320+
t.Fatalf("Reading a row: %v", err)
321+
}
322+
wantRow = Row{
323+
family: []ReadItem{
324+
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 10)},
325+
},
326+
}
327+
if !testutil.Equal(row, wantRow) {
328+
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
329+
}
330+
331+
// Merge 5, which translates in the backend to adding 5 for sum column families.
332+
mut2 := NewMutation()
333+
mut2.MergeBytesToCell(family, column, 1000, binary.BigEndian.AppendUint64([]byte{}, 5))
334+
if err := table.Apply(ctx, key, mut); err != nil {
335+
t.Fatalf("Mutating row %q: %v", key, err)
336+
}
337+
row, err = table.ReadRow(ctx, key)
338+
if err != nil {
339+
t.Fatalf("Reading a row: %v", err)
340+
}
341+
wantRow = Row{
342+
family: []ReadItem{
343+
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 15)},
344+
},
345+
}
346+
if !testutil.Equal(row, wantRow) {
347+
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
348+
}
349+
}
350+
283351
func TestIntegration_ReadRowListReverse(t *testing.T) {
284352
ctx := context.Background()
285353
_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
@@ -4221,6 +4289,18 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C
42214289
return nil, nil, nil, nil, "", nil, err
42224290
}
42234291

4292+
err = retryOnUnavailable(ctx, func() error {
4293+
return adminClient.CreateColumnFamilyWithConfig(ctx, tableName, "sum", Family{ValueType: AggregateType{
4294+
Input: Int64Type{},
4295+
Aggregator: SumAggregator{},
4296+
}})
4297+
})
4298+
if err != nil {
4299+
cancel()
4300+
t.Logf("Error creating aggregate column family: %v", err)
4301+
return nil, nil, nil, nil, "", nil, err
4302+
}
4303+
42244304
return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
42254305
if err := adminClient.DeleteTable(ctx, tableName); err != nil {
42264306
t.Errorf("DeleteTable got error %v", err)

0 commit comments

Comments
 (0)