feat: Retry Kubernetes update requests
Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>
This commit is contained in:
parent
6cdbb59406
commit
be378dd9a7
2 changed files with 140 additions and 52 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -439,19 +440,21 @@ func (cli *client) DeleteConnector(id string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *client) UpdateRefreshToken(id string, updater func(old storage.RefreshToken) (storage.RefreshToken, error)) error {
|
func (cli *client) UpdateRefreshToken(id string, updater func(old storage.RefreshToken) (storage.RefreshToken, error)) error {
|
||||||
r, err := cli.getRefreshToken(id)
|
return retryOnConflict(context.TODO(), func() error {
|
||||||
if err != nil {
|
r, err := cli.getRefreshToken(id)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
updated, err := updater(toStorageRefreshToken(r))
|
}
|
||||||
if err != nil {
|
updated, err := updater(toStorageRefreshToken(r))
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
updated.ID = id
|
}
|
||||||
|
updated.ID = id
|
||||||
|
|
||||||
newToken := cli.fromStorageRefreshToken(updated)
|
newToken := cli.fromStorageRefreshToken(updated)
|
||||||
newToken.ObjectMeta = r.ObjectMeta
|
newToken.ObjectMeta = r.ObjectMeta
|
||||||
return cli.put(resourceRefreshToken, r.ObjectMeta.Name, newToken)
|
return cli.put(resourceRefreshToken, r.ObjectMeta.Name, newToken)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *client) UpdateClient(id string, updater func(old storage.Client) (storage.Client, error)) error {
|
func (cli *client) UpdateClient(id string, updater func(old storage.Client) (storage.Client, error)) error {
|
||||||
|
@ -489,19 +492,21 @@ func (cli *client) UpdatePassword(email string, updater func(old storage.Passwor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *client) UpdateOfflineSessions(userID string, connID string, updater func(old storage.OfflineSessions) (storage.OfflineSessions, error)) error {
|
func (cli *client) UpdateOfflineSessions(userID string, connID string, updater func(old storage.OfflineSessions) (storage.OfflineSessions, error)) error {
|
||||||
o, err := cli.getOfflineSessions(userID, connID)
|
return retryOnConflict(context.TODO(), func() error {
|
||||||
if err != nil {
|
o, err := cli.getOfflineSessions(userID, connID)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
updated, err := updater(toStorageOfflineSessions(o))
|
updated, err := updater(toStorageOfflineSessions(o))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
newOfflineSessions := cli.fromStorageOfflineSessions(updated)
|
newOfflineSessions := cli.fromStorageOfflineSessions(updated)
|
||||||
newOfflineSessions.ObjectMeta = o.ObjectMeta
|
newOfflineSessions.ObjectMeta = o.ObjectMeta
|
||||||
return cli.put(resourceOfflineSessions, o.ObjectMeta.Name, newOfflineSessions)
|
return cli.put(resourceOfflineSessions, o.ObjectMeta.Name, newOfflineSessions)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *client) UpdateKeys(updater func(old storage.Keys) (storage.Keys, error)) error {
|
func (cli *client) UpdateKeys(updater func(old storage.Keys) (storage.Keys, error)) error {
|
||||||
|
@ -539,13 +544,11 @@ func (cli *client) UpdateKeys(updater func(old storage.Keys) (storage.Keys, erro
|
||||||
newKeys.ObjectMeta = keys.ObjectMeta
|
newKeys.ObjectMeta = keys.ObjectMeta
|
||||||
|
|
||||||
err = cli.put(resourceKeys, keysName, newKeys)
|
err = cli.put(resourceKeys, keysName, newKeys)
|
||||||
if httpErr, ok := err.(httpError); ok {
|
if isKubernetesAPIConflictError(err) {
|
||||||
// We need to tolerate conflicts here in case of HA mode.
|
// We need to tolerate conflicts here in case of HA mode.
|
||||||
// Dex instances run keys rotation at the same time because they use SigningKey.nextRotation CR field as a trigger.
|
// Dex instances run keys rotation at the same time because they use SigningKey.nextRotation CR field as a trigger.
|
||||||
if httpErr.StatusCode() == http.StatusConflict {
|
cli.logger.Debugf("Keys rotation failed: %v. It is possible that keys have already been rotated by another dex instance.", err)
|
||||||
cli.logger.Debugf("Keys rotation failed: %v. It is possible that keys have already been rotated by another dex instance.", err)
|
return errors.New("keys already rotated by another server instance")
|
||||||
return errors.New("keys already rotated by another server instance")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
@ -569,20 +572,22 @@ func (cli *client) UpdateAuthRequest(id string, updater func(a storage.AuthReque
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *client) UpdateConnector(id string, updater func(a storage.Connector) (storage.Connector, error)) error {
|
func (cli *client) UpdateConnector(id string, updater func(a storage.Connector) (storage.Connector, error)) error {
|
||||||
var c Connector
|
return retryOnConflict(context.TODO(), func() error {
|
||||||
err := cli.get(resourceConnector, id, &c)
|
var c Connector
|
||||||
if err != nil {
|
err := cli.get(resourceConnector, id, &c)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
updated, err := updater(toStorageConnector(c))
|
updated, err := updater(toStorageConnector(c))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
newConn := cli.fromStorageConnector(updated)
|
newConn := cli.fromStorageConnector(updated)
|
||||||
newConn.ObjectMeta = c.ObjectMeta
|
newConn.ObjectMeta = c.ObjectMeta
|
||||||
return cli.put(resourceConnector, id, newConn)
|
return cli.put(resourceConnector, id, newConn)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err error) {
|
func (cli *client) GarbageCollect(now time.Time) (result storage.GCResult, err error) {
|
||||||
|
@ -686,17 +691,58 @@ func (cli *client) getDeviceToken(deviceCode string) (t DeviceToken, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *client) UpdateDeviceToken(deviceCode string, updater func(old storage.DeviceToken) (storage.DeviceToken, error)) error {
|
func (cli *client) UpdateDeviceToken(deviceCode string, updater func(old storage.DeviceToken) (storage.DeviceToken, error)) error {
|
||||||
r, err := cli.getDeviceToken(deviceCode)
|
return retryOnConflict(context.TODO(), func() error {
|
||||||
if err != nil {
|
r, err := cli.getDeviceToken(deviceCode)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
updated, err := updater(toStorageDeviceToken(r))
|
}
|
||||||
if err != nil {
|
updated, err := updater(toStorageDeviceToken(r))
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
updated.DeviceCode = deviceCode
|
}
|
||||||
|
updated.DeviceCode = deviceCode
|
||||||
|
|
||||||
newToken := cli.fromStorageDeviceToken(updated)
|
newToken := cli.fromStorageDeviceToken(updated)
|
||||||
newToken.ObjectMeta = r.ObjectMeta
|
newToken.ObjectMeta = r.ObjectMeta
|
||||||
return cli.put(resourceDeviceToken, r.ObjectMeta.Name, newToken)
|
return cli.put(resourceDeviceToken, r.ObjectMeta.Name, newToken)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func isKubernetesAPIConflictError(err error) bool {
|
||||||
|
if httpErr, ok := err.(httpError); ok {
|
||||||
|
if httpErr.StatusCode() == http.StatusConflict {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func retryOnConflict(ctx context.Context, action func() error) error {
|
||||||
|
policy := []int{10, 20, 100, 300, 600}
|
||||||
|
|
||||||
|
attempts := 0
|
||||||
|
getNextStep := func() time.Duration {
|
||||||
|
step := policy[attempts]
|
||||||
|
return time.Duration(step*5+rand.Intn(step)) * time.Microsecond
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := action(); err == nil || !isKubernetesAPIConflictError(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(getNextStep()):
|
||||||
|
if err := action(); err == nil || !isKubernetesAPIConflictError(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
attempts++
|
||||||
|
if attempts >= 4 {
|
||||||
|
return errors.New("maximum timeout reached while retrying a conflicted request")
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errors.New("canceled")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package kubernetes
|
package kubernetes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -12,6 +13,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"sigs.k8s.io/testing_frameworks/integration"
|
"sigs.k8s.io/testing_frameworks/integration"
|
||||||
|
|
||||||
|
@ -272,3 +274,43 @@ func newStatusCodesResponseTestClient(getResponseCode, actionResponseCode int) *
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetryOnConflict(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
action func() error
|
||||||
|
exactErr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"Timeout reached",
|
||||||
|
func() error { err := httpErr{status: 409}; return error(&err) },
|
||||||
|
"maximum timeout reached while retrying a conflicted request",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"HTTP Error",
|
||||||
|
func() error { err := httpErr{status: 500}; return error(&err) },
|
||||||
|
" Internal Server Error: response from server \"\"",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Error",
|
||||||
|
func() error { return errors.New("test") },
|
||||||
|
"test",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"OK",
|
||||||
|
func() error { return nil },
|
||||||
|
"",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range tests {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
err := retryOnConflict(context.TODO(), testCase.action)
|
||||||
|
if testCase.exactErr != "" {
|
||||||
|
require.EqualError(t, err, testCase.exactErr)
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Reference in a new issue