task add schedule

main task/v1.0.9
git 2025-12-20 00:37:57 +08:00
parent 792c7be92a
commit f5a1158e64
Signed by: git
GPG Key ID: 3F65EFFA44207ADD
1 changed files with 59 additions and 0 deletions

View File

@ -28,6 +28,8 @@ import (
ilock "github.com/RichardKnop/machinery/v2/locks/iface" ilock "github.com/RichardKnop/machinery/v2/locks/iface"
"github.com/RichardKnop/machinery/v2/log" "github.com/RichardKnop/machinery/v2/log"
"github.com/RichardKnop/machinery/v2/tasks" "github.com/RichardKnop/machinery/v2/tasks"
"github.com/RichardKnop/machinery/v2/utils"
"github.com/robfig/cron/v3"
irevoker "git.ifooth.com/common/pkg/task/revokers/iface" irevoker "git.ifooth.com/common/pkg/task/revokers/iface"
istep "git.ifooth.com/common/pkg/task/steps/iface" istep "git.ifooth.com/common/pkg/task/steps/iface"
@ -54,6 +56,7 @@ type TaskManager struct { // nolint
callbackExecutors map[istep.CallbackName]istep.CallbackExecutor callbackExecutors map[istep.CallbackName]istep.CallbackExecutor
cfg *ManagerConfig cfg *ManagerConfig
store istore.Store store istore.Store
scheduler *cron.Cron
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -83,7 +86,9 @@ func NewTaskManager() *TaskManager {
workerNum: DefaultWorkerConcurrency, workerNum: DefaultWorkerConcurrency,
stepExecutors: istep.GetRegisters(), // get all step workers stepExecutors: istep.GetRegisters(), // get all step workers
cfg: &ManagerConfig{}, cfg: &ManagerConfig{},
scheduler: cron.New(),
} }
return m return m
} }
@ -180,6 +185,16 @@ func (m *TaskManager) Run() error {
return m.worker.Launch() return m.worker.Launch()
} }
// RunSchedule start scheduler
func (m *TaskManager) RunSchedule() error {
if m.cfg.Lock == nil {
return fmt.Errorf("lock is required")
}
m.scheduler.Run()
return nil
}
// GetTaskWithID get task by taskid // GetTaskWithID get task by taskid
func (m *TaskManager) GetTaskWithID(ctx context.Context, taskId string) (*types.Task, error) { func (m *TaskManager) GetTaskWithID(ctx context.Context, taskId string) (*types.Task, error) {
return GetGlobalStorage().GetTask(ctx, taskId) return GetGlobalStorage().GetTask(ctx, taskId)
@ -247,6 +262,44 @@ func (m *TaskManager) Dispatch(task *types.Task) error {
return m.dispatchAt(task, "") return m.dispatchAt(task, "")
} }
// RegisterScheduleTask task
func (m *TaskManager) RegisterScheduleTask(spec string, task *types.Task) error {
if err := task.Validate(); err != nil {
return err
}
if m.cfg.Lock == nil {
return fmt.Errorf("lock is required")
}
return m.registerScheduleTask(spec, task)
}
// registerScheduleTask task to machinery
func (m *TaskManager) registerScheduleTask(spec string, task *types.Task) error {
//check spec
schedule, err := cron.ParseStandard(spec)
if err != nil {
return err
}
f := func() {
//get lock
err := m.cfg.Lock.LockWithRetries(utils.GetLockName(task.TaskName, spec), schedule.Next(time.Now()).UnixNano()-1)
if err != nil {
return
}
err = m.Dispatch(task)
if err != nil {
log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", task.TaskName, err.Error())
}
}
_, err = m.scheduler.AddFunc(spec, f)
return err
}
func (m *TaskManager) transTaskToSignature(task *types.Task, stepNameBegin string) []*tasks.Signature { func (m *TaskManager) transTaskToSignature(task *types.Task, stepNameBegin string) []*tasks.Signature {
var signatures []*tasks.Signature var signatures []*tasks.Signature
@ -459,3 +512,9 @@ func (m *TaskManager) Stop() {
m.worker.Quit() m.worker.Quit()
m.cancel() m.cancel()
} }
// StopSchedule running schedule
func (m *TaskManager) StopSchedule() {
m.scheduler.Stop()
m.cancel()
}