debian-mirror-gitlab/lib/gitlab/sidekiq_config/cli_methods.rb
2020-04-08 18:37:08 +05:30

125 lines
3.8 KiB
Ruby

# frozen_string_literal: true
require 'yaml'
require 'set'
# These methods are called by `sidekiq-cluster`, which runs outside of
# the bundler/Rails context, so we cannot use any gem or Rails methods.
module Gitlab
module SidekiqConfig
module CliMethods
# The methods in this module are used as module methods
# rubocop:disable Gitlab/ModuleWithInstanceVariables
extend self
QUEUE_CONFIG_PATHS = begin
result = %w[app/workers/all_queues.yml]
result << 'ee/app/workers/all_queues.yml' if Gitlab.ee?
result
end.freeze
QUERY_OR_OPERATOR = '|'
QUERY_AND_OPERATOR = '&'
QUERY_CONCATENATE_OPERATOR = ','
QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w:#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze
QUERY_PREDICATES = {
feature_category: :to_sym,
has_external_dependencies: lambda { |value| value == 'true' },
name: :to_s,
resource_boundary: :to_sym,
urgency: :to_sym
}.freeze
QueryError = Class.new(StandardError)
InvalidTerm = Class.new(QueryError)
UnknownOperator = Class.new(QueryError)
UnknownPredicate = Class.new(QueryError)
def all_queues(rails_path = Rails.root.to_s)
@worker_queues ||= {}
@worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path|
full_path = File.join(rails_path, path)
File.exist?(full_path) ? YAML.load_file(full_path) : []
end
end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
def worker_queues(rails_path = Rails.root.to_s)
# https://gitlab.com/gitlab-org/gitlab/issues/199230
worker_names(all_queues(rails_path))
end
def expand_queues(queues, all_queues = self.worker_queues)
return [] if queues.empty?
queues_set = all_queues.to_set
queues.flat_map do |queue|
[queue, *queues_set.grep(/\A#{queue}:/)]
end
end
def query_workers(query_string, queues)
worker_names(queues.select(&query_string_to_lambda(query_string)))
end
def clear_memoization!
if instance_variable_defined?('@worker_queues')
remove_instance_variable('@worker_queues')
end
end
private
def worker_names(workers)
workers.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
end
def query_string_to_lambda(query_string)
or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
predicate_for_term(term)
end
lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
end
lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
end
def predicate_for_term(term)
match = term.match(QUERY_TERM_REGEX)
raise InvalidTerm.new("Invalid term: #{term}") unless match
_, lhs, op, rhs = *match
predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
end
def predicate_for_op(op, predicate)
case op
when '='
predicate
when '!='
lambda { |worker| !predicate.call(worker) }
else
# This is unreachable because InvalidTerm will be raised instead, but
# keeping it allows to guard against that changing in future.
raise UnknownOperator.new("Unknown operator: #{op}")
end
end
def predicate_factory(lhs, values)
values_block = QUERY_PREDICATES[lhs.to_sym]
raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block
lambda { |queue| values.map(&values_block).include?(queue[lhs.to_sym]) }
end
end
end
end