add mtx

Browse Source
main
git 2024-06-10 18:39:47 +08:00
parent 8ec1227f87
commit d8b9d33b82
Signed by: git
GPG Key ID: 3F65EFFA44207ADD
1 changed files with 25 additions and 11 deletions

View File

@ -116,17 +116,15 @@ func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskPro
continue continue
} }
task, err := b.nextTask(getQueue(b.GetConfig(), taskProcessor), consumerTag) task := b.nextTask(getQueue(b.GetConfig(), taskProcessor), consumerTag)
if err != nil { if task == nil {
log.ERROR.Printf("get next task failed: %s", err) time.Sleep(time.Second)
continue continue
} }
if task != nil {
deliveries <- task deliveries <- task
} }
} }
}
}() }()
// A goroutine to watch for delayed tasks and push them to deliveries // A goroutine to watch for delayed tasks and push them to deliveries
@ -297,8 +295,15 @@ func (b *etcdBroker) GetDelayedTasks() ([]*tasks.Signature, error) {
return items, nil return items, nil
} }
func (b *etcdBroker) nextTask(queue string, consumerTag string) (Delivery, error) { func (b *etcdBroker) nextTask(queue string, consumerTag string) Delivery {
for k, assigned := range b.assignMap { b.mtx.Lock()
assignMap := make(map[string]bool, len(b.assignMap))
for k, v := range assignMap {
assignMap[k] = v
}
b.mtx.Unlock()
for k, assigned := range assignMap {
if assigned { if assigned {
continue continue
} }
@ -311,11 +316,10 @@ func (b *etcdBroker) nextTask(queue string, consumerTag string) (Delivery, error
continue continue
} }
return d, nil return d
} }
time.Sleep(time.Second) return nil
return b.nextTask(queue, consumerTag)
} }
func (b *etcdBroker) setAssign(key string, assign bool) bool { func (b *etcdBroker) setAssign(key string, assign bool) bool {
@ -350,7 +354,9 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
if b.setAssign(key, true) { if b.setAssign(key, true) {
continue continue
} }
b.mtx.Lock()
b.assignMap[key] = false b.assignMap[key] = false
b.mtx.Unlock()
} }
// Watch // Watch
@ -358,6 +364,10 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
defer watchCancel() defer watchCancel()
wc := b.client.Watch(watchCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithRev(resp.Header.Revision)) wc := b.client.Watch(watchCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithRev(resp.Header.Revision))
for wresp := range wc { for wresp := range wc {
if wresp.Err() != nil {
return watchCtx.Err()
}
for _, ev := range wresp.Events { for _, ev := range wresp.Events {
key := string(ev.Kv.Key) key := string(ev.Kv.Key)
if ev.Type == clientv3.EventTypeDelete { if ev.Type == clientv3.EventTypeDelete {
@ -365,7 +375,9 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
continue continue
} }
b.mtx.Lock()
delete(b.assignMap, key) delete(b.assignMap, key)
b.mtx.Unlock()
} }
if ev.Type == clientv3.EventTypePut { if ev.Type == clientv3.EventTypePut {
@ -373,7 +385,9 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
continue continue
} }
b.mtx.Lock()
b.assignMap[key] = false b.assignMap[key] = false
b.mtx.Unlock()
} }
} }
} }