machinery-plugins/task/manager.go

399 lines
11 KiB
Go
Raw Normal View History

2024-06-01 06:25:48 +00:00
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package task is a package for task management
package task
import (
"context"
"fmt"
"sync"
"time"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/backends/mongo"
"github.com/RichardKnop/machinery/v2/brokers/amqp"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/tasks"
types "git.ifooth.com/common/machinery-plugins/task/types"
)
const (
// DefaultWorkerConcurrency default worker concurrency
DefaultWorkerConcurrency = 10
)
// BrokerConfig config for go-machinery broker
type BrokerConfig struct {
QueueAddress string `json:"address"`
Exchange string `json:"exchange"`
}
// Manager manager for task server
type Manager struct {
moduleName string
lock sync.Locker
server *machinery.Server
worker *machinery.Worker
brokerConfig *BrokerConfig
workerNum int
stepWorkers map[string]StepWorkerInterface
callBackFuncs map[string]CallbackInterface
}
// ManagerConfig options for manager
type ManagerConfig struct {
ModuleName string
StepWorkers []StepWorkerInterface
CallBacks []CallbackInterface
WorkerNum int
Broker *BrokerConfig
}
// NewManager create new manager
func NewManager() *Manager {
m := &Manager{
lock: &sync.Mutex{},
workerNum: DefaultWorkerConcurrency,
}
return m
}
// Init init machinery server and worker
func (m *Manager) Init(cfg *ManagerConfig) error {
err := m.validate(cfg)
if err != nil {
return err
}
m.brokerConfig = cfg.Broker
m.moduleName = cfg.ModuleName
if cfg.WorkerNum != 0 {
m.workerNum = cfg.WorkerNum
}
// save step workers and check duplicate
for _, w := range cfg.StepWorkers {
if _, ok := m.stepWorkers[w.GetName()]; ok {
return fmt.Errorf("step [%s] already exists", w.GetName())
}
m.stepWorkers[w.GetName()] = w
}
// save callbacks and check duplicate
for _, c := range cfg.CallBacks {
if _, ok := m.callBackFuncs[c.GetName()]; ok {
return fmt.Errorf("callback func [%s] already exists", c.GetName())
}
m.callBackFuncs[c.GetName()] = c
}
if err := m.initServer(); err != nil {
return err
}
if err := m.initWorker(cfg.WorkerNum); err != nil {
return err
}
return nil
}
func (m *Manager) validate(c *ManagerConfig) error {
// module name check
if c.ModuleName == "" {
return fmt.Errorf("module name is empty")
}
// step worker check
if c.StepWorkers == nil || len(c.StepWorkers) == 0 {
return fmt.Errorf("step worker is empty")
}
// broker config check
if c.Broker == nil || c.Broker.Exchange == "" || c.Broker.QueueAddress == "" {
return fmt.Errorf("broker config is empty")
}
return nil
}
func (m *Manager) initServer() error {
config := &config.Config{
Broker: m.brokerConfig.QueueAddress,
DefaultQueue: m.brokerConfig.Exchange,
ResultsExpireIn: 3600 * 48,
AMQP: &config.AMQPConfig{
Exchange: m.brokerConfig.Exchange,
ExchangeType: "direct",
BindingKey: m.brokerConfig.Exchange,
PrefetchCount: 50,
},
}
broker := amqp.New(config)
backend, err := mongo.New(config)
if err != nil {
return fmt.Errorf("task server init mongo backend failed, %s", err.Error())
}
lock := eager.New()
m.server = machinery.NewServer(config, broker, backend, lock)
return nil
}
// register step workers and init workers
func (m *Manager) initWorker(workerNum int) error {
// register all workers
if err := m.registerStepWorkers(); err != nil {
return fmt.Errorf("register workers failed, err: %s", err.Error())
}
m.worker = m.server.NewWorker("", workerNum)
preTaskHandler := func(signature *tasks.Signature) {
fmt.Printf("start task handler for: %s", signature.Name)
}
postTaskHandler := func(signature *tasks.Signature) {
fmt.Printf("end task handler for: %s", signature.Name)
}
errorHandler := func(err error) {
fmt.Printf("task error handler: %s", err)
}
m.worker.SetPreTaskHandler(preTaskHandler)
m.worker.SetPostTaskHandler(postTaskHandler)
m.worker.SetErrorHandler(errorHandler)
return nil
}
// Run start worker
func (m *Manager) Run() {
// start worker
go func() {
if err := m.worker.Launch(); err != nil {
errMsg := fmt.Sprintf("task server worker launch failed, %s", err.Error())
panic(errMsg)
}
}()
}
// GetTaskWithID get task by taskid
func (m *Manager) GetTaskWithID(ctx context.Context, taskid string) (*types.Task, error) {
return getGlobalStorage().GetTask(ctx, taskid)
}
// UpdateTask update task
// ! warning: modify task status will cause task status not consistent
func (m *Manager) UpdateTask(ctx context.Context, task *types.Task) error {
return getGlobalStorage().UpdateTask(ctx, task)
}
// PatchTaskInfo update task info
// ! warning: modify task status will cause task status not consistent
func (m *Manager) PatchTaskInfo(ctx context.Context, taskID string, patchs map[string]interface{}) error {
// warning:
return getGlobalStorage().PatchTask(ctx, taskID, patchs)
}
// RetryAll reset status to running and dispatch all tasks
func (m *Manager) RetryAll(task *types.Task) error {
task.SetStatus(types.TaskStatusRunning)
task.SetMessage("task retrying")
if err := getGlobalStorage().UpdateTask(context.Background(), task); err != nil {
return err
}
return m.dispatchAt(task, "")
}
// RetryAt reset status to running and dispatch tasks which begin with stepName
func (m *Manager) RetryAt(task *types.Task, stepName string) error {
task.SetStatus(types.TaskStatusRunning)
task.SetMessage("task retrying")
if err := getGlobalStorage().UpdateTask(context.Background(), task); err != nil {
return err
}
return m.dispatchAt(task, stepName)
}
// Dispatch dispatch task
func (m *Manager) Dispatch(task *types.Task) error {
if err := getGlobalStorage().CreateTask(context.Background(), task); err != nil {
return err
}
return m.dispatchAt(task, "")
}
// dispatchAt task to machinery
func (m *Manager) dispatchAt(task *types.Task, stepNameBegin string) error {
var signatures []*tasks.Signature
for _, stepName := range task.StepSequence {
// skip steps which before begin step, empty str not skip any steps
if stepName != "" && stepName != stepNameBegin {
continue
}
signature := &tasks.Signature{
UUID: fmt.Sprintf("task-%s-%s", task.GetTaskID(), stepName),
Name: stepName,
// two parameters: taskID, stepName
Args: []tasks.Arg{{Type: "string", Value: task.GetTaskID()}, {Type: "string", Value: stepName}},
IgnoreWhenTaskNotRegistered: true,
}
signatures = append(signatures, signature)
}
m.lock.Lock()
defer m.lock.Unlock()
// create chain
chain, _ := tasks.NewChain(signatures...)
ctx, cancelFunc := context.WithCancel(context.Background())
if task.GetMaxExecutionSeconds() != time.Duration(0) {
ctx, cancelFunc = context.WithTimeout(ctx, task.GetMaxExecutionSeconds())
}
defer cancelFunc()
//send chain to machinery
asyncResult, err := m.server.SendChainWithContext(ctx, chain)
if err != nil {
return fmt.Errorf("send chain to machinery failed: %s", err.Error())
}
// get results
go func(t *types.Task, c *tasks.Chain) {
// check async results
for retry := 3; retry > 0; retry-- {
results, err := asyncResult.Get(time.Second * 5)
if err != nil {
fmt.Printf("tracing task %s result failed, %s. retry %d", t.GetTaskID(), err.Error(), retry)
continue
}
// check results
fmt.Printf("tracing task %s result %s", t.GetTaskID(), tasks.HumanReadableResults(results))
}
}(task, chain)
return nil
}
// registerStepWorkers build machinery workers for all step worker
func (m *Manager) registerStepWorkers() error {
allTasks := make(map[string]interface{}, 0)
for stepName, stepWorker := range m.stepWorkers {
do := stepWorker.DoWork
t := func(taskID string, stepName string) error {
start := time.Now()
state, step, err := m.getTaskStateAndCurrentStep(taskID, stepName)
if err != nil {
return err
}
// step executed success
if step == nil {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), step.GetMaxExecutionSeconds())
defer cancel()
stepDone := make(chan bool, 1)
go func() {
// call step worker
if err = do(state); err != nil {
if err := state.updateStepFailure(start, step.GetStepName(), err); err != nil {
fmt.Printf("update step %s to failure failed: %s", step.GetStepName(), err.Error())
}
// step done
stepDone <- true
return
}
if err := state.updateStepSuccess(start, step.GetStepName()); err != nil {
fmt.Printf("update step %s to success failed: %s", step.GetStepName(), err.Error())
}
// step done
stepDone <- true
}()
select {
case <-ctx.Done():
retErr := fmt.Errorf("step %s timeout", step.GetStepName())
if err := state.updateStepFailure(start, step.GetStepName(), retErr); err != nil {
fmt.Printf("update step %s to failure failed: %s", step.GetStepName(), err.Error())
}
if !step.GetSkipOnFailed() {
return retErr
}
return nil
case <-stepDone:
// step done
if err != nil && !step.GetSkipOnFailed() {
return err
}
return nil
}
}
if _, ok := allTasks[stepName]; ok {
return fmt.Errorf("task %s already exists", stepName)
}
allTasks[stepName] = t
}
err := m.server.RegisterTasks(allTasks)
return err
}
// getTaskStateAndCurrentStep get task state and current step
func (m *Manager) getTaskStateAndCurrentStep(taskid, stepName string) (*State, *types.Step, error) {
task, err := getGlobalStorage().GetTask(context.Background(), taskid)
if err != nil {
return nil, nil, fmt.Errorf("get task %s information failed, %s", taskid, err.Error())
}
if task.CommonParams == nil {
task.CommonParams = make(map[string]string, 0)
}
state := NewState(task, stepName)
if state.isTaskTerminated() {
return nil, nil, fmt.Errorf("task %s is terminated, step %s skip", taskid, stepName)
}
step, err := state.isReadyToStep(stepName)
if err != nil {
return nil, nil, fmt.Errorf("task %s step %s is not ready, %s", taskid, stepName, err.Error())
}
if step == nil {
// step successful and skip
return state, nil, nil
}
// inject call back func
if state.task.GetCallback() != "" {
if callback, ok := m.callBackFuncs[state.task.GetCallback()]; ok {
state.callBack = callback.Callback
}
}
return state, nil, nil
}