package etcd

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/RichardKnop/machinery/v2/backends/iface"
	"github.com/RichardKnop/machinery/v2/common"
	"github.com/RichardKnop/machinery/v2/config"
	"github.com/RichardKnop/machinery/v2/tasks"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// etcdBackend is a result backend that talks to etcd through a clientv3
// client. Several of its methods are still placeholders; each one says so in
// its own comment.
type etcdBackend struct {
	common.Backend
	ctx    context.Context
	conf   *config.Config
	client *clientv3.Client
}

// New creates an etcd result backend. It uses cnf.ResultBackend as the single
// etcd endpoint, ctx as the client context, and a 5 second dial timeout. The
// error from clientv3.New is returned unchanged.
func New(ctx context.Context, cnf *config.Config) (iface.Backend, error) {
	etcdConf := clientv3.Config{
		Endpoints:   []string{cnf.ResultBackend},
		Context:     ctx,
		DialTimeout: time.Second * 5,
	}

	client, err := clientv3.New(etcdConf)
	if err != nil {
		return nil, err
	}

	backend := etcdBackend{
		Backend: common.NewBackend(cnf),
		ctx:     ctx,
		client:  client,
		conf:    cnf,
	}

	return &backend, nil
}

// Group related functions

// InitGroup stores the group meta data (group UUID, task UUIDs and the current
// UTC time) as JSON under "/machinery/v2/backend/<groupUUID>". The put is
// bounded by a 2 second timeout derived from the backend context.
func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
	groupMeta := &tasks.GroupMeta{
		GroupUUID: groupUUID,
		TaskUUIDs: taskUUIDs,
		CreatedAt: time.Now().UTC(),
	}

	encoded, err := json.Marshal(groupMeta)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(b.ctx, time.Second*2)
	defer cancel()

	key := fmt.Sprintf("/machinery/v2/backend/%s", groupUUID)
	_, err = b.client.KV.Put(ctx, key, string(encoded))
	return err
}

// GroupCompleted reports whether the number of task states in the group for
// which IsCompleted returns true equals groupTaskCount. It returns an error
// when the group meta data or the task states cannot be loaded.
func (b *etcdBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
	groupMeta, err := b.lookupGroupMeta(groupUUID)
	if err != nil {
		return false, err
	}

	taskStates, err := b.getStates(groupMeta.TaskUUIDs...)
	if err != nil {
		return false, err
	}

	completed := 0
	for _, taskState := range taskStates {
		// getStates may hand back nil entries; skip them instead of calling
		// a method on a nil state.
		if taskState != nil && taskState.IsCompleted() {
			completed++
		}
	}

	return completed == groupTaskCount, nil
}

// getGroupMeta is a placeholder: it currently always returns (nil, nil).
// Callers should go through lookupGroupMeta, which turns the nil result into
// an error.
func (b *etcdBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
	return nil, nil
}

// lookupGroupMeta wraps getGroupMeta and guarantees that a nil error is always
// accompanied by a non-nil *tasks.GroupMeta.
func (b *etcdBackend) lookupGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
	groupMeta, err := b.getGroupMeta(groupUUID)
	if err != nil {
		return nil, err
	}
	if groupMeta == nil {
		return nil, fmt.Errorf("group meta not found for group %q", groupUUID)
	}
	return groupMeta, nil
}

// GroupTaskStates returns the states of all tasks listed in the group's meta
// data. groupTaskCount is not used. When the group meta data cannot be loaded
// it returns an empty, non-nil slice together with the error.
func (b *etcdBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
	groupMeta, err := b.lookupGroupMeta(groupUUID)
	if err != nil {
		return []*tasks.TaskState{}, err
	}

	return b.getStates(groupMeta.TaskUUIDs...)
}

// TriggerChord is a placeholder: it currently always returns (false, nil).
func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
	return false, nil
}

// Setting / getting task state

// SetStatePending updates task state to PENDING
func (b *etcdBackend) SetStatePending(signature *tasks.Signature) error {
	taskState := tasks.NewPendingTaskState(signature)
	return b.updateState(taskState)
}

// SetStateReceived updates task state to RECEIVED
func (b *etcdBackend) SetStateReceived(signature *tasks.Signature) error {
	taskState := tasks.NewReceivedTaskState(signature)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateStarted updates task state to STARTED
func (b *etcdBackend) SetStateStarted(signature *tasks.Signature) error {
	taskState := tasks.NewStartedTaskState(signature)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateRetry updates task state to RETRY
func (b *etcdBackend) SetStateRetry(signature *tasks.Signature) error {
	taskState := tasks.NewRetryTaskState(signature)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateSuccess updates task state to SUCCESS
func (b *etcdBackend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
	taskState := tasks.NewSuccessTaskState(signature, results)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateFailure updates task state to FAILURE
func (b *etcdBackend) SetStateFailure(signature *tasks.Signature, err string) error {
	taskState := tasks.NewFailureTaskState(signature, err)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// GetState is a placeholder: it currently always returns (nil, nil).
func (b *etcdBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
	return nil, nil
}

// IsAMQP always returns false for this backend.
func (b *etcdBackend) IsAMQP() bool {
	return false
}

// mergeNewTaskState copies CreatedAt and TaskName from the state returned by
// GetState into newState. It leaves newState untouched when GetState fails or
// returns no state.
func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
	state, err := b.GetState(newState.TaskUUID)
	// GetState can return (nil, nil), so a nil error alone does not mean
	// there is a state to copy from.
	if err != nil || state == nil {
		return
	}

	newState.CreatedAt = state.CreatedAt
	newState.TaskName = state.TaskName
}

// Purging stored task states and group meta data

// PurgeState is a placeholder: it currently does nothing and returns nil.
func (b *etcdBackend) PurgeState(taskUUID string) error {
	return nil
}

// PurgeGroupMeta is a placeholder: it currently does nothing and returns nil.
func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error {
	return nil
}

// getStates returns multiple task states. It is a placeholder: the returned
// slice has one entry per requested UUID, but every entry is currently nil.
func (b *etcdBackend) getStates(taskUUIDs ...string) ([]*tasks.TaskState, error) {
	taskStates := make([]*tasks.TaskState, len(taskUUIDs))
	return taskStates, nil
}

// updateState saves current task state. It is a placeholder: it currently
// does nothing and returns nil.
func (b *etcdBackend) updateState(taskState *tasks.TaskState) error {
	return nil
}

// getExpiration returns expiration for a stored task state. It reads
// ResultsExpireIn from the backend config, interpreted as seconds, and falls
// back to config.DefaultResultsExpireIn when that value is 0.
func (b *etcdBackend) getExpiration() time.Duration {
	expiresIn := b.GetConfig().ResultsExpireIn
	if expiresIn == 0 {
		// expire results after 1 hour by default
		expiresIn = config.DefaultResultsExpireIn
	}

	return time.Duration(expiresIn) * time.Second
}