150 lines
4 KiB
Ruby
150 lines
4 KiB
Ruby
|
# frozen_string_literal: true
|
||
|
|
||
|
module Gitlab
|
||
|
# Given a set of process IDs, the supervisor can monitor processes
|
||
|
# for being alive and invoke a callback if some or all should go away.
|
||
|
# The receiver of the callback can then act on this event, for instance
|
||
|
# by restarting those processes or performing clean-up work.
|
||
|
#
|
||
|
# The supervisor will also trap termination signals if provided and
|
||
|
# propagate those to the supervised processes. Any supervised processes
|
||
|
# that do not terminate within a specified grace period will be killed.
|
||
|
class ProcessSupervisor < Gitlab::Daemon
|
||
|
DEFAULT_HEALTH_CHECK_INTERVAL_SECONDS = 5
|
||
|
DEFAULT_TERMINATE_INTERVAL_SECONDS = 1
|
||
|
DEFAULT_TERMINATE_TIMEOUT_SECONDS = 10
|
||
|
|
||
|
attr_reader :alive
|
||
|
|
||
|
def initialize(
|
||
|
health_check_interval_seconds: DEFAULT_HEALTH_CHECK_INTERVAL_SECONDS,
|
||
|
check_terminate_interval_seconds: DEFAULT_TERMINATE_INTERVAL_SECONDS,
|
||
|
terminate_timeout_seconds: DEFAULT_TERMINATE_TIMEOUT_SECONDS,
|
||
|
term_signals: %i(INT TERM),
|
||
|
forwarded_signals: [],
|
||
|
**options)
|
||
|
super(**options)
|
||
|
|
||
|
@term_signals = term_signals
|
||
|
@forwarded_signals = forwarded_signals
|
||
|
@health_check_interval_seconds = health_check_interval_seconds
|
||
|
@check_terminate_interval_seconds = check_terminate_interval_seconds
|
||
|
@terminate_timeout_seconds = terminate_timeout_seconds
|
||
|
|
||
|
@pids = []
|
||
|
@alive = false
|
||
|
end
|
||
|
|
||
|
# Starts a supervision loop for the given process ID(s).
|
||
|
#
|
||
|
# If any or all processes go away, the IDs of any dead processes will
|
||
|
# be yielded to the given block, so callers can act on them.
|
||
|
#
|
||
|
# If the block returns a non-empty list of IDs, the supervisor will
|
||
|
# start observing those processes instead. Otherwise it will shut down.
|
||
|
def supervise(pid_or_pids, &on_process_death)
|
||
|
@pids = Array(pid_or_pids)
|
||
|
@on_process_death = on_process_death
|
||
|
|
||
|
trap_signals!
|
||
|
|
||
|
start
|
||
|
end
|
||
|
|
||
|
# Shuts down the supervisor and all supervised processes with the given signal.
|
||
|
def shutdown(signal = :TERM)
|
||
|
return unless @alive
|
||
|
|
||
|
stop_processes(signal)
|
||
|
stop
|
||
|
end
|
||
|
|
||
|
def supervised_pids
|
||
|
@pids
|
||
|
end
|
||
|
|
||
|
private
|
||
|
|
||
|
def start_working
|
||
|
@alive = true
|
||
|
end
|
||
|
|
||
|
def stop_working
|
||
|
@alive = false
|
||
|
end
|
||
|
|
||
|
def run_thread
|
||
|
while @alive
|
||
|
sleep(@health_check_interval_seconds)
|
||
|
|
||
|
check_process_health
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def check_process_health
|
||
|
unless all_alive?
|
||
|
existing_pids = live_pids # Capture this value for the duration of the block.
|
||
|
dead_pids = @pids - existing_pids
|
||
|
new_pids = Array(@on_process_death.call(dead_pids))
|
||
|
@pids = existing_pids + new_pids
|
||
|
@alive = @pids.any?
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def stop_processes(signal)
|
||
|
# Set this prior to shutting down so that shutdown hooks which read `alive`
|
||
|
# know the supervisor is about to shut down.
|
||
|
@alive = false
|
||
|
|
||
|
# Shut down supervised processes.
|
||
|
signal_all(signal)
|
||
|
wait_for_termination
|
||
|
end
|
||
|
|
||
|
def trap_signals!
|
||
|
ProcessManagement.trap_signals(@term_signals) do |signal|
|
||
|
stop_processes(signal)
|
||
|
end
|
||
|
|
||
|
ProcessManagement.trap_signals(@forwarded_signals) do |signal|
|
||
|
signal_all(signal)
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def wait_for_termination
|
||
|
deadline = monotonic_time + @terminate_timeout_seconds
|
||
|
sleep(@check_terminate_interval_seconds) while continue_waiting?(deadline)
|
||
|
|
||
|
hard_stop_stuck_pids
|
||
|
end
|
||
|
|
||
|
def monotonic_time
|
||
|
Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
|
||
|
end
|
||
|
|
||
|
def continue_waiting?(deadline)
|
||
|
any_alive? && monotonic_time < deadline
|
||
|
end
|
||
|
|
||
|
def signal_all(signal)
|
||
|
ProcessManagement.signal_processes(@pids, signal)
|
||
|
end
|
||
|
|
||
|
def hard_stop_stuck_pids
|
||
|
ProcessManagement.signal_processes(live_pids, "-KILL")
|
||
|
end
|
||
|
|
||
|
def any_alive?
|
||
|
ProcessManagement.any_alive?(@pids)
|
||
|
end
|
||
|
|
||
|
def all_alive?
|
||
|
ProcessManagement.all_alive?(@pids)
|
||
|
end
|
||
|
|
||
|
def live_pids
|
||
|
ProcessManagement.pids_alive(@pids)
|
||
|
end
|
||
|
end
|
||
|
end
|