Skip to content

Commit ad5ff79

Browse files
committed
add workflow metrics collectors init function
1 parent 4ba2b0c commit ad5ff79

File tree

2 files changed

+90
-30
lines changed

2 files changed

+90
-30
lines changed

eventmesh-workflow-go/internal/constants/constants.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,18 @@ const (
6969
EventPropsWorkflowTaskInstanceID = "workflowtaskinstanceid"
7070
)
7171

72-
// Metrics Related constants
72+
// Metrics Collector types
7373
const (
7474
MetricsEventTask = "event_task"
7575
MetricsOperationTask = "operation_task"
7676
MetricsSwitchTask = "switch_task"
7777
MetricsScheduler = "scheduler"
7878
MetricsEngine = "engine"
7979
MetricsTaskQueue = "task_queue"
80+
)
8081

82+
// Metrics labels
83+
const (
8184
MetricsTotal = "total"
8285
MetricsSwitchReject = "reject"
8386
MetricsSwitchPass = "pass"

eventmesh-workflow-go/internal/metrics/metrics.go

Lines changed: 86 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,41 +19,84 @@ import (
1919
"fmt"
2020
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
2121
conf "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
22+
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
2223
"github.com/prometheus/client_golang/prometheus"
2324
"github.com/prometheus/client_golang/prometheus/promhttp"
2425
"net/http"
2526
"sync"
2627
)
2728

29+
var prometheusMetrics *Metrics
30+
2831
func init() {
2932
prometheusMetrics = getPrometheusMetricsByConfig()
33+
loadAllCollectors()
3034
}
3135

32-
var prometheusMetrics *Metrics
36+
// loadAllCollectors all collectors used in workflow should register in this function first
37+
func loadAllCollectors() {
38+
prometheusMetrics.registerNewCollector(constants.MetricsEventTask, histogram)
39+
prometheusMetrics.registerNewCollector(constants.MetricsEventTask, gauge)
40+
41+
prometheusMetrics.registerNewCollector(constants.MetricsOperationTask, histogram)
42+
prometheusMetrics.registerNewCollector(constants.MetricsOperationTask, gauge)
43+
44+
prometheusMetrics.registerNewCollector(constants.MetricsSwitchTask, histogram)
45+
prometheusMetrics.registerNewCollector(constants.MetricsSwitchTask, gauge)
3346

34-
func Inc(name string, label string) {
35-
collector := prometheusMetrics.loadCollector(name, gauge).(*prometheus.GaugeVec)
36-
collector.With(prometheus.Labels{"label": label}).Inc()
47+
prometheusMetrics.registerNewCollector(constants.MetricsScheduler, histogram)
48+
prometheusMetrics.registerNewCollector(constants.MetricsScheduler, gauge)
49+
50+
prometheusMetrics.registerNewCollector(constants.MetricsEngine, histogram)
51+
prometheusMetrics.registerNewCollector(constants.MetricsEngine, gauge)
52+
53+
prometheusMetrics.registerNewCollector(constants.MetricsTaskQueue, histogram)
54+
prometheusMetrics.registerNewCollector(constants.MetricsTaskQueue, gauge)
3755
}
3856

39-
func Add(name string, label string, val float64) {
40-
collector := prometheusMetrics.loadCollector(name, gauge).(*prometheus.GaugeVec)
41-
collector.With(prometheus.Labels{"label": label}).Add(val)
57+
func Inc(name string, label string) error {
58+
collector, err := prometheusMetrics.loadCollector(name, gauge)
59+
if err != nil {
60+
return err
61+
}
62+
collector.(*prometheus.GaugeVec).With(prometheus.Labels{"label": label}).Inc()
63+
return nil
4264
}
4365

44-
func Sub(name string, label string, val float64) {
45-
collector := prometheusMetrics.loadCollector(name, gauge).(*prometheus.GaugeVec)
46-
collector.With(prometheus.Labels{"label": label}).Sub(val)
66+
func Add(name string, label string, val float64) error {
67+
collector, err := prometheusMetrics.loadCollector(name, gauge)
68+
if err != nil {
69+
return err
70+
}
71+
collector.(*prometheus.GaugeVec).With(prometheus.Labels{"label": label}).Add(val)
72+
return nil
4773
}
4874

49-
func Dec(name string, label string) {
50-
collector := prometheusMetrics.loadCollector(name, gauge).(*prometheus.GaugeVec)
51-
collector.With(prometheus.Labels{"label": label}).Dec()
75+
func Sub(name string, label string, val float64) error {
76+
collector, err := prometheusMetrics.loadCollector(name, gauge)
77+
if err != nil {
78+
return err
79+
}
80+
collector.(*prometheus.GaugeVec).With(prometheus.Labels{"label": label}).Sub(val)
81+
return nil
5282
}
5383

54-
func RecordLatency(name string, label string, latency float64) {
55-
collector := prometheusMetrics.loadCollector(name, histogram).(*prometheus.HistogramVec)
56-
collector.With(prometheus.Labels{"label": label}).Observe(latency)
84+
func Dec(name string, label string) error {
85+
collector, err := prometheusMetrics.loadCollector(name, gauge)
86+
if err != nil {
87+
return err
88+
}
89+
collector.(*prometheus.GaugeVec).With(prometheus.Labels{"label": label}).Dec()
90+
return nil
91+
}
92+
93+
func RecordLatency(name string, label string, latency float64) error {
94+
collector, err := prometheusMetrics.loadCollector(name, histogram)
95+
if err != nil {
96+
return err
97+
}
98+
collector.(*prometheus.HistogramVec).With(prometheus.Labels{"label": label}).Observe(latency)
99+
return nil
57100
}
58101

59102
func getPrometheusMetricsByConfig() *Metrics {
@@ -121,34 +164,48 @@ func (p *Metrics) exposeEndpoint() {
121164
}
122165

123166
// loadCollector load collector by name and collectorType
124-
func (p *Metrics) loadCollector(name string, collectorType int) prometheus.Collector {
125-
if collector := p.getCollectorByNameAndType(name, collectorType); collector != nil {
126-
return collector
167+
func (p *Metrics) loadCollector(name string, collectorType int) (prometheus.Collector, error) {
168+
collector, err := p.getCollectorByNameAndType(name, collectorType)
169+
if err != nil {
170+
return nil, err
127171
}
128-
return p.registerNewCollector(name, collectorType)
172+
173+
if collector != nil {
174+
return collector, nil
175+
}
176+
return nil, fmt.Errorf("fail to load collector, name: %s", name)
129177
}
130178

131-
func (p *Metrics) getCollectorByNameAndType(name string, collectorType int) prometheus.Collector {
179+
func (p *Metrics) getCollectorByNameAndType(name string, collectorType int) (prometheus.Collector, error) {
132180
switch collectorType {
133181
case histogram:
134-
return p.histograms[name]
182+
return p.histograms[name], nil
135183
case gauge:
136-
return p.histograms[name]
184+
return p.histograms[name], nil
137185
default:
138-
panic("prometheus metrics get collector error, illegal collector type")
186+
return nil, fmt.Errorf("prometheus metrics get collector error, illegal collector type: %d", collectorType)
139187
}
140188
}
141189

142190
// registerNewCollector create and register new collector of collectorType
143-
func (p *Metrics) registerNewCollector(name string, collectorType int) prometheus.Collector {
191+
func (p *Metrics) registerNewCollector(name string, collectorType int) (prometheus.Collector, error) {
144192
p.lock.Lock()
145193
defer p.lock.Unlock()
146194

147-
if collector := p.getCollectorByNameAndType(name, collectorType); collector != nil {
148-
return collector
195+
var (
196+
collector prometheus.Collector
197+
err error
198+
)
199+
200+
collector, err = p.getCollectorByNameAndType(name, collectorType)
201+
if err != nil {
202+
return nil, err
203+
}
204+
205+
if collector != nil {
206+
return collector, nil
149207
}
150208

151-
var collector prometheus.Collector
152209
switch collectorType {
153210
case histogram:
154211
collector = prometheus.NewHistogramVec(
@@ -173,5 +230,5 @@ func (p *Metrics) registerNewCollector(name string, collectorType int) prometheu
173230
panic("prometheus metrics plugin register collector error, illegal collector type")
174231
}
175232
prometheus.MustRegister(collector)
176-
return collector
233+
return collector, nil
177234
}

0 commit comments

Comments
 (0)