fix
parent
5a8968d2b6
commit
2d0bf711bf
|
@ -17,6 +17,11 @@ import (
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
groupKey = "/machinery/v2/backend/groups/%s"
|
||||||
|
taskKey = "/machinery/v2/backend/tasks/%s"
|
||||||
|
)
|
||||||
|
|
||||||
type etcdBackend struct {
|
type etcdBackend struct {
|
||||||
common.Backend
|
common.Backend
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
@ -63,7 +68,7 @@ func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
|
||||||
ctx, cancel := context.WithTimeout(b.ctx, time.Second*2)
|
ctx, cancel := context.WithTimeout(b.ctx, time.Second*2)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
key := fmt.Sprintf("/machinery/v2/backend/%s", groupUUID)
|
key := fmt.Sprintf(groupKey, groupUUID)
|
||||||
_, err = b.client.KV.Put(ctx, key, string(encoded))
|
_, err = b.client.KV.Put(ctx, key, string(encoded))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -89,8 +94,68 @@ func (b *etcdBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool
|
||||||
return countSuccessTasks == groupTaskCount, nil
|
return countSuccessTasks == groupTaskCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *etcdBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
|
||||||
|
groupMeta, err := b.getGroupMeta(groupUUID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.getStates(groupMeta.TaskUUIDs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
|
||||||
|
key := fmt.Sprintf(groupKey, groupUUID)
|
||||||
|
resp, err := b.client.Get(b.ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if len(resp.Kvs) == 0 {
|
||||||
|
return false, fmt.Errorf("task %s not exist", groupUUID)
|
||||||
|
}
|
||||||
|
kv := resp.Kvs[0]
|
||||||
|
|
||||||
|
meta := new(tasks.GroupMeta)
|
||||||
|
|
||||||
|
decoder := json.NewDecoder(bytes.NewReader(kv.Value))
|
||||||
|
decoder.UseNumber()
|
||||||
|
|
||||||
|
if e := decoder.Decode(meta); e != nil {
|
||||||
|
return false, e
|
||||||
|
}
|
||||||
|
|
||||||
|
if meta.ChordTriggered {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set flag to true
|
||||||
|
meta.ChordTriggered = true
|
||||||
|
|
||||||
|
// Update the group meta
|
||||||
|
encoded, err := json.Marshal(&meta)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)
|
||||||
|
update := clientv3.OpPut(key, string(encoded))
|
||||||
|
|
||||||
|
b.client.Txn(b.ctx).If(cmp).Then(update)
|
||||||
|
txnresp, err := b.client.Txn(b.ctx).If(cmp).Then(update).Commit()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 有写入或者删除竞争
|
||||||
|
if !txnresp.Succeeded {
|
||||||
|
return false, fmt.Errorf("update key failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (b *etcdBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
|
func (b *etcdBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
|
||||||
key := fmt.Sprintf("/machinery/v2/backend/%s", groupUUID)
|
key := fmt.Sprintf(groupKey, groupUUID)
|
||||||
resp, err := b.client.Get(b.ctx, key)
|
resp, err := b.client.Get(b.ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -110,19 +175,6 @@ func (b *etcdBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
|
||||||
return meta, nil
|
return meta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
|
|
||||||
groupMeta, err := b.getGroupMeta(groupUUID)
|
|
||||||
if err != nil {
|
|
||||||
return []*tasks.TaskState{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return b.getStates(groupMeta.TaskUUIDs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
|
|
||||||
return false, fmt.Errorf("not support")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setting / getting task state
|
// Setting / getting task state
|
||||||
// SetStatePending updates task state to PENDING
|
// SetStatePending updates task state to PENDING
|
||||||
func (b *etcdBackend) SetStatePending(signature *tasks.Signature) error {
|
func (b *etcdBackend) SetStatePending(signature *tasks.Signature) error {
|
||||||
|
@ -170,7 +222,7 @@ func (b *etcdBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBackend) getState(ctx context.Context, taskUUID string) (*tasks.TaskState, error) {
|
func (b *etcdBackend) getState(ctx context.Context, taskUUID string) (*tasks.TaskState, error) {
|
||||||
key := fmt.Sprintf("/machinery/v2/backend/%s", taskUUID)
|
key := fmt.Sprintf(taskKey, taskUUID)
|
||||||
resp, err := b.client.Get(b.ctx, key)
|
resp, err := b.client.Get(b.ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -190,11 +242,6 @@ func (b *etcdBackend) getState(ctx context.Context, taskUUID string) (*tasks.Tas
|
||||||
return state, nil
|
return state, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Purging stored stored tasks states and group meta data
|
|
||||||
func (b *etcdBackend) IsAMQP() bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
|
func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
|
||||||
state, err := b.GetState(newState.TaskUUID)
|
state, err := b.GetState(newState.TaskUUID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -204,13 +251,13 @@ func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBackend) PurgeState(taskUUID string) error {
|
func (b *etcdBackend) PurgeState(taskUUID string) error {
|
||||||
key := fmt.Sprintf("/machinery/v2/backend/%s", taskUUID)
|
key := fmt.Sprintf(taskKey, taskUUID)
|
||||||
_, err := b.client.KV.Delete(b.ctx, key)
|
_, err := b.client.KV.Delete(b.ctx, key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error {
|
func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error {
|
||||||
key := fmt.Sprintf("/machinery/v2/backend/%s", groupUUID)
|
key := fmt.Sprintf(groupKey, groupUUID)
|
||||||
_, err := b.client.KV.Delete(b.ctx, key)
|
_, err := b.client.KV.Delete(b.ctx, key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -249,7 +296,7 @@ func (b *etcdBackend) updateState(taskState *tasks.TaskState) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
key := fmt.Sprintf("/machinery/v2/backend/%s", taskState.TaskUUID)
|
key := fmt.Sprintf(taskKey, taskState.TaskUUID)
|
||||||
_, err = b.client.Put(b.ctx, key, string(encoded))
|
_, err = b.client.Put(b.ctx, key, string(encoded))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue