finish delivery

Browse Source
main
git 2024-06-08 23:50:48 +08:00
parent 611441b5a2
commit 5931e3ce8f
Signed by: git
GPG Key ID: 3F65EFFA44207ADD
2 changed files with 50 additions and 26 deletions

View File

@ -55,17 +55,21 @@ func (d *deliver) assign(key string, node string) error {
return err return err
} }
cmp := clientv3.Compare(clientv3.CreateRevision(assignKey), "=", 0) keyExist := clientv3.Compare(clientv3.CreateRevision(key), ">", 0)
assignNotExist := clientv3.Compare(clientv3.CreateRevision(assignKey), "=", 0)
putReq := clientv3.OpPut(assignKey, node, clientv3.WithLease(grantResp.ID)) putReq := clientv3.OpPut(assignKey, node, clientv3.WithLease(grantResp.ID))
getReq := clientv3.OpGet(key) getReq := clientv3.OpGet(key)
resp, err := d.client.Txn(ctx).If(cmp).Then(putReq, getReq).Commit() resp, err := d.client.Txn(ctx).If(keyExist, assignNotExist).Then(putReq, getReq).Commit()
if err != nil { if err != nil {
return err return err
} }
if !resp.Succeeded { if !resp.Succeeded {
return fmt.Errorf("key %s already assign", key) return fmt.Errorf("key %s not exist or already assign ", key)
} }
if len(resp.Responses) < 2 {
return fmt.Errorf("have no resp %s", key)
}
getResp := resp.Responses[1].GetResponseRange() getResp := resp.Responses[1].GetResponseRange()
if len(getResp.Kvs) == 0 { if len(getResp.Kvs) == 0 {
return fmt.Errorf("have no task %s", key) return fmt.Errorf("have no task %s", key)

View File

@ -30,7 +30,7 @@ type etcdBroker struct {
ctx context.Context ctx context.Context
client *clientv3.Client client *clientv3.Client
wg sync.WaitGroup wg sync.WaitGroup
keyMap map[string]struct{} assignMap map[string]bool
mtx sync.Mutex mtx sync.Mutex
} }
@ -52,7 +52,7 @@ func New(ctx context.Context, conf *config.Config) (iface.Broker, error) {
Broker: common.NewBroker(conf), Broker: common.NewBroker(conf),
ctx: ctx, ctx: ctx,
client: client, client: client,
keyMap: map[string]struct{}{}, assignMap: map[string]bool{},
} }
return &broker, nil return &broker, nil
@ -296,7 +296,10 @@ func (b *etcdBroker) GetDelayedTasks() ([]*tasks.Signature, error) {
} }
func (b *etcdBroker) nextTask(queue string, consumerTag string) (Delivery, error) { func (b *etcdBroker) nextTask(queue string, consumerTag string) (Delivery, error) {
for k := range b.keyMap { for k, assigned := range b.assignMap {
if assigned {
continue
}
if !strings.Contains(k, queue) { if !strings.Contains(k, queue) {
continue continue
} }
@ -313,6 +316,22 @@ func (b *etcdBroker) nextTask(queue string, consumerTag string) (Delivery, error
return b.nextTask(queue, consumerTag) return b.nextTask(queue, consumerTag)
} }
func (b *etcdBroker) setAssign(key string, assign bool) bool {
if !strings.Contains(key, "/assign") {
return false
}
k := strings.TrimSuffix(key, "/assign")
b.mtx.Lock()
defer b.mtx.Unlock()
if _, ok := b.assignMap[k]; ok {
b.assignMap[k] = assign
}
return true
}
func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error { func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue) keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
@ -325,11 +344,11 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
} }
for _, kv := range resp.Kvs { for _, kv := range resp.Kvs {
if bytes.Contains(kv.Key, []byte("/assign")) { key := string(kv.Key)
delete(b.keyMap, string(kv.Key)[:23]) if b.setAssign(key, true) {
continue continue
} }
b.keyMap[string(kv.Key)] = struct{}{} b.assignMap[key] = false
} }
// Watch // Watch
@ -338,20 +357,21 @@ func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
wc := b.client.Watch(watchCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithRev(resp.Header.Revision)) wc := b.client.Watch(watchCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithRev(resp.Header.Revision))
for wresp := range wc { for wresp := range wc {
for _, ev := range wresp.Events { for _, ev := range wresp.Events {
key := string(ev.Kv.Key)
if ev.Type == clientv3.EventTypeDelete { if ev.Type == clientv3.EventTypeDelete {
if bytes.Contains(ev.Kv.Key, []byte("/assign")) { if b.setAssign(key, false) {
b.keyMap[string(ev.Kv.Key)] = struct{}{}
}
continue continue
} }
delete(b.assignMap, key)
}
if ev.Type == clientv3.EventTypePut { if ev.Type == clientv3.EventTypePut {
if bytes.Contains(ev.Kv.Key, []byte("/assign")) { if b.setAssign(key, true) {
delete(b.keyMap, string(ev.Kv.Key)[:23])
continue continue
} }
b.keyMap[string(ev.Kv.Key)] = struct{}{} b.assignMap[key] = false
} }
} }
} }