399 lines
11 KiB
Go
399 lines
11 KiB
Go
|
/*
|
||
|
* Tencent is pleased to support the open source community by making Blueking Container Service available.
|
||
|
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
|
||
|
* Licensed under the MIT License (the "License"); you may not use this file except
|
||
|
* in compliance with the License. You may obtain a copy of the License at
|
||
|
* http://opensource.org/licenses/MIT
|
||
|
* Unless required by applicable law or agreed to in writing, software distributed under
|
||
|
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||
|
* either express or implied. See the License for the specific language governing permissions and
|
||
|
* limitations under the License.
|
||
|
*/
|
||
|
|
||
|
// Package task is a package for task management
|
||
|
package task
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/RichardKnop/machinery/v2"
|
||
|
"github.com/RichardKnop/machinery/v2/backends/mongo"
|
||
|
"github.com/RichardKnop/machinery/v2/brokers/amqp"
|
||
|
"github.com/RichardKnop/machinery/v2/config"
|
||
|
"github.com/RichardKnop/machinery/v2/locks/eager"
|
||
|
"github.com/RichardKnop/machinery/v2/tasks"
|
||
|
|
||
|
types "git.ifooth.com/common/machinery-plugins/task/types"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
// DefaultWorkerConcurrency default worker concurrency
|
||
|
DefaultWorkerConcurrency = 10
|
||
|
)
|
||
|
|
||
|
// BrokerConfig config for go-machinery broker
|
||
|
type BrokerConfig struct {
|
||
|
QueueAddress string `json:"address"`
|
||
|
Exchange string `json:"exchange"`
|
||
|
}
|
||
|
|
||
|
// Manager manager for task server
|
||
|
type Manager struct {
|
||
|
moduleName string
|
||
|
lock sync.Locker
|
||
|
server *machinery.Server
|
||
|
worker *machinery.Worker
|
||
|
|
||
|
brokerConfig *BrokerConfig
|
||
|
|
||
|
workerNum int
|
||
|
stepWorkers map[string]StepWorkerInterface
|
||
|
callBackFuncs map[string]CallbackInterface
|
||
|
}
|
||
|
|
||
|
// ManagerConfig options for manager
|
||
|
type ManagerConfig struct {
|
||
|
ModuleName string
|
||
|
StepWorkers []StepWorkerInterface
|
||
|
CallBacks []CallbackInterface
|
||
|
WorkerNum int
|
||
|
Broker *BrokerConfig
|
||
|
}
|
||
|
|
||
|
// NewManager create new manager
|
||
|
func NewManager() *Manager {
|
||
|
m := &Manager{
|
||
|
lock: &sync.Mutex{},
|
||
|
workerNum: DefaultWorkerConcurrency,
|
||
|
}
|
||
|
return m
|
||
|
}
|
||
|
|
||
|
// Init init machinery server and worker
|
||
|
func (m *Manager) Init(cfg *ManagerConfig) error {
|
||
|
err := m.validate(cfg)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
m.brokerConfig = cfg.Broker
|
||
|
m.moduleName = cfg.ModuleName
|
||
|
if cfg.WorkerNum != 0 {
|
||
|
m.workerNum = cfg.WorkerNum
|
||
|
}
|
||
|
|
||
|
// save step workers and check duplicate
|
||
|
for _, w := range cfg.StepWorkers {
|
||
|
if _, ok := m.stepWorkers[w.GetName()]; ok {
|
||
|
return fmt.Errorf("step [%s] already exists", w.GetName())
|
||
|
}
|
||
|
m.stepWorkers[w.GetName()] = w
|
||
|
}
|
||
|
|
||
|
// save callbacks and check duplicate
|
||
|
for _, c := range cfg.CallBacks {
|
||
|
if _, ok := m.callBackFuncs[c.GetName()]; ok {
|
||
|
return fmt.Errorf("callback func [%s] already exists", c.GetName())
|
||
|
}
|
||
|
m.callBackFuncs[c.GetName()] = c
|
||
|
}
|
||
|
|
||
|
if err := m.initServer(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if err := m.initWorker(cfg.WorkerNum); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (m *Manager) validate(c *ManagerConfig) error {
|
||
|
// module name check
|
||
|
if c.ModuleName == "" {
|
||
|
return fmt.Errorf("module name is empty")
|
||
|
}
|
||
|
|
||
|
// step worker check
|
||
|
if c.StepWorkers == nil || len(c.StepWorkers) == 0 {
|
||
|
return fmt.Errorf("step worker is empty")
|
||
|
}
|
||
|
|
||
|
// broker config check
|
||
|
if c.Broker == nil || c.Broker.Exchange == "" || c.Broker.QueueAddress == "" {
|
||
|
return fmt.Errorf("broker config is empty")
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (m *Manager) initServer() error {
|
||
|
|
||
|
config := &config.Config{
|
||
|
Broker: m.brokerConfig.QueueAddress,
|
||
|
DefaultQueue: m.brokerConfig.Exchange,
|
||
|
ResultsExpireIn: 3600 * 48,
|
||
|
|
||
|
AMQP: &config.AMQPConfig{
|
||
|
Exchange: m.brokerConfig.Exchange,
|
||
|
ExchangeType: "direct",
|
||
|
BindingKey: m.brokerConfig.Exchange,
|
||
|
PrefetchCount: 50,
|
||
|
},
|
||
|
}
|
||
|
broker := amqp.New(config)
|
||
|
backend, err := mongo.New(config)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("task server init mongo backend failed, %s", err.Error())
|
||
|
}
|
||
|
lock := eager.New()
|
||
|
m.server = machinery.NewServer(config, broker, backend, lock)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// register step workers and init workers
|
||
|
func (m *Manager) initWorker(workerNum int) error {
|
||
|
// register all workers
|
||
|
if err := m.registerStepWorkers(); err != nil {
|
||
|
return fmt.Errorf("register workers failed, err: %s", err.Error())
|
||
|
}
|
||
|
|
||
|
m.worker = m.server.NewWorker("", workerNum)
|
||
|
|
||
|
preTaskHandler := func(signature *tasks.Signature) {
|
||
|
fmt.Printf("start task handler for: %s", signature.Name)
|
||
|
}
|
||
|
postTaskHandler := func(signature *tasks.Signature) {
|
||
|
fmt.Printf("end task handler for: %s", signature.Name)
|
||
|
}
|
||
|
errorHandler := func(err error) {
|
||
|
fmt.Printf("task error handler: %s", err)
|
||
|
}
|
||
|
|
||
|
m.worker.SetPreTaskHandler(preTaskHandler)
|
||
|
m.worker.SetPostTaskHandler(postTaskHandler)
|
||
|
m.worker.SetErrorHandler(errorHandler)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Run start worker
|
||
|
func (m *Manager) Run() {
|
||
|
// start worker
|
||
|
go func() {
|
||
|
if err := m.worker.Launch(); err != nil {
|
||
|
errMsg := fmt.Sprintf("task server worker launch failed, %s", err.Error())
|
||
|
panic(errMsg)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// GetTaskWithID get task by taskid
|
||
|
func (m *Manager) GetTaskWithID(ctx context.Context, taskid string) (*types.Task, error) {
|
||
|
return getGlobalStorage().GetTask(ctx, taskid)
|
||
|
}
|
||
|
|
||
|
// UpdateTask update task
|
||
|
// ! warning: modify task status will cause task status not consistent
|
||
|
func (m *Manager) UpdateTask(ctx context.Context, task *types.Task) error {
|
||
|
return getGlobalStorage().UpdateTask(ctx, task)
|
||
|
}
|
||
|
|
||
|
// PatchTaskInfo update task info
|
||
|
// ! warning: modify task status will cause task status not consistent
|
||
|
func (m *Manager) PatchTaskInfo(ctx context.Context, taskID string, patchs map[string]interface{}) error {
|
||
|
// warning:
|
||
|
return getGlobalStorage().PatchTask(ctx, taskID, patchs)
|
||
|
}
|
||
|
|
||
|
// RetryAll reset status to running and dispatch all tasks
|
||
|
func (m *Manager) RetryAll(task *types.Task) error {
|
||
|
task.SetStatus(types.TaskStatusRunning)
|
||
|
task.SetMessage("task retrying")
|
||
|
|
||
|
if err := getGlobalStorage().UpdateTask(context.Background(), task); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return m.dispatchAt(task, "")
|
||
|
}
|
||
|
|
||
|
// RetryAt reset status to running and dispatch tasks which begin with stepName
|
||
|
func (m *Manager) RetryAt(task *types.Task, stepName string) error {
|
||
|
task.SetStatus(types.TaskStatusRunning)
|
||
|
task.SetMessage("task retrying")
|
||
|
|
||
|
if err := getGlobalStorage().UpdateTask(context.Background(), task); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return m.dispatchAt(task, stepName)
|
||
|
}
|
||
|
|
||
|
// Dispatch dispatch task
|
||
|
func (m *Manager) Dispatch(task *types.Task) error {
|
||
|
if err := getGlobalStorage().CreateTask(context.Background(), task); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return m.dispatchAt(task, "")
|
||
|
}
|
||
|
|
||
|
// dispatchAt task to machinery
|
||
|
func (m *Manager) dispatchAt(task *types.Task, stepNameBegin string) error {
|
||
|
var signatures []*tasks.Signature
|
||
|
for _, stepName := range task.StepSequence {
|
||
|
// skip steps which before begin step, empty str not skip any steps
|
||
|
if stepName != "" && stepName != stepNameBegin {
|
||
|
continue
|
||
|
}
|
||
|
signature := &tasks.Signature{
|
||
|
UUID: fmt.Sprintf("task-%s-%s", task.GetTaskID(), stepName),
|
||
|
Name: stepName,
|
||
|
// two parameters: taskID, stepName
|
||
|
Args: []tasks.Arg{{Type: "string", Value: task.GetTaskID()}, {Type: "string", Value: stepName}},
|
||
|
IgnoreWhenTaskNotRegistered: true,
|
||
|
}
|
||
|
signatures = append(signatures, signature)
|
||
|
}
|
||
|
|
||
|
m.lock.Lock()
|
||
|
defer m.lock.Unlock()
|
||
|
|
||
|
// create chain
|
||
|
chain, _ := tasks.NewChain(signatures...)
|
||
|
|
||
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
||
|
if task.GetMaxExecutionSeconds() != time.Duration(0) {
|
||
|
ctx, cancelFunc = context.WithTimeout(ctx, task.GetMaxExecutionSeconds())
|
||
|
}
|
||
|
defer cancelFunc()
|
||
|
|
||
|
//send chain to machinery
|
||
|
asyncResult, err := m.server.SendChainWithContext(ctx, chain)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("send chain to machinery failed: %s", err.Error())
|
||
|
}
|
||
|
|
||
|
// get results
|
||
|
go func(t *types.Task, c *tasks.Chain) {
|
||
|
// check async results
|
||
|
for retry := 3; retry > 0; retry-- {
|
||
|
results, err := asyncResult.Get(time.Second * 5)
|
||
|
if err != nil {
|
||
|
fmt.Printf("tracing task %s result failed, %s. retry %d", t.GetTaskID(), err.Error(), retry)
|
||
|
continue
|
||
|
}
|
||
|
// check results
|
||
|
fmt.Printf("tracing task %s result %s", t.GetTaskID(), tasks.HumanReadableResults(results))
|
||
|
}
|
||
|
}(task, chain)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// registerStepWorkers build machinery workers for all step worker
|
||
|
func (m *Manager) registerStepWorkers() error {
|
||
|
allTasks := make(map[string]interface{}, 0)
|
||
|
for stepName, stepWorker := range m.stepWorkers {
|
||
|
do := stepWorker.DoWork
|
||
|
|
||
|
t := func(taskID string, stepName string) error {
|
||
|
start := time.Now()
|
||
|
state, step, err := m.getTaskStateAndCurrentStep(taskID, stepName)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// step executed success
|
||
|
if step == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), step.GetMaxExecutionSeconds())
|
||
|
defer cancel()
|
||
|
stepDone := make(chan bool, 1)
|
||
|
|
||
|
go func() {
|
||
|
// call step worker
|
||
|
if err = do(state); err != nil {
|
||
|
if err := state.updateStepFailure(start, step.GetStepName(), err); err != nil {
|
||
|
fmt.Printf("update step %s to failure failed: %s", step.GetStepName(), err.Error())
|
||
|
}
|
||
|
|
||
|
// step done
|
||
|
stepDone <- true
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if err := state.updateStepSuccess(start, step.GetStepName()); err != nil {
|
||
|
fmt.Printf("update step %s to success failed: %s", step.GetStepName(), err.Error())
|
||
|
}
|
||
|
|
||
|
// step done
|
||
|
stepDone <- true
|
||
|
}()
|
||
|
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
retErr := fmt.Errorf("step %s timeout", step.GetStepName())
|
||
|
if err := state.updateStepFailure(start, step.GetStepName(), retErr); err != nil {
|
||
|
fmt.Printf("update step %s to failure failed: %s", step.GetStepName(), err.Error())
|
||
|
}
|
||
|
if !step.GetSkipOnFailed() {
|
||
|
return retErr
|
||
|
}
|
||
|
return nil
|
||
|
case <-stepDone:
|
||
|
// step done
|
||
|
if err != nil && !step.GetSkipOnFailed() {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if _, ok := allTasks[stepName]; ok {
|
||
|
return fmt.Errorf("task %s already exists", stepName)
|
||
|
}
|
||
|
allTasks[stepName] = t
|
||
|
}
|
||
|
err := m.server.RegisterTasks(allTasks)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// getTaskStateAndCurrentStep get task state and current step
|
||
|
func (m *Manager) getTaskStateAndCurrentStep(taskid, stepName string) (*State, *types.Step, error) {
|
||
|
task, err := getGlobalStorage().GetTask(context.Background(), taskid)
|
||
|
if err != nil {
|
||
|
return nil, nil, fmt.Errorf("get task %s information failed, %s", taskid, err.Error())
|
||
|
}
|
||
|
|
||
|
if task.CommonParams == nil {
|
||
|
task.CommonParams = make(map[string]string, 0)
|
||
|
}
|
||
|
|
||
|
state := NewState(task, stepName)
|
||
|
if state.isTaskTerminated() {
|
||
|
return nil, nil, fmt.Errorf("task %s is terminated, step %s skip", taskid, stepName)
|
||
|
}
|
||
|
step, err := state.isReadyToStep(stepName)
|
||
|
if err != nil {
|
||
|
return nil, nil, fmt.Errorf("task %s step %s is not ready, %s", taskid, stepName, err.Error())
|
||
|
}
|
||
|
if step == nil {
|
||
|
// step successful and skip
|
||
|
return state, nil, nil
|
||
|
}
|
||
|
|
||
|
// inject call back func
|
||
|
if state.task.GetCallback() != "" {
|
||
|
if callback, ok := m.callBackFuncs[state.task.GetCallback()]; ok {
|
||
|
state.callBack = callback.Callback
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return state, nil, nil
|
||
|
}
|