Skip to content

Commit f50a922

Browse files
committed
UT for concurrent inject/extract metadata
Signed-off-by: Jin Dong <[email protected]>
1 parent ea5083f commit f50a922

File tree

1 file changed

+68
-1
lines changed

1 file changed

+68
-1
lines changed

interceptor_test.go

+68-1
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,14 @@ import (
3737
"net"
3838
"runtime"
3939
"strings"
40+
"sync"
4041
"testing"
4142

4243
"github.com/containerd/otelttrpc/internal"
4344
"github.com/containerd/ttrpc"
4445
"github.com/stretchr/testify/assert"
46+
"go.opentelemetry.io/otel"
47+
"go.opentelemetry.io/otel/propagation"
4548
sdktrace "go.opentelemetry.io/otel/sdk/trace"
4649
"go.opentelemetry.io/otel/sdk/trace/tracetest"
4750
"go.opentelemetry.io/otel/trace"
@@ -88,9 +91,71 @@ func (s *testingServer) Test(ctx context.Context, req *internal.TestPayload) (*i
8891
return tp, nil
8992
}
9093

94+
func TestClientCallServerConcurrent(t *testing.T) {
95+
var (
96+
ctx = ttrpc.WithMetadata(context.Background(), ttrpc.MD{"test-key": []string{"test-val"}})
97+
exp, tp = newTracerProvider()
98+
server = mustServer(t)(newServerWithTTRPCInterceptor(tp))
99+
testImpl = &testingServer{}
100+
addr, listener = newTestListener(t)
101+
payload = &internal.TestPayload{
102+
Foo: "bar",
103+
}
104+
)
105+
106+
concurrency := 30
107+
testClients := make([]*testingClient, 0, concurrency)
108+
for i := 0; i < concurrency; i++ {
109+
client, cleanup := newTestClient(t, addr, tp)
110+
testClients = append(testClients, newTestingClient(client))
111+
defer cleanup()
112+
}
113+
defer listener.Close()
114+
defer func() { _ = tp.Shutdown(ctx) }()
115+
116+
registerTestingService(server, testImpl)
117+
118+
go func() {
119+
_ = server.Serve(ctx, listener)
120+
}()
121+
defer func() {
122+
_ = server.Shutdown(ctx)
123+
}()
124+
125+
var wg sync.WaitGroup
126+
var errs []error
127+
var mu sync.Mutex
128+
129+
for _, testClient := range testClients {
130+
// capture range variable
131+
// TODO: we can remove this once we upgrade golang to >= 1.22
132+
testClient := testClient
133+
wg.Add(1)
134+
go func() {
135+
defer wg.Done()
136+
if _, err := testClient.Test(ctx, payload); err != nil {
137+
mu.Lock()
138+
defer mu.Unlock()
139+
errs = append(errs, err)
140+
}
141+
}()
142+
}
143+
144+
wg.Wait()
145+
if len(errs) > 0 {
146+
t.Fatalf("unexpected errors: %v", errs)
147+
}
148+
149+
// get exported spans
150+
snapshots := exp.GetSpans().Snapshots()
151+
// we should capture `concurrency * 2` spans, one each from client and server side
152+
// TODO: validate individual spans and their attributes
153+
assert.Equal(t, concurrency*2, len(snapshots), "Number of spans mismatched")
154+
}
155+
91156
func TestClientCallServer(t *testing.T) {
92157
var (
93-
ctx = context.Background()
158+
ctx = ttrpc.WithMetadata(context.Background(), ttrpc.MD{"test-key": []string{"test-val"}})
94159
exp, tp = newTracerProvider()
95160
server = mustServer(t)(newServerWithTTRPCInterceptor(tp))
96161
testImpl = &testingServer{}
@@ -153,6 +218,8 @@ func newTracerProvider() (*tracetest.InMemoryExporter, *sdktrace.TracerProvider)
153218
tp := sdktrace.NewTracerProvider(
154219
sdktrace.WithSyncer(exp),
155220
)
221+
222+
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
156223
return exp, tp
157224
}
158225

0 commit comments

Comments
 (0)