# frozen_string_literal: true

module Gitlab
  module SidekiqDaemon
    # Daemon that tracks the Sidekiq jobs running in this process and cancels
    # them on request.
    #
    # A cancellation is requested with `Monitor.cancel_job(jid)`, which:
    #
    # 1. stores a Redis key for the job (expiring after CANCEL_DEADLINE), so
    #    that `within_job` refuses to run a job whose key is present, and
    # 2. publishes a message on NOTIFICATION_CHANNEL, so that a monitor which
    #    has the job registered raises `CancelledError` in the job's thread.
    class Monitor < Daemon
      include ::Gitlab::Utils::StrongMemoize
      extend ::Gitlab::Utils::Override

      # Redis pub/sub channel on which cancel requests are broadcast
      NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
      # Expiry of the per-job cancel key written by `cancel_job`
      CANCEL_DEADLINE = 24.hours.seconds
      # Pause between two subscription attempts in `run_thread`
      RECONNECT_TIME = 3.seconds

      # We use an exception derived from `Exception`
      # to consider this as a very low-level exception
      # that should not be caught by the application
      CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException

      # `jobs` maps a jid to `{ worker_class:, thread:, started_at: }`;
      # `jobs_mutex` guards writes to it and the cancellation of a job thread.
      attr_reader :jobs, :jobs_mutex

      def initialize
        super

        @jobs = {}
        @jobs_mutex = Mutex.new
      end

      override :thread_name
      def thread_name
        "job_monitor"
      end

      # Registers the current thread as the one running job `jid` for the
      # duration of the block, and unregisters it afterwards (even on error).
      #
      # @param worker_class stored alongside the thread in `jobs`
      # @param jid id of the job; key of the `jobs` entry and of the cancel key
      # @param queue only used in the log entry written on cancellation
      # @raise [CancelledError] when a cancel key already exists for `jid`
      # @return the value returned by the block
      def within_job(worker_class, jid, queue)
        jobs_mutex.synchronize do
          jobs[jid] = {
            worker_class: worker_class,
            thread: Thread.current,
            started_at: Gitlab::Metrics::System.monotonic_time
          }
        end

        # The job is registered before this check, so a cancel notification
        # arriving from now on can already find the thread.
        if cancelled?(jid)
          Sidekiq.logger.warn(
            class: self.class.to_s,
            action: 'run',
            queue: queue,
            jid: jid,
            canceled: true
          )
          raise CancelledError
        end

        yield
      ensure
        jobs_mutex.synchronize do
          jobs.delete(jid)
        end
      end

      # Requests cancellation of job `jid`: writes the cancel key with a
      # CANCEL_DEADLINE expiry and publishes a `cancel` message on
      # NOTIFICATION_CHANNEL.
      def self.cancel_job(jid)
        payload = {
          action: 'cancel',
          jid: jid
        }.to_json

        ::Gitlab::Redis::SharedState.with do |redis|
          redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
          redis.publish(NOTIFICATION_CHANNEL, payload)
        end
      end

      # Redis key marking job `jid` as cancelled.
      #
      # Kept in the public section on purpose: a bare `private` does not
      # apply to `def self.` methods, and instances call this through
      # `self.class`.
      def self.cancel_job_key(jid)
        "sidekiq:cancel:#{jid}"
      end

      private

      # Daemon loop: while `enabled?`, subscribes to the notification channel
      # and, whenever the subscription returns, waits RECONNECT_TIME before
      # subscribing again. Does nothing unless the notification channel is
      # enabled through the environment.
      def run_thread
        return unless notification_channel_enabled?

        begin
          Sidekiq.logger.info(
            class: self.class.to_s,
            action: 'start',
            message: 'Starting Monitor Daemon'
          )

          while enabled?
            process_messages
            sleep(RECONNECT_TIME)
          end
        ensure
          Sidekiq.logger.warn(
            class: self.class.to_s,
            action: 'stop',
            message: 'Stopping Monitor Daemon'
          )
        end
      end

      # Interrupts the daemon thread if it is alive. `process_messages`
      # re-raises non-StandardError exceptions, so the Interrupt propagates
      # out of the subscription.
      def stop_working
        thread.raise(Interrupt) if thread.alive?
      end

      # Subscribes to NOTIFICATION_CHANNEL and hands every received message to
      # `process_message`. Every exception is logged; StandardErrors are then
      # swallowed (the caller reconnects), anything else is re-raised.
      def process_messages
        ::Gitlab::Redis::SharedState.with do |redis|
          redis.subscribe(NOTIFICATION_CHANNEL) do |on|
            on.message do |_channel, message|
              process_message(message)
            end
          end
        end
      rescue Exception => e # rubocop:disable Lint/RescueException
        Sidekiq.logger.warn(
          class: self.class.to_s,
          action: 'exception',
          message: e.message
        )

        # we re-raise system exceptions
        raise unless e.is_a?(StandardError)
      end

      # Logs a raw notification, parses it and dispatches on its `action`.
      # Payloads that are not a JSON object, or that carry an unknown action,
      # are ignored.
      def process_message(message)
        Sidekiq.logger.info(
          class: self.class.to_s,
          channel: NOTIFICATION_CHANNEL,
          message: 'Received payload on channel',
          payload: message
        )

        payload = safe_parse(message)
        # `safe_parse` yields nil for invalid JSON; valid JSON that is not an
        # object (e.g. `1` or `[]`) carries no action either, and must not be
        # indexed with a string key.
        return unless payload.is_a?(Hash)

        case payload['action']
        when 'cancel'
          process_job_cancel(payload['jid'])
        else
          # unknown message
        end
      end

      # Parses `message` as JSON; returns nil when it is not valid JSON.
      def safe_parse(message)
        Gitlab::Json.parse(message)
      rescue JSON::ParserError
      end

      # Raises CancelledError in the thread registered for `jid`, if any.
      # The raise happens on a separate thread, so the subscription loop does
      # not wait for `jobs_mutex`.
      def process_job_cancel(jid)
        return unless jid

        # try to find thread without lock
        return unless find_thread_unsafe(jid)

        Thread.new do
          # try to find a thread, but with guaranteed
          # that handle for thread corresponds to actually
          # running job
          find_thread_with_lock(jid) do |thread|
            Sidekiq.logger.warn(
              class: self.class.to_s,
              action: 'cancel',
              message: 'Canceling thread with CancelledError',
              jid: jid,
              thread_id: thread.object_id
            )

            # `find_thread_with_lock` only yields a non-nil thread
            thread.raise(CancelledError)
          end
        end
      end

      # Looks up the thread registered for `jid` without taking `jobs_mutex`;
      # the result may be stale by the time the caller uses it.
      def find_thread_unsafe(jid)
        jobs.dig(jid, :thread)
      end

      # This method needs to be thread-safe
      # This is why it passes thread in block,
      # to ensure that we do process this thread
      #
      # The block runs while `jobs_mutex` is held, and only when a thread is
      # still registered for `jid`.
      def find_thread_with_lock(jid)
        # don't try to lock if we cannot find the thread
        return unless find_thread_unsafe(jid)

        jobs_mutex.synchronize do
          find_thread_unsafe(jid).tap do |thread|
            yield(thread) if thread
          end
        end
      end

      # Whether a cancel key currently exists for `jid`.
      def cancelled?(jid)
        ::Gitlab::Redis::SharedState.with do |redis|
          # `exists?` (redis-rb >= 4.2) always answers with a boolean. Newer
          # redis-rb releases return an Integer count from `exists`, and `0`
          # is truthy in Ruby, which would mark every job as cancelled.
          redis.exists?(self.class.cancel_job_key(jid))
        end
      end

      # The daemon only runs when SIDEKIQ_MONITOR_WORKER is set to a non-zero
      # integer.
      def notification_channel_enabled?
        ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero?
      end
    end
  end
end