chore: support custom logger in queue package (#607)

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-07-24 00:27:11 +08:00 committed by GitHub
parent 2d01f8b854
commit 66923789ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 7 deletions

43
queue/logger.go Normal file
View File

@ -0,0 +1,43 @@
package queue
import (
"fmt"
"github.com/rs/zerolog/log"
)
// Logger interface is used throughout gorush
type Logger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Info(args ...interface{})
Error(args ...interface{})
Fatal(args ...interface{})
}
type defaultLogger struct{}
func (l defaultLogger) Infof(format string, args ...interface{}) {
log.Info().Msgf(format, args...)
}
func (l defaultLogger) Errorf(format string, args ...interface{}) {
log.Error().Msgf(format, args...)
}
func (l defaultLogger) Fatalf(format string, args ...interface{}) {
log.Fatal().Msgf(format, args...)
}
func (l defaultLogger) Info(args ...interface{}) {
log.Info().Msg(fmt.Sprint(args...))
}
func (l defaultLogger) Error(args ...interface{}) {
log.Error().Msg(fmt.Sprint(args...))
}
func (l defaultLogger) Fatal(args ...interface{}) {
log.Fatal().Msg(fmt.Sprint(args...))
}

View File

@ -2,13 +2,12 @@ package queue
import ( import (
"runtime" "runtime"
"github.com/appleboy/gorush/logx"
) )
type ( type (
// A Queue is a message queue. // A Queue is a message queue.
Queue struct { Queue struct {
logger Logger
workerCount int workerCount int
routineGroup *routineGroup routineGroup *routineGroup
quit chan struct{} quit chan struct{}
@ -23,6 +22,7 @@ func NewQueue(w Worker, workerNum int) *Queue {
routineGroup: newRoutineGroup(), routineGroup: newRoutineGroup(),
quit: make(chan struct{}), quit: make(chan struct{}),
worker: w, worker: w,
logger: new(defaultLogger),
} }
if workerNum > 0 { if workerNum > 0 {
@ -65,24 +65,24 @@ func (q *Queue) Queue(job QueuedMessage) error {
func (q *Queue) work(num int) { func (q *Queue) work(num int) {
if err := q.worker.BeforeRun(); err != nil { if err := q.worker.BeforeRun(); err != nil {
logx.LogError.Fatal(err) q.logger.Fatal(err)
} }
q.routineGroup.Run(func() { q.routineGroup.Run(func() {
// to handle panic cases from inside the worker // to handle panic cases from inside the worker
// in such case, we start a new goroutine // in such case, we start a new goroutine
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.LogError.Error(err) q.logger.Error(err)
go q.work(num) go q.work(num)
} }
}() }()
logx.LogAccess.Info("started the worker num ", num) q.logger.Info("started the worker num ", num)
q.worker.Run(q.quit) q.worker.Run(q.quit)
logx.LogAccess.Info("closed the worker num ", num) q.logger.Info("closed the worker num ", num)
}) })
if err := q.worker.AfterRun(); err != nil { if err := q.worker.AfterRun(); err != nil {
logx.LogError.Fatal(err) q.logger.Fatal(err)
} }
} }