141 lines
2.9 KiB
Go
141 lines
2.9 KiB
Go
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/RichardKnop/machinery/v2/log"
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
)
|
|
|
|
// checkFunc is a predicate applied to a candidate key/value pair before it is
// claimed. A non-nil error makes getItem return that error without deleting
// the key.
type checkFunc func(*mvccpb.KeyValue) error
|
|
|
|
// deleteRevKey deletes a key by revision, returning false if key is missing
|
|
func deleteRevKey(ctx context.Context, kv clientv3.KV, key string, rev int64) error {
|
|
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", rev)
|
|
req := clientv3.OpDelete(key)
|
|
txnresp, err := kv.Txn(ctx).If(cmp).Then(req).Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 有写入或者删除竞争
|
|
if !txnresp.Succeeded {
|
|
return fmt.Errorf("delete key failed: %s: %w", key, haveNoTaskErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func waitPrefixEvents(ctx context.Context, c *clientv3.Client, prefix string, rev int64) (*clientv3.Event, error) {
|
|
wc := c.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, haveNoTaskErr
|
|
|
|
case wresp := <-wc:
|
|
for _, ev := range wresp.Events {
|
|
if ev.Type != mvccpb.PUT {
|
|
continue
|
|
}
|
|
return ev, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func getItem(ctx context.Context, c *clientv3.Client, prefix string, opts []clientv3.OpOption, check checkFunc) (*mvccpb.KeyValue, error) {
|
|
resp, err := c.Get(ctx, prefix, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// nothing yet; wait on elements
|
|
if len(resp.Kvs) == 0 {
|
|
ev, err := waitPrefixEvents(ctx, c, prefix, resp.Header.Revision)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := check(ev.Kv); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := deleteRevKey(ctx, c, string(ev.Kv.Key), ev.Kv.ModRevision); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ev.Kv, nil
|
|
}
|
|
|
|
kv := resp.Kvs[0]
|
|
if err := check(kv); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := deleteRevKey(ctx, c, string(kv.Key), kv.ModRevision); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return kv, nil
|
|
}
|
|
|
|
func getFirstItem(ctx context.Context, c *clientv3.Client, prefix string) ([]byte, error) {
|
|
// 按创建时间排序, 只返回单个数据
|
|
opts := clientv3.WithFirstRev()
|
|
|
|
check := func(*mvccpb.KeyValue) error {
|
|
return nil
|
|
}
|
|
|
|
kv, err := getItem(ctx, c, prefix, opts, check)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return kv.Value, nil
|
|
}
|
|
|
|
func getFirstETAItem(ctx context.Context, c *clientv3.Client, prefix string) ([]byte, error) {
|
|
// 按创建key排序, 只返回单个数据
|
|
opts := clientv3.WithFirstKey()
|
|
|
|
check := func(kv *mvccpb.KeyValue) error {
|
|
k := string(kv.Key)
|
|
if len(k) < 55 {
|
|
log.ERROR.Print("not valid eta key: %s", k)
|
|
return fmt.Errorf("not valid eta key: %s", k)
|
|
}
|
|
|
|
nsStr := k[36:55]
|
|
ns, err := strconv.ParseInt(nsStr, 10, 64)
|
|
if err != nil {
|
|
return fmt.Errorf("parse key: %w", err)
|
|
}
|
|
|
|
t := time.Unix(0, ns)
|
|
if t.After(time.Now()) {
|
|
time.Sleep(time.Millisecond * 100)
|
|
return haveNoTaskErr
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
kv, err := getItem(ctx, c, prefix, opts, check)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return kv.Value, nil
|
|
}
|
|
|
|
// putItem is an empty placeholder: it takes no arguments, returns nothing and
// performs no work.
func putItem() {}
|