mirror of
https://codeberg.org/forgejo/forgejo
synced 2024-11-29 21:26:10 +01:00
cbc9a0fe47
The name of the job or step comes from the workflow file, while the name of the runner comes from its registration. If the strings used for these names are too long, they could cause db issues.
224 lines
6.1 KiB
Go
224 lines
6.1 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package runner
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net/http"
|
|
|
|
actions_model "code.gitea.io/gitea/models/actions"
|
|
"code.gitea.io/gitea/modules/actions"
|
|
"code.gitea.io/gitea/modules/json"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/util"
|
|
actions_service "code.gitea.io/gitea/services/actions"
|
|
|
|
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
|
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
|
|
"github.com/bufbuild/connect-go"
|
|
gouuid "github.com/google/uuid"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
func NewRunnerServiceHandler() (string, http.Handler) {
|
|
return runnerv1connect.NewRunnerServiceHandler(
|
|
&Service{},
|
|
connect.WithCompressMinBytes(1024),
|
|
withRunner,
|
|
)
|
|
}
|
|
|
|
var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
|
|
|
|
type Service struct {
|
|
runnerv1connect.UnimplementedRunnerServiceHandler
|
|
}
|
|
|
|
// Register for new runner.
|
|
func (s *Service) Register(
|
|
ctx context.Context,
|
|
req *connect.Request[runnerv1.RegisterRequest],
|
|
) (*connect.Response[runnerv1.RegisterResponse], error) {
|
|
if req.Msg.Token == "" || req.Msg.Name == "" {
|
|
return nil, errors.New("missing runner token, name")
|
|
}
|
|
|
|
runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
|
|
if err != nil {
|
|
return nil, errors.New("runner token not found")
|
|
}
|
|
|
|
if runnerToken.IsActive {
|
|
return nil, errors.New("runner token has already activated")
|
|
}
|
|
|
|
// create new runner
|
|
name, _ := util.SplitStringAtByteN(req.Msg.Name, 255)
|
|
runner := &actions_model.ActionRunner{
|
|
UUID: gouuid.New().String(),
|
|
Name: name,
|
|
OwnerID: runnerToken.OwnerID,
|
|
RepoID: runnerToken.RepoID,
|
|
AgentLabels: req.Msg.AgentLabels,
|
|
CustomLabels: req.Msg.CustomLabels,
|
|
}
|
|
if err := runner.GenerateToken(); err != nil {
|
|
return nil, errors.New("can't generate token")
|
|
}
|
|
|
|
// create new runner
|
|
if err := actions_model.CreateRunner(ctx, runner); err != nil {
|
|
return nil, errors.New("can't create new runner")
|
|
}
|
|
|
|
// update token status
|
|
runnerToken.IsActive = true
|
|
if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
|
|
return nil, errors.New("can't update runner token status")
|
|
}
|
|
|
|
res := connect.NewResponse(&runnerv1.RegisterResponse{
|
|
Runner: &runnerv1.Runner{
|
|
Id: runner.ID,
|
|
Uuid: runner.UUID,
|
|
Token: runner.Token,
|
|
Name: runner.Name,
|
|
AgentLabels: runner.AgentLabels,
|
|
CustomLabels: runner.CustomLabels,
|
|
},
|
|
})
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// FetchTask assigns a task to the runner
|
|
func (s *Service) FetchTask(
|
|
ctx context.Context,
|
|
req *connect.Request[runnerv1.FetchTaskRequest],
|
|
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
|
|
runner := GetRunner(ctx)
|
|
|
|
var task *runnerv1.Task
|
|
if t, ok, err := pickTask(ctx, runner); err != nil {
|
|
log.Error("pick task failed: %v", err)
|
|
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
|
|
} else if ok {
|
|
task = t
|
|
}
|
|
|
|
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
|
|
Task: task,
|
|
})
|
|
return res, nil
|
|
}
|
|
|
|
// UpdateTask updates the task status.
|
|
func (s *Service) UpdateTask(
|
|
ctx context.Context,
|
|
req *connect.Request[runnerv1.UpdateTaskRequest],
|
|
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
|
|
{
|
|
// to debug strange runner behaviors, it could be removed if all problems have been solved.
|
|
stateMsg, _ := json.Marshal(req.Msg.State)
|
|
log.Trace("update task with state: %s", stateMsg)
|
|
}
|
|
|
|
// Get Task first
|
|
task, err := actions_model.GetTaskByID(ctx, req.Msg.State.Id)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "can't find the task: %v", err)
|
|
}
|
|
if task.Status.IsCancelled() {
|
|
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
|
|
State: &runnerv1.TaskState{
|
|
Id: req.Msg.State.Id,
|
|
Result: task.Status.AsResult(),
|
|
},
|
|
}), nil
|
|
}
|
|
|
|
task, err = actions_model.UpdateTaskByState(ctx, req.Msg.State)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "update task: %v", err)
|
|
}
|
|
|
|
if err := task.LoadJob(ctx); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "load job: %v", err)
|
|
}
|
|
|
|
if err := actions_service.CreateCommitStatus(ctx, task.Job); err != nil {
|
|
log.Error("Update commit status failed: %v", err)
|
|
// go on
|
|
}
|
|
|
|
if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
|
|
if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
|
|
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
|
|
}
|
|
}
|
|
|
|
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
|
|
State: &runnerv1.TaskState{
|
|
Id: req.Msg.State.Id,
|
|
Result: task.Status.AsResult(),
|
|
},
|
|
}), nil
|
|
}
|
|
|
|
// UpdateLog uploads log of the task.
|
|
func (s *Service) UpdateLog(
|
|
ctx context.Context,
|
|
req *connect.Request[runnerv1.UpdateLogRequest],
|
|
) (*connect.Response[runnerv1.UpdateLogResponse], error) {
|
|
res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
|
|
|
|
task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "get task: %v", err)
|
|
}
|
|
ack := task.LogLength
|
|
|
|
if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
|
|
res.Msg.AckIndex = ack
|
|
return res, nil
|
|
}
|
|
|
|
if task.LogInStorage {
|
|
return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
|
|
}
|
|
|
|
rows := req.Msg.Rows[ack-req.Msg.Index:]
|
|
ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "write logs: %v", err)
|
|
}
|
|
task.LogLength += int64(len(rows))
|
|
for _, n := range ns {
|
|
task.LogIndexes = append(task.LogIndexes, task.LogSize)
|
|
task.LogSize += int64(n)
|
|
}
|
|
|
|
res.Msg.AckIndex = task.LogLength
|
|
|
|
var remove func()
|
|
if req.Msg.NoMore {
|
|
task.LogInStorage = true
|
|
remove, err = actions.TransferLogs(ctx, task.LogFilename)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
|
|
}
|
|
}
|
|
|
|
if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "update task: %v", err)
|
|
}
|
|
if remove != nil {
|
|
remove()
|
|
}
|
|
|
|
return res, nil
|
|
}
|