453 lines
18 KiB
Go
453 lines
18 KiB
Go
/*
|
|
*
|
|
* Copyright 2021 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
// Package server contains internal server-side functionality used by the
// public-facing xds package.
|
|
package server
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"google.golang.org/grpc/backoff"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/grpclog"
|
|
internalbackoff "google.golang.org/grpc/internal/backoff"
|
|
"google.golang.org/grpc/internal/envconfig"
|
|
internalgrpclog "google.golang.org/grpc/internal/grpclog"
|
|
"google.golang.org/grpc/internal/grpcsync"
|
|
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
|
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
|
)
|
|
|
|
var (
|
|
logger = grpclog.Component("xds")
|
|
|
|
// Backoff strategy for temporary errors received from Accept(). If this
|
|
// needs to be configurable, we can inject it through ListenerWrapperParams.
|
|
bs = internalbackoff.Exponential{Config: backoff.Config{
|
|
BaseDelay: 5 * time.Millisecond,
|
|
Multiplier: 2.0,
|
|
MaxDelay: 1 * time.Second,
|
|
}}
|
|
backoffFunc = bs.Backoff
|
|
)
|
|
|
|
// ServingModeCallback is the callback that users can register to get notified
|
|
// about the server's serving mode changes. The callback is invoked with the
|
|
// address of the listener and its new mode. The err parameter is set to a
|
|
// non-nil error if the server has transitioned into not-serving mode.
|
|
type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err error)
|
|
|
|
// DrainCallback is the function type an xDS-enabled server registers to learn
// that the Listener configuration was updated. It receives the address of the
// affected listener. The server is expected to gracefully shut down the
// connections it already has, so that clients reconnect and the new
// configuration is applied to the connections they create.
type DrainCallback func(addr net.Addr)
|
|
|
|
func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger {
|
|
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p))
|
|
}
|
|
|
|
// XDSClient wraps the methods on the XDSClient which are required by
|
|
// the listenerWrapper.
|
|
type XDSClient interface {
|
|
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
|
|
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
|
|
BootstrapConfig() *bootstrap.Config
|
|
}
|
|
|
|
// ListenerWrapperParams wraps parameters required to create a listenerWrapper.
|
|
type ListenerWrapperParams struct {
|
|
// Listener is the net.Listener passed by the user that is to be wrapped.
|
|
Listener net.Listener
|
|
// ListenerResourceName is the xDS Listener resource to request.
|
|
ListenerResourceName string
|
|
// XDSCredsInUse specifies whether or not the user expressed interest to
|
|
// receive security configuration from the control plane.
|
|
XDSCredsInUse bool
|
|
// XDSClient provides the functionality from the XDSClient required here.
|
|
XDSClient XDSClient
|
|
// ModeCallback is the callback to invoke when the serving mode changes.
|
|
ModeCallback ServingModeCallback
|
|
// DrainCallback is the callback to invoke when the Listener gets a LDS
|
|
// update.
|
|
DrainCallback DrainCallback
|
|
}
|
|
|
|
// NewListenerWrapper creates a new listenerWrapper with params. It returns a
|
|
// net.Listener and a channel which is written to, indicating that the former is
|
|
// ready to be passed to grpc.Serve().
|
|
//
|
|
// Only TCP listeners are supported.
|
|
func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan struct{}) {
|
|
lw := &listenerWrapper{
|
|
Listener: params.Listener,
|
|
name: params.ListenerResourceName,
|
|
xdsCredsInUse: params.XDSCredsInUse,
|
|
xdsC: params.XDSClient,
|
|
modeCallback: params.ModeCallback,
|
|
drainCallback: params.DrainCallback,
|
|
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),
|
|
|
|
mode: connectivity.ServingModeStarting,
|
|
closed: grpcsync.NewEvent(),
|
|
goodUpdate: grpcsync.NewEvent(),
|
|
ldsUpdateCh: make(chan ldsUpdateWithError, 1),
|
|
rdsUpdateCh: make(chan rdsHandlerUpdate, 1),
|
|
}
|
|
lw.logger = prefixLogger(lw)
|
|
|
|
// Serve() verifies that Addr() returns a valid TCPAddr. So, it is safe to
|
|
// ignore the error from SplitHostPort().
|
|
lisAddr := lw.Listener.Addr().String()
|
|
lw.addr, lw.port, _ = net.SplitHostPort(lisAddr)
|
|
|
|
lw.rdsHandler = newRDSHandler(lw.xdsC, lw.rdsUpdateCh)
|
|
|
|
cancelWatch := lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate)
|
|
lw.logger.Infof("Watch started on resource name %v", lw.name)
|
|
lw.cancelWatch = func() {
|
|
cancelWatch()
|
|
lw.logger.Infof("Watch cancelled on resource name %v", lw.name)
|
|
}
|
|
go lw.run()
|
|
return lw, lw.goodUpdate.Done()
|
|
}
|
|
|
|
type ldsUpdateWithError struct {
|
|
update xdsresource.ListenerUpdate
|
|
err error
|
|
}
|
|
|
|
// listenerWrapper wraps the net.Listener associated with the listening address
|
|
// passed to Serve(). It also contains all other state associated with this
|
|
// particular invocation of Serve().
|
|
type listenerWrapper struct {
|
|
net.Listener
|
|
logger *internalgrpclog.PrefixLogger
|
|
|
|
name string
|
|
xdsCredsInUse bool
|
|
xdsC XDSClient
|
|
cancelWatch func()
|
|
modeCallback ServingModeCallback
|
|
drainCallback DrainCallback
|
|
|
|
// Set to true if the listener is bound to the IP_ANY address (which is
|
|
// "0.0.0.0" for IPv4 and "::" for IPv6).
|
|
isUnspecifiedAddr bool
|
|
// Listening address and port. Used to validate the socket address in the
|
|
// Listener resource received from the control plane.
|
|
addr, port string
|
|
|
|
// This is used to notify that a good update has been received and that
|
|
// Serve() can be invoked on the underlying gRPC server. Using an event
|
|
// instead of a vanilla channel simplifies the update handler as it need not
|
|
// keep track of whether the received update is the first one or not.
|
|
goodUpdate *grpcsync.Event
|
|
// A small race exists in the XDSClient code between the receipt of an xDS
|
|
// response and the user cancelling the associated watch. In this window,
|
|
// the registered callback may be invoked after the watch is canceled, and
|
|
// the user is expected to work around this. This event signifies that the
|
|
// listener is closed (and hence the watch is cancelled), and we drop any
|
|
// updates received in the callback if this event has fired.
|
|
closed *grpcsync.Event
|
|
|
|
// mu guards access to the current serving mode and the filter chains. The
|
|
// reason for using an rw lock here is that these fields are read in
|
|
// Accept() for all incoming connections, but writes happen rarely (when we
|
|
// get a Listener resource update).
|
|
mu sync.RWMutex
|
|
// Current serving mode.
|
|
mode connectivity.ServingMode
|
|
// Filter chains received as part of the last good update.
|
|
filterChains *xdsresource.FilterChainManager
|
|
|
|
// rdsHandler is used for any dynamic RDS resources specified in a LDS
|
|
// update.
|
|
rdsHandler *rdsHandler
|
|
// rdsUpdates are the RDS resources received from the management
|
|
// server, keyed on the RouteName of the RDS resource.
|
|
rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate
|
|
// ldsUpdateCh is a channel for XDSClient LDS updates.
|
|
ldsUpdateCh chan ldsUpdateWithError
|
|
// rdsUpdateCh is a channel for XDSClient RDS updates.
|
|
rdsUpdateCh chan rdsHandlerUpdate
|
|
}
|
|
|
|
// Accept blocks on an Accept() on the underlying listener, and wraps the
|
|
// returned net.connWrapper with the configured certificate providers.
|
|
func (l *listenerWrapper) Accept() (net.Conn, error) {
|
|
var retries int
|
|
for {
|
|
conn, err := l.Listener.Accept()
|
|
if err != nil {
|
|
// Temporary() method is implemented by certain error types returned
|
|
// from the net package, and it is useful for us to not shutdown the
|
|
// server in these conditions. The listen queue being full is one
|
|
// such case.
|
|
if ne, ok := err.(interface{ Temporary() bool }); !ok || !ne.Temporary() {
|
|
return nil, err
|
|
}
|
|
retries++
|
|
timer := time.NewTimer(backoffFunc(retries))
|
|
select {
|
|
case <-timer.C:
|
|
case <-l.closed.Done():
|
|
timer.Stop()
|
|
// Continuing here will cause us to call Accept() again
|
|
// which will return a non-temporary error.
|
|
continue
|
|
}
|
|
continue
|
|
}
|
|
// Reset retries after a successful Accept().
|
|
retries = 0
|
|
|
|
// Since the net.Conn represents an incoming connection, the source and
|
|
// destination address can be retrieved from the local address and
|
|
// remote address of the net.Conn respectively.
|
|
destAddr, ok1 := conn.LocalAddr().(*net.TCPAddr)
|
|
srcAddr, ok2 := conn.RemoteAddr().(*net.TCPAddr)
|
|
if !ok1 || !ok2 {
|
|
// If the incoming connection is not a TCP connection, which is
|
|
// really unexpected since we check whether the provided listener is
|
|
// a TCP listener in Serve(), we return an error which would cause
|
|
// us to stop serving.
|
|
return nil, fmt.Errorf("received connection with non-TCP address (local: %T, remote %T)", conn.LocalAddr(), conn.RemoteAddr())
|
|
}
|
|
|
|
l.mu.RLock()
|
|
if l.mode == connectivity.ServingModeNotServing {
|
|
// Close connections as soon as we accept them when we are in
|
|
// "not-serving" mode. Since we accept a net.Listener from the user
|
|
// in Serve(), we cannot close the listener when we move to
|
|
// "not-serving". Closing the connection immediately upon accepting
|
|
// is one of the other ways to implement the "not-serving" mode as
|
|
// outlined in gRFC A36.
|
|
l.mu.RUnlock()
|
|
conn.Close()
|
|
continue
|
|
}
|
|
fc, err := l.filterChains.Lookup(xdsresource.FilterChainLookupParams{
|
|
IsUnspecifiedListener: l.isUnspecifiedAddr,
|
|
DestAddr: destAddr.IP,
|
|
SourceAddr: srcAddr.IP,
|
|
SourcePort: srcAddr.Port,
|
|
})
|
|
l.mu.RUnlock()
|
|
if err != nil {
|
|
// When a matching filter chain is not found, we close the
|
|
// connection right away, but do not return an error back to
|
|
// `grpc.Serve()` from where this Accept() was invoked. Returning an
|
|
// error to `grpc.Serve()` causes the server to shutdown. If we want
|
|
// to avoid the server from shutting down, we would need to return
|
|
// an error type which implements the `Temporary() bool` method,
|
|
// which is invoked by `grpc.Serve()` to see if the returned error
|
|
// represents a temporary condition. In the case of a temporary
|
|
// error, `grpc.Serve()` method sleeps for a small duration and
|
|
// therefore ends up blocking all connection attempts during that
|
|
// time frame, which is also not ideal for an error like this.
|
|
l.logger.Warningf("connection from %s to %s failed to find any matching filter chain", conn.RemoteAddr().String(), conn.LocalAddr().String())
|
|
conn.Close()
|
|
continue
|
|
}
|
|
if !envconfig.XDSRBAC {
|
|
return &connWrapper{Conn: conn, filterChain: fc, parent: l}, nil
|
|
}
|
|
var rc xdsresource.RouteConfigUpdate
|
|
if fc.InlineRouteConfig != nil {
|
|
rc = *fc.InlineRouteConfig
|
|
} else {
|
|
rcPtr := atomic.LoadPointer(&l.rdsUpdates)
|
|
rcuPtr := (*map[string]xdsresource.RouteConfigUpdate)(rcPtr)
|
|
// This shouldn't happen, but this error protects against a panic.
|
|
if rcuPtr == nil {
|
|
return nil, errors.New("route configuration pointer is nil")
|
|
}
|
|
rcu := *rcuPtr
|
|
rc = rcu[fc.RouteConfigName]
|
|
}
|
|
// The filter chain will construct a usuable route table on each
|
|
// connection accept. This is done because preinstantiating every route
|
|
// table before it is needed for a connection would potentially lead to
|
|
// a lot of cpu time and memory allocated for route tables that will
|
|
// never be used. There was also a thought to cache this configuration,
|
|
// and reuse it for the next accepted connection. However, this would
|
|
// lead to a lot of code complexity (RDS Updates for a given route name
|
|
// can come it at any time), and connections aren't accepted too often,
|
|
// so this reinstantation of the Route Configuration is an acceptable
|
|
// tradeoff for simplicity.
|
|
vhswi, err := fc.ConstructUsableRouteConfiguration(rc)
|
|
if err != nil {
|
|
l.logger.Warningf("route configuration construction: %v", err)
|
|
conn.Close()
|
|
continue
|
|
}
|
|
return &connWrapper{Conn: conn, filterChain: fc, parent: l, virtualHosts: vhswi}, nil
|
|
}
|
|
}
|
|
|
|
// Close closes the underlying listener. It also cancels the xDS watch
|
|
// registered in Serve() and closes any certificate provider instances created
|
|
// based on security configuration received in the LDS response.
|
|
func (l *listenerWrapper) Close() error {
|
|
l.closed.Fire()
|
|
l.Listener.Close()
|
|
if l.cancelWatch != nil {
|
|
l.cancelWatch()
|
|
}
|
|
l.rdsHandler.close()
|
|
return nil
|
|
}
|
|
|
|
// run is a long running goroutine which handles all xds updates. LDS and RDS
|
|
// push updates onto a channel which is read and acted upon from this goroutine.
|
|
func (l *listenerWrapper) run() {
|
|
for {
|
|
select {
|
|
case <-l.closed.Done():
|
|
return
|
|
case u := <-l.ldsUpdateCh:
|
|
l.handleLDSUpdate(u)
|
|
case u := <-l.rdsUpdateCh:
|
|
l.handleRDSUpdate(u)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleLDSUpdate is the callback which handles LDS Updates. It writes the
|
|
// received update to the update channel, which is picked up by the run
|
|
// goroutine.
|
|
func (l *listenerWrapper) handleListenerUpdate(update xdsresource.ListenerUpdate, err error) {
|
|
if l.closed.HasFired() {
|
|
l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err)
|
|
return
|
|
}
|
|
// Remove any existing entry in ldsUpdateCh and replace with the new one, as the only update
|
|
// listener cares about is most recent update.
|
|
select {
|
|
case <-l.ldsUpdateCh:
|
|
default:
|
|
}
|
|
l.ldsUpdateCh <- ldsUpdateWithError{update: update, err: err}
|
|
}
|
|
|
|
// handleRDSUpdate handles a full rds update from rds handler. On a successful
|
|
// update, the server will switch to ServingModeServing as the full
|
|
// configuration (both LDS and RDS) has been received.
|
|
func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
|
|
if l.closed.HasFired() {
|
|
l.logger.Warningf("RDS received update: %v with error: %v, after listener was closed", update.updates, update.err)
|
|
return
|
|
}
|
|
if update.err != nil {
|
|
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
|
|
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
|
|
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
|
|
}
|
|
// For errors which are anything other than "resource-not-found", we
|
|
// continue to use the old configuration.
|
|
return
|
|
}
|
|
atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates))
|
|
|
|
l.switchMode(l.filterChains, connectivity.ServingModeServing, nil)
|
|
l.goodUpdate.Fire()
|
|
}
|
|
|
|
func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
|
|
if update.err != nil {
|
|
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
|
|
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
|
|
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
|
|
}
|
|
// For errors which are anything other than "resource-not-found", we
|
|
// continue to use the old configuration.
|
|
return
|
|
}
|
|
l.logger.Infof("Received update for resource %q: %+v", l.name, update.update)
|
|
|
|
// Make sure that the socket address on the received Listener resource
|
|
// matches the address of the net.Listener passed to us by the user. This
|
|
// check is done here instead of at the XDSClient layer because of the
|
|
// following couple of reasons:
|
|
// - XDSClient cannot know the listening address of every listener in the
|
|
// system, and hence cannot perform this check.
|
|
// - this is a very context-dependent check and only the server has the
|
|
// appropriate context to perform this check.
|
|
//
|
|
// What this means is that the XDSClient has ACKed a resource which can push
|
|
// the server into a "not serving" mode. This is not ideal, but this is
|
|
// what we have decided to do. See gRPC A36 for more details.
|
|
ilc := update.update.InboundListenerCfg
|
|
if ilc.Address != l.addr || ilc.Port != l.port {
|
|
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
|
|
return
|
|
}
|
|
|
|
// "Updates to a Listener cause all older connections on that Listener to be
|
|
// gracefully shut down with a grace period of 10 minutes for long-lived
|
|
// RPC's, such that clients will reconnect and have the updated
|
|
// configuration apply." - A36 Note that this is not the same as moving the
|
|
// Server's state to ServingModeNotServing. That prevents new connections
|
|
// from being accepted, whereas here we simply want the clients to reconnect
|
|
// to get the updated configuration.
|
|
if envconfig.XDSRBAC {
|
|
if l.drainCallback != nil {
|
|
l.drainCallback(l.Listener.Addr())
|
|
}
|
|
}
|
|
l.rdsHandler.updateRouteNamesToWatch(ilc.FilterChains.RouteConfigNames)
|
|
// If there are no dynamic RDS Configurations still needed to be received
|
|
// from the management server, this listener has all the configuration
|
|
// needed, and is ready to serve.
|
|
if len(ilc.FilterChains.RouteConfigNames) == 0 {
|
|
l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil)
|
|
l.goodUpdate.Fire()
|
|
}
|
|
}
|
|
|
|
// switchMode updates the value of serving mode and filter chains stored in the
|
|
// listenerWrapper. And if the serving mode has changed, it invokes the
|
|
// registered mode change callback.
|
|
func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMode connectivity.ServingMode, err error) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.filterChains = fcs
|
|
if l.mode == newMode && l.mode == connectivity.ServingModeServing {
|
|
// Redundant updates are suppressed only when we are SERVING and the new
|
|
// mode is also SERVING. In the other case (where we are NOT_SERVING and the
|
|
// new mode is also NOT_SERVING), the update is not suppressed as:
|
|
// 1. the error may have change
|
|
// 2. it provides a timestamp of the last backoff attempt
|
|
return
|
|
}
|
|
l.mode = newMode
|
|
if l.modeCallback != nil {
|
|
l.modeCallback(l.Listener.Addr(), newMode, err)
|
|
}
|
|
}
|