@@ -124,7 +124,20 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
124124 stopCh : make (chan struct {}),
125125 }
126126
127- bsp .configureSelfObservability ()
127+ if x .SelfObservability .Enabled () {
128+ bsp .selfObservabilityEnabled = true
129+ bsp .componentNameAttr = componentName ()
130+
131+ var err error
132+ bsp .spansProcessedCounter , bsp .callbackRegistration , err = newBSPObs (
133+ bsp .componentNameAttr ,
134+ func () int64 { return int64 (len (bsp .queue )) },
135+ int64 (bsp .o .MaxQueueSize ),
136+ )
137+ if err != nil {
138+ otel .Handle (err )
139+ }
140+ }
128141
129142 bsp .stopWait .Add (1 )
130143 go func () {
@@ -144,45 +157,49 @@ func nextProcessorID() int64 {
144157 return processorIDCounter .Add (1 ) - 1
145158}
146159
147- // configureSelfObservability configures metrics for the batch span processor.
148- func (bsp * batchSpanProcessor ) configureSelfObservability () {
149- if ! x .SelfObservability .Enabled () {
150- return
151- }
152- bsp .selfObservabilityEnabled = true
153- bsp .componentNameAttr = semconv .OTelComponentName (
154- fmt .Sprintf ("%s/%d" , otelconv .ComponentTypeBatchingSpanProcessor , nextProcessorID ()))
160+ func componentName () attribute.KeyValue {
161+ id := nextProcessorID ()
162+ name := fmt .Sprintf ("%s/%d" , otelconv .ComponentTypeBatchingSpanProcessor , id )
163+ return semconv .OTelComponentName (name )
164+ }
165+
166+ // newBSPObs creates and returns a new set of metrics instruments and a
167+ // registration for a BatchSpanProcessor. It is the caller's responsibility
168+ // to unregister the registration when it is no longer needed.
169+ func newBSPObs (
170+ cmpnt attribute.KeyValue ,
171+ qLen func () int64 ,
172+ qMax int64 ,
173+ ) (otelconv.SDKProcessorSpanProcessed , metric.Registration , error ) {
155174 meter := otel .GetMeterProvider ().Meter (
156175 selfObsScopeName ,
157176 metric .WithInstrumentationVersion (sdk .Version ()),
158177 metric .WithSchemaURL (semconv .SchemaURL ),
159178 )
160179
161- queueCapacityUpDownCounter , err := otelconv .NewSDKProcessorSpanQueueCapacity (meter )
162- if err != nil {
163- otel .Handle (err )
164- }
165- queueSizeUpDownCounter , err := otelconv .NewSDKProcessorSpanQueueSize (meter )
166- if err != nil {
167- otel .Handle (err )
168- }
169- bsp .spansProcessedCounter , err = otelconv .NewSDKProcessorSpanProcessed (meter )
170- if err != nil {
171- otel .Handle (err )
172- }
180+ qCap , err := otelconv .NewSDKProcessorSpanQueueCapacity (meter )
181+
182+ qSize , e := otelconv .NewSDKProcessorSpanQueueSize (meter )
183+ err = errors .Join (err , e )
184+
185+ spansProcessed , e := otelconv .NewSDKProcessorSpanProcessed (meter )
186+ err = errors .Join (err , e )
173187
174- callbackAttributesOpt := metric .WithAttributes (bsp .componentNameAttr ,
175- semconv .OTelComponentTypeBatchingSpanProcessor )
176- bsp .callbackRegistration , err = meter .RegisterCallback (
188+ cmpntT := semconv .OTelComponentTypeBatchingSpanProcessor
189+ attrs := metric .WithAttributes (cmpnt , cmpntT )
190+
191+ reg , e := meter .RegisterCallback (
177192 func (_ context.Context , o metric.Observer ) error {
178- o .ObserveInt64 (queueSizeUpDownCounter .Inst (), int64 ( len ( bsp . queue )), callbackAttributesOpt )
179- o .ObserveInt64 (queueCapacityUpDownCounter .Inst (), int64 ( bsp . o . MaxQueueSize ), callbackAttributesOpt )
193+ o .ObserveInt64 (qSize .Inst (), qLen (), attrs )
194+ o .ObserveInt64 (qCap .Inst (), qMax , attrs )
180195 return nil
181196 },
182- queueSizeUpDownCounter .Inst (), queueCapacityUpDownCounter .Inst ())
183- if err != nil {
184- otel .Handle (err )
185- }
197+ qSize .Inst (),
198+ qCap .Inst (),
199+ )
200+ err = errors .Join (err , e )
201+
202+ return spansProcessed , reg , err
186203}
187204
188205// OnStart method does nothing.
0 commit comments