update etcd

Browse Source
main
git 2024-05-25 19:51:06 +08:00
parent cfcc6d68de
commit b61a719a75
Signed by: git
GPG Key ID: 3F65EFFA44207ADD
1 changed files with 15 additions and 10 deletions

View File

@ -28,6 +28,7 @@ type etcdBackend struct {
client *clientv3.Client client *clientv3.Client
} }
// New ..
func New(ctx context.Context, conf *config.Config) (iface.Backend, error) { func New(ctx context.Context, conf *config.Config) (iface.Backend, error) {
etcdConf := clientv3.Config{ etcdConf := clientv3.Config{
Endpoints: []string{conf.ResultBackend}, Endpoints: []string{conf.ResultBackend},
@ -50,7 +51,7 @@ func New(ctx context.Context, conf *config.Config) (iface.Backend, error) {
} }
// Group related functions // InitGroup Group related functions
func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error { func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
lease, err := b.getLease() lease, err := b.getLease()
if err != nil { if err != nil {
@ -69,14 +70,12 @@ func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
return err return err
} }
ctx, cancel := context.WithTimeout(b.ctx, time.Second*2)
defer cancel()
key := fmt.Sprintf(groupKey, groupUUID) key := fmt.Sprintf(groupKey, groupUUID)
_, err = b.client.Put(ctx, key, string(encoded), clientv3.WithLease(lease.ID)) _, err = b.client.Put(b.ctx, key, string(encoded), clientv3.WithLease(lease.ID))
return err return err
} }
// GroupCompleted ..
func (b *etcdBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) { func (b *etcdBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
groupMeta, err := b.getGroupMeta(groupUUID) groupMeta, err := b.getGroupMeta(groupUUID)
if err != nil { if err != nil {
@ -98,15 +97,20 @@ func (b *etcdBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool
return countSuccessTasks == groupTaskCount, nil return countSuccessTasks == groupTaskCount, nil
} }
// GroupTaskStates ..
func (b *etcdBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) { func (b *etcdBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
groupMeta, err := b.getGroupMeta(groupUUID) groupMeta, err := b.getGroupMeta(groupUUID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(groupMeta.TaskUUIDs) != groupTaskCount {
return nil, fmt.Errorf("group task count not equal, %d != %d", len(groupMeta.TaskUUIDs), groupTaskCount)
}
return b.getStates(groupMeta.TaskUUIDs...) return b.getStates(groupMeta.TaskUUIDs...)
} }
// TriggerChord ..
func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) { func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
key := fmt.Sprintf(groupKey, groupUUID) key := fmt.Sprintf(groupKey, groupUUID)
resp, err := b.client.Get(b.ctx, key) resp, err := b.client.Get(b.ctx, key)
@ -149,7 +153,6 @@ func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision) cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)
update := clientv3.OpPut(key, string(encoded), clientv3.WithLease(lease.ID)) update := clientv3.OpPut(key, string(encoded), clientv3.WithLease(lease.ID))
b.client.Txn(b.ctx).If(cmp).Then(update)
txnresp, err := b.client.Txn(b.ctx).If(cmp).Then(update).Commit() txnresp, err := b.client.Txn(b.ctx).If(cmp).Then(update).Commit()
if err != nil { if err != nil {
return false, err return false, err
@ -157,7 +160,7 @@ func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
// 有写入或者删除竞争 // 有写入或者删除竞争
if !txnresp.Succeeded { if !txnresp.Succeeded {
return false, fmt.Errorf("update key failed") return false, fmt.Errorf("trigger chord failed, groupId: %s", groupUUID)
} }
return true, nil return true, nil
@ -185,7 +188,6 @@ func (b *etcdBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
return meta, nil return meta, nil
} }
// Setting / getting task state
// SetStatePending updates task state to PENDING // SetStatePending updates task state to PENDING
func (b *etcdBackend) SetStatePending(signature *tasks.Signature) error { func (b *etcdBackend) SetStatePending(signature *tasks.Signature) error {
taskState := tasks.NewPendingTaskState(signature) taskState := tasks.NewPendingTaskState(signature)
@ -227,13 +229,14 @@ func (b *etcdBackend) SetStateFailure(signature *tasks.Signature, err string) er
return b.updateState(taskState) return b.updateState(taskState)
} }
// GetState ..
func (b *etcdBackend) GetState(taskUUID string) (*tasks.TaskState, error) { func (b *etcdBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
return b.getState(b.ctx, taskUUID) return b.getState(b.ctx, taskUUID)
} }
func (b *etcdBackend) getState(ctx context.Context, taskUUID string) (*tasks.TaskState, error) { func (b *etcdBackend) getState(ctx context.Context, taskUUID string) (*tasks.TaskState, error) {
key := fmt.Sprintf(taskKey, taskUUID) key := fmt.Sprintf(taskKey, taskUUID)
resp, err := b.client.Get(b.ctx, key) resp, err := b.client.Get(ctx, key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -260,12 +263,14 @@ func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
} }
} }
// PurgeState ..
func (b *etcdBackend) PurgeState(taskUUID string) error { func (b *etcdBackend) PurgeState(taskUUID string) error {
key := fmt.Sprintf(taskKey, taskUUID) key := fmt.Sprintf(taskKey, taskUUID)
_, err := b.client.Delete(b.ctx, key) _, err := b.client.Delete(b.ctx, key)
return err return err
} }
// PurgeGroupMeta ..
func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error { func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error {
key := fmt.Sprintf(groupKey, groupUUID) key := fmt.Sprintf(groupKey, groupUUID)
_, err := b.client.Delete(b.ctx, key) _, err := b.client.Delete(b.ctx, key)
@ -322,7 +327,7 @@ func (b *etcdBackend) updateState(taskState *tasks.TaskState) error {
return nil return nil
} }
// getExpiration returns expiration for a stored task state // getLease returns expiration for a stored task state
func (b *etcdBackend) getLease() (*clientv3.LeaseGrantResponse, error) { func (b *etcdBackend) getLease() (*clientv3.LeaseGrantResponse, error) {
expiresIn := b.GetConfig().ResultsExpireIn expiresIn := b.GetConfig().ResultsExpireIn
if expiresIn <= 0 { if expiresIn <= 0 {