add mtx
parent
d8b9d33b82
commit
72a4a9731e
|
@ -134,20 +134,19 @@ func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskPro
|
||||||
defer b.wg.Done()
|
defer b.wg.Done()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
ticker := time.NewTicker(time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// A way to stop this goroutine from b.StopConsuming
|
// A way to stop this goroutine from b.StopConsuming
|
||||||
case <-b.GetStopChan():
|
case <-b.GetStopChan():
|
||||||
return
|
return
|
||||||
|
|
||||||
default:
|
case <-ticker.C:
|
||||||
more, err := b.handleDelayedTask(ctx)
|
err := b.handleDelayedTask(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ERROR.Printf("handleDelayedTask err: %s", err)
|
log.ERROR.Printf("handleDelayedTask err: %s", err)
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
if !more {
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -395,7 +394,7 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBroker) handleDelayedTask(ctx context.Context) (bool, error) {
|
func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
|
||||||
ttl := time.Second * 10
|
ttl := time.Second * 10
|
||||||
ctx, cancel := context.WithTimeout(ctx, ttl)
|
ctx, cancel := context.WithTimeout(ctx, ttl)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -403,7 +402,7 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) (bool, error) {
|
||||||
// 创建一个新的session
|
// 创建一个新的session
|
||||||
s, err := concurrency.NewSession(b.client, concurrency.WithTTL(int(ttl.Seconds())))
|
s, err := concurrency.NewSession(b.client, concurrency.WithTTL(int(ttl.Seconds())))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
defer s.Orphan()
|
defer s.Orphan()
|
||||||
|
|
||||||
|
@ -411,15 +410,15 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) (bool, error) {
|
||||||
m := concurrency.NewMutex(s, lockKey)
|
m := concurrency.NewMutex(s, lockKey)
|
||||||
|
|
||||||
if err = m.Lock(ctx); err != nil {
|
if err = m.Lock(ctx); err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
defer m.Unlock(ctx) // nolint
|
defer m.Unlock(ctx) // nolint
|
||||||
|
|
||||||
keyPrefix := "/machinery/v2/broker/delayed_tasks/eta-"
|
keyPrefix := "/machinery/v2/broker/delayed_tasks/eta-"
|
||||||
end := strconv.FormatInt(time.Now().UnixMilli(), 10)
|
end := strconv.FormatInt(time.Now().UnixMilli(), 10)
|
||||||
resp, err := b.client.Get(b.ctx, keyPrefix+"0", clientv3.WithRange(keyPrefix+end), clientv3.WithLimit(100))
|
resp, err := b.client.Get(b.ctx, keyPrefix+"0", clientv3.WithRange(keyPrefix+end))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
for _, kv := range resp.Kvs {
|
for _, kv := range resp.Kvs {
|
||||||
key := string(kv.Key)
|
key := string(kv.Key)
|
||||||
|
@ -434,7 +433,7 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) (bool, error) {
|
||||||
putReq := clientv3.OpPut(pendingKey, string(kv.Value))
|
putReq := clientv3.OpPut(pendingKey, string(kv.Value))
|
||||||
c, err := b.client.Txn(b.ctx).If(cmp).Then(deleteReq, putReq).Commit()
|
c, err := b.client.Txn(b.ctx).If(cmp).Then(deleteReq, putReq).Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("handle delay task %s: %w", key, err)
|
return fmt.Errorf("handle delay task %s: %w", key, err)
|
||||||
}
|
}
|
||||||
if !c.Succeeded {
|
if !c.Succeeded {
|
||||||
log.WARNING.Printf("handle delay task %s not success", key)
|
log.WARNING.Printf("handle delay task %s not success", key)
|
||||||
|
@ -443,7 +442,7 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) (bool, error) {
|
||||||
log.DEBUG.Printf("send delay task %s to pending queue done", key)
|
log.DEBUG.Printf("send delay task %s to pending queue done", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp.More, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getQueue(config *config.Config, taskProcessor iface.TaskProcessor) string {
|
func getQueue(config *config.Config, taskProcessor iface.TaskProcessor) string {
|
||||||
|
|
Loading…
Reference in New Issue