package main

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
	"github.com/golang/protobuf/proto"  //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"

	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
	"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
)

func TestFailedCloneNoGitaly(t *testing.T) {
	// Prepare clone directory
	require.NoError(t, os.RemoveAll(scratchDir))

	authBody := &api.Response{
		GL_ID:       "user-123",
		GL_USERNAME: "username",
		// This will create a failure to connect to Gitaly
		GitalyServer: gitaly.Server{Address: "unix:/nonexistent"},
	}

	// Prepare test server and backend
	ts := testAuthServer(t, nil, nil, 200, authBody)
	defer ts.Close()
	ws := startWorkhorseServer(ts.URL)
	defer ws.Close()

	// Do the git clone
	cloneCmd := exec.Command("git", "clone", fmt.Sprintf("%s/%s", ws.URL, testRepo), checkoutDir)
	out, err := cloneCmd.CombinedOutput()
	t.Log(string(out))
	require.Error(t, err, "git clone should have failed")
}

func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath

	apiResponse := gitOkBody(t)
	apiResponse.GitalyServer.Address = gitalyAddress

	goodMetadata := map[string]string{
		"gitaly-feature-foobar": "true",
		"gitaly-feature-bazqux": "false",
	}
	badMetadata := map[string]string{
		"bad-metadata": "is blocked",
	}

	features := make(map[string]string)
	for k, v := range goodMetadata {
		features[k] = v
	}
	for k, v := range badMetadata {
		features[k] = v
	}
	apiResponse.GitalyServer.Features = features

	testCases := []struct {
		showAllRefs bool
		gitRpc      string
	}{
		{showAllRefs: false, gitRpc: "git-upload-pack"},
		{showAllRefs: true, gitRpc: "git-upload-pack"},
		{showAllRefs: false, gitRpc: "git-receive-pack"},
		{showAllRefs: true, gitRpc: "git-receive-pack"},
	}

	for _, tc := range testCases {
		t.Run(fmt.Sprintf("ShowAllRefs=%v,gitRpc=%v", tc.showAllRefs, tc.gitRpc), func(t *testing.T) {
			apiResponse.ShowAllRefs = tc.showAllRefs

			ts := testAuthServer(t, nil, nil, 200, apiResponse)
			defer ts.Close()

			ws := startWorkhorseServer(ts.URL)
			defer ws.Close()

			gitProtocol := "fake git protocol"
			resource := "/gitlab-org/gitlab-test.git/info/refs?service=" + tc.gitRpc
			resp, body := httpGet(t, ws.URL+resource, map[string]string{"Git-Protocol": gitProtocol})

			require.Equal(t, 200, resp.StatusCode)

			bodySplit := strings.SplitN(body, "\000", 3)
			require.Len(t, bodySplit, 3)

			gitalyRequest := &gitalypb.InfoRefsRequest{}
			require.NoError(t, jsonpb.UnmarshalString(bodySplit[0], gitalyRequest))

			require.Equal(t, gitProtocol, gitalyRequest.GitProtocol)
			if tc.showAllRefs {
				require.Equal(t, []string{git.GitConfigShowAllRefs}, gitalyRequest.GitConfigOptions)
			} else {
				require.Empty(t, gitalyRequest.GitConfigOptions)
			}

			require.Equal(t, tc.gitRpc, bodySplit[1])

			require.Equal(t, string(testhelper.GitalyInfoRefsResponseMock), bodySplit[2], "GET %q: response body", resource)

			md := gitalyServer.LastIncomingMetadata
			for k, v := range goodMetadata {
				actual := md[k]
				require.Len(t, actual, 1, "number of metadata values for %v", k)
				require.Equal(t, v, actual[0], "value for %v", k)
			}

			for k := range badMetadata {
				actual := md[k]
				require.Empty(t, actual, "metadata for bad key %v", k)
			}
		})
	}
}

func TestGetInfoRefsProxiedToGitalyInterruptedStream(t *testing.T) {
	apiResponse := gitOkBody(t)
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	apiResponse.GitalyServer.Address = gitalyAddress

	ts := testAuthServer(t, nil, nil, 200, apiResponse)
	defer ts.Close()

	ws := startWorkhorseServer(ts.URL)
	defer ws.Close()

	resource := "/gitlab-org/gitlab-test.git/info/refs?service=git-upload-pack"
	resp, err := http.Get(ws.URL + resource)
	require.NoError(t, err)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

func waitDone(t *testing.T, done chan struct{}) {
	t.Helper()
	select {
	case <-done:
		return
	case <-time.After(10 * time.Second):
		t.Fatal("time out waiting for gitaly handler to return")
	}
}

func TestPostReceivePackProxiedToGitalySuccessfully(t *testing.T) {
	apiResponse := gitOkBody(t)

	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	apiResponse.GitalyServer.Address = "unix:" + socketPath
	apiResponse.GitConfigOptions = []string{"git-config-hello=world"}
	ts := testAuthServer(t, nil, nil, 200, apiResponse)
	defer ts.Close()

	ws := startWorkhorseServer(ts.URL)
	defer ws.Close()

	gitProtocol := "fake Git protocol"
	resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
	resp := httpPost(
		t,
		ws.URL+resource,
		map[string]string{
			"Content-Type": "application/x-git-receive-pack-request",
			"Git-Protocol": gitProtocol,
		},
		bytes.NewReader(testhelper.GitalyReceivePackResponseMock),
	)
	defer resp.Body.Close()
	body := string(testhelper.ReadAll(t, resp.Body))

	split := strings.SplitN(body, "\000", 2)
	require.Len(t, split, 2)

	gitalyRequest := &gitalypb.PostReceivePackRequest{}
	require.NoError(t, jsonpb.UnmarshalString(split[0], gitalyRequest))

	require.Equal(t, apiResponse.Repository.StorageName, gitalyRequest.Repository.StorageName)
	require.Equal(t, apiResponse.Repository.RelativePath, gitalyRequest.Repository.RelativePath)
	require.Equal(t, apiResponse.GL_ID, gitalyRequest.GlId)
	require.Equal(t, apiResponse.GL_USERNAME, gitalyRequest.GlUsername)
	require.Equal(t, apiResponse.GitConfigOptions, gitalyRequest.GitConfigOptions)
	require.Equal(t, gitProtocol, gitalyRequest.GitProtocol)

	require.Equal(t, 200, resp.StatusCode, "POST %q", resource)
	require.Equal(t, string(testhelper.GitalyReceivePackResponseMock), split[1])
	testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-git-receive-pack-result")
}

func TestPostReceivePackProxiedToGitalyInterrupted(t *testing.T) {
	apiResponse := gitOkBody(t)

	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	apiResponse.GitalyServer.Address = "unix:" + socketPath
	ts := testAuthServer(t, nil, nil, 200, apiResponse)
	defer ts.Close()

	ws := startWorkhorseServer(ts.URL)
	defer ws.Close()

	resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
	resp, err := http.Post(
		ws.URL+resource,
		"application/x-git-receive-pack-request",
		bytes.NewReader(testhelper.GitalyReceivePackResponseMock),
	)
	require.NoError(t, err)
	require.Equal(t, 200, resp.StatusCode, "POST %q", resource)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

// ReaderFunc is an adapter to turn a conforming function into an io.Reader.
type ReaderFunc func(b []byte) (int, error)

func (r ReaderFunc) Read(b []byte) (int, error) { return r(b) }

func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
	for i, tc := range []struct {
		showAllRefs bool
		code        codes.Code
	}{
		{true, codes.OK},
		{true, codes.Unavailable},
		{false, codes.OK},
		{false, codes.Unavailable},
	} {
		t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) {
			apiResponse := gitOkBody(t)
			apiResponse.ShowAllRefs = tc.showAllRefs

			gitalyServer, socketPath := startGitalyServer(t, tc.code)
			defer gitalyServer.GracefulStop()

			apiResponse.GitalyServer.Address = "unix:" + socketPath
			ts := testAuthServer(t, nil, nil, 200, apiResponse)
			defer ts.Close()

			ws := startWorkhorseServer(ts.URL)
			defer ws.Close()

			gitProtocol := "fake git protocol"
			resource := "/gitlab-org/gitlab-test.git/git-upload-pack"

			requestReader := bytes.NewReader(testhelper.GitalyUploadPackResponseMock)
			var m sync.Mutex
			requestReadFinished := false
			resp := httpPost(
				t,
				ws.URL+resource,
				map[string]string{
					"Content-Type": "application/x-git-upload-pack-request",
					"Git-Protocol": gitProtocol,
				},
				ReaderFunc(func(b []byte) (int, error) {
					n, err := requestReader.Read(b)
					if err != nil {
						m.Lock()
						requestReadFinished = true
						m.Unlock()
					}
					return n, err
				}),
			)
			defer resp.Body.Close()
			require.Equal(t, 200, resp.StatusCode, "POST %q", resource)
			testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-git-upload-pack-result")

			m.Lock()
			requestFinished := requestReadFinished
			m.Unlock()
			require.True(t, requestFinished, "response written before request was fully read")

			body := string(testhelper.ReadAll(t, resp.Body))
			bodySplit := strings.SplitN(body, "\000", 2)
			require.Len(t, bodySplit, 2)

			gitalyRequest := &gitalypb.PostUploadPackRequest{}
			require.NoError(t, jsonpb.UnmarshalString(bodySplit[0], gitalyRequest))

			require.Equal(t, apiResponse.Repository.StorageName, gitalyRequest.Repository.StorageName)
			require.Equal(t, apiResponse.Repository.RelativePath, gitalyRequest.Repository.RelativePath)
			require.Equal(t, gitProtocol, gitalyRequest.GitProtocol)

			if tc.showAllRefs {
				require.Equal(t, []string{git.GitConfigShowAllRefs}, gitalyRequest.GitConfigOptions)
			} else {
				require.Empty(t, gitalyRequest.GitConfigOptions)
			}

			require.Equal(t, string(testhelper.GitalyUploadPackResponseMock), bodySplit[1], "POST %q: response body", resource)
		})
	}
}

func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) {
	apiResponse := gitOkBody(t)

	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	apiResponse.GitalyServer.Address = "unix:" + socketPath
	ts := testAuthServer(t, nil, nil, 200, apiResponse)
	defer ts.Close()

	ws := startWorkhorseServer(ts.URL)
	defer ws.Close()

	resource := "/gitlab-org/gitlab-test.git/git-upload-pack"
	resp, err := http.Post(
		ws.URL+resource,
		"application/x-git-upload-pack-request",
		bytes.NewReader(testhelper.GitalyUploadPackResponseMock),
	)
	require.NoError(t, err)
	require.Equal(t, 200, resp.StatusCode, "POST %q", resource)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

func TestGetDiffProxiedToGitalySuccessfully(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	repoStorage := "default"
	rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
	leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
	repoRelativePath := "foo/bar.git"
	jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
		gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
	expectedBody := testhelper.GitalyGetDiffResponseMock

	resp, body, err := doSendDataRequest("/something", "git-diff", jsonParams)
	require.NoError(t, err)

	require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
	require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
}

func TestGetPatchProxiedToGitalySuccessfully(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	repoStorage := "default"
	rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
	leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
	repoRelativePath := "foo/bar.git"
	jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
		gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
	expectedBody := testhelper.GitalyGetPatchResponseMock

	resp, body, err := doSendDataRequest("/something", "git-format-patch", jsonParams)
	require.NoError(t, err)

	require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
	require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
}

func TestGetBlobProxiedToGitalyInterruptedStream(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	repoStorage := "default"
	oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"
	repoRelativePath := "foo/bar.git"
	jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GetBlobRequest":{"repository":{"storage_name":"%s","relative_path":"%s"},"oid":"%s","limit":-1}}`,
		gitalyAddress, repoStorage, repoRelativePath, oid)

	resp, _, err := doSendDataRequest("/something", "git-blob", jsonParams)
	require.NoError(t, err)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

func TestGetArchiveProxiedToGitalySuccessfully(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	repoStorage := "default"
	oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"
	repoRelativePath := "foo/bar.git"
	archivePrefix := "repo-1"
	expectedBody := testhelper.GitalyGetArchiveResponseMock
	archiveLength := len(expectedBody)

	testCases := []struct {
		archivePath   string
		cacheDisabled bool
	}{
		{archivePath: path.Join(scratchDir, "my/path"), cacheDisabled: false},
		{archivePath: "/var/empty/my/path", cacheDisabled: true},
	}

	for _, tc := range testCases {
		jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GitalyRepository":{"storage_name":"%s","relative_path":"%s"},"ArchivePath":"%s","ArchivePrefix":"%s","CommitId":"%s","DisableCache":%v}`,
			gitalyAddress, repoStorage, repoRelativePath, tc.archivePath, archivePrefix, oid, tc.cacheDisabled)
		resp, body, err := doSendDataRequest("/archive.tar.gz", "git-archive", jsonParams)
		require.NoError(t, err)

		require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
		require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
		require.Equal(t, archiveLength, len(body), "GET %q: body size", resp.Request.URL)

		if tc.cacheDisabled {
			_, err := os.Stat(tc.archivePath)
			require.True(t, os.IsNotExist(err), "expected 'does not exist', got: %v", err)
		} else {
			cachedArchive, err := ioutil.ReadFile(tc.archivePath)
			require.NoError(t, err)
			require.Equal(t, expectedBody, string(cachedArchive))
		}
	}
}

func TestGetArchiveProxiedToGitalyInterruptedStream(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	repoStorage := "default"
	oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"
	repoRelativePath := "foo/bar.git"
	archivePath := "my/path"
	archivePrefix := "repo-1"
	jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GitalyRepository":{"storage_name":"%s","relative_path":"%s"},"ArchivePath":"%s","ArchivePrefix":"%s","CommitId":"%s"}`,
		gitalyAddress, repoStorage, repoRelativePath, path.Join(scratchDir, archivePath), archivePrefix, oid)

	resp, _, err := doSendDataRequest("/archive.tar.gz", "git-archive", jsonParams)
	require.NoError(t, err)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

func TestGetDiffProxiedToGitalyInterruptedStream(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	repoStorage := "default"
	rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
	leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
	repoRelativePath := "foo/bar.git"
	jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
		gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)

	resp, _, err := doSendDataRequest("/something", "git-diff", jsonParams)
	require.NoError(t, err)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

func TestGetPatchProxiedToGitalyInterruptedStream(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	repoStorage := "default"
	rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
	leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
	repoRelativePath := "foo/bar.git"
	jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
		gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)

	resp, _, err := doSendDataRequest("/something", "git-format-patch", jsonParams)
	require.NoError(t, err)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

func TestGetSnapshotProxiedToGitalySuccessfully(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath
	expectedBody := testhelper.GitalyGetSnapshotResponseMock
	archiveLength := len(expectedBody)

	params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git"))
	resp, body, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params)
	require.NoError(t, err)

	require.Equal(t, http.StatusOK, resp.StatusCode, "GET %q: status code", resp.Request.URL)
	require.Equal(t, expectedBody, string(body), "GET %q: body", resp.Request.URL)
	require.Equal(t, archiveLength, len(body), "GET %q: body size", resp.Request.URL)

	testhelper.RequireResponseHeader(t, resp, "Content-Disposition", `attachment; filename="snapshot.tar"`)
	testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-tar")
	testhelper.RequireResponseHeader(t, resp, "Content-Transfer-Encoding", "binary")
	testhelper.RequireResponseHeader(t, resp, "Cache-Control", "private")
}

func TestGetSnapshotProxiedToGitalyInterruptedStream(t *testing.T) {
	gitalyServer, socketPath := startGitalyServer(t, codes.OK)
	defer gitalyServer.GracefulStop()

	gitalyAddress := "unix:" + socketPath

	params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git"))
	resp, _, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params)
	require.NoError(t, err)

	// This causes the server stream to be interrupted instead of consumed entirely.
	resp.Body.Close()

	done := make(chan struct{})
	go func() {
		gitalyServer.WaitGroup.Wait()
		close(done)
	}()

	waitDone(t, done)
}

func buildGetSnapshotParams(gitalyAddress string, repo *gitalypb.Repository) string {
	msg := serializedMessage("GetSnapshotRequest", &gitalypb.GetSnapshotRequest{Repository: repo})
	return buildGitalyRPCParams(gitalyAddress, msg)
}

type rpcArg struct {
	k string
	v interface{}
}

// Gitlab asks workhorse to perform some long-running RPCs for it by sending
// the RPC arguments (which are protobuf messages) in HTTP response headers.
// The messages are encoded to JSON objects using pbjson, The strings are then
// re-encoded to JSON strings using json. We must replicate this behaviour here
func buildGitalyRPCParams(gitalyAddress string, rpcArgs ...rpcArg) string {
	built := map[string]interface{}{
		"GitalyServer": map[string]string{
			"Address": gitalyAddress,
			"Token":   "",
		},
	}

	for _, arg := range rpcArgs {
		built[arg.k] = arg.v
	}

	b, err := json.Marshal(interface{}(built))
	if err != nil {
		panic(err)
	}

	return string(b)
}

func buildPbRepo(storageName, relativePath string) *gitalypb.Repository {
	return &gitalypb.Repository{
		StorageName:  storageName,
		RelativePath: relativePath,
	}
}

func serializedMessage(name string, arg proto.Message) rpcArg {
	m := &jsonpb.Marshaler{}
	str, err := m.MarshalToString(arg)
	if err != nil {
		panic(err)
	}

	return rpcArg{name, str}
}

func serializedProtoMessage(name string, arg proto.Message) rpcArg {
	msg, err := proto.Marshal(arg)

	if err != nil {
		panic(err)
	}

	return rpcArg{name, base64.URLEncoding.EncodeToString(msg)}
}

type combinedServer struct {
	*grpc.Server
	*testhelper.GitalyTestServer
}

func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServer, string) {
	socketPath := path.Join(scratchDir, fmt.Sprintf("gitaly-%d.sock", rand.Int()))
	if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
		t.Fatal(err)
	}
	server := grpc.NewServer()
	listener, err := net.Listen("unix", socketPath)
	require.NoError(t, err)

	gitalyServer := testhelper.NewGitalyServer(finalMessageCode)
	gitalypb.RegisterSmartHTTPServiceServer(server, gitalyServer)
	gitalypb.RegisterBlobServiceServer(server, gitalyServer)
	gitalypb.RegisterRepositoryServiceServer(server, gitalyServer)
	gitalypb.RegisterDiffServiceServer(server, gitalyServer)

	go server.Serve(listener)

	return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath
}