chore(queue): upgrade queue package and add metrics (#673)
* chore(queue): upgrade queue package and add metrics add new metrics BusyWorkers *prometheus.Desc SuccessTasks *prometheus.Desc FailureTasks *prometheus.Desc SubmittedTasks *prometheus.Desc fix https://github.com/appleboy/gorush/issues/672 Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com> * chore: update go version Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
@@ -3,6 +3,7 @@ package metric
|
||||
import (
|
||||
"github.com/appleboy/gorush/status"
|
||||
|
||||
"github.com/golang-queue/queue"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
@@ -18,14 +19,15 @@ type Metrics struct {
|
||||
AndroidError *prometheus.Desc
|
||||
HuaweiSuccess *prometheus.Desc
|
||||
HuaweiError *prometheus.Desc
|
||||
QueueUsage *prometheus.Desc
|
||||
GetQueueUsage func() int
|
||||
BusyWorkers *prometheus.Desc
|
||||
SuccessTasks *prometheus.Desc
|
||||
FailureTasks *prometheus.Desc
|
||||
SubmittedTasks *prometheus.Desc
|
||||
q *queue.Queue
|
||||
}
|
||||
|
||||
var getGetQueueUsage = func() int { return 0 }
|
||||
|
||||
// NewMetrics returns a new Metrics with all prometheus.Desc initialized
|
||||
func NewMetrics(c ...func() int) Metrics {
|
||||
func NewMetrics(q *queue.Queue) Metrics {
|
||||
m := Metrics{
|
||||
TotalPushCount: prometheus.NewDesc(
|
||||
namespace+"total_push_count",
|
||||
@@ -62,16 +64,27 @@ func NewMetrics(c ...func() int) Metrics {
|
||||
"Number of huawei fail count",
|
||||
nil, nil,
|
||||
),
|
||||
QueueUsage: prometheus.NewDesc(
|
||||
namespace+"queue_usage",
|
||||
"Length of internal queue",
|
||||
BusyWorkers: prometheus.NewDesc(
|
||||
namespace+"busy_workers",
|
||||
"Length of busy workers",
|
||||
nil, nil,
|
||||
),
|
||||
GetQueueUsage: getGetQueueUsage,
|
||||
}
|
||||
|
||||
if len(c) > 0 {
|
||||
m.GetQueueUsage = c[0]
|
||||
FailureTasks: prometheus.NewDesc(
|
||||
namespace+"failure_tasks",
|
||||
"Length of Failure Tasks",
|
||||
nil, nil,
|
||||
),
|
||||
SuccessTasks: prometheus.NewDesc(
|
||||
namespace+"success_tasks",
|
||||
"Length of Success Tasks",
|
||||
nil, nil,
|
||||
),
|
||||
SubmittedTasks: prometheus.NewDesc(
|
||||
namespace+"submitted_tasks",
|
||||
"Length of Submitted Tasks",
|
||||
nil, nil,
|
||||
),
|
||||
q: q,
|
||||
}
|
||||
|
||||
return m
|
||||
@@ -86,7 +99,10 @@ func (c Metrics) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- c.AndroidError
|
||||
ch <- c.HuaweiSuccess
|
||||
ch <- c.HuaweiError
|
||||
ch <- c.QueueUsage
|
||||
ch <- c.BusyWorkers
|
||||
ch <- c.SuccessTasks
|
||||
ch <- c.FailureTasks
|
||||
ch <- c.SubmittedTasks
|
||||
}
|
||||
|
||||
// Collect returns the metrics with values
|
||||
@@ -127,8 +143,23 @@ func (c Metrics) Collect(ch chan<- prometheus.Metric) {
|
||||
float64(status.StatStorage.GetHuaweiError()),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.QueueUsage,
|
||||
c.BusyWorkers,
|
||||
prometheus.GaugeValue,
|
||||
float64(c.GetQueueUsage()),
|
||||
float64(c.q.BusyWorkers()),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.SuccessTasks,
|
||||
prometheus.CounterValue,
|
||||
float64(c.q.SuccessTasks()),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.FailureTasks,
|
||||
prometheus.CounterValue,
|
||||
float64(c.q.FailureTasks()),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.SubmittedTasks,
|
||||
prometheus.CounterValue,
|
||||
float64(c.q.SubmittedTasks()),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,15 +1,23 @@
|
||||
package metric
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang-queue/queue"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNewMetrics(t *testing.T) {
|
||||
m := NewMetrics()
|
||||
assert.Equal(t, 0, m.GetQueueUsage())
|
||||
var noTask = func(ctx context.Context) error { return nil }
|
||||
|
||||
m = NewMetrics(func() int { return 1 })
|
||||
assert.Equal(t, 1, m.GetQueueUsage())
|
||||
func TestNewMetrics(t *testing.T) {
|
||||
q := queue.NewPool(10)
|
||||
assert.NoError(t, q.QueueTask(noTask))
|
||||
assert.NoError(t, q.QueueTask(noTask))
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
defer q.Release()
|
||||
m := NewMetrics(q)
|
||||
assert.Equal(t, 2, m.q.SubmittedTasks())
|
||||
assert.Equal(t, 2, m.q.SuccessTasks())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user