191 lines
4.8 KiB
Go
191 lines
4.8 KiB
Go
|
package etcd
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/RichardKnop/machinery/v2/backends/iface"
|
||
|
"github.com/RichardKnop/machinery/v2/common"
|
||
|
"github.com/RichardKnop/machinery/v2/config"
|
||
|
"github.com/RichardKnop/machinery/v2/tasks"
|
||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||
|
)
|
||
|
|
||
|
type etcdBackend struct {
|
||
|
common.Backend
|
||
|
ctx context.Context
|
||
|
conf *config.Config
|
||
|
client *clientv3.Client
|
||
|
}
|
||
|
|
||
|
func New(ctx context.Context, cnf *config.Config) (iface.Backend, error) {
|
||
|
etcdConf := clientv3.Config{
|
||
|
Endpoints: []string{cnf.ResultBackend},
|
||
|
Context: ctx,
|
||
|
DialTimeout: time.Second * 5,
|
||
|
}
|
||
|
client, err := clientv3.New(etcdConf)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
backend := etcdBackend{
|
||
|
Backend: common.NewBackend(cnf),
|
||
|
ctx: ctx,
|
||
|
client: client,
|
||
|
conf: cnf,
|
||
|
}
|
||
|
|
||
|
return &backend, nil
|
||
|
|
||
|
}
|
||
|
|
||
|
// Group related functions
|
||
|
func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
|
||
|
groupMeta := &tasks.GroupMeta{
|
||
|
GroupUUID: groupUUID,
|
||
|
TaskUUIDs: taskUUIDs,
|
||
|
CreatedAt: time.Now().UTC(),
|
||
|
}
|
||
|
|
||
|
encoded, err := json.Marshal(groupMeta)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
ctx, cancel := context.WithTimeout(b.ctx, time.Second*2)
|
||
|
defer cancel()
|
||
|
|
||
|
key := fmt.Sprintf("/machinery/v2/backend/%s", groupUUID)
|
||
|
_, err = b.client.KV.Put(ctx, key, string(encoded))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (b *etcdBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
|
||
|
groupMeta, err := b.getGroupMeta(groupUUID)
|
||
|
if err != nil {
|
||
|
return false, err
|
||
|
}
|
||
|
|
||
|
taskStates, err := b.getStates(groupMeta.TaskUUIDs...)
|
||
|
if err != nil {
|
||
|
return false, err
|
||
|
}
|
||
|
|
||
|
var countSuccessTasks = 0
|
||
|
for _, taskState := range taskStates {
|
||
|
if taskState.IsCompleted() {
|
||
|
countSuccessTasks++
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return countSuccessTasks == groupTaskCount, nil
|
||
|
}
|
||
|
|
||
|
func (b *etcdBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
func (b *etcdBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
|
||
|
groupMeta, err := b.getGroupMeta(groupUUID)
|
||
|
if err != nil {
|
||
|
return []*tasks.TaskState{}, err
|
||
|
}
|
||
|
|
||
|
return b.getStates(groupMeta.TaskUUIDs...)
|
||
|
}
|
||
|
|
||
|
func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
|
||
|
return false, nil
|
||
|
}
|
||
|
|
||
|
// Setting / getting task state
|
||
|
// SetStatePending updates task state to PENDING
|
||
|
func (b *etcdBackend) SetStatePending(signature *tasks.Signature) error {
|
||
|
taskState := tasks.NewPendingTaskState(signature)
|
||
|
return b.updateState(taskState)
|
||
|
}
|
||
|
|
||
|
// SetStateReceived updates task state to RECEIVED
|
||
|
func (b *etcdBackend) SetStateReceived(signature *tasks.Signature) error {
|
||
|
taskState := tasks.NewReceivedTaskState(signature)
|
||
|
b.mergeNewTaskState(taskState)
|
||
|
return b.updateState(taskState)
|
||
|
}
|
||
|
|
||
|
// SetStateStarted updates task state to STARTED
|
||
|
func (b *etcdBackend) SetStateStarted(signature *tasks.Signature) error {
|
||
|
taskState := tasks.NewStartedTaskState(signature)
|
||
|
b.mergeNewTaskState(taskState)
|
||
|
return b.updateState(taskState)
|
||
|
}
|
||
|
|
||
|
// SetStateRetry updates task state to RETRY
|
||
|
func (b *etcdBackend) SetStateRetry(signature *tasks.Signature) error {
|
||
|
taskState := tasks.NewRetryTaskState(signature)
|
||
|
b.mergeNewTaskState(taskState)
|
||
|
return b.updateState(taskState)
|
||
|
}
|
||
|
|
||
|
// SetStateSuccess updates task state to SUCCESS
|
||
|
func (b *etcdBackend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
|
||
|
taskState := tasks.NewSuccessTaskState(signature, results)
|
||
|
b.mergeNewTaskState(taskState)
|
||
|
return b.updateState(taskState)
|
||
|
}
|
||
|
|
||
|
// SetStateFailure updates task state to FAILURE
|
||
|
func (b *etcdBackend) SetStateFailure(signature *tasks.Signature, err string) error {
|
||
|
taskState := tasks.NewFailureTaskState(signature, err)
|
||
|
b.mergeNewTaskState(taskState)
|
||
|
return b.updateState(taskState)
|
||
|
}
|
||
|
|
||
|
func (b *etcdBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
// Purging stored task states and group meta data
|
||
|
func (b *etcdBackend) IsAMQP() bool {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
|
||
|
state, err := b.GetState(newState.TaskUUID)
|
||
|
if err == nil {
|
||
|
newState.CreatedAt = state.CreatedAt
|
||
|
newState.TaskName = state.TaskName
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *etcdBackend) PurgeState(taskUUID string) error {
|
||
|
return nil
|
||
|
}
|
||
|
func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// getStates returns multiple task states
|
||
|
func (b *etcdBackend) getStates(taskUUIDs ...string) ([]*tasks.TaskState, error) {
|
||
|
taskStates := make([]*tasks.TaskState, len(taskUUIDs))
|
||
|
return taskStates, nil
|
||
|
}
|
||
|
|
||
|
// updateState saves current task state
|
||
|
func (b *etcdBackend) updateState(taskState *tasks.TaskState) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// getExpiration returns expiration for a stored task state
|
||
|
func (b *etcdBackend) getExpiration() time.Duration {
|
||
|
expiresIn := b.GetConfig().ResultsExpireIn
|
||
|
if expiresIn == 0 {
|
||
|
// expire results after 1 hour by default
|
||
|
expiresIn = config.DefaultResultsExpireIn
|
||
|
}
|
||
|
|
||
|
return time.Duration(expiresIn) * time.Second
|
||
|
}
|