Merge pull request #2019 from dexidp/refactor-run-groups

Refactor run groups
This commit is contained in:
Márk Sági-Kazár 2021-02-25 14:36:01 +01:00 committed by GitHub
commit f7d1405cfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 79 additions and 40 deletions

View File

@ -73,27 +73,6 @@ func commandServe() *cobra.Command {
return cmd
}
func listenAndShutdownGracefully(logger log.Logger, gr *run.Group, srv *http.Server, name string) error {
l, err := net.Listen("tcp", srv.Addr)
if err != nil {
return fmt.Errorf("listening (%s) on %s: %v", name, srv.Addr, err)
}
gr.Add(func() error {
logger.Infof("listening (%s) on %s", name, srv.Addr)
return srv.Serve(l)
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
logger.Debugf("starting graceful shutdown (%s)", name)
if err := srv.Shutdown(ctx); err != nil {
logger.Errorf("graceful shutdown (%s): %v", name, err)
}
})
return nil
}
func runServe(options serveOptions) error {
configFile := options.config
configData, err := ioutil.ReadFile(configFile)
@ -354,28 +333,78 @@ func runServe(options serveOptions) error {
InitiallyPassing: true,
})
var gr run.Group
var group run.Group
// Set up telemetry server
if c.Telemetry.HTTP != "" {
telemetrySrv := &http.Server{Addr: c.Telemetry.HTTP, Handler: telemetryRouter}
const name = "telemetry"
defer telemetrySrv.Close()
if err := listenAndShutdownGracefully(logger, &gr, telemetrySrv, "http/telemetry"); err != nil {
return err
logger.Infof("listening (%s) on %s", name, c.Telemetry.HTTP)
l, err := net.Listen("tcp", c.Telemetry.HTTP)
if err != nil {
return fmt.Errorf("listening (%s) on %s: %v", name, c.Telemetry.HTTP, err)
}
server := &http.Server{
Handler: telemetryRouter,
}
defer server.Close()
group.Add(func() error {
return server.Serve(l)
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
logger.Debugf("starting graceful shutdown (%s)", name)
if err := server.Shutdown(ctx); err != nil {
logger.Errorf("graceful shutdown (%s): %v", name, err)
}
})
}
// Set up http server
if c.Web.HTTP != "" {
httpSrv := &http.Server{Addr: c.Web.HTTP, Handler: serv}
const name = "http"
defer httpSrv.Close()
if err := listenAndShutdownGracefully(logger, &gr, httpSrv, "http"); err != nil {
return err
logger.Infof("listening (%s) on %s", name, c.Web.HTTP)
l, err := net.Listen("tcp", c.Web.HTTP)
if err != nil {
return fmt.Errorf("listening (%s) on %s: %v", name, c.Web.HTTP, err)
}
server := &http.Server{
Handler: serv,
}
defer server.Close()
group.Add(func() error {
return server.Serve(l)
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
logger.Debugf("starting graceful shutdown (%s)", name)
if err := server.Shutdown(ctx); err != nil {
logger.Errorf("graceful shutdown (%s): %v", name, err)
}
})
}
// Set up https server
if c.Web.HTTPS != "" {
httpsSrv := &http.Server{
Addr: c.Web.HTTPS,
const name = "https"
logger.Infof("listening (%s) on %s", name, c.Web.HTTPS)
l, err := net.Listen("tcp", c.Web.HTTPS)
if err != nil {
return fmt.Errorf("listening (%s) on %s: %v", name, c.Web.HTTPS, err)
}
server := &http.Server{
Handler: serv,
TLSConfig: &tls.Config{
CipherSuites: allowedTLSCiphers,
@ -383,14 +412,25 @@ func runServe(options serveOptions) error {
MinVersion: tls.VersionTLS12,
},
}
defer server.Close()
defer httpsSrv.Close()
if err := listenAndShutdownGracefully(logger, &gr, httpsSrv, "https"); err != nil {
return err
}
group.Add(func() error {
return server.ServeTLS(l, c.Web.TLSCert, c.Web.TLSKey)
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
logger.Debugf("starting graceful shutdown (%s)", name)
if err := server.Shutdown(ctx); err != nil {
logger.Errorf("graceful shutdown (%s): %v", name, err)
}
})
}
// Set up grpc server
if c.GRPC.Addr != "" {
logger.Infof("listening (grpc) on %s", c.GRPC.Addr)
grpcListener, err := net.Listen("tcp", c.GRPC.Addr)
if err != nil {
return fmt.Errorf("listening (grcp) on %s: %w", c.GRPC.Addr, err)
@ -405,8 +445,7 @@ func runServe(options serveOptions) error {
reflection.Register(grpcSrv)
}
gr.Add(func() error {
logger.Infof("listening (grpc) on %s", c.GRPC.Addr)
group.Add(func() error {
return grpcSrv.Serve(grpcListener)
}, func(err error) {
logger.Debugf("starting graceful shutdown (grpc)")
@ -414,8 +453,8 @@ func runServe(options serveOptions) error {
})
}
gr.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM))
if err := gr.Run(); err != nil {
group.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM))
if err := group.Run(); err != nil {
if _, ok := err.(run.SignalError); !ok {
return fmt.Errorf("run groups: %w", err)
}