init task

Browse Source
main
git 2024-06-01 14:25:48 +08:00
parent e2e29db2d1
commit 8be1a8ff3b
Signed by: git
GPG Key ID: 3F65EFFA44207ADD
8 changed files with 1391 additions and 0 deletions

398
task/manager.go Normal file
View File

@ -0,0 +1,398 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package task is a package for task management
package task
import (
"context"
"fmt"
"sync"
"time"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/backends/mongo"
"github.com/RichardKnop/machinery/v2/brokers/amqp"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/tasks"
types "git.ifooth.com/common/machinery-plugins/task/types"
)
const (
// DefaultWorkerConcurrency default worker concurrency
DefaultWorkerConcurrency = 10
)
// BrokerConfig config for go-machinery broker
type BrokerConfig struct {
QueueAddress string `json:"address"`
Exchange string `json:"exchange"`
}
// Manager manager for task server
type Manager struct {
moduleName string
lock sync.Locker
server *machinery.Server
worker *machinery.Worker
brokerConfig *BrokerConfig
workerNum int
stepWorkers map[string]StepWorkerInterface
callBackFuncs map[string]CallbackInterface
}
// ManagerConfig options for manager
type ManagerConfig struct {
ModuleName string
StepWorkers []StepWorkerInterface
CallBacks []CallbackInterface
WorkerNum int
Broker *BrokerConfig
}
// NewManager create new manager
func NewManager() *Manager {
m := &Manager{
lock: &sync.Mutex{},
workerNum: DefaultWorkerConcurrency,
}
return m
}
// Init init machinery server and worker
func (m *Manager) Init(cfg *ManagerConfig) error {
err := m.validate(cfg)
if err != nil {
return err
}
m.brokerConfig = cfg.Broker
m.moduleName = cfg.ModuleName
if cfg.WorkerNum != 0 {
m.workerNum = cfg.WorkerNum
}
// save step workers and check duplicate
for _, w := range cfg.StepWorkers {
if _, ok := m.stepWorkers[w.GetName()]; ok {
return fmt.Errorf("step [%s] already exists", w.GetName())
}
m.stepWorkers[w.GetName()] = w
}
// save callbacks and check duplicate
for _, c := range cfg.CallBacks {
if _, ok := m.callBackFuncs[c.GetName()]; ok {
return fmt.Errorf("callback func [%s] already exists", c.GetName())
}
m.callBackFuncs[c.GetName()] = c
}
if err := m.initServer(); err != nil {
return err
}
if err := m.initWorker(cfg.WorkerNum); err != nil {
return err
}
return nil
}
func (m *Manager) validate(c *ManagerConfig) error {
// module name check
if c.ModuleName == "" {
return fmt.Errorf("module name is empty")
}
// step worker check
if c.StepWorkers == nil || len(c.StepWorkers) == 0 {
return fmt.Errorf("step worker is empty")
}
// broker config check
if c.Broker == nil || c.Broker.Exchange == "" || c.Broker.QueueAddress == "" {
return fmt.Errorf("broker config is empty")
}
return nil
}
func (m *Manager) initServer() error {
config := &config.Config{
Broker: m.brokerConfig.QueueAddress,
DefaultQueue: m.brokerConfig.Exchange,
ResultsExpireIn: 3600 * 48,
AMQP: &config.AMQPConfig{
Exchange: m.brokerConfig.Exchange,
ExchangeType: "direct",
BindingKey: m.brokerConfig.Exchange,
PrefetchCount: 50,
},
}
broker := amqp.New(config)
backend, err := mongo.New(config)
if err != nil {
return fmt.Errorf("task server init mongo backend failed, %s", err.Error())
}
lock := eager.New()
m.server = machinery.NewServer(config, broker, backend, lock)
return nil
}
// register step workers and init workers
func (m *Manager) initWorker(workerNum int) error {
// register all workers
if err := m.registerStepWorkers(); err != nil {
return fmt.Errorf("register workers failed, err: %s", err.Error())
}
m.worker = m.server.NewWorker("", workerNum)
preTaskHandler := func(signature *tasks.Signature) {
fmt.Printf("start task handler for: %s", signature.Name)
}
postTaskHandler := func(signature *tasks.Signature) {
fmt.Printf("end task handler for: %s", signature.Name)
}
errorHandler := func(err error) {
fmt.Printf("task error handler: %s", err)
}
m.worker.SetPreTaskHandler(preTaskHandler)
m.worker.SetPostTaskHandler(postTaskHandler)
m.worker.SetErrorHandler(errorHandler)
return nil
}
// Run start worker
func (m *Manager) Run() {
// start worker
go func() {
if err := m.worker.Launch(); err != nil {
errMsg := fmt.Sprintf("task server worker launch failed, %s", err.Error())
panic(errMsg)
}
}()
}
// GetTaskWithID get task by taskid
func (m *Manager) GetTaskWithID(ctx context.Context, taskid string) (*types.Task, error) {
return getGlobalStorage().GetTask(ctx, taskid)
}
// UpdateTask update task
// ! warning: modify task status will cause task status not consistent
func (m *Manager) UpdateTask(ctx context.Context, task *types.Task) error {
return getGlobalStorage().UpdateTask(ctx, task)
}
// PatchTaskInfo update task info
// ! warning: modify task status will cause task status not consistent
func (m *Manager) PatchTaskInfo(ctx context.Context, taskID string, patchs map[string]interface{}) error {
// warning:
return getGlobalStorage().PatchTask(ctx, taskID, patchs)
}
// RetryAll reset status to running and dispatch all tasks
func (m *Manager) RetryAll(task *types.Task) error {
task.SetStatus(types.TaskStatusRunning)
task.SetMessage("task retrying")
if err := getGlobalStorage().UpdateTask(context.Background(), task); err != nil {
return err
}
return m.dispatchAt(task, "")
}
// RetryAt reset status to running and dispatch tasks which begin with stepName
func (m *Manager) RetryAt(task *types.Task, stepName string) error {
task.SetStatus(types.TaskStatusRunning)
task.SetMessage("task retrying")
if err := getGlobalStorage().UpdateTask(context.Background(), task); err != nil {
return err
}
return m.dispatchAt(task, stepName)
}
// Dispatch dispatch task
func (m *Manager) Dispatch(task *types.Task) error {
if err := getGlobalStorage().CreateTask(context.Background(), task); err != nil {
return err
}
return m.dispatchAt(task, "")
}
// dispatchAt task to machinery
func (m *Manager) dispatchAt(task *types.Task, stepNameBegin string) error {
var signatures []*tasks.Signature
for _, stepName := range task.StepSequence {
// skip steps which before begin step, empty str not skip any steps
if stepName != "" && stepName != stepNameBegin {
continue
}
signature := &tasks.Signature{
UUID: fmt.Sprintf("task-%s-%s", task.GetTaskID(), stepName),
Name: stepName,
// two parameters: taskID, stepName
Args: []tasks.Arg{{Type: "string", Value: task.GetTaskID()}, {Type: "string", Value: stepName}},
IgnoreWhenTaskNotRegistered: true,
}
signatures = append(signatures, signature)
}
m.lock.Lock()
defer m.lock.Unlock()
// create chain
chain, _ := tasks.NewChain(signatures...)
ctx, cancelFunc := context.WithCancel(context.Background())
if task.GetMaxExecutionSeconds() != time.Duration(0) {
ctx, cancelFunc = context.WithTimeout(ctx, task.GetMaxExecutionSeconds())
}
defer cancelFunc()
//send chain to machinery
asyncResult, err := m.server.SendChainWithContext(ctx, chain)
if err != nil {
return fmt.Errorf("send chain to machinery failed: %s", err.Error())
}
// get results
go func(t *types.Task, c *tasks.Chain) {
// check async results
for retry := 3; retry > 0; retry-- {
results, err := asyncResult.Get(time.Second * 5)
if err != nil {
fmt.Printf("tracing task %s result failed, %s. retry %d", t.GetTaskID(), err.Error(), retry)
continue
}
// check results
fmt.Printf("tracing task %s result %s", t.GetTaskID(), tasks.HumanReadableResults(results))
}
}(task, chain)
return nil
}
// registerStepWorkers build machinery workers for all step worker
func (m *Manager) registerStepWorkers() error {
allTasks := make(map[string]interface{}, 0)
for stepName, stepWorker := range m.stepWorkers {
do := stepWorker.DoWork
t := func(taskID string, stepName string) error {
start := time.Now()
state, step, err := m.getTaskStateAndCurrentStep(taskID, stepName)
if err != nil {
return err
}
// step executed success
if step == nil {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), step.GetMaxExecutionSeconds())
defer cancel()
stepDone := make(chan bool, 1)
go func() {
// call step worker
if err = do(state); err != nil {
if err := state.updateStepFailure(start, step.GetStepName(), err); err != nil {
fmt.Printf("update step %s to failure failed: %s", step.GetStepName(), err.Error())
}
// step done
stepDone <- true
return
}
if err := state.updateStepSuccess(start, step.GetStepName()); err != nil {
fmt.Printf("update step %s to success failed: %s", step.GetStepName(), err.Error())
}
// step done
stepDone <- true
}()
select {
case <-ctx.Done():
retErr := fmt.Errorf("step %s timeout", step.GetStepName())
if err := state.updateStepFailure(start, step.GetStepName(), retErr); err != nil {
fmt.Printf("update step %s to failure failed: %s", step.GetStepName(), err.Error())
}
if !step.GetSkipOnFailed() {
return retErr
}
return nil
case <-stepDone:
// step done
if err != nil && !step.GetSkipOnFailed() {
return err
}
return nil
}
}
if _, ok := allTasks[stepName]; ok {
return fmt.Errorf("task %s already exists", stepName)
}
allTasks[stepName] = t
}
err := m.server.RegisterTasks(allTasks)
return err
}
// getTaskStateAndCurrentStep get task state and current step
func (m *Manager) getTaskStateAndCurrentStep(taskid, stepName string) (*State, *types.Step, error) {
task, err := getGlobalStorage().GetTask(context.Background(), taskid)
if err != nil {
return nil, nil, fmt.Errorf("get task %s information failed, %s", taskid, err.Error())
}
if task.CommonParams == nil {
task.CommonParams = make(map[string]string, 0)
}
state := NewState(task, stepName)
if state.isTaskTerminated() {
return nil, nil, fmt.Errorf("task %s is terminated, step %s skip", taskid, stepName)
}
step, err := state.isReadyToStep(stepName)
if err != nil {
return nil, nil, fmt.Errorf("task %s step %s is not ready, %s", taskid, stepName, err.Error())
}
if step == nil {
// step successful and skip
return state, nil, nil
}
// inject call back func
if state.task.GetCallback() != "" {
if callback, ok := m.callBackFuncs[state.task.GetCallback()]; ok {
state.callBack = callback.Callback
}
}
return state, nil, nil
}

285
task/state.go Normal file
View File

@ -0,0 +1,285 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package task is a package for task management
package task
import (
"context"
"fmt"
"time"
types "git.ifooth.com/common/machinery-plugins/task/types"
)
// State is a struct for task state
type State struct {
task *types.Task
currentStep string
callBack func(isSuccess bool, task *types.Task)
}
// NewState return state relative to task
func NewState(task *types.Task, currentStep string) *State {
return &State{
task: task,
currentStep: currentStep,
}
}
// isTaskTerminated is terminated
func (s *State) isTaskTerminated() bool {
status := s.task.GetStatus()
if status == types.TaskStatusFailure || status == types.TaskStatusForceTerminate ||
status == types.TaskStatusTimeout || status == types.TaskStatusSuccess {
return true
}
return false
}
// isReadyToStep check if step is ready to step
func (s *State) isReadyToStep(stepName string) (*types.Step, error) {
switch s.task.GetStatus() {
case types.TaskStatusRunning, types.TaskStatusInit:
case types.TaskStatusForceTerminate:
return nil, fmt.Errorf("task %s state for terminate", s.task.GetTaskID())
default:
return nil, fmt.Errorf("task %s is not running, state is %s", s.task.GetTaskID(), s.task.GetStatus())
}
// validate step existence
curStep, ok := s.task.GetStep(stepName)
if !ok {
return nil, fmt.Errorf("step %s is not exist", stepName)
}
// return nil & nil means step had been executed
if curStep.GetStatus() == types.TaskStatusSuccess {
// step is success, skip
return nil, nil
}
// not first time to execute current step
if stepName == s.task.GetCurrentStep() {
if curStep.GetStatus() == types.TaskStatusFailure {
curStep.AddRetryCount(1)
}
nowTime := time.Now()
curStep = curStep.SetStartTime(nowTime).
SetStatus(types.TaskStatusRunning).
SetMessage("step ready to run").
SetLastUpdate(nowTime)
// update Task in storage
if err := getGlobalStorage().UpdateTask(context.Background(), s.task); err != nil {
return nil, err
}
return curStep, nil
}
// first time to execute step
for _, name := range s.task.StepSequence {
step, ok := s.task.GetStep(name)
if !ok {
return nil, fmt.Errorf("step %s is not exist", stepName)
}
// find current step
if name == stepName {
// step already success
if step.GetStatus() == types.TaskStatusSuccess {
return nil, fmt.Errorf("task %s step %s already success", s.task.GetTaskID(), stepName)
}
// set current step
nowTime := time.Now()
s.task.SetCurrentStep(stepName)
step = step.SetStartTime(nowTime).
SetStatus(types.TaskStatusRunning).
SetMessage("step ready to run").
SetLastUpdate(nowTime)
// update Task in storage
if err := getGlobalStorage().UpdateTask(context.Background(), s.task); err != nil {
return nil, fmt.Errorf("update task %s step %s status error", s.task.GetTaskID(), stepName)
}
return step, nil
}
// skip step if step allow skipOnFailed
if step.SkipOnFailed {
continue
}
// previous step execute failure
if step.GetStatus() != types.TaskStatusSuccess {
break
}
}
// previous step execute failure
return nil, fmt.Errorf("step %s is not ready", stepName)
}
// updateStepSuccess update step status to success
func (s *State) updateStepSuccess(start time.Time, stepName string) error {
step, ok := s.task.GetStep(stepName)
if !ok {
return fmt.Errorf("step %s is not exist", stepName)
}
endTime := time.Now()
step.SetStartTime(start).
SetEndTime(endTime).
SetExecutionTime(start, endTime).
SetStatus(types.TaskStatusSuccess).
SetMessage(fmt.Sprintf("step %s running successfully", step.Name)).
SetLastUpdate(endTime)
s.task.SetStatus(types.TaskStatusRunning).
SetMessage(fmt.Sprintf("step %s running successfully", step.Name)).
SetLastUpdate(endTime)
// last step
if s.isLastStep(stepName) {
taskStartTime, err := s.task.GetStartTime()
if err != nil {
return fmt.Errorf(fmt.Sprintf("get task %s start time error", s.task.GetTaskID()))
}
s.task.SetEndTime(endTime).
SetExecutionTime(taskStartTime, endTime).
SetStatus(types.TaskStatusSuccess).
SetMessage("task finished successfully")
// callback
if s.callBack != nil {
s.callBack(true, s.task)
}
}
// update Task in storage
if err := getGlobalStorage().UpdateTask(context.Background(), s.task); err != nil {
return fmt.Errorf("update task %s status error", s.task.GetTaskID())
}
return nil
}
// updateStepFailure update step status to failure
func (s *State) updateStepFailure(start time.Time, stepName string, stepErr error) error {
step, ok := s.task.GetStep(stepName)
if !ok {
return fmt.Errorf("step %s is not exist", stepName)
}
endTime := time.Now()
step.SetStartTime(start).
SetEndTime(endTime).
SetExecutionTime(start, endTime).
SetStatus(types.TaskStatusFailure).
SetMessage(fmt.Sprintf("running failed, %s", stepErr.Error())).
SetLastUpdate(endTime)
// if step SkipOnFailed, update task status to running
if step.GetSkipOnFailed() {
// skip, set task running or success
s.task.SetStatus(types.TaskStatusRunning).
SetMessage(fmt.Sprintf("step %s running failed", step.Name)).
SetLastUpdate(endTime)
// last step failed and skipOnFailed is true, update task status to success
if s.isLastStep(stepName) {
// last step
taskStartTime, err := s.task.GetStartTime()
if err != nil {
return fmt.Errorf(fmt.Sprintf("get task %s start time error", s.task.GetTaskID()))
}
s.task.SetStatus(types.TaskStatusSuccess).
SetMessage("task finished successfully").
SetLastUpdate(endTime).
SetEndTime(endTime).
SetExecutionTime(taskStartTime, endTime)
// callback
if s.callBack != nil {
s.callBack(true, s.task)
}
}
} else {
// not skip, set task faile
s.task.SetStatus(types.TaskStatusFailure).
SetMessage(fmt.Sprintf("step %s running failed", step.Name)).
SetLastUpdate(endTime).
SetEndTime(endTime).
SetExecutionTime(start, endTime)
// callback
if s.callBack != nil {
s.callBack(false, s.task)
}
}
// update Task in storage
if err := getGlobalStorage().UpdateTask(context.Background(), s.task); err != nil {
return fmt.Errorf("update task %s status error", s.task.GetTaskID())
}
return nil
}
func (s *State) isLastStep(stepName string) bool {
return stepName == s.task.StepSequence[len(s.task.StepSequence)-1]
}
// GetCommonParams get common params by key
func (s *State) GetCommonParams(key string) (string, bool) {
return s.task.GetCommonParams(key)
}
// AddCommonParams add common params
func (s *State) AddCommonParams(key, value string) *State {
s.task.AddCommonParams(key, value)
return s
}
// GetExtraParams get extra params by obj
func (s *State) GetExtraParams(obj interface{}) error {
return s.task.GetExtra(obj)
}
// SetExtraAll set extra params by obj
func (s *State) SetExtraAll(obj interface{}) error {
return s.task.SetExtraAll(obj)
}
// GetStepParam get step params by key
func (s *State) GetStepParam(stepName, key string) (string, bool) {
return s.task.GetStepParam(stepName, key)
}
// AddStepParams add step params
func (s *State) AddStepParams(stepName, key, value string) error {
return s.task.AddStepParams(stepName, key, value)
}
// GetTask get task
func (s *State) GetTask() *types.Task {
return s.task
}
// GetCurrentStep get current step
func (s *State) GetCurrentStep() (*types.Step, bool) {
return s.task.GetStep(s.currentStep)
}
// GetStep get step by stepName
func (s *State) GetStep(stepName string) (*types.Step, bool) {
return s.task.GetStep(stepName)
}

26
task/storage.go Normal file
View File

@ -0,0 +1,26 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package task is a package for task management
package task
import store "git.ifooth.com/common/machinery-plugins/task/store"
var (
// globalStorage used for state and task manager
globalStorage store.TaskManagerModel
)
// GetStorageModel for cluster manager storage tools
func getGlobalStorage() store.TaskManagerModel {
return globalStorage
}

51
task/store/interface.go Normal file
View File

@ -0,0 +1,51 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package store implements task storage
package store
import (
"context"
"git.ifooth.com/common/machinery-plugins/task/types"
)
// ListOption options for list task
type ListOption struct {
// Sort map for sort list results
Sort map[string]int
// Offset offset for list results
Offset int64
// Limit limit for list results
Limit int64
// All for all results
All bool
// Count for index
Count bool
// SkipDecrypt skip data decrypt
SkipDecrypt bool
}
// TaskManagerModel model for TaskManager
type TaskManagerModel interface {
// task information storage management
CreateTask(ctx context.Context, task *types.Task) error
UpdateTask(ctx context.Context, task *types.Task) error
PatchTask(ctx context.Context, taskID string, patchs map[string]interface{}) error
DeleteTask(ctx context.Context, taskID string) error
GetTask(ctx context.Context, taskID string) (*types.Task, error)
ListTask(ctx context.Context, cond any, opt *ListOption) ([]types.Task, error)
}
// ModelSet model for task
type ModelSet struct {
}

236
task/types/step.go Normal file
View File

@ -0,0 +1,236 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package types for task
package types
import (
"encoding/json"
"time"
)
// Step step definition
type Step struct {
Name string `json:"name" bson:"name"`
TaskName string `json:"taskname" bson:"taskname"`
Params map[string]string `json:"params" bson:"params"`
Extras string `json:"extras" bson:"extras"`
Status string `json:"status" bson:"status"`
Message string `json:"message" bson:"message"`
SkipOnFailed bool `json:"skipOnFailed" bson:"skipOnFailed"`
RetryCount uint32 `json:"retryCount" bson:"retryCount"`
Start string `json:"start" bson:"start"`
End string `json:"end" bson:"end"`
ExecutionTime uint32 `json:"executionTime" bson:"executionTime"`
MaxExecutionSeconds uint32 `json:"maxExecutionSeconds" bson:"maxExecutionSeconds"`
LastUpdate string `json:"lastUpdate" bson:"lastUpdate"`
}
// NewStep return a new step by default params
func NewStep(stepName string, taskName string) *Step {
return &Step{
Name: stepName,
TaskName: taskName,
Params: map[string]string{},
Extras: DefaultJsonExtrasContent,
Status: TaskStatusNotStarted,
Message: "",
SkipOnFailed: false,
RetryCount: 0,
MaxExecutionSeconds: DefaultMaxExecuteTimeSeconds,
}
}
// GetStepName return step name
func (s *Step) GetStepName() string {
return s.Name
}
// SetStepName set step name
func (s *Step) SetStepName(name string) *Step {
s.Name = name
return s
}
// GetTaskName return task name
func (s *Step) GetTaskName() string {
return s.TaskName
}
// SetTaskName set task name
func (s *Step) SetTaskName(taskName string) *Step {
s.TaskName = taskName
return s
}
// GetParam return step param by key
func (s *Step) GetParam(key string) (string, bool) {
if value, ok := s.Params[key]; ok {
return value, true
}
return "", false
}
// AddParam set step param by key,value
func (s *Step) AddParam(key, value string) *Step {
if s.Params == nil {
s.Params = make(map[string]string, 0)
}
s.Params[key] = value
return s
}
// GetParamsAll return all step params
func (s *Step) GetParamsAll() map[string]string {
if s.Params == nil {
s.Params = make(map[string]string, 0)
}
return s.Params
}
// SetParamMulti set step params by map
func (s *Step) SetParamMulti(params map[string]string) {
if s.Params == nil {
s.Params = make(map[string]string, 0)
}
for key, value := range params {
s.Params[key] = value
}
}
// SetNewParams replace all params by new params
func (s *Step) SetNewParams(params map[string]string) *Step {
s.Params = params
return s
}
// GetExtras return unmarshal step extras
func (s *Step) GetExtras(obj interface{}) error {
if s.Extras == "" {
s.Extras = DefaultJsonExtrasContent
}
return json.Unmarshal([]byte(s.Extras), obj)
}
// SetExtrasAll set step extras by json string
func (s *Step) SetExtrasAll(obj interface{}) error {
result, err := json.Marshal(obj)
if err != nil {
return err
}
s.Extras = string(result)
return nil
}
// GetStatus return step status
func (s *Step) GetStatus() string {
return s.Status
}
// SetStatus set status
func (s *Step) SetStatus(stat string) *Step {
s.Status = stat
return s
}
// GetMessage get step message
func (s *Step) GetMessage() string {
if s.Message == "" {
return ""
}
return s.Message
}
// SetMessage set step message
func (s *Step) SetMessage(msg string) *Step {
s.Message = msg
return s
}
// GetSkipOnFailed get step skipOnFailed
func (s *Step) GetSkipOnFailed() bool {
return s.SkipOnFailed
}
// SetSkipOnFailed set step skipOnFailed
func (s *Step) SetSkipOnFailed(skipOnFailed bool) *Step {
s.SkipOnFailed = skipOnFailed
return s
}
// GetRetryCount get step retry count
func (s *Step) GetRetryCount() uint32 {
return s.RetryCount
}
// AddRetryCount add step retry count
func (s *Step) AddRetryCount(count uint32) *Step {
s.RetryCount += count
return s
}
// GetStartTime get start time
func (s *Step) GetStartTime() (time.Time, error) {
return time.Parse(TaskTimeFormat, s.Start)
}
// SetStartTime update start time
func (s *Step) SetStartTime(t time.Time) *Step {
s.Start = t.Format(TaskTimeFormat)
return s
}
// GetEndTime get end time
func (s *Step) GetEndTime() (time.Time, error) {
return time.Parse(TaskTimeFormat, s.End)
}
// SetEndTime set end time
func (s *Step) SetEndTime(t time.Time) *Step {
// set end time
s.End = t.Format(TaskTimeFormat)
return s
}
// GetExecutionTime set execution time
func (s *Step) GetExecutionTime() time.Duration {
return time.Duration(time.Duration(s.ExecutionTime) * time.Millisecond)
}
// SetExecutionTime set execution time
func (s *Step) SetExecutionTime(start time.Time, end time.Time) *Step {
s.ExecutionTime = uint32(end.Sub(start).Milliseconds())
return s
}
// GetMaxExecutionSeconds get max execution seconds
func (s *Step) GetMaxExecutionSeconds() time.Duration {
return time.Duration(time.Duration(s.MaxExecutionSeconds) * time.Second)
}
// SetMaxExecutionSeconds set max execution seconds
func (s *Step) SetMaxExecutionSeconds(maxExecutionSeconds time.Duration) *Step {
s.MaxExecutionSeconds = uint32(maxExecutionSeconds.Seconds())
return s
}
// GetLastUpdate get last update time
func (s *Step) GetLastUpdate() (time.Time, error) {
return time.Parse(TaskTimeFormat, s.LastUpdate)
}
// SetLastUpdate set last update time
func (s *Step) SetLastUpdate(t time.Time) *Step {
s.LastUpdate = t.Format(TaskTimeFormat)
return s
}

323
task/types/task.go Normal file
View File

@ -0,0 +1,323 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package types for task
package types
import (
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
)
// Task task definition
type Task struct {
// index for task, client should set this field
Index string `json:"index" bson:"index"`
TaskID string `json:"taskId" bson:"taskId"`
TaskType string `json:"taskType" bson:"taskType"`
TaskName string `json:"taskName" bson:"taskName"`
// steps and params
CurrentStep string `json:"currentStep" bson:"currentStep"`
StepSequence []string `json:"stepSequence" bson:"stepSequence"`
Steps map[string]*Step `json:"steps" bson:"steps"`
CallBackFuncName string `json:"callBackFuncName" bson:"callBackFuncName"`
CommonParams map[string]string `json:"commonParams" bson:"commonParams"`
ExtraJson string `json:"extraJson" bson:"extraJson"`
Status string `json:"status" bson:"status"`
Message string `json:"message" bson:"message"`
ForceTerminate bool `json:"forceTerminate" bson:"forceTerminate"`
Start string `json:"start" bson:"start"`
End string `json:"end" bson:"end"`
ExecutionTime uint32 `json:"executionTime" bson:"executionTime"`
MaxExecutionSeconds uint32 `json:"maxExecutionSeconds" bson:"maxExecutionSeconds"`
Creator string `json:"creator" bson:"creator"`
LastUpdate string `json:"lastUpdate" bson:"lastUpdate"`
Updater string `json:"updater" bson:"updater"`
}
// TaskOptions task options definition
type TaskOptions struct {
TaskIndex string
TaskType string
TaskName string
Creator string
CallBackFuncName string
}
// NewTask create new task by default
func NewTask(o *TaskOptions) *Task {
nowTime := time.Now().Format(TaskTimeFormat)
return &Task{
Index: o.TaskIndex,
TaskID: uuid.NewString(),
TaskType: o.TaskType,
TaskName: o.TaskName,
Status: TaskStatusInit,
ForceTerminate: false,
Start: nowTime,
Steps: make(map[string]*Step, 0),
StepSequence: make([]string, 0),
Creator: o.Creator,
Updater: o.Creator,
LastUpdate: nowTime,
CommonParams: make(map[string]string, 0),
ExtraJson: DefaultJsonExtrasContent,
CallBackFuncName: o.CallBackFuncName,
}
}
// GetTaskID get task id
func (t *Task) GetTaskID() string {
return t.TaskID
}
// GetIndex get task id
func (t *Task) GetIndex() string {
return t.Index
}
// GetTaskType get task type
func (t *Task) GetTaskType() string {
return t.TaskType
}
// GetTaskName get task name
func (t *Task) GetTaskName() string {
return t.TaskName
}
// GetStep get step by name
func (t *Task) GetStep(stepName string) (*Step, bool) {
if _, ok := t.Steps[stepName]; !ok {
return nil, false
}
return t.Steps[stepName], true
}
// AddStep add step to task
func (t *Task) AddStep(step *Step) *Task {
if step == nil {
return t
}
if t.StepSequence == nil {
t.StepSequence = make([]string, 0)
}
t.StepSequence = append(t.StepSequence, step.GetStepName())
t.Steps[step.GetStepName()] = step
return t
}
// GetCommonParams get common params
func (t *Task) GetCommonParams(key string) (string, bool) {
if t.CommonParams == nil {
t.CommonParams = make(map[string]string, 0)
return "", false
}
if value, ok := t.CommonParams[key]; ok {
return value, true
}
return "", false
}
// AddCommonParams add common params
func (t *Task) AddCommonParams(k, v string) *Task {
if t.CommonParams == nil {
t.CommonParams = make(map[string]string, 0)
}
t.CommonParams[k] = v
return t
}
// GetCallback set callback function name
func (t *Task) GetCallback() string {
return t.CallBackFuncName
}
// SetCallback set callback function name
func (t *Task) SetCallback(callBackFuncName string) *Task {
t.CallBackFuncName = callBackFuncName
return t
}
// GetExtra get extra json
func (t *Task) GetExtra(obj interface{}) error {
if t.ExtraJson == "" {
t.ExtraJson = DefaultJsonExtrasContent
}
return json.Unmarshal([]byte(t.ExtraJson), obj)
}
// SetExtraAll set extra json
func (t *Task) SetExtraAll(obj interface{}) error {
result, err := json.Marshal(obj)
if err != nil {
return err
}
t.ExtraJson = string(result)
return nil
}
// GetStatus get status
func (t *Task) GetStatus() string {
return t.Status
}
// SetStatus set status
func (t *Task) SetStatus(status string) *Task {
t.Status = status
return t
}
// GetMessage set message
func (t *Task) GetMessage(msg string) string {
return t.Message
}
// SetMessage set message
func (t *Task) SetMessage(msg string) *Task {
t.Message = msg
return t
}
// GetForceTerminate get force terminate
func (t *Task) GetForceTerminate() bool {
return t.ForceTerminate
}
// SetForceTerminate set force terminate
func (t *Task) SetForceTerminate(f bool) *Task {
t.ForceTerminate = f
return t
}
// GetStartTime get start time
func (t *Task) GetStartTime() (time.Time, error) {
return time.Parse(TaskTimeFormat, t.Start)
}
// SetStartTime set start time
func (t *Task) SetStartTime(time time.Time) *Task {
t.Start = time.Format(TaskTimeFormat)
return t
}
// GetEndTime get end time
func (t *Task) GetEndTime() (time.Time, error) {
return time.Parse(TaskTimeFormat, t.End)
}
// SetEndTime set end time
func (t *Task) SetEndTime(time time.Time) *Task {
t.End = time.Format(TaskTimeFormat)
return t
}
// GetExecutionTime get execution time
func (t *Task) GetExecutionTime() time.Duration {
return time.Duration(time.Duration(t.ExecutionTime) * time.Millisecond)
}
// SetExecutionTime set execution time
func (t *Task) SetExecutionTime(start time.Time, end time.Time) *Task {
t.ExecutionTime = uint32(end.Sub(start).Milliseconds())
return t
}
// GetMaxExecutionSeconds get max execution seconds
func (t *Task) GetMaxExecutionSeconds() time.Duration {
return time.Duration(time.Duration(t.MaxExecutionSeconds) * time.Second)
}
// SetMaxExecutionSeconds set max execution seconds
func (t *Task) SetMaxExecutionSeconds(maxExecutionSeconds time.Duration) *Task {
t.MaxExecutionSeconds = uint32(maxExecutionSeconds.Seconds())
return t
}
// GetCreator get creator
func (t *Task) GetCreator() string {
return t.Creator
}
// SetCreator set creator
func (t *Task) SetCreator(creator string) *Task {
t.Creator = creator
return t
}
// GetUpdater get updater
func (t *Task) GetUpdater() string {
return t.Updater
}
// SetUpdater set updater
func (t *Task) SetUpdater(updater string) *Task {
t.Updater = updater
return t
}
// GetLastUpdate get last update time
func (t *Task) GetLastUpdate() (time.Time, error) {
return time.Parse(TaskTimeFormat, t.LastUpdate)
}
// SetLastUpdate set last update time
func (t *Task) SetLastUpdate(lastUpdate time.Time) *Task {
t.LastUpdate = lastUpdate.Format(TaskTimeFormat)
return t
}
// GetCurrentStep get current step
func (t *Task) GetCurrentStep() string {
return t.CurrentStep
}
// SetCurrentStep set current step
func (t *Task) SetCurrentStep(stepName string) *Task {
t.CurrentStep = stepName
return t
}
// GetStepParam get step params
func (t *Task) GetStepParam(stepName, key string) (string, bool) {
step, ok := t.GetStep(stepName)
if !ok {
return "", false
}
return step.GetParam(key)
}
// AddStepParams add step params
func (t *Task) AddStepParams(stepName string, k, v string) error {
step, ok := t.GetStep(stepName)
if !ok {
return fmt.Errorf("step %s not exist", stepName)
}
step.AddParam(k, v)
return nil
}
// AddStepParamsBatch add step params batch
func (t *Task) AddStepParamsBatch(stepName string, params map[string]string) error {
if _, ok := t.Steps[stepName]; !ok {
return fmt.Errorf("step %s not exist", stepName)
}
for k, v := range params {
t.Steps[stepName].AddParam(k, v)
}
return nil
}

44
task/types/type.go Normal file
View File

@ -0,0 +1,44 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package types for task
package types
import (
"time"
)
const (
// TaskTimeFormat task time format, e.g. 2006-01-02T15:04:05Z07:00
TaskTimeFormat = time.RFC3339
// DefaultJsonExtrasContent default json extras content
DefaultJsonExtrasContent = "{}"
// DefaultMaxExecuteTime default max execute time for 1 hour
DefaultMaxExecuteTimeSeconds = 3600
)
const (
// TaskStatusInit INIT task status
TaskStatusInit = "INITIALIZING"
// TaskStatusRunning running task status
TaskStatusRunning = "RUNNING"
// TaskStatusSuccess task success
TaskStatusSuccess = "SUCCESS"
// TaskStatusFailure task failed
TaskStatusFailure = "FAILURE"
// TaskStatusTimeout task run timeout
TaskStatusTimeout = "TIMEOUT"
// TaskStatusForceTerminate force task terminate
TaskStatusForceTerminate = "FORCETERMINATE"
// TaskStatusNotStarted force task terminate
TaskStatusNotStarted = "NOTSTARTED"
)

28
task/worker.go Normal file
View File

@ -0,0 +1,28 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package task is a package for task management
package task
import types "git.ifooth.com/common/machinery-plugins/task/types"
// StepWorkerInterface that client must implement
type StepWorkerInterface interface {
GetName() string
DoWork(state *State) error
}
// CallbackInterface that client must implement
type CallbackInterface interface {
GetName() string
Callback(isSuccess bool, task *types.Task)
}