Skip to content

Commit b04dc50

Browse files
authored
feat: gRPC sync endpoint metrics (#1861)
<!-- Please use this template for your pull request. --> <!-- Please use the sections that you need and delete other sections --> ## This PR Adds OpenTelemetry metrics for the gRPC flag sync service to enable monitoring of sync connections and operations. ### Metrics **Custom Metrics:** | Metric | Type | Description | | ------ | ---- | ----------- | | `feature_flag.flagd.sync.active_streams` | Gauge | Currently active streaming connections | | `feature_flag.flagd.sync.stream.duration` | Histogram | Duration of streaming connections (seconds) | **Standard gRPC Metrics:** - Leverages `otelgrpc.NewServerHandler()` for comprehensive gRPC server metrics (request counts, latencies, status codes, etc.) ### Changes - Add custom sync metric definitions in `core/pkg/telemetry/metrics.go` - Instrument `SyncFlags` streaming RPC with stream lifecycle metrics - Enable standard gRPC metrics via OpenTelemetry gRPC instrumentation - Add unit tests for metric collection, histogram buckets, and NoopMetricsRecorder --------- Signed-off-by: Alexandre Chakroun <[email protected]>
1 parent 947af79 commit b04dc50

File tree

7 files changed

+202
-55
lines changed

7 files changed

+202
-55
lines changed

core/pkg/telemetry/metrics.go

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"github.com/prometheus/client_golang/prometheus"
8+
"go.opentelemetry.io/otel"
89
"go.opentelemetry.io/otel/attribute"
910
"go.opentelemetry.io/otel/metric"
1011
"go.opentelemetry.io/otel/sdk/instrumentation"
@@ -14,16 +15,20 @@ import (
1415
)
1516

1617
const (
17-
ProviderName = "flagd"
18+
ProviderName = "flagd"
19+
featureFlagPrefix = "feature_flag."
1820

1921
FeatureFlagReasonKey = attribute.Key("feature_flag.reason")
2022
ExceptionTypeKey = attribute.Key("ExceptionTypeKeyName")
2123

2224
httpRequestDurationMetric = "http.server.request.duration"
2325
httpResponseSizeMetric = "http.server.response.body.size"
2426
httpActiveRequestsMetric = "http.server.active_requests"
25-
impressionMetric = "feature_flag." + ProviderName + ".impression"
26-
reasonMetric = "feature_flag." + ProviderName + ".result.reason"
27+
impressionMetric = featureFlagPrefix + ProviderName + ".impression"
28+
reasonMetric = featureFlagPrefix + ProviderName + ".result.reason"
29+
30+
syncActiveStreamsMetric = featureFlagPrefix + ProviderName + ".sync.active_streams"
31+
syncStreamDurationMetric = featureFlagPrefix + ProviderName + ".sync.stream.duration"
2732
)
2833

2934
type IMetricsRecorder interface {
@@ -34,6 +39,10 @@ type IMetricsRecorder interface {
3439
InFlightRequestEnd(ctx context.Context, attrs []attribute.KeyValue)
3540
RecordEvaluation(ctx context.Context, err error, reason, variant, key string)
3641
Impressions(ctx context.Context, reason, variant, key string)
42+
// gRPC Sync metrics
43+
SyncStreamStart(ctx context.Context, attrs []attribute.KeyValue)
44+
SyncStreamEnd(ctx context.Context, attrs []attribute.KeyValue)
45+
SyncStreamDuration(ctx context.Context, duration time.Duration, attrs []attribute.KeyValue)
3746
}
3847

3948
type NoopMetricsRecorder struct{}
@@ -60,12 +69,27 @@ func (NoopMetricsRecorder) RecordEvaluation(_ context.Context, _ error, _, _, _
6069
func (NoopMetricsRecorder) Impressions(_ context.Context, _, _, _ string) {
6170
}
6271

72+
func (NoopMetricsRecorder) SyncStreamStart(_ context.Context, _ []attribute.KeyValue) {
73+
// No-op implementation: intentionally does nothing
74+
}
75+
76+
func (NoopMetricsRecorder) SyncStreamEnd(_ context.Context, _ []attribute.KeyValue) {
77+
// No-op implementation: intentionally does nothing
78+
}
79+
80+
func (NoopMetricsRecorder) SyncStreamDuration(_ context.Context, _ time.Duration, _ []attribute.KeyValue) {
81+
// No-op implementation: intentionally does nothing
82+
}
83+
6384
type MetricsRecorder struct {
6485
httpRequestDurHistogram metric.Float64Histogram
6586
httpResponseSizeHistogram metric.Float64Histogram
6687
httpRequestsInflight metric.Int64UpDownCounter
6788
impressions metric.Int64Counter
6889
reasons metric.Int64Counter
90+
// gRPC Sync metrics
91+
syncActiveStreams metric.Int64UpDownCounter
92+
syncStreamDuration metric.Float64Histogram
6993
}
7094

7195
func (r MetricsRecorder) HTTPAttributes(svcName, url, method, code, scheme string) []attribute.KeyValue {
@@ -122,6 +146,18 @@ func (r MetricsRecorder) Reasons(ctx context.Context, key string, reason string,
122146
r.reasons.Add(ctx, 1, metric.WithAttributes(attrs...))
123147
}
124148

149+
func (r MetricsRecorder) SyncStreamStart(ctx context.Context, attrs []attribute.KeyValue) {
150+
r.syncActiveStreams.Add(ctx, 1, metric.WithAttributes(attrs...))
151+
}
152+
153+
func (r MetricsRecorder) SyncStreamEnd(ctx context.Context, attrs []attribute.KeyValue) {
154+
r.syncActiveStreams.Add(ctx, -1, metric.WithAttributes(attrs...))
155+
}
156+
157+
func (r MetricsRecorder) SyncStreamDuration(ctx context.Context, duration time.Duration, attrs []attribute.KeyValue) {
158+
r.syncStreamDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...))
159+
}
160+
125161
func getDurationView(svcName, viewName string, bucket []float64) msdk.View {
126162
return msdk.NewView(
127163
msdk.Instrument{
@@ -156,10 +192,15 @@ func NewOTelRecorder(exporter msdk.Reader, resource *resource.Resource, serviceN
156192
msdk.WithView(getDurationView(serviceName, httpRequestDurationMetric, prometheus.DefBuckets)),
157193
// for response size we want 8 exponential bucket starting from 100 Bytes
158194
msdk.WithView(getDurationView(serviceName, httpResponseSizeMetric, prometheus.ExponentialBuckets(100, 10, 8))),
195+
// for gRPC sync stream duration: 30s, 1min, 2min, 5min, 8min, 10min, 20min, 30min, 1h, 3h
196+
msdk.WithView(getDurationView(serviceName, syncStreamDurationMetric, []float64{30, 60, 120, 300, 480, 600, 1200, 1800, 3600, 10800})),
159197
// set entity producing telemetry
160198
msdk.WithResource(resource),
161199
)
162200

201+
// Set as global MeterProvider so otelgrpc and other instrumentation can use it
202+
otel.SetMeterProvider(provider)
203+
163204
meter := provider.Meter(serviceName)
164205

165206
// we can ignore errors from OpenTelemetry since they could occur if we select the wrong aggregator
@@ -188,11 +229,26 @@ func NewOTelRecorder(exporter msdk.Reader, resource *resource.Resource, serviceN
188229
metric.WithDescription("Measures the number of evaluations for a given reason."),
189230
metric.WithUnit("{reason}"),
190231
)
232+
233+
// gRPC Sync metrics
234+
syncActiveStreams, _ := meter.Int64UpDownCounter(
235+
syncActiveStreamsMetric,
236+
metric.WithDescription("Measures the number of currently active gRPC sync streaming connections."),
237+
metric.WithUnit("{stream}"),
238+
)
239+
syncStreamDuration, _ := meter.Float64Histogram(
240+
syncStreamDurationMetric,
241+
metric.WithDescription("Measures the duration of gRPC sync streaming connections."),
242+
metric.WithUnit("s"),
243+
)
244+
191245
return &MetricsRecorder{
192246
httpRequestDurHistogram: hduration,
193247
httpResponseSizeHistogram: hsize,
194248
httpRequestsInflight: reqCounter,
195249
impressions: impressions,
196250
reasons: reasons,
251+
syncActiveStreams: syncActiveStreams,
252+
syncStreamDuration: syncStreamDuration,
197253
}
198254
}

core/pkg/telemetry/metrics_test.go

Lines changed: 86 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package telemetry
22

33
import (
4-
"context"
54
"fmt"
65
"testing"
76
"time"
@@ -103,40 +102,40 @@ func TestMetrics(t *testing.T) {
103102
semconv.ServiceNameKey.String(svcName),
104103
}
105104
const n = 5
106-
type MetricF func(exp metric.Reader)
105+
type MetricF func(t *testing.T, exp metric.Reader)
107106
tests := []struct {
108107
name string
109108
metricFunc MetricF
110109
metricsLen int
111110
}{
112111
{
113112
name: "HTTPRequestDuration",
114-
metricFunc: func(exp metric.Reader) {
113+
metricFunc: func(t *testing.T, exp metric.Reader) {
115114
rs := resource.NewWithAttributes("testSchema")
116115
rec := NewOTelRecorder(exp, rs, svcName)
117116
for i := 0; i < n; i++ {
118-
rec.HTTPRequestDuration(context.TODO(), 10, attrs)
117+
rec.HTTPRequestDuration(t.Context(), 10, attrs)
119118
}
120119
},
121120
metricsLen: 1,
122121
},
123122
{
124123
name: "HTTPResponseSize",
125-
metricFunc: func(exp metric.Reader) {
124+
metricFunc: func(t *testing.T, exp metric.Reader) {
126125
rs := resource.NewWithAttributes("testSchema")
127126
rec := NewOTelRecorder(exp, rs, svcName)
128127
for i := 0; i < n; i++ {
129-
rec.HTTPResponseSize(context.TODO(), 100, attrs)
128+
rec.HTTPResponseSize(t.Context(), 100, attrs)
130129
}
131130
},
132131
metricsLen: 1,
133132
},
134133
{
135134
name: "InFlightRequestStart",
136-
metricFunc: func(exp metric.Reader) {
135+
metricFunc: func(t *testing.T, exp metric.Reader) {
137136
rs := resource.NewWithAttributes("testSchema")
138137
rec := NewOTelRecorder(exp, rs, svcName)
139-
ctx := context.TODO()
138+
ctx := t.Context()
140139
for i := 0; i < n; i++ {
141140
rec.InFlightRequestStart(ctx, attrs)
142141
rec.InFlightRequestEnd(ctx, attrs)
@@ -146,54 +145,78 @@ func TestMetrics(t *testing.T) {
146145
},
147146
{
148147
name: "Impressions",
149-
metricFunc: func(exp metric.Reader) {
148+
metricFunc: func(t *testing.T, exp metric.Reader) {
150149
rs := resource.NewWithAttributes("testSchema")
151150
rec := NewOTelRecorder(exp, rs, svcName)
152151
for i := 0; i < n; i++ {
153-
rec.Impressions(context.TODO(), "reason", "variant", "key")
152+
rec.Impressions(t.Context(), "reason", "variant", "key")
154153
}
155154
},
156155
metricsLen: 1,
157156
},
158157
{
159158
name: "Reasons",
160-
metricFunc: func(exp metric.Reader) {
159+
metricFunc: func(t *testing.T, exp metric.Reader) {
161160
rs := resource.NewWithAttributes("testSchema")
162161
rec := NewOTelRecorder(exp, rs, svcName)
163162
for i := 0; i < n; i++ {
164-
rec.Reasons(context.TODO(), "keyA", "reason", nil)
163+
rec.Reasons(t.Context(), "keyA", "reason", nil)
165164
}
166165
for i := 0; i < n; i++ {
167-
rec.Reasons(context.TODO(), "keyB", "error", fmt.Errorf("err not found"))
166+
rec.Reasons(t.Context(), "keyB", "error", fmt.Errorf("err not found"))
168167
}
169168
},
170169
metricsLen: 1,
171170
},
172171
{
173172
name: "RecordEvaluations",
174-
metricFunc: func(exp metric.Reader) {
173+
metricFunc: func(t *testing.T, exp metric.Reader) {
175174
rs := resource.NewWithAttributes("testSchema")
176175
rec := NewOTelRecorder(exp, rs, svcName)
177176
for i := 0; i < n; i++ {
178-
rec.RecordEvaluation(context.TODO(), nil, "reason", "variant", "key")
177+
rec.RecordEvaluation(t.Context(), nil, "reason", "variant", "key")
179178
}
180179
for i := 0; i < n; i++ {
181-
rec.RecordEvaluation(context.TODO(), fmt.Errorf("general"), "error", "variant", "key")
180+
rec.RecordEvaluation(t.Context(), fmt.Errorf("general"), "error", "variant", "key")
182181
}
183182
for i := 0; i < n; i++ {
184-
rec.RecordEvaluation(context.TODO(), fmt.Errorf("not found"), "error", "variant", "key")
183+
rec.RecordEvaluation(t.Context(), fmt.Errorf("not found"), "error", "variant", "key")
185184
}
186185
},
187186
metricsLen: 2,
188187
},
188+
{
189+
name: "SyncActiveStreams",
190+
metricFunc: func(t *testing.T, exp metric.Reader) {
191+
rs := resource.NewWithAttributes("testSchema")
192+
rec := NewOTelRecorder(exp, rs, svcName)
193+
ctx := t.Context()
194+
for i := 0; i < n; i++ {
195+
rec.SyncStreamStart(ctx, attrs)
196+
rec.SyncStreamEnd(ctx, attrs)
197+
}
198+
},
199+
metricsLen: 1,
200+
},
201+
{
202+
name: "SyncStreamDuration",
203+
metricFunc: func(t *testing.T, exp metric.Reader) {
204+
rs := resource.NewWithAttributes("testSchema")
205+
rec := NewOTelRecorder(exp, rs, svcName)
206+
for i := 0; i < n; i++ {
207+
rec.SyncStreamDuration(t.Context(), 100*time.Millisecond, attrs)
208+
}
209+
},
210+
metricsLen: 1,
211+
},
189212
}
190213

191214
for _, tt := range tests {
192215
t.Run(tt.name, func(t *testing.T) {
193216
exp := metric.NewManualReader()
194-
tt.metricFunc(exp)
217+
tt.metricFunc(t, exp)
195218
var data metricdata.ResourceMetrics
196-
err := exp.Collect(context.TODO(), &data)
219+
err := exp.Collect(t.Context(), &data)
197220
if err != nil {
198221
t.Errorf("Got %v", err)
199222
}
@@ -211,39 +234,54 @@ func TestMetrics(t *testing.T) {
211234
}
212235

213236
// some really simple tests just to make sure all methods are actually implemented and nothing panics
214-
func TestNoopMetricsRecorder_HTTPAttributes(t *testing.T) {
237+
func TestNoopMetricsRecorderHTTPAttributes(t *testing.T) {
215238
no := NoopMetricsRecorder{}
216239
got := no.HTTPAttributes("", "", "", "", "")
217240
require.Empty(t, got)
218241
}
219242

220-
func TestNoopMetricsRecorder_HTTPRequestDuration(_ *testing.T) {
243+
func TestNoopMetricsRecorderHTTPRequestDuration(t *testing.T) {
244+
no := NoopMetricsRecorder{}
245+
no.HTTPRequestDuration(t.Context(), 0, nil)
246+
}
247+
248+
func TestNoopMetricsRecorderInFlightRequestStart(t *testing.T) {
221249
no := NoopMetricsRecorder{}
222-
no.HTTPRequestDuration(context.TODO(), 0, nil)
250+
no.InFlightRequestStart(t.Context(), nil)
223251
}
224252

225-
func TestNoopMetricsRecorder_InFlightRequestStart(_ *testing.T) {
253+
func TestNoopMetricsRecorderInFlightRequestEnd(t *testing.T) {
226254
no := NoopMetricsRecorder{}
227-
no.InFlightRequestStart(context.TODO(), nil)
255+
no.InFlightRequestEnd(t.Context(), nil)
228256
}
229257

230-
func TestNoopMetricsRecorder_InFlightRequestEnd(_ *testing.T) {
258+
func TestNoopMetricsRecorderRecordEvaluation(t *testing.T) {
231259
no := NoopMetricsRecorder{}
232-
no.InFlightRequestEnd(context.TODO(), nil)
260+
no.RecordEvaluation(t.Context(), nil, "", "", "")
233261
}
234262

235-
func TestNoopMetricsRecorder_RecordEvaluation(_ *testing.T) {
263+
func TestNoopMetricsRecorderImpressions(t *testing.T) {
236264
no := NoopMetricsRecorder{}
237-
no.RecordEvaluation(context.TODO(), nil, "", "", "")
265+
no.Impressions(t.Context(), "", "", "")
238266
}
239267

240-
func TestNoopMetricsRecorder_Impressions(_ *testing.T) {
268+
func TestNoopMetricsRecorderSyncStreamStart(t *testing.T) {
241269
no := NoopMetricsRecorder{}
242-
no.Impressions(context.TODO(), "", "", "")
270+
no.SyncStreamStart(t.Context(), nil)
271+
}
272+
273+
func TestNoopMetricsRecorderSyncStreamEnd(t *testing.T) {
274+
no := NoopMetricsRecorder{}
275+
no.SyncStreamEnd(t.Context(), nil)
276+
}
277+
278+
func TestNoopMetricsRecorderSyncStreamDuration(t *testing.T) {
279+
no := NoopMetricsRecorder{}
280+
no.SyncStreamDuration(t.Context(), 0, nil)
243281
}
244282

245283
// testHistogramBuckets is a helper function that tests histogram bucket configuration
246-
func testHistogramBuckets(t *testing.T, metricName string, expectedBounds []float64, recordMetric func(rec *MetricsRecorder, attrs []attribute.KeyValue), assertMsg string) {
284+
func testHistogramBuckets(t *testing.T, metricName string, expectedBounds []float64, recordMetric func(t *testing.T, rec *MetricsRecorder, attrs []attribute.KeyValue), assertMsg string) {
247285
t.Helper()
248286
const testSvcName = "testService"
249287
exp := metric.NewManualReader()
@@ -253,10 +291,10 @@ func testHistogramBuckets(t *testing.T, metricName string, expectedBounds []floa
253291
attrs := []attribute.KeyValue{
254292
semconv.ServiceNameKey.String(testSvcName),
255293
}
256-
recordMetric(rec, attrs)
294+
recordMetric(t, rec, attrs)
257295

258296
var data metricdata.ResourceMetrics
259-
err := exp.Collect(context.TODO(), &data)
297+
err := exp.Collect(t.Context(), &data)
260298
require.NoError(t, err)
261299

262300
require.Len(t, data.ScopeMetrics, 1)
@@ -282,8 +320,8 @@ func TestHTTPRequestDurationBuckets(t *testing.T) {
282320
testHistogramBuckets(t,
283321
httpRequestDurationMetric,
284322
prometheus.DefBuckets,
285-
func(rec *MetricsRecorder, attrs []attribute.KeyValue) {
286-
rec.HTTPRequestDuration(context.TODO(), 100*time.Millisecond, attrs)
323+
func(t *testing.T, rec *MetricsRecorder, attrs []attribute.KeyValue) {
324+
rec.HTTPRequestDuration(t.Context(), 100*time.Millisecond, attrs)
287325
},
288326
"Expected histogram buckets to match prometheus.DefBuckets",
289327
)
@@ -293,9 +331,20 @@ func TestHTTPResponseSizeBuckets(t *testing.T) {
293331
testHistogramBuckets(t,
294332
httpResponseSizeMetric,
295333
prometheus.ExponentialBuckets(100, 10, 8),
296-
func(rec *MetricsRecorder, attrs []attribute.KeyValue) {
297-
rec.HTTPResponseSize(context.TODO(), 500, attrs)
334+
func(t *testing.T, rec *MetricsRecorder, attrs []attribute.KeyValue) {
335+
rec.HTTPResponseSize(t.Context(), 500, attrs)
298336
},
299337
"Expected histogram buckets to match exponential buckets (100, 10, 8)",
300338
)
301339
}
340+
341+
func TestGRPCSyncStreamDurationBuckets(t *testing.T) {
342+
testHistogramBuckets(t,
343+
syncStreamDurationMetric,
344+
[]float64{30, 60, 120, 300, 480, 600, 1200, 1800, 3600, 10800},
345+
func(t *testing.T, rec *MetricsRecorder, attrs []attribute.KeyValue) {
346+
rec.SyncStreamDuration(t.Context(), 100*time.Millisecond, attrs)
347+
},
348+
"Expected histogram buckets for long-lived sync streams (30s, 1min, 2min, 5min, 8min, 10min, 20min, 30min, 1h, 3h)",
349+
)
350+
}

0 commit comments

Comments
 (0)