# frozen_string_literal: true

module Gitlab
  module BitbucketServerImport
    module ParallelScheduling
      include Loggable

      attr_reader :project, :already_processed_cache_key, :job_waiter_cache_key

      # The base cache key to use for tracking already processed objects.
      ALREADY_PROCESSED_CACHE_KEY =
        'bitbucket-server-importer/already-processed/%{project}/%{collection}'

      # The base cache key to use for storing the job waiter key.
      JOB_WAITER_CACHE_KEY =
        'bitbucket-server-importer/job-waiter/%{project}/%{collection}'
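
      # Illustrative only: for a hypothetical project with id 42 and a
      # collection of :pull_requests, the formatted keys become
      # 'bitbucket-server-importer/already-processed/42/pull_requests' and
      # 'bitbucket-server-importer/job-waiter/42/pull_requests'.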

      BATCH_SIZE = 100

      # project - An instance of `Project`.
      def initialize(project)
        @project = project

        @already_processed_cache_key =
          format(ALREADY_PROCESSED_CACHE_KEY, project: project.id, collection: collection_method)
        @job_waiter_cache_key =
          format(JOB_WAITER_CACHE_KEY, project: project.id, collection: collection_method)
      end

      private

      def client
        @client ||= BitbucketServer::Client.new(project.import_data.credentials)
      end

      def project_key
        @project_key ||= project.import_data.data['project_key']
      end

      def repository_slug
        @repository_slug ||= project.import_data.data['repo_slug']
      end

      # Returns the ID to use for the cache used for checking if an object has
      # already been processed or not.
      #
      # object - The object we may want to import.
      def id_for_already_processed_cache(object)
        raise NotImplementedError
      end

      # The Sidekiq worker class used for scheduling the importing of objects in
      # parallel.
      def sidekiq_worker_class
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the data to import.
      def collection_method
        raise NotImplementedError
      end
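
      # Sketch only (not part of this module): a hypothetical includer is
      # expected to provide the three hooks above, roughly like this:
      #
      #   class PullRequestsImporter
      #     include ParallelScheduling
      #
      #     private
      #
      #     def sidekiq_worker_class
      #       ImportPullRequestWorker # hypothetical worker name
      #     end
      #
      #     def collection_method
      #       :pull_requests
      #     end
      #
      #     def id_for_already_processed_cache(object)
      #       object[:iid] # any value that uniquely identifies the object
      #     end
      #   end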

      # Returns the JobWaiter used to wait for the scheduled Sidekiq jobs. The
      # waiter key is stored in the import cache, so repeated scheduling runs
      # reuse the same key.
      def job_waiter
        @job_waiter ||= begin
          key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key)
          key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key)

          JobWaiter.new(0, key)
        end
      end

      # Returns true if the given object has already been marked as processed.
      def already_processed?(object)
        id = id_for_already_processed_cache(object)

        Gitlab::Cache::Import::Caching.set_includes?(already_processed_cache_key, id)
      end

      # Marks the given object as "already processed".
      def mark_as_processed(object)
        id = id_for_already_processed_cache(object)

        Gitlab::Cache::Import::Caching.set_add(already_processed_cache_key, id)
      end
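
      # Illustrative only: with BATCH_SIZE = 100, job indexes 0..99 get a delay
      # of 1 second, indexes 100..199 get 1 minute and 1 second, 200..299 get
      # 2 minutes and 1 second, and so on.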
      def calculate_job_delay(job_index)
        multiplier = (job_index / BATCH_SIZE)

        (multiplier * 1.minute) + 1.second
      end
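
      # Sketch only (assumption, not taken from this file): an includer's
      # `execute` method typically composes the helpers above roughly as
      # follows, where `objects` stands for the collection being imported:
      #
      #   objects.each_with_index do |object, index|
      #     next if already_processed?(object)
      #
      #     job_waiter.jobs_remaining += 1
      #     sidekiq_worker_class.perform_in(
      #       calculate_job_delay(index), project.id, object.to_hash, job_waiter.key
      #     )
      #
      #     mark_as_processed(object)
      #   end
      #
      #   job_waiter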

      def track_import_failure!(project, exception:, **args)
        Gitlab::Import::ImportFailureService.track(
          project_id: project.id,
          error_source: self.class.name,
          exception: exception,
          **args
        )
      end
    end
  end
end