@@ -18,7 +18,6 @@ package spanner
1818
1919import (
2020 "context"
21- "errors"
2221 "fmt"
2322 "io"
2423 "log"
@@ -718,69 +717,14 @@ func metricsInterceptor() grpc.UnaryClientInterceptor {
718717
719718 statusCode , _ := status .FromError (err )
720719 mt .currOp .currAttempt .setStatus (statusCode .Code ().String ())
721-
722- isDirectPathUsed := false
723- if peerInfo .Addr != nil {
724- remoteIP := peerInfo .Addr .String ()
725- if strings .HasPrefix (remoteIP , directPathIPV4Prefix ) || strings .HasPrefix (remoteIP , directPathIPV6Prefix ) {
726- isDirectPathUsed = true
727- }
728- }
729-
730- mt .currOp .currAttempt .setDirectPathUsed (isDirectPathUsed )
720+ mt .currOp .currAttempt .setDirectPathUsed (peer .NewContext (ctx , peerInfo ))
731721 metrics := parseServerTimingHeader (md )
732722 mt .currOp .currAttempt .setServerTimingMetrics (metrics )
733723 recordAttemptCompletion (mt )
734724 return err
735725 }
736726}
737727
738- // wrappedStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
739- // SendMsg method call.
740- type wrappedStream struct {
741- method string
742- target string
743- grpc.ClientStream
744- }
745-
746- func (w * wrappedStream ) RecvMsg (m any ) error {
747- err := w .ClientStream .RecvMsg (m )
748- if errors .Is (err , io .EOF ) {
749- return err
750- }
751- ctx := w .ClientStream .Context ()
752- mt , ok := ctx .Value (metricsTracerKey ).(* builtinMetricsTracer )
753- if ! ok {
754- return err
755- }
756- mt .method = w .method
757- if strings .HasPrefix (w .target , "google-c2p" ) {
758- mt .currOp .setDirectPathEnabled (true )
759- }
760- isDirectPathUsed := false
761- peerInfo , ok := peer .FromContext (ctx )
762- if ok {
763- if peerInfo .Addr != nil {
764- remoteIP := peerInfo .Addr .String ()
765- if strings .HasPrefix (remoteIP , directPathIPV4Prefix ) || strings .HasPrefix (remoteIP , directPathIPV6Prefix ) {
766- isDirectPathUsed = true
767- }
768- }
769- }
770- if mt .currOp .currAttempt != nil {
771- mt .currOp .currAttempt .setDirectPathUsed (isDirectPathUsed )
772- }
773- return err
774- }
775-
776- func (w * wrappedStream ) SendMsg (m any ) error {
777- return w .ClientStream .SendMsg (m )
778- }
779-
780- func newWrappedStream (s grpc.ClientStream , method , target string ) grpc.ClientStream {
781- return & wrappedStream {ClientStream : s , method : method , target : target }
782- }
783-
784728// metricsInterceptor is a gRPC stream client interceptor that records metrics for stream RPCs.
785729func metricsStreamInterceptor () grpc.StreamClientInterceptor {
786730 return func (
@@ -795,7 +739,14 @@ func metricsStreamInterceptor() grpc.StreamClientInterceptor {
795739 if err != nil {
796740 return nil , err
797741 }
798- return newWrappedStream (s , method , cc .Target ()), nil
742+ mt , ok := ctx .Value (metricsTracerKey ).(* builtinMetricsTracer )
743+ if ok && mt != nil {
744+ mt .method = method
745+ if strings .HasPrefix (cc .Target (), "google-c2p" ) {
746+ mt .currOp .setDirectPathEnabled (true )
747+ }
748+ }
749+ return s , nil
799750 }
800751}
801752
0 commit comments