debian-mirror-gitlab/lib/gitlab/database/migrations/sidekiq_helpers.rb

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

118 lines
3.8 KiB
Ruby
Raw Normal View History

2023-03-04 22:38:38 +05:30
# frozen_string_literal: true
module Gitlab
module Database
module Migrations
# rubocop:disable Cop/SidekiqApiUsage
# rubocop:disable Cop/SidekiqRedisCall
module SidekiqHelpers
# Defaults for the optional keyword arguments of sidekiq_remove_jobs.
#
# Default for `max_attempts`: upper bound on removal passes over a single
# queue or set.
DEFAULT_MAX_ATTEMPTS = 5
# Default for `times_in_a_row`: number of consecutive passes that must delete
# nothing before removal stops early.
DEFAULT_TIMES_IN_A_ROW = 2
# Probabilistically removes jobs of the given classes from the queues those
# classes declare, from the retry set and from the scheduled set.
#
# Jobs may still be processed while this runs, so a single pass can miss
# some of them. To minimize that risk each queue/set is swept repeatedly
# until `times_in_a_row` consecutive sweeps delete nothing, or until
# `max_attempts` sweeps have been made.
#
# Before calling this method, make sure the job classes are no longer being
# scheduled within the running application.
#
# job_klasses    - collection of job class names. A name that does not
#                  resolve to a constant contributes no queue to sweep, but
#                  is still matched against the retry and scheduled sets.
# times_in_a_row - consecutive zero-deletion sweeps required to stop early.
# max_attempts   - maximum number of sweeps per queue/set.
#
# Raises RuntimeError when called while a transaction is open.
def sidekiq_remove_jobs(
  job_klasses:,
  times_in_a_row: DEFAULT_TIMES_IN_A_ROW,
  max_attempts: DEFAULT_MAX_ATTEMPTS
)
  if transaction_open?
    raise 'sidekiq_remove_jobs can not be run inside a transaction, ' \
      'you can disable transactions by calling disable_ddl_transaction! ' \
      'in the body of your migration class'
  end

  kwargs = { times_in_a_row: times_in_a_row, max_attempts: max_attempts }

  # Resolve every name exactly once, always through `to_s`, so the filter
  # and the queue lookup agree on which entries are usable.
  job_klasses_queues = job_klasses
    .map { |job_klass| job_klass.to_s.safe_constantize }
    .select(&:present?)
    .map(&:queue)
    .uniq

  job_klasses_queues.each do |queue|
    delete_jobs_for(
      set: Sidekiq::Queue.new(queue),
      job_klasses: job_klasses,
      kwargs: kwargs
    )
  end

  # The retry and scheduled sets are global, so they are swept once each
  # regardless of how many queues were resolved above.
  [Sidekiq::RetrySet.new, Sidekiq::ScheduledSet.new].each do |set|
    delete_jobs_for(set: set, kwargs: kwargs, job_klasses: job_klasses)
  end
end
# Moves every job from the `queue_from` queue onto the `to` queue, one job
# per RPOPLPUSH call. The source length is re-read before each move and the
# loop ends once it reports zero.
#
# queue_from - source queue name (without the "queue:" key prefix).
# to         - destination queue name (without the "queue:" key prefix).
#
# Does nothing when both names refer to the same queue: RPOPLPUSH with an
# identical source and destination only rotates the list, so its length
# would never reach zero and the loop would never terminate.
def sidekiq_queue_migrate(queue_from, to:)
  return if queue_from.to_s == to.to_s

  source_key = "queue:#{queue_from}"
  destination_key = "queue:#{to}"

  while sidekiq_queue_length(queue_from) > 0
    Sidekiq.redis do |conn|
      conn.rpoplpush source_key, destination_key
    end
  end
end
# Asks Redis (LLEN) for the length of the list stored under
# "queue:<queue_name>" and returns the result of the Sidekiq.redis call.
def sidekiq_queue_length(queue_name)
  redis_key = "queue:#{queue_name}"

  Sidekiq.redis { |redis| redis.llen(redis_key) }
end
private
# Sweeps `set`, deleting every job whose `klass` is listed in `job_klasses`,
# and reports the number of successful deletions of each sweep to
# until_equal_to so it can decide when the set has been seen clean (zero
# deletions) often enough.
#
# set         - enumerable of jobs responding to `klass` and `delete`.
# kwargs      - Hash forwarded as keyword arguments to until_equal_to.
# job_klasses - collection the job's `klass` is checked against.
#
# Returns whatever until_equal_to returns.
def delete_jobs_for(set:, kwargs:, job_klasses:)
  until_equal_to(0, **kwargs) do
    set.count do |entry|
      # Non-matching jobs are left alone and never counted.
      next false unless job_klasses.include?(entry.klass)

      # Counted only when the delete call itself reports a truthy result.
      entry.delete
    end
  end
end
# Calls the given block up to `max_attempts` times, stopping early once the
# block has returned `target` exactly `times_in_a_row` times consecutively.
# Any other return value resets the streak to zero.
#
# Used with a block that returns "jobs deleted": seeing zero deletions
# several times in a row makes it *likely* that nothing was missed while the
# queue was mutating underneath.
#
# Returns a Hash with :attempts (number of block calls made) and :success
# (whether the required streak was reached).
def until_equal_to(target, times_in_a_row:, max_attempts:)
  outcome = { attempts: 0, success: false }
  consecutive_hits = 0

  1.upto(max_attempts) do |attempt_number|
    block_value = yield
    consecutive_hits = block_value == target ? consecutive_hits + 1 : 0

    outcome = {
      attempts: attempt_number,
      success: consecutive_hits == times_in_a_row
    }
    break if outcome[:success]
  end

  outcome
end
end
# rubocop:enable Cop/SidekiqApiUsage
# rubocop:enable Cop/SidekiqRedisCall
end
end
end