feat: use list&watch for pending task (#4)
* feat: use watch for next task * feat: use watch for next taskmain
parent
8d8a7a12f5
commit
d42efb69d6
|
@ -6,7 +6,6 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -238,8 +237,7 @@ func (b *etcdBroker) Publish(ctx context.Context, signature *tasks.Signature) er
|
||||||
// delay the task
|
// delay the task
|
||||||
if s.ETA != nil {
|
if s.ETA != nil {
|
||||||
if s.ETA.After(now) {
|
if s.ETA.After(now) {
|
||||||
// score := signature.ETA.UnixNano()
|
key = fmt.Sprintf("/machinery/v2/broker/delayed_tasks/t%d-%s", s.ETA.UnixNano(), s.UUID)
|
||||||
key = fmt.Sprintf("/machinery/v2/broker/delayed_tasks/%s", s.UUID)
|
|
||||||
_, err = b.cli.Put(ctx, key, string(msg))
|
_, err = b.cli.Put(ctx, key, string(msg))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -308,75 +306,43 @@ func (b *etcdBroker) GetDelayedTasks() ([]*tasks.Signature, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBroker) nextTask(queue string) (*Signature, error) {
|
func (b *etcdBroker) nextTask(queue string) (*Signature, error) {
|
||||||
key := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
|
keyPrefix := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s", queue)
|
||||||
items, err := b.getTasks(context.Background(), key)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
item, err := getFirstItem(ctx, b.cli, keyPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer time.Sleep(time.Second)
|
signature := new(Signature)
|
||||||
|
decoder := json.NewDecoder(bytes.NewReader(item))
|
||||||
if len(items) == 0 {
|
decoder.UseNumber()
|
||||||
return nil, haveNoTaskErr
|
if err := decoder.Decode(signature); err != nil {
|
||||||
|
return nil, errs.NewErrCouldNotUnmarshalTaskSignature(item, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var t *Signature
|
return signature, nil
|
||||||
score := int64(math.MaxInt64)
|
|
||||||
|
|
||||||
for _, v := range items {
|
|
||||||
if v.Score < score {
|
|
||||||
t = v
|
|
||||||
score = v.Score
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
k := fmt.Sprintf("/machinery/v2/broker/pending_tasks/%s/%s", queue, t.UUID)
|
|
||||||
_, _ = b.cli.Delete(context.Background(), k)
|
|
||||||
|
|
||||||
return t, nil
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBroker) nextDelayedTask() (*Signature, error) {
|
func (b *etcdBroker) nextDelayedTask() (*Signature, error) {
|
||||||
key := "/machinery/v2/broker/delayed_tasks"
|
keyPrefix := "/machinery/v2/broker/delayed_tasks"
|
||||||
items, err := b.getTasks(context.Background(), key)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
item, err := getFirstETAItem(ctx, b.cli, keyPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(items) == 0 {
|
signature := new(Signature)
|
||||||
return nil, haveNoTaskErr
|
decoder := json.NewDecoder(bytes.NewReader(item))
|
||||||
|
decoder.UseNumber()
|
||||||
|
if err := decoder.Decode(signature); err != nil {
|
||||||
|
return nil, errs.NewErrCouldNotUnmarshalTaskSignature(item, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var task *Signature
|
return signature, nil
|
||||||
now := time.Now()
|
|
||||||
earliest := now
|
|
||||||
|
|
||||||
for _, t := range items {
|
|
||||||
// 还没有到时间
|
|
||||||
if t.ETA.After(now) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 选择最早的时间
|
|
||||||
if t.ETA.Before(earliest) {
|
|
||||||
earliest = *t.ETA
|
|
||||||
task = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if task != nil {
|
|
||||||
k := fmt.Sprintf("/machinery/v2/broker/delayed_tasks/%s", task.UUID)
|
|
||||||
_, err = b.cli.Delete(context.Background(), k)
|
|
||||||
if err != nil {
|
|
||||||
log.ERROR.Print(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return task, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, haveNoTaskErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *etcdBroker) requeueMessage(delivery *Signature, taskProcessor iface.TaskProcessor) {
|
func (b *etcdBroker) requeueMessage(delivery *Signature, taskProcessor iface.TaskProcessor) {
|
||||||
|
|
|
@ -0,0 +1,160 @@
|
||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/RichardKnop/machinery/v2/log"
|
||||||
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrKeyExists = errors.New("key already exists")
|
||||||
|
ErrWaitMismatch = errors.New("unexpected wait result")
|
||||||
|
ErrTooManyClients = errors.New("too many clients")
|
||||||
|
ErrNoWatcher = errors.New("no watcher channel")
|
||||||
|
)
|
||||||
|
|
||||||
|
type checkFunc func(*mvccpb.KeyValue) error
|
||||||
|
|
||||||
|
// deleteRevKey deletes a key by revision, returning false if key is missing
|
||||||
|
func deleteRevKey(ctx context.Context, kv clientv3.KV, key string, rev int64) error {
|
||||||
|
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", rev)
|
||||||
|
req := clientv3.OpDelete(key)
|
||||||
|
txnresp, err := kv.Txn(ctx).If(cmp).Then(req).Commit()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !txnresp.Succeeded {
|
||||||
|
return fmt.Errorf("delete key failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitPrefixEvents(ctx context.Context, c *clientv3.Client, prefix string, rev int64) (*clientv3.Event, error) {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
wc := c.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
|
||||||
|
if wc == nil {
|
||||||
|
return nil, ErrNoWatcher
|
||||||
|
}
|
||||||
|
|
||||||
|
return waitEvents(wc), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitEvents(wc clientv3.WatchChan) *clientv3.Event {
|
||||||
|
for wresp := range wc {
|
||||||
|
for _, ev := range wresp.Events {
|
||||||
|
if ev.Type != mvccpb.PUT {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return ev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getItem(ctx context.Context, c *clientv3.Client, prefix string, opts []clientv3.OpOption, check checkFunc) (*mvccpb.KeyValue, error) {
|
||||||
|
resp, err := c.Get(ctx, prefix, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// nothing yet; wait on elements
|
||||||
|
if len(resp.Kvs) == 0 {
|
||||||
|
ev, err := waitPrefixEvents(
|
||||||
|
ctx,
|
||||||
|
c,
|
||||||
|
prefix,
|
||||||
|
resp.Header.Revision)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ev == nil {
|
||||||
|
return nil, haveNoTaskErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := check(ev.Kv); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := deleteRevKey(ctx, c, string(ev.Kv.Key), ev.Kv.ModRevision); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ev.Kv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
kv := resp.Kvs[0]
|
||||||
|
if err := check(kv); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := deleteRevKey(ctx, c, string(kv.Key), kv.ModRevision); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return kv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getFirstItem(ctx context.Context, c *clientv3.Client, prefix string) ([]byte, error) {
|
||||||
|
// 按创建时间排序, 只返回单个数据
|
||||||
|
opts := clientv3.WithFirstRev()
|
||||||
|
|
||||||
|
check := func(*mvccpb.KeyValue) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
kv, err := getItem(ctx, c, prefix, opts, check)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return kv.Value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getFirstETAItem(ctx context.Context, c *clientv3.Client, prefix string) ([]byte, error) {
|
||||||
|
// 按创建key排序, 只返回单个数据
|
||||||
|
opts := clientv3.WithFirstKey()
|
||||||
|
|
||||||
|
check := func(kv *mvccpb.KeyValue) error {
|
||||||
|
k := string(kv.Key)
|
||||||
|
if len(k) < 55 {
|
||||||
|
log.ERROR.Print("not valid eta key: %s", k)
|
||||||
|
return fmt.Errorf("not valid eta key: %s", k)
|
||||||
|
}
|
||||||
|
|
||||||
|
nsStr := k[36:55]
|
||||||
|
ns, err := strconv.ParseInt(nsStr, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t := time.Unix(0, ns)
|
||||||
|
if t.After(time.Now()) {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
return haveNoTaskErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
kv, err := getItem(ctx, c, prefix, opts, check)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return kv.Value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func putItem() {
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue