package kubernetes

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"net/http"
	"strings"
	"time"

	"github.com/dexidp/dex/pkg/log"
	"github.com/dexidp/dex/storage"
	"github.com/dexidp/dex/storage/kubernetes/k8sapi"
)

// Kind names, one per object type stored by this package.
// NOTE(review): presumably these are the `kind` values of the corresponding
// custom resources; they are not referenced in this part of the file.
const (
	kindAuthCode        = "AuthCode"
	kindAuthRequest     = "AuthRequest"
	kindClient          = "OAuth2Client"
	kindRefreshToken    = "RefreshToken"
	kindKeys            = "SigningKey"
	kindPassword        = "Password"
	kindOfflineSessions = "OfflineSessions"
	kindConnector       = "Connector"
	kindDeviceRequest   = "DeviceRequest"
	kindDeviceToken     = "DeviceToken"
)

// Resource names passed to the client's get/list/post/put/delete helpers to
// select which collection a request targets.
const (
	resourceAuthCode        = "authcodes"
	resourceAuthRequest     = "authrequests"
	resourceClient          = "oauth2clients"
	resourceRefreshToken    = "refreshtokens"
	resourceKeys            = "signingkeies" // Kubernetes attempts to pluralize.
	resourcePassword        = "passwords"
	resourceOfflineSessions = "offlinesessionses" // Again attempts to pluralize.
	resourceConnector       = "connectors"
	resourceDeviceRequest   = "devicerequests"
	resourceDeviceToken     = "devicetokens"
)

// Config values for the Kubernetes storage type.
//
// Exactly one of InCluster and KubeConfigFile must be set; opening the storage
// fails if both or neither are provided.
type Config struct {
	// InCluster selects the in-cluster configuration.
	InCluster bool `json:"inCluster"`
	// KubeConfigFile is the path of the kubeconfig file to load instead.
	KubeConfigFile string `json:"kubeConfigFile"`
}

// Open returns a storage backed by Kubernetes custom resources.
//
// It does not wait for the custom resource definitions to become available:
// if registering them fails, registration is retried in the background until
// it succeeds or the returned storage is closed.
func (c *Config) Open(logger log.Logger) (storage.Storage, error) {
	cli, err := c.open(logger, false)
	if err != nil {
		// Return an untyped nil so callers never receive a non-nil
		// storage.Storage interface wrapping a nil *client.
		return nil, err
	}
	return cli, nil
}

// open returns a kubernetes client, initializing the custom resources used
// by dex.
//
// waitForResources controls if errors creating the resources cause this method to return
// immediately (used during testing), or if the client will asynchronously retry.
func (c *Config) open(logger log.Logger, waitForResources bool) (*client, error) { if c.InCluster && (c.KubeConfigFile != "") { return nil, errors.New("cannot specify both 'inCluster' and 'kubeConfigFile'") } if !c.InCluster && (c.KubeConfigFile == "") { return nil, errors.New("must specify either 'inCluster' or 'kubeConfigFile'") } var ( cluster k8sapi.Cluster user k8sapi.AuthInfo namespace string err error ) if c.InCluster { cluster, user, namespace, err = inClusterConfig() } else { cluster, user, namespace, err = loadKubeConfig(c.KubeConfigFile) } if err != nil { return nil, err } cli, err := newClient(cluster, user, namespace, logger) if err != nil { return nil, fmt.Errorf("create client: %v", err) } if err = cli.detectKubernetesVersion(); err != nil { return nil, fmt.Errorf("cannot get kubernetes version: %v", err) } ctx, cancel := context.WithCancel(context.Background()) logger.Info("creating custom Kubernetes resources") if !cli.registerCustomResources() { if waitForResources { cancel() return nil, fmt.Errorf("failed creating custom resources") } // Try to synchronously create the custom resources once. This doesn't mean // they'll immediately be available, but ensures that the client will actually try // once. go func() { for { if cli.registerCustomResources() { return } select { case <-ctx.Done(): return case <-time.After(30 * time.Second): } } }() } if waitForResources { if err := cli.waitForCRDs(ctx); err != nil { cancel() return nil, err } } // If the client is closed, stop trying to create resources. cli.cancel = cancel return cli, nil } // registerCustomResources attempts to create the custom resources dex // requires or identifies that they're already enabled. This function creates // custom resource definitions(CRDs) // It logs all errors, returning true if the resources were created successfully. // // Creating a custom resource does not mean that they'll be immediately available. 
func (cli *client) registerCustomResources() (ok bool) { ok = true definitions := customResourceDefinitions(cli.crdAPIVersion) length := len(definitions) for i := 0; i < length; i++ { var err error var resourceName string r := definitions[i] var i interface{} cli.logger.Infof("checking if custom resource %s has already been created...", r.ObjectMeta.Name) if err := cli.list(r.Spec.Names.Plural, &i); err == nil { cli.logger.Infof("The custom resource %s already available, skipping create", r.ObjectMeta.Name) continue } else { cli.logger.Infof("failed to list custom resource %s, attempting to create: %v", r.ObjectMeta.Name, err) } err = cli.postResource(cli.crdAPIVersion, "", "customresourcedefinitions", r) resourceName = r.ObjectMeta.Name if err != nil { switch err { case storage.ErrAlreadyExists: cli.logger.Infof("custom resource already created %s", resourceName) case storage.ErrNotFound: cli.logger.Errorf("custom resources not found, please enable the respective API group") ok = false default: cli.logger.Errorf("creating custom resource %s: %v", resourceName, err) ok = false } continue } cli.logger.Errorf("create custom resource %s", resourceName) } return ok } // waitForCRDs waits for all CRDs to be in a ready state, and is used // by the tests to synchronize before running conformance. func (cli *client) waitForCRDs(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() for _, crd := range customResourceDefinitions(cli.crdAPIVersion) { for { err := cli.isCRDReady(crd.Name) if err == nil { break } cli.logger.Errorf("checking CRD: %v", err) select { case <-ctx.Done(): return errors.New("timed out waiting for CRDs to be available") case <-time.After(time.Millisecond * 100): } } } return nil } // isCRDReady determines if a CRD is ready by inspecting its conditions. 
func (cli *client) isCRDReady(name string) error { var r k8sapi.CustomResourceDefinition err := cli.getResource(cli.crdAPIVersion, "", "customresourcedefinitions", name, &r) if err != nil { return fmt.Errorf("get crd %s: %v", name, err) } conds := make(map[string]string) // For debugging, keep the conditions around. for _, c := range r.Status.Conditions { if c.Type == k8sapi.Established && c.Status == k8sapi.ConditionTrue { return nil } conds[string(c.Type)] = string(c.Status) } return fmt.Errorf("crd %s not ready %#v", name, conds) } func (cli *client) Close() error { if cli.cancel != nil { cli.cancel() } return nil } func (cli *client) CreateAuthRequest(a storage.AuthRequest) error { return cli.post(resourceAuthRequest, cli.fromStorageAuthRequest(a)) } func (cli *client) CreateClient(c storage.Client) error { return cli.post(resourceClient, cli.fromStorageClient(c)) } func (cli *client) CreateAuthCode(c storage.AuthCode) error { return cli.post(resourceAuthCode, cli.fromStorageAuthCode(c)) } func (cli *client) CreatePassword(p storage.Password) error { return cli.post(resourcePassword, cli.fromStoragePassword(p)) } func (cli *client) CreateRefresh(r storage.RefreshToken) error { return cli.post(resourceRefreshToken, cli.fromStorageRefreshToken(r)) } func (cli *client) CreateOfflineSessions(o storage.OfflineSessions) error { return cli.post(resourceOfflineSessions, cli.fromStorageOfflineSessions(o)) } func (cli *client) CreateConnector(c storage.Connector) error { return cli.post(resourceConnector, cli.fromStorageConnector(c)) } func (cli *client) GetAuthRequest(id string) (storage.AuthRequest, error) { var req AuthRequest if err := cli.get(resourceAuthRequest, id, &req); err != nil { return storage.AuthRequest{}, err } return toStorageAuthRequest(req), nil } func (cli *client) GetAuthCode(id string) (storage.AuthCode, error) { var code AuthCode if err := cli.get(resourceAuthCode, id, &code); err != nil { return storage.AuthCode{}, err } return 
toStorageAuthCode(code), nil } func (cli *client) GetClient(id string) (storage.Client, error) { c, err := cli.getClient(id) if err != nil { return storage.Client{}, err } return toStorageClient(c), nil } func (cli *client) getClient(id string) (Client, error) { var c Client name := cli.idToName(id) if err := cli.get(resourceClient, name, &c); err != nil { return Client{}, err } if c.ID != id { return Client{}, fmt.Errorf("get client: ID %q mapped to client with ID %q", id, c.ID) } return c, nil } func (cli *client) GetPassword(email string) (storage.Password, error) { p, err := cli.getPassword(email) if err != nil { return storage.Password{}, err } return toStoragePassword(p), nil } func (cli *client) getPassword(email string) (Password, error) { // TODO(ericchiang): Figure out whose job it is to lowercase emails. email = strings.ToLower(email) var p Password name := cli.idToName(email) if err := cli.get(resourcePassword, name, &p); err != nil { return Password{}, err } if email != p.Email { return Password{}, fmt.Errorf("get email: email %q mapped to password with email %q", email, p.Email) } return p, nil } func (cli *client) GetKeys() (storage.Keys, error) { var keys Keys if err := cli.get(resourceKeys, keysName, &keys); err != nil { return storage.Keys{}, err } return toStorageKeys(keys), nil } func (cli *client) GetRefresh(id string) (storage.RefreshToken, error) { r, err := cli.getRefreshToken(id) if err != nil { return storage.RefreshToken{}, err } return toStorageRefreshToken(r), nil } func (cli *client) getRefreshToken(id string) (r RefreshToken, err error) { err = cli.get(resourceRefreshToken, id, &r) return } func (cli *client) GetOfflineSessions(userID string, connID string) (storage.OfflineSessions, error) { o, err := cli.getOfflineSessions(userID, connID) if err != nil { return storage.OfflineSessions{}, err } return toStorageOfflineSessions(o), nil } func (cli *client) getOfflineSessions(userID string, connID string) (o OfflineSessions, err error) { 
name := cli.offlineTokenName(userID, connID) if err = cli.get(resourceOfflineSessions, name, &o); err != nil { return OfflineSessions{}, err } if userID != o.UserID || connID != o.ConnID { return OfflineSessions{}, fmt.Errorf("get offline session: wrong session retrieved") } return o, nil } func (cli *client) GetConnector(id string) (storage.Connector, error) { var c Connector if err := cli.get(resourceConnector, id, &c); err != nil { return storage.Connector{}, err } return toStorageConnector(c), nil } func (cli *client) ListClients() ([]storage.Client, error) { return nil, errors.New("not implemented") } func (cli *client) ListRefreshTokens() ([]storage.RefreshToken, error) { return nil, errors.New("not implemented") } func (cli *client) ListPasswords() (passwords []storage.Password, err error) { var passwordList PasswordList if err = cli.list(resourcePassword, &passwordList); err != nil { return passwords, fmt.Errorf("failed to list passwords: %v", err) } for _, password := range passwordList.Passwords { p := storage.Password{ Email: password.Email, Hash: password.Hash, Username: password.Username, UserID: password.UserID, } passwords = append(passwords, p) } return } func (cli *client) ListConnectors() (connectors []storage.Connector, err error) { var connectorList ConnectorList if err = cli.list(resourceConnector, &connectorList); err != nil { return connectors, fmt.Errorf("failed to list connectors: %v", err) } connectors = make([]storage.Connector, len(connectorList.Connectors)) for i, connector := range connectorList.Connectors { connectors[i] = toStorageConnector(connector) } return } func (cli *client) DeleteAuthRequest(id string) error { return cli.delete(resourceAuthRequest, id) } func (cli *client) DeleteAuthCode(code string) error { return cli.delete(resourceAuthCode, code) } func (cli *client) DeleteClient(id string) error { // Check for hash collision. 
c, err := cli.getClient(id) if err != nil { return err } return cli.delete(resourceClient, c.ObjectMeta.Name) } func (cli *client) DeleteRefresh(id string) error { return cli.delete(resourceRefreshToken, id) } func (cli *client) DeletePassword(email string) error { // Check for hash collision. p, err := cli.getPassword(email) if err != nil { return err } return cli.delete(resourcePassword, p.ObjectMeta.Name) } func (cli *client) DeleteOfflineSessions(userID string, connID string) error { // Check for hash collision. o, err := cli.getOfflineSessions(userID, connID) if err != nil { return err } return cli.delete(resourceOfflineSessions, o.ObjectMeta.Name) } func (cli *client) DeleteConnector(id string) error { return cli.delete(resourceConnector, id) } func (cli *client) UpdateRefreshToken(id string, updater func(old storage.RefreshToken) (storage.RefreshToken, error)) error { return retryOnConflict(context.TODO(), func() error { r, err := cli.getRefreshToken(id) if err != nil { return err } updated, err := updater(toStorageRefreshToken(r)) if err != nil { return err } updated.ID = id newToken := cli.fromStorageRefreshToken(updated) newToken.ObjectMeta = r.ObjectMeta return cli.put(resourceRefreshToken, r.ObjectMeta.Name, newToken) }) } func (cli *client) UpdateClient(id string, updater func(old storage.Client) (storage.Client, error)) error { c, err := cli.getClient(id) if err != nil { return err } updated, err := updater(toStorageClient(c)) if err != nil { return err } updated.ID = c.ID newClient := cli.fromStorageClient(updated) newClient.ObjectMeta = c.ObjectMeta return cli.put(resourceClient, c.ObjectMeta.Name, newClient) } func (cli *client) UpdatePassword(email string, updater func(old storage.Password) (storage.Password, error)) error { p, err := cli.getPassword(email) if err != nil { return err } updated, err := updater(toStoragePassword(p)) if err != nil { return err } updated.Email = p.Email newPassword := cli.fromStoragePassword(updated) 
newPassword.ObjectMeta = p.ObjectMeta return cli.put(resourcePassword, p.ObjectMeta.Name, newPassword) } func (cli *client) UpdateOfflineSessions(userID string, connID string, updater func(old storage.OfflineSessions) (storage.OfflineSessions, error)) error { return retryOnConflict(context.TODO(), func() error { o, err := cli.getOfflineSessions(userID, connID) if err != nil { return err } updated, err := updater(toStorageOfflineSessions(o)) if err != nil { return err } newOfflineSessions := cli.fromStorageOfflineSessions(updated) newOfflineSessions.ObjectMeta = o.ObjectMeta return cli.put(resourceOfflineSessions, o.ObjectMeta.Name, newOfflineSessions) }) } func (cli *client) UpdateKeys(updater func(old storage.Keys) (storage.Keys, error)) error { firstUpdate := false var keys Keys if err := cli.get(resourceKeys, keysName, &keys); err != nil { if err != storage.ErrNotFound { return err } firstUpdate = true } var oldKeys storage.Keys if !firstUpdate { oldKeys = toStorageKeys(keys) } updated, err := updater(oldKeys) if err != nil { return err } newKeys := cli.fromStorageKeys(updated) if firstUpdate { err = cli.post(resourceKeys, newKeys) if err != nil && errors.Is(err, storage.ErrAlreadyExists) { // We need to tolerate conflicts here in case of HA mode. cli.logger.Debugf("Keys creation failed: %v. It is possible that keys have already been created by another dex instance.", err) return errors.New("keys already created by another server instance") } return err } newKeys.ObjectMeta = keys.ObjectMeta err = cli.put(resourceKeys, keysName, newKeys) if isKubernetesAPIConflictError(err) { // We need to tolerate conflicts here in case of HA mode. // Dex instances run keys rotation at the same time because they use SigningKey.nextRotation CR field as a trigger. cli.logger.Debugf("Keys rotation failed: %v. 
It is possible that keys have already been rotated by another dex instance.", err) return errors.New("keys already rotated by another server instance") } return err } func (cli *client) UpdateAuthRequest(id string, updater func(a storage.AuthRequest) (storage.AuthRequest, error)) error { var req AuthRequest err := cli.get(resourceAuthRequest, id, &req) if err != nil { return err } updated, err := updater(toStorageAuthRequest(req)) if err != nil { return err } newReq := cli.fromStorageAuthRequest(updated) newReq.ObjectMeta = req.ObjectMeta return cli.put(resourceAuthRequest, id, newReq) } func (cli *client) UpdateConnector(id string, updater func(a storage.Connector) (storage.Connector, error)) error { return retryOnConflict(context.TODO(), func() error { var c Connector err := cli.get(resourceConnector, id, &c) if err != nil { return err } updated, err := updater(toStorageConnector(c)) if err != nil { return err } newConn := cli.fromStorageConnector(updated) newConn.ObjectMeta = c.ObjectMeta return cli.put(resourceConnector, id, newConn) }) } func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err error) { var authRequests AuthRequestList if err := cli.list(resourceAuthRequest, &authRequests); err != nil { return result, fmt.Errorf("failed to list auth requests: %v", err) } var delErr error for _, authRequest := range authRequests.AuthRequests { if now.After(authRequest.Expiry) { if err := cli.delete(resourceAuthRequest, authRequest.ObjectMeta.Name); err != nil { cli.logger.Errorf("failed to delete auth request: %v", err) delErr = fmt.Errorf("failed to delete auth request: %v", err) } result.AuthRequests++ } } if delErr != nil { return result, delErr } var authCodes AuthCodeList if err := cli.list(resourceAuthCode, &authCodes); err != nil { return result, fmt.Errorf("failed to list auth codes: %v", err) } for _, authCode := range authCodes.AuthCodes { if now.After(authCode.Expiry) { if err := cli.delete(resourceAuthCode, 
authCode.ObjectMeta.Name); err != nil { cli.logger.Errorf("failed to delete auth code %v", err) delErr = fmt.Errorf("failed to delete auth code: %v", err) } result.AuthCodes++ } } var deviceRequests DeviceRequestList if err := cli.list(resourceDeviceRequest, &deviceRequests); err != nil { return result, fmt.Errorf("failed to list device requests: %v", err) } for _, deviceRequest := range deviceRequests.DeviceRequests { if now.After(deviceRequest.Expiry) { if err := cli.delete(resourceDeviceRequest, deviceRequest.ObjectMeta.Name); err != nil { cli.logger.Errorf("failed to delete device request: %v", err) delErr = fmt.Errorf("failed to delete device request: %v", err) } result.DeviceRequests++ } } var deviceTokens DeviceTokenList if err := cli.list(resourceDeviceToken, &deviceTokens); err != nil { return result, fmt.Errorf("failed to list device tokens: %v", err) } for _, deviceToken := range deviceTokens.DeviceTokens { if now.After(deviceToken.Expiry) { if err := cli.delete(resourceDeviceToken, deviceToken.ObjectMeta.Name); err != nil { cli.logger.Errorf("failed to delete device token: %v", err) delErr = fmt.Errorf("failed to delete device token: %v", err) } result.DeviceTokens++ } } if delErr != nil { return result, delErr } return result, delErr } func (cli *client) CreateDeviceRequest(d storage.DeviceRequest) error { return cli.post(resourceDeviceRequest, cli.fromStorageDeviceRequest(d)) } func (cli *client) GetDeviceRequest(userCode string) (storage.DeviceRequest, error) { var req DeviceRequest if err := cli.get(resourceDeviceRequest, strings.ToLower(userCode), &req); err != nil { return storage.DeviceRequest{}, err } return toStorageDeviceRequest(req), nil } func (cli *client) CreateDeviceToken(t storage.DeviceToken) error { return cli.post(resourceDeviceToken, cli.fromStorageDeviceToken(t)) } func (cli *client) GetDeviceToken(deviceCode string) (storage.DeviceToken, error) { var token DeviceToken if err := cli.get(resourceDeviceToken, deviceCode, &token); err 
!= nil { return storage.DeviceToken{}, err } return toStorageDeviceToken(token), nil } func (cli *client) getDeviceToken(deviceCode string) (t DeviceToken, err error) { err = cli.get(resourceDeviceToken, deviceCode, &t) return } func (cli *client) UpdateDeviceToken(deviceCode string, updater func(old storage.DeviceToken) (storage.DeviceToken, error)) error { return retryOnConflict(context.TODO(), func() error { r, err := cli.getDeviceToken(deviceCode) if err != nil { return err } updated, err := updater(toStorageDeviceToken(r)) if err != nil { return err } updated.DeviceCode = deviceCode newToken := cli.fromStorageDeviceToken(updated) newToken.ObjectMeta = r.ObjectMeta return cli.put(resourceDeviceToken, r.ObjectMeta.Name, newToken) }) } func isKubernetesAPIConflictError(err error) bool { if httpErr, ok := err.(httpError); ok { if httpErr.StatusCode() == http.StatusConflict { return true } } return false } func retryOnConflict(ctx context.Context, action func() error) error { policy := []int{10, 20, 100, 300, 600} attempts := 0 getNextStep := func() time.Duration { step := policy[attempts] return time.Duration(step*5+rand.Intn(step)) * time.Microsecond } if err := action(); err == nil || !isKubernetesAPIConflictError(err) { return err } for { select { case <-time.After(getNextStep()): if err := action(); err == nil || !isKubernetesAPIConflictError(err) { return err } attempts++ if attempts >= 4 { return errors.New("maximum timeout reached while retrying a conflicted request") } case <-ctx.Done(): return errors.New("canceled") } } }