debian-mirror-gitlab/spec/support/helpers/concurrent_helpers.rb

41 lines
1.3 KiB
Ruby
Raw Permalink Normal View History

2020-05-24 23:13:21 +05:30
# frozen_string_literal: true
module ConcurrentHelpers
  # Raised by #run_parallel when a task has not completed within its wait time.
  Cancelled = Class.new(StandardError)

  # To test for contention, we may need to run some actions in parallel. This
  # helper takes an array of blocks and schedules them all on different threads
  # in a fixed-size thread pool, then waits for each of them in turn.
  #
  # The pool always has at least two threads, even if +max_concurrency+ is
  # lower, and its queue is sized to hold every block.
  #
  # @param [Array<Proc>] blocks the tasks to run
  # @param [Numeric] task_wait_time: time to wait for each task (upper bound on
  #   reasonable task execution time), passed to Concurrent::Future#wait
  # @param [Integer] max_concurrency: maximum number of tasks to run at once
  # @return [nil] when every task completed without a failure reason
  # @raise [StandardError] the failure reason of the first task (in the order
  #   given by +blocks+) that completed with one
  # @raise [Cancelled] if the first problem encountered was a task that did not
  #   complete within +task_wait_time+
  #
  def run_parallel(blocks, task_wait_time: 20.seconds, max_concurrency: Concurrent.processor_count - 1)
    # Created outside the begin/ensure so that a failure to build the pool is
    # reported as-is instead of being masked by cleanup on a nil pool.
    thread_pool = Concurrent::FixedThreadPool.new(
      [2, max_concurrency].max, { max_queue: blocks.size }
    )

    begin
      opts = { executor: thread_pool }

      # Only this (calling) thread records errors, so a plain local is enough
      # to remember the first one.
      error = nil

      # Schedule every block before waiting on any of them.
      blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
        future.wait(task_wait_time)

        if future.complete?
          error ||= future.reason
        else
          # The task overran its wait time: request cancellation and report it.
          future.cancel
          error ||= Cancelled.new
        end
      end

      raise error if error
    ensure
      thread_pool.shutdown
      # After #shutdown the pool no longer reports #running?, so rely on the
      # result of #wait_for_termination (false on timeout) to decide whether
      # the remaining work has to be killed.
      thread_pool.kill unless thread_pool.wait_for_termination(10)
    end
  end
end