debian-mirror-gitlab/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb

142 lines
3.8 KiB
Ruby
Raw Normal View History

2020-04-08 14:13:33 +05:30
# frozen_string_literal: true
require 'digest'
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
# This class defines an identifier of a job in a queue
# The identifier based on a job's class and arguments.
#
# As strategy decides when to keep track of the job in redis and when to
# remove it.
#
# Storing the deduplication key in redis can be done by calling `check!`
# check returns the `jid` of the job if it was scheduled, or the `jid` of
# the duplicate job if it was already scheduled
#
# When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob
DUPLICATE_KEY_TTL = 6.hours
2020-06-23 00:09:42 +05:30
DEFAULT_STRATEGY = :until_executing
2021-09-04 01:27:46 +05:30
STRATEGY_NONE = :none
2020-04-08 14:13:33 +05:30
attr_reader :existing_jid
2020-06-23 00:09:42 +05:30
def initialize(job, queue_name)
2020-04-08 14:13:33 +05:30
@job = job
@queue_name = queue_name
end
# This will continue the middleware chain if the job should be scheduled
# It will return false if the job needs to be cancelled
def schedule(&block)
Strategies.for(strategy).new(self).schedule(job, &block)
end
# This will continue the server middleware chain if the job should be
# executed.
# It will return false if the job should not be executed.
def perform(&block)
Strategies.for(strategy).new(self).perform(job, &block)
end
# This method will return the jid that was set in redis
2020-06-23 00:09:42 +05:30
def check!(expiry = DUPLICATE_KEY_TTL)
2020-04-08 14:13:33 +05:30
read_jid = nil
Sidekiq.redis do |redis|
redis.multi do |multi|
2020-06-23 00:09:42 +05:30
redis.set(idempotency_key, jid, ex: expiry, nx: true)
2020-04-08 14:13:33 +05:30
read_jid = redis.get(idempotency_key)
end
end
2021-09-04 01:27:46 +05:30
job['idempotency_key'] = idempotency_key
2020-04-08 14:13:33 +05:30
self.existing_jid = read_jid.value
end
def delete!
Sidekiq.redis do |redis|
redis.del(idempotency_key)
end
end
2020-06-23 00:09:42 +05:30
def scheduled?
scheduled_at.present?
end
2020-04-08 14:13:33 +05:30
def duplicate?
raise "Call `#check!` first to check for existing duplicates" unless existing_jid
jid != existing_jid
end
2020-06-23 00:09:42 +05:30
def scheduled_at
job['at']
end
def options
return {} unless worker_klass
return {} unless worker_klass.respond_to?(:get_deduplication_options)
worker_klass.get_deduplication_options
2020-04-08 14:13:33 +05:30
end
2021-02-22 17:27:13 +05:30
def idempotent?
return false unless worker_klass
return false unless worker_klass.respond_to?(:idempotent?)
worker_klass.idempotent?
end
2020-04-08 14:13:33 +05:30
private
2020-06-23 00:09:42 +05:30
attr_reader :queue_name, :job
2020-04-08 14:13:33 +05:30
attr_writer :existing_jid
2020-06-23 00:09:42 +05:30
def worker_klass
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
def strategy
return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
2021-09-04 01:27:46 +05:30
return STRATEGY_NONE unless worker_klass.deduplication_enabled?
2020-06-23 00:09:42 +05:30
worker_klass.get_deduplicate_strategy
end
2020-04-08 14:13:33 +05:30
def worker_class_name
job['class']
end
def arguments
job['args']
end
def jid
job['jid']
end
def idempotency_key
2021-09-04 01:27:46 +05:30
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
2020-04-08 14:13:33 +05:30
end
def idempotency_hash
Digest::SHA256.hexdigest(idempotency_string)
end
def namespace
"#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue_name}"
end
def idempotency_string
2021-09-30 23:02:18 +05:30
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
2020-04-08 14:13:33 +05:30
end
end
end
end
end