# frozen_string_literal: true

module Gitlab
  class UsageData
    class Topology
      include Gitlab::Utils::UsageData

      JOB_TO_SERVICE_NAME = {
        'gitlab-rails' => 'web',
        'gitlab-sidekiq' => 'sidekiq',
        'gitlab-workhorse' => 'workhorse',
        'redis' => 'redis',
        'postgres' => 'postgres',
        'gitaly' => 'gitaly',
        'prometheus' => 'prometheus',
        'node' => 'node-exporter',
        'registry' => 'registry'
      }.freeze

      # If these errors occur, all subsequent queries are likely to fail for the same error
      TIMEOUT_ERRORS = [Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout].freeze

      CollectionFailure = Struct.new(:query, :error) do
        def to_h
          { query => error }
        end
      end

      def topology_usage_data
        @failures = []
        @instances = Set[]
        topology_data, duration = measure_duration { topology_fetch_all_data }
        {
          topology: topology_data
                      .merge(duration_s: duration)
                      .merge(failures: @failures.map(&:to_h))
        }
      end

      private

      def topology_fetch_all_data
        with_prometheus_client(fallback: {}, verify: false) do |client|
          {
            application_requests_per_hour: topology_app_requests_per_hour(client),
            query_apdex_weekly_average: topology_query_apdex_weekly_average(client),
            nodes: topology_node_data(client)
          }.compact
        end
      rescue => e
        @failures << CollectionFailure.new('other', e.class.to_s)

        {}
      end

      def topology_app_requests_per_hour(client)
        result = query_safely('gitlab_usage_ping:ops:rate5m', 'app_requests', fallback: nil) do |query|
          client.query(aggregate_one_week(query)).first
        end

        return unless result

        # the metric is recorded as a per-second rate
        (result['value'].last.to_f * 1.hour).to_i
      end

      def topology_query_apdex_weekly_average(client)
        result = query_safely('gitlab_usage_ping:sql_duration_apdex:ratio_rate5m', 'query_apdex', fallback: nil) do |query|
          client.query(aggregate_one_week(query)).first
        end

        return unless result

        result['value'].last.to_f
      end

      def topology_node_data(client)
        # node-level data
        by_instance_mem = topology_node_memory(client)
        by_instance_mem_utilization = topology_node_memory_utilization(client)
        by_instance_cpus = topology_node_cpus(client)
        by_instance_cpu_utilization = topology_node_cpu_utilization(client)
        by_instance_uname_info = topology_node_uname_info(client)
        # service-level data
        by_instance_by_job_by_type_memory = topology_all_service_memory(client)
        by_instance_by_job_process_count = topology_all_service_process_count(client)
        by_instance_by_job_server_types = topology_all_service_server_types(client)

        @instances.map do |instance|
          {
            node_memory_total_bytes: by_instance_mem[instance],
            node_memory_utilization: by_instance_mem_utilization[instance],
            node_cpus: by_instance_cpus[instance],
            node_cpu_utilization: by_instance_cpu_utilization[instance],
            node_uname_info: by_instance_uname_info[instance],
            node_services:
              topology_node_services(
                instance, by_instance_by_job_process_count, by_instance_by_job_by_type_memory, by_instance_by_job_server_types
              )
          }.compact
        end
      end

      def topology_node_memory(client)
        query_safely('gitlab_usage_ping:node_memory_total_bytes:max', 'node_memory', fallback: {}) do |query|
          aggregate_by_instance(client, aggregate_one_week(query, aggregation: :max))
        end
      end

      def topology_node_memory_utilization(client)
        query_safely('gitlab_usage_ping:node_memory_utilization:avg', 'node_memory_utilization', fallback: {}) do |query|
          aggregate_by_instance(client, aggregate_one_week(query), transform_value: :to_f)
        end
      end

      def topology_node_cpus(client)
        query_safely('gitlab_usage_ping:node_cpus:count', 'node_cpus', fallback: {}) do |query|
          aggregate_by_instance(client, aggregate_one_week(query, aggregation: :max))
        end
      end

      def topology_node_cpu_utilization(client)
        query_safely('gitlab_usage_ping:node_cpu_utilization:avg', 'node_cpu_utilization', fallback: {}) do |query|
          aggregate_by_instance(client, aggregate_one_week(query), transform_value: :to_f)
        end
      end

      def topology_node_uname_info(client)
        node_uname_info = query_safely('node_uname_info', 'node_uname_info', fallback: []) do |query|
          client.query(query)
        end

        map_instance_labels(node_uname_info, %w(machine sysname release))
      end

      def topology_all_service_memory(client)
        {
          rss: topology_service_memory_rss(client),
          uss: topology_service_memory_uss(client),
          pss: topology_service_memory_pss(client)
        }
      end

      def topology_service_memory_rss(client)
        query_safely(
          'gitlab_usage_ping:node_service_process_resident_memory_bytes:avg', 'service_rss', fallback: {}
        ) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
      end

      def topology_service_memory_uss(client)
        query_safely(
          'gitlab_usage_ping:node_service_process_unique_memory_bytes:avg', 'service_uss', fallback: {}
        ) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
      end

      def topology_service_memory_pss(client)
        query_safely(
          'gitlab_usage_ping:node_service_process_proportional_memory_bytes:avg', 'service_pss', fallback: {}
        ) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
      end

      def topology_all_service_process_count(client)
        query_safely(
          'gitlab_usage_ping:node_service_process:count', 'service_process_count', fallback: {}
        ) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
      end

      def topology_all_service_server_types(client)
        query_safely(
          'gitlab_usage_ping:node_service_app_server_workers:sum', 'service_workers', fallback: {}
        ) { |query| aggregate_by_labels(client, query) }
      end

      def query_safely(query, query_name, fallback:)
        if timeout_error_exists?
          @failures << CollectionFailure.new(query_name, 'timeout_cancellation')
          return fallback
        end

        result = yield query

        return result if result.present?

        @failures << CollectionFailure.new(query_name, 'empty_result')
        fallback
      rescue => e
        @failures << CollectionFailure.new(query_name, e.class.to_s)
        fallback
      end

      def timeout_error_exists?
        timeout_error_names = TIMEOUT_ERRORS.map(&:to_s).to_set

        @failures.any? do |failure|
          timeout_error_names.include?(failure.error)
        end
      end

      def topology_node_services(instance, all_process_counts, all_process_memory, all_server_types)
        # returns all node service data grouped by service name as the key
        instance_service_data =
          topology_instance_service_process_count(instance, all_process_counts)
            .deep_merge(topology_instance_service_memory(instance, all_process_memory))
            .deep_merge(topology_instance_service_server_types(instance, all_server_types))

        # map to list of hashes where service names become values instead, and skip
        # unknown services, since they might not be ours
        instance_service_data.each_with_object([]) do |entry, list|
          service, service_metrics = entry
          service_name = service.to_s.strip

          if gitlab_service = JOB_TO_SERVICE_NAME[service_name]
            list << { name: gitlab_service }.merge(service_metrics)
          else
            @failures << CollectionFailure.new('service_unknown', service_name)
          end
        end
      end

      def topology_instance_service_process_count(instance, all_instance_data)
        topology_data_for_instance(instance, all_instance_data).to_h do |metric, count|
          [metric['job'], { process_count: count }]
        end
      end

      # Given a hash mapping memory set types to Prometheus response data, returns a hash
      # mapping instance/node names to services and their respective memory use in bytes
      def topology_instance_service_memory(instance, instance_data_by_type)
        result = {}
        instance_data_by_type.each do |memory_type, instance_data|
          topology_data_for_instance(instance, instance_data).each do |metric, memory_bytes|
            job = metric['job']
            key = "process_memory_#{memory_type}".to_sym

            result[job] ||= {}
            result[job][key] ||= memory_bytes
          end
        end

        result
      end

      def topology_instance_service_server_types(instance, all_instance_data)
        topology_data_for_instance(instance, all_instance_data).to_h do |metric, _value|
          [metric['job'], { server: metric['server'] }]
        end
      end

      def topology_data_for_instance(instance, all_instance_data)
        all_instance_data.filter { |metric, _value| metric['instance'] == instance }
      end

      def normalize_instance_label(instance)
        normalize_localhost_address(drop_port_number(instance))
      end

      def normalize_localhost_address(instance)
        ip_addr = IPAddr.new(instance)
        is_local_ip = ip_addr.loopback? || ip_addr.to_i == 0

        is_local_ip ? 'localhost' : instance
      rescue IPAddr::InvalidAddressError
        # This most likely means it was a host name, not an IP address
        instance
      end

      def drop_port_number(instance)
        instance.gsub(/:\d+$/, '')
      end

      def normalize_and_track_instance(instance)
        normalize_instance_label(instance).tap do |normalized_instance|
          @instances << normalized_instance
        end
      end

      def aggregate_one_week(query, aggregation: :avg)
        "#{aggregation}_over_time (#{query}[1w])"
      end

      def aggregate_by_instance(client, query, transform_value: :to_i)
        client.aggregate(query, transform_value: transform_value) { |metric| normalize_and_track_instance(metric['instance']) }
      end

      # Will retain a composite key that values are mapped to
      def aggregate_by_labels(client, query, transform_value: :to_i)
        client.aggregate(query, transform_value: transform_value) do |metric|
          metric['instance'] = normalize_and_track_instance(metric['instance'])
          metric
        end
      end

      # Given query result vector, map instance to a hash of target labels key/value.
      # @return [Hash] mapping instance to a hash of target labels key/value, or the empty hash if input empty vector
      def map_instance_labels(query_result_vector, target_labels)
        query_result_vector.to_h do |result|
          key = normalize_and_track_instance(result['metric']['instance'])
          value = result['metric'].slice(*target_labels).symbolize_keys
          [key, value]
        end
      end
    end
  end
end