@@ -37,11 +37,14 @@ import (
37
37
"net"
38
38
"runtime"
39
39
"strings"
40
+ "sync"
40
41
"testing"
41
42
42
43
"github.com/containerd/otelttrpc/internal"
43
44
"github.com/containerd/ttrpc"
44
45
"github.com/stretchr/testify/assert"
46
+ "go.opentelemetry.io/otel"
47
+ "go.opentelemetry.io/otel/propagation"
45
48
sdktrace "go.opentelemetry.io/otel/sdk/trace"
46
49
"go.opentelemetry.io/otel/sdk/trace/tracetest"
47
50
"go.opentelemetry.io/otel/trace"
@@ -88,9 +91,71 @@ func (s *testingServer) Test(ctx context.Context, req *internal.TestPayload) (*i
88
91
return tp , nil
89
92
}
90
93
94
+ func TestClientCallServerConcurrent (t * testing.T ) {
95
+ var (
96
+ ctx = ttrpc .WithMetadata (context .Background (), ttrpc.MD {"test-key" : []string {"test-val" }})
97
+ exp , tp = newTracerProvider ()
98
+ server = mustServer (t )(newServerWithTTRPCInterceptor (tp ))
99
+ testImpl = & testingServer {}
100
+ addr , listener = newTestListener (t )
101
+ payload = & internal.TestPayload {
102
+ Foo : "bar" ,
103
+ }
104
+ )
105
+
106
+ concurrency := 30
107
+ testClients := make ([]* testingClient , 0 , concurrency )
108
+ for i := 0 ; i < concurrency ; i ++ {
109
+ client , cleanup := newTestClient (t , addr , tp )
110
+ testClients = append (testClients , newTestingClient (client ))
111
+ defer cleanup ()
112
+ }
113
+ defer listener .Close ()
114
+ defer func () { _ = tp .Shutdown (ctx ) }()
115
+
116
+ registerTestingService (server , testImpl )
117
+
118
+ go func () {
119
+ _ = server .Serve (ctx , listener )
120
+ }()
121
+ defer func () {
122
+ _ = server .Shutdown (ctx )
123
+ }()
124
+
125
+ var wg sync.WaitGroup
126
+ var errs []error
127
+ var mu sync.Mutex
128
+
129
+ for _ , testClient := range testClients {
130
+ // capture range variable
131
+ // TODO: we can remove this once we upgrade golang to >= 1.22
132
+ testClient := testClient
133
+ wg .Add (1 )
134
+ go func () {
135
+ defer wg .Done ()
136
+ if _ , err := testClient .Test (ctx , payload ); err != nil {
137
+ mu .Lock ()
138
+ defer mu .Unlock ()
139
+ errs = append (errs , err )
140
+ }
141
+ }()
142
+ }
143
+
144
+ wg .Wait ()
145
+ if len (errs ) > 0 {
146
+ t .Fatalf ("unexpected errors: %v" , errs )
147
+ }
148
+
149
+ // get exported spans
150
+ snapshots := exp .GetSpans ().Snapshots ()
151
+ // we should capture `concurrency * 2` spans, one each from client and server side
152
+ // TODO: validate individual spans and their attributes
153
+ assert .Equal (t , concurrency * 2 , len (snapshots ), "Number of spans mismatched" )
154
+ }
155
+
91
156
func TestClientCallServer (t * testing.T ) {
92
157
var (
93
- ctx = context .Background ()
158
+ ctx = ttrpc . WithMetadata ( context .Background (), ttrpc. MD { "test-key" : [] string { "test-val" }} )
94
159
exp , tp = newTracerProvider ()
95
160
server = mustServer (t )(newServerWithTTRPCInterceptor (tp ))
96
161
testImpl = & testingServer {}
@@ -153,6 +218,8 @@ func newTracerProvider() (*tracetest.InMemoryExporter, *sdktrace.TracerProvider)
153
218
tp := sdktrace .NewTracerProvider (
154
219
sdktrace .WithSyncer (exp ),
155
220
)
221
+
222
+ otel .SetTextMapPropagator (propagation .NewCompositeTextMapPropagator (propagation.TraceContext {}, propagation.Baggage {}))
156
223
return exp , tp
157
224
}
158
225
0 commit comments