314 lines
8.3 KiB
Ruby
314 lines
8.3 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'spec_helper'
|
|
|
|
RSpec.describe Gitlab::SidekiqDaemon::Monitor do
|
|
let(:monitor) { described_class.new }
|
|
|
|
describe '#within_job' do
|
|
it 'tracks thread, jid and worker_class' do
|
|
blk = proc do
|
|
monitor.jobs do |jobs|
|
|
jobs.each do |jid, job|
|
|
expect(job[:thread]).not_to be_nil
|
|
expect(jid).to eq('jid')
|
|
expect(job[:worker_class]).to eq('worker_class')
|
|
end
|
|
end
|
|
|
|
"OK"
|
|
end
|
|
|
|
expect(monitor.within_job('worker_class', 'jid', 'queue', &blk)).to eq("OK")
|
|
end
|
|
|
|
context 'when job is canceled' do
|
|
let(:jid) { SecureRandom.hex }
|
|
|
|
before do
|
|
described_class.cancel_job(jid)
|
|
end
|
|
|
|
it 'does not execute a block' do
|
|
expect do |blk|
|
|
monitor.within_job('worker_class', jid, 'queue', &blk)
|
|
rescue described_class::CancelledError
|
|
end.not_to yield_control
|
|
end
|
|
|
|
it 'raises exception' do
|
|
expect { monitor.within_job('worker_class', jid, 'queue') }.to raise_error(
|
|
described_class::CancelledError)
|
|
end
|
|
end
|
|
end
|
|
|
|
describe '#jobs' do
|
|
it 'returns running jobs hash' do
|
|
jid = SecureRandom.hex
|
|
running_jobs = { jid => hash_including(worker_class: 'worker_class') }
|
|
|
|
monitor.within_job('worker_class', jid, 'queue') do
|
|
expect(monitor.jobs).to match(running_jobs)
|
|
end
|
|
end
|
|
end
|
|
|
|
describe '#run_thread when notification channel not enabled' do
|
|
subject { monitor.send(:run_thread) }
|
|
|
|
it 'return directly' do
|
|
allow(monitor).to receive(:notification_channel_enabled?).and_return(nil)
|
|
|
|
expect(Sidekiq.logger).not_to receive(:info)
|
|
expect(Sidekiq.logger).not_to receive(:warn)
|
|
expect(monitor).not_to receive(:enabled?)
|
|
expect(monitor).not_to receive(:process_messages)
|
|
|
|
subject
|
|
end
|
|
end
|
|
|
|
describe '#run_thread when notification channel enabled' do
|
|
subject { monitor.send(:run_thread) }
|
|
|
|
before do
|
|
# we want to run at most once cycle
|
|
# we toggle `enabled?` flag after the first call
|
|
stub_const('Gitlab::SidekiqDaemon::Monitor::RECONNECT_TIME', 0)
|
|
allow(monitor).to receive(:enabled?).and_return(true, false)
|
|
allow(monitor).to receive(:notification_channel_enabled?).and_return(1)
|
|
|
|
allow(Sidekiq.logger).to receive(:info)
|
|
allow(Sidekiq.logger).to receive(:warn)
|
|
end
|
|
|
|
context 'when structured logging is used' do
|
|
it 'logs start message' do
|
|
expect(Sidekiq.logger).to receive(:info)
|
|
.with(
|
|
class: described_class.to_s,
|
|
action: 'start',
|
|
message: 'Starting Monitor Daemon')
|
|
|
|
expect(::Gitlab::Redis::SharedState).to receive(:with)
|
|
|
|
subject
|
|
end
|
|
|
|
it 'logs stop message' do
|
|
expect(Sidekiq.logger).to receive(:warn)
|
|
.with(
|
|
class: described_class.to_s,
|
|
action: 'stop',
|
|
message: 'Stopping Monitor Daemon')
|
|
|
|
expect(::Gitlab::Redis::SharedState).to receive(:with)
|
|
|
|
subject
|
|
end
|
|
|
|
it 'logs StandardError message' do
|
|
expect(Sidekiq.logger).to receive(:warn)
|
|
.with(
|
|
class: described_class.to_s,
|
|
action: 'exception',
|
|
message: 'My Exception')
|
|
|
|
expect(::Gitlab::Redis::SharedState).to receive(:with)
|
|
.and_raise(StandardError, 'My Exception')
|
|
|
|
expect { subject }.not_to raise_error
|
|
end
|
|
|
|
it 'logs and raises Exception message' do
|
|
expect(Sidekiq.logger).to receive(:warn)
|
|
.with(
|
|
class: described_class.to_s,
|
|
action: 'exception',
|
|
message: 'My Exception')
|
|
|
|
expect(::Gitlab::Redis::SharedState).to receive(:with)
|
|
.and_raise(Exception, 'My Exception')
|
|
|
|
expect { subject }.to raise_error(Exception, 'My Exception')
|
|
end
|
|
end
|
|
|
|
context 'when StandardError is raised' do
|
|
it 'does retry connection' do
|
|
expect(::Gitlab::Redis::SharedState).to receive(:with)
|
|
.and_raise(StandardError, 'My Exception')
|
|
|
|
expect(::Gitlab::Redis::SharedState).to receive(:with)
|
|
|
|
# we expect to run `process_messages` twice
|
|
expect(monitor).to receive(:enabled?).and_return(true, true, false)
|
|
|
|
subject
|
|
end
|
|
end
|
|
|
|
context 'when message is published' do
|
|
let(:subscribed) { double }
|
|
|
|
before do
|
|
expect_any_instance_of(::Redis).to receive(:subscribe)
|
|
.and_yield(subscribed)
|
|
|
|
expect(subscribed).to receive(:message)
|
|
.and_yield(
|
|
described_class::NOTIFICATION_CHANNEL,
|
|
payload
|
|
)
|
|
|
|
expect(Sidekiq.logger).to receive(:info)
|
|
.with(
|
|
class: described_class.to_s,
|
|
action: 'start',
|
|
message: 'Starting Monitor Daemon')
|
|
|
|
expect(Sidekiq.logger).to receive(:info)
|
|
.with(
|
|
class: described_class.to_s,
|
|
channel: described_class::NOTIFICATION_CHANNEL,
|
|
message: 'Received payload on channel',
|
|
payload: payload
|
|
)
|
|
end
|
|
|
|
context 'and message is valid' do
|
|
let(:payload) { '{"action":"cancel","jid":"my-jid"}' }
|
|
|
|
it 'processes cancel' do
|
|
expect(monitor).to receive(:process_job_cancel).with('my-jid')
|
|
|
|
subject
|
|
end
|
|
end
|
|
|
|
context 'and message is not valid json' do
|
|
let(:payload) { '{"action"}' }
|
|
|
|
it 'skips processing' do
|
|
expect(monitor).not_to receive(:process_job_cancel)
|
|
|
|
subject
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
describe '#stop' do
|
|
let!(:monitor_thread) { monitor.start }
|
|
|
|
it 'does stop the thread' do
|
|
expect(monitor_thread).to be_alive
|
|
|
|
expect { monitor.stop }.not_to raise_error
|
|
|
|
expect(monitor_thread).not_to be_alive
|
|
expect { monitor_thread.value }.to raise_error(Interrupt)
|
|
end
|
|
end
|
|
|
|
describe '#process_job_cancel' do
|
|
subject { monitor.send(:process_job_cancel, jid) }
|
|
|
|
context 'when jid is missing' do
|
|
let(:jid) { nil }
|
|
|
|
it 'does not run thread' do
|
|
expect(subject).to be_nil
|
|
end
|
|
end
|
|
|
|
context 'when jid is provided' do
|
|
let(:jid) { 'my-jid' }
|
|
|
|
context 'when jid is not found' do
|
|
it 'does not log cancellation message' do
|
|
expect(Sidekiq.logger).not_to receive(:warn)
|
|
expect(subject).to be_nil
|
|
end
|
|
end
|
|
|
|
context 'when jid is found' do
|
|
let(:thread) { Thread.new { sleep 1000 } }
|
|
|
|
before do
|
|
allow(monitor).to receive(:find_thread_unsafe).with(jid).and_return(thread)
|
|
end
|
|
|
|
after do
|
|
thread.kill
|
|
rescue StandardError
|
|
end
|
|
|
|
it 'does log cancellation message' do
|
|
expect(Sidekiq.logger).to receive(:warn)
|
|
.with(
|
|
class: described_class.to_s,
|
|
action: 'cancel',
|
|
message: 'Canceling thread with CancelledError',
|
|
jid: 'my-jid',
|
|
thread_id: thread.object_id)
|
|
|
|
expect(subject).to be_a(Thread)
|
|
|
|
subject.join
|
|
end
|
|
|
|
it 'does cancel the thread' do
|
|
expect(subject).to be_a(Thread)
|
|
|
|
subject.join
|
|
|
|
# we wait for the thread to be cancelled
|
|
# by `process_job_cancel`
|
|
expect { thread.join(5) }.to raise_error(described_class::CancelledError)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
describe '.cancel_job' do
|
|
subject { described_class.cancel_job('my-jid') }
|
|
|
|
it 'sets a redis key' do
|
|
expect_any_instance_of(::Redis).to receive(:setex)
|
|
.with('sidekiq:cancel:my-jid', anything, 1)
|
|
|
|
subject
|
|
end
|
|
|
|
it 'notifies all workers' do
|
|
payload = '{"action":"cancel","jid":"my-jid"}'
|
|
|
|
expect_any_instance_of(::Redis).to receive(:publish)
|
|
.with('sidekiq:cancel:notifications', payload)
|
|
|
|
subject
|
|
end
|
|
end
|
|
|
|
describe '#notification_channel_enabled?' do
|
|
subject { monitor.send(:notification_channel_enabled?) }
|
|
|
|
it 'return nil when SIDEKIQ_MONITOR_WORKER is not set' do
|
|
expect(subject).to be nil
|
|
end
|
|
|
|
it 'return nil when SIDEKIQ_MONITOR_WORKER set to 0' do
|
|
allow(ENV).to receive(:fetch).with('SIDEKIQ_MONITOR_WORKER', 0).and_return("0")
|
|
|
|
expect(subject).to be nil
|
|
end
|
|
|
|
it 'return 1 when SIDEKIQ_MONITOR_WORKER set to 1' do
|
|
allow(ENV).to receive(:fetch).with('SIDEKIQ_MONITOR_WORKER', 0).and_return("1")
|
|
|
|
expect(subject).to be 1
|
|
end
|
|
end
|
|
end
|