2021-04-29 21:17:54 +05:30
|
|
|
# frozen_string_literal: true
|
|
|
|
|
|
|
|
require 'spec_helper'
|
|
|
|
|
|
|
|
RSpec.describe BulkImports::PipelineWorker do
|
|
|
|
let(:pipeline_class) do
|
|
|
|
Class.new do
|
|
|
|
def initialize(_); end
|
|
|
|
|
|
|
|
def run; end
|
2021-09-04 01:27:46 +05:30
|
|
|
|
|
|
|
def self.ndjson_pipeline?
|
|
|
|
false
|
|
|
|
end
|
2021-04-29 21:17:54 +05:30
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2021-09-04 01:27:46 +05:30
|
|
|
let_it_be(:bulk_import) { create(:bulk_import) }
|
|
|
|
let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
|
|
|
|
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
|
2021-04-29 21:17:54 +05:30
|
|
|
|
|
|
|
before do
|
|
|
|
stub_const('FakePipeline', pipeline_class)
|
|
|
|
end
|
|
|
|
|
|
|
|
it 'runs the given pipeline successfully' do
|
|
|
|
pipeline_tracker = create(
|
|
|
|
:bulk_import_tracker,
|
|
|
|
entity: entity,
|
|
|
|
pipeline_name: 'FakePipeline'
|
|
|
|
)
|
|
|
|
|
|
|
|
expect(BulkImports::Stage)
|
|
|
|
.to receive(:pipeline_exists?)
|
|
|
|
.with('FakePipeline')
|
2021-09-04 01:27:46 +05:30
|
|
|
.twice
|
2021-04-29 21:17:54 +05:30
|
|
|
.and_return(true)
|
|
|
|
|
|
|
|
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
|
|
|
|
expect(logger)
|
|
|
|
.to receive(:info)
|
|
|
|
.with(
|
|
|
|
worker: described_class.name,
|
|
|
|
pipeline_name: 'FakePipeline',
|
|
|
|
entity_id: entity.id
|
|
|
|
)
|
|
|
|
end
|
|
|
|
|
|
|
|
expect(BulkImports::EntityWorker)
|
|
|
|
.to receive(:perform_async)
|
|
|
|
.with(entity.id, pipeline_tracker.stage)
|
|
|
|
|
|
|
|
expect(subject).to receive(:jid).and_return('jid')
|
|
|
|
|
|
|
|
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
|
|
|
|
|
|
|
|
pipeline_tracker.reload
|
|
|
|
|
|
|
|
expect(pipeline_tracker.status_name).to eq(:finished)
|
|
|
|
expect(pipeline_tracker.jid).to eq('jid')
|
|
|
|
end
|
|
|
|
|
|
|
|
context 'when the pipeline cannot be found' do
|
|
|
|
it 'logs the error' do
|
|
|
|
pipeline_tracker = create(
|
|
|
|
:bulk_import_tracker,
|
|
|
|
:started,
|
|
|
|
entity: entity,
|
|
|
|
pipeline_name: 'FakePipeline'
|
|
|
|
)
|
|
|
|
|
|
|
|
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
|
|
|
|
expect(logger)
|
|
|
|
.to receive(:error)
|
|
|
|
.with(
|
|
|
|
worker: described_class.name,
|
|
|
|
pipeline_tracker_id: pipeline_tracker.id,
|
|
|
|
entity_id: entity.id,
|
|
|
|
message: 'Unstarted pipeline not found'
|
|
|
|
)
|
|
|
|
end
|
|
|
|
|
|
|
|
expect(BulkImports::EntityWorker)
|
|
|
|
.to receive(:perform_async)
|
|
|
|
.with(entity.id, pipeline_tracker.stage)
|
|
|
|
|
|
|
|
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
context 'when the pipeline raises an exception' do
|
|
|
|
it 'logs the error' do
|
|
|
|
pipeline_tracker = create(
|
|
|
|
:bulk_import_tracker,
|
|
|
|
entity: entity,
|
|
|
|
pipeline_name: 'InexistentPipeline'
|
|
|
|
)
|
|
|
|
|
|
|
|
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
|
|
|
|
expect(logger)
|
|
|
|
.to receive(:error)
|
|
|
|
.with(
|
|
|
|
worker: described_class.name,
|
|
|
|
pipeline_name: 'InexistentPipeline',
|
|
|
|
entity_id: entity.id,
|
|
|
|
message: "'InexistentPipeline' is not a valid BulkImport Pipeline"
|
|
|
|
)
|
|
|
|
end
|
|
|
|
|
|
|
|
expect(Gitlab::ErrorTracking)
|
|
|
|
.to receive(:track_exception)
|
|
|
|
.with(
|
|
|
|
instance_of(NameError),
|
|
|
|
entity_id: entity.id,
|
|
|
|
pipeline_name: pipeline_tracker.pipeline_name
|
|
|
|
)
|
|
|
|
|
|
|
|
expect(BulkImports::EntityWorker)
|
|
|
|
.to receive(:perform_async)
|
|
|
|
.with(entity.id, pipeline_tracker.stage)
|
|
|
|
|
|
|
|
expect(subject).to receive(:jid).and_return('jid')
|
|
|
|
|
|
|
|
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
|
|
|
|
|
|
|
|
pipeline_tracker.reload
|
|
|
|
|
|
|
|
expect(pipeline_tracker.status_name).to eq(:failed)
|
|
|
|
expect(pipeline_tracker.jid).to eq('jid')
|
|
|
|
end
|
|
|
|
end
|
2021-09-04 01:27:46 +05:30
|
|
|
|
|
|
|
context 'when ndjson pipeline' do
|
|
|
|
let(:ndjson_pipeline) do
|
|
|
|
Class.new do
|
|
|
|
def initialize(_); end
|
|
|
|
|
|
|
|
def run; end
|
|
|
|
|
|
|
|
def self.ndjson_pipeline?
|
|
|
|
true
|
|
|
|
end
|
|
|
|
|
|
|
|
def self.relation
|
|
|
|
'test'
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
let(:pipeline_tracker) do
|
|
|
|
create(
|
|
|
|
:bulk_import_tracker,
|
|
|
|
entity: entity,
|
|
|
|
pipeline_name: 'NdjsonPipeline'
|
|
|
|
)
|
|
|
|
end
|
|
|
|
|
|
|
|
before do
|
|
|
|
stub_const('NdjsonPipeline', ndjson_pipeline)
|
|
|
|
allow(BulkImports::Stage)
|
|
|
|
.to receive(:pipeline_exists?)
|
|
|
|
.with('NdjsonPipeline')
|
|
|
|
.and_return(true)
|
|
|
|
end
|
|
|
|
|
|
|
|
it 'runs the pipeline successfully' do
|
|
|
|
allow_next_instance_of(BulkImports::ExportStatus) do |status|
|
|
|
|
allow(status).to receive(:started?).and_return(false)
|
|
|
|
allow(status).to receive(:failed?).and_return(false)
|
|
|
|
end
|
|
|
|
|
|
|
|
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
|
|
|
|
|
|
|
|
expect(pipeline_tracker.reload.status_name).to eq(:finished)
|
|
|
|
end
|
|
|
|
|
|
|
|
context 'when export status is started' do
|
|
|
|
it 'reenqueues pipeline worker' do
|
|
|
|
allow_next_instance_of(BulkImports::ExportStatus) do |status|
|
|
|
|
allow(status).to receive(:started?).and_return(true)
|
|
|
|
allow(status).to receive(:failed?).and_return(false)
|
|
|
|
end
|
|
|
|
|
|
|
|
expect(described_class)
|
|
|
|
.to receive(:perform_in)
|
|
|
|
.with(
|
|
|
|
described_class::NDJSON_PIPELINE_PERFORM_DELAY,
|
|
|
|
pipeline_tracker.id,
|
|
|
|
pipeline_tracker.stage,
|
|
|
|
entity.id
|
|
|
|
)
|
|
|
|
|
|
|
|
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
context 'when job reaches timeout' do
|
|
|
|
it 'marks as failed and logs the error' do
|
|
|
|
old_created_at = entity.created_at
|
|
|
|
entity.update!(created_at: (BulkImports::Pipeline::NDJSON_EXPORT_TIMEOUT + 1.hour).ago)
|
|
|
|
|
|
|
|
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
|
|
|
|
expect(logger)
|
|
|
|
.to receive(:error)
|
|
|
|
.with(
|
|
|
|
worker: described_class.name,
|
|
|
|
pipeline_name: 'NdjsonPipeline',
|
|
|
|
entity_id: entity.id,
|
|
|
|
message: 'Pipeline timeout'
|
|
|
|
)
|
|
|
|
end
|
|
|
|
|
|
|
|
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
|
|
|
|
|
|
|
|
expect(pipeline_tracker.reload.status_name).to eq(:failed)
|
|
|
|
|
|
|
|
entity.update!(created_at: old_created_at)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
context 'when export status is failed' do
|
|
|
|
it 'marks as failed and logs the error' do
|
|
|
|
allow_next_instance_of(BulkImports::ExportStatus) do |status|
|
|
|
|
allow(status).to receive(:failed?).and_return(true)
|
|
|
|
allow(status).to receive(:error).and_return('Error!')
|
|
|
|
end
|
|
|
|
|
|
|
|
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
|
|
|
|
expect(logger)
|
|
|
|
.to receive(:error)
|
|
|
|
.with(
|
|
|
|
worker: described_class.name,
|
|
|
|
pipeline_name: 'NdjsonPipeline',
|
|
|
|
entity_id: entity.id,
|
|
|
|
message: 'Error!'
|
|
|
|
)
|
|
|
|
end
|
|
|
|
|
|
|
|
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
|
|
|
|
|
|
|
|
expect(pipeline_tracker.reload.status_name).to eq(:failed)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
2021-04-29 21:17:54 +05:30
|
|
|
end
|