// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/juju/errors"
	"github.com/ngaut/log"
	"github.com/pingcap/tidb/ddl"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta"
	"github.com/pingcap/tidb/model"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/store/localstore"
	"github.com/pingcap/tidb/terror"
)

// ddlLastReloadSchemaTS is the key under which Domain.Stats reports the time of
// the last successful schema reload, expressed in seconds.
var ddlLastReloadSchemaTS = "ddl_last_reload_schema_ts"

// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
//
// Fields:
//   - store is the KV storage that schema meta is read from.
//   - infoHandle holds the currently loaded information schema.
//   - ddl is the DDL component bound to this domain; it also supplies the lease.
//   - leaseCh delivers lease changes from SetLease to the schema reload loop.
//   - m serializes reload so that only one runs at a time.
//
// NOTE(review): lastLeaseTS is accessed through sync/atomic; 64-bit atomics need
// 8-byte alignment on 32-bit platforms, so confirm its offset before reordering
// fields.
type Domain struct {
	store      kv.Storage
	infoHandle *infoschema.Handle
	ddl        ddl.DDL
	leaseCh    chan time.Duration
	// lastLeaseTS is the time of the last successful schema reload, in Unix
	// nanoseconds. It is read and written with sync/atomic.
	lastLeaseTS int64
	m           sync.Mutex
}

// loadInfoSchema reads the schema meta through txn and hands it to infoHandle.
// Nothing is reloaded when infoHandle already holds the stored schema version
// or a newer one. Otherwise every listed database is passed to infoHandle.Set;
// only databases in the public state get their Tables filled, and only with
// tables that are public themselves.
func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
	metaReader := meta.NewMeta(txn)
	latestVersion, err := metaReader.GetSchemaVersion()
	if err != nil {
		return errors.Trace(err)
	}

	// Another txn may already have installed this version or a newer one, in
	// which case there is nothing to reload.
	if current := do.infoHandle.Get(); current != nil && current.SchemaMetaVersion() >= latestVersion {
		log.Debugf("[ddl] schema version is still %d, no need reload", latestVersion)
		return nil
	}

	dbInfos, err := metaReader.ListDatabases()
	if err != nil {
		return errors.Trace(err)
	}

	for _, dbInfo := range dbInfos {
		// A schema that is not public can't be used outside, so its tables
		// are not loaded.
		if dbInfo.State != model.StatePublic {
			continue
		}

		tables, listErr := metaReader.ListTables(dbInfo.ID)
		if listErr != nil {
			return errors.Trace(listErr)
		}

		publicTables := make([]*model.TableInfo, 0, len(tables))
		for _, tableInfo := range tables {
			// A table that is not public can't be used outside either.
			if tableInfo.State == model.StatePublic {
				publicTables = append(publicTables, tableInfo)
			}
		}
		dbInfo.Tables = publicTables
	}

	log.Infof("[ddl] loadInfoSchema %d", latestVersion)
	return errors.Trace(do.infoHandle.Set(dbInfos, latestVersion))
}

// InfoSchema returns the information schema currently held by the domain,
// after giving the domain a chance to reload it.
func (do *Domain) InfoSchema() infoschema.InfoSchema {
	// Reload first if the last reload is older than the lease.
	do.tryReload()

	current := do.infoHandle.Get()
	return current
}

// DDL returns the DDL component owned by this domain.
func (do *Domain) DDL() ddl.DDL {
	return do.ddl
}

// Store returns the KV storage this domain was created with.
func (do *Domain) Store() kv.Storage {
	return do.store
}

// SetLease resets the lease time used for online DDL changes.
// The new lease is first handed to the schema reload loop through leaseCh and
// then forwarded to the DDL component.
func (do *Domain) SetLease(lease time.Duration) {
	// leaseCh is created with a buffer of one in NewDomain, so this send blocks
	// while an earlier lease is still waiting to be picked up by the loop.
	do.leaseCh <- lease

	// let ddl reset its lease too.
	do.ddl.SetLease(lease)
}

// Stats returns the domain statistics: a single entry keyed by
// ddlLastReloadSchemaTS holding the last reload timestamp converted from
// nanoseconds to seconds. The returned error is always nil.
func (do *Domain) Stats() (map[string]interface{}, error) {
	lastReloadSec := atomic.LoadInt64(&do.lastLeaseTS) / 1e9

	return map[string]interface{}{
		ddlLastReloadSchemaTS: lastReloadSec,
	}, nil
}

// GetScope returns the scope of the given status variable.
// The status argument is not inspected: every domain status variable currently
// has the default scope.
func (do *Domain) GetScope(status string) variable.ScopeFlag {
	// Now domain status variables scope are all default scope.
	return variable.DefaultScopeFlag
}

// tryReload forces a schema reload when more than one lease has passed since
// the last successful reload. The background loop normally reloads every
// lease, but its ticker can be delayed (for example on a heavily loaded
// machine), so the elapsed time is checked here as well.
func (do *Domain) tryReload() {
	lastReload := atomic.LoadInt64(&do.lastLeaseTS)
	lease := do.ddl.GetLease()

	// A zero lease means the local store is in use, so no reload is needed.
	if lease <= 0 {
		return
	}

	elapsed := time.Now().UnixNano() - lastReload
	if elapsed > lease.Nanoseconds() {
		do.mustReload()
	}
}

// minReloadTimeout is the lower bound on how long reload waits for the schema
// to be loaded before it reports a timeout.
const minReloadTimeout = 20 * time.Second

// reload loads the latest schema in a background goroutine and waits for the
// outcome. The wait is bounded by half the DDL lease, but never less than
// minReloadTimeout. Load failures are retried every 500ms, except
// localstore.ErrDBClosed, which is returned to the caller immediately. On
// success lastLeaseTS is set to the current time.
func (do *Domain) reload() error {
	// Only one reload may run at a time.
	do.m.Lock()
	defer do.m.Unlock()

	timeout := minReloadTimeout
	if half := do.ddl.GetLease() / 2; half >= minReloadTimeout {
		timeout = half
	}

	// Buffered so the loader can finish even if nobody is waiting any more.
	result := make(chan error, 1)
	go func() {
		for {
			loadErr := kv.RunInNewTxn(do.store, false, do.loadInfoSchema)

			// A closed db is reported as is, without retrying.
			if terror.ErrorEqual(loadErr, localstore.ErrDBClosed) {
				result <- loadErr
				return
			}

			if loadErr == nil {
				atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano())
				result <- nil
				return
			}

			log.Errorf("[ddl] load schema err %v, retry again", errors.ErrorStack(loadErr))
			// TODO: use a backoff algorithm.
			time.Sleep(500 * time.Millisecond)
		}
	}()

	select {
	case <-time.After(timeout):
		return errors.New("reload schema timeout")
	case loadErr := <-result:
		return errors.Trace(loadErr)
	}
}

// mustReload reloads the schema and terminates the whole program through
// log.Fatalf when the reload fails, to guarantee data safety.
func (do *Domain) mustReload() {
	if err := do.reload(); err != nil {
		log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err))
	}
}

// defaultLoadTime is the schema reload interval (300 seconds) that the reload
// loop falls back to when the lease it is given is not positive.
const defaultLoadTime = 300 * time.Second

// loadSchemaInLoop reloads the schema once per lease until a reload reports
// localstore.ErrDBClosed, at which point it returns. Any other reload error
// terminates the program through log.Fatalf.
//
// A non-positive lease, whether passed in or received later, is replaced by
// defaultLoadTime. Lease changes arrive on do.leaseCh; a changed lease
// restarts the ticker with the new period.
func (do *Domain) loadSchemaInLoop(lease time.Duration) {
	if lease <= 0 {
		lease = defaultLoadTime
	}

	ticker := time.NewTicker(lease)
	// The ticker is replaced whenever the lease changes, so the deferred call
	// has to read the variable at exit time. A plain `defer ticker.Stop()`
	// binds to the first ticker and would leave the latest one running.
	defer func() { ticker.Stop() }()

	for {
		select {
		case <-ticker.C:
			err := do.reload()
			// we may close store in test, but the domain load schema loop is still checking,
			// so we can't panic for ErrDBClosed and just return here.
			if terror.ErrorEqual(err, localstore.ErrDBClosed) {
				return
			}
			if err != nil {
				log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err))
			}
		case newLease := <-do.leaseCh:
			if newLease <= 0 {
				newLease = defaultLoadTime
			}

			if lease == newLease {
				// nothing to do
				continue
			}

			lease = newLease
			// reset ticker too.
			ticker.Stop()
			ticker = time.NewTicker(lease)
		}
	}
}

// ddlCallback is the callback NewDomain hands to the DDL component. It embeds
// ddl.BaseCallback and keeps a reference to the owning Domain so that
// OnChanged can reload that domain's schema.
type ddlCallback struct {
	ddl.BaseCallback
	do *Domain
}

// OnChanged reacts to a DDL change. A non-nil err is returned untouched and
// nothing else happens; otherwise the owning domain's schema is reloaded
// (terminating the program if that fails) and nil is returned.
func (c *ddlCallback) OnChanged(err error) error {
	if err == nil {
		log.Warnf("[ddl] on DDL change")
		c.do.mustReload()
	}
	return err
}

// NewDomain creates a domain on top of store. It wires up the info schema
// handle and the DDL component, loads the schema once (terminating the program
// if that fails), registers the domain as a statistics source and starts the
// background reload loop with the given lease. The returned error is always nil.
func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) {
	d = new(Domain)
	d.store = store
	// One slot so a single lease change can be queued for the reload loop.
	d.leaseCh = make(chan time.Duration, 1)

	d.infoHandle = infoschema.NewHandle(d.store)

	callback := &ddlCallback{do: d}
	d.ddl = ddl.NewDDL(d.store, d.infoHandle, callback, lease)

	// The schema must be available before the domain is handed out.
	d.mustReload()

	variable.RegisterStatistics(d)

	go d.loadSchemaInLoop(lease)

	return d, nil
}