package grpc_prometheus import ( prom "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/status" ) // ServerMetrics represents a collection of metrics to be registered on a // Prometheus metrics registry for a gRPC server. type ServerMetrics struct { serverStartedCounter *prom.CounterVec serverHandledCounter *prom.CounterVec serverStreamMsgReceived *prom.CounterVec serverStreamMsgSent *prom.CounterVec serverHandledHistogramEnabled bool serverHandledHistogramOpts prom.HistogramOpts serverHandledHistogram *prom.HistogramVec } // NewServerMetrics returns a ServerMetrics object. Use a new instance of // ServerMetrics when not using the default Prometheus metrics registry, for // example when wanting to control which metrics are added to a registry as // opposed to automatically adding metrics via init functions. func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics { opts := counterOptions(counterOpts) return &ServerMetrics{ serverStartedCounter: prom.NewCounterVec( opts.apply(prom.CounterOpts{ Name: "grpc_server_started_total", Help: "Total number of RPCs started on the server.", }), []string{"grpc_type", "grpc_service", "grpc_method"}), serverHandledCounter: prom.NewCounterVec( opts.apply(prom.CounterOpts{ Name: "grpc_server_handled_total", Help: "Total number of RPCs completed on the server, regardless of success or failure.", }), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}), serverStreamMsgReceived: prom.NewCounterVec( opts.apply(prom.CounterOpts{ Name: "grpc_server_msg_received_total", Help: "Total number of RPC stream messages received on the server.", }), []string{"grpc_type", "grpc_service", "grpc_method"}), serverStreamMsgSent: prom.NewCounterVec( opts.apply(prom.CounterOpts{ Name: "grpc_server_msg_sent_total", Help: "Total number of gRPC stream messages sent by the server.", }), []string{"grpc_type", "grpc_service", "grpc_method"}), serverHandledHistogramEnabled: false, serverHandledHistogramOpts: prom.HistogramOpts{ Name: "grpc_server_handling_seconds", Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.", Buckets: prom.DefBuckets, }, serverHandledHistogram: nil, } } // EnableHandlingTimeHistogram enables histograms being registered when // registering the ServerMetrics on a Prometheus registry. Histograms can be // expensive on Prometheus servers. It takes options to configure histogram // options such as the defined buckets. func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) { for _, o := range opts { o(&m.serverHandledHistogramOpts) } if !m.serverHandledHistogramEnabled { m.serverHandledHistogram = prom.NewHistogramVec( m.serverHandledHistogramOpts, []string{"grpc_type", "grpc_service", "grpc_method"}, ) } m.serverHandledHistogramEnabled = true } // Describe sends the super-set of all possible descriptors of metrics // collected by this Collector to the provided channel and returns once // the last descriptor has been sent. func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) { m.serverStartedCounter.Describe(ch) m.serverHandledCounter.Describe(ch) m.serverStreamMsgReceived.Describe(ch) m.serverStreamMsgSent.Describe(ch) if m.serverHandledHistogramEnabled { m.serverHandledHistogram.Describe(ch) } } // Collect is called by the Prometheus registry when collecting // metrics. The implementation sends each collected metric via the // provided channel and returns once the last metric has been sent. func (m *ServerMetrics) Collect(ch chan<- prom.Metric) { m.serverStartedCounter.Collect(ch) m.serverHandledCounter.Collect(ch) m.serverStreamMsgReceived.Collect(ch) m.serverStreamMsgSent.Collect(ch) if m.serverHandledHistogramEnabled { m.serverHandledHistogram.Collect(ch) } } // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs. func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { monitor := newServerReporter(m, Unary, info.FullMethod) monitor.ReceivedMessage() resp, err := handler(ctx, req) st, _ := status.FromError(err) monitor.Handled(st.Code()) if err == nil { monitor.SentMessage() } return resp, err } } // StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs. func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { monitor := newServerReporter(m, streamRPCType(info), info.FullMethod) err := handler(srv, &monitoredServerStream{ss, monitor}) st, _ := status.FromError(err) monitor.Handled(st.Code()) return err } } // InitializeMetrics initializes all metrics, with their appropriate null // value, for all gRPC methods registered on a gRPC server. This is useful, to // ensure that all metrics exist when collecting and querying. func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) { serviceInfo := server.GetServiceInfo() for serviceName, info := range serviceInfo { for _, mInfo := range info.Methods { preRegisterMethod(m, serviceName, &mInfo) } } } func streamRPCType(info *grpc.StreamServerInfo) grpcType { if info.IsClientStream && !info.IsServerStream { return ClientStream } else if !info.IsClientStream && info.IsServerStream { return ServerStream } return BidiStream } // monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters. type monitoredServerStream struct { grpc.ServerStream monitor *serverReporter } func (s *monitoredServerStream) SendMsg(m interface{}) error { err := s.ServerStream.SendMsg(m) if err == nil { s.monitor.SentMessage() } return err } func (s *monitoredServerStream) RecvMsg(m interface{}) error { err := s.ServerStream.RecvMsg(m) if err == nil { s.monitor.ReceivedMessage() } return err } // preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated. func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) { methodName := mInfo.Name methodType := string(typeFromMethodInfo(mInfo)) // These are just references (no increments), as just referencing will create the labels but not set values. metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName) metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName) metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName) if metrics.serverHandledHistogramEnabled { metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName) } for _, code := range allCodes { metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String()) } }