add handle more

Browse Source
main
git 2024-06-10 18:21:36 +08:00
parent a51a6e5657
commit 8ec1227f87
Signed by: git
GPG Key ID: 3F65EFFA44207ADD
1 changed files with 14 additions and 13 deletions

View File

@ -136,19 +136,20 @@ func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskPro
defer b.wg.Done() defer b.wg.Done()
defer cancel() defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for { for {
select { select {
// A way to stop this goroutine from b.StopConsuming // A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan(): case <-b.GetStopChan():
return return
case <-ticker.C: default:
err := b.handleDelayedTask(ctx) more, err := b.handleDelayedTask(ctx)
if err != nil { if err != nil {
log.ERROR.Printf("handleDelayedTask err: %s", err) log.ERROR.Printf("handleDelayedTask err: %s", err)
time.Sleep(time.Second)
}
if !more {
time.Sleep(time.Second)
} }
} }
} }
@ -380,7 +381,7 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
return nil return nil
} }
func (b *etcdBroker) handleDelayedTask(ctx context.Context) error { func (b *etcdBroker) handleDelayedTask(ctx context.Context) (bool, error) {
ttl := time.Second * 10 ttl := time.Second * 10
ctx, cancel := context.WithTimeout(ctx, ttl) ctx, cancel := context.WithTimeout(ctx, ttl)
defer cancel() defer cancel()
@ -388,7 +389,7 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
// 创建一个新的session // 创建一个新的session
s, err := concurrency.NewSession(b.client, concurrency.WithTTL(int(ttl.Seconds()))) s, err := concurrency.NewSession(b.client, concurrency.WithTTL(int(ttl.Seconds())))
if err != nil { if err != nil {
return err return false, err
} }
defer s.Orphan() defer s.Orphan()
@ -396,17 +397,17 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
m := concurrency.NewMutex(s, lockKey) m := concurrency.NewMutex(s, lockKey)
if err = m.Lock(ctx); err != nil { if err = m.Lock(ctx); err != nil {
return err return false, err
} }
defer m.Unlock(ctx) // nolint defer m.Unlock(ctx) // nolint
keyPrefix := "/machinery/v2/broker/delayed_tasks/eta-" keyPrefix := "/machinery/v2/broker/delayed_tasks/eta-"
end := strconv.FormatInt(time.Now().UnixMilli(), 10) end := strconv.FormatInt(time.Now().UnixMilli(), 10)
kvs, err := b.client.Get(b.ctx, keyPrefix+"0", clientv3.WithRange(keyPrefix+end)) resp, err := b.client.Get(b.ctx, keyPrefix+"0", clientv3.WithRange(keyPrefix+end), clientv3.WithLimit(100))
if err != nil { if err != nil {
return err return false, err
} }
for _, kv := range kvs.Kvs { for _, kv := range resp.Kvs {
key := string(kv.Key) key := string(kv.Key)
parts := strings.Split(key, "/") parts := strings.Split(key, "/")
if len(parts) != 8 { if len(parts) != 8 {
@ -419,7 +420,7 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
putReq := clientv3.OpPut(pendingKey, string(kv.Value)) putReq := clientv3.OpPut(pendingKey, string(kv.Value))
c, err := b.client.Txn(b.ctx).If(cmp).Then(deleteReq, putReq).Commit() c, err := b.client.Txn(b.ctx).If(cmp).Then(deleteReq, putReq).Commit()
if err != nil { if err != nil {
return fmt.Errorf("handle delay task %s: %w", key, err) return false, fmt.Errorf("handle delay task %s: %w", key, err)
} }
if !c.Succeeded { if !c.Succeeded {
log.WARNING.Printf("handle delay task %s not success", key) log.WARNING.Printf("handle delay task %s not success", key)
@ -428,7 +429,7 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
log.DEBUG.Printf("send delay task %s to pending queue done", key) log.DEBUG.Printf("send delay task %s to pending queue done", key)
} }
return nil return resp.More, nil
} }
func getQueue(config *config.Config, taskProcessor iface.TaskProcessor) string { func getQueue(config *config.Config, taskProcessor iface.TaskProcessor) string {