chore: refine waitPrefixEvents (#5)

Browse Source
main
git 2024-05-19 23:09:35 +08:00 committed by GitHub
parent d42efb69d6
commit ebc5c30a94
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 10 additions and 27 deletions

View File

@ -2,7 +2,6 @@ package etcd
import (
"context"
"errors"
"fmt"
"strconv"
"time"
@ -12,13 +11,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
ErrKeyExists = errors.New("key already exists")
ErrWaitMismatch = errors.New("unexpected wait result")
ErrTooManyClients = errors.New("too many clients")
ErrNoWatcher = errors.New("no watcher channel")
)
type checkFunc func(*mvccpb.KeyValue) error
// deleteRevKey deletes a key by revision, returning false if key is missing
@ -38,27 +30,22 @@ func deleteRevKey(ctx context.Context, kv clientv3.KV, key string, rev int64) er
}
func waitPrefixEvents(ctx context.Context, c *clientv3.Client, prefix string, rev int64) (*clientv3.Event, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wc := c.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
if wc == nil {
return nil, ErrNoWatcher
}
return waitEvents(wc), nil
}
for {
select {
case <-ctx.Done():
return nil, haveNoTaskErr
func waitEvents(wc clientv3.WatchChan) *clientv3.Event {
for wresp := range wc {
case wresp := <-wc:
for _, ev := range wresp.Events {
if ev.Type != mvccpb.PUT {
continue
}
return ev
return ev, nil
}
}
}
return nil
}
func getItem(ctx context.Context, c *clientv3.Client, prefix string, opts []clientv3.OpOption, check checkFunc) (*mvccpb.KeyValue, error) {
@ -78,10 +65,6 @@ func getItem(ctx context.Context, c *clientv3.Client, prefix string, opts []clie
return nil, err
}
if ev == nil {
return nil, haveNoTaskErr
}
if err := check(ev.Kv); err != nil {
return nil, err
}