Skip to content

Commit a49ab59

Browse files
authored
feat(bigtable): Add support for new functions (#10582)
* feat(bigtable): Add support for additional aggregate functions * remove merge leftover * remove merge leftover * add new types to switch statement * fix hll name * fix hll name * fix hll name * renamed hll
1 parent 6bd2596 commit a49ab59

File tree

2 files changed

+136
-1
lines changed

2 files changed

+136
-1
lines changed

bigtable/type.go

+27
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,27 @@ func (sum SumAggregator) fillProto(proto *btapb.Type_Aggregate) {
150150
proto.Aggregator = &btapb.Type_Aggregate_Sum_{Sum: &btapb.Type_Aggregate_Sum{}}
151151
}
152152

153+
// MinAggregator is an aggregation function that finds the minimum between the input and the accumulator.
154+
type MinAggregator struct{}
155+
156+
func (min MinAggregator) fillProto(proto *btapb.Type_Aggregate) {
157+
proto.Aggregator = &btapb.Type_Aggregate_Min_{Min: &btapb.Type_Aggregate_Min{}}
158+
}
159+
160+
// MaxAggregator is an aggregation function that finds the maximum between the input and the accumulator.
161+
type MaxAggregator struct{}
162+
163+
func (max MaxAggregator) fillProto(proto *btapb.Type_Aggregate) {
164+
proto.Aggregator = &btapb.Type_Aggregate_Max_{Max: &btapb.Type_Aggregate_Max{}}
165+
}
166+
167+
// HllppUniqueCountAggregator is an aggregation function that calculates the unique count of inputs and the accumulator.
168+
type HllppUniqueCountAggregator struct{}
169+
170+
func (hll HllppUniqueCountAggregator) fillProto(proto *btapb.Type_Aggregate) {
171+
proto.Aggregator = &btapb.Type_Aggregate_HllppUniqueCount{HllppUniqueCount: &btapb.Type_Aggregate_HyperLogLogPlusPlusUniqueCount{}}
172+
}
173+
153174
type unknownAggregator struct {
154175
wrapped *btapb.Type_Aggregate
155176
}
@@ -238,6 +259,12 @@ func aggregateProtoToType(agg *btapb.Type_Aggregate) Type {
238259
switch agg.Aggregator.(type) {
239260
case *btapb.Type_Aggregate_Sum_:
240261
aggregator = SumAggregator{}
262+
case *btapb.Type_Aggregate_Min_:
263+
aggregator = MinAggregator{}
264+
case *btapb.Type_Aggregate_Max_:
265+
aggregator = MaxAggregator{}
266+
case *btapb.Type_Aggregate_HllppUniqueCount:
267+
aggregator = HllppUniqueCountAggregator{}
241268
default:
242269
aggregator = unknownAggregator{wrapped: agg}
243270
}

bigtable/type_test.go

+109-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestStringProto(t *testing.T) {
7070
}
7171
}
7272

73-
func TestAggregateProto(t *testing.T) {
73+
func TestSumAggregateProto(t *testing.T) {
7474
want := &btapb.Type{
7575
Kind: &btapb.Type_AggregateType{
7676
AggregateType: &btapb.Type_Aggregate{
@@ -114,6 +114,114 @@ func TestProtoBijection(t *testing.T) {
114114
}
115115
}
116116

117+
func TestMinAggregateProto(t *testing.T) {
118+
want := &btapb.Type{
119+
Kind: &btapb.Type_AggregateType{
120+
AggregateType: &btapb.Type_Aggregate{
121+
InputType: &btapb.Type{
122+
Kind: &btapb.Type_Int64Type{
123+
Int64Type: &btapb.Type_Int64{
124+
Encoding: &btapb.Type_Int64_Encoding{
125+
Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{
126+
BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{
127+
BytesType: &btapb.Type_Bytes{
128+
Encoding: &btapb.Type_Bytes_Encoding{
129+
Encoding: &btapb.Type_Bytes_Encoding_Raw_{
130+
Raw: &btapb.Type_Bytes_Encoding_Raw{},
131+
},
132+
},
133+
},
134+
},
135+
},
136+
},
137+
},
138+
},
139+
},
140+
Aggregator: &btapb.Type_Aggregate_Min_{
141+
Min: &btapb.Type_Aggregate_Min{},
142+
},
143+
},
144+
},
145+
}
146+
147+
got := AggregateType{Input: Int64Type{}, Aggregator: MinAggregator{}}.proto()
148+
if !proto.Equal(got, want) {
149+
t.Errorf("got type %v, want: %v", got, want)
150+
}
151+
}
152+
153+
func TestMaxAggregateProto(t *testing.T) {
154+
want := &btapb.Type{
155+
Kind: &btapb.Type_AggregateType{
156+
AggregateType: &btapb.Type_Aggregate{
157+
InputType: &btapb.Type{
158+
Kind: &btapb.Type_Int64Type{
159+
Int64Type: &btapb.Type_Int64{
160+
Encoding: &btapb.Type_Int64_Encoding{
161+
Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{
162+
BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{
163+
BytesType: &btapb.Type_Bytes{
164+
Encoding: &btapb.Type_Bytes_Encoding{
165+
Encoding: &btapb.Type_Bytes_Encoding_Raw_{
166+
Raw: &btapb.Type_Bytes_Encoding_Raw{},
167+
},
168+
},
169+
},
170+
},
171+
},
172+
},
173+
},
174+
},
175+
},
176+
Aggregator: &btapb.Type_Aggregate_Max_{
177+
Max: &btapb.Type_Aggregate_Max{},
178+
},
179+
},
180+
},
181+
}
182+
183+
got := AggregateType{Input: Int64Type{}, Aggregator: MaxAggregator{}}.proto()
184+
if !proto.Equal(got, want) {
185+
t.Errorf("got type %v, want: %v", got, want)
186+
}
187+
}
188+
189+
func TestHllAggregateProto(t *testing.T) {
190+
want := &btapb.Type{
191+
Kind: &btapb.Type_AggregateType{
192+
AggregateType: &btapb.Type_Aggregate{
193+
InputType: &btapb.Type{
194+
Kind: &btapb.Type_Int64Type{
195+
Int64Type: &btapb.Type_Int64{
196+
Encoding: &btapb.Type_Int64_Encoding{
197+
Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{
198+
BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{
199+
BytesType: &btapb.Type_Bytes{
200+
Encoding: &btapb.Type_Bytes_Encoding{
201+
Encoding: &btapb.Type_Bytes_Encoding_Raw_{
202+
Raw: &btapb.Type_Bytes_Encoding_Raw{},
203+
},
204+
},
205+
},
206+
},
207+
},
208+
},
209+
},
210+
},
211+
},
212+
Aggregator: &btapb.Type_Aggregate_HllppUniqueCount{
213+
HllppUniqueCount: &btapb.Type_Aggregate_HyperLogLogPlusPlusUniqueCount{},
214+
},
215+
},
216+
},
217+
}
218+
219+
got := AggregateType{Input: Int64Type{}, Aggregator: HllppUniqueCountAggregator{}}.proto()
220+
if !proto.Equal(got, want) {
221+
t.Errorf("got type %v, want: %v", got, want)
222+
}
223+
}
224+
117225
func TestNilChecks(t *testing.T) {
118226
// protoToType
119227
if val, ok := protoToType(nil).(unknown[btapb.Type]); !ok {

0 commit comments

Comments
 (0)