machinery-plugins/brokers/etcd/etcd.go

449 lines
10 KiB
Go
Raw Normal View History

2024-06-08 16:03:27 +00:00
// Package etcd implements a machinery broker backed by etcd.
2024-05-19 12:09:59 +00:00
package etcd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"runtime"
2024-06-07 23:55:03 +00:00
"strconv"
"strings"
2024-05-19 12:09:59 +00:00
"sync"
"time"
2024-06-08 16:03:27 +00:00
"golang.org/x/sync/errgroup"
2024-05-19 12:09:59 +00:00
"github.com/RichardKnop/machinery/v2/brokers/errs"
"github.com/RichardKnop/machinery/v2/brokers/iface"
"github.com/RichardKnop/machinery/v2/common"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/log"
"github.com/RichardKnop/machinery/v2/tasks"
clientv3 "go.etcd.io/etcd/client/v3"
2024-06-07 23:55:03 +00:00
"go.etcd.io/etcd/client/v3/concurrency"
2024-05-19 12:09:59 +00:00
)
type etcdBroker struct {
common.Broker
2024-06-08 15:50:48 +00:00
ctx context.Context
client *clientv3.Client
wg sync.WaitGroup
assignMap map[string]bool
mtx sync.Mutex
2024-05-19 12:09:59 +00:00
}
// New ..
2024-05-24 17:03:13 +00:00
func New(ctx context.Context, conf *config.Config) (iface.Broker, error) {
etcdConf := clientv3.Config{
2024-06-08 16:03:27 +00:00
Endpoints: []string{conf.Broker},
2024-05-24 17:03:13 +00:00
Context: ctx,
DialTimeout: time.Second * 5,
2024-05-24 17:13:08 +00:00
TLS: conf.TLSConfig,
2024-05-24 17:03:13 +00:00
}
client, err := clientv3.New(etcdConf)
2024-05-19 12:09:59 +00:00
if err != nil {
return nil, err
}
broker := etcdBroker{
2024-06-08 15:50:48 +00:00
Broker: common.NewBroker(conf),
ctx: ctx,
client: client,
assignMap: map[string]bool{},
2024-05-19 12:09:59 +00:00
}
return &broker, nil
}
// StartConsuming ..
func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
if concurrency < 1 {
concurrency = runtime.NumCPU()
}
b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor)
log.INFO.Printf("[*] Waiting for messages, concurrency=%d. To exit press CTRL+C", concurrency)
// Channel to which we will push tasks ready for processing by worker
2024-06-07 23:55:03 +00:00
deliveries := make(chan Delivery)
2024-05-19 12:09:59 +00:00
2024-06-08 01:47:54 +00:00
ctx, cancel := context.WithCancel(b.ctx)
defer cancel()
// list watch task
b.wg.Add(1)
go func() {
defer b.wg.Done()
for {
select {
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
return
default:
err := b.listWatchTasks(ctx, getQueue(b.GetConfig(), taskProcessor))
if err != nil {
log.ERROR.Printf("handle list watch task err: %s", err)
}
}
}
}()
2024-05-19 12:09:59 +00:00
// A receiving goroutine keeps popping messages from the queue by BLPOP
// If the message is valid and can be unmarshaled into a proper structure
// we send it to the deliveries channel
b.wg.Add(1)
go func() {
defer b.wg.Done()
2024-06-08 01:47:54 +00:00
defer cancel()
2024-05-19 12:09:59 +00:00
for {
select {
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
close(deliveries)
return
default:
if !taskProcessor.PreConsumeHandler() {
continue
}
2024-06-10 10:39:47 +00:00
task := b.nextTask(getQueue(b.GetConfig(), taskProcessor), consumerTag)
if task == nil {
time.Sleep(time.Second)
2024-05-19 12:09:59 +00:00
continue
}
2024-06-10 10:39:47 +00:00
deliveries <- task
2024-05-19 12:09:59 +00:00
}
}
}()
// A goroutine to watch for delayed tasks and push them to deliveries
// channel for consumption by the worker
b.wg.Add(1)
go func() {
defer b.wg.Done()
2024-06-08 01:47:54 +00:00
defer cancel()
2024-06-10 10:48:22 +00:00
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
2024-05-19 12:09:59 +00:00
for {
select {
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
return
2024-06-10 10:48:22 +00:00
case <-ticker.C:
err := b.handleDelayedTask(ctx)
2024-05-19 12:09:59 +00:00
if err != nil {
2024-06-07 23:55:03 +00:00
log.ERROR.Printf("handleDelayedTask err: %s", err)
2024-05-19 12:09:59 +00:00
}
}
}
}()
if err := b.consume(deliveries, concurrency, taskProcessor); err != nil {
return b.GetRetry(), err
}
b.wg.Wait()
return b.GetRetry(), nil
}
// consume takes delivered messages from the channel and manages a worker pool
// to process tasks concurrently
2024-06-07 23:55:03 +00:00
func (b *etcdBroker) consume(deliveries <-chan Delivery, concurrency int, taskProcessor iface.TaskProcessor) error {
2024-06-08 16:24:03 +00:00
eg, ctx := errgroup.WithContext(b.ctx)
2024-05-19 12:09:59 +00:00
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case t, ok := <-deliveries:
if !ok {
return nil
}
if err := b.consumeOne(t, taskProcessor); err != nil {
return err
}
}
}
})
}
return eg.Wait()
}
// consumeOne processes a single message using TaskProcessor
2024-06-08 16:24:03 +00:00
func (b *etcdBroker) consumeOne(delivery Delivery, taskProcessor iface.TaskProcessor) error {
2024-05-19 12:09:59 +00:00
// If the task is not registered, we requeue it,
// there might be different workers for processing specific tasks
2024-06-08 16:24:03 +00:00
if !b.IsTaskRegistered(delivery.Signature().Name) {
log.INFO.Printf("Task not registered with this worker. Requeuing message: %s", delivery.Body())
if !delivery.Signature().IgnoreWhenTaskNotRegistered {
delivery.Nack()
2024-05-19 12:09:59 +00:00
}
return nil
}
2024-06-08 16:24:03 +00:00
log.DEBUG.Printf("Received new message: %s", delivery.Body())
defer delivery.Ack()
2024-05-19 12:09:59 +00:00
2024-06-08 16:24:03 +00:00
return taskProcessor.Process(delivery.Signature())
2024-05-19 12:09:59 +00:00
}
// StopConsuming stops consumption. It first calls the embedded broker's
// StopConsuming; the goroutines started by StartConsuming watch
// b.GetStopChan() (presumably closed by that call — confirm in common.Broker).
// It then blocks until all of those goroutines have exited.
func (b *etcdBroker) StopConsuming() {
	b.Broker.StopConsuming()
	b.wg.Wait()
}
// Publish put kvs to etcd stor
func (b *etcdBroker) Publish(ctx context.Context, signature *tasks.Signature) error {
// Adjust routing key (this decides which queue the message will be published to)
b.Broker.AdjustRoutingKey(signature)
now := time.Now()
2024-06-07 23:55:03 +00:00
msg, err := json.Marshal(signature)
2024-05-19 12:09:59 +00:00
if err != nil {
return fmt.Errorf("JSON marshal error: %s", err)
}
2024-06-07 23:55:03 +00:00
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", signature.RoutingKey, signature.UUID)
2024-05-19 12:09:59 +00:00
// Check the ETA signature field, if it is set and it is in the future,
// delay the task
2024-06-07 23:55:03 +00:00
if signature.ETA != nil && signature.ETA.After(now) {
2024-06-08 01:47:54 +00:00
key = fmt.Sprintf("/machinery/v2/broker/delayed_tasks/eta-%d/%s/%s",
signature.ETA.UnixMilli(), signature.RoutingKey, signature.UUID)
2024-06-07 23:55:03 +00:00
_, err = b.client.Put(ctx, key, string(msg))
return err
2024-05-19 12:09:59 +00:00
}
2024-05-24 17:03:13 +00:00
_, err = b.client.Put(ctx, key, string(msg))
2024-05-19 12:09:59 +00:00
return err
}
2024-06-07 23:55:03 +00:00
func (b *etcdBroker) getTasks(ctx context.Context, key string) ([]*tasks.Signature, error) {
2024-05-24 17:03:13 +00:00
resp, err := b.client.Get(ctx, key, clientv3.WithPrefix())
2024-05-19 12:09:59 +00:00
if err != nil {
return nil, err
}
2024-06-07 23:55:03 +00:00
result := make([]*tasks.Signature, 0, len(resp.Kvs))
2024-06-08 16:24:03 +00:00
for _, kv := range resp.Kvs {
if strings.Contains(string(kv.Key), "/assign") {
continue
}
2024-06-07 23:55:03 +00:00
signature := new(tasks.Signature)
2024-06-08 16:24:03 +00:00
decoder := json.NewDecoder(bytes.NewReader(kv.Value))
2024-05-19 12:09:59 +00:00
decoder.UseNumber()
if err := decoder.Decode(signature); err != nil {
2024-06-08 16:24:03 +00:00
return nil, errs.NewErrCouldNotUnmarshalTaskSignature(kv.Value, err)
2024-05-19 12:09:59 +00:00
}
result = append(result, signature)
}
return result, nil
}
// GetPendingTasks 获取执行队列, 任务统计可使用
func (b *etcdBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
if queue == "" {
queue = b.GetConfig().DefaultQueue
}
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
2024-06-08 01:47:54 +00:00
items, err := b.getTasks(b.ctx, key)
2024-05-19 12:09:59 +00:00
if err != nil {
return nil, err
}
2024-06-07 23:55:03 +00:00
return items, nil
2024-05-19 12:09:59 +00:00
}
// GetDelayedTasks 任务统计可使用
func (b *etcdBroker) GetDelayedTasks() ([]*tasks.Signature, error) {
key := "/machinery/v2/broker/delayed_tasks"
2024-06-08 01:47:54 +00:00
items, err := b.getTasks(b.ctx, key)
2024-05-19 12:09:59 +00:00
if err != nil {
return nil, err
}
2024-06-07 23:55:03 +00:00
return items, nil
2024-05-19 12:09:59 +00:00
}
2024-06-10 10:39:47 +00:00
func (b *etcdBroker) nextTask(queue string, consumerTag string) Delivery {
b.mtx.Lock()
assignMap := make(map[string]bool, len(b.assignMap))
2024-06-10 13:11:02 +00:00
for k, v := range b.assignMap {
2024-06-10 10:39:47 +00:00
assignMap[k] = v
}
b.mtx.Unlock()
for k, assigned := range assignMap {
2024-06-08 15:50:48 +00:00
if assigned {
continue
}
2024-06-08 01:47:54 +00:00
if !strings.Contains(k, queue) {
continue
}
2024-06-08 01:57:35 +00:00
d, err := NewDelivery(b.ctx, b.client, k, consumerTag)
2024-06-08 01:47:54 +00:00
if err != nil {
continue
}
2024-06-10 10:39:47 +00:00
return d
2024-06-08 01:47:54 +00:00
}
2024-06-10 10:39:47 +00:00
return nil
2024-06-08 01:47:54 +00:00
}
2024-06-08 15:50:48 +00:00
func (b *etcdBroker) setAssign(key string, assign bool) bool {
if !strings.Contains(key, "/assign") {
return false
}
k := strings.TrimSuffix(key, "/assign")
if _, ok := b.assignMap[k]; ok {
b.assignMap[k] = assign
}
return true
}
2024-06-08 01:47:54 +00:00
func (b *etcdBroker) listWatchTasks(ctx context.Context, queue string) error {
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
2024-06-08 01:47:54 +00:00
// List
2024-06-10 13:11:02 +00:00
listCtx, listCancel := context.WithTimeout(ctx, time.Second*10)
2024-06-08 01:47:54 +00:00
defer listCancel()
resp, err := b.client.Get(listCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
2024-05-19 12:09:59 +00:00
if err != nil {
2024-06-08 01:47:54 +00:00
return err
2024-05-19 12:09:59 +00:00
}
2024-06-10 13:11:02 +00:00
b.mtx.Lock()
b.assignMap = map[string]bool{}
2024-06-08 01:47:54 +00:00
for _, kv := range resp.Kvs {
2024-06-08 15:50:48 +00:00
key := string(kv.Key)
if b.setAssign(key, true) {
2024-06-08 01:47:54 +00:00
continue
}
2024-06-08 15:50:48 +00:00
b.assignMap[key] = false
2024-05-19 12:09:59 +00:00
}
2024-06-10 13:11:02 +00:00
b.mtx.Unlock()
2024-05-19 12:09:59 +00:00
2024-06-08 01:47:54 +00:00
// Watch
2024-06-10 13:11:02 +00:00
watchCtx, watchCancel := context.WithTimeout(ctx, time.Minute*10)
2024-06-08 01:47:54 +00:00
defer watchCancel()
wc := b.client.Watch(watchCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithRev(resp.Header.Revision))
for wresp := range wc {
2024-06-10 10:39:47 +00:00
if wresp.Err() != nil {
return watchCtx.Err()
}
2024-06-10 13:11:02 +00:00
b.mtx.Lock()
2024-06-08 01:47:54 +00:00
for _, ev := range wresp.Events {
2024-06-08 15:50:48 +00:00
key := string(ev.Kv.Key)
2024-06-08 01:47:54 +00:00
if ev.Type == clientv3.EventTypeDelete {
2024-06-08 15:50:48 +00:00
if b.setAssign(key, false) {
continue
2024-06-08 01:47:54 +00:00
}
2024-06-08 15:50:48 +00:00
delete(b.assignMap, key)
2024-06-08 01:47:54 +00:00
}
if ev.Type == clientv3.EventTypePut {
2024-06-08 15:50:48 +00:00
if b.setAssign(key, true) {
2024-06-08 01:47:54 +00:00
continue
}
2024-06-08 15:50:48 +00:00
b.assignMap[key] = false
2024-06-08 01:47:54 +00:00
}
}
2024-06-10 13:11:02 +00:00
b.mtx.Unlock()
2024-06-07 23:55:03 +00:00
}
2024-06-08 01:47:54 +00:00
return nil
2024-05-19 12:09:59 +00:00
}
2024-06-10 10:48:22 +00:00
func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
2024-06-07 23:55:03 +00:00
ttl := time.Second * 10
2024-06-08 01:47:54 +00:00
ctx, cancel := context.WithTimeout(ctx, ttl)
defer cancel()
2024-05-19 12:09:59 +00:00
2024-06-07 23:55:03 +00:00
// 创建一个新的session
s, err := concurrency.NewSession(b.client, concurrency.WithTTL(int(ttl.Seconds())))
2024-05-19 12:09:59 +00:00
if err != nil {
2024-06-10 10:48:22 +00:00
return err
2024-05-19 12:09:59 +00:00
}
2024-06-07 23:55:03 +00:00
defer s.Orphan()
2024-05-19 12:09:59 +00:00
2024-06-07 23:55:03 +00:00
lockKey := "/machinery/v2/lock/delayed_tasks"
m := concurrency.NewMutex(s, lockKey)
if err = m.Lock(ctx); err != nil {
2024-06-10 10:48:22 +00:00
return err
2024-05-19 12:09:59 +00:00
}
2024-06-07 23:55:03 +00:00
defer m.Unlock(ctx) // nolint
2024-05-19 12:09:59 +00:00
2024-06-07 23:55:03 +00:00
keyPrefix := "/machinery/v2/broker/delayed_tasks/eta-"
end := strconv.FormatInt(time.Now().UnixMilli(), 10)
2024-06-10 10:48:22 +00:00
resp, err := b.client.Get(b.ctx, keyPrefix+"0", clientv3.WithRange(keyPrefix+end))
2024-05-19 12:09:59 +00:00
if err != nil {
2024-06-10 10:48:22 +00:00
return err
2024-05-19 12:09:59 +00:00
}
2024-06-10 10:21:36 +00:00
for _, kv := range resp.Kvs {
2024-06-07 23:55:03 +00:00
key := string(kv.Key)
parts := strings.Split(key, "/")
2024-06-08 01:47:54 +00:00
if len(parts) != 8 {
2024-06-07 23:55:03 +00:00
log.WARNING.Printf("invalid delay task %s, continue", key)
continue
}
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)
deleteReq := clientv3.OpDelete(key)
2024-06-08 01:47:54 +00:00
pendingKey := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", parts[6], parts[7])
2024-06-07 23:55:03 +00:00
putReq := clientv3.OpPut(pendingKey, string(kv.Value))
c, err := b.client.Txn(b.ctx).If(cmp).Then(deleteReq, putReq).Commit()
if err != nil {
2024-06-10 10:48:22 +00:00
return fmt.Errorf("handle delay task %s: %w", key, err)
2024-06-07 23:55:03 +00:00
}
if !c.Succeeded {
log.WARNING.Printf("handle delay task %s not success", key)
continue
}
log.DEBUG.Printf("send delay task %s to pending queue done", key)
2024-05-19 12:09:59 +00:00
}
2024-06-07 23:55:03 +00:00
2024-06-10 10:48:22 +00:00
return nil
2024-05-19 12:09:59 +00:00
}
// getQueue returns the queue taskProcessor consumes from: its custom queue
// when one is set, otherwise the configured default queue.
func getQueue(cfg *config.Config, taskProcessor iface.TaskProcessor) string {
	if q := taskProcessor.CustomQueue(); q != "" {
		return q
	}
	return cfg.DefaultQueue
}