finish handleDelayedTask
parent
4224490a66
commit
051e4e7e34
|
@ -54,7 +54,7 @@ machinery-plugins is based on the Apache 2.0 protocol. Please refer to [LICENSE]
|
|||
|
||||
## 设计
|
||||
push -> pending_tasks/{queue}/{task_uuid}
|
||||
handle -> 获取keys-only, 判断是否有consumer,如果没有,原子添加consumer -> 成功 -> handler -> delete(每次全量key N,可以对吧lock)
|
||||
handle -> 获取keys-only, 判断是否有consumer,list/watch(可用key列表) -> 原子操作 -> handler -> delete(每次全量key N,可以对吧lock)
|
||||
|
||||
delay -> delay_tasks/r{xx}-taskuuid
|
||||
handlr -> 按key排序,获取最新的一个(可以大于当前时间) -> 原子put/delete操作 -> done (性能没有问题, 多个服务会有冲突,最多N-1,可以对比Lock)
|
|
@ -0,0 +1,59 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/RichardKnop/machinery/v2/log"
|
||||
"github.com/RichardKnop/machinery/v2/tasks"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
type Delivery interface {
|
||||
Ack()
|
||||
Nack()
|
||||
Body() string
|
||||
Signature() *tasks.Signature
|
||||
}
|
||||
|
||||
type deliver struct {
|
||||
ctx context.Context
|
||||
client *clientv3.Client
|
||||
signature *tasks.Signature
|
||||
value string
|
||||
}
|
||||
|
||||
func NewDelivery(ctx context.Context, key string) (Delivery, error) {
|
||||
return &deliver{}, nil
|
||||
}
|
||||
|
||||
func (d *deliver) Ack() {
|
||||
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", d.signature.RoutingKey, d.signature.UUID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
_, err := d.client.Delete(ctx, keyPrefix, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
log.ERROR.Printf("ack task %s err: %s", d.value, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *deliver) Nack() {
|
||||
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s/state", d.signature.RoutingKey, d.signature.UUID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
_, err := d.client.Delete(ctx, keyPrefix)
|
||||
if err != nil {
|
||||
log.ERROR.Printf("nack task %s err: %s", d.value, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *deliver) Signature() *tasks.Signature {
|
||||
return d.signature
|
||||
}
|
||||
|
||||
func (d *deliver) Body() string {
|
||||
return d.value
|
||||
}
|
|
@ -7,6 +7,8 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -17,38 +19,12 @@ import (
|
|||
"github.com/RichardKnop/machinery/v2/log"
|
||||
"github.com/RichardKnop/machinery/v2/tasks"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var haveNoTaskErr = errors.New("have no task")
|
||||
|
||||
// Signature ..
|
||||
type Signature struct {
|
||||
*tasks.Signature
|
||||
CreateAt time.Time `json:"CreatedAt"`
|
||||
Score int64 `json:"Score"` // 写入时间, 队列排序用
|
||||
body []byte `json:"-"` // 序列号后的值
|
||||
}
|
||||
|
||||
func (s *Signature) Body() ([]byte, error) {
|
||||
if len(s.body) > 0 {
|
||||
return s.body, nil
|
||||
}
|
||||
|
||||
body, err := json.Marshal(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.body = body
|
||||
return s.body, nil
|
||||
}
|
||||
|
||||
func (s *Signature) String() string {
|
||||
body, _ := s.Body()
|
||||
return string(body)
|
||||
}
|
||||
|
||||
type etcdBroker struct {
|
||||
common.Broker
|
||||
ctx context.Context
|
||||
|
@ -90,7 +66,7 @@ func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskPro
|
|||
log.INFO.Printf("[*] Waiting for messages, concurrency=%d. To exit press CTRL+C", concurrency)
|
||||
|
||||
// Channel to which we will push tasks ready for processing by worker
|
||||
deliveries := make(chan *Signature)
|
||||
deliveries := make(chan Delivery)
|
||||
|
||||
// A receiving goroutine keeps popping messages from the queue by BLPOP
|
||||
// If the message is valid and can be unmarshaled into a proper structure
|
||||
|
@ -131,6 +107,8 @@ func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskPro
|
|||
b.wg.Add(1)
|
||||
go func() {
|
||||
defer b.wg.Done()
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -138,17 +116,10 @@ func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskPro
|
|||
case <-b.GetStopChan():
|
||||
return
|
||||
|
||||
default:
|
||||
task, err := b.nextDelayedTask()
|
||||
case <-ticker.C:
|
||||
err := b.handleDelayedTask()
|
||||
if err != nil {
|
||||
if !errors.Is(err, haveNoTaskErr) {
|
||||
log.ERROR.Print(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := b.Publish(context.Background(), task.Signature); err != nil {
|
||||
log.ERROR.Print(err)
|
||||
log.ERROR.Printf("handleDelayedTask err: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -165,7 +136,7 @@ func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskPro
|
|||
|
||||
// consume takes delivered messages from the channel and manages a worker pool
|
||||
// to process tasks concurrently
|
||||
func (b *etcdBroker) consume(deliveries <-chan *Signature, concurrency int, taskProcessor iface.TaskProcessor) error {
|
||||
func (b *etcdBroker) consume(deliveries <-chan Delivery, concurrency int, taskProcessor iface.TaskProcessor) error {
|
||||
eg, ctx := errgroup.WithContext(context.Background())
|
||||
eg.SetLimit(concurrency)
|
||||
|
||||
|
@ -194,23 +165,23 @@ func (b *etcdBroker) consume(deliveries <-chan *Signature, concurrency int, task
|
|||
}
|
||||
|
||||
// consumeOne processes a single message using TaskProcessor
|
||||
func (b *etcdBroker) consumeOne(signature *Signature, taskProcessor iface.TaskProcessor) error {
|
||||
func (b *etcdBroker) consumeOne(d Delivery, taskProcessor iface.TaskProcessor) error {
|
||||
// If the task is not registered, we requeue it,
|
||||
// there might be different workers for processing specific tasks
|
||||
if !b.IsTaskRegistered(signature.Name) {
|
||||
if signature.IgnoreWhenTaskNotRegistered {
|
||||
if !b.IsTaskRegistered(d.Signature().Name) {
|
||||
if d.Signature().IgnoreWhenTaskNotRegistered {
|
||||
return nil
|
||||
}
|
||||
log.INFO.Printf("Task not registered with this worker. Requeuing message: %s", signature)
|
||||
// b.requeueMessage(signature, taskProcessor)
|
||||
log.INFO.Printf("Task not registered with this worker. Requeuing message: %s", d.Body())
|
||||
d.Nack()
|
||||
return nil
|
||||
}
|
||||
|
||||
log.DEBUG.Printf("Received new message: %s", signature)
|
||||
log.DEBUG.Printf("Received new message: %s", d.Body())
|
||||
|
||||
err := taskProcessor.Process(d.Signature())
|
||||
d.Ack()
|
||||
|
||||
err := taskProcessor.Process(signature.Signature)
|
||||
// ack
|
||||
// b.deleteKey(si)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -227,42 +198,34 @@ func (b *etcdBroker) Publish(ctx context.Context, signature *tasks.Signature) er
|
|||
b.Broker.AdjustRoutingKey(signature)
|
||||
|
||||
now := time.Now()
|
||||
s := Signature{
|
||||
Signature: signature,
|
||||
CreateAt: now,
|
||||
Score: now.UnixMilli(),
|
||||
}
|
||||
|
||||
msg, err := json.Marshal(s)
|
||||
msg, err := json.Marshal(signature)
|
||||
if err != nil {
|
||||
return fmt.Errorf("JSON marshal error: %s", err)
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", s.RoutingKey, s.UUID)
|
||||
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", signature.RoutingKey, signature.UUID)
|
||||
|
||||
// Check the ETA signature field, if it is set and it is in the future,
|
||||
// delay the task
|
||||
if s.ETA != nil {
|
||||
if s.ETA.After(now) {
|
||||
key = fmt.Sprintf("/machinery/v2/broker/delayed_tasks/t%d-%s", s.ETA.UnixNano(), s.UUID)
|
||||
if signature.ETA != nil && signature.ETA.After(now) {
|
||||
key = fmt.Sprintf("/machinery/v2/broker/delayed_tasks/eta-%d/%s/%s", signature.ETA.UnixMilli(), signature.RoutingKey, signature.UUID)
|
||||
_, err = b.client.Put(ctx, key, string(msg))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err = b.client.Put(ctx, key, string(msg))
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *etcdBroker) getTasks(ctx context.Context, key string) ([]*Signature, error) {
|
||||
func (b *etcdBroker) getTasks(ctx context.Context, key string) ([]*tasks.Signature, error) {
|
||||
resp, err := b.client.Get(ctx, key, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*Signature, 0, len(resp.Kvs))
|
||||
result := make([]*tasks.Signature, 0, len(resp.Kvs))
|
||||
for _, kvs := range resp.Kvs {
|
||||
signature := new(Signature)
|
||||
signature := new(tasks.Signature)
|
||||
decoder := json.NewDecoder(bytes.NewReader(kvs.Value))
|
||||
decoder.UseNumber()
|
||||
if err := decoder.Decode(signature); err != nil {
|
||||
|
@ -287,12 +250,7 @@ func (b *etcdBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
rawTasks := make([]*tasks.Signature, 0, len(items))
|
||||
for _, v := range items {
|
||||
rawTasks = append(rawTasks, v.Signature)
|
||||
}
|
||||
|
||||
return rawTasks, nil
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// GetDelayedTasks 任务统计可使用
|
||||
|
@ -304,15 +262,10 @@ func (b *etcdBroker) GetDelayedTasks() ([]*tasks.Signature, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
rawTasks := make([]*tasks.Signature, 0, len(items))
|
||||
for _, v := range items {
|
||||
rawTasks = append(rawTasks, v.Signature)
|
||||
return items, nil
|
||||
}
|
||||
|
||||
return rawTasks, nil
|
||||
}
|
||||
|
||||
func (b *etcdBroker) nextTask(queue string) (*Signature, error) {
|
||||
func (b *etcdBroker) nextTask(queue string) (Delivery, error) {
|
||||
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
|
||||
defer cancel()
|
||||
|
@ -322,55 +275,72 @@ func (b *etcdBroker) nextTask(queue string) (*Signature, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
signature := new(Signature)
|
||||
signature := new(tasks.Signature)
|
||||
decoder := json.NewDecoder(bytes.NewReader(item))
|
||||
decoder.UseNumber()
|
||||
if err := decoder.Decode(signature); err != nil {
|
||||
return nil, errs.NewErrCouldNotUnmarshalTaskSignature(item, err)
|
||||
}
|
||||
|
||||
return signature, nil
|
||||
}
|
||||
|
||||
func (b *etcdBroker) nextDelayedTask() (*Signature, error) {
|
||||
keyPrefix := "/machinery/v2/broker/delayed_tasks"
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
|
||||
defer cancel()
|
||||
|
||||
item, err := getFirstETAItem(ctx, b.client, keyPrefix)
|
||||
d, err := NewDelivery(ctx, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
signature := new(Signature)
|
||||
decoder := json.NewDecoder(bytes.NewReader(item))
|
||||
decoder.UseNumber()
|
||||
if err := decoder.Decode(signature); err != nil {
|
||||
return nil, errs.NewErrCouldNotUnmarshalTaskSignature(item, err)
|
||||
return d, nil
|
||||
}
|
||||
|
||||
return signature, nil
|
||||
}
|
||||
func (b *etcdBroker) handleDelayedTask() error {
|
||||
ttl := time.Second * 10
|
||||
ctx, cancel := context.WithTimeout(b.ctx, ttl)
|
||||
defer cancel()
|
||||
|
||||
func (b *etcdBroker) requeueMessage(delivery *Signature, taskProcessor iface.TaskProcessor) {
|
||||
queue := getQueue(b.GetConfig(), taskProcessor)
|
||||
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
|
||||
now := time.Now()
|
||||
s := Signature{
|
||||
Signature: delivery.Signature,
|
||||
CreateAt: now,
|
||||
Score: now.UnixMilli(),
|
||||
}
|
||||
body, err := json.Marshal(s)
|
||||
// 创建一个新的session
|
||||
s, err := concurrency.NewSession(b.client, concurrency.WithTTL(int(ttl.Seconds())))
|
||||
if err != nil {
|
||||
log.ERROR.Print(err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer s.Orphan()
|
||||
|
||||
lockKey := "/machinery/v2/lock/delayed_tasks"
|
||||
m := concurrency.NewMutex(s, lockKey)
|
||||
|
||||
if err = m.Lock(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.Unlock(ctx) // nolint
|
||||
|
||||
log.INFO.Printf("acquired lock=%s, duration=%s", lockKey, ttl)
|
||||
|
||||
keyPrefix := "/machinery/v2/broker/delayed_tasks/eta-"
|
||||
end := strconv.FormatInt(time.Now().UnixMilli(), 10)
|
||||
kvs, err := b.client.Get(b.ctx, keyPrefix+"0", clientv3.WithRange(keyPrefix+end))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, kv := range kvs.Kvs {
|
||||
key := string(kv.Key)
|
||||
parts := strings.Split(key, "/")
|
||||
if len(parts) != 7 {
|
||||
log.WARNING.Printf("invalid delay task %s, continue", key)
|
||||
continue
|
||||
}
|
||||
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)
|
||||
deleteReq := clientv3.OpDelete(key)
|
||||
pendingKey := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", parts[5], parts[6])
|
||||
putReq := clientv3.OpPut(pendingKey, string(kv.Value))
|
||||
c, err := b.client.Txn(b.ctx).If(cmp).Then(deleteReq, putReq).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !c.Succeeded {
|
||||
log.WARNING.Printf("handle delay task %s not success", key)
|
||||
continue
|
||||
}
|
||||
log.DEBUG.Printf("send delay task %s to pending queue done", key)
|
||||
}
|
||||
|
||||
_, err = b.client.KV.Put(b.ctx, key, string(body))
|
||||
if err != nil {
|
||||
log.ERROR.Print(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getQueue(config *config.Config, taskProcessor iface.TaskProcessor) string {
|
||||
|
|
Loading…
Reference in New Issue