diff --git a/go.mod b/go.mod index 08df3fb..dd7bb84 100644 --- a/go.mod +++ b/go.mod @@ -13,9 +13,9 @@ require ( github.com/gin-contrib/logger v0.2.0 github.com/gin-gonic/gin v1.7.4 github.com/go-redis/redis/v7 v7.4.0 - github.com/golang-queue/nats v0.0.1 - github.com/golang-queue/nsq v0.0.1 - github.com/golang-queue/queue v0.0.6 + github.com/golang-queue/nats v0.0.2 + github.com/golang-queue/nsq v0.0.2 + github.com/golang-queue/queue v0.0.7 github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect github.com/golang/protobuf v1.5.2 github.com/google/flatbuffers v2.0.0+incompatible // indirect diff --git a/go.sum b/go.sum index dde7f64..34e60a3 100644 --- a/go.sum +++ b/go.sum @@ -155,12 +155,12 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-queue/nats v0.0.1 h1:8P5YOHP6BdqJfIa47VfQ5+1sInIsWxnema/UcdpM9NE= -github.com/golang-queue/nats v0.0.1/go.mod h1:B+5LDTDILJiH9eplW7ETKKMlMA4vRDAvOkQzFjmVQ/o= -github.com/golang-queue/nsq v0.0.1 h1:7IgLshRYC5AsX3QnMyWDKfrR/WSTanAPqQX3XxV0O7I= -github.com/golang-queue/nsq v0.0.1/go.mod h1:hvxpT5CcllnL2iesg/m7Ih1fBjO/METlbXmwS8/tHpk= -github.com/golang-queue/queue v0.0.6 h1:TLd0lSM7uNgXXj7SXSMfyaZUwRgbOu9tq6UfZXXELnA= -github.com/golang-queue/queue v0.0.6/go.mod h1:IeIGBO1largDrFEaxDgIckoAFIUTn0eolTQris8bm08= +github.com/golang-queue/nats v0.0.2 h1:81Ege/02a9d2a1SDN2/t1S1XIr/JpY+s8xj/pLZIHXM= +github.com/golang-queue/nats v0.0.2/go.mod h1:dzXOwbx20CJ5oX4UiBIDyfedCiR5sBWWKJVXmiRlDYc= +github.com/golang-queue/nsq v0.0.2 h1:kP4fMLl1K6TNlJGq3tJ4t07e703mDGiMYLLltT8F4/Q= +github.com/golang-queue/nsq v0.0.2/go.mod h1:1Q/8y4BclWLj03sn0dApJIObatC3qMX5gLjlbo0TwXs= +github.com/golang-queue/queue v0.0.7 h1:WENCPyPBcIWYgBFSAZ8USGtwmxeCeMkhjwuyM0MAi84= +github.com/golang-queue/queue v0.0.7/go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20210429001901-424d2337a529 h1:2voWjNECnrZRbfwXxHB1/j8wa6xdKn85B5NzgVL/pTU= github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -326,11 +326,11 @@ github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5Vgl github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= -github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= -github.com/nats-io/nats-server/v2 v2.3.2 h1:SGJLWrjBHsl0DsdY8PeTR3YKEfiUEYVVq2STw9d8MSY= -github.com/nats-io/nats-server/v2 v2.3.2/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY= +github.com/nats-io/nats-server/v2 v2.3.4 h1:WcNa6HDFX8gjZPHb8CJ9wxRHEjJSlhWUb/MKb6/mlUY= +github.com/nats-io/nats-server/v2 v2.3.4/go.mod h1:3mtbaN5GkCo/Z5T3nNj0I0/W1fPkKzLiDC6jjWJKp98= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls= github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= diff --git a/main.go b/main.go index eaeafc0..b4dab1c 100644 --- a/main.go +++ b/main.go @@ -25,7 +25,6 @@ import ( "github.com/golang-queue/nats" "github.com/golang-queue/nsq" "github.com/golang-queue/queue" - "github.com/golang-queue/queue/simple" "golang.org/x/sync/errgroup" ) @@ -325,10 +324,10 @@ func main() { var w queue.Worker switch core.Queue(cfg.Queue.Engine) { case core.LocalQueue: - w = simple.NewWorker( - simple.WithQueueNum(int(cfg.Core.QueueNum)), - simple.WithRunFunc(notify.Run(cfg)), - simple.WithLogger(logx.QueueLogger()), + w = queue.NewConsumer( + queue.WithQueueSize(int(cfg.Core.QueueNum)), + queue.WithFn(notify.Run(cfg)), + queue.WithLogger(logx.QueueLogger()), ) case core.NSQ: w = nsq.NewWorker( @@ -351,23 +350,17 @@ func main() { logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) } - q, err := queue.NewQueue( + q := queue.NewPool( + int(cfg.Core.WorkerNum), queue.WithWorker(w), queue.WithLogger(logx.QueueLogger()), - queue.WithWorkerCount(int(cfg.Core.WorkerNum)), ) - if err != nil { - logx.LogError.Fatal(err) - } - q.Start() finished := make(chan struct{}) ctx := withContextFunc(context.Background(), func() { logx.LogAccess.Info("close the queue system, current queue usage: ", q.Usage()) - // stop queue system - q.Shutdown() - // wait job completed - q.Wait() + // stop queue system and wait job completed + q.Release() close(finished) // close the connection with storage logx.LogAccess.Info("close the storage connection: ", cfg.Stat.Engine) diff --git a/router/server_test.go b/router/server_test.go index 178a711..0d988e8 100644 --- a/router/server_test.go +++ b/router/server_test.go @@ -21,14 +21,12 @@ import ( "github.com/buger/jsonparser" "github.com/gin-gonic/gin" "github.com/golang-queue/queue" - "github.com/golang-queue/queue/simple" "github.com/stretchr/testify/assert" ) var ( goVersion = runtime.Version() q *queue.Queue - w queue.Worker ) func TestMain(m *testing.M) { @@ -44,23 +42,18 @@ func TestMain(m *testing.M) { log.Fatal(err) } - w = simple.NewWorker( - simple.WithRunFunc(func(ctx context.Context, msg queue.QueuedMessage) error { + q = queue.NewPool( + int(cfg.Core.WorkerNum), + queue.WithFn(func(ctx context.Context, msg queue.QueuedMessage) error { _, err := notify.SendNotification(msg, cfg) return err }), - ) - q, _ = queue.NewQueue( - queue.WithWorker(w), - queue.WithWorkerCount(4), queue.WithLogger(logx.QueueLogger()), ) - q.Start() code := m.Run() defer func() { - q.Shutdown() - q.Wait() + q.Release() os.Exit(code) }() }