debian-mirror-gitlab/app/services/ci/pipeline_processing/atomic_processing_service.rb

123 lines
3.5 KiB
Ruby
Raw Normal View History

2020-03-13 15:44:24 +05:30
# frozen_string_literal: true
module Ci
module PipelineProcessing
class AtomicProcessingService
include Gitlab::Utils::StrongMemoize
include ExclusiveLeaseGuard
attr_reader :pipeline
DEFAULT_LEASE_TIMEOUT = 1.minute
BATCH_SIZE = 20
def initialize(pipeline)
@pipeline = pipeline
@collection = AtomicProcessingService::StatusCollection.new(pipeline)
end
def execute
return unless pipeline.needs_processing?
success = try_obtain_lease { process! }
# re-schedule if we need further processing
if success && pipeline.needs_processing?
PipelineProcessWorker.perform_async(pipeline.id)
end
success
end
private
def process!
update_stages!
update_pipeline!
update_statuses_processed!
2022-01-26 12:08:38 +05:30
if Feature.enabled?(:expire_job_and_pipeline_cache_synchronously, pipeline.project, default_enabled: :yaml)
Ci::ExpirePipelineCacheService.new.execute(pipeline)
end
2020-03-13 15:44:24 +05:30
true
end
def update_stages!
pipeline.stages.ordered.each(&method(:update_stage!))
end
def update_stage!(stage)
# Update processables for a given stage in bulk/slices
ids = @collection.created_processable_ids_for_stage_position(stage.position)
ids.in_groups_of(BATCH_SIZE, false, &method(:update_processables!))
status = @collection.status_for_stage_position(stage.position)
stage.set_status(status)
end
def update_processables!(ids)
2021-04-17 20:07:23 +05:30
created_processables = pipeline.processables.id_in(ids)
2020-03-13 15:44:24 +05:30
.with_project_preload
.created
.latest
.ordered_by_stage
.select_with_aggregated_needs(project)
created_processables.each(&method(:update_processable!))
end
def update_pipeline!
pipeline.set_status(@collection.status_of_all)
end
def update_statuses_processed!
processing = @collection.processing_processables
processing.each_slice(BATCH_SIZE) do |slice|
pipeline.statuses.match_id_and_lock_version(slice)
.update_as_processed!
end
end
def update_processable!(processable)
status = processable_status(processable)
2020-07-28 23:09:34 +05:30
return unless Ci::HasStatus::COMPLETED_STATUSES.include?(status)
2020-03-13 15:44:24 +05:30
# transition status if possible
2021-04-17 20:07:23 +05:30
Gitlab::OptimisticLocking.retry_lock(processable, name: 'atomic_processing_update_processable') do |subject|
2020-03-13 15:44:24 +05:30
Ci::ProcessBuildService.new(project, subject.user)
.execute(subject, status)
# update internal representation of status
# to make the status change of processable
# to be taken into account during further processing
@collection.set_processable_status(
processable.id, processable.status, processable.lock_version)
end
end
def processable_status(processable)
2020-04-22 19:07:51 +05:30
if processable.scheduling_type_dag?
2020-03-13 15:44:24 +05:30
# Processable uses DAG, get status of all dependent needs
2020-05-24 23:13:21 +05:30
@collection.status_for_names(processable.aggregated_needs_names.to_a, dag: true)
2020-03-13 15:44:24 +05:30
else
# Processable uses Stages, get status of prior stage
@collection.status_for_prior_stage_position(processable.stage_idx.to_i)
end
end
def project
pipeline.project
end
def lease_key
"#{super}::pipeline_id:#{pipeline.id}"
end
def lease_timeout
DEFAULT_LEASE_TIMEOUT
end
end
end
end