201 lines
5 KiB
Ruby
201 lines
5 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module BulkImports
|
|
module Pipeline
|
|
extend ActiveSupport::Concern
|
|
include Gitlab::Utils::StrongMemoize
|
|
include Gitlab::ClassAttributes
|
|
include Runner
|
|
|
|
NotAllowedError = Class.new(StandardError)
|
|
ExpiredError = Class.new(StandardError)
|
|
FailedError = Class.new(StandardError)
|
|
|
|
CACHE_KEY_EXPIRATION = 2.hours
|
|
NDJSON_EXPORT_TIMEOUT = 90.minutes
|
|
|
|
def initialize(context)
|
|
@context = context
|
|
end
|
|
|
|
def tracker
|
|
@tracker ||= context.tracker
|
|
end
|
|
|
|
def portable
|
|
@portable ||= context.portable
|
|
end
|
|
|
|
def import_export_config
|
|
@import_export_config ||= context.import_export_config
|
|
end
|
|
|
|
def current_user
|
|
@current_user ||= context.current_user
|
|
end
|
|
|
|
included do
|
|
private
|
|
|
|
attr_reader :context
|
|
|
|
# Fetch pipeline extractor.
|
|
# An extractor is defined either by instance `#extract(context)` method
|
|
# or by using `extractor` DSL.
|
|
#
|
|
# @example
|
|
# class MyPipeline
|
|
# extractor MyExtractor, foo: :bar
|
|
# end
|
|
#
|
|
# class MyPipeline
|
|
# def extract(context)
|
|
# puts 'Fetch some data'
|
|
# end
|
|
# end
|
|
#
|
|
# If pipeline implements instance method `extract` - use it
|
|
# and ignore class `extractor` method implementation.
|
|
def extractor
|
|
@extractor ||= self.respond_to?(:extract) ? self : instantiate(self.class.get_extractor)
|
|
end
|
|
|
|
# Fetch pipeline transformers.
|
|
#
|
|
# A transformer can be defined using:
|
|
# - `transformer` class method
|
|
# - `transform` instance method
|
|
#
|
|
# Multiple transformers can be defined within a single
|
|
# pipeline and run sequentially for each record in the
|
|
# following order:
|
|
# - Instance method `transform`
|
|
# - Transformers defined using `transformer` class method
|
|
#
|
|
# Instance method `transform` is always the last to run.
|
|
#
|
|
# @example
|
|
# class MyPipeline
|
|
# transformer MyTransformerOne, foo: :bar
|
|
# transformer MyTransformerTwo, foo: :bar
|
|
#
|
|
# def transform(context, data)
|
|
# # perform transformation here
|
|
# end
|
|
# end
|
|
#
|
|
# In the example above `#transform` is the first to run and
|
|
# `MyTransformerTwo` method is the last.
|
|
def transformers
|
|
strong_memoize(:transformers) do
|
|
defined_transformers = self.class.transformers.map(&method(:instantiate))
|
|
|
|
transformers = []
|
|
transformers << self if respond_to?(:transform)
|
|
transformers.concat(defined_transformers)
|
|
transformers
|
|
end
|
|
end
|
|
|
|
# Fetch pipeline loader.
|
|
# A loader is defined either by instance method `#load(context, data)`
|
|
# or by using `loader` DSL.
|
|
#
|
|
# @example
|
|
# class MyPipeline
|
|
# loader MyLoader, foo: :bar
|
|
# end
|
|
#
|
|
# class MyPipeline
|
|
# def load(context, data)
|
|
# puts 'Load some data'
|
|
# end
|
|
# end
|
|
#
|
|
# If pipeline implements instance method `load` - use it
|
|
# and ignore class `loader` method implementation.
|
|
def loader
|
|
@loader ||= self.respond_to?(:load) ? self : instantiate(self.class.get_loader)
|
|
end
|
|
|
|
def pipeline
|
|
@pipeline ||= self.class.name
|
|
end
|
|
|
|
def instantiate(class_config)
|
|
options = class_config[:options]
|
|
|
|
if options
|
|
class_config[:klass].new(**class_config[:options])
|
|
else
|
|
class_config[:klass].new
|
|
end
|
|
end
|
|
|
|
def abort_on_failure?
|
|
self.class.abort_on_failure?
|
|
end
|
|
end
|
|
|
|
class_methods do
|
|
def extractor(klass, options = nil)
|
|
class_attributes[:extractor] = { klass: klass, options: options }
|
|
end
|
|
|
|
def transformer(klass, options = nil)
|
|
add_attribute(:transformers, klass, options)
|
|
end
|
|
|
|
def loader(klass, options = nil)
|
|
class_attributes[:loader] = { klass: klass, options: options }
|
|
end
|
|
|
|
def get_extractor
|
|
class_attributes[:extractor]
|
|
end
|
|
|
|
def transformers
|
|
class_attributes[:transformers] || []
|
|
end
|
|
|
|
def get_loader
|
|
class_attributes[:loader]
|
|
end
|
|
|
|
def abort_on_failure!
|
|
class_attributes[:abort_on_failure] = true
|
|
end
|
|
|
|
def abort_on_failure?
|
|
class_attributes[:abort_on_failure]
|
|
end
|
|
|
|
def file_extraction_pipeline!
|
|
class_attributes[:file_extraction_pipeline] = true
|
|
end
|
|
|
|
def file_extraction_pipeline?
|
|
class_attributes[:file_extraction_pipeline]
|
|
end
|
|
|
|
def relation_name(name)
|
|
class_attributes[:relation_name] = name
|
|
end
|
|
|
|
def relation
|
|
class_attributes[:relation_name] || default_relation
|
|
end
|
|
|
|
def default_relation
|
|
self.name.demodulize.chomp('Pipeline').underscore
|
|
end
|
|
|
|
private
|
|
|
|
def add_attribute(sym, klass, options)
|
|
class_attributes[sym] ||= []
|
|
class_attributes[sym] << { klass: klass, options: options }
|
|
end
|
|
end
|
|
end
|
|
end
|