machinery-plugins/backends/etcd/etcd.go

338 lines
7.9 KiB
Go
Raw Normal View History

2024-05-24 17:03:13 +00:00
package etcd
import (
2024-05-25 07:19:23 +00:00
"bytes"
2024-05-24 17:03:13 +00:00
"context"
"encoding/json"
"fmt"
2024-05-25 07:19:23 +00:00
"log"
"sync"
2024-05-24 17:03:13 +00:00
"time"
"github.com/RichardKnop/machinery/v2/backends/iface"
"github.com/RichardKnop/machinery/v2/common"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/tasks"
clientv3 "go.etcd.io/etcd/client/v3"
2024-05-25 07:19:23 +00:00
"golang.org/x/sync/errgroup"
2024-05-24 17:03:13 +00:00
)
2024-05-25 07:51:18 +00:00
// etcd key templates. The single %s verb in each is replaced with a UUID via
// fmt.Sprintf before the key is used.
const (
	// taskKey is the key template for a single task's stored state.
	taskKey = "/machinery/v2/backend/tasks/%s"
	// groupKey is the key template for a group's stored metadata.
	groupKey = "/machinery/v2/backend/groups/%s"
)
2024-05-24 17:03:13 +00:00
// etcdBackend keeps machinery task states and group metadata in etcd.
type etcdBackend struct {
	// Backend is the embedded common backend built from the machinery config.
	common.Backend

	// conf is the machinery configuration handed to New.
	conf *config.Config
	// client is the etcd client used for every read, write and lease grant.
	client *clientv3.Client
	// ctx is the base context used for etcd requests issued by the backend.
	ctx context.Context
}
2024-05-24 17:13:08 +00:00
func New(ctx context.Context, conf *config.Config) (iface.Backend, error) {
2024-05-24 17:03:13 +00:00
etcdConf := clientv3.Config{
2024-05-24 17:13:08 +00:00
Endpoints: []string{conf.ResultBackend},
2024-05-24 17:03:13 +00:00
Context: ctx,
DialTimeout: time.Second * 5,
2024-05-24 17:13:08 +00:00
TLS: conf.TLSConfig,
2024-05-24 17:03:13 +00:00
}
client, err := clientv3.New(etcdConf)
if err != nil {
return nil, err
}
backend := etcdBackend{
2024-05-24 17:13:08 +00:00
Backend: common.NewBackend(conf),
2024-05-24 17:03:13 +00:00
ctx: ctx,
client: client,
2024-05-24 17:13:08 +00:00
conf: conf,
2024-05-24 17:03:13 +00:00
}
return &backend, nil
}
// Group related functions
func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
groupMeta := &tasks.GroupMeta{
GroupUUID: groupUUID,
TaskUUIDs: taskUUIDs,
CreatedAt: time.Now().UTC(),
}
encoded, err := json.Marshal(groupMeta)
if err != nil {
return err
}
2024-05-25 08:04:29 +00:00
lease, err := b.getLease()
if err != nil {
return err
}
2024-05-24 17:03:13 +00:00
ctx, cancel := context.WithTimeout(b.ctx, time.Second*2)
defer cancel()
2024-05-25 07:51:18 +00:00
key := fmt.Sprintf(groupKey, groupUUID)
2024-05-25 08:04:29 +00:00
_, err = b.client.Put(ctx, key, string(encoded), clientv3.WithLease(lease))
2024-05-24 17:03:13 +00:00
return err
}
// GroupCompleted reports whether the number of tasks in the group whose state
// is completed equals groupTaskCount. Errors from loading the group metadata
// or the task states are returned with a false result.
func (b *etcdBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
	meta, err := b.getGroupMeta(groupUUID)
	if err != nil {
		return false, err
	}

	states, err := b.getStates(meta.TaskUUIDs...)
	if err != nil {
		return false, err
	}

	completed := 0
	for i := range states {
		if !states[i].IsCompleted() {
			continue
		}
		completed++
	}
	return completed == groupTaskCount, nil
}
2024-05-25 07:51:18 +00:00
func (b *etcdBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
groupMeta, err := b.getGroupMeta(groupUUID)
2024-05-25 07:19:23 +00:00
if err != nil {
return nil, err
}
2024-05-25 07:51:18 +00:00
return b.getStates(groupMeta.TaskUUIDs...)
}
func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
key := fmt.Sprintf(groupKey, groupUUID)
resp, err := b.client.Get(b.ctx, key)
if err != nil {
return false, err
}
2024-05-25 07:19:23 +00:00
if len(resp.Kvs) == 0 {
2024-05-25 07:51:18 +00:00
return false, fmt.Errorf("task %s not exist", groupUUID)
2024-05-25 07:19:23 +00:00
}
kv := resp.Kvs[0]
meta := new(tasks.GroupMeta)
decoder := json.NewDecoder(bytes.NewReader(kv.Value))
decoder.UseNumber()
2024-05-25 07:51:18 +00:00
if e := decoder.Decode(meta); e != nil {
return false, e
2024-05-25 07:19:23 +00:00
}
2024-05-24 17:03:13 +00:00
2024-05-25 07:51:18 +00:00
if meta.ChordTriggered {
return false, nil
}
// Set flag to true
meta.ChordTriggered = true
// Update the group meta
encoded, err := json.Marshal(&meta)
2024-05-24 17:03:13 +00:00
if err != nil {
2024-05-25 07:51:18 +00:00
return false, err
2024-05-24 17:03:13 +00:00
}
2024-05-25 08:04:29 +00:00
lease, err := b.getLease()
if err != nil {
return false, err
}
2024-05-25 07:51:18 +00:00
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)
2024-05-25 08:04:29 +00:00
update := clientv3.OpPut(key, string(encoded), clientv3.WithLease(lease))
2024-05-25 07:51:18 +00:00
b.client.Txn(b.ctx).If(cmp).Then(update)
txnresp, err := b.client.Txn(b.ctx).If(cmp).Then(update).Commit()
if err != nil {
return false, err
}
// 有写入或者删除竞争
if !txnresp.Succeeded {
return false, fmt.Errorf("update key failed")
}
return true, nil
2024-05-24 17:03:13 +00:00
}
2024-05-25 07:51:18 +00:00
func (b *etcdBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
key := fmt.Sprintf(groupKey, groupUUID)
resp, err := b.client.Get(b.ctx, key)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return nil, fmt.Errorf("task %s not exist", groupUUID)
}
kv := resp.Kvs[0]
meta := new(tasks.GroupMeta)
decoder := json.NewDecoder(bytes.NewReader(kv.Value))
decoder.UseNumber()
if err := decoder.Decode(meta); err != nil {
return nil, err
}
return meta, nil
2024-05-24 17:03:13 +00:00
}
// Setting / getting task state
// SetStatePending stores a new PENDING state for the task described by
// signature.
func (b *etcdBackend) SetStatePending(signature *tasks.Signature) error {
	return b.updateState(tasks.NewPendingTaskState(signature))
}
// SetStateReceived stores a RECEIVED state for the task, first carrying over
// the creation time and task name of any previously stored state.
func (b *etcdBackend) SetStateReceived(signature *tasks.Signature) error {
	received := tasks.NewReceivedTaskState(signature)
	b.mergeNewTaskState(received)

	return b.updateState(received)
}
// SetStateStarted stores a STARTED state for the task, first carrying over the
// creation time and task name of any previously stored state.
func (b *etcdBackend) SetStateStarted(signature *tasks.Signature) error {
	started := tasks.NewStartedTaskState(signature)
	b.mergeNewTaskState(started)

	return b.updateState(started)
}
// SetStateRetry stores a RETRY state for the task, first carrying over the
// creation time and task name of any previously stored state.
func (b *etcdBackend) SetStateRetry(signature *tasks.Signature) error {
	retry := tasks.NewRetryTaskState(signature)
	b.mergeNewTaskState(retry)

	return b.updateState(retry)
}
// SetStateSuccess stores a SUCCESS state holding results for the task, first
// carrying over the creation time and task name of any previously stored state.
func (b *etcdBackend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
	succeeded := tasks.NewSuccessTaskState(signature, results)
	b.mergeNewTaskState(succeeded)

	return b.updateState(succeeded)
}
// SetStateFailure stores a FAILURE state carrying the error text err for the
// task, first carrying over the creation time and task name of any previously
// stored state.
func (b *etcdBackend) SetStateFailure(signature *tasks.Signature, err string) error {
	failed := tasks.NewFailureTaskState(signature, err)
	b.mergeNewTaskState(failed)

	return b.updateState(failed)
}
func (b *etcdBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
2024-05-25 07:19:23 +00:00
return b.getState(b.ctx, taskUUID)
}
func (b *etcdBackend) getState(ctx context.Context, taskUUID string) (*tasks.TaskState, error) {
2024-05-25 07:51:18 +00:00
key := fmt.Sprintf(taskKey, taskUUID)
2024-05-25 07:19:23 +00:00
resp, err := b.client.Get(b.ctx, key)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return nil, fmt.Errorf("task %s not exist", taskUUID)
}
kv := resp.Kvs[0]
state := new(tasks.TaskState)
decoder := json.NewDecoder(bytes.NewReader(kv.Value))
decoder.UseNumber()
if err := decoder.Decode(state); err != nil {
return nil, err
}
return state, nil
2024-05-24 17:03:13 +00:00
}
// mergeNewTaskState copies CreatedAt and TaskName from the currently stored
// state of the same task into newState. If the stored state cannot be loaded,
// newState is left unchanged and the error is dropped.
func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
	prev, err := b.GetState(newState.TaskUUID)
	if err != nil {
		return
	}
	newState.CreatedAt, newState.TaskName = prev.CreatedAt, prev.TaskName
}
func (b *etcdBackend) PurgeState(taskUUID string) error {
2024-05-25 07:51:18 +00:00
key := fmt.Sprintf(taskKey, taskUUID)
2024-05-25 08:04:29 +00:00
_, err := b.client.Delete(b.ctx, key)
2024-05-25 07:19:23 +00:00
return err
2024-05-24 17:03:13 +00:00
}
2024-05-25 07:19:23 +00:00
2024-05-24 17:03:13 +00:00
func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error {
2024-05-25 07:51:18 +00:00
key := fmt.Sprintf(groupKey, groupUUID)
2024-05-25 08:04:29 +00:00
_, err := b.client.Delete(b.ctx, key)
2024-05-25 07:19:23 +00:00
return err
2024-05-24 17:03:13 +00:00
}
// getStates returns multiple task states
func (b *etcdBackend) getStates(taskUUIDs ...string) ([]*tasks.TaskState, error) {
2024-05-25 07:19:23 +00:00
eg, ctx := errgroup.WithContext(b.ctx)
eg.SetLimit(10)
taskStates := make([]*tasks.TaskState, 0, len(taskUUIDs))
var mtx sync.Mutex
for _, taskUUID := range taskUUIDs {
t := taskUUID
eg.Go(func() error {
state, err := b.getState(ctx, t)
if err != nil {
return err
}
mtx.Lock()
taskStates = append(taskStates, state)
mtx.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
2024-05-24 17:03:13 +00:00
return taskStates, nil
}
// updateState saves current task state
func (b *etcdBackend) updateState(taskState *tasks.TaskState) error {
2024-05-25 07:19:23 +00:00
encoded, err := json.Marshal(taskState)
if err != nil {
return err
}
2024-05-25 08:04:29 +00:00
lease, err := b.getLease()
if err != nil {
return err
}
2024-05-25 07:51:18 +00:00
key := fmt.Sprintf(taskKey, taskState.TaskUUID)
2024-05-25 08:04:29 +00:00
_, err = b.client.Put(b.ctx, key, string(encoded), clientv3.WithLease(lease))
2024-05-25 07:19:23 +00:00
if err != nil {
return err
}
log.Default().Printf("update taskstate %s %s, %s", taskState.TaskName, taskState.TaskUUID, encoded)
2024-05-24 17:03:13 +00:00
return nil
}
// getExpiration returns expiration for a stored task state
2024-05-25 08:04:29 +00:00
func (b *etcdBackend) getLease() (clientv3.LeaseID, error) {
2024-05-24 17:03:13 +00:00
expiresIn := b.GetConfig().ResultsExpireIn
2024-05-25 08:04:29 +00:00
if expiresIn <= 0 {
2024-05-24 17:03:13 +00:00
// expire results after 1 hour by default
expiresIn = config.DefaultResultsExpireIn
}
2024-05-25 08:04:29 +00:00
resp, err := b.client.Grant(b.ctx, int64(expiresIn))
if err != nil {
return clientv3.NoLease, err
}
return resp.ID, nil
2024-05-24 17:03:13 +00:00
}