419 lines
11 KiB
Go
419 lines
11 KiB
Go
package task
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"time"
|
|
|
|
"git.ifooth.com/common/pkg/rest"
|
|
"git.ifooth.com/common/pkg/validator"
|
|
"github.com/samber/lo"
|
|
|
|
istore "git.ifooth.com/common/pkg/task/stores/iface"
|
|
"git.ifooth.com/common/pkg/task/types"
|
|
itypes "git.ifooth.com/common/pkg/task/types"
|
|
)
|
|
|
|
type service struct {
|
|
host string
|
|
routePrefix string
|
|
mgr *TaskManager
|
|
}
|
|
|
|
var (
|
|
// TaskStatusSlice ...
|
|
TaskStatusSlice = []string{
|
|
types.TaskStatusInit,
|
|
types.TaskStatusRunning,
|
|
types.TaskStatusSuccess,
|
|
types.TaskStatusFailure,
|
|
types.TaskStatusTimeout,
|
|
types.TaskStatusRevoked,
|
|
types.TaskStatusNotStarted,
|
|
}
|
|
)
|
|
|
|
func NewHandler(mgr *TaskManager, host string, routePrefix string) http.Handler {
|
|
mux := http.NewServeMux()
|
|
|
|
s := &service{
|
|
mgr: mgr,
|
|
host: host,
|
|
routePrefix: routePrefix,
|
|
}
|
|
|
|
// 任务管理
|
|
mux.HandleFunc("POST /update", rest.Handle(s.Update))
|
|
mux.HandleFunc("POST /retry", rest.Handle(s.Retry))
|
|
mux.HandleFunc("POST /revoke", rest.Handle(s.Revoke))
|
|
mux.HandleFunc("GET /status", rest.Handle(s.Status))
|
|
mux.HandleFunc("GET /list", rest.Handle(s.List))
|
|
|
|
return mux
|
|
}
|
|
|
|
// Task bcs_task task with execution duration
|
|
type Task struct {
|
|
*itypes.Task
|
|
Steps []*Step `json:"steps,omitempty"`
|
|
ExecutionDuration time.Duration `json:"executionDuration" swaggertype:"string"`
|
|
MaxExecutionDuration time.Duration `json:"maxExecutionDuration" swaggertype:"string"`
|
|
DetailURL string `json:"detailURL,omitempty"`
|
|
}
|
|
|
|
// Step bcs_task step with execution duration
|
|
type Step struct {
|
|
*itypes.Step
|
|
ExecutionDuration time.Duration `json:"executionDuration" swaggertype:"string"`
|
|
MaxExecutionDuration time.Duration `json:"maxExecutionDuration" swaggertype:"string"`
|
|
}
|
|
|
|
// StepReq ...
|
|
type StepReq struct {
|
|
Name string `json:"name"`
|
|
Params *map[string]string `json:"params"`
|
|
Payload *string `json:"payload"`
|
|
RetryCount *uint32 `json:"retryCount"`
|
|
Status *string `json:"status"`
|
|
}
|
|
|
|
// Validate ...
|
|
func (s *StepReq) Validate() error {
|
|
if s.Status != nil && !lo.Contains(TaskStatusSlice, *s.Status) {
|
|
return fmt.Errorf("step status %s not valid", *s.Status)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateReq 请求参数
|
|
type UpdateReq struct {
|
|
TaskID string `json:"taskID" validate:"required"`
|
|
ResetStartTime bool `json:"resetStartTime"`
|
|
Status string `json:"status"`
|
|
Steps []*StepReq `json:"steps"`
|
|
}
|
|
|
|
// Validate ...
|
|
func (r *UpdateReq) Validate() error {
|
|
if r.Status != "" && !lo.Contains(TaskStatusSlice, r.Status) {
|
|
return fmt.Errorf("task status %s not valid", r.Status)
|
|
}
|
|
|
|
for _, v := range r.Steps {
|
|
if err := v.Validate(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RetryReq 请求参数
|
|
type RetryReq struct {
|
|
TaskID string `json:"taskID" validate:"required"`
|
|
StepName string `json:"stepName"`
|
|
}
|
|
|
|
// Update 更新任务
|
|
//
|
|
// @ID UpdateTask
|
|
// @Summary 更新任务
|
|
// @Description 更新任务
|
|
// @Tags 任务管理
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param request body UpdateReq true "task info"
|
|
// @Success 200 {object} rest.APIResponse{data=nil}
|
|
// @Failure 400
|
|
// @x-bk-apigateway {"isPublic": false, "appVerifiedRequired": true, "userVerifiedRequired": false}
|
|
// @Router /api/v1/remotedevenv/task/update [post]
|
|
func (s *service) Update(ctx context.Context, req *UpdateReq) (*rest.EmptyResp, error) {
|
|
if err := validator.Struct(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t, err := s.mgr.GetTaskWithID(ctx, req.TaskID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if req.ResetStartTime {
|
|
t.Start = time.Now()
|
|
}
|
|
if req.Status != "" {
|
|
t.Status = req.Status
|
|
}
|
|
|
|
for _, v := range req.Steps {
|
|
step, ok := t.GetStep(v.Name)
|
|
if !ok {
|
|
return nil, fmt.Errorf("step %s not found", v.Name)
|
|
}
|
|
if v.Params != nil {
|
|
step.Params = *v.Params
|
|
}
|
|
if v.RetryCount != nil {
|
|
step.RetryCount = *v.RetryCount
|
|
}
|
|
if v.Status != nil {
|
|
step.Status = *v.Status
|
|
}
|
|
if v.Payload != nil {
|
|
body, err := base64.StdEncoding.DecodeString(*v.Payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
step.Payload = string(body)
|
|
}
|
|
|
|
t.SetCurrentStep(v.Name)
|
|
// 只更新DB
|
|
if err := s.mgr.UpdateTask(ctx, t); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
resp := &rest.EmptyResp{}
|
|
return resp, nil
|
|
}
|
|
|
|
// Retry 重试任务
|
|
//
|
|
// @ID RetryTask
|
|
// @Summary 重试任务
|
|
// @Description 重试任务
|
|
// @Tags 任务管理,clouddev,cloudpc
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param request body RetryReq true "task info"
|
|
// @Success 200 {object} rest.APIResponse{data=nil}
|
|
// @Failure 400
|
|
// @x-bk-apigateway {"isPublic": false, "appVerifiedRequired": true, "userVerifiedRequired": false}
|
|
// @Router /api/v1/remotedevenv/task/retry [post]
|
|
func (s *service) Retry(ctx context.Context, req *RetryReq) (*rest.EmptyResp, error) {
|
|
if err := validator.Struct(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t, err := s.mgr.GetTaskWithID(ctx, req.TaskID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &rest.EmptyResp{}
|
|
|
|
if t.Status != itypes.TaskStatusFailure &&
|
|
t.Status != itypes.TaskStatusRevoked &&
|
|
t.Status != itypes.TaskStatusTimeout {
|
|
return nil, fmt.Errorf("task %s already in process %s", req.TaskID, t.Status)
|
|
}
|
|
|
|
if t.Status == itypes.TaskStatusTimeout {
|
|
t.Start = time.Now()
|
|
}
|
|
|
|
// 只重试单步骤
|
|
if req.StepName != "" {
|
|
step, ok := t.GetStep(req.StepName)
|
|
if !ok {
|
|
return nil, fmt.Errorf("step %s not found", req.StepName)
|
|
}
|
|
if step.Status == itypes.TaskStatusSuccess {
|
|
return nil, fmt.Errorf("step %s already success", req.StepName)
|
|
}
|
|
|
|
if step.Status == itypes.TaskStatusFailure {
|
|
step.RetryCount = 0
|
|
step.Status = itypes.TaskStatusNotStarted
|
|
}
|
|
t.SetCurrentStep(req.StepName)
|
|
if err := s.mgr.RetryAt(t, req.StepName); err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// 全量重试必须有失败的任务
|
|
notSuccessStep := lo.Filter(t.Steps, func(v *itypes.Step, _ int) bool {
|
|
return v.Status != itypes.TaskStatusSuccess
|
|
})
|
|
if len(notSuccessStep) == 0 {
|
|
return nil, fmt.Errorf("task %s all step already success", req.TaskID)
|
|
}
|
|
|
|
// 错误步骤重试次数置为0, 只当前步骤有效
|
|
for _, step := range t.Steps {
|
|
if step.Status == itypes.TaskStatusFailure {
|
|
step.RetryCount = 0
|
|
step.Status = itypes.TaskStatusNotStarted
|
|
}
|
|
}
|
|
// 任务级重试
|
|
if err := s.mgr.RetryAll(t); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// commonReq 任务请求参数
|
|
type commonReq struct {
|
|
TaskID string `json:"taskID" req:"taskID,in=query" validate:"required"`
|
|
}
|
|
|
|
// Revoke 取消任务
|
|
//
|
|
// @ID RevokeTask
|
|
// @Summary 取消任务
|
|
// @Description 取消任务
|
|
// @Tags 任务管理,clouddev,cloudpc
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param request body commonReq true "common info"
|
|
// @Success 200 {object} rest.APIResponse{data=nil}
|
|
// @Failure 400
|
|
// @x-bk-apigateway {"isPublic": false, "appVerifiedRequired": true, "userVerifiedRequired": false}
|
|
// @Router /api/v1/remotedevenv/task/revoke [post]
|
|
func (s *service) Revoke(ctx context.Context, req *commonReq) (*rest.EmptyResp, error) {
|
|
if err := validator.Struct(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t, err := s.mgr.GetTaskWithID(ctx, req.TaskID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if t.Status != itypes.TaskStatusRunning {
|
|
return nil, fmt.Errorf("task %s not running", req.TaskID)
|
|
}
|
|
|
|
if err := s.mgr.Revoke(t); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &rest.EmptyResp{}, nil
|
|
}
|
|
|
|
// Status 任务状态
|
|
//
|
|
// @ID TaskStatus
|
|
// @Summary 任务状态
|
|
// @Description 获取任务状态
|
|
// @Tags 任务管理,clouddev,cloudpc
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param taskID query string true "task id"
|
|
// @Success 200 {object} rest.APIResponse{data=Task}
|
|
// @Failure 400
|
|
// @x-bk-apigateway {"isPublic": false, "appVerifiedRequired": true, "userVerifiedRequired": false}
|
|
// @Router /api/v1/remotedevenv/task/status [get]
|
|
func (s *service) Status(ctx context.Context, req *commonReq) (*Task, error) {
|
|
if err := validator.Struct(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
taskData, err := s.mgr.GetTaskWithID(ctx, req.TaskID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
steps := lo.Map(taskData.Steps, func(v *itypes.Step, _ int) *Step {
|
|
return &Step{
|
|
Step: v,
|
|
ExecutionDuration: time.Duration(v.ExecutionTime) * time.Millisecond,
|
|
MaxExecutionDuration: time.Duration(v.MaxExecutionSeconds) * time.Second,
|
|
}
|
|
})
|
|
|
|
t := &Task{
|
|
Task: taskData,
|
|
Steps: steps,
|
|
ExecutionDuration: time.Duration(taskData.ExecutionTime) * time.Millisecond,
|
|
MaxExecutionDuration: time.Duration(taskData.MaxExecutionSeconds) * time.Second,
|
|
}
|
|
|
|
return t, nil
|
|
}
|
|
|
|
// ListReq ...
|
|
type ListReq struct {
|
|
rest.PaginationReq
|
|
TaskID string `json:"taskID" req:"taskID,in=query"`
|
|
TaskType string `json:"taskType" req:"taskType,in=query"`
|
|
TaskName string `json:"taskName" req:"taskType,in=query"`
|
|
TaskIndex string `json:"taskIndex" req:"taskIndex,in=query"`
|
|
CurrentStep string `json:"currentStep" req:"currentStep,in=query"`
|
|
Status string `json:"status" req:"status,in=query" validate:"oneof='' SUCCESS FAILURE RUNNING TIMEOUT REVOKED NOTSTARTED INITIALIZING"` // nolint
|
|
Creator string `json:"creator" req:"creator,in=query"`
|
|
}
|
|
|
|
// List 任务分页列表
|
|
//
|
|
// @ID ListTask
|
|
// @Summary 任务列表
|
|
// @Description 获取任务列表
|
|
// @Tags 任务管理
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param offset query int false "offset"
|
|
// @Param limit query int false "limit"
|
|
// @Param taskID query string false "task id"
|
|
// @Param taskType query string false "task type"
|
|
// @Param taskName query string false "task name"
|
|
// @Param taskIndex query string false "task id"
|
|
// @Param currentStep query string false "current step"
|
|
// @Param status query string false "status"
|
|
// @Param creator query string false "creator"
|
|
// @Success 200 {object} rest.APIResponse{data=rest.PaginationResp[Task]}
|
|
// @Failure 400
|
|
// @x-bk-apigateway {"isPublic": false, "appVerifiedRequired": true, "userVerifiedRequired": false}
|
|
// @Router /api/v1/task/list [get]
|
|
func (s *service) List(ctx context.Context, req *ListReq) (*rest.PaginationResp[Task], error) {
|
|
if err := validator.Struct(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
statusURL := "/api/v1/task/status"
|
|
detailURL, err := url.JoinPath(s.host, s.routePrefix, statusURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 默认返回20条数据
|
|
if req.Limit == 0 {
|
|
req.Limit = 20
|
|
}
|
|
|
|
listReq := &istore.ListOption{
|
|
TaskID: req.TaskID,
|
|
TaskType: req.TaskType,
|
|
TaskName: req.TaskName,
|
|
TaskIndex: req.TaskIndex,
|
|
CurrentStep: req.CurrentStep,
|
|
Status: req.Status,
|
|
Creator: req.Creator,
|
|
Limit: int64(req.Limit),
|
|
Offset: int64(req.Offset),
|
|
}
|
|
|
|
result, err := s.mgr.ListTask(ctx, listReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
items := lo.Map(result.Items, func(v *itypes.Task, _ int) Task {
|
|
return Task{
|
|
Task: v,
|
|
ExecutionDuration: time.Duration(v.ExecutionTime) * time.Millisecond,
|
|
MaxExecutionDuration: time.Duration(v.MaxExecutionSeconds) * time.Second,
|
|
DetailURL: fmt.Sprintf("%s?taskID=%s", detailURL, v.TaskID),
|
|
}
|
|
})
|
|
|
|
resp := &rest.PaginationResp[Task]{
|
|
Items: items,
|
|
Count: result.Count,
|
|
}
|
|
return resp, nil
|
|
}
|