# frozen_string_literal: true module Gitlab class UsageData class Topology include Gitlab::Utils::UsageData JOB_TO_SERVICE_NAME = { 'gitlab-rails' => 'web', 'gitlab-sidekiq' => 'sidekiq', 'gitlab-workhorse' => 'workhorse', 'redis' => 'redis', 'postgres' => 'postgres', 'gitaly' => 'gitaly', 'prometheus' => 'prometheus', 'node' => 'node-exporter', 'registry' => 'registry' }.freeze CollectionFailure = Struct.new(:query, :error) do def to_h { query => error } end end def topology_usage_data @failures = [] @instances = Set[] topology_data, duration = measure_duration { topology_fetch_all_data } { topology: topology_data .merge(duration_s: duration) .merge(failures: @failures.map(&:to_h)) } end private def topology_fetch_all_data with_prometheus_client(fallback: {}) do |client| { application_requests_per_hour: topology_app_requests_per_hour(client), nodes: topology_node_data(client) }.compact end rescue => e @failures << CollectionFailure.new('other', e.class.to_s) {} end def topology_app_requests_per_hour(client) result = query_safely('gitlab_usage_ping:ops:rate5m', 'app_requests', fallback: nil) do |query| client.query(one_week_average(query)).first end return unless result # the metric is recorded as a per-second rate (result['value'].last.to_f * 1.hour).to_i end def topology_node_data(client) # node-level data by_instance_mem = topology_node_memory(client) by_instance_cpus = topology_node_cpus(client) by_instance_uname_info = topology_node_uname_info(client) # service-level data by_instance_by_job_by_type_memory = topology_all_service_memory(client) by_instance_by_job_process_count = topology_all_service_process_count(client) by_instance_by_job_server_types = topology_all_service_server_types(client) @instances.map do |instance| { node_memory_total_bytes: by_instance_mem[instance], node_cpus: by_instance_cpus[instance], node_uname_info: by_instance_uname_info[instance], node_services: topology_node_services( instance, by_instance_by_job_process_count, by_instance_by_job_by_type_memory, by_instance_by_job_server_types ) }.compact end end def topology_node_memory(client) query_safely('gitlab_usage_ping:node_memory_total_bytes:avg', 'node_memory', fallback: {}) do |query| aggregate_by_instance(client, one_week_average(query)) end end def topology_node_cpus(client) query_safely('gitlab_usage_ping:node_cpus:count', 'node_cpus', fallback: {}) do |query| aggregate_by_instance(client, one_week_average(query)) end end def topology_node_uname_info(client) node_uname_info = query_safely('node_uname_info', 'node_uname_info', fallback: []) do |query| client.query(query) end map_instance_labels(node_uname_info, %w(machine sysname release)) end def topology_all_service_memory(client) { rss: topology_service_memory_rss(client), uss: topology_service_memory_uss(client), pss: topology_service_memory_pss(client) } end def topology_service_memory_rss(client) query_safely( 'gitlab_usage_ping:node_service_process_resident_memory_bytes:avg', 'service_rss', fallback: {} ) { |query| aggregate_by_labels(client, one_week_average(query)) } end def topology_service_memory_uss(client) query_safely( 'gitlab_usage_ping:node_service_process_unique_memory_bytes:avg', 'service_uss', fallback: {} ) { |query| aggregate_by_labels(client, one_week_average(query)) } end def topology_service_memory_pss(client) query_safely( 'gitlab_usage_ping:node_service_process_proportional_memory_bytes:avg', 'service_pss', fallback: {} ) { |query| aggregate_by_labels(client, one_week_average(query)) } end def topology_all_service_process_count(client) query_safely( 'gitlab_usage_ping:node_service_process:count', 'service_process_count', fallback: {} ) { |query| aggregate_by_labels(client, one_week_average(query)) } end def topology_all_service_server_types(client) query_safely( 'gitlab_usage_ping:node_service_app_server_workers:sum', 'service_workers', fallback: {} ) { |query| aggregate_by_labels(client, query) } end def query_safely(query, query_name, fallback:) result = yield query return result if result.present? @failures << CollectionFailure.new(query_name, 'empty_result') fallback rescue => e @failures << CollectionFailure.new(query_name, e.class.to_s) fallback end def topology_node_services(instance, all_process_counts, all_process_memory, all_server_types) # returns all node service data grouped by service name as the key instance_service_data = topology_instance_service_process_count(instance, all_process_counts) .deep_merge(topology_instance_service_memory(instance, all_process_memory)) .deep_merge(topology_instance_service_server_types(instance, all_server_types)) # map to list of hashes where service names become values instead, and remove # unknown services, since they might not be ours instance_service_data.each_with_object([]) do |entry, list| service, service_metrics = entry gitlab_service = JOB_TO_SERVICE_NAME[service.to_s] next unless gitlab_service list << { name: gitlab_service }.merge(service_metrics) end end def topology_instance_service_process_count(instance, all_instance_data) topology_data_for_instance(instance, all_instance_data).to_h do |metric, count| [metric['job'], { process_count: count }] end end # Given a hash mapping memory set types to Prometheus response data, returns a hash # mapping instance/node names to services and their respective memory use in bytes def topology_instance_service_memory(instance, instance_data_by_type) result = {} instance_data_by_type.each do |memory_type, instance_data| topology_data_for_instance(instance, instance_data).each do |metric, memory_bytes| job = metric['job'] key = "process_memory_#{memory_type}".to_sym result[job] ||= {} result[job][key] ||= memory_bytes end end result end def topology_instance_service_server_types(instance, all_instance_data) topology_data_for_instance(instance, all_instance_data).to_h do |metric, _value| [metric['job'], { server: metric['server'] }] end end def topology_data_for_instance(instance, all_instance_data) all_instance_data.filter { |metric, _value| metric['instance'] == instance } end def normalize_instance_label(instance) normalize_localhost_address(drop_port_number(instance)) end def normalize_localhost_address(instance) ip_addr = IPAddr.new(instance) is_local_ip = ip_addr.loopback? || ip_addr.to_i.zero? is_local_ip ? 'localhost' : instance rescue IPAddr::InvalidAddressError # This most likely means it was a host name, not an IP address instance end def drop_port_number(instance) instance.gsub(/:\d+$/, '') end def normalize_and_track_instance(instance) normalize_instance_label(instance).tap do |normalized_instance| @instances << normalized_instance end end def one_week_average(query) "avg_over_time (#{query}[1w])" end def aggregate_by_instance(client, query) client.aggregate(query) { |metric| normalize_and_track_instance(metric['instance']) } end # Will retain a composite key that values are mapped to def aggregate_by_labels(client, query) client.aggregate(query) do |metric| metric['instance'] = normalize_and_track_instance(metric['instance']) metric end end # Given query result vector, map instance to a hash of target labels key/value. # @return [Hash] mapping instance to a hash of target labels key/value, or the empty hash if input empty vector def map_instance_labels(query_result_vector, target_labels) query_result_vector.to_h do |result| key = normalize_and_track_instance(result['metric']['instance']) value = result['metric'].slice(*target_labels).symbolize_keys [key, value] end end end end end