finish delivery
parent
051e4e7e34
commit
daa0b5a0ba
|
@ -1,10 +1,13 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/RichardKnop/machinery/v2/brokers/errs"
|
||||
"github.com/RichardKnop/machinery/v2/log"
|
||||
"github.com/RichardKnop/machinery/v2/tasks"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
@ -13,7 +16,7 @@ import (
|
|||
type Delivery interface {
|
||||
Ack()
|
||||
Nack()
|
||||
Body() string
|
||||
Body() []byte
|
||||
Signature() *tasks.Signature
|
||||
}
|
||||
|
||||
|
@ -21,30 +24,90 @@ type deliver struct {
|
|||
ctx context.Context
|
||||
client *clientv3.Client
|
||||
signature *tasks.Signature
|
||||
value string
|
||||
value []byte
|
||||
key string
|
||||
aliveCancel func()
|
||||
}
|
||||
|
||||
func NewDelivery(ctx context.Context, key string) (Delivery, error) {
|
||||
return &deliver{}, nil
|
||||
func NewDelivery(ctx context.Context, client *clientv3.Client, key string) (Delivery, error) {
|
||||
d := &deliver{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
key: key,
|
||||
}
|
||||
|
||||
if err := d.assign(key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
func (d *deliver) assign(key string) error {
|
||||
assignKey := fmt.Sprintf("%s/assign", key)
|
||||
ctx, cancel := context.WithTimeout(d.ctx, time.Second*5)
|
||||
defer cancel()
|
||||
|
||||
grantResp, err := d.client.Grant(ctx, 30)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmp := clientv3.Compare(clientv3.ModRevision(assignKey), "=", "0")
|
||||
putReq := clientv3.OpPut(assignKey, "node", clientv3.WithLease(grantResp.ID), clientv3.WithCreatedNotify())
|
||||
getReq := clientv3.OpGet(key)
|
||||
resp, err := d.client.Txn(ctx).If(cmp).Then(putReq, getReq).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !resp.Succeeded {
|
||||
return fmt.Errorf("key %s already assign", key)
|
||||
}
|
||||
|
||||
getResp := resp.Responses[1].GetResponseRange()
|
||||
if len(getResp.Kvs) == 0 {
|
||||
return fmt.Errorf("have no task %s", key)
|
||||
}
|
||||
kv := getResp.Kvs[0]
|
||||
|
||||
signature := new(tasks.Signature)
|
||||
decoder := json.NewDecoder(bytes.NewReader(kv.Value))
|
||||
decoder.UseNumber()
|
||||
if err = decoder.Decode(signature); err != nil {
|
||||
return errs.NewErrCouldNotUnmarshalTaskSignature(kv.Value, err)
|
||||
}
|
||||
|
||||
aliveCtx, aliveCancel := context.WithCancel(d.ctx)
|
||||
if _, err = d.client.KeepAlive(aliveCtx, grantResp.ID); err != nil {
|
||||
aliveCancel()
|
||||
return err
|
||||
}
|
||||
|
||||
d.aliveCancel = aliveCancel
|
||||
d.signature = signature
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *deliver) Ack() {
|
||||
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", d.signature.RoutingKey, d.signature.UUID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer d.aliveCancel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(d.ctx, time.Second*2)
|
||||
defer cancel()
|
||||
|
||||
_, err := d.client.Delete(ctx, keyPrefix, clientv3.WithPrefix())
|
||||
_, err := d.client.Delete(ctx, d.key, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
log.ERROR.Printf("ack task %s err: %s", d.value, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *deliver) Nack() {
|
||||
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s/state", d.signature.RoutingKey, d.signature.UUID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer d.aliveCancel()
|
||||
|
||||
assignKey := fmt.Sprintf("%s/assign", d.key)
|
||||
ctx, cancel := context.WithTimeout(d.ctx, time.Second*2)
|
||||
defer cancel()
|
||||
|
||||
_, err := d.client.Delete(ctx, keyPrefix)
|
||||
_, err := d.client.Delete(ctx, assignKey)
|
||||
if err != nil {
|
||||
log.ERROR.Printf("nack task %s err: %s", d.value, err)
|
||||
}
|
||||
|
@ -54,6 +117,6 @@ func (d *deliver) Signature() *tasks.Signature {
|
|||
return d.signature
|
||||
}
|
||||
|
||||
func (d *deliver) Body() string {
|
||||
func (d *deliver) Body() []byte {
|
||||
return d.value
|
||||
}
|
||||
|
|
|
@ -282,7 +282,7 @@ func (b *etcdBroker) nextTask(queue string) (Delivery, error) {
|
|||
return nil, errs.NewErrCouldNotUnmarshalTaskSignature(item, err)
|
||||
}
|
||||
|
||||
d, err := NewDelivery(ctx, "")
|
||||
d, err := NewDelivery(b.ctx, b.client, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue