debian-mirror-gitlab/sidekiq_cluster/sidekiq_cluster.rb

113 lines
3.6 KiB
Ruby
Raw Permalink Normal View History

2020-04-08 14:13:33 +05:30
# frozen_string_literal: true
2022-01-26 12:08:38 +05:30
require_relative '../lib/gitlab/process_management'
2023-03-04 22:38:38 +05:30
require_relative '../lib/gitlab/process_supervisor'
2020-04-22 19:07:51 +05:30
2020-04-08 14:13:33 +05:30
module Gitlab
module SidekiqCluster
2021-12-11 22:18:48 +05:30
# How long to wait when asking for a clean termination.
# It maps the Sidekiq default timeout:
# https://github.com/mperham/sidekiq/wiki/Signals#term
#
# This value is passed to Sidekiq's `-t` if none
# is given through arguments.
DEFAULT_SOFT_TIMEOUT_SECONDS = 25
2022-05-07 20:08:51 +05:30
# Additional time granted after surpassing the soft timeout
# before we kill the process.
TIMEOUT_GRACE_PERIOD_SECONDS = 5
# The singleton instance used to supervise cluster processes.
SidekiqProcessSupervisor = Class.new(Gitlab::ProcessSupervisor)
2021-12-11 22:18:48 +05:30
2020-04-08 14:13:33 +05:30
# Starts Sidekiq workers for the pairs of processes.
#
# Example:
#
# start([ ['foo'], ['bar', 'baz'] ], :production)
#
# This would start two Sidekiq processes: one processing "foo", and one
# processing "bar" and "baz". Each one is placed in its own process group.
#
# queues - An Array containing Arrays. Each sub Array should specify the
# queues to use for a single process.
#
# directory - The directory of the Rails application.
#
2023-03-04 22:38:38 +05:30
# Returns an Array containing the waiter threads (from Process.detach) of
# the started processes.
def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
2020-04-08 14:13:33 +05:30
queues.map.with_index do |pair, index|
2020-04-22 19:07:51 +05:30
start_sidekiq(pair, env: env,
directory: directory,
max_concurrency: max_concurrency,
min_concurrency: min_concurrency,
worker_id: index,
timeout: timeout,
dryrun: dryrun)
2020-04-08 14:13:33 +05:30
end
end
# Starts a Sidekiq process that processes _only_ the given queues.
#
# Returns the PID of the started process.
2020-04-22 19:07:51 +05:30
def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, worker_id:, timeout:, dryrun:)
2020-04-08 14:13:33 +05:30
counts = count_by_queue(queues)
cmd = %w[bundle exec sidekiq]
2020-04-22 19:07:51 +05:30
cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency)}"
2020-04-08 14:13:33 +05:30
cmd << "-e#{env}"
2020-04-22 19:07:51 +05:30
cmd << "-t#{timeout}"
cmd << "-gqueues:#{proc_details(counts)}"
2020-04-08 14:13:33 +05:30
cmd << "-r#{directory}"
counts.each do |queue, count|
cmd << "-q#{queue},#{count}"
end
if dryrun
2020-04-22 19:07:51 +05:30
puts Shellwords.join(cmd) # rubocop:disable Rails/Output
2020-04-08 14:13:33 +05:30
return
end
2022-01-26 12:08:38 +05:30
# We need to remove Bundler specific env vars, since otherwise the
# child process will think we are passing an alternative Gemfile
# and will clear and reset LOAD_PATH.
pid = Bundler.with_original_env do
Process.spawn(
{ 'ENABLE_SIDEKIQ_CLUSTER' => '1',
'SIDEKIQ_WORKER_ID' => worker_id.to_s },
*cmd,
pgroup: true,
err: $stderr,
out: $stdout
)
end
2020-04-08 14:13:33 +05:30
2023-03-04 22:38:38 +05:30
Process.detach(pid)
2020-04-08 14:13:33 +05:30
end
def self.count_by_queue(queues)
2021-02-22 17:27:13 +05:30
queues.tally
2020-04-08 14:13:33 +05:30
end
def self.proc_details(counts)
counts.map do |queue, count|
if count == 1
queue
else
"#{queue} (#{count})"
end
2020-04-22 19:07:51 +05:30
end.join(',')
2020-04-08 14:13:33 +05:30
end
def self.concurrency(queues, min_concurrency, max_concurrency)
concurrency_from_queues = queues.length + 1
2020-10-24 23:57:45 +05:30
max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues
2020-04-08 14:13:33 +05:30
min = [min_concurrency, max].min
concurrency_from_queues.clamp(min, max)
end
end
end