462 lines
17 KiB
Ruby
462 lines
17 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'spec_helper'
|
|
|
|
RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
|
|
def clear_queues
|
|
Sidekiq::Queue.new('authorized_projects').clear
|
|
Sidekiq::Queue.new('post_receive').clear
|
|
Sidekiq::RetrySet.new.clear
|
|
Sidekiq::ScheduledSet.new.clear
|
|
end
|
|
|
|
around do |example|
|
|
clear_queues
|
|
Sidekiq::Testing.disable!(&example)
|
|
clear_queues
|
|
end
|
|
|
|
describe '#migrate_set', :aggregate_failures do
|
|
shared_examples 'processing a set' do
|
|
let(:migrator) { described_class.new(mappings) }
|
|
|
|
let(:set_after) do
|
|
Sidekiq.redis { |c| c.zrange(set_name, 0, -1, with_scores: true) }
|
|
.map { |item, score| [Gitlab::Json.load(item), score] }
|
|
end
|
|
|
|
context 'when the set is empty' do
|
|
let(:mappings) { { 'AuthorizedProjectsWorker' => 'new_queue' } }
|
|
|
|
it 'returns the number of scanned and migrated jobs' do
|
|
expect(migrator.migrate_set(set_name)).to eq(
|
|
scanned: 0,
|
|
migrated: 0)
|
|
end
|
|
end
|
|
|
|
context 'when the set is not empty' do
|
|
let(:mappings) { {} }
|
|
|
|
it 'returns the number of scanned and migrated jobs' do
|
|
create_jobs
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 0)
|
|
end
|
|
end
|
|
|
|
context 'when there are no matching jobs' do
|
|
let(:mappings) { { 'PostReceive' => 'new_queue' } }
|
|
|
|
it 'does not change any queue names' do
|
|
create_jobs(include_post_receive: false)
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 3, migrated: 0)
|
|
|
|
expect(set_after.length).to eq(3)
|
|
expect(set_after.map(&:first)).to all(include('queue' => 'default',
|
|
'class' => 'AuthorizedProjectsWorker'))
|
|
end
|
|
end
|
|
|
|
context 'when there are matching jobs' do
|
|
it 'migrates only the workers matching the given worker from the set' do
|
|
migrator = described_class.new({ 'AuthorizedProjectsWorker' => 'new_queue' })
|
|
freeze_time do
|
|
create_jobs
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(
|
|
scanned: 4,
|
|
migrated: 3)
|
|
|
|
set_after.each.with_index do |(item, score), i|
|
|
if item['class'] == 'AuthorizedProjectsWorker'
|
|
expect(item).to include('queue' => 'new_queue', 'args' => [i])
|
|
else
|
|
expect(item).to include('queue' => 'default', 'args' => [i])
|
|
end
|
|
|
|
expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
|
|
end
|
|
end
|
|
end
|
|
|
|
it 'allows migrating multiple workers at once' do
|
|
migrator = described_class.new({
|
|
'AuthorizedProjectsWorker' => 'new_queue',
|
|
'PostReceive' => 'another_queue'
|
|
})
|
|
freeze_time do
|
|
create_jobs
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4)
|
|
|
|
set_after.each.with_index do |(item, score), i|
|
|
if item['class'] == 'AuthorizedProjectsWorker'
|
|
expect(item).to include('queue' => 'new_queue', 'args' => [i])
|
|
else
|
|
expect(item).to include('queue' => 'another_queue', 'args' => [i])
|
|
end
|
|
|
|
expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
|
|
end
|
|
end
|
|
end
|
|
|
|
it 'allows migrating multiple workers to the same queue' do
|
|
migrator = described_class.new({
|
|
'AuthorizedProjectsWorker' => 'new_queue',
|
|
'PostReceive' => 'new_queue'
|
|
})
|
|
freeze_time do
|
|
create_jobs
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4)
|
|
|
|
set_after.each.with_index do |(item, score), i|
|
|
expect(item).to include('queue' => 'new_queue', 'args' => [i])
|
|
expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
|
|
end
|
|
end
|
|
end
|
|
|
|
it 'does not try to migrate jobs that are removed from the set during the migration' do
|
|
migrator = described_class.new({ 'PostReceive' => 'new_queue' })
|
|
freeze_time do
|
|
create_jobs
|
|
|
|
allow(migrator).to receive(:migrate_job_in_set).and_wrap_original do |meth, *args|
|
|
Sidekiq.redis { |c| c.zrem(set_name, args.second) }
|
|
|
|
meth.call(*args)
|
|
end
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 0)
|
|
|
|
expect(set_after.length).to eq(3)
|
|
expect(set_after.map(&:first)).to all(include('queue' => 'default'))
|
|
end
|
|
end
|
|
|
|
it 'does not try to migrate unmatched jobs that are added to the set during the migration' do
|
|
migrator = described_class.new({ 'PostReceive' => 'new_queue' })
|
|
create_jobs
|
|
|
|
calls = 0
|
|
|
|
allow(migrator).to receive(:migrate_job_in_set).and_wrap_original do |meth, *args|
|
|
if calls == 0
|
|
travel_to(5.hours.from_now) { create_jobs(include_post_receive: false) }
|
|
end
|
|
|
|
calls += 1
|
|
|
|
meth.call(*args)
|
|
end
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 1)
|
|
|
|
expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count))
|
|
.to eq('default' => 6, 'new_queue' => 1)
|
|
end
|
|
|
|
it 'iterates through the entire set of jobs' do
|
|
migrator = described_class.new({ 'NonExistentWorker' => 'new_queue' })
|
|
50.times do |i|
|
|
travel_to(i.hours.from_now) { create_jobs }
|
|
end
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 200, migrated: 0)
|
|
|
|
expect(set_after.length).to eq(200)
|
|
end
|
|
|
|
it 'logs output at the start, finish, and every LOG_FREQUENCY jobs' do
|
|
freeze_time do
|
|
create_jobs
|
|
|
|
stub_const("#{described_class}::LOG_FREQUENCY", 2)
|
|
|
|
logger = Logger.new(StringIO.new)
|
|
migrator = described_class.new({
|
|
'AuthorizedProjectsWorker' => 'new_queue',
|
|
'PostReceive' => 'another_queue'
|
|
}, logger: logger)
|
|
|
|
expect(logger).to receive(:info).with(a_string_matching('Processing')).once.ordered
|
|
expect(logger).to receive(:info).with(a_string_matching('In progress')).once.ordered
|
|
expect(logger).to receive(:info).with(a_string_matching('Done')).once.ordered
|
|
|
|
expect(migrator.migrate_set(set_name)).to eq(scanned: 4, migrated: 4)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
context 'scheduled jobs' do
|
|
let(:set_name) { 'schedule' }
|
|
let(:schedule_jitter) { 0 }
|
|
|
|
def create_jobs(include_post_receive: true)
|
|
AuthorizedProjectsWorker.perform_in(1.hour, 0)
|
|
AuthorizedProjectsWorker.perform_in(2.hours, 1)
|
|
PostReceive.perform_in(3.hours, 2) if include_post_receive
|
|
AuthorizedProjectsWorker.perform_in(4.hours, 3)
|
|
end
|
|
|
|
it_behaves_like 'processing a set'
|
|
end
|
|
|
|
context 'retried jobs' do
|
|
def create_jobs(include_post_receive: true)
|
|
retry_in(AuthorizedProjectsWorker, 1.hour, 0)
|
|
retry_in(AuthorizedProjectsWorker, 2.hours, 1)
|
|
retry_in(PostReceive, 3.hours, 2) if include_post_receive
|
|
retry_in(AuthorizedProjectsWorker, 4.hours, 3)
|
|
end
|
|
|
|
include_context 'when handling retried jobs'
|
|
it_behaves_like 'processing a set'
|
|
end
|
|
end
|
|
|
|
describe '#migrate_queues', :aggregate_failures do
|
|
let(:migrator) { described_class.new(mappings, logger: logger) }
|
|
let(:logger) { nil }
|
|
|
|
def list_queues
|
|
queues = Sidekiq.redis do |conn|
|
|
conn.scan_each(match: "queue:*").to_a
|
|
end
|
|
queues.uniq.map { |queue| queue.split(':', 2).last }
|
|
end
|
|
|
|
def list_jobs(queue_name)
|
|
Sidekiq.redis { |conn| conn.lrange("queue:#{queue_name}", 0, -1) }
|
|
.map { |item| Gitlab::Json.load(item) }
|
|
end
|
|
|
|
def pre_migrate_checks; end
|
|
|
|
before do
|
|
queue_name_from_worker_name = Gitlab::SidekiqConfig::WorkerRouter.method(:queue_name_from_worker_name)
|
|
EmailReceiverWorker.sidekiq_options(queue: queue_name_from_worker_name.call(EmailReceiverWorker))
|
|
EmailReceiverWorker.perform_async('foo')
|
|
EmailReceiverWorker.perform_async('bar')
|
|
|
|
# test worker that has ':' inside the queue name
|
|
AuthorizedProjectUpdate::ProjectRecalculateWorker.sidekiq_options(
|
|
queue: queue_name_from_worker_name.call(AuthorizedProjectUpdate::ProjectRecalculateWorker)
|
|
)
|
|
AuthorizedProjectUpdate::ProjectRecalculateWorker.perform_async
|
|
end
|
|
|
|
after do
|
|
# resets the queue name to its original
|
|
EmailReceiverWorker.set_queue
|
|
AuthorizedProjectUpdate::ProjectRecalculateWorker.set_queue
|
|
end
|
|
|
|
shared_examples 'migrating queues' do
|
|
it 'migrates the jobs to the correct destination queue' do
|
|
queues = list_queues
|
|
expect(queues).to include(*queues_included_pre_migrate)
|
|
expect(queues).not_to include(*queues_excluded_pre_migrate)
|
|
pre_migrate_checks
|
|
|
|
migrator.migrate_queues
|
|
|
|
queues = list_queues
|
|
expect(queues).not_to include(*queues_excluded_post_migrate)
|
|
expect(queues).to include(*queues_included_post_migrate)
|
|
post_migrate_checks
|
|
end
|
|
end
|
|
|
|
context 'with all workers mapped to default queue' do
|
|
let(:mappings) do
|
|
{ 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default' }
|
|
end
|
|
|
|
let(:queues_included_pre_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_excluded_pre_migrate) { ['default'] }
|
|
let(:queues_excluded_post_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_included_post_migrate) { ['default'] }
|
|
|
|
def post_migrate_checks
|
|
jobs = list_jobs('default')
|
|
expect(jobs.length).to eq(3)
|
|
sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
|
|
expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
|
|
'queue' => 'default')
|
|
expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default')
|
|
expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default')
|
|
end
|
|
|
|
it_behaves_like 'migrating queues'
|
|
end
|
|
|
|
context 'with custom mapping to different queues' do
|
|
let(:mappings) do
|
|
{ 'EmailReceiverWorker' => 'new_email',
|
|
'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'new_authorized' }
|
|
end
|
|
|
|
let(:queues_included_pre_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_excluded_pre_migrate) { %w[new_email new_authorized] }
|
|
let(:queues_excluded_post_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_included_post_migrate) { %w[new_email new_authorized] }
|
|
|
|
def post_migrate_checks
|
|
email_jobs = list_jobs('new_email')
|
|
expect(email_jobs.length).to eq(2)
|
|
expect(email_jobs[0]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'new_email')
|
|
expect(email_jobs[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'new_email')
|
|
|
|
export_jobs = list_jobs('new_authorized')
|
|
expect(export_jobs.length).to eq(1)
|
|
expect(export_jobs[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
|
|
'queue' => 'new_authorized')
|
|
end
|
|
|
|
it_behaves_like 'migrating queues'
|
|
end
|
|
|
|
context 'with illegal JSON payload' do
|
|
let(:job) { '{foo: 1}' }
|
|
let(:mappings) do
|
|
{ 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default' }
|
|
end
|
|
|
|
let(:queues_included_pre_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_excluded_pre_migrate) { ['default'] }
|
|
let(:queues_excluded_post_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_included_post_migrate) { ['default'] }
|
|
let(:logger) { Logger.new(StringIO.new) }
|
|
|
|
before do
|
|
Sidekiq.redis do |conn|
|
|
conn.lpush("queue:email_receiver", job)
|
|
end
|
|
end
|
|
|
|
def pre_migrate_checks
|
|
expect(logger).to receive(:error)
|
|
.with(a_string_matching('Unmarshal JSON payload from SidekiqMigrateJobs failed'))
|
|
.once
|
|
end
|
|
|
|
def post_migrate_checks
|
|
jobs = list_jobs('default')
|
|
expect(jobs.length).to eq(3)
|
|
sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
|
|
expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
|
|
'queue' => 'default')
|
|
expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default')
|
|
expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default')
|
|
end
|
|
|
|
it_behaves_like 'migrating queues'
|
|
end
|
|
|
|
context 'when multiple workers are in the same queue' do
|
|
before do
|
|
ExportCsvWorker.sidekiq_options(queue: 'email_receiver') # follows EmailReceiverWorker's queue
|
|
ExportCsvWorker.perform_async('fizz')
|
|
end
|
|
|
|
after do
|
|
ExportCsvWorker.set_queue
|
|
end
|
|
|
|
context 'when the queue exists in mappings' do
|
|
let(:mappings) do
|
|
{ 'EmailReceiverWorker' => 'email_receiver', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default',
|
|
'ExportCsvWorker' => 'default' }
|
|
end
|
|
|
|
let(:queues_included_pre_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_excluded_pre_migrate) { ['default'] }
|
|
let(:queues_excluded_post_migrate) do
|
|
['authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_included_post_migrate) { %w[default email_receiver] }
|
|
|
|
it_behaves_like 'migrating queues'
|
|
def post_migrate_checks
|
|
# jobs from email_receiver are not migrated at all
|
|
jobs = list_jobs('email_receiver')
|
|
expect(jobs.length).to eq(3)
|
|
sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
|
|
expect(sorted[0]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'email_receiver')
|
|
expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'email_receiver')
|
|
expect(sorted[2]).to include('class' => 'ExportCsvWorker', 'args' => ['fizz'], 'queue' => 'email_receiver')
|
|
end
|
|
end
|
|
|
|
context 'when the queue doesnt exist in mappings' do
|
|
let(:mappings) do
|
|
{ 'EmailReceiverWorker' => 'default', 'AuthorizedProjectUpdate::ProjectRecalculateWorker' => 'default',
|
|
'ExportCsvWorker' => 'default' }
|
|
end
|
|
|
|
let(:queues_included_pre_migrate) do
|
|
['email_receiver',
|
|
'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_excluded_pre_migrate) { ['default'] }
|
|
let(:queues_excluded_post_migrate) do
|
|
['email_receiver', 'authorized_project_update:authorized_project_update_project_recalculate']
|
|
end
|
|
|
|
let(:queues_included_post_migrate) { ['default'] }
|
|
|
|
it_behaves_like 'migrating queues'
|
|
def post_migrate_checks
|
|
# jobs from email_receiver are all migrated
|
|
jobs = list_jobs('email_receiver')
|
|
expect(jobs.length).to eq(0)
|
|
|
|
jobs = list_jobs('default')
|
|
expect(jobs.length).to eq(4)
|
|
sorted = jobs.sort_by { |job| [job["class"], job["args"]] }
|
|
expect(sorted[0]).to include('class' => 'AuthorizedProjectUpdate::ProjectRecalculateWorker',
|
|
'queue' => 'default')
|
|
expect(sorted[1]).to include('class' => 'EmailReceiverWorker', 'args' => ['bar'], 'queue' => 'default')
|
|
expect(sorted[2]).to include('class' => 'EmailReceiverWorker', 'args' => ['foo'], 'queue' => 'default')
|
|
expect(sorted[3]).to include('class' => 'ExportCsvWorker', 'args' => ['fizz'], 'queue' => 'default')
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|