forked from mystiq/dex
Merge pull request #654 from ericchiang/dev-sql-optimistic-concurrency
storage/sql: use isolation level "serializable" for transactions
This commit is contained in:
commit
2a9051c864
4 changed files with 170 additions and 19 deletions
|
@ -4,13 +4,6 @@ Dex requires persisting state to perform various tasks such as track refresh tok
|
|||
|
||||
Storage breaches are serious as they can affect applications that rely on dex. Dex saves sensitive data in its backing storage, including signing keys and bcrypt'd passwords. As such, transport security and database ACLs should both be used, no matter which storage option is chosen.
|
||||
|
||||
|
||||
## Caveat: running replicated instances
|
||||
|
||||
Tests still need to be written to validate that multiple instances of dex behave correctly when using the same storage. While there aren't any technical limitations, edge cases have been observed and progress on these kind of bugs can be found on the [dex issue tracker][issues-transaction-tests].
|
||||
|
||||
The dex team suggests running one dex instance most of the time and two instance during upgrades.
|
||||
|
||||
## Kubernetes third party resources
|
||||
|
||||
__NOTE:__ Dex requires Kubernetes version 1.4+.
|
||||
|
|
|
@ -4,6 +4,9 @@ package conformance
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
|
||||
"github.com/coreos/dex/storage"
|
||||
)
|
||||
|
@ -17,7 +20,10 @@ import (
|
|||
// conformance.
|
||||
func RunTransactionTests(t *testing.T, newStorage func() storage.Storage) {
|
||||
runTests(t, newStorage, []subTest{
|
||||
{"AuthRequestConcurrentUpdate", testAuthRequestConcurrentUpdate},
|
||||
{"ClientConcurrentUpdate", testClientConcurrentUpdate},
|
||||
{"PasswordConcurrentUpdate", testPasswordConcurrentUpdate},
|
||||
{"KeysConcurrentUpdate", testKeysConcurrentUpdate},
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -45,10 +51,124 @@ func testClientConcurrentUpdate(t *testing.T, s storage.Storage) {
|
|||
return old, nil
|
||||
})
|
||||
|
||||
t.Logf("update1: %v", err1)
|
||||
t.Logf("update2: %v", err2)
|
||||
|
||||
if err1 == nil && err2 == nil {
|
||||
t.Errorf("update client: concurrent updates both returned no error")
|
||||
if (err1 == nil) == (err2 == nil) {
|
||||
t.Errorf("update client:\nupdate1: %v\nupdate2: %v\n", err1, err2)
|
||||
}
|
||||
}
|
||||
|
||||
func testAuthRequestConcurrentUpdate(t *testing.T, s storage.Storage) {
|
||||
a := storage.AuthRequest{
|
||||
ID: storage.NewID(),
|
||||
ClientID: "foobar",
|
||||
ResponseTypes: []string{"code"},
|
||||
Scopes: []string{"openid", "email"},
|
||||
RedirectURI: "https://localhost:80/callback",
|
||||
Nonce: "foo",
|
||||
State: "bar",
|
||||
ForceApprovalPrompt: true,
|
||||
LoggedIn: true,
|
||||
Expiry: neverExpire,
|
||||
ConnectorID: "ldap",
|
||||
ConnectorData: []byte(`{"some":"data"}`),
|
||||
Claims: storage.Claims{
|
||||
UserID: "1",
|
||||
Username: "jane",
|
||||
Email: "jane.doe@example.com",
|
||||
EmailVerified: true,
|
||||
Groups: []string{"a", "b"},
|
||||
},
|
||||
}
|
||||
|
||||
if err := s.CreateAuthRequest(a); err != nil {
|
||||
t.Fatalf("failed creating auth request: %v", err)
|
||||
}
|
||||
|
||||
var err1, err2 error
|
||||
|
||||
err1 = s.UpdateAuthRequest(a.ID, func(old storage.AuthRequest) (storage.AuthRequest, error) {
|
||||
old.State = "state 1"
|
||||
err2 = s.UpdateAuthRequest(a.ID, func(old storage.AuthRequest) (storage.AuthRequest, error) {
|
||||
old.State = "state 2"
|
||||
return old, nil
|
||||
})
|
||||
return old, nil
|
||||
})
|
||||
|
||||
if (err1 == nil) == (err2 == nil) {
|
||||
t.Errorf("update auth request:\nupdate1: %v\nupdate2: %v\n", err1, err2)
|
||||
}
|
||||
}
|
||||
|
||||
func testPasswordConcurrentUpdate(t *testing.T, s storage.Storage) {
|
||||
// Use bcrypt.MinCost to keep the tests short.
|
||||
passwordHash, err := bcrypt.GenerateFromPassword([]byte("secret"), bcrypt.MinCost)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
password := storage.Password{
|
||||
Email: "jane@example.com",
|
||||
Hash: passwordHash,
|
||||
Username: "jane",
|
||||
UserID: "foobar",
|
||||
}
|
||||
if err := s.CreatePassword(password); err != nil {
|
||||
t.Fatalf("create password token: %v", err)
|
||||
}
|
||||
|
||||
var err1, err2 error
|
||||
|
||||
err1 = s.UpdatePassword(password.Email, func(old storage.Password) (storage.Password, error) {
|
||||
old.Username = "user 1"
|
||||
err2 = s.UpdatePassword(password.Email, func(old storage.Password) (storage.Password, error) {
|
||||
old.Username = "user 2"
|
||||
return old, nil
|
||||
})
|
||||
return old, nil
|
||||
})
|
||||
|
||||
if (err1 == nil) == (err2 == nil) {
|
||||
t.Errorf("update password: concurrent updates both returned no error")
|
||||
}
|
||||
}
|
||||
|
||||
func testKeysConcurrentUpdate(t *testing.T, s storage.Storage) {
|
||||
// Test twice. Once for a create, once for an update.
|
||||
for i := 0; i < 2; i++ {
|
||||
n := time.Now().UTC().Round(time.Second)
|
||||
keys1 := storage.Keys{
|
||||
SigningKey: jsonWebKeys[0].Private,
|
||||
SigningKeyPub: jsonWebKeys[0].Public,
|
||||
NextRotation: n,
|
||||
}
|
||||
|
||||
keys2 := storage.Keys{
|
||||
SigningKey: jsonWebKeys[2].Private,
|
||||
SigningKeyPub: jsonWebKeys[2].Public,
|
||||
NextRotation: n.Add(time.Hour),
|
||||
VerificationKeys: []storage.VerificationKey{
|
||||
{
|
||||
PublicKey: jsonWebKeys[0].Public,
|
||||
Expiry: n.Add(time.Hour),
|
||||
},
|
||||
{
|
||||
PublicKey: jsonWebKeys[1].Public,
|
||||
Expiry: n.Add(time.Hour * 2),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var err1, err2 error
|
||||
|
||||
err1 = s.UpdateKeys(func(old storage.Keys) (storage.Keys, error) {
|
||||
err2 = s.UpdateKeys(func(old storage.Keys) (storage.Keys, error) {
|
||||
return keys1, nil
|
||||
})
|
||||
return keys2, nil
|
||||
})
|
||||
|
||||
if (err1 == nil) == (err2 == nil) {
|
||||
t.Errorf("update keys: concurrent updates both returned no error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ func cleanDB(c *conn) error {
|
|||
delete from auth_code;
|
||||
delete from refresh_token;
|
||||
delete from keys;
|
||||
delete from password;
|
||||
`)
|
||||
return err
|
||||
}
|
||||
|
@ -48,6 +49,7 @@ func TestSQLite3(t *testing.T) {
|
|||
s := &SQLite3{":memory:"}
|
||||
conn, err := s.open()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stdout, err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
return conn
|
||||
|
@ -58,15 +60,25 @@ func TestSQLite3(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func getenv(key, defaultVal string) string {
|
||||
if val := os.Getenv(key); val != "" {
|
||||
return val
|
||||
}
|
||||
return defaultVal
|
||||
}
|
||||
|
||||
const testPostgresEnv = "DEX_POSTGRES_HOST"
|
||||
|
||||
func TestPostgres(t *testing.T) {
|
||||
if os.Getenv("DEX_POSTGRES_HOST") == "" {
|
||||
t.Skip("postgres envs not set, skipping tests")
|
||||
host := os.Getenv(testPostgresEnv)
|
||||
if host == "" {
|
||||
t.Skipf("test environment variable %q not set, skipping", testPostgresEnv)
|
||||
}
|
||||
p := Postgres{
|
||||
Database: os.Getenv("DEX_POSTGRES_DATABASE"),
|
||||
User: os.Getenv("DEX_POSTGRES_USER"),
|
||||
Password: os.Getenv("DEX_POSTGRES_PASSWORD"),
|
||||
Host: os.Getenv("DEX_POSTGRES_HOST"),
|
||||
Database: getenv("DEX_POSTGRES_DATABASE", "postgres"),
|
||||
User: getenv("DEX_POSTGRES_USER", "postgres"),
|
||||
Password: getenv("DEX_POSTGRES_PASSWORD", "postgres"),
|
||||
Host: host,
|
||||
SSL: PostgresSSL{
|
||||
Mode: sslDisable, // Postgres container doesn't support SSL.
|
||||
},
|
||||
|
@ -92,4 +104,7 @@ func TestPostgres(t *testing.T) {
|
|||
withTimeout(time.Minute*1, func() {
|
||||
conformance.RunTests(t, newStorage)
|
||||
})
|
||||
withTimeout(time.Minute*1, func() {
|
||||
conformance.RunTransactionTests(t, newStorage)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -45,7 +45,30 @@ func matchLiteral(s string) *regexp.Regexp {
|
|||
var (
|
||||
// The "github.com/lib/pq" driver is the default flavor. All others are
|
||||
// translations of this.
|
||||
flavorPostgres = flavor{}
|
||||
flavorPostgres = flavor{
|
||||
// The default behavior for Postgres transactions is consistent reads, not consistent writes.
|
||||
// For each transaction opened, ensure it has the correct isolation level.
|
||||
//
|
||||
// See: https://www.postgresql.org/docs/9.3/static/sql-set-transaction.html
|
||||
//
|
||||
// NOTE(ericchiang): For some reason using `SET SESSION CHARACTERISTICS AS TRANSACTION` at a
|
||||
// session level didn't work for some edge cases. Might be something worth exploring.
|
||||
executeTx: func(db *sql.DB, fn func(sqlTx *sql.Tx) error) error {
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
if _, err := tx.Exec(`SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;`); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := fn(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
},
|
||||
}
|
||||
|
||||
flavorSQLite3 = flavor{
|
||||
queryReplacers: []replacer{
|
||||
|
|
Loading…
Reference in a new issue