/*
 * Tencent is pleased to support the open source community by making Blueking Container Service available.
 * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
 * Licensed under the MIT License (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 * http://opensource.org/licenses/MIT
 * Unless required by applicable law or agreed to in writing, software distributed under
 * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package etcd implements a machinery v2 broker backed by etcd.
package etcd

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/RichardKnop/machinery/v2/brokers/errs"
	"github.com/RichardKnop/machinery/v2/brokers/iface"
	"github.com/RichardKnop/machinery/v2/common"
	"github.com/RichardKnop/machinery/v2/config"
	"github.com/RichardKnop/machinery/v2/log"
	"github.com/RichardKnop/machinery/v2/tasks"
	clientv3 "go.etcd.io/etcd/client/v3"
	"golang.org/x/sync/errgroup"
)

const (
	pendingTaskPrefix  = "/machinery/v2/broker/pending_tasks"
	runningTaskPrefix  = "/machinery/v2/broker/running_tasks"
	delayedTaskPrefix  = "/machinery/v2/broker/delayed_tasks"
	delayedTaskLockKey = "/machinery/v2/lock/delayed_tasks"
)

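// etcdBroker is a machinery broker backed by etcd. Besides the embedded
// common.Broker it keeps in-memory indexes of pending and running task keys
// (kept up to date by the list/watch goroutines) and of delayed tasks.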
type etcdBroker struct {
	common.Broker
	ctx         context.Context
	client      *clientv3.Client
	wg          sync.WaitGroup
	pendingTask map[string]struct{}
	runningTask map[string]struct{}
	delayedTask map[string]*delayTask
	mtx         sync.RWMutex
	delayedMtx  sync.RWMutex
}

// New creates an etcd broker from the machinery configuration. conf.Broker is
// used as the etcd endpoint.
func New(ctx context.Context, conf *config.Config) (iface.Broker, error) {
	etcdConf := clientv3.Config{
		Endpoints:   []string{conf.Broker},
		Context:     ctx,
		DialTimeout: time.Second * 5,
		TLS:         conf.TLSConfig,
	}

	client, err := clientv3.New(etcdConf)
	if err != nil {
		return nil, err
	}

	broker := etcdBroker{
		Broker:      common.NewBroker(conf),
		ctx:         ctx,
		client:      client,
		pendingTask: make(map[string]struct{}),
		runningTask: make(map[string]struct{}),
		delayedTask: make(map[string]*delayTask),
	}

	return &broker, nil
}

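// A minimal wiring sketch (illustrative only) of how an importing package
// might pass this broker to a machinery v2 server. The backend and lock
// values are placeholders; any machinery v2 backend/lock implementations
// can be used here.
//
//	cnf := &config.Config{Broker: "127.0.0.1:2379", DefaultQueue: "machinery_tasks"}
//	broker, err := etcd.New(context.Background(), cnf)
//	if err != nil {
//		// handle error
//	}
//	srv := machinery.NewServer(cnf, broker, someBackend, someLock)
//	_ = srv
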
// nolint
// StartConsuming enters a loop and waits for incoming messages
func (b *etcdBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
	if concurrency < 1 {
		concurrency = runtime.NumCPU()
	}
	b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor)

	log.INFO.Printf("[*] Waiting for messages, concurrency=%d. To exit press CTRL+C", concurrency)

	// Channel to which we will push tasks ready for processing by worker
	deliveries := make(chan Delivery)
	defer log.INFO.Printf("stop all consuming and handle done")
	defer b.wg.Wait()

	ctx, cancel := context.WithCancel(b.ctx)
	defer cancel()

	// list watch running task
	b.wg.Add(1)
	go func() {
		defer func() {
			cancel()
			b.wg.Done()
			log.INFO.Printf("list watch running task stopped")
		}()

		for {
			select {
			case <-b.GetStopChan():
				return
			case <-ctx.Done():
				return
			default:
				err := b.listWatchRunningTask(ctx)
				if err != nil {
					log.ERROR.Printf("list watch running task failed, err: %s", err)
					time.Sleep(time.Second)
				}
			}
		}
	}()

	// list watch pending task
	b.wg.Add(1)
	go func() {
		defer func() {
			cancel()
			b.wg.Done()
			log.INFO.Printf("list watch pending task stopped")
		}()

		queue := getQueue(b.GetConfig(), taskProcessor)
		for {
			select {
			case <-b.GetStopChan():
				return
			case <-ctx.Done():
				return
			default:
				err := b.listWatchPendingTask(ctx, queue)
				if err != nil {
					log.ERROR.Printf("list watch pending task failed, err: %s", err)
					time.Sleep(time.Second)
				}
			}
		}
	}()

	// A receiving goroutine keeps polling the pending queue for tasks that are
	// not yet running. Each claimed task is wrapped in a Delivery and sent to
	// the deliveries channel for the worker pool to process.
	b.wg.Add(1)
	go func() {
		defer func() {
			cancel()
			close(deliveries)
			b.wg.Done()
			log.INFO.Printf("handle next task stopped")
		}()

		for {
			select {
			case <-b.GetStopChan():
				return
			case <-ctx.Done():
				return
			default:
				if !taskProcessor.PreConsumeHandler() {
					continue
				}

				task := b.nextTask(ctx, getQueue(b.GetConfig(), taskProcessor), consumerTag)
				if task == nil {
					time.Sleep(time.Second)
					continue
				}

				deliveries <- task
			}
		}
	}()

	// A goroutine to watch for delayed tasks and push them to deliveries
	// channel for consumption by the worker
	b.wg.Add(1)
	go func() {
		defer func() {
			cancel()
			b.wg.Done()
			log.INFO.Printf("handle delayed task stopped")
		}()

		for {
			select {
			// A way to stop this goroutine from b.StopConsuming
			case <-b.GetStopChan():
				return
			case <-ctx.Done():
				return
			default:
				err := b.handleDelayedTask(ctx)
				if err != nil && !errors.Is(err, context.DeadlineExceeded) {
					log.ERROR.Printf("handle delayed task failed, err: %s", err)
				}
				time.Sleep(time.Second)
			}
		}
	}()

	if err := b.consume(deliveries, concurrency, taskProcessor); err != nil {
		log.WARNING.Printf("consume stopped, err=%v, retry=%t", err, b.GetRetry())
		return b.GetRetry(), err
	}

	log.INFO.Printf("consume stopped, retry=%t", b.GetRetry())
	return b.GetRetry(), nil
}

// consume takes delivered messages from the channel and manages a worker pool
// to process tasks concurrently
func (b *etcdBroker) consume(deliveries <-chan Delivery, concurrency int, taskProcessor iface.TaskProcessor) error {
	eg, ctx := errgroup.WithContext(b.ctx)

	for i := 0; i < concurrency; i++ {
		eg.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()

				case t, ok := <-deliveries:
					if !ok {
						return nil
					}

					if err := b.consumeOne(t, taskProcessor); err != nil {
						return err
					}
				}
			}
		})
	}

	return eg.Wait()
}

// consumeOne processes a single message using TaskProcessor
func (b *etcdBroker) consumeOne(delivery Delivery, taskProcessor iface.TaskProcessor) error {
	// If the task is not registered, we requeue it,
	// there might be different workers for processing specific tasks
	if !b.IsTaskRegistered(delivery.Signature().Name) {
		log.INFO.Printf("Task not registered with this worker. Requeuing message: %s", delivery.Body())

		if !delivery.Signature().IgnoreWhenTaskNotRegistered {
			delivery.Nack()
		}
		return nil
	}

	log.DEBUG.Printf("Received new message: %s", delivery.Body())
	defer delivery.Ack()

	return taskProcessor.Process(delivery.Signature())
}

// StopConsuming quits the consuming loops and waits for all handlers to finish
func (b *etcdBroker) StopConsuming() {
	b.Broker.StopConsuming()

	b.wg.Wait()
}

// Publish places a new message on the queue by writing it to the etcd store
func (b *etcdBroker) Publish(ctx context.Context, signature *tasks.Signature) error {
	// Adjust routing key (this decides which queue the message will be published to)
	b.Broker.AdjustRoutingKey(signature)

	msg, err := json.Marshal(signature)
	if err != nil {
		return fmt.Errorf("JSON marshal error: %s", err)
	}

	key := fmt.Sprintf("%s/%s/%s", pendingTaskPrefix, signature.RoutingKey, signature.UUID)

	// Check the ETA signature field; always delay the task if it is not nil,
	// to prevent the key being overwritten by a slow ack request
	if signature.ETA != nil {
		key = fmt.Sprintf("%s/eta-%d/%s/%s",
			delayedTaskPrefix, signature.ETA.UnixMilli(), signature.RoutingKey, signature.UUID)
	}

	_, err = b.client.Put(ctx, key, string(msg))
	if err != nil {
		log.ERROR.Printf("Publish queue[%s] new message failed: %s, err: %s", key, string(msg), err)
	} else {
		log.DEBUG.Printf("Publish queue[%s] new message: %s", key, string(msg))
	}
	return err
}

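// getTasks lists all task signatures stored under the given etcd key prefix.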
func (b *etcdBroker) getTasks(ctx context.Context, key string) ([]*tasks.Signature, error) {
	resp, err := b.client.Get(ctx, key, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	result := make([]*tasks.Signature, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		signature := new(tasks.Signature)
		decoder := json.NewDecoder(bytes.NewReader(kv.Value))
		decoder.UseNumber()
		if err := decoder.Decode(signature); err != nil {
			return nil, errs.NewErrCouldNotUnmarshalTaskSignature(kv.Value, err)
		}

		result = append(result, signature)
	}

	return result, nil
}

// GetPendingTasks returns the task signatures waiting in the queue; useful for task statistics
func (b *etcdBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
	if queue == "" {
		queue = b.GetConfig().DefaultQueue
	}

	key := fmt.Sprintf("%s/%s", pendingTaskPrefix, queue)
	items, err := b.getTasks(b.ctx, key)
	if err != nil {
		return nil, err
	}

	return items, nil
}

// GetDelayedTasks returns the delayed task signatures; useful for task statistics
func (b *etcdBroker) GetDelayedTasks() ([]*tasks.Signature, error) {
	items, err := b.getTasks(b.ctx, delayedTaskPrefix)
	if err != nil {
		return nil, err
	}

	return items, nil
}

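// nextTask picks a pending task for the given queue that is not currently
// running, wraps it in a Delivery, and records it in the local running set.
// It returns nil when no task can be claimed.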
func (b *etcdBroker) nextTask(ctx context.Context, queue string, consumerTag string) Delivery {
	b.mtx.Lock()
	runningTask := make(map[string]struct{}, len(b.runningTask))
	for k, v := range b.runningTask {
		runningTask[k] = v
	}
	pendingTask := make(map[string]struct{}, len(b.pendingTask))
	for k, v := range b.pendingTask {
		pendingTask[k] = v
	}
	b.mtx.Unlock()

	for k := range pendingTask {
		if !strings.Contains(k, queue) {
			continue
		}

		if _, ok := runningTask[k]; ok {
			continue
		}

		d, err := NewDelivery(ctx, b.client, k, consumerTag)
		if err != nil {
			continue
		}

		b.mtx.Lock()
		b.runningTask[k] = struct{}{}
		b.mtx.Unlock()

		return d
	}

	return nil
}

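// listWatchRunningTask keeps b.runningTask in sync with etcd: it lists the
// keys under runningTaskPrefix, then watches for changes from that revision
// for up to ten minutes before returning so the caller can restart it.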
func (b *etcdBroker) listWatchRunningTask(ctx context.Context) error {
	// List
	listCtx, listCancel := context.WithTimeout(ctx, time.Second*10)
	defer listCancel()
	resp, err := b.client.Get(listCtx, runningTaskPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
	if err != nil {
		return err
	}

	b.mtx.Lock()
	b.runningTask = make(map[string]struct{})
	for _, kv := range resp.Kvs {
		key := string(kv.Key)
		taskKey := findTaskKey(key)
		if taskKey == "" {
			continue
		}
		b.runningTask[taskKey] = struct{}{}
	}
	b.mtx.Unlock()

	// Watch
	watchCtx, watchCancel := context.WithTimeout(ctx, time.Minute*10)
	defer watchCancel()

	watchOpts := []clientv3.OpOption{
		clientv3.WithPrefix(),
		clientv3.WithKeysOnly(),
		clientv3.WithRev(resp.Header.Revision),
	}
	wc := b.client.Watch(watchCtx, runningTaskPrefix, watchOpts...)
	for wresp := range wc {
		if wresp.Err() != nil {
			return wresp.Err()
		}

		b.mtx.Lock()
		for _, ev := range wresp.Events {
			key := string(ev.Kv.Key)
			taskKey := findTaskKey(key)
			if taskKey == "" {
				continue
			}
			if ev.Type == clientv3.EventTypeDelete {
				delete(b.runningTask, taskKey)
			}

			if ev.Type == clientv3.EventTypePut {
				b.runningTask[taskKey] = struct{}{}
			}
		}
		b.mtx.Unlock()
	}

	return nil
}

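// listWatchPendingTask keeps b.pendingTask in sync with etcd for one queue,
// using the same list-then-watch pattern as listWatchRunningTask.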
func (b *etcdBroker) listWatchPendingTask(ctx context.Context, queue string) error {
	keyPrefix := fmt.Sprintf("%s/%s", pendingTaskPrefix, queue)

	// List
	listCtx, listCancel := context.WithTimeout(ctx, time.Second*10)
	defer listCancel()
	resp, err := b.client.Get(listCtx, keyPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
	if err != nil {
		return err
	}

	b.mtx.Lock()
	b.pendingTask = make(map[string]struct{})
	for _, kv := range resp.Kvs {
		key := string(kv.Key)
		taskKey := findTaskKey(key)
		if taskKey == "" {
			continue
		}
		b.pendingTask[taskKey] = struct{}{}
	}
	b.mtx.Unlock()

	// Watch
	watchCtx, watchCancel := context.WithTimeout(ctx, time.Minute*10)
	defer watchCancel()

	watchOpts := []clientv3.OpOption{
		clientv3.WithPrefix(),
		clientv3.WithKeysOnly(),
		clientv3.WithRev(resp.Header.Revision),
	}
	wc := b.client.Watch(watchCtx, keyPrefix, watchOpts...)
	for wresp := range wc {
		if wresp.Err() != nil {
			return wresp.Err()
		}

		b.mtx.Lock()
		for _, ev := range wresp.Events {
			key := string(ev.Kv.Key)
			taskKey := findTaskKey(key)
			if taskKey == "" {
				continue
			}
			if ev.Type == clientv3.EventTypeDelete {
				delete(b.pendingTask, taskKey)
			}

			if ev.Type == clientv3.EventTypePut {
				b.pendingTask[taskKey] = struct{}{}
			}
		}
		b.mtx.Unlock()
	}

	return nil
}

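// getQueue returns the task processor's custom queue if set, otherwise the
// configured default queue.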
func getQueue(config *config.Config, taskProcessor iface.TaskProcessor) string {
	customQueue := taskProcessor.CustomQueue()
	if customQueue == "" {
		return config.DefaultQueue
	}
	return customQueue
}

// findTaskKey returns the {queue}/{taskID} suffix of a broker key
func findTaskKey(key string) string {
	switch {
	case strings.HasPrefix(key, pendingTaskPrefix+"/"):
		return key[len(pendingTaskPrefix)+1:]

	case strings.HasPrefix(key, runningTaskPrefix+"/"):
		return key[len(runningTaskPrefix)+1:]

	case strings.HasPrefix(key, delayedTaskPrefix):
		// {delayedTaskPrefix}/eta-{ms}/{queue}/{taskID}
		parts := strings.Split(key, "/")
		if len(parts) != 8 {
			log.WARNING.Printf("invalid delay task %s, just ignore", key)
			return ""
		}
		return fmt.Sprintf("%s/%s", parts[6], parts[7])

	default:
		log.WARNING.Printf("invalid task %s, just ignore", key)
		return ""
	}
}