From 820b46058375116564e77bdfc411fe58adfee651 Mon Sep 17 00:00:00 2001 From: Eric Chiang Date: Mon, 1 Aug 2016 22:53:12 -0700 Subject: [PATCH] storage/kubernetes: garbage collect expired objects --- storage/kubernetes/client.go | 13 ++- storage/kubernetes/garbage_collection.go | 45 ++++++++-- storage/kubernetes/garbage_collection_test.go | 88 +++++++++++++++++++ storage/kubernetes/storage.go | 24 +++++ storage/kubernetes/storage_test.go | 5 +- storage/storage.go | 2 +- 6 files changed, 164 insertions(+), 13 deletions(-) create mode 100644 storage/kubernetes/garbage_collection_test.go diff --git a/storage/kubernetes/client.go b/storage/kubernetes/client.go index 825f21f3..1ce67e27 100644 --- a/storage/kubernetes/client.go +++ b/storage/kubernetes/client.go @@ -19,6 +19,7 @@ import ( "time" "github.com/gtank/cryptopasta" + "golang.org/x/net/context" yaml "gopkg.in/yaml.v2" "github.com/coreos/poke/storage" @@ -33,6 +34,9 @@ type client struct { now func() time.Time + // If not nil, the cancel function for stopping garbage colletion. + cancel context.CancelFunc + // BUG: currently each third party API group can only have one resource in it, // so for each resource this storage uses, it need a unique API group. // @@ -251,7 +255,14 @@ func newClient(cluster k8sapi.Cluster, user k8sapi.AuthInfo, namespace string) ( } // TODO(ericchiang): make API Group and version configurable. - return &client{&http.Client{Transport: t}, cluster.Server, namespace, "oidc.coreos.com/v1", time.Now, true}, nil + return &client{ + client: &http.Client{Transport: t}, + baseURL: cluster.Server, + namespace: namespace, + apiVersion: "oidc.coreos.com/v1", + now: time.Now, + prependResourceNameToAPIGroup: true, + }, nil } type transport struct { diff --git a/storage/kubernetes/garbage_collection.go b/storage/kubernetes/garbage_collection.go index cc3ec519..b58b0c89 100644 --- a/storage/kubernetes/garbage_collection.go +++ b/storage/kubernetes/garbage_collection.go @@ -3,27 +3,56 @@ package kubernetes import ( "fmt" "log" + "time" + + "golang.org/x/net/context" ) -// TODO(ericchiang): Complete this. +// gc begins the gc process for Kubernetes. +func (cli *client) gc(ctx context.Context, every time.Duration) { + handleErr := func(err error) { log.Println(err.Error()) } -type multiErr []error + for { + select { + case <-ctx.Done(): + return + case <-time.After(every): + } -func (m multiErr) Error() string { - return fmt.Sprintf("errors encountered: %s", m) + // TODO(ericchiang): On failures, run garbage collection more often. + log.Println("kubernetes: running garbage collection") + cli.gcAuthRequests(handleErr) + cli.gcAuthCodes(handleErr) + log.Printf("kubernetes: garbage collection finished, next run at %s", cli.now().Add(every)) + } } -func (cli *client) gcAuthRequests() error { +func (cli *client) gcAuthRequests(handleErr func(error)) { var authRequests AuthRequestList if err := cli.list(resourceAuthRequest, &authRequests); err != nil { - return err + handleErr(fmt.Errorf("failed to list auth requests: %v", err)) + return } for _, authRequest := range authRequests.AuthRequests { if cli.now().After(authRequest.Expiry) { if err := cli.delete(resourceAuthRequest, authRequest.ObjectMeta.Name); err != nil { - log.Printf("failed to detele auth request: %v", err) + handleErr(fmt.Errorf("failed to detele auth request: %v", err)) + } + } + } +} + +func (cli *client) gcAuthCodes(handleErr func(error)) { + var authCodes AuthCodeList + if err := cli.list(resourceAuthCode, &authCodes); err != nil { + handleErr(fmt.Errorf("failed to list auth codes: %v", err)) + return + } + for _, authCode := range authCodes.AuthCodes { + if cli.now().After(authCode.Expiry) { + if err := cli.delete(resourceAuthCode, authCode.ObjectMeta.Name); err != nil { + handleErr(fmt.Errorf("failed to delete auth code: %v", err)) } } } - return nil } diff --git a/storage/kubernetes/garbage_collection_test.go b/storage/kubernetes/garbage_collection_test.go new file mode 100644 index 00000000..34dfd209 --- /dev/null +++ b/storage/kubernetes/garbage_collection_test.go @@ -0,0 +1,88 @@ +package kubernetes + +import ( + "testing" + "time" + + "github.com/coreos/poke/storage" +) + +func muster(t *testing.T) func(err error) { + return func(err error) { + if err != nil { + t.Fatal(err) + } + } +} + +func TestGCAuthRequests(t *testing.T) { + cli := loadClient(t) + must := muster(t) + + now := time.Now() + cli.now = func() time.Time { return now } + + expiredID := storage.NewNonce() + goodID := storage.NewNonce() + + must(cli.CreateAuthRequest(storage.AuthRequest{ + ID: expiredID, + Expiry: now.Add(-time.Second), + })) + + must(cli.CreateAuthRequest(storage.AuthRequest{ + ID: goodID, + Expiry: now.Add(time.Second), + })) + + handleErr := func(err error) { t.Error(err.Error()) } + cli.gcAuthRequests(handleErr) + + if _, err := cli.GetAuthRequest(goodID); err != nil { + t.Errorf("failed to get good auth ID: %v", err) + } + _, err := cli.GetAuthRequest(expiredID) + switch { + case err == nil: + t.Errorf("gc did not remove expired auth request") + case err == storage.ErrNotFound: + default: + t.Errorf("expected storage.ErrNotFound, got %v", err) + } +} + +func TestGCAuthCodes(t *testing.T) { + cli := loadClient(t) + must := muster(t) + + now := time.Now() + cli.now = func() time.Time { return now } + + expiredID := storage.NewNonce() + goodID := storage.NewNonce() + + must(cli.CreateAuthCode(storage.AuthCode{ + ID: expiredID, + Expiry: now.Add(-time.Second), + })) + + must(cli.CreateAuthCode(storage.AuthCode{ + ID: goodID, + Expiry: now.Add(time.Second), + })) + + handleErr := func(err error) { t.Error(err.Error()) } + cli.gcAuthCodes(handleErr) + + if _, err := cli.GetAuthCode(goodID); err != nil { + t.Errorf("failed to get good auth ID: %v", err) + } + _, err := cli.GetAuthCode(expiredID) + switch { + case err == nil: + t.Errorf("gc did not remove expired auth request") + case err == storage.ErrNotFound: + default: + t.Errorf("expected storage.ErrNotFound, got %v", err) + } +} diff --git a/storage/kubernetes/storage.go b/storage/kubernetes/storage.go index ab123084..63b292cc 100644 --- a/storage/kubernetes/storage.go +++ b/storage/kubernetes/storage.go @@ -5,8 +5,10 @@ import ( "fmt" "os" "path/filepath" + "time" homedir "github.com/mitchellh/go-homedir" + "golang.org/x/net/context" "github.com/coreos/poke/storage" "github.com/coreos/poke/storage/kubernetes/k8sapi" @@ -32,10 +34,29 @@ const ( type Config struct { InCluster bool `yaml:"inCluster"` KubeConfigPath string `yaml:"kubeConfigPath"` + GCFrequency int64 `yaml:"gcFrequency"` // seconds } // Open returns a storage using Kubernetes third party resource. func (c *Config) Open() (storage.Storage, error) { + cli, err := c.open() + if err != nil { + return nil, err + } + + // start up garbage collection + gcFrequency := c.GCFrequency + if gcFrequency == 0 { + gcFrequency = 600 + } + ctx, cancel := context.WithCancel(context.Background()) + cli.cancel = cancel + go cli.gc(ctx, time.Duration(gcFrequency)*time.Second) + return cli, nil +} + +// open returns a client with no garbage collection. +func (c *Config) open() (*client, error) { if c.InCluster && (c.KubeConfigPath != "") { return nil, errors.New("cannot specify both 'inCluster' and 'kubeConfigPath'") } @@ -70,6 +91,9 @@ func (c *Config) Open() (storage.Storage, error) { } func (cli *client) Close() error { + if cli.cancel != nil { + cli.cancel() + } return nil } diff --git a/storage/kubernetes/storage_test.go b/storage/kubernetes/storage_test.go index 8b807d07..fe19a30f 100644 --- a/storage/kubernetes/storage_test.go +++ b/storage/kubernetes/storage_test.go @@ -4,7 +4,6 @@ import ( "os" "testing" - "github.com/coreos/poke/storage" "github.com/coreos/poke/storage/storagetest" ) @@ -12,12 +11,12 @@ func TestLoadClient(t *testing.T) { loadClient(t) } -func loadClient(t *testing.T) storage.Storage { +func loadClient(t *testing.T) *client { if os.Getenv("KUBECONFIG") == "" { t.Skip() } var config Config - s, err := config.Open() + s, err := config.open() if err != nil { t.Fatal(err) } diff --git a/storage/storage.go b/storage/storage.go index 376cdfec..ae09513a 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -73,7 +73,7 @@ func Open(driverName string, config map[string]string) (Storage, error) { // require compare-and-swap atomic actions. // // Implementations are expected to perform their own garbage collection of -// expired objects (expect keys which are handled by rotation). +// expired objects (expect keys, which are handled by the server). type Storage interface { Close() error