finish etcd backend

Browse Source
main
git 2024-05-25 16:04:29 +08:00
parent 2d0bf711bf
commit 228b8e21eb
Signed by: git
GPG Key ID: 3F65EFFA44207ADD
1 changed files with 27 additions and 8 deletions

View File

@ -64,12 +64,16 @@ func (b *etcdBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
if err != nil {
return err
}
lease, err := b.getLease()
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(b.ctx, time.Second*2)
defer cancel()
key := fmt.Sprintf(groupKey, groupUUID)
_, err = b.client.KV.Put(ctx, key, string(encoded))
_, err = b.client.Put(ctx, key, string(encoded), clientv3.WithLease(lease))
return err
}
@ -136,8 +140,13 @@ func (b *etcdBackend) TriggerChord(groupUUID string) (bool, error) {
return false, err
}
lease, err := b.getLease()
if err != nil {
return false, err
}
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)
update := clientv3.OpPut(key, string(encoded))
update := clientv3.OpPut(key, string(encoded), clientv3.WithLease(lease))
b.client.Txn(b.ctx).If(cmp).Then(update)
txnresp, err := b.client.Txn(b.ctx).If(cmp).Then(update).Commit()
@ -252,13 +261,13 @@ func (b *etcdBackend) mergeNewTaskState(newState *tasks.TaskState) {
func (b *etcdBackend) PurgeState(taskUUID string) error {
key := fmt.Sprintf(taskKey, taskUUID)
_, err := b.client.KV.Delete(b.ctx, key)
_, err := b.client.Delete(b.ctx, key)
return err
}
func (b *etcdBackend) PurgeGroupMeta(groupUUID string) error {
key := fmt.Sprintf(groupKey, groupUUID)
_, err := b.client.KV.Delete(b.ctx, key)
_, err := b.client.Delete(b.ctx, key)
return err
}
@ -296,8 +305,13 @@ func (b *etcdBackend) updateState(taskState *tasks.TaskState) error {
return err
}
lease, err := b.getLease()
if err != nil {
return err
}
key := fmt.Sprintf(taskKey, taskState.TaskUUID)
_, err = b.client.Put(b.ctx, key, string(encoded))
_, err = b.client.Put(b.ctx, key, string(encoded), clientv3.WithLease(lease))
if err != nil {
return err
}
@ -307,12 +321,17 @@ func (b *etcdBackend) updateState(taskState *tasks.TaskState) error {
}
// getExpiration returns expiration for a stored task state
func (b *etcdBackend) getExpiration() time.Duration {
func (b *etcdBackend) getLease() (clientv3.LeaseID, error) {
expiresIn := b.GetConfig().ResultsExpireIn
if expiresIn == 0 {
if expiresIn <= 0 {
// expire results after 1 hour by default
expiresIn = config.DefaultResultsExpireIn
}
return time.Duration(expiresIn) * time.Second
resp, err := b.client.Grant(b.ctx, int64(expiresIn))
if err != nil {
return clientv3.NoLease, err
}
return resp.ID, nil
}