From eae3219e4d4b8781f1ffc0188512f88030f65a13 Mon Sep 17 00:00:00 2001 From: "m.nabokikh" Date: Mon, 13 Sep 2021 14:25:17 +0400 Subject: [PATCH] feat: Add MySQL ent-based storage driver Signed-off-by: m.nabokikh --- .github/workflows/ci.yaml | 14 +++ cmd/dex/config.go | 3 +- storage/ent/mysql.go | 162 +++++++++++++++++++++++++++++++ storage/ent/mysql_test.go | 183 +++++++++++++++++++++++++++++++++++ storage/ent/postgres_test.go | 7 -- storage/ent/sqlite.go | 8 +- storage/ent/utils.go | 10 ++ 7 files changed, 374 insertions(+), 13 deletions(-) create mode 100644 storage/ent/mysql.go create mode 100644 storage/ent/mysql_test.go create mode 100644 storage/ent/utils.go diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fe3a0359..55deda32 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -35,6 +35,15 @@ jobs: - 3306 options: --health-cmd "mysql -proot -e \"show databases;\"" --health-interval 10s --health-timeout 5s --health-retries 5 + mysql-ent: + image: mysql:5.7 + env: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: dex + ports: + - 3306 + options: --health-cmd "mysql -proot -e \"show databases;\"" --health-interval 10s --health-timeout 5s --health-retries 5 + etcd: image: gcr.io/etcd-development/etcd:v3.5.0 ports: @@ -77,6 +86,11 @@ jobs: DEX_MYSQL_PASSWORD: root DEX_MYSQL_HOST: 127.0.0.1 DEX_MYSQL_PORT: ${{ job.services.mysql.ports[3306] }} + DEX_MYSQL_ENT_DATABASE: dex + DEX_MYSQL_ENT_USER: root + DEX_MYSQL_ENT_PASSWORD: root + DEX_MYSQL_ENT_HOST: 127.0.0.1 + DEX_MYSQL_ENT_PORT: ${{ job.services.mysql-ent.ports[3306] }} DEX_POSTGRES_DATABASE: postgres DEX_POSTGRES_USER: postgres DEX_POSTGRES_PASSWORD: postgres diff --git a/cmd/dex/config.go b/cmd/dex/config.go index 309fc52c..37167bb0 100644 --- a/cmd/dex/config.go +++ b/cmd/dex/config.go @@ -183,6 +183,7 @@ var ( _ StorageConfig = (*sql.MySQL)(nil) _ StorageConfig = (*ent.SQLite3)(nil) _ StorageConfig = (*ent.Postgres)(nil) + _ StorageConfig = (*ent.MySQL)(nil) ) func getORMBasedSQLStorage(normal, entBased StorageConfig) func() StorageConfig { @@ -200,9 +201,9 @@ var storages = map[string]func() StorageConfig{ "etcd": func() StorageConfig { return new(etcd.Etcd) }, "kubernetes": func() StorageConfig { return new(kubernetes.Config) }, "memory": func() StorageConfig { return new(memory.Config) }, - "mysql": func() StorageConfig { return new(sql.MySQL) }, "sqlite3": getORMBasedSQLStorage(&sql.SQLite3{}, &ent.SQLite3{}), "postgres": getORMBasedSQLStorage(&sql.Postgres{}, &ent.Postgres{}), + "mysql": getORMBasedSQLStorage(&sql.MySQL{}, &ent.MySQL{}), } // isExpandEnvEnabled returns if os.ExpandEnv should be used for each storage and connector config. diff --git a/storage/ent/mysql.go b/storage/ent/mysql.go new file mode 100644 index 00000000..7caa91ff --- /dev/null +++ b/storage/ent/mysql.go @@ -0,0 +1,162 @@ +package ent + +import ( + "context" + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "database/sql" + "fmt" + "io/ioutil" + "net" + "strconv" + "time" + + entSQL "entgo.io/ent/dialect/sql" + "github.com/go-sql-driver/mysql" + + // Register postgres driver. + _ "github.com/lib/pq" + + "github.com/dexidp/dex/pkg/log" + "github.com/dexidp/dex/storage" + "github.com/dexidp/dex/storage/ent/client" + "github.com/dexidp/dex/storage/ent/db" +) + +// nolint +const ( + // MySQL SSL modes + mysqlSSLTrue = "true" + mysqlSSLFalse = "false" + mysqlSSLSkipVerify = "skip-verify" + mysqlSSLCustom = "custom" +) + +// MySQL options for creating an SQL db. +type MySQL struct { + NetworkDB + + SSL SSL `json:"ssl"` + + params map[string]string +} + +// Open always returns a new in sqlite3 storage. +func (m *MySQL) Open(logger log.Logger) (storage.Storage, error) { + logger.Debug("experimental ent-based storage driver is enabled") + drv, err := m.driver() + if err != nil { + return nil, err + } + + databaseClient := client.NewDatabase( + client.WithClient(db.NewClient(db.Driver(drv))), + client.WithHasher(sha256.New), + // Set tx isolation leve for each transaction as dex does for postgres + client.WithTxIsolationLevel(sql.LevelSerializable), + ) + + if err := databaseClient.Schema().Create(context.TODO()); err != nil { + return nil, err + } + + return databaseClient, nil +} + +func (m *MySQL) driver() (*entSQL.Driver, error) { + var tlsConfig string + + switch { + case m.SSL.CAFile != "" || m.SSL.CertFile != "" || m.SSL.KeyFile != "": + if err := m.makeTLSConfig(); err != nil { + return nil, fmt.Errorf("failed to make TLS config: %v", err) + } + tlsConfig = mysqlSSLCustom + case m.SSL.Mode == "": + tlsConfig = mysqlSSLTrue + default: + tlsConfig = m.SSL.Mode + } + + drv, err := entSQL.Open("mysql", m.dsn(tlsConfig)) + if err != nil { + return nil, err + } + + if m.MaxIdleConns == 0 { + /* Override default behaviour to fix https://github.com/dexidp/dex/issues/1608 */ + drv.DB().SetMaxIdleConns(0) + } else { + drv.DB().SetMaxIdleConns(m.MaxIdleConns) + } + + return drv, nil +} + +func (m *MySQL) dsn(tlsConfig string) string { + cfg := mysql.Config{ + User: m.User, + Passwd: m.Password, + DBName: m.Database, + AllowNativePasswords: true, + + Timeout: time.Second * time.Duration(m.ConnectionTimeout), + + TLSConfig: tlsConfig, + + ParseTime: true, + Params: make(map[string]string), + } + + if m.Host != "" { + if m.Host[0] != '/' { + cfg.Net = "tcp" + cfg.Addr = m.Host + + if m.Port != 0 { + cfg.Addr = net.JoinHostPort(m.Host, strconv.Itoa(int(m.Port))) + } + } else { + cfg.Net = "unix" + cfg.Addr = m.Host + } + } + + for k, v := range m.params { + cfg.Params[k] = v + } + + return cfg.FormatDSN() +} + +func (m *MySQL) makeTLSConfig() error { + cfg := &tls.Config{} + + if m.SSL.CAFile != "" { + rootCertPool := x509.NewCertPool() + + pem, err := ioutil.ReadFile(m.SSL.CAFile) + if err != nil { + return err + } + + if ok := rootCertPool.AppendCertsFromPEM(pem); !ok { + return fmt.Errorf("failed to append PEM") + } + cfg.RootCAs = rootCertPool + } + + if m.SSL.CertFile != "" && m.SSL.KeyFile != "" { + clientCert := make([]tls.Certificate, 0, 1) + certs, err := tls.LoadX509KeyPair(m.SSL.CertFile, m.SSL.KeyFile) + if err != nil { + return err + } + clientCert = append(clientCert, certs) + cfg.Certificates = clientCert + } + + mysql.RegisterTLSConfig(mysqlSSLCustom, cfg) + return nil +} diff --git a/storage/ent/mysql_test.go b/storage/ent/mysql_test.go new file mode 100644 index 00000000..fdb2fda1 --- /dev/null +++ b/storage/ent/mysql_test.go @@ -0,0 +1,183 @@ +package ent + +import ( + "os" + "strconv" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + + "github.com/dexidp/dex/storage" + "github.com/dexidp/dex/storage/conformance" +) + +const ( + MySQLEntHostEnv = "DEX_MYSQL_ENT_HOST" + MySQLEntPortEnv = "DEX_MYSQL_ENT_PORT" + MySQLEntDatabaseEnv = "DEX_MYSQL_ENT_DATABASE" + MySQLEntUserEnv = "DEX_MYSQL_ENT_USER" + MySQLEntPasswordEnv = "DEX_MYSQL_ENT_PASSWORD" +) + +func mysqlTestConfig(host string, port uint64) *MySQL { + return &MySQL{ + NetworkDB: NetworkDB{ + Database: getenv(MySQLEntDatabaseEnv, "mysql"), + User: getenv(MySQLEntUserEnv, "mysql"), + Password: getenv(MySQLEntPasswordEnv, "mysql"), + Host: host, + Port: uint16(port), + }, + SSL: SSL{ + Mode: mysqlSSLSkipVerify, + }, + } +} + +func newMySQLStorage(host string, port uint64) storage.Storage { + logger := &logrus.Logger{ + Out: os.Stderr, + Formatter: &logrus.TextFormatter{DisableColors: true}, + Level: logrus.DebugLevel, + } + + cfg := mysqlTestConfig(host, port) + s, err := cfg.Open(logger) + if err != nil { + panic(err) + } + return s +} + +func TestMySQL(t *testing.T) { + host := os.Getenv(MySQLEntHostEnv) + if host == "" { + t.Skipf("test environment variable %s not set, skipping", MySQLEntHostEnv) + } + + port := uint64(3306) + if rawPort := os.Getenv(MySQLEntPortEnv); rawPort != "" { + var err error + + port, err = strconv.ParseUint(rawPort, 10, 32) + require.NoError(t, err, "invalid mysql port %q: %s", rawPort, err) + } + + newStorage := func() storage.Storage { + return newMySQLStorage(host, port) + } + conformance.RunTests(t, newStorage) + conformance.RunTransactionTests(t, newStorage) +} + +func TestMySQLDSN(t *testing.T) { + tests := []struct { + name string + cfg *MySQL + desiredDSN string + }{ + { + name: "Host port", + cfg: &MySQL{ + NetworkDB: NetworkDB{ + Host: "localhost", + Port: uint16(3306), + }, + }, + desiredDSN: "tcp(localhost:3306)/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0", + }, + { + name: "Host with port", + cfg: &MySQL{ + NetworkDB: NetworkDB{ + Host: "localhost:3306", + }, + }, + desiredDSN: "tcp(localhost:3306)/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0", + }, + { + name: "Host ipv6 with port", + cfg: &MySQL{ + NetworkDB: NetworkDB{ + Host: "[a:b:c:d]:3306", + }, + }, + desiredDSN: "tcp([a:b:c:d]:3306)/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0", + }, + { + name: "Credentials and timeout", + cfg: &MySQL{ + NetworkDB: NetworkDB{ + Database: "test", + User: "test", + Password: "test", + ConnectionTimeout: 5, + }, + }, + desiredDSN: "test:test@/test?checkConnLiveness=false&parseTime=true&timeout=5s&tls=false&maxAllowedPacket=0", + }, + { + name: "SSL", + cfg: &MySQL{ + SSL: SSL{ + CAFile: "/ca.crt", + KeyFile: "/cert.crt", + CertFile: "/cert.key", + }, + }, + desiredDSN: "/?checkConnLiveness=false&parseTime=true&tls=false&maxAllowedPacket=0", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.desiredDSN, tt.cfg.dsn(mysqlSSLFalse)) + }) + } +} + +func TestMySQLDriver(t *testing.T) { + host := os.Getenv(MySQLEntHostEnv) + if host == "" { + t.Skipf("test environment variable %s not set, skipping", MySQLEntHostEnv) + } + + port := uint64(3306) + if rawPort := os.Getenv(MySQLEntPortEnv); rawPort != "" { + var err error + + port, err = strconv.ParseUint(rawPort, 10, 32) + require.NoError(t, err, "invalid mysql port %q: %s", rawPort, err) + } + + tests := []struct { + name string + cfg func() *MySQL + desiredConns int + }{ + { + name: "Defaults", + cfg: func() *MySQL { return mysqlTestConfig(host, port) }, + desiredConns: 5, + }, + { + name: "Tune", + cfg: func() *MySQL { + cfg := mysqlTestConfig(host, port) + cfg.MaxOpenConns = 101 + return cfg + }, + desiredConns: 101, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + drv, err := tt.cfg().driver() + require.NoError(t, err) + + require.Equal(t, tt.desiredConns, drv.DB().Stats().MaxOpenConnections) + }) + } +} diff --git a/storage/ent/postgres_test.go b/storage/ent/postgres_test.go index 8021e3a1..c8e3a54d 100644 --- a/storage/ent/postgres_test.go +++ b/storage/ent/postgres_test.go @@ -20,13 +20,6 @@ const ( PostgresEntPasswordEnv = "DEX_POSTGRES_ENT_PASSWORD" ) -func getenv(key, defaultVal string) string { - if val := os.Getenv(key); val != "" { - return val - } - return defaultVal -} - func postgresTestConfig(host string, port uint64) *Postgres { return &Postgres{ NetworkDB: NetworkDB{ diff --git a/storage/ent/sqlite.go b/storage/ent/sqlite.go index e6c43cd9..22866b6f 100644 --- a/storage/ent/sqlite.go +++ b/storage/ent/sqlite.go @@ -33,12 +33,10 @@ func (s *SQLite3) Open(logger log.Logger) (storage.Storage, error) { return nil, err } + // always allow only one connection to sqlite3, any other thread/go-routine + // attempting concurrent access will have to wait pool := drv.DB() - if s.File == ":memory:" { - // sqlite3 uses file locks to coordinate concurrent access. In memory - // doesn't support this, so limit the number of connections to 1. - pool.SetMaxOpenConns(1) - } + pool.SetMaxOpenConns(1) databaseClient := client.NewDatabase( client.WithClient(db.NewClient(db.Driver(drv))), diff --git a/storage/ent/utils.go b/storage/ent/utils.go new file mode 100644 index 00000000..6f51e065 --- /dev/null +++ b/storage/ent/utils.go @@ -0,0 +1,10 @@ +package ent + +import "os" + +func getenv(key, defaultVal string) string { + if val := os.Getenv(key); val != "" { + return val + } + return defaultVal +}