package queueing

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type errTooManyRequests struct{ error }
type errQueueingTimedout struct{ error }

var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}

type queueMetrics struct {
	queueingLimit        prometheus.Gauge
	queueingQueueLimit   prometheus.Gauge
	queueingQueueTimeout prometheus.Gauge
	queueingBusy         prometheus.Gauge
	queueingWaiting      prometheus.Gauge
	queueingWaitingTime  prometheus.Histogram
	queueingErrors       *prometheus.CounterVec
}

// newQueueMetrics prepares Prometheus metrics for queueing mechanism
// name specifies name of the queue, used to label metrics with ConstLabel `queue_name`
//      Don't call newQueueMetrics twice with the same name argument!
// timeout specifies the timeout of storing a request in queue - queueMetrics
//         uses it to calculate histogram buckets for gitlab_workhorse_queueing_waiting_time
//         metric
func newQueueMetrics(name string, timeout time.Duration) *queueMetrics {
	waitingTimeBuckets := []float64{
		timeout.Seconds() * 0.01,
		timeout.Seconds() * 0.05,
		timeout.Seconds() * 0.10,
		timeout.Seconds() * 0.25,
		timeout.Seconds() * 0.50,
		timeout.Seconds() * 0.75,
		timeout.Seconds() * 0.90,
		timeout.Seconds() * 0.95,
		timeout.Seconds() * 0.99,
		timeout.Seconds(),
	}

	metrics := &queueMetrics{
		queueingLimit: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_limit",
			Help: "Current limit set for the queueing mechanism",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingQueueLimit: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_queue_limit",
			Help: "Current queueLimit set for the queueing mechanism",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingQueueTimeout: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_queue_timeout",
			Help: "Current queueTimeout set for the queueing mechanism",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingBusy: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_busy",
			Help: "How many queued requests are now processed",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingWaiting: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_waiting",
			Help: "How many requests are now queued",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingWaitingTime: promauto.NewHistogram(prometheus.HistogramOpts{
			Name: "gitlab_workhorse_queueing_waiting_time",
			Help: "How many time a request spent in queue",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
			Buckets: waitingTimeBuckets,
		}),

		queueingErrors: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "gitlab_workhorse_queueing_errors",
				Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
				ConstLabels: prometheus.Labels{
					"queue_name": name,
				},
			},
			[]string{"type"},
		),
	}

	return metrics
}

type Queue struct {
	*queueMetrics

	name      string
	busyCh    chan struct{}
	waitingCh chan time.Time
	timeout   time.Duration
}

// newQueue creates a new queue
// name specifies name used to label queue metrics.
//      Don't call newQueue twice with the same name argument!
// limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued
// timeout specifies the time limit of storing the request in the queue
// if the number of requests is above the limit
func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue {
	queue := &Queue{
		name:      name,
		busyCh:    make(chan struct{}, limit),
		waitingCh: make(chan time.Time, limit+queueLimit),
		timeout:   timeout,
	}

	queue.queueMetrics = newQueueMetrics(name, timeout)
	queue.queueingLimit.Set(float64(limit))
	queue.queueingQueueLimit.Set(float64(queueLimit))
	queue.queueingQueueTimeout.Set(timeout.Seconds())

	return queue
}

// Acquire takes one slot from the Queue
// and returns when a request should be processed
// it allows up to (limit) of requests running at a time
// it allows to queue up to (queue-limit) requests
func (s *Queue) Acquire() (err error) {
	// push item to a queue to claim your own slot (non-blocking)
	select {
	case s.waitingCh <- time.Now():
		s.queueingWaiting.Inc()
		break
	default:
		s.queueingErrors.WithLabelValues("too_many_requests").Inc()
		return ErrTooManyRequests
	}

	defer func() {
		if err != nil {
			waitStarted := <-s.waitingCh
			s.queueingWaiting.Dec()
			s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))
		}
	}()

	// fast path: push item to current processed items (non-blocking)
	select {
	case s.busyCh <- struct{}{}:
		s.queueingBusy.Inc()
		return nil
	default:
		break
	}

	timer := time.NewTimer(s.timeout)
	defer timer.Stop()

	// push item to current processed items (blocking)
	select {
	case s.busyCh <- struct{}{}:
		s.queueingBusy.Inc()
		return nil

	case <-timer.C:
		s.queueingErrors.WithLabelValues("queueing_timedout").Inc()
		return ErrQueueingTimedout
	}
}

// Release marks the finish of processing of requests
// It triggers next request to be processed if it's in queue
func (s *Queue) Release() {
	// dequeue from queue to allow next request to be processed
	waitStarted := <-s.waitingCh
	s.queueingWaiting.Dec()
	s.queueingWaitingTime.Observe(float64(time.Since(waitStarted).Seconds()))

	<-s.busyCh
	s.queueingBusy.Dec()
}