diff --git a/locks/etcd/etcd.go b/locks/etcd/etcd.go index 0581971..c175d3f 100644 --- a/locks/etcd/etcd.go +++ b/locks/etcd/etcd.go @@ -51,7 +51,7 @@ func (r etcdLock) LockWithRetries(key string, unixTsToExpireNs int64) error { return nil } - log.INFO.Printf("acquired lock=%s failed, retries=%d", key, i) + log.INFO.Printf("acquired lock=%s failed, retries=%d, err=%s", key, i, err) time.Sleep(time.Millisecond * 100) } return ErrRedisLockFailed @@ -61,26 +61,21 @@ func (r etcdLock) LockWithRetries(key string, unixTsToExpireNs int64) error { func (r etcdLock) Lock(key string, unixTsToExpireNs int64) error { now := time.Now().UnixNano() ttl := time.Duration(unixTsToExpireNs + 1 - now) - lease := clientv3.NewLease(r.cli) + + // 创建一个新的session + s, err := concurrency.NewSession(r.cli, concurrency.WithTTL(int(ttl.Seconds()))) + if err != nil { + return err + } + defer s.Orphan() + + lockKey := fmt.Sprintf("/machinery/v2/lock/%s", strings.TrimRight(key, "/")) + m := concurrency.NewMutex(s, lockKey) ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - - resp, err := lease.Grant(ctx, int64(ttl.Seconds())) - if err != nil { - return err - } - - // 创建一个新的session - s, err := concurrency.NewSession(r.cli, concurrency.WithLease(resp.ID)) - if err != nil { - return err - } - - key = fmt.Sprintf("/machinery/v2/lock/%s", strings.TrimRight(key, "/")) - m := concurrency.NewMutex(s, key) - if err := m.Lock(ctx); err != nil { + _ = s.Close() return err }