chore(queue): upgrade queue to v0.0.7 version (#629)

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-08-22 22:04:50 +08:00 committed by GitHub
parent e929974d10
commit 18016db289
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 39 deletions

6
go.mod
View File

@ -13,9 +13,9 @@ require (
github.com/gin-contrib/logger v0.2.0 github.com/gin-contrib/logger v0.2.0
github.com/gin-gonic/gin v1.7.4 github.com/gin-gonic/gin v1.7.4
github.com/go-redis/redis/v7 v7.4.0 github.com/go-redis/redis/v7 v7.4.0
github.com/golang-queue/nats v0.0.1 github.com/golang-queue/nats v0.0.2
github.com/golang-queue/nsq v0.0.1 github.com/golang-queue/nsq v0.0.2
github.com/golang-queue/queue v0.0.6 github.com/golang-queue/queue v0.0.7
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect
github.com/golang/protobuf v1.5.2 github.com/golang/protobuf v1.5.2
github.com/google/flatbuffers v2.0.0+incompatible // indirect github.com/google/flatbuffers v2.0.0+incompatible // indirect

20
go.sum
View File

@ -155,12 +155,12 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-queue/nats v0.0.1 h1:8P5YOHP6BdqJfIa47VfQ5+1sInIsWxnema/UcdpM9NE= github.com/golang-queue/nats v0.0.2 h1:81Ege/02a9d2a1SDN2/t1S1XIr/JpY+s8xj/pLZIHXM=
github.com/golang-queue/nats v0.0.1/go.mod h1:B+5LDTDILJiH9eplW7ETKKMlMA4vRDAvOkQzFjmVQ/o= github.com/golang-queue/nats v0.0.2/go.mod h1:dzXOwbx20CJ5oX4UiBIDyfedCiR5sBWWKJVXmiRlDYc=
github.com/golang-queue/nsq v0.0.1 h1:7IgLshRYC5AsX3QnMyWDKfrR/WSTanAPqQX3XxV0O7I= github.com/golang-queue/nsq v0.0.2 h1:kP4fMLl1K6TNlJGq3tJ4t07e703mDGiMYLLltT8F4/Q=
github.com/golang-queue/nsq v0.0.1/go.mod h1:hvxpT5CcllnL2iesg/m7Ih1fBjO/METlbXmwS8/tHpk= github.com/golang-queue/nsq v0.0.2/go.mod h1:1Q/8y4BclWLj03sn0dApJIObatC3qMX5gLjlbo0TwXs=
github.com/golang-queue/queue v0.0.6 h1:TLd0lSM7uNgXXj7SXSMfyaZUwRgbOu9tq6UfZXXELnA= github.com/golang-queue/queue v0.0.7 h1:WENCPyPBcIWYgBFSAZ8USGtwmxeCeMkhjwuyM0MAi84=
github.com/golang-queue/queue v0.0.6/go.mod h1:IeIGBO1largDrFEaxDgIckoAFIUTn0eolTQris8bm08= github.com/golang-queue/queue v0.0.7/go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 h1:2voWjNECnrZRbfwXxHB1/j8wa6xdKn85B5NzgVL/pTU= github.com/golang/glog v0.0.0-20210429001901-424d2337a529 h1:2voWjNECnrZRbfwXxHB1/j8wa6xdKn85B5NzgVL/pTU=
github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@ -326,11 +326,11 @@ github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5Vgl
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
github.com/nats-io/nats-server/v2 v2.3.2 h1:SGJLWrjBHsl0DsdY8PeTR3YKEfiUEYVVq2STw9d8MSY= github.com/nats-io/nats-server/v2 v2.3.4 h1:WcNa6HDFX8gjZPHb8CJ9wxRHEjJSlhWUb/MKb6/mlUY=
github.com/nats-io/nats-server/v2 v2.3.2/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY= github.com/nats-io/nats-server/v2 v2.3.4/go.mod h1:3mtbaN5GkCo/Z5T3nNj0I0/W1fPkKzLiDC6jjWJKp98=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls= github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls=
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=

23
main.go
View File

@ -25,7 +25,6 @@ import (
"github.com/golang-queue/nats" "github.com/golang-queue/nats"
"github.com/golang-queue/nsq" "github.com/golang-queue/nsq"
"github.com/golang-queue/queue" "github.com/golang-queue/queue"
"github.com/golang-queue/queue/simple"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -325,10 +324,10 @@ func main() {
var w queue.Worker var w queue.Worker
switch core.Queue(cfg.Queue.Engine) { switch core.Queue(cfg.Queue.Engine) {
case core.LocalQueue: case core.LocalQueue:
w = simple.NewWorker( w = queue.NewConsumer(
simple.WithQueueNum(int(cfg.Core.QueueNum)), queue.WithQueueSize(int(cfg.Core.QueueNum)),
simple.WithRunFunc(notify.Run(cfg)), queue.WithFn(notify.Run(cfg)),
simple.WithLogger(logx.QueueLogger()), queue.WithLogger(logx.QueueLogger()),
) )
case core.NSQ: case core.NSQ:
w = nsq.NewWorker( w = nsq.NewWorker(
@ -351,23 +350,17 @@ func main() {
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)
} }
q, err := queue.NewQueue( q := queue.NewPool(
int(cfg.Core.WorkerNum),
queue.WithWorker(w), queue.WithWorker(w),
queue.WithLogger(logx.QueueLogger()), queue.WithLogger(logx.QueueLogger()),
queue.WithWorkerCount(int(cfg.Core.WorkerNum)),
) )
if err != nil {
logx.LogError.Fatal(err)
}
q.Start()
finished := make(chan struct{}) finished := make(chan struct{})
ctx := withContextFunc(context.Background(), func() { ctx := withContextFunc(context.Background(), func() {
logx.LogAccess.Info("close the queue system, current queue usage: ", q.Usage()) logx.LogAccess.Info("close the queue system, current queue usage: ", q.Usage())
// stop queue system // stop queue system and wait job completed
q.Shutdown() q.Release()
// wait job completed
q.Wait()
close(finished) close(finished)
// close the connection with storage // close the connection with storage
logx.LogAccess.Info("close the storage connection: ", cfg.Stat.Engine) logx.LogAccess.Info("close the storage connection: ", cfg.Stat.Engine)

View File

@ -21,14 +21,12 @@ import (
"github.com/buger/jsonparser" "github.com/buger/jsonparser"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/golang-queue/queue" "github.com/golang-queue/queue"
"github.com/golang-queue/queue/simple"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
var ( var (
goVersion = runtime.Version() goVersion = runtime.Version()
q *queue.Queue q *queue.Queue
w queue.Worker
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
@ -44,23 +42,18 @@ func TestMain(m *testing.M) {
log.Fatal(err) log.Fatal(err)
} }
w = simple.NewWorker( q = queue.NewPool(
simple.WithRunFunc(func(ctx context.Context, msg queue.QueuedMessage) error { int(cfg.Core.WorkerNum),
queue.WithFn(func(ctx context.Context, msg queue.QueuedMessage) error {
_, err := notify.SendNotification(msg, cfg) _, err := notify.SendNotification(msg, cfg)
return err return err
}), }),
)
q, _ = queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(4),
queue.WithLogger(logx.QueueLogger()), queue.WithLogger(logx.QueueLogger()),
) )
q.Start()
code := m.Run() code := m.Run()
defer func() { defer func() {
q.Shutdown() q.Release()
q.Wait()
os.Exit(code) os.Exit(code)
}() }()
} }