feat: add lock
							parent
							
								
									a095e14a71
								
							
						
					
					
						commit
						074d740817
					
				|  | @ -51,7 +51,7 @@ func (r etcdLock) LockWithRetries(key string, unixTsToExpireNs int64) error { | |||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		log.INFO.Printf("acquired lock=%s failed, retries=%d", key, i) | ||||
| 		log.INFO.Printf("acquired lock=%s failed, retries=%d, err=%s", key, i, err) | ||||
| 		time.Sleep(time.Millisecond * 100) | ||||
| 	} | ||||
| 	return ErrRedisLockFailed | ||||
|  | @ -61,26 +61,21 @@ func (r etcdLock) LockWithRetries(key string, unixTsToExpireNs int64) error { | |||
| func (r etcdLock) Lock(key string, unixTsToExpireNs int64) error { | ||||
| 	now := time.Now().UnixNano() | ||||
| 	ttl := time.Duration(unixTsToExpireNs + 1 - now) | ||||
| 	lease := clientv3.NewLease(r.cli) | ||||
| 
 | ||||
| 	// 创建一个新的session
 | ||||
| 	s, err := concurrency.NewSession(r.cli, concurrency.WithTTL(int(ttl.Seconds()))) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	defer s.Orphan() | ||||
| 
 | ||||
| 	lockKey := fmt.Sprintf("/machinery/v2/lock/%s", strings.TrimRight(key, "/")) | ||||
| 	m := concurrency.NewMutex(s, lockKey) | ||||
| 
 | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	resp, err := lease.Grant(ctx, int64(ttl.Seconds())) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	// 创建一个新的session
 | ||||
| 	s, err := concurrency.NewSession(r.cli, concurrency.WithLease(resp.ID)) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	key = fmt.Sprintf("/machinery/v2/lock/%s", strings.TrimRight(key, "/")) | ||||
| 	m := concurrency.NewMutex(s, key) | ||||
| 
 | ||||
| 	if err := m.Lock(ctx); err != nil { | ||||
| 		_ = s.Close() | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue