# frozen_string_literal: true

# Usage:
#
# Worker that performs the tasks:
#
# class DummyWorker
#   include ApplicationWorker
#   include LimitedCapacity::Worker
#
#   # For each job that raises any error, a worker instance will be disabled
#   # until the next schedule-run.
#   # If you wish to get around this, exceptions must be handled by the implementer.
#   #
#   def perform_work(*args)
#   end
#
#   def remaining_work_count(*args)
#     5
#   end
#
#   def max_running_jobs
#     25
#   end
# end
#
# Cron worker to fill the pool of regular workers:
#
# class ScheduleDummyCronWorker
#   include ApplicationWorker
#   include CronjobQueue
#
#   def perform(*args)
#     DummyWorker.perform_with_capacity(*args)
#   end
# end
#

module LimitedCapacity
  module Worker
    extend ActiveSupport::Concern
    include Gitlab::Utils::StrongMemoize

    included do
      # Disable Sidekiq retries, log the error, and send the job to the dead queue.
      # This is done so that only one source (the cron worker) produces jobs, and
      # because a retried job would occupy a slot while waiting to be performed in
      # the distant future.
      # Letting the cron worker enqueue new jobs acts as our retry and back-off
      # mechanism, because the job might fail again if executed immediately.
      sidekiq_options retry: 0, status_expiration: Gitlab::SidekiqStatus::DEFAULT_EXPIRATION
      deduplicate :none
    end

    class_methods do
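      # Called by the cron worker. Cleans up slots held by failed jobs, then
      # bulk-enqueues one job per slot (max_running_jobs), each with the same
      # arguments.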
      def perform_with_capacity(*args)
        worker = self.new
        worker.remove_failed_jobs

        arguments = Array.new(worker.max_running_jobs) { args }
        self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext
      end
    end

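    # Sidekiq entry point. The work is only performed if a free slot can be
    # registered in the job tracker; otherwise the job exits immediately,
    # capping concurrency at max_running_jobs.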
    def perform(...)
      perform_registered(...) if job_tracker.register(jid, max_running_jobs)
    end

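    # Implement in the including worker: performs one run's worth of work.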
    def perform_work(*args)
      raise NotImplementedError
    end

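    # Implement in the including worker: returns how much work is left. A
    # positive count makes a successful job re-enqueue itself (see re_enqueue).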
    def remaining_work_count(*args)
      raise NotImplementedError
    end

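    # Implement in the including worker: the maximum number of jobs allowed
    # to run at the same time.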
    def max_running_jobs
      raise NotImplementedError
    end

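    # Releases slots whose jobs are no longer running, so failed jobs do not
    # occupy capacity until the next schedule-run.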
    def remove_failed_jobs
      job_tracker.clean_up
    end

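    # Reports the running-jobs, remaining-work, and max-running-jobs gauges.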
    def report_prometheus_metrics(...)
      report_running_jobs_metrics
      set_metric(:remaining_work_gauge, remaining_work_count(...))
      set_metric(:max_running_jobs_gauge, max_running_jobs)
    end

    private

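    # Performs the work while holding a slot. The slot is always released and
    # metrics are always reported; the job re-enqueues itself only when no
    # exception was raised, so a failing worker instance stays idle until the
    # cron worker refills the pool.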
    def perform_registered(*args)
      report_running_jobs_metrics
      perform_work(*args)
    rescue StandardError => exception
      raise
    ensure
      job_tracker.remove(jid)
      report_prometheus_metrics(*args)
      re_enqueue(*args) unless exception
    end

    def report_running_jobs_metrics
      set_metric(:running_jobs_gauge, running_jobs_count)
    end

    def running_jobs_count
      job_tracker.count
    end

    def job_tracker
      strong_memoize(:job_tracker) do
        JobTracker.new(self.class.name)
      end
    end

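    # Chains a follow-up job with the same arguments while work remains, so
    # the slot stays busy until the queue is drained.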
    def re_enqueue(*args)
      return unless remaining_work_count(*args) > 0

      self.class.perform_async(*args)
    end

    def set_metric(name, value)
      metrics = strong_memoize(:metrics) do
        {
          running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs'),
          max_running_jobs_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs'),
          remaining_work_gauge: Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued')
        }
      end

      metrics[name].set({ worker: self.class.name }, value)
    end
  end
end