ac384c4e1d
See [Defining outputs for jobs](https://docs.github.com/en/actions/using-jobs/defining-outputs-for-jobs) and [Example usage of the needs context](https://docs.github.com/en/actions/learn-github-actions/contexts#example-usage-of-the-needs-context). Related to: - [actions-proto-def #5](https://gitea.com/gitea/actions-proto-def/pulls/5) - [act_runner #133](https://gitea.com/gitea/act_runner/pulls/133) <details> <summary>Tests & screenshots</summary> Test workflow file: ```yaml name: outputs on: push jobs: job1: runs-on: ubuntu-latest outputs: output1: ${{ steps.step1.outputs.output1 }} output2: ${{ steps.step2.outputs.output2 }} steps: - name: step1 id: step1 run: | date -Is > output1 cat output1 echo "output1=$(cat output1)" >> $GITHUB_OUTPUT - name: step2 id: step2 run: | cat /proc/sys/kernel/random/uuid > output2 cat output2 echo "output2=$(cat output2)" >> $GITHUB_OUTPUT job2: needs: job1 runs-on: ubuntu-latest steps: - run: echo ${{ needs.job1.outputs.output1 }} - run: echo ${{ needs.job1.outputs.output2 }} - run: echo ${{ needs.job1.result }} ``` <img width="397" alt="image" src="https://user-images.githubusercontent.com/9418365/233313322-903e7ebf-49a7-48e2-8c17-95a4581b3284.png"> <img width="385" alt="image" src="https://user-images.githubusercontent.com/9418365/233313442-30909135-1711-4b78-a5c6-133fcc79f47c.png"> </details> --------- Co-authored-by: Giteabot <teabot@gitea.io>
246 lines
7.2 KiB
Go
246 lines
7.2 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package runner
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net/http"
|
|
|
|
actions_model "code.gitea.io/gitea/models/actions"
|
|
"code.gitea.io/gitea/modules/actions"
|
|
"code.gitea.io/gitea/modules/json"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/util"
|
|
actions_service "code.gitea.io/gitea/services/actions"
|
|
|
|
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
|
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
|
|
"github.com/bufbuild/connect-go"
|
|
gouuid "github.com/google/uuid"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
func NewRunnerServiceHandler() (string, http.Handler) {
|
|
return runnerv1connect.NewRunnerServiceHandler(
|
|
&Service{},
|
|
connect.WithCompressMinBytes(1024),
|
|
withRunner,
|
|
)
|
|
}
|
|
|
|
var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
|
|
|
|
type Service struct {
|
|
runnerv1connect.UnimplementedRunnerServiceHandler
|
|
}
|
|
|
|
// Register for new runner.
|
|
func (s *Service) Register(
|
|
ctx context.Context,
|
|
req *connect.Request[runnerv1.RegisterRequest],
|
|
) (*connect.Response[runnerv1.RegisterResponse], error) {
|
|
if req.Msg.Token == "" || req.Msg.Name == "" {
|
|
return nil, errors.New("missing runner token, name")
|
|
}
|
|
|
|
runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
|
|
if err != nil {
|
|
return nil, errors.New("runner token not found")
|
|
}
|
|
|
|
if runnerToken.IsActive {
|
|
return nil, errors.New("runner token has already been activated")
|
|
}
|
|
|
|
// create new runner
|
|
name, _ := util.SplitStringAtByteN(req.Msg.Name, 255)
|
|
runner := &actions_model.ActionRunner{
|
|
UUID: gouuid.New().String(),
|
|
Name: name,
|
|
OwnerID: runnerToken.OwnerID,
|
|
RepoID: runnerToken.RepoID,
|
|
AgentLabels: req.Msg.AgentLabels,
|
|
CustomLabels: req.Msg.CustomLabels,
|
|
}
|
|
if err := runner.GenerateToken(); err != nil {
|
|
return nil, errors.New("can't generate token")
|
|
}
|
|
|
|
// create new runner
|
|
if err := actions_model.CreateRunner(ctx, runner); err != nil {
|
|
return nil, errors.New("can't create new runner")
|
|
}
|
|
|
|
// update token status
|
|
runnerToken.IsActive = true
|
|
if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
|
|
return nil, errors.New("can't update runner token status")
|
|
}
|
|
|
|
res := connect.NewResponse(&runnerv1.RegisterResponse{
|
|
Runner: &runnerv1.Runner{
|
|
Id: runner.ID,
|
|
Uuid: runner.UUID,
|
|
Token: runner.Token,
|
|
Name: runner.Name,
|
|
AgentLabels: runner.AgentLabels,
|
|
CustomLabels: runner.CustomLabels,
|
|
},
|
|
})
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// FetchTask assigns a task to the runner
|
|
func (s *Service) FetchTask(
|
|
ctx context.Context,
|
|
_ *connect.Request[runnerv1.FetchTaskRequest],
|
|
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
|
|
runner := GetRunner(ctx)
|
|
|
|
var task *runnerv1.Task
|
|
if t, ok, err := pickTask(ctx, runner); err != nil {
|
|
log.Error("pick task failed: %v", err)
|
|
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
|
|
} else if ok {
|
|
task = t
|
|
}
|
|
|
|
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
|
|
Task: task,
|
|
})
|
|
return res, nil
|
|
}
|
|
|
|
// UpdateTask updates the task status.
|
|
func (s *Service) UpdateTask(
|
|
ctx context.Context,
|
|
req *connect.Request[runnerv1.UpdateTaskRequest],
|
|
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
|
|
{
|
|
// to debug strange runner behaviors, it could be removed if all problems have been solved.
|
|
stateMsg, _ := json.Marshal(req.Msg.State)
|
|
log.Trace("update task with state: %s", stateMsg)
|
|
}
|
|
|
|
// Get Task first
|
|
task, err := actions_model.GetTaskByID(ctx, req.Msg.State.Id)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "can't find the task: %v", err)
|
|
}
|
|
if task.Status.IsCancelled() {
|
|
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
|
|
State: &runnerv1.TaskState{
|
|
Id: req.Msg.State.Id,
|
|
Result: task.Status.AsResult(),
|
|
},
|
|
}), nil
|
|
}
|
|
|
|
task, err = actions_model.UpdateTaskByState(ctx, req.Msg.State)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "update task: %v", err)
|
|
}
|
|
|
|
for k, v := range req.Msg.Outputs {
|
|
if len(k) > 255 {
|
|
log.Warn("Ignore the output of task %d because the key is too long: %q", task.ID, k)
|
|
continue
|
|
}
|
|
// The value can be a maximum of 1 MB
|
|
if l := len(v); l > 1024*1024 {
|
|
log.Warn("Ignore the output %q of task %d because the value is too long: %v", k, task.ID, l)
|
|
continue
|
|
}
|
|
// There's another limitation on GitHub that the total of all outputs in a workflow run can be a maximum of 50 MB.
|
|
// We don't check the total size here because it's not easy to do, and it doesn't really worth it.
|
|
// See https://docs.github.com/en/actions/using-jobs/defining-outputs-for-jobs
|
|
|
|
if err := actions_model.InsertTaskOutputIfNotExist(ctx, task.ID, k, v); err != nil {
|
|
log.Warn("Failed to insert the output %q of task %d: %v", k, task.ID, err)
|
|
// It's ok not to return errors, the runner will resend the outputs.
|
|
}
|
|
}
|
|
sentOutputs, err := actions_model.FindTaskOutputKeyByTaskID(ctx, task.ID)
|
|
if err != nil {
|
|
log.Warn("Failed to find the sent outputs of task %d: %v", task.ID, err)
|
|
// It's not to return errors, it can be handled when the runner resends sent outputs.
|
|
}
|
|
|
|
if err := task.LoadJob(ctx); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "load job: %v", err)
|
|
}
|
|
|
|
actions_service.CreateCommitStatus(ctx, task.Job)
|
|
|
|
if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
|
|
if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
|
|
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
|
|
}
|
|
}
|
|
|
|
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
|
|
State: &runnerv1.TaskState{
|
|
Id: req.Msg.State.Id,
|
|
Result: task.Status.AsResult(),
|
|
},
|
|
SentOutputs: sentOutputs,
|
|
}), nil
|
|
}
|
|
|
|
// UpdateLog uploads log of the task.
|
|
func (s *Service) UpdateLog(
|
|
ctx context.Context,
|
|
req *connect.Request[runnerv1.UpdateLogRequest],
|
|
) (*connect.Response[runnerv1.UpdateLogResponse], error) {
|
|
res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
|
|
|
|
task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "get task: %v", err)
|
|
}
|
|
ack := task.LogLength
|
|
|
|
if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
|
|
res.Msg.AckIndex = ack
|
|
return res, nil
|
|
}
|
|
|
|
if task.LogInStorage {
|
|
return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
|
|
}
|
|
|
|
rows := req.Msg.Rows[ack-req.Msg.Index:]
|
|
ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "write logs: %v", err)
|
|
}
|
|
task.LogLength += int64(len(rows))
|
|
for _, n := range ns {
|
|
task.LogIndexes = append(task.LogIndexes, task.LogSize)
|
|
task.LogSize += int64(n)
|
|
}
|
|
|
|
res.Msg.AckIndex = task.LogLength
|
|
|
|
var remove func()
|
|
if req.Msg.NoMore {
|
|
task.LogInStorage = true
|
|
remove, err = actions.TransferLogs(ctx, task.LogFilename)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
|
|
}
|
|
}
|
|
|
|
if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "update task: %v", err)
|
|
}
|
|
if remove != nil {
|
|
remove()
|
|
}
|
|
|
|
return res, nil
|
|
}
|