dex/vendor/sigs.k8s.io/testing_frameworks/integration/internal/process.go

215 lines
4.8 KiB
Go
Raw Normal View History

2020-01-31 15:02:00 +05:30
package internal
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"strconv"
"time"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"sigs.k8s.io/testing_frameworks/integration/addr"
)
type ProcessState struct {
DefaultedProcessInput
Session *gexec.Session
// Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we
// assume the process is ready to operate. E.g. "/healthz". If this is set,
// we ignore StartMessage.
HealthCheckEndpoint string
// HealthCheckPollInterval is the interval which will be used for polling the
// HealthCheckEndpoint.
// If left empty it will default to 100 Milliseconds.
HealthCheckPollInterval time.Duration
// StartMessage is the message to wait for on stderr. If we recieve this
// message, we assume the process is ready to operate. Ignored if
// HealthCheckEndpoint is specified.
//
// The usage of StartMessage is discouraged, favour HealthCheckEndpoint
// instead!
//
// Deprecated: Use HealthCheckEndpoint in favour of StartMessage
StartMessage string
Args []string
// ready holds wether the process is currently in ready state (hit the ready condition) or not.
// It will be set to true on a successful `Start()` and set to false on a successful `Stop()`
ready bool
}
type DefaultedProcessInput struct {
URL url.URL
Dir string
DirNeedsCleaning bool
Path string
StopTimeout time.Duration
StartTimeout time.Duration
}
func DoDefaulting(
name string,
listenUrl *url.URL,
dir string,
path string,
startTimeout time.Duration,
stopTimeout time.Duration,
) (DefaultedProcessInput, error) {
defaults := DefaultedProcessInput{
Dir: dir,
Path: path,
StartTimeout: startTimeout,
StopTimeout: stopTimeout,
}
if listenUrl == nil {
port, host, err := addr.Suggest()
if err != nil {
return DefaultedProcessInput{}, err
}
defaults.URL = url.URL{
Scheme: "http",
Host: net.JoinHostPort(host, strconv.Itoa(port)),
}
} else {
defaults.URL = *listenUrl
}
if dir == "" {
newDir, err := ioutil.TempDir("", "k8s_test_framework_")
if err != nil {
return DefaultedProcessInput{}, err
}
defaults.Dir = newDir
defaults.DirNeedsCleaning = true
}
if path == "" {
if name == "" {
return DefaultedProcessInput{}, fmt.Errorf("must have at least one of name or path")
}
defaults.Path = BinPathFinder(name)
}
if startTimeout == 0 {
defaults.StartTimeout = 20 * time.Second
}
if stopTimeout == 0 {
defaults.StopTimeout = 20 * time.Second
}
return defaults, nil
}
type stopChannel chan struct{}
func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
if ps.ready {
return nil
}
command := exec.Command(ps.Path, ps.Args...)
ready := make(chan bool)
timedOut := time.After(ps.StartTimeout)
var pollerStopCh stopChannel
if ps.HealthCheckEndpoint != "" {
healthCheckURL := ps.URL
healthCheckURL.Path = ps.HealthCheckEndpoint
pollerStopCh = make(stopChannel)
go pollURLUntilOK(healthCheckURL, ps.HealthCheckPollInterval, ready, pollerStopCh)
} else {
startDetectStream := gbytes.NewBuffer()
ready = startDetectStream.Detect(ps.StartMessage)
stderr = safeMultiWriter(stderr, startDetectStream)
}
ps.Session, err = gexec.Start(command, stdout, stderr)
if err != nil {
return err
}
select {
case <-ready:
ps.ready = true
return nil
case <-timedOut:
if pollerStopCh != nil {
close(pollerStopCh)
}
if ps.Session != nil {
ps.Session.Terminate()
}
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
}
}
func safeMultiWriter(writers ...io.Writer) io.Writer {
safeWriters := []io.Writer{}
for _, w := range writers {
if w != nil {
safeWriters = append(safeWriters, w)
}
}
return io.MultiWriter(safeWriters...)
}
func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
if interval <= 0 {
interval = 100 * time.Millisecond
}
for {
res, err := http.Get(url.String())
if err == nil && res.StatusCode == http.StatusOK {
ready <- true
return
}
select {
case <-stopCh:
return
default:
time.Sleep(interval)
}
}
}
func (ps *ProcessState) Stop() error {
if ps.Session == nil {
return nil
}
// gexec's Session methods (Signal, Kill, ...) do not check if the Process is
// nil, so we are doing this here for now.
// This should probably be fixed in gexec.
if ps.Session.Command.Process == nil {
return nil
}
detectedStop := ps.Session.Terminate().Exited
timedOut := time.After(ps.StopTimeout)
select {
case <-detectedStop:
break
case <-timedOut:
return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
}
ps.ready = false
if ps.DirNeedsCleaning {
return os.RemoveAll(ps.Dir)
}
return nil
}