debian-mirror-gitlab/app/workers/concerns/application_worker.rb

99 lines
2.3 KiB
Ruby
Raw Permalink Normal View History

2018-11-08 19:23:39 +05:30
# frozen_string_literal: true
2019-09-30 21:07:59 +05:30
require 'sidekiq/api'
2018-03-17 18:26:18 +05:30
Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
extend ActiveSupport::Concern
include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
2019-12-21 20:55:43 +05:30
include WorkerAttributes
2020-03-09 13:42:32 +05:30
include WorkerContext
2018-03-17 18:26:18 +05:30
2020-05-24 23:13:21 +05:30
LOGGING_EXTRA_KEY = 'extra'
2018-03-17 18:26:18 +05:30
included do
set_queue
2020-04-08 14:13:33 +05:30
# Builds a structured logging payload for the running job.
#
# The current Labkit context is combined with the worker class, a fixed
# 'running' job status, the worker's queue and the job id. The caller's
# payload keys are stringified first; on a key collision the context
# values take precedence over the caller's values.
#
# payload - Hash of extra fields to log (defaults to an empty Hash).
#
# Returns a Hash with String keys.
def structured_payload(payload = {})
  ambient_context = Labkit::Context.current.to_h
  worker_class = self.class

  job_context = ambient_context.merge(
    'class' => worker_class,
    'job_status' => 'running',
    'queue' => worker_class.queue,
    'jid' => jid
  )

  stringified_payload = payload.stringify_keys
  stringified_payload.merge(job_context)
end
2020-05-24 23:13:21 +05:30
# Records a key/value pair in this worker instance's extra-metadata hash,
# creating the hash on first use. The stored pairs are read back by
# `logging_extras`.
#
# key   - metadata name.
# value - metadata value.
#
# Returns the stored value.
def log_extra_metadata_on_done(key, value)
  metadata = (@done_log_extra_metadata ||= {})
  metadata.store(key, value)
end
# Returns the extra metadata recorded via `log_extra_metadata_on_done`,
# with every key rewritten as "<LOGGING_EXTRA_KEY>.<worker_class>.<key>".
# Returns an empty Hash when nothing has been recorded.
def logging_extras
  recorded = @done_log_extra_metadata
  return {} unless recorded

  recorded.each_with_object({}) do |(key, value), prefixed|
    # Prefix keys with class name to avoid conflicts in Elasticsearch types.
    # Also prefix with "extra." so that we know to log these new fields.
    worker_segment = self.class.name.gsub("::", "_").underscore
    prefixed["#{LOGGING_EXTRA_KEY}.#{worker_segment}.#{key}"] = value
  end
end
2018-03-17 18:26:18 +05:30
end
2018-11-20 20:47:30 +05:30
class_methods do
2018-03-17 18:26:18 +05:30
# Ruby hook invoked when a class carrying these class methods is subclassed.
#
# Calls `super` first so that any other `inherited` hook further up the
# ancestor chain still runs, then calls `set_queue` on the subclass so it
# registers its own queue option.
#
# subclass - the newly created subclass.
def inherited(subclass)
  super
  subclass.set_queue
end
# Sets the Sidekiq `queue` option for this worker class.
#
# The queue name is the queue namespace (when one is set) and the base
# queue name, joined with ':'.
def set_queue
  name_parts = [queue_namespace, base_queue_name].compact
  sidekiq_options queue: name_parts.join(':') # rubocop:disable Cop/SidekiqOptionsQueue
end
# Derives the un-namespaced queue name from the class name: a leading
# "Gitlab::" and a trailing "Worker" are removed, the remainder is
# underscored, and every '/' is replaced with '_'.
def base_queue_name
  without_root = name.sub(/\AGitlab::/, '')
  without_suffix = without_root.sub(/Worker\z/, '')

  without_suffix.underscore.tr('/', '_')
end
# Reads or sets the queue namespace of this worker class.
#
# With no argument (or a falsy one) returns the `queue_namespace` Sidekiq
# option as a String, or nil when it is not set. With a new namespace,
# stores it in the Sidekiq options and recomputes the queue via `set_queue`.
#
# new_namespace - the namespace to set (optional).
def queue_namespace(new_namespace = nil)
  return get_sidekiq_options['queue_namespace']&.to_s unless new_namespace

  sidekiq_options queue_namespace: new_namespace
  set_queue
end
# Returns the `queue` Sidekiq option of this worker class as a String
# (an empty String when the option is not set).
def queue
  configured_queue = get_sidekiq_options['queue']
  configured_queue.to_s
end
2019-09-30 21:07:59 +05:30
# Returns the size reported by Sidekiq::Queue for this worker's queue.
def queue_size
  sidekiq_queue = Sidekiq::Queue.new(queue)
  sidekiq_queue.size
end
2018-03-17 18:26:18 +05:30
# Pushes one job per entry of args_list for this worker class in a single
# Sidekiq::Client.push_bulk call.
#
# args_list - the list of argument lists passed through as 'args'.
#
# Returns whatever Sidekiq::Client.push_bulk returns.
def bulk_perform_async(args_list)
  bulk_job = { 'class' => self, 'args' => args_list }
  Sidekiq::Client.push_bulk(bulk_job)
end
# Schedules one job per entry of args_list for this worker class, all at
# the same time: the current time plus delay, in whole seconds.
#
# delay     - offset from now; converted with #to_i, so fractions are dropped.
# args_list - the list of argument lists passed through as 'args'.
#
# Raises ArgumentError when the integer delay is zero or negative.
# Returns whatever Sidekiq::Client.push_bulk returns.
def bulk_perform_in(delay, args_list)
  now = Time.now.to_i
  seconds_from_now = delay.to_i

  # A non-positive offset means the scheduled time would not be after `now`.
  if seconds_from_now <= 0
    raise ArgumentError, _('The schedule time must be in the future!')
  end

  Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => now + seconds_from_now)
end
end
end