debian-mirror-gitlab/sidekiq_cluster/cli.rb

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

246 lines
8.1 KiB
Ruby
Raw Normal View History

2020-04-08 14:13:33 +05:30
# frozen_string_literal: true
2022-01-26 12:08:38 +05:30
require_relative '../config/bundler_setup'
2020-04-08 14:13:33 +05:30
require 'optparse'
require 'logger'
require 'time'
2021-12-11 22:18:48 +05:30
# In environments where code is preloaded and cached such as `spring`,
# we may run into "already initialized" warnings, hence the check.
2023-03-17 16:20:25 +05:30
require_relative '../lib/gitlab' unless Object.const_defined?(:Gitlab)
2021-12-11 22:18:48 +05:30
require_relative '../lib/gitlab/utils'
require_relative '../lib/gitlab/sidekiq_config/cli_methods'
require_relative '../lib/gitlab/sidekiq_config/worker_matcher'
require_relative '../lib/gitlab/sidekiq_logging/json_formatter'
2022-05-07 20:08:51 +05:30
require_relative '../metrics_server/dependencies'
2022-01-26 12:08:38 +05:30
require_relative '../metrics_server/metrics_server'
2021-12-11 22:18:48 +05:30
require_relative 'sidekiq_cluster'
2020-04-08 14:13:33 +05:30
module Gitlab
module SidekiqCluster
class CLI
2022-07-16 23:28:13 +05:30
# Name assigned to the current thread at the start of `run`.
THREAD_NAME = 'sidekiq-cluster'
2022-01-26 12:08:38 +05:30
# The signals that should terminate both the master and workers.
# Handed to the process supervisor as `term_signals`.
TERMINATE_SIGNALS = %i(INT TERM).freeze
# The signals that should simply be forwarded to the workers.
# Handed to the process supervisor as `forwarded_signals`.
FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
2020-04-08 14:13:33 +05:30
# Raised by `run` when the command line is unusable: no arguments given,
# --dryrun combined with --list-queues, a queue argument containing a
# newline, or no queues matching the arguments.
CommandError = Class.new(StandardError)
2021-09-04 01:27:46 +05:30
# Builds a CLI with its default settings. Most of these defaults can be
# overridden by the flags declared in `option_parser`.
#
# log_output - Destination handed to `Logger.new`; defaults to $stderr.
def initialize(log_output = $stderr)
  # Where and how the cluster runs.
  @environment = ENV['RAILS_ENV'] || 'development'
  @rails_path = Dir.pwd
  @pid = nil
  @metrics_dir = ENV["prometheus_multiproc_dir"] || File.absolute_path("tmp/prometheus_multiproc_dir/sidekiq")

  # Concurrency bounds passed on to SidekiqCluster.start.
  # https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency
  # https://ruby.social/@getajobmike/109326475545816363
  @max_concurrency = 20
  @min_concurrency = 0

  # Supervision timings (both in seconds).
  @interval = 5
  @soft_timeout_seconds = DEFAULT_SOFT_TIMEOUT_SECONDS

  # Log lines are emitted through the JSON formatter.
  @logger = Logger.new(log_output).tap do |logger|
    logger.formatter = ::Gitlab::SidekiqLogging::JSONFormatter.new
  end

  # Modes that are only switched on by command-line flags.
  @dryrun = false
  @list_queues = false
end
# Entry point: parses the command line, resolves every positional argument
# into a group of queues, and then either lists those queues or starts and
# supervises the workers.
#
# argv - Array of command-line arguments; the recognised options are
#        removed from it in place by the option parser.
#
# Raises CommandError when no arguments are given, when --dryrun and
# --list-queues are combined, when a queue argument contains a newline, or
# when no queue matches.
def run(argv = ARGV)
  Thread.current.name = THREAD_NAME

  raise CommandError, 'You must specify at least one queue to start a worker for' if argv.empty?

  option_parser.parse!(argv)

  raise CommandError, 'The --dryrun and --list-queues options are mutually exclusive' if @dryrun && @list_queues

  worker_metadatas = SidekiqConfig::CliMethods.worker_metadatas(@rails_path)
  worker_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path)

  # Whatever is left in argv after option parsing is one queue group each.
  queue_groups = argv.map do |group_spec|
    raise CommandError, 'The queue arguments cannot contain newlines' if group_spec.match?(/[\r\n]/)

    if group_spec == SidekiqConfig::WorkerMatcher::WILDCARD_MATCH
      worker_queues
    elsif @queue_selector
      # With the queue query syntax each group is a worker attribute
      # query, and the group's queues are whatever that query resolves to.
      SidekiqConfig::CliMethods.query_queues(group_spec, worker_metadatas)
    else
      SidekiqConfig::CliMethods.expand_queues(group_spec.split(','), worker_queues)
    end
  end

  # --negate: each group becomes every known queue except the listed ones.
  queue_groups.map! { |selected| worker_queues - selected } if @negate_queues

  raise CommandError, 'No queues found, you must select at least one queue' if queue_groups.all?(&:empty?)

  if @list_queues
    puts queue_groups.map(&:sort) # rubocop:disable Rails/Output
    return
  end

  unless @dryrun
    @logger.info("Starting cluster with #{queue_groups.length} processes")
    # The metrics directory has to be reset before:
    # - a metrics server process is started
    # - new workers are started
    ::Prometheus::CleanupMultiprocDirService.new(@metrics_dir).execute
  end

  start_and_supervise_workers(queue_groups)
end
2022-01-26 12:08:38 +05:30
2022-05-07 20:08:51 +05:30
# Starts one worker process per queue group and, unless this is a dry run,
# supervises them together with the optional metrics server until they exit.
#
# queue_groups - Array of queue lists, one entry per process to start.
#
# Returns nil straight after starting when --dryrun is set. Otherwise the
# process exits with status 1 if any collected worker status is unsuccessful.
def start_and_supervise_workers(queue_groups)
  start_options = {
    env: @environment,
    directory: @rails_path,
    max_concurrency: @max_concurrency,
    min_concurrency: @min_concurrency,
    dryrun: @dryrun,
    timeout: @soft_timeout_seconds
  }
  wait_threads = SidekiqCluster.start(queue_groups, **start_options)

  return if @dryrun

  ProcessManagement.write_pid(@pid) if @pid

  supervisor = SidekiqProcessSupervisor.instance(
    health_check_interval_seconds: @interval,
    terminate_timeout_seconds: @soft_timeout_seconds + TIMEOUT_GRACE_PERIOD_SECONDS,
    term_signals: TERMINATE_SIGNALS,
    forwarded_signals: FORWARD_SIGNALS,
    synchronous: true
  )

  # nil when the metrics server is disabled; Array(nil) is then [].
  metrics_server_pid = start_metrics_server
  supervised_pids = wait_threads.map(&:pid) + Array(metrics_server_pid)

  supervisor.supervise(supervised_pids) do |dead_pids|
    unless dead_pids == Array(metrics_server_pid)
      # A worker process died, so the whole cluster is terminated.
      # An external system (runit, kubernetes) handles the restart.
      @logger.info('A worker terminated, shutting down the cluster')
      supervisor.shutdown
      next []
    end

    # Only the metrics server died and we are not shutting down the
    # cluster, so bring it back. The assignment is the block's value.
    @logger.info('Sidekiq metrics server terminated, restarting...')
    metrics_server_pid = restart_metrics_server
  end

  exit_statuses = wait_threads.map { |waiter| waiter.join.value }

  exit 1 if exit_statuses.compact.any? { |status| !status.success? }
end
2022-05-07 20:08:51 +05:30
# Launches the metrics server when it is enabled.
#
# Returns whatever `restart_metrics_server` returns, or nil when disabled.
def start_metrics_server
  restart_metrics_server if metrics_server_enabled?
end
2022-06-21 17:19:12 +05:30
# Logs the configured exporter port and starts the Sidekiq metrics server,
# unconditionally (callers decide whether it should run).
#
# Returns the result of `MetricsServer.start_for_sidekiq`.
def restart_metrics_server
  port = sidekiq_exporter_port
  @logger.info("Starting metrics server on port #{port}")

  signals_to_reset = TERMINATE_SIGNALS + FORWARD_SIGNALS
  MetricsServer.start_for_sidekiq(metrics_dir: @metrics_dir, reset_signals: signals_to_reset)
end
# Reads the `monitoring.sidekiq_exporter.enabled` setting.
def sidekiq_exporter_enabled?
  setting_path = ['monitoring', 'sidekiq_exporter', 'enabled']
  ::Settings.dig(*setting_path)
end
# Reads the `monitoring.sidekiq_exporter.port` setting.
def sidekiq_exporter_port
  setting_path = ['monitoring', 'sidekiq_exporter', 'port']
  ::Settings.dig(*setting_path)
end
# The metrics server never runs during a dry run; otherwise this follows
# the exporter's `enabled` setting.
def metrics_server_enabled?
  return false if @dryrun

  sidekiq_exporter_enabled?
end
2020-04-08 14:13:33 +05:30
# Builds the OptionParser for the command line. Every handler stores its
# value in an instance variable of this object; the positional arguments
# (the queue groups) are left for the caller to consume.
#
# Numeric options are converted with `to_i`, so non-numeric input becomes 0.
#
# Returns a new OptionParser on every call.
def option_parser
  OptionParser.new do |opt|
    opt.banner = "#{File.basename(__FILE__)} [QUEUE,QUEUE] [QUEUE] ... [OPTIONS]"

    opt.separator "\nOptions:\n"

    # `abort` prints the usage text to stderr and exits with status 1.
    opt.on('-h', '--help', 'Shows this help message') do
      abort opt.to_s
    end

    # The documented default must match the value assigned in `initialize`.
    opt.on('-m', '--max-concurrency INT', 'Maximum threads to use with Sidekiq (default: 20, 0 to disable)') do |int|
      @max_concurrency = int.to_i
    end

    opt.on('--min-concurrency INT', 'Minimum threads to use with Sidekiq (default: 0)') do |int|
      @min_concurrency = int.to_i
    end

    opt.on('-e', '--environment ENV', 'The application environment') do |env|
      @environment = env
    end

    opt.on('-P', '--pidfile PATH', 'Path to the PID file') do |pid|
      @pid = pid
    end

    opt.on('-r', '--require PATH', 'Location of the Rails application') do |path|
      @rails_path = path
    end

    # A plain switch: the block receives `true` when the flag is present.
    opt.on('--queue-selector', 'Run workers based on the provided selector') do |queue_selector|
      @queue_selector = queue_selector
    end

    opt.on('-n', '--negate', 'Run workers for all queues in sidekiq_queues.yml except the given ones') do
      @negate_queues = true
    end

    opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int|
      @interval = int.to_i
    end

    opt.on('-t', '--timeout INT', 'Graceful timeout for all running processes') do |timeout|
      @soft_timeout_seconds = timeout.to_i
    end

    opt.on('-d', '--dryrun', 'Print commands that would be run without this flag, and quit') do
      @dryrun = true
    end

    opt.on('--list-queues', 'List matching queues, and quit') do
      @list_queues = true
    end
  end
end
end
end
end