459 lines
11 KiB
Go
459 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/RichardKnop/machinery/v2"
|
|
"github.com/RichardKnop/machinery/v2/config"
|
|
"github.com/RichardKnop/machinery/v2/example/tracers"
|
|
"github.com/RichardKnop/machinery/v2/log"
|
|
"github.com/RichardKnop/machinery/v2/tasks"
|
|
"github.com/google/uuid"
|
|
"github.com/opentracing/opentracing-go"
|
|
opentracinglog "github.com/opentracing/opentracing-go/log"
|
|
"github.com/urfave/cli"
|
|
|
|
etcdbackend "git.ifooth.com/common/machinery-plugins/backends/etcd"
|
|
etcdbroker "git.ifooth.com/common/machinery-plugins/brokers/etcd"
|
|
exampletasks "git.ifooth.com/common/machinery-plugins/examples/tasks"
|
|
etcdlock "git.ifooth.com/common/machinery-plugins/locks/etcd"
|
|
)
|
|
|
|
var (
|
|
app *cli.App
|
|
)
|
|
|
|
func init() {
|
|
// Initialise a CLI app
|
|
app = cli.NewApp()
|
|
app.Name = "machinery"
|
|
app.Usage = "machinery worker and send example tasks with machinery send"
|
|
app.Version = "0.0.0"
|
|
}
|
|
|
|
func main() {
|
|
// Set the CLI app commands
|
|
app.Commands = []cli.Command{
|
|
{
|
|
Name: "worker",
|
|
Usage: "launch machinery worker",
|
|
Action: func(c *cli.Context) error {
|
|
if err := worker(); err != nil && !errors.Is(err, machinery.ErrWorkerQuitGracefully) {
|
|
return cli.NewExitError(err.Error(), 1)
|
|
}
|
|
return nil
|
|
},
|
|
},
|
|
{
|
|
Name: "send",
|
|
Usage: "send example tasks ",
|
|
Action: func(c *cli.Context) error {
|
|
if err := send(); err != nil {
|
|
return cli.NewExitError(err.Error(), 1)
|
|
}
|
|
return nil
|
|
},
|
|
},
|
|
}
|
|
|
|
// Run the CLI app
|
|
_ = app.Run(os.Args)
|
|
}
|
|
|
|
func startServer() (*machinery.Server, error) {
|
|
conf := &config.Config{
|
|
DefaultQueue: "machinery_tasks",
|
|
Broker: "http://127.0.0.1:2379",
|
|
ResultBackend: "http://127.0.0.1:2379",
|
|
Lock: "http://127.0.0.1:2379",
|
|
ResultsExpireIn: 60,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
// Create server instance
|
|
// broker := redisbroker.NewGR(cnf, []string{"localhost:6379"}, 1)
|
|
broker, err := etcdbroker.New(ctx, conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 3)
|
|
backend, err := etcdbackend.New(ctx, conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// lock := redislock.New(cnf, []string{"localhost:6379"}, 3, 2)
|
|
lock, err := etcdlock.New(ctx, conf, 3)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
server := machinery.NewServer(conf, broker, backend, lock)
|
|
|
|
// Register tasks
|
|
tasksMap := map[string]interface{}{
|
|
"add": exampletasks.Add,
|
|
"multiply": exampletasks.Multiply,
|
|
"sum_ints": exampletasks.SumInts,
|
|
"sum_floats": exampletasks.SumFloats,
|
|
"concat": exampletasks.Concat,
|
|
"split": exampletasks.Split,
|
|
"panic_task": exampletasks.PanicTask,
|
|
"long_running_task": exampletasks.LongRunningTask,
|
|
}
|
|
|
|
return server, server.RegisterTasks(tasksMap)
|
|
}
|
|
|
|
func worker() error {
|
|
consumerTag := "machinery_worker"
|
|
|
|
cleanup, err := tracers.SetupTracer(consumerTag)
|
|
if err != nil {
|
|
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
|
|
}
|
|
defer cleanup()
|
|
|
|
server, err := startServer()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// The second argument is a consumer tag
|
|
// Ideally, each worker should have a unique tag (worker1, worker2 etc)
|
|
worker := server.NewWorker(consumerTag, 0)
|
|
|
|
// Here we inject some custom code for error handling,
|
|
// start and end of task hooks, useful for metrics for example.
|
|
errorHandler := func(err error) {
|
|
log.ERROR.Println("I am an error handler:", err)
|
|
}
|
|
|
|
preTaskHandler := func(signature *tasks.Signature) {
|
|
log.INFO.Println("I am a start of task handler for:", signature.Name)
|
|
}
|
|
|
|
postTaskHandler := func(signature *tasks.Signature) {
|
|
log.INFO.Println("I am an end of task handler for:", signature.Name)
|
|
}
|
|
|
|
worker.SetPostTaskHandler(postTaskHandler)
|
|
worker.SetErrorHandler(errorHandler)
|
|
worker.SetPreTaskHandler(preTaskHandler)
|
|
|
|
return worker.Launch()
|
|
}
|
|
|
|
func send() error {
|
|
cleanup, err := tracers.SetupTracer("sender")
|
|
if err != nil {
|
|
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
|
|
}
|
|
defer cleanup()
|
|
|
|
server, err := startServer()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var (
|
|
addTask0, addTask1, addTask2 tasks.Signature
|
|
multiplyTask0, multiplyTask1 tasks.Signature
|
|
sumIntsTask, sumFloatsTask, concatTask, splitTask tasks.Signature
|
|
panicTask tasks.Signature
|
|
longRunningTask tasks.Signature
|
|
)
|
|
|
|
var initTasks = func() {
|
|
addTask0 = tasks.Signature{
|
|
Name: "add",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "int64",
|
|
Value: 1,
|
|
},
|
|
{
|
|
Type: "int64",
|
|
Value: 1,
|
|
},
|
|
},
|
|
}
|
|
|
|
addTask1 = tasks.Signature{
|
|
Name: "add",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "int64",
|
|
Value: 2,
|
|
},
|
|
{
|
|
Type: "int64",
|
|
Value: 2,
|
|
},
|
|
},
|
|
}
|
|
|
|
addTask2 = tasks.Signature{
|
|
Name: "add",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "int64",
|
|
Value: 5,
|
|
},
|
|
{
|
|
Type: "int64",
|
|
Value: 6,
|
|
},
|
|
},
|
|
}
|
|
|
|
multiplyTask0 = tasks.Signature{
|
|
Name: "multiply",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "int64",
|
|
Value: 4,
|
|
},
|
|
},
|
|
}
|
|
|
|
multiplyTask1 = tasks.Signature{
|
|
Name: "multiply",
|
|
}
|
|
|
|
sumIntsTask = tasks.Signature{
|
|
Name: "sum_ints",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "[]int64",
|
|
Value: []int64{1, 2},
|
|
},
|
|
},
|
|
}
|
|
|
|
sumFloatsTask = tasks.Signature{
|
|
Name: "sum_floats",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "[]float64",
|
|
Value: []float64{1.5, 2.7},
|
|
},
|
|
},
|
|
}
|
|
|
|
concatTask = tasks.Signature{
|
|
Name: "concat",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "[]string",
|
|
Value: []string{"foo", "bar"},
|
|
},
|
|
},
|
|
}
|
|
|
|
splitTask = tasks.Signature{
|
|
Name: "split",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "string",
|
|
Value: "foo",
|
|
},
|
|
},
|
|
}
|
|
|
|
panicTask = tasks.Signature{
|
|
Name: "panic_task",
|
|
}
|
|
|
|
longRunningTask = tasks.Signature{
|
|
Name: "long_running_task",
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Lets start a span representing this run of the `send` command and
|
|
* set a batch id as baggage so it can travel all the way into
|
|
* the worker functions.
|
|
*/
|
|
span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
|
|
defer span.Finish()
|
|
|
|
batchID := uuid.New().String()
|
|
span.SetBaggageItem("batch.id", batchID)
|
|
span.LogFields(opentracinglog.String("batch.id", batchID))
|
|
|
|
log.INFO.Println("Starting batch:", batchID)
|
|
/*
|
|
* First, let's try sending a single task
|
|
*/
|
|
initTasks()
|
|
|
|
log.INFO.Println("Single task:")
|
|
|
|
asyncResult, err := server.SendTaskWithContext(ctx, &addTask0)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
results, err := asyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("1 + 1 = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
/*
|
|
* Try couple of tasks with a slice argument and slice return value
|
|
*/
|
|
asyncResult, err = server.SendTaskWithContext(ctx, &sumIntsTask)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
results, err = asyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("sum([1, 2]) = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
asyncResult, err = server.SendTaskWithContext(ctx, &sumFloatsTask)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
results, err = asyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("sum([1.5, 2.7]) = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
asyncResult, err = server.SendTaskWithContext(ctx, &concatTask)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
results, err = asyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("concat([\"foo\", \"bar\"]) = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
asyncResult, err = server.SendTaskWithContext(ctx, &splitTask)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
results, err = asyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("split([\"foo\"]) = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
/*
|
|
* Now let's explore ways of sending multiple tasks
|
|
*/
|
|
|
|
// Now let's try a parallel execution
|
|
initTasks()
|
|
log.INFO.Println("Group of tasks (parallel execution):")
|
|
|
|
group, err := tasks.NewGroup(&addTask0, &addTask1, &addTask2)
|
|
if err != nil {
|
|
return fmt.Errorf("Error creating group: %s", err.Error())
|
|
}
|
|
|
|
asyncResults, err := server.SendGroupWithContext(ctx, group, 10)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send group: %s", err.Error())
|
|
}
|
|
|
|
for _, asyncResult := range asyncResults {
|
|
results, err = asyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf(
|
|
"%v + %v = %v\n",
|
|
asyncResult.Signature.Args[0].Value,
|
|
asyncResult.Signature.Args[1].Value,
|
|
tasks.HumanReadableResults(results),
|
|
)
|
|
}
|
|
|
|
// Now let's try a group with a chord
|
|
initTasks()
|
|
log.INFO.Println("Group of tasks with a callback (chord):")
|
|
|
|
group, err = tasks.NewGroup(&addTask0, &addTask1, &addTask2)
|
|
if err != nil {
|
|
return fmt.Errorf("Error creating group: %s", err.Error())
|
|
}
|
|
|
|
chord, err := tasks.NewChord(group, &multiplyTask1)
|
|
if err != nil {
|
|
return fmt.Errorf("Error creating chord: %s", err)
|
|
}
|
|
|
|
chordAsyncResult, err := server.SendChordWithContext(ctx, chord, 10)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send chord: %s", err.Error())
|
|
}
|
|
|
|
results, err = chordAsyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting chord result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("(1 + 1) * (2 + 2) * (5 + 6) = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
// Now let's try chaining task results
|
|
initTasks()
|
|
log.INFO.Println("Chain of tasks:")
|
|
|
|
chain, err := tasks.NewChain(&addTask0, &addTask1, &addTask2, &multiplyTask0)
|
|
if err != nil {
|
|
return fmt.Errorf("Error creating chain: %s", err)
|
|
}
|
|
|
|
chainAsyncResult, err := server.SendChainWithContext(ctx, chain)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send chain: %s", err.Error())
|
|
}
|
|
|
|
results, err = chainAsyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting chain result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("(((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
// Let's try a task which throws panic to make sure stack trace is not lost
|
|
initTasks()
|
|
asyncResult, err = server.SendTaskWithContext(ctx, &panicTask)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
_, err = asyncResult.Get(time.Millisecond * 5)
|
|
if err == nil {
|
|
return errors.New("Error should not be nil if task panicked")
|
|
}
|
|
log.INFO.Printf("Task panicked and returned error = %v\n", err.Error())
|
|
|
|
// Let's try a long running task
|
|
initTasks()
|
|
asyncResult, err = server.SendTaskWithContext(ctx, &longRunningTask)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
results, err = asyncResult.Get(time.Millisecond * 5)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting long running task result failed with error: %s", err.Error())
|
|
}
|
|
log.INFO.Printf("Long running task returned = %v\n", tasks.HumanReadableResults(results))
|
|
|
|
return nil
|
|
}
|