# frozen_string_literal: true module Gitlab module GithubImport module ParallelScheduling attr_reader :project, :client, :page_counter, :already_imported_cache_key, :job_waiter_cache_key, :job_waiter_remaining_cache_key # The base cache key to use for tracking already imported objects. ALREADY_IMPORTED_CACHE_KEY = 'github-importer/already-imported/%{project}/%{collection}' # The base cache key to use for storing job waiter key JOB_WAITER_CACHE_KEY = 'github-importer/job-waiter/%{project}/%{collection}' # The base cache key to use for storing job waiter remaining jobs JOB_WAITER_REMAINING_CACHE_KEY = 'github-importer/job-waiter-remaining/%{project}/%{collection}' # project - An instance of `Project`. # client - An instance of `Gitlab::GithubImport::Client`. # parallel - When set to true the objects will be imported in parallel. def initialize(project, client, parallel: true) @project = project @client = client @parallel = parallel @page_counter = PageCounter.new(project, collection_method) @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY % { project: project.id, collection: collection_method } @job_waiter_cache_key = JOB_WAITER_CACHE_KEY % { project: project.id, collection: collection_method } @job_waiter_remaining_cache_key = JOB_WAITER_REMAINING_CACHE_KEY % { project: project.id, collection: collection_method } end def parallel? @parallel end def execute info(project.id, message: "starting importer") retval = if parallel? parallel_import else sequential_import end # Once we have completed all work we can remove our "already exists" # cache so we don't put too much pressure on Redis. # # We don't immediately remove it since it's technically possible for # other instances of this job to still run, instead we set the # expiration time to a lower value. This prevents the other jobs from # still scheduling duplicates while. Since all work has already been # completed those jobs will just cycle through any remaining pages while # not scheduling anything. Gitlab::Cache::Import::Caching.expire(already_imported_cache_key, Gitlab::Cache::Import::Caching::SHORTER_TIMEOUT) info(project.id, message: "importer finished") retval rescue StandardError => e Gitlab::Import::ImportFailureService.track( project_id: project.id, error_source: self.class.name, exception: e, fail_import: abort_on_failure, metrics: true ) raise(e) end # Imports all the objects in sequence in the current thread. def sequential_import each_object_to_import do |object| repr = object_representation(object) importer_class.new(repr, project, client).execute end end # Imports all objects in parallel by scheduling a Sidekiq job for every # individual object. def parallel_import raise 'Batch settings must be defined for parallel import' if parallel_import_batch.blank? if Feature.enabled?(:improved_spread_parallel_import) improved_spread_parallel_import else spread_parallel_import end end def improved_spread_parallel_import enqueued_job_counter = 0 each_object_to_import do |object| repr = object_representation(object) job_delay = calculate_job_delay(enqueued_job_counter) sidekiq_worker_class.perform_in(job_delay, project.id, repr.to_hash, job_waiter.key) enqueued_job_counter += 1 job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key) end job_waiter end def spread_parallel_import waiter = JobWaiter.new import_arguments = [] each_object_to_import do |object| repr = object_representation(object) import_arguments << [project.id, repr.to_hash, waiter.key] waiter.jobs_remaining += 1 end # rubocop:disable Scalability/BulkPerformWithContext Gitlab::ApplicationContext.with_context(project: project) do sidekiq_worker_class.bulk_perform_in( 1.second, import_arguments, batch_size: parallel_import_batch[:size], batch_delay: parallel_import_batch[:delay] ) end # rubocop:enable Scalability/BulkPerformWithContext waiter end # The method that will be called for traversing through all the objects to # import, yielding them to the supplied block. def each_object_to_import repo = project.import_source # We inject the page number here to make sure that all importers always # start where they left off. Simply starting over wouldn't work for # repositories with a lot of data (e.g. tens of thousands of comments). options = collection_options.merge(page: page_counter.current) client.each_page(collection_method, repo, options) do |page| # Technically it's possible that the same work is performed multiple # times, as Sidekiq doesn't guarantee there will ever only be one # instance of a job. In such a scenario it's possible for one job to # have a lower page number (e.g. 5) compared to another (e.g. 10). In # this case we skip over all the objects until we have caught up, # reducing the number of duplicate jobs scheduled by the provided # block. next unless page_counter.set(page.number) page.objects.each do |object| object = object.to_h next if already_imported?(object) if increment_object_counter?(object) Gitlab::GithubImport::ObjectCounter.increment(project, object_type, :fetched) end yield object # We mark the object as imported immediately so we don't end up # scheduling it multiple times. mark_as_imported(object) end end end def increment_object_counter?(_object) true end # Returns true if the given object has already been imported, false # otherwise. # # object - The object to check. def already_imported?(object) id = id_for_already_imported_cache(object) Gitlab::Cache::Import::Caching.set_includes?(already_imported_cache_key, id) end # Marks the given object as "already imported". def mark_as_imported(object) id = id_for_already_imported_cache(object) Gitlab::Cache::Import::Caching.set_add(already_imported_cache_key, id) end def object_type raise NotImplementedError end # Returns the ID to use for the cache used for checking if an object has # already been imported or not. # # object - The object we may want to import. def id_for_already_imported_cache(object) raise NotImplementedError end # The class used for converting API responses to Hashes when performing # the import. def representation_class raise NotImplementedError end # The class to use for importing objects when importing them sequentially. def importer_class raise NotImplementedError end # The Sidekiq worker class used for scheduling the importing of objects in # parallel. def sidekiq_worker_class raise NotImplementedError end # The name of the method to call to retrieve the data to import. def collection_method raise NotImplementedError end # Default batch settings for parallel import (can be redefined in Importer classes) def parallel_import_batch { size: 1000, delay: 1.minute } end def abort_on_failure false end # Any options to be passed to the method used for retrieving the data to # import. def collection_options {} end private def additional_object_data {} end def object_representation(object) representation_class.from_api_response(object, additional_object_data) end def info(project_id, extra = {}) Logger.info(log_attributes(project_id, extra)) end def log_attributes(project_id, extra = {}) extra.merge( project_id: project_id, importer: importer_class.name, parallel: parallel? ) end def job_waiter @job_waiter ||= begin key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key) key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key) jobs_remaining = Gitlab::Cache::Import::Caching.read(job_waiter_remaining_cache_key).to_i || 0 JobWaiter.new(jobs_remaining, key) end end def calculate_job_delay(job_index) multiplier = (job_index / parallel_import_batch[:size]) (multiplier * parallel_import_batch[:delay]) + 1.second end end end end