require 'mutex_m'

module Gitlab
  module SidekiqMiddleware
    # Sidekiq server middleware that shuts the current Sidekiq process down.
    #
    # A shutdown is started when either:
    #
    # * the job (or anything further down the middleware chain) raises
    #   `WantShutdown`, or
    # * `MAX_RSS` is greater than zero and the RSS of this process, as reported
    #   by `ps`, exceeds it after the job has run.
    #
    # The shutdown runs on a single background thread which sleeps and then
    # signals this process three times in a row: `SIGTSTP`, `SIGTERM` and
    # finally `SIGKILL`.
    class Shutdown
      extend Mutex_m

      # RSS limit, compared against the output of `ps -o rss=`.
      # Defaults to 0, meaning the MemoryKiller is disabled.
      MAX_RSS = ENV.fetch('SIDEKIQ_MEMORY_KILLER_MAX_RSS', 0).to_i
      # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
      GRACE_TIME = ENV.fetch('SIDEKIQ_MEMORY_KILLER_GRACE_TIME', 15 * 60).to_i
      # Wait 30 seconds for running jobs to finish during graceful shutdown
      SHUTDOWN_WAIT = ENV.fetch('SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT', 30).to_i

      # This exception can be used to request that the middleware start shutting down Sidekiq
      WantShutdown = Class.new(StandardError)

      # Raised internally by `check_rss!`. It starts a shutdown just like
      # `WantShutdown`, but `call` swallows it instead of re-raising it.
      ShutdownWithoutRaise = Class.new(WantShutdown)
      private_constant :ShutdownWithoutRaise

      # For testing only. When `Rails.env.test?` is true this is a Queue that
      # receives `[:sleep, time]` and `[:kill, signal, pid]` entries instead of
      # the process really sleeping or being signalled; otherwise it is nil.
      attr_reader :trace

      # Starts the shutdown thread, running the given block on it.
      #
      # The thread is stored in a class-level instance variable guarded by the
      # class mutex, so that there can be only one shutdown thread in the
      # process: once a thread has been stored, later calls do nothing.
      #
      # @yield the work to perform on the shutdown thread
      # @return [Thread, nil] the new thread, or nil if one was already stored
      def self.create_shutdown_thread
        mu_synchronize do
          return if @shutdown_thread

          @shutdown_thread = Thread.new { yield }
        end
      end

      # For testing only: so we can wait for the shutdown thread to finish.
      #
      # @return [Thread, nil]
      def self.shutdown_thread
        mu_synchronize { @shutdown_thread }
      end

      # For testing only: so that we can reset the global state before each test.
      def self.clear_shutdown_thread
        mu_synchronize { @shutdown_thread = nil }
      end

      def initialize
        @trace = Queue.new if Rails.env.test?
      end

      # Middleware entry point: runs the job, then checks the RSS limit.
      #
      # @param worker [Object] the worker instance; only its class is logged
      # @param job [Hash] the job payload; only `job['jid']` is logged
      # @param queue [Object] unused
      # @yield runs the job
      # @raise [WantShutdown] re-raised when the job itself requested the
      #   shutdown; a shutdown caused by the RSS check is not raised
      # @return [nil]
      def call(worker, job, queue)
        shutdown_exception =
          begin
            yield
            check_rss!
            nil
          rescue WantShutdown => ex
            ex
          end

        return unless shutdown_exception

        self.class.create_shutdown_thread do
          do_shutdown(worker, job, shutdown_exception)
        end

        # The RSS check is internal to this middleware, so its exception must
        # not surface to the caller; an explicit request from the job does.
        raise shutdown_exception unless shutdown_exception.is_a?(ShutdownWithoutRaise)
      end

      private

      # Runs on the shutdown thread: logs why the process is shutting down and
      # then escalates through SIGTSTP, SIGTERM and SIGKILL.
      def do_shutdown(worker, job, shutdown_exception)
        Sidekiq.logger.warn "Sidekiq worker PID-#{pid} shutting down because of #{shutdown_exception} after job "\
          "#{worker.class} JID-#{job['jid']}"
        Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, "\
          "and will be shut down #{SHUTDOWN_WAIT} seconds later"

        # Wait `GRACE_TIME` to give the memory intensive job time to finish.
        # Then, tell Sidekiq to stop fetching new jobs.
        wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs')

        # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish.
        # Then, tell Sidekiq to gracefully shut down by giving jobs a few more
        # moments to finish, killing and requeuing them if they didn't, and
        # then terminating itself.
        wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down')

        # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't.
        wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die')
      end

      # Requests a shutdown when the RSS limit is enabled and exceeded.
      #
      # @raise [ShutdownWithoutRaise] if the current RSS is above `MAX_RSS`
      def check_rss!
        return unless MAX_RSS > 0

        current_rss = get_rss
        return unless current_rss > MAX_RSS

        raise ShutdownWithoutRaise, "current RSS #{current_rss} exceeds maximum RSS #{MAX_RSS}"
      end

      # Asks `ps` for the RSS of this process.
      #
      # @return [Integer] the parsed `ps` output, or 0 if `ps` exited non-zero
      def get_rss
        output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s)
        return 0 unless status.zero?

        output.to_i
      end

      # Sleeps for `time` seconds and then sends `signal` to this process,
      # logging a warning before each of the two steps.
      def wait_and_signal(time, signal, explanation)
        Sidekiq.logger.warn "waiting #{time} seconds before sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        sleep(time)

        Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        kill(signal, pid)
      end

      def pid
        Process.pid
      end

      # Records the sleep on `trace` in the test environment instead of
      # actually sleeping.
      def sleep(time)
        if Rails.env.test?
          @trace << [:sleep, time]
        else
          Kernel.sleep(time)
        end
      end

      # Records the signal on `trace` in the test environment instead of
      # actually sending it.
      def kill(signal, pid)
        if Rails.env.test?
          @trace << [:kill, signal, pid]
        else
          Process.kill(signal, pid)
        end
      end
    end
  end
end