diff --git a/task/manager.go b/task/manager.go index e290b92..68f23a0 100644 --- a/task/manager.go +++ b/task/manager.go @@ -28,6 +28,8 @@ import ( ilock "github.com/RichardKnop/machinery/v2/locks/iface" "github.com/RichardKnop/machinery/v2/log" "github.com/RichardKnop/machinery/v2/tasks" + "github.com/RichardKnop/machinery/v2/utils" + "github.com/robfig/cron/v3" irevoker "git.ifooth.com/common/pkg/task/revokers/iface" istep "git.ifooth.com/common/pkg/task/steps/iface" @@ -54,6 +56,7 @@ type TaskManager struct { // nolint callbackExecutors map[istep.CallbackName]istep.CallbackExecutor cfg *ManagerConfig store istore.Store + scheduler *cron.Cron ctx context.Context cancel context.CancelFunc @@ -83,7 +86,9 @@ func NewTaskManager() *TaskManager { workerNum: DefaultWorkerConcurrency, stepExecutors: istep.GetRegisters(), // get all step workers cfg: &ManagerConfig{}, + scheduler: cron.New(), } + return m } @@ -180,6 +185,16 @@ func (m *TaskManager) Run() error { return m.worker.Launch() } +// RunSchedule start scheduler +func (m *TaskManager) RunSchedule() error { + if m.cfg.Lock == nil { + return fmt.Errorf("lock is required") + } + + m.scheduler.Run() + return nil +} + // GetTaskWithID get task by taskid func (m *TaskManager) GetTaskWithID(ctx context.Context, taskId string) (*types.Task, error) { return GetGlobalStorage().GetTask(ctx, taskId) @@ -247,6 +262,44 @@ func (m *TaskManager) Dispatch(task *types.Task) error { return m.dispatchAt(task, "") } +// RegisterScheduleTask task +func (m *TaskManager) RegisterScheduleTask(spec string, task *types.Task) error { + if err := task.Validate(); err != nil { + return err + } + + if m.cfg.Lock == nil { + return fmt.Errorf("lock is required") + } + + return m.registerScheduleTask(spec, task) +} + +// registerScheduleTask task to machinery +func (m *TaskManager) registerScheduleTask(spec string, task *types.Task) error { + //check spec + schedule, err := cron.ParseStandard(spec) + if err != nil { + return err + } + + f := func() { + //get lock + err := m.cfg.Lock.LockWithRetries(utils.GetLockName(task.TaskName, spec), schedule.Next(time.Now()).UnixNano()-1) + if err != nil { + return + } + + err = m.Dispatch(task) + if err != nil { + log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", task.TaskName, err.Error()) + } + } + + _, err = m.scheduler.AddFunc(spec, f) + return err +} + func (m *TaskManager) transTaskToSignature(task *types.Task, stepNameBegin string) []*tasks.Signature { var signatures []*tasks.Signature @@ -459,3 +512,9 @@ func (m *TaskManager) Stop() { m.worker.Quit() m.cancel() } + +// StopSchedule running schedule +func (m *TaskManager) StopSchedule() { + m.scheduler.Stop() + m.cancel() +}