549 lines
18 KiB
Go
549 lines
18 KiB
Go
/*
|
|
* Copyright 2022 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package orca_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/balancer/roundrobin"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/internal/grpctest"
|
|
"google.golang.org/grpc/internal/testutils"
|
|
"google.golang.org/grpc/orca"
|
|
"google.golang.org/grpc/orca/internal"
|
|
"google.golang.org/grpc/resolver"
|
|
"google.golang.org/grpc/resolver/manual"
|
|
"google.golang.org/grpc/status"
|
|
|
|
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
|
|
v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
|
|
v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
|
|
)
|
|
|
|
// customLBB wraps a round robin LB policy but provides a ClientConn wrapper to
|
|
// add an ORCA OOB report producer for all created SubConns.
|
|
type customLBB struct{}
|
|
|
|
func (customLBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
|
return balancer.Get(roundrobin.Name).Build(&ccWrapper{ClientConn: cc}, opts)
|
|
}
|
|
|
|
func (customLBB) Name() string { return "customLB" }
|
|
|
|
func init() {
|
|
balancer.Register(customLBB{})
|
|
}
|
|
|
|
type ccWrapper struct {
|
|
balancer.ClientConn
|
|
}
|
|
|
|
func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
|
if len(addrs) != 1 {
|
|
panic(fmt.Sprintf("got addrs=%v; want len(addrs) == 1", addrs))
|
|
}
|
|
sc, err := w.ClientConn.NewSubConn(addrs, opts)
|
|
if err != nil {
|
|
return sc, err
|
|
}
|
|
l := getListenerInfo(addrs[0])
|
|
l.listener.cleanup = orca.RegisterOOBListener(sc, l.listener, l.opts)
|
|
l.sc = sc
|
|
return sc, nil
|
|
}
|
|
|
|
// listenerInfo is stored in an address's attributes to allow ORCA
|
|
// listeners to be registered on subconns created for that address.
|
|
type listenerInfo struct {
|
|
listener *testOOBListener
|
|
opts orca.OOBListenerOptions
|
|
sc balancer.SubConn // Set by the LB policy
|
|
}
|
|
|
|
type listenerInfoKey struct{}
|
|
|
|
func setListenerInfo(addr resolver.Address, l *listenerInfo) resolver.Address {
|
|
addr.Attributes = addr.Attributes.WithValue(listenerInfoKey{}, l)
|
|
return addr
|
|
}
|
|
|
|
func getListenerInfo(addr resolver.Address) *listenerInfo {
|
|
return addr.Attributes.Value(listenerInfoKey{}).(*listenerInfo)
|
|
}
|
|
|
|
// testOOBListener is a simple listener that pushes load reports to a channel.
|
|
type testOOBListener struct {
|
|
cleanup func()
|
|
loadReportCh chan *v3orcapb.OrcaLoadReport
|
|
}
|
|
|
|
func newTestOOBListener() *testOOBListener {
|
|
return &testOOBListener{cleanup: func() {}, loadReportCh: make(chan *v3orcapb.OrcaLoadReport)}
|
|
}
|
|
|
|
func (t *testOOBListener) Stop() { t.cleanup() }
|
|
|
|
func (t *testOOBListener) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
|
|
t.loadReportCh <- r
|
|
}
|
|
|
|
// TestProducer is a basic, end-to-end style test of an LB policy with an
|
|
// OOBListener communicating with a server with an ORCA service.
|
|
func (s) TestProducer(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
// Use a fixed backoff for stream recreation.
|
|
oldBackoff := internal.DefaultBackoffFunc
|
|
internal.DefaultBackoffFunc = func(int) time.Duration { return 10 * time.Millisecond }
|
|
defer func() { internal.DefaultBackoffFunc = oldBackoff }()
|
|
|
|
// Initialize listener for our ORCA server.
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Register the OpenRCAService with a very short metrics reporting interval.
|
|
const shortReportingInterval = 50 * time.Millisecond
|
|
opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval}
|
|
internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
|
|
s := grpc.NewServer()
|
|
orcaSrv, err := orca.Register(s, opts)
|
|
if err != nil {
|
|
t.Fatalf("orca.Register failed: %v", err)
|
|
}
|
|
go s.Serve(lis)
|
|
defer s.Stop()
|
|
|
|
// Create our client with an OOB listener in the LB policy it selects.
|
|
r := manual.NewBuilderWithScheme("whatever")
|
|
oobLis := newTestOOBListener()
|
|
|
|
lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond}
|
|
li := &listenerInfo{listener: oobLis, opts: lisOpts}
|
|
addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)
|
|
r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
|
|
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("grpc.Dial failed: %v", err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
// Ensure the OOB listener is stopped before the client is closed to avoid
|
|
// a potential irrelevant error in the logs.
|
|
defer oobLis.Stop()
|
|
|
|
// Set a few metrics and wait for them on the client side.
|
|
orcaSrv.SetCPUUtilization(10)
|
|
orcaSrv.SetMemoryUtilization(100)
|
|
orcaSrv.SetUtilization("bob", 555)
|
|
loadReportWant := &v3orcapb.OrcaLoadReport{
|
|
CpuUtilization: 10,
|
|
MemUtilization: 100,
|
|
Utilization: map[string]float64{"bob": 555},
|
|
}
|
|
|
|
testReport:
|
|
for {
|
|
select {
|
|
case r := <-oobLis.loadReportCh:
|
|
t.Log("Load report received: ", r)
|
|
if proto.Equal(r, loadReportWant) {
|
|
// Success!
|
|
break testReport
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("timed out waiting for load report: %v", loadReportWant)
|
|
}
|
|
}
|
|
|
|
// Change and add metrics and wait for them on the client side.
|
|
orcaSrv.SetCPUUtilization(50)
|
|
orcaSrv.SetMemoryUtilization(200)
|
|
orcaSrv.SetUtilization("mary", 321)
|
|
loadReportWant = &v3orcapb.OrcaLoadReport{
|
|
CpuUtilization: 50,
|
|
MemUtilization: 200,
|
|
Utilization: map[string]float64{"bob": 555, "mary": 321},
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case r := <-oobLis.loadReportCh:
|
|
t.Log("Load report received: ", r)
|
|
if proto.Equal(r, loadReportWant) {
|
|
// Success!
|
|
return
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("timed out waiting for load report: %v", loadReportWant)
|
|
}
|
|
}
|
|
}
|
|
|
|
// fakeORCAService is a simple implementation of an ORCA service that pushes
|
|
// requests it receives from clients to a channel and sends responses from a
|
|
// channel back. This allows tests to verify the client is sending requests
|
|
// and processing responses properly.
|
|
type fakeORCAService struct {
|
|
v3orcaservicegrpc.UnimplementedOpenRcaServiceServer
|
|
|
|
reqCh chan *v3orcaservicepb.OrcaLoadReportRequest
|
|
respCh chan interface{} // either *v3orcapb.OrcaLoadReport or error
|
|
}
|
|
|
|
func newFakeORCAService() *fakeORCAService {
|
|
return &fakeORCAService{
|
|
reqCh: make(chan *v3orcaservicepb.OrcaLoadReportRequest),
|
|
respCh: make(chan interface{}),
|
|
}
|
|
}
|
|
|
|
func (f *fakeORCAService) close() {
|
|
close(f.respCh)
|
|
}
|
|
|
|
func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
|
|
f.reqCh <- req
|
|
for resp := range f.respCh {
|
|
if err, ok := resp.(error); ok {
|
|
return err
|
|
}
|
|
if err := stream.Send(resp.(*v3orcapb.OrcaLoadReport)); err != nil {
|
|
// In the event that a stream error occurs, a new stream will have
|
|
// been created that was waiting for this response message. Push
|
|
// it back onto the channel and return.
|
|
//
|
|
// This happens because we range over respCh. If we changed to
|
|
// instead select on respCh + stream.Context(), the same situation
|
|
// could still occur due to a race between noticing the two events,
|
|
// so such a workaround would still be needed to prevent flakiness.
|
|
f.respCh <- resp
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TestProducerBackoff verifies that the ORCA producer applies the proper
|
|
// backoff after stream failures.
|
|
func (s) TestProducerBackoff(t *testing.T) {
|
|
grpctest.TLogger.ExpectErrorN("injected error", 4)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
// Provide a convenient way to expect backoff calls and return a minimal
|
|
// value.
|
|
const backoffShouldNotBeCalled = 9999 // Use to assert backoff function is not called.
|
|
const backoffAllowAny = -1 // Use to ignore any backoff calls.
|
|
expectedBackoff := backoffAllowAny
|
|
oldBackoff := internal.DefaultBackoffFunc
|
|
internal.DefaultBackoffFunc = func(got int) time.Duration {
|
|
if expectedBackoff == backoffShouldNotBeCalled {
|
|
t.Errorf("Unexpected backoff call; parameter = %v", got)
|
|
} else if expectedBackoff != backoffAllowAny {
|
|
if got != expectedBackoff {
|
|
t.Errorf("Unexpected backoff received; got %v want %v", got, expectedBackoff)
|
|
}
|
|
}
|
|
return time.Millisecond
|
|
}
|
|
defer func() { internal.DefaultBackoffFunc = oldBackoff }()
|
|
|
|
// Initialize listener for our ORCA server.
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Register our fake ORCA service.
|
|
s := grpc.NewServer()
|
|
fake := newFakeORCAService()
|
|
defer fake.close()
|
|
v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake)
|
|
go s.Serve(lis)
|
|
defer s.Stop()
|
|
|
|
// Define the report interval and a function to wait for it to be sent to
|
|
// the server.
|
|
const reportInterval = 123 * time.Second
|
|
awaitRequest := func(interval time.Duration) {
|
|
select {
|
|
case req := <-fake.reqCh:
|
|
if got := req.GetReportInterval().AsDuration(); got != interval {
|
|
t.Errorf("Unexpected report interval; got %v want %v", got, interval)
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("Did not receive client request")
|
|
}
|
|
}
|
|
|
|
// Create our client with an OOB listener in the LB policy it selects.
|
|
r := manual.NewBuilderWithScheme("whatever")
|
|
oobLis := newTestOOBListener()
|
|
|
|
lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval}
|
|
li := &listenerInfo{listener: oobLis, opts: lisOpts}
|
|
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
|
|
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("grpc.Dial failed: %v", err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
// Ensure the OOB listener is stopped before the client is closed to avoid
|
|
// a potential irrelevant error in the logs.
|
|
defer oobLis.Stop()
|
|
|
|
// Define a load report to send and expect the client to see.
|
|
loadReportWant := &v3orcapb.OrcaLoadReport{
|
|
CpuUtilization: 10,
|
|
MemUtilization: 100,
|
|
Utilization: map[string]float64{"bob": 555},
|
|
}
|
|
|
|
// Unblock the fake.
|
|
awaitRequest(reportInterval)
|
|
fake.respCh <- loadReportWant
|
|
select {
|
|
case r := <-oobLis.loadReportCh:
|
|
t.Log("Load report received: ", r)
|
|
if proto.Equal(r, loadReportWant) {
|
|
// Success!
|
|
break
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("timed out waiting for load report: %v", loadReportWant)
|
|
}
|
|
|
|
// The next request should be immediate, since there was a message
|
|
// received.
|
|
expectedBackoff = backoffShouldNotBeCalled
|
|
fake.respCh <- status.Errorf(codes.Internal, "injected error")
|
|
awaitRequest(reportInterval)
|
|
|
|
// The next requests will need to backoff.
|
|
expectedBackoff = 0
|
|
fake.respCh <- status.Errorf(codes.Internal, "injected error")
|
|
awaitRequest(reportInterval)
|
|
expectedBackoff = 1
|
|
fake.respCh <- status.Errorf(codes.Internal, "injected error")
|
|
awaitRequest(reportInterval)
|
|
expectedBackoff = 2
|
|
fake.respCh <- status.Errorf(codes.Internal, "injected error")
|
|
awaitRequest(reportInterval)
|
|
// The next request should be immediate, since there was a message
|
|
// received.
|
|
expectedBackoff = backoffShouldNotBeCalled
|
|
|
|
// Send another valid response and wait for it on the client.
|
|
fake.respCh <- loadReportWant
|
|
select {
|
|
case r := <-oobLis.loadReportCh:
|
|
t.Log("Load report received: ", r)
|
|
if proto.Equal(r, loadReportWant) {
|
|
// Success!
|
|
break
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("timed out waiting for load report: %v", loadReportWant)
|
|
}
|
|
}
|
|
|
|
// TestProducerMultipleListeners tests that multiple listeners works as
|
|
// expected in a producer: requesting the proper interval and delivering the
|
|
// update to all listeners.
|
|
func (s) TestProducerMultipleListeners(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
|
|
// Provide a convenient way to expect backoff calls and return a minimal
|
|
// value.
|
|
oldBackoff := internal.DefaultBackoffFunc
|
|
internal.DefaultBackoffFunc = func(got int) time.Duration {
|
|
return time.Millisecond
|
|
}
|
|
defer func() { internal.DefaultBackoffFunc = oldBackoff }()
|
|
|
|
// Initialize listener for our ORCA server.
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Register our fake ORCA service.
|
|
s := grpc.NewServer()
|
|
fake := newFakeORCAService()
|
|
defer fake.close()
|
|
v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake)
|
|
go s.Serve(lis)
|
|
defer s.Stop()
|
|
|
|
// Define the report interval and a function to wait for it to be sent to
|
|
// the server.
|
|
const reportInterval1 = 123 * time.Second
|
|
const reportInterval2 = 234 * time.Second
|
|
const reportInterval3 = 56 * time.Second
|
|
awaitRequest := func(interval time.Duration) {
|
|
select {
|
|
case req := <-fake.reqCh:
|
|
if got := req.GetReportInterval().AsDuration(); got != interval {
|
|
t.Errorf("Unexpected report interval; got %v want %v", got, interval)
|
|
}
|
|
case <-ctx.Done():
|
|
t.Fatalf("Did not receive client request")
|
|
}
|
|
}
|
|
|
|
// Create our client with an OOB listener in the LB policy it selects.
|
|
r := manual.NewBuilderWithScheme("whatever")
|
|
oobLis1 := newTestOOBListener()
|
|
lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1}
|
|
li := &listenerInfo{listener: oobLis1, opts: lisOpts1}
|
|
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
|
|
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("grpc.Dial failed: %v", err)
|
|
}
|
|
defer cc.Close()
|
|
|
|
// Ensure the OOB listener is stopped before the client is closed to avoid
|
|
// a potential irrelevant error in the logs.
|
|
defer oobLis1.Stop()
|
|
|
|
oobLis2 := newTestOOBListener()
|
|
lisOpts2 := orca.OOBListenerOptions{ReportInterval: reportInterval2}
|
|
|
|
oobLis3 := newTestOOBListener()
|
|
lisOpts3 := orca.OOBListenerOptions{ReportInterval: reportInterval3}
|
|
|
|
// Define a load report to send and expect the client to see.
|
|
loadReportWant := &v3orcapb.OrcaLoadReport{
|
|
CpuUtilization: 10,
|
|
MemUtilization: 100,
|
|
Utilization: map[string]float64{"bob": 555},
|
|
}
|
|
|
|
// Receive reports and update counts for the three listeners.
|
|
var reportsMu sync.Mutex
|
|
var reportsReceived1, reportsReceived2, reportsReceived3 int
|
|
go func() {
|
|
for {
|
|
select {
|
|
case r := <-oobLis1.loadReportCh:
|
|
t.Log("Load report 1 received: ", r)
|
|
if !proto.Equal(r, loadReportWant) {
|
|
t.Errorf("Unexpected report received: %+v", r)
|
|
}
|
|
reportsMu.Lock()
|
|
reportsReceived1++
|
|
reportsMu.Unlock()
|
|
case r := <-oobLis2.loadReportCh:
|
|
t.Log("Load report 2 received: ", r)
|
|
if !proto.Equal(r, loadReportWant) {
|
|
t.Errorf("Unexpected report received: %+v", r)
|
|
}
|
|
reportsMu.Lock()
|
|
reportsReceived2++
|
|
reportsMu.Unlock()
|
|
case r := <-oobLis3.loadReportCh:
|
|
t.Log("Load report 3 received: ", r)
|
|
if !proto.Equal(r, loadReportWant) {
|
|
t.Errorf("Unexpected report received: %+v", r)
|
|
}
|
|
reportsMu.Lock()
|
|
reportsReceived3++
|
|
reportsMu.Unlock()
|
|
case <-ctx.Done():
|
|
// Test has ended; exit
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// checkReports is a helper function to check the report counts for the three listeners.
|
|
checkReports := func(r1, r2, r3 int) {
|
|
t.Helper()
|
|
for ctx.Err() == nil {
|
|
reportsMu.Lock()
|
|
if r1 == reportsReceived1 && r2 == reportsReceived2 && r3 == reportsReceived3 {
|
|
// Success!
|
|
reportsMu.Unlock()
|
|
return
|
|
}
|
|
if reportsReceived1 > r1 || reportsReceived2 > r2 || reportsReceived3 > r3 {
|
|
reportsMu.Unlock()
|
|
t.Fatalf("received excess reports. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3)
|
|
return
|
|
}
|
|
reportsMu.Unlock()
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
t.Fatalf("timed out waiting for reports received. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3)
|
|
}
|
|
|
|
// Only 1 listener; expect reportInterval1 to be used and expect the report
|
|
// to be sent to the listener.
|
|
awaitRequest(reportInterval1)
|
|
fake.respCh <- loadReportWant
|
|
checkReports(1, 0, 0)
|
|
|
|
// Register listener 2 with a less frequent interval; no need to recreate
|
|
// stream. Report should go to both listeners.
|
|
oobLis2.cleanup = orca.RegisterOOBListener(li.sc, oobLis2, lisOpts2)
|
|
fake.respCh <- loadReportWant
|
|
checkReports(2, 1, 0)
|
|
|
|
// Register listener 3 with a more frequent interval; stream is recreated
|
|
// with this interval after the next report is received. The first report
|
|
// will go to all three listeners.
|
|
oobLis3.cleanup = orca.RegisterOOBListener(li.sc, oobLis3, lisOpts3)
|
|
fake.respCh <- loadReportWant
|
|
checkReports(3, 2, 1)
|
|
awaitRequest(reportInterval3)
|
|
|
|
// Another report without a change in listeners should go to all three listeners.
|
|
fake.respCh <- loadReportWant
|
|
checkReports(4, 3, 2)
|
|
|
|
// Stop listener 2. This does not affect the interval as listener 3 is
|
|
// still the shortest. The next update goes to listeners 1 and 3.
|
|
oobLis2.Stop()
|
|
fake.respCh <- loadReportWant
|
|
checkReports(5, 3, 3)
|
|
|
|
// Stop listener 3. This makes the interval longer, with stream recreation
|
|
// delayed until the next report is received. Reports should only go to
|
|
// listener 1 now.
|
|
oobLis3.Stop()
|
|
fake.respCh <- loadReportWant
|
|
checkReports(6, 3, 3)
|
|
awaitRequest(reportInterval1)
|
|
// Another report without a change in listeners should go to the first listener.
|
|
fake.respCh <- loadReportWant
|
|
checkReports(7, 3, 3)
|
|
}
|