feat(server): support graceful shutdown (#461)
* feat(server): support graceful shutdown for http server Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
parent
ee0cc30e3d
commit
bcd0e70252
|
@ -3,11 +3,13 @@
|
||||||
package gorush
|
package gorush
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/apex/gateway"
|
"github.com/apex/gateway"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RunHTTPServer provide run http or https protocol.
|
// RunHTTPServer provide run http or https protocol.
|
||||||
func RunHTTPServer() error {
|
func RunHTTPServer(ctx context.Context) error {
|
||||||
if !PushConf.Core.Enabled {
|
if !PushConf.Core.Enabled {
|
||||||
LogAccess.Debug("httpd server is disabled.")
|
LogAccess.Debug("httpd server is disabled.")
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -3,16 +3,19 @@
|
||||||
package gorush
|
package gorush
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RunHTTPServer provide run http or https protocol.
|
// RunHTTPServer provide run http or https protocol.
|
||||||
func RunHTTPServer() (err error) {
|
func RunHTTPServer(ctx context.Context) (err error) {
|
||||||
if !PushConf.Core.Enabled {
|
if !PushConf.Core.Enabled {
|
||||||
LogAccess.Debug("httpd server is disabled.")
|
LogAccess.Info("httpd server is disabled.")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,7 +26,7 @@ func RunHTTPServer() (err error) {
|
||||||
|
|
||||||
LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")
|
LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")
|
||||||
if PushConf.Core.AutoTLS.Enabled {
|
if PushConf.Core.AutoTLS.Enabled {
|
||||||
return startServer(autoTLSServer())
|
return startServer(ctx, autoTLSServer())
|
||||||
} else if PushConf.Core.SSL {
|
} else if PushConf.Core.SSL {
|
||||||
config := &tls.Config{
|
config := &tls.Config{
|
||||||
MinVersion: tls.VersionTLS10,
|
MinVersion: tls.VersionTLS10,
|
||||||
|
@ -62,12 +65,41 @@ func RunHTTPServer() (err error) {
|
||||||
server.TLSConfig = config
|
server.TLSConfig = config
|
||||||
}
|
}
|
||||||
|
|
||||||
return startServer(server)
|
return startServer(ctx, server)
|
||||||
}
|
}
|
||||||
|
|
||||||
func startServer(s *http.Server) error {
|
func listenAndServe(ctx context.Context, s *http.Server) error {
|
||||||
if s.TLSConfig == nil {
|
var g errgroup.Group
|
||||||
|
g.Go(func() error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return s.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
g.Go(func() error {
|
||||||
return s.ListenAndServe()
|
return s.ListenAndServe()
|
||||||
}
|
})
|
||||||
return s.ListenAndServeTLS("", "")
|
return g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func listenAndServeTLS(ctx context.Context, s *http.Server) error {
|
||||||
|
var g errgroup.Group
|
||||||
|
g.Go(func() error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return s.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
g.Go(func() error {
|
||||||
|
return s.ListenAndServeTLS("", "")
|
||||||
|
})
|
||||||
|
return g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func startServer(ctx context.Context, s *http.Server) error {
|
||||||
|
if s.TLSConfig == nil {
|
||||||
|
return listenAndServe(ctx, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return listenAndServeTLS(ctx, s)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package gorush
|
package gorush
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
@ -63,13 +64,12 @@ func TestRunNormalServer(t *testing.T) {
|
||||||
gin.SetMode(gin.TestMode)
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
assert.NoError(t, RunHTTPServer())
|
assert.NoError(t, RunHTTPServer(context.Background()))
|
||||||
}()
|
}()
|
||||||
// have to wait for the goroutine to start and run the server
|
// have to wait for the goroutine to start and run the server
|
||||||
// otherwise the main thread will complete
|
// otherwise the main thread will complete
|
||||||
time.Sleep(5 * time.Millisecond)
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
|
||||||
assert.Error(t, RunHTTPServer())
|
|
||||||
testRequest(t, "http://localhost:8088/api/stat/go")
|
testRequest(t, "http://localhost:8088/api/stat/go")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ func TestRunTLSServer(t *testing.T) {
|
||||||
PushConf.Core.KeyPath = "../certificate/localhost.key"
|
PushConf.Core.KeyPath = "../certificate/localhost.key"
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
assert.NoError(t, RunHTTPServer())
|
assert.NoError(t, RunHTTPServer(context.Background()))
|
||||||
}()
|
}()
|
||||||
// have to wait for the goroutine to start and run the server
|
// have to wait for the goroutine to start and run the server
|
||||||
// otherwise the main thread will complete
|
// otherwise the main thread will complete
|
||||||
|
@ -104,7 +104,7 @@ func TestRunTLSBase64Server(t *testing.T) {
|
||||||
PushConf.Core.KeyBase64 = key
|
PushConf.Core.KeyBase64 = key
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
assert.NoError(t, RunHTTPServer())
|
assert.NoError(t, RunHTTPServer(context.Background()))
|
||||||
}()
|
}()
|
||||||
// have to wait for the goroutine to start and run the server
|
// have to wait for the goroutine to start and run the server
|
||||||
// otherwise the main thread will complete
|
// otherwise the main thread will complete
|
||||||
|
@ -117,7 +117,7 @@ func TestRunAutoTLSServer(t *testing.T) {
|
||||||
initTest()
|
initTest()
|
||||||
PushConf.Core.AutoTLS.Enabled = true
|
PushConf.Core.AutoTLS.Enabled = true
|
||||||
go func() {
|
go func() {
|
||||||
assert.NoError(t, RunHTTPServer())
|
assert.NoError(t, RunHTTPServer(context.Background()))
|
||||||
}()
|
}()
|
||||||
// have to wait for the goroutine to start and run the server
|
// have to wait for the goroutine to start and run the server
|
||||||
// otherwise the main thread will complete
|
// otherwise the main thread will complete
|
||||||
|
@ -132,7 +132,7 @@ func TestLoadTLSCertError(t *testing.T) {
|
||||||
PushConf.Core.CertPath = "../config/config.yml"
|
PushConf.Core.CertPath = "../config/config.yml"
|
||||||
PushConf.Core.KeyPath = "../config/config.yml"
|
PushConf.Core.KeyPath = "../config/config.yml"
|
||||||
|
|
||||||
assert.Error(t, RunHTTPServer())
|
assert.Error(t, RunHTTPServer(context.Background()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMissingTLSCertConfg(t *testing.T) {
|
func TestMissingTLSCertConfg(t *testing.T) {
|
||||||
|
@ -145,8 +145,8 @@ func TestMissingTLSCertConfg(t *testing.T) {
|
||||||
PushConf.Core.CertBase64 = ""
|
PushConf.Core.CertBase64 = ""
|
||||||
PushConf.Core.KeyBase64 = ""
|
PushConf.Core.KeyBase64 = ""
|
||||||
|
|
||||||
err := RunHTTPServer()
|
err := RunHTTPServer(context.Background())
|
||||||
assert.Error(t, RunHTTPServer())
|
assert.Error(t, RunHTTPServer(context.Background()))
|
||||||
assert.Equal(t, "missing https cert config", err.Error())
|
assert.Equal(t, "missing https cert config", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,7 +383,7 @@ func TestVersionHandler(t *testing.T) {
|
||||||
func TestDisabledHTTPServer(t *testing.T) {
|
func TestDisabledHTTPServer(t *testing.T) {
|
||||||
initTest()
|
initTest()
|
||||||
PushConf.Core.Enabled = false
|
PushConf.Core.Enabled = false
|
||||||
err := RunHTTPServer()
|
err := RunHTTPServer(context.Background())
|
||||||
PushConf.Core.Enabled = true
|
PushConf.Core.Enabled = true
|
||||||
|
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
17
main.go
17
main.go
|
@ -248,12 +248,14 @@ func main() {
|
||||||
gorush.LogError.Fatal(err)
|
gorush.LogError.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
finished := make(chan struct{})
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(int(gorush.PushConf.Core.WorkerNum))
|
wg.Add(int(gorush.PushConf.Core.WorkerNum))
|
||||||
ctx := withContextFunc(context.Background(), func() {
|
ctx := withContextFunc(context.Background(), func() {
|
||||||
gorush.LogAccess.Info("close the notification queue channel")
|
gorush.LogAccess.Info("close the notification queue channel")
|
||||||
close(gorush.QueueNotification)
|
close(gorush.QueueNotification)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(finished)
|
||||||
gorush.LogAccess.Info("the notification queue has been clear")
|
gorush.LogAccess.Info("the notification queue has been clear")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -269,8 +271,19 @@ func main() {
|
||||||
|
|
||||||
var g errgroup.Group
|
var g errgroup.Group
|
||||||
|
|
||||||
g.Go(gorush.RunHTTPServer) // Run httpd server
|
g.Go(func() error {
|
||||||
g.Go(rpc.RunGRPCServer) // Run gRPC internal server
|
return gorush.RunHTTPServer(ctx)
|
||||||
|
}) // Run httpd server
|
||||||
|
|
||||||
|
g.Go(rpc.RunGRPCServer) // Run gRPC internal server
|
||||||
|
|
||||||
|
// check job completely
|
||||||
|
g.Go(func() error {
|
||||||
|
select {
|
||||||
|
case <-finished:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
if err = g.Wait(); err != nil {
|
if err = g.Wait(); err != nil {
|
||||||
gorush.LogError.Fatal(err)
|
gorush.LogError.Fatal(err)
|
||||||
|
|
Loading…
Reference in New Issue