2024-05-19 12:09:59 +00:00
|
|
|
package etcd
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"runtime"
|
2024-06-07 23:55:03 +00:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
2024-05-19 12:09:59 +00:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/RichardKnop/machinery/v2/brokers/errs"
|
|
|
|
"github.com/RichardKnop/machinery/v2/brokers/iface"
|
|
|
|
"github.com/RichardKnop/machinery/v2/common"
|
|
|
|
"github.com/RichardKnop/machinery/v2/config"
|
|
|
|
"github.com/RichardKnop/machinery/v2/log"
|
|
|
|
"github.com/RichardKnop/machinery/v2/tasks"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
2024-06-07 23:55:03 +00:00
|
|
|
"go.etcd.io/etcd/client/v3/concurrency"
|
2024-05-19 12:09:59 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
)
|
|
|
|
|
|
|
|
// haveNoTaskErr is a sentinel error. StartConsuming checks for it with
// errors.Is and skips logging when the error returned by nextTask matches it.
var haveNoTaskErr = errors.New("have no task")
|
|
|
|
|
|
|
|
type etcdBroker struct {
|
|
|
|
common.Broker
|
2024-05-24 17:03:13 +00:00
|
|
|
ctx context.Context
|
|
|
|
client *clientv3.Client
|
|
|
|
wg sync.WaitGroup
|
2024-06-08 01:47:54 +00:00
|
|
|
keyMap map[string]struct{}
|
|
|
|
mtx sync.Mutex
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// New ..
|
2024-05-24 17:03:13 +00:00
|
|
|
func New(ctx context.Context, conf *config.Config) (iface.Broker, error) {
|
|
|
|
etcdConf := clientv3.Config{
|
2024-05-24 17:13:08 +00:00
|
|
|
Endpoints: []string{conf.Lock},
|
2024-05-24 17:03:13 +00:00
|
|
|
Context: ctx,
|
|
|
|
DialTimeout: time.Second * 5,
|
2024-05-24 17:13:08 +00:00
|
|
|
TLS: conf.TLSConfig,
|
2024-05-24 17:03:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
client, err := clientv3.New(etcdConf)
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
broker := etcdBroker{
|
2024-05-24 17:03:13 +00:00
|
|
|
Broker: common.NewBroker(conf),
|
|
|
|
ctx: ctx,
|
|
|
|
client: client,
|
2024-06-08 01:47:54 +00:00
|
|
|
keyMap: map[string]struct{}{},
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return &broker, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// StartConsuming ..
|
|
|
|
func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
|
|
|
|
if concurrency < 1 {
|
|
|
|
concurrency = runtime.NumCPU()
|
|
|
|
}
|
|
|
|
b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor)
|
|
|
|
|
|
|
|
log.INFO.Printf("[*] Waiting for messages, concurrency=%d. To exit press CTRL+C", concurrency)
|
|
|
|
|
|
|
|
// Channel to which we will push tasks ready for processing by worker
|
2024-06-07 23:55:03 +00:00
|
|
|
deliveries := make(chan Delivery)
|
2024-05-19 12:09:59 +00:00
|
|
|
|
2024-06-08 01:47:54 +00:00
|
|
|
ctx, cancel := context.WithCancel(b.ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
// list watch task
|
|
|
|
b.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer b.wg.Done()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
// A way to stop this goroutine from b.StopConsuming
|
|
|
|
case <-b.GetStopChan():
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
err := b.listWatchTasks(ctx, getQueue(b.GetConfig(), taskProcessor))
|
|
|
|
if err != nil {
|
|
|
|
log.ERROR.Printf("handle list watch task err: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}()
|
|
|
|
|
2024-05-19 12:09:59 +00:00
|
|
|
// A receiving goroutine keeps popping messages from the queue by BLPOP
|
|
|
|
// If the message is valid and can be unmarshaled into a proper structure
|
|
|
|
// we send it to the deliveries channel
|
|
|
|
b.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer b.wg.Done()
|
2024-06-08 01:47:54 +00:00
|
|
|
defer cancel()
|
2024-05-19 12:09:59 +00:00
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
// A way to stop this goroutine from b.StopConsuming
|
|
|
|
case <-b.GetStopChan():
|
|
|
|
close(deliveries)
|
|
|
|
return
|
|
|
|
|
|
|
|
default:
|
|
|
|
if !taskProcessor.PreConsumeHandler() {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-06-08 01:57:35 +00:00
|
|
|
task, err := b.nextTask(getQueue(b.GetConfig(), taskProcessor), consumerTag)
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
|
|
|
if !errors.Is(err, haveNoTaskErr) {
|
|
|
|
log.ERROR.Print(err)
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if task != nil {
|
|
|
|
deliveries <- task
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// A goroutine to watch for delayed tasks and push them to deliveries
|
|
|
|
// channel for consumption by the worker
|
|
|
|
b.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer b.wg.Done()
|
2024-06-08 01:47:54 +00:00
|
|
|
defer cancel()
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
ticker := time.NewTicker(time.Second)
|
|
|
|
defer ticker.Stop()
|
2024-05-19 12:09:59 +00:00
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
// A way to stop this goroutine from b.StopConsuming
|
|
|
|
case <-b.GetStopChan():
|
|
|
|
return
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
case <-ticker.C:
|
2024-06-08 01:47:54 +00:00
|
|
|
err := b.handleDelayedTask(ctx)
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
2024-06-07 23:55:03 +00:00
|
|
|
log.ERROR.Printf("handleDelayedTask err: %s", err)
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if err := b.consume(deliveries, concurrency, taskProcessor); err != nil {
|
|
|
|
return b.GetRetry(), err
|
|
|
|
}
|
|
|
|
|
|
|
|
b.wg.Wait()
|
|
|
|
|
|
|
|
return b.GetRetry(), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// consume takes delivered messages from the channel and manages a worker pool
|
|
|
|
// to process tasks concurrently
|
2024-06-07 23:55:03 +00:00
|
|
|
func (b *etcdBroker) consume(deliveries <-chan Delivery, concurrency int, taskProcessor iface.TaskProcessor) error {
|
2024-05-19 12:09:59 +00:00
|
|
|
eg, ctx := errgroup.WithContext(context.Background())
|
|
|
|
eg.SetLimit(concurrency)
|
|
|
|
|
|
|
|
for i := 0; i < concurrency; i++ {
|
|
|
|
eg.Go(func() error {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
|
|
|
|
case t, ok := <-deliveries:
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := b.consumeOne(t, taskProcessor); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
return eg.Wait()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// consumeOne processes a single message using TaskProcessor
|
2024-06-07 23:55:03 +00:00
|
|
|
func (b *etcdBroker) consumeOne(d Delivery, taskProcessor iface.TaskProcessor) error {
|
2024-05-19 12:09:59 +00:00
|
|
|
// If the task is not registered, we requeue it,
|
|
|
|
// there might be different workers for processing specific tasks
|
2024-06-07 23:55:03 +00:00
|
|
|
if !b.IsTaskRegistered(d.Signature().Name) {
|
|
|
|
if d.Signature().IgnoreWhenTaskNotRegistered {
|
2024-05-19 12:09:59 +00:00
|
|
|
return nil
|
|
|
|
}
|
2024-06-07 23:55:03 +00:00
|
|
|
log.INFO.Printf("Task not registered with this worker. Requeuing message: %s", d.Body())
|
|
|
|
d.Nack()
|
2024-05-19 12:09:59 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
log.DEBUG.Printf("Received new message: %s", d.Body())
|
|
|
|
|
|
|
|
err := taskProcessor.Process(d.Signature())
|
|
|
|
d.Ack()
|
2024-05-19 12:09:59 +00:00
|
|
|
|
2024-06-07 15:51:24 +00:00
|
|
|
return err
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// StopConsuming 停止
|
|
|
|
func (b *etcdBroker) StopConsuming() {
|
|
|
|
b.Broker.StopConsuming()
|
|
|
|
|
|
|
|
b.wg.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Publish put kvs to etcd stor
|
|
|
|
func (b *etcdBroker) Publish(ctx context.Context, signature *tasks.Signature) error {
|
|
|
|
// Adjust routing key (this decides which queue the message will be published to)
|
|
|
|
b.Broker.AdjustRoutingKey(signature)
|
|
|
|
|
|
|
|
now := time.Now()
|
2024-06-07 23:55:03 +00:00
|
|
|
msg, err := json.Marshal(signature)
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("JSON marshal error: %s", err)
|
|
|
|
}
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", signature.RoutingKey, signature.UUID)
|
2024-05-19 12:09:59 +00:00
|
|
|
|
|
|
|
// Check the ETA signature field, if it is set and it is in the future,
|
|
|
|
// delay the task
|
2024-06-07 23:55:03 +00:00
|
|
|
if signature.ETA != nil && signature.ETA.After(now) {
|
2024-06-08 01:47:54 +00:00
|
|
|
key = fmt.Sprintf("/machinery/v2/broker/delayed_tasks/eta-%d/%s/%s",
|
|
|
|
signature.ETA.UnixMilli(), signature.RoutingKey, signature.UUID)
|
2024-06-07 23:55:03 +00:00
|
|
|
_, err = b.client.Put(ctx, key, string(msg))
|
|
|
|
return err
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
2024-05-24 17:03:13 +00:00
|
|
|
_, err = b.client.Put(ctx, key, string(msg))
|
2024-05-19 12:09:59 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
func (b *etcdBroker) getTasks(ctx context.Context, key string) ([]*tasks.Signature, error) {
|
2024-05-24 17:03:13 +00:00
|
|
|
resp, err := b.client.Get(ctx, key, clientv3.WithPrefix())
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
result := make([]*tasks.Signature, 0, len(resp.Kvs))
|
2024-05-19 12:09:59 +00:00
|
|
|
for _, kvs := range resp.Kvs {
|
2024-06-07 23:55:03 +00:00
|
|
|
signature := new(tasks.Signature)
|
2024-05-19 12:09:59 +00:00
|
|
|
decoder := json.NewDecoder(bytes.NewReader(kvs.Value))
|
|
|
|
decoder.UseNumber()
|
|
|
|
if err := decoder.Decode(signature); err != nil {
|
|
|
|
return nil, errs.NewErrCouldNotUnmarshalTaskSignature(kvs.Value, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
result = append(result, signature)
|
|
|
|
}
|
|
|
|
|
|
|
|
return result, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetPendingTasks 获取执行队列, 任务统计可使用
|
|
|
|
func (b *etcdBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
|
|
|
|
if queue == "" {
|
|
|
|
queue = b.GetConfig().DefaultQueue
|
|
|
|
}
|
|
|
|
|
|
|
|
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
|
2024-06-08 01:47:54 +00:00
|
|
|
items, err := b.getTasks(b.ctx, key)
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
return items, nil
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// GetDelayedTasks 任务统计可使用
|
|
|
|
func (b *etcdBroker) GetDelayedTasks() ([]*tasks.Signature, error) {
|
|
|
|
key := "/machinery/v2/broker/delayed_tasks"
|
|
|
|
|
2024-06-08 01:47:54 +00:00
|
|
|
items, err := b.getTasks(b.ctx, key)
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
return items, nil
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
2024-06-08 01:57:35 +00:00
|
|
|
func (b *etcdBroker) nextTask(queue string, consumerTag string) (Delivery, error) {
|
2024-06-08 01:47:54 +00:00
|
|
|
for k := range b.keyMap {
|
|
|
|
if !strings.Contains(k, queue) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-06-08 01:57:35 +00:00
|
|
|
d, err := NewDelivery(b.ctx, b.client, k, consumerTag)
|
2024-06-08 01:47:54 +00:00
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
return d, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
time.Sleep(time.Second)
|
2024-06-08 01:57:35 +00:00
|
|
|
return b.nextTask(queue, consumerTag)
|
2024-06-08 01:47:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
|
2024-05-19 15:03:10 +00:00
|
|
|
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
|
|
|
|
|
2024-06-08 01:47:54 +00:00
|
|
|
// List
|
|
|
|
listCtx, listCancel := context.WithTimeout(ctx, time.Second*5)
|
|
|
|
defer listCancel()
|
|
|
|
resp, err := b.client.Get(listCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
2024-06-08 01:47:54 +00:00
|
|
|
return err
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
2024-06-08 01:47:54 +00:00
|
|
|
for _, kv := range resp.Kvs {
|
|
|
|
if bytes.Contains(kv.Key, []byte("/assign")) {
|
|
|
|
delete(b.keyMap, string(kv.Key)[:23])
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
b.keyMap[string(kv.Key)] = struct{}{}
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
2024-06-08 01:47:54 +00:00
|
|
|
// Watch
|
|
|
|
watchCtx, watchCancel := context.WithTimeout(ctx, time.Minute*60)
|
|
|
|
defer watchCancel()
|
|
|
|
wc := b.client.Watch(watchCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithRev(resp.Header.Revision))
|
|
|
|
for wresp := range wc {
|
|
|
|
for _, ev := range wresp.Events {
|
|
|
|
if ev.Type == clientv3.EventTypeDelete {
|
|
|
|
if bytes.Contains(ev.Kv.Key, []byte("/assign")) {
|
|
|
|
b.keyMap[string(ev.Kv.Key)] = struct{}{}
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if ev.Type == clientv3.EventTypePut {
|
|
|
|
if bytes.Contains(ev.Kv.Key, []byte("/assign")) {
|
|
|
|
delete(b.keyMap, string(ev.Kv.Key)[:23])
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
b.keyMap[string(ev.Kv.Key)] = struct{}{}
|
|
|
|
}
|
|
|
|
}
|
2024-06-07 23:55:03 +00:00
|
|
|
}
|
|
|
|
|
2024-06-08 01:47:54 +00:00
|
|
|
return nil
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
2024-06-08 01:47:54 +00:00
|
|
|
func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
|
2024-06-07 23:55:03 +00:00
|
|
|
ttl := time.Second * 10
|
2024-06-08 01:47:54 +00:00
|
|
|
ctx, cancel := context.WithTimeout(ctx, ttl)
|
2024-05-19 15:03:10 +00:00
|
|
|
defer cancel()
|
2024-05-19 12:09:59 +00:00
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
// 创建一个新的session
|
|
|
|
s, err := concurrency.NewSession(b.client, concurrency.WithTTL(int(ttl.Seconds())))
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
2024-06-07 23:55:03 +00:00
|
|
|
return err
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
2024-06-07 23:55:03 +00:00
|
|
|
defer s.Orphan()
|
2024-05-19 12:09:59 +00:00
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
lockKey := "/machinery/v2/lock/delayed_tasks"
|
|
|
|
m := concurrency.NewMutex(s, lockKey)
|
|
|
|
|
|
|
|
if err = m.Lock(ctx); err != nil {
|
|
|
|
return err
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
2024-06-07 23:55:03 +00:00
|
|
|
defer m.Unlock(ctx) // nolint
|
2024-05-19 12:09:59 +00:00
|
|
|
|
2024-06-07 23:55:03 +00:00
|
|
|
keyPrefix := "/machinery/v2/broker/delayed_tasks/eta-"
|
|
|
|
end := strconv.FormatInt(time.Now().UnixMilli(), 10)
|
|
|
|
kvs, err := b.client.Get(b.ctx, keyPrefix+"0", clientv3.WithRange(keyPrefix+end))
|
2024-05-19 12:09:59 +00:00
|
|
|
if err != nil {
|
2024-06-07 23:55:03 +00:00
|
|
|
return err
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
2024-06-07 23:55:03 +00:00
|
|
|
for _, kv := range kvs.Kvs {
|
|
|
|
key := string(kv.Key)
|
|
|
|
parts := strings.Split(key, "/")
|
2024-06-08 01:47:54 +00:00
|
|
|
if len(parts) != 8 {
|
2024-06-07 23:55:03 +00:00
|
|
|
log.WARNING.Printf("invalid delay task %s, continue", key)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)
|
|
|
|
deleteReq := clientv3.OpDelete(key)
|
2024-06-08 01:47:54 +00:00
|
|
|
pendingKey := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", parts[6], parts[7])
|
2024-06-07 23:55:03 +00:00
|
|
|
putReq := clientv3.OpPut(pendingKey, string(kv.Value))
|
|
|
|
c, err := b.client.Txn(b.ctx).If(cmp).Then(deleteReq, putReq).Commit()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if !c.Succeeded {
|
|
|
|
log.WARNING.Printf("handle delay task %s not success", key)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
log.DEBUG.Printf("send delay task %s to pending queue done", key)
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
2024-06-07 23:55:03 +00:00
|
|
|
|
|
|
|
return nil
|
2024-05-19 12:09:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func getQueue(config *config.Config, taskProcessor iface.TaskProcessor) string {
|
|
|
|
customQueue := taskProcessor.CustomQueue()
|
|
|
|
if customQueue == "" {
|
|
|
|
return config.DefaultQueue
|
|
|
|
}
|
|
|
|
return customQueue
|
|
|
|
}
|