Skip to content

Commit 211a125

Browse files
authored
[#9177] add metrics server to go function (#9318)
Fixes #9177 ### Motivation go function added metrics collector by #6105, but havnt pass `metricsPort` to go function, also not init & start prometheus http server. As the result, function worker will keep trying to access to the metrics port to collect data, which will cause massive log errors in log history. ### Modifications - expose `metricsPort` to go function - add prometheus http server to go function ### Verifying this change - [x] Make sure that the change passes the CI checks.
1 parent 88f8fa4 commit 211a125

File tree

14 files changed

+97
-5
lines changed

14 files changed

+97
-5
lines changed

pulsar-function-go/conf/conf.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ type Conf struct {
7373
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
7474
ExpectedHealthCheckInterval int32 `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"`
7575
UserConfig string `json:"userConfig" yaml:"userConfig"`
76+
//metrics config
77+
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
7678
}
7779

7880
var (

pulsar-function-go/conf/conf.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,5 @@ disk: 0
5757
maxMessageRetries: 0
5858
deadLetterTopic: ""
5959
expectedHealthCheckInterval: 3
60+
# metrics config
61+
metricsPort: 50001

pulsar-function-go/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ require (
1111
github.com/sirupsen/logrus v1.4.2
1212
github.com/stretchr/testify v1.4.0
1313
google.golang.org/grpc v1.27.0
14-
google.golang.org/protobuf v1.25.0 // indirect
14+
google.golang.org/protobuf v1.25.0
1515
gopkg.in/yaml.v2 v2.3.0
1616
)
1717

pulsar-function-go/go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ github.com/apache/pulsar-client-go v0.3.1-0.20201201083639-154bff0bb825 h1:Rfvcn
1616
github.com/apache/pulsar-client-go v0.3.1-0.20201201083639-154bff0bb825/go.mod h1:pTmScVVHRhbB8wh0J+m5ZzHI0Lyfe0TwfPEbYEh+JUw=
1717
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb h1:E1P0FudxDdj2RhbveZC9i3PwukLCA/4XQSkBS/dw6/I=
1818
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
19+
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd h1:P5kM7jcXJ7TaftX0/EMKiSJgvQc/ct+Fw0KMvcH3WuY=
1920
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
2021
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
2122
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
@@ -33,6 +34,7 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
3334
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
3435
github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
3536
github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
37+
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:QWqadCIHYA5zja4b6h9uGQn93u1vL+G/aewImumdg/M=
3638
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:inRp+etsHuvVqMPNTXaFlpf/Tj7wqviBtdJoPVrPEFQ=
3739
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3840
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -76,6 +78,7 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
7678
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
7779
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
7880
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
81+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
7982
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
8083
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
8184
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -113,6 +116,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
113116
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
114117
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
115118
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
119+
github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg=
116120
github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
117121
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
118122
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
@@ -253,11 +257,13 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
253257
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
254258
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
255259
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
260+
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
256261
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
257262
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
258263
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
259264
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
260265
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
266+
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
261267
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
262268
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
263269
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

pulsar-function-go/pf/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,11 @@ func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
167167
// GetCurrentRecord gets the current message from the function context
168168
func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
169169
return c.record
170+
}
170171

172+
//GetMetricsPort returns the port the pulsar function metrics listen on
173+
func (c *FunctionContext) GetMetricsPort() int {
174+
return c.instanceConf.metricsPort
171175
}
172176

173177
// An unexported type to be used as the key for types in this package. This

pulsar-function-go/pf/instance.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ func (gi *goInstance) startFunction(function function) error {
144144
servicer := InstanceControlServicer{goInstance: gi}
145145
servicer.serve(gi)
146146

147+
metricsServicer := NewMetricsServicer(gi)
148+
metricsServicer.serve()
149+
defer metricsServicer.close()
147150
CLOSE:
148151
for {
149152
idleTimer.Reset(idleDuration)

pulsar-function-go/pf/instanceConf.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type instanceConf struct {
4040
pulsarServiceURL string
4141
killAfterIdle time.Duration
4242
expectedHealthCheckInterval int32
43+
metricsPort int
4344
}
4445

4546
func newInstanceConf() *instanceConf {
@@ -59,6 +60,7 @@ func newInstanceConf() *instanceConf {
5960
pulsarServiceURL: cfg.PulsarServiceURL,
6061
killAfterIdle: cfg.KillAfterIdleMs,
6162
expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval,
63+
metricsPort: cfg.MetricsPort,
6264
funcDetails: pb.FunctionDetails{
6365
Tenant: cfg.Tenant,
6466
Namespace: cfg.NameSpace,

pulsar-function-go/pf/instanceConf_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func Test_newInstanceConf(t *testing.T) {
3939
pulsarServiceURL: "pulsar://localhost:6650",
4040
killAfterIdle: 50000,
4141
expectedHealthCheckInterval: 3,
42+
metricsPort: 50001,
4243
funcDetails: pb.FunctionDetails{Tenant: "",
4344
Namespace: "",
4445
Name: "go-function",

pulsar-function-go/pf/stats.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@
2020
package pf
2121

2222
import (
23+
"fmt"
24+
"net/http"
2325
"time"
2426

25-
"github.com/prometheus/client_golang/prometheus"
27+
"github.com/prometheus/client_golang/prometheus/promhttp"
2628

29+
log "github.com/apache/pulsar/pulsar-function-go/logutil"
30+
"github.com/prometheus/client_golang/prometheus"
2731
prometheus_client "github.com/prometheus/client_model/go"
2832
)
2933

@@ -120,6 +124,11 @@ var (
120124
Help: "Exception from system code."}, exceptionMetricsLabelNames)
121125
)
122126

127+
type MetricsServicer struct {
128+
goInstance *goInstance
129+
server *http.Server
130+
}
131+
123132
var reg *prometheus.Registry
124133

125134
func init() {
@@ -304,3 +313,39 @@ func (stat *StatWithLabelValues) reset() {
304313
stat.statTotalSysExceptions1min.Set(0.0)
305314
stat.statTotalReceived1min.Set(0.0)
306315
}
316+
317+
func NewMetricsServicer(goInstance *goInstance) *MetricsServicer {
318+
serveMux := http.NewServeMux()
319+
serveMux.Handle("/metrics", promhttp.HandlerFor(
320+
reg,
321+
promhttp.HandlerOpts{
322+
EnableOpenMetrics: true,
323+
},
324+
))
325+
server := &http.Server{
326+
Addr: fmt.Sprintf(":%d", goInstance.context.GetMetricsPort()),
327+
Handler: serveMux,
328+
}
329+
return &MetricsServicer{
330+
goInstance,
331+
server,
332+
}
333+
}
334+
335+
func (s *MetricsServicer) serve() {
336+
go func() {
337+
// create a listener on metrics port
338+
log.Infof("Starting metrics server on port %d", s.goInstance.context.GetMetricsPort())
339+
err := s.server.ListenAndServe()
340+
if err != nil {
341+
log.Fatalf("failed to start metrics server: %v", err)
342+
}
343+
}()
344+
}
345+
346+
func (s *MetricsServicer) close() {
347+
err := s.server.Close()
348+
if err != nil {
349+
log.Fatalf("failed to close metrics server: %v", err)
350+
}
351+
}

pulsar-function-go/pf/stats_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
package pf
2121

2222
import (
23+
"fmt"
24+
"io/ioutil"
2325
"math"
26+
"net/http"
2427
"testing"
2528

2629
"github.com/golang/protobuf/proto"
@@ -184,3 +187,18 @@ func TestExampleSummaryVec_Pulsar(t *testing.T) {
184187
assert.Equal(t, 61925, int(*sum))
185188
assert.Equal(t, 2000, int(*count))
186189
}
190+
191+
func TestMetricsServer(t *testing.T) {
192+
gi := newGoInstance()
193+
metricsServicer := NewMetricsServicer(gi)
194+
metricsServicer.serve()
195+
gi.stats.incrTotalReceived()
196+
197+
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
198+
assert.Equal(t, nil, err)
199+
assert.Equal(t, 200, resp.StatusCode)
200+
body, err := ioutil.ReadAll(resp.Body)
201+
assert.Equal(t, nil, err)
202+
assert.NotEmpty(t, body)
203+
resp.Body.Close()
204+
}

0 commit comments

Comments
 (0)