chore(queue): upgrade appleboy/queue to 0.5.0 (#616)

This commit is contained in:
Bo-Yi Wu
2021-08-01 17:12:16 +08:00
committed by GitHub
parent 9bb618d506
commit 132e1791cd
10 changed files with 128 additions and 111 deletions

View File

@@ -228,10 +228,11 @@ func routerEngine(cfg config.ConfYaml, q *queue.Queue) *gin.Engine {
}
// markFailedNotification adds failure logs for all tokens in push notification
func markFailedNotification(cfg config.ConfYaml, notification *notify.PushNotification, reason string) {
func markFailedNotification(cfg config.ConfYaml, notification *notify.PushNotification, reason string) []logx.LogPushEntry {
logx.LogError.Error(reason)
logs := make([]logx.LogPushEntry, 0)
for _, token := range notification.Tokens {
notification.AddLog(logx.GetLogPushEntry(&logx.InputLog{
logs = append(logs, logx.GetLogPushEntry(&logx.InputLog{
ID: notification.ID,
Status: core.FailedPush,
Token: token,
@@ -242,7 +243,8 @@ func markFailedNotification(cfg config.ConfYaml, notification *notify.PushNotifi
Format: cfg.Log.Format,
}))
}
notification.WaitDone()
return logs
}
// HandleNotification add notification to queue list.
@@ -275,16 +277,36 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req notify.Req
newNotification = append(newNotification, notification)
}
log := make([]logx.LogPushEntry, 0, count)
logs := make([]logx.LogPushEntry, 0, count)
for _, notification := range newNotification {
if cfg.Core.Sync {
notification.Wg = &wg
notification.Log = &log
notification.AddWaitCount()
wg.Add(1)
}
if err := q.Queue(notification); err != nil {
markFailedNotification(cfg, notification, "max capacity reached")
if core.IsLocalQueue(core.Queue(cfg.Queue.Engine)) && cfg.Core.Sync {
func(msg *notify.PushNotification) {
q.QueueTask(func(ctx context.Context) error {
defer wg.Done()
resp, err := notify.SendNotification(msg)
if err != nil {
return err
}
// add log
for _, v := range resp.Logs {
logs = append(logs, v)
}
return nil
})
}(notification)
} else if err := q.Queue(notification); err != nil {
resp := markFailedNotification(cfg, notification, "max capacity reached")
// add log
for _, v := range resp {
logs = append(logs, v)
}
wg.Done()
}
count += len(notification.Tokens)
@@ -300,5 +322,5 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req notify.Req
status.StatStorage.AddTotalCount(int64(count))
return count, log
return count, logs
}

View File

@@ -37,7 +37,7 @@ func TestMain(m *testing.M) {
}
w = simple.NewWorker(
simple.WithRunFunc(func(msg queue.QueuedMessage) error {
simple.WithRunFunc(func(ctx context.Context, msg queue.QueuedMessage) error {
notify.SendNotification(msg)
return nil
}),