// Package etcd provides a distributed lock backed by etcd leases and the
// etcd concurrency mutex.
package etcd

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/RichardKnop/machinery/v2/config"
	"github.com/RichardKnop/machinery/v2/locks/iface"
	"github.com/RichardKnop/machinery/v2/log"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

var (
	// ErrRedisLockFailed is returned by LockWithRetries when every attempt to
	// acquire the lock has failed. The identifier is kept for backward
	// compatibility with existing callers; the lock itself is backed by etcd.
	ErrRedisLockFailed = errors.New("etcd lock: failed to acquire lock")
)

// etcdLock acquires locks through an etcd client.
type etcdLock struct {
	globalConf *config.Config  // machinery configuration handed to New
	conf       clientv3.Config // etcd client configuration used to build cli
	cli        *clientv3.Client
	retries    int // number of attempts made by LockWithRetries
}

// New creates an etcd client for the single given endpoint and returns a lock
// that uses it. retries is the number of attempts LockWithRetries makes before
// giving up. An error is returned when the etcd client cannot be created.
func New(cnf *config.Config, endpoint string, retries int) (iface.Lock, error) {
	etcdConf := clientv3.Config{Endpoints: []string{endpoint}}

	cli, err := clientv3.New(etcdConf)
	if err != nil {
		return nil, err
	}

	lock := etcdLock{
		globalConf: cnf,
		conf:       etcdConf,
		cli:        cli,
		retries:    retries,
	}

	return &lock, nil
}

// LockWithRetries calls Lock up to r.retries times, pausing 100ms between
// attempts. It returns nil as soon as one attempt succeeds and
// ErrRedisLockFailed once all attempts have failed. With r.retries <= 0 no
// attempt is made and ErrRedisLockFailed is returned immediately.
func (r etcdLock) LockWithRetries(key string, unixTsToExpireNs int64) error {
	for i := 0; i < r.retries; i++ {
		err := r.Lock(key, unixTsToExpireNs)
		if err == nil {
			// Lock acquired; done.
			return nil
		}

		// The sentinel returned below carries no cause, so record the
		// underlying error here instead of dropping it.
		log.INFO.Printf("acquired lock=%s failed, retries=%d, err=%v", key, i, err)

		// There is nothing to wait for after the final attempt.
		if i < r.retries-1 {
			time.Sleep(time.Millisecond * 100)
		}
	}

	return ErrRedisLockFailed
}

// Lock makes a single attempt to acquire the lock named key, which should be
// held until unixTsToExpireNs (a Unix timestamp in nanoseconds).
//
// A lease is granted with a TTL (whole seconds, at least 1) derived from the
// time remaining until unixTsToExpireNs, and the mutex key is attached to that
// lease. Granting the lease and acquiring the mutex share one 2-second
// timeout. After the mutex is acquired the lease is no longer refreshed, so
// the lock is released by lease expiry; this type offers no explicit unlock.
//
// It returns the error from the lease grant, the session creation or the
// mutex acquisition (including the timeout), and nil on success.
func (r etcdLock) Lock(key string, unixTsToExpireNs int64) error {
	now := time.Now().UnixNano()
	ttl := time.Duration(unixTsToExpireNs + 1 - now)

	// Lease TTLs are whole seconds. A deadline less than a second away (or
	// already in the past) would truncate to zero or a negative value, so
	// never request less than one second.
	ttlSeconds := int64(ttl.Seconds())
	if ttlSeconds < 1 {
		ttlSeconds = 1
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	// Use the client's own lease API rather than building a new lessor on
	// every call that would then never be closed.
	resp, err := r.cli.Grant(ctx, ttlSeconds)
	if err != nil {
		return err
	}

	// Create a new session bound to the lease granted above. If this fails the
	// lease is left to expire on its own after ttlSeconds.
	s, err := concurrency.NewSession(r.cli, concurrency.WithLease(resp.ID))
	if err != nil {
		return err
	}

	key = fmt.Sprintf("/machinery/v2/lock/%s", strings.TrimRight(key, "/"))
	m := concurrency.NewMutex(s, key)

	if err := m.Lock(ctx); err != nil {
		// The lock was not obtained: close the session so its keep-alive loop
		// stops and the lease is revoked instead of leaking on every attempt.
		if cerr := s.Close(); cerr != nil {
			log.WARNING.Printf("closing session for lock=%s failed: %v", key, cerr)
		}
		return err
	}

	// Stop refreshing the lease without revoking it. A session keeps its lease
	// alive for as long as it is open, which would hold the lock far beyond
	// unixTsToExpireNs; orphaning it lets the lease, and with it the lock,
	// expire after the requested TTL.
	s.Orphan()

	log.INFO.Printf("acquired lock=%s, duration=%s", key, ttl)

	return nil
}