machinery-plugins/examples/etcd/main.go

459 lines
11 KiB
Go
Raw Permalink Normal View History

2024-05-19 12:24:36 +00:00
package main
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/example/tracers"
"github.com/RichardKnop/machinery/v2/log"
"github.com/RichardKnop/machinery/v2/tasks"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/urfave/cli"
2024-06-01 03:35:00 +00:00
etcdbackend "git.ifooth.com/common/machinery-plugins/backends/etcd"
etcdbroker "git.ifooth.com/common/machinery-plugins/brokers/etcd"
exampletasks "git.ifooth.com/common/machinery-plugins/examples/tasks"
etcdlock "git.ifooth.com/common/machinery-plugins/locks/etcd"
2024-05-19 12:24:36 +00:00
)
var (
app *cli.App
)
func init() {
// Initialise a CLI app
app = cli.NewApp()
app.Name = "machinery"
app.Usage = "machinery worker and send example tasks with machinery send"
app.Version = "0.0.0"
}
func main() {
// Set the CLI app commands
app.Commands = []cli.Command{
{
Name: "worker",
Usage: "launch machinery worker",
Action: func(c *cli.Context) error {
if err := worker(); err != nil && !errors.Is(err, machinery.ErrWorkerQuitGracefully) {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
{
Name: "send",
Usage: "send example tasks ",
Action: func(c *cli.Context) error {
if err := send(); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
}
// Run the CLI app
_ = app.Run(os.Args)
}
func startServer() (*machinery.Server, error) {
2024-05-24 17:09:00 +00:00
conf := &config.Config{
2024-05-19 12:24:36 +00:00
DefaultQueue: "machinery_tasks",
2024-05-24 17:03:13 +00:00
Broker: "http://127.0.0.1:2379",
ResultBackend: "http://127.0.0.1:2379",
Lock: "http://127.0.0.1:2379",
2024-05-25 08:11:27 +00:00
ResultsExpireIn: 60,
2024-05-19 12:24:36 +00:00
}
2024-05-24 17:03:13 +00:00
ctx := context.Background()
2024-05-19 12:24:36 +00:00
// Create server instance
// broker := redisbroker.NewGR(cnf, []string{"localhost:6379"}, 1)
2024-05-24 17:09:00 +00:00
broker, err := etcdbroker.New(ctx, conf)
2024-05-19 12:24:36 +00:00
if err != nil {
return nil, err
}
2024-05-24 17:09:00 +00:00
// backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 3)
backend, err := etcdbackend.New(ctx, conf)
2024-05-24 17:03:13 +00:00
if err != nil {
return nil, err
}
2024-05-24 17:09:00 +00:00
// lock := redislock.New(cnf, []string{"localhost:6379"}, 3, 2)
lock, err := etcdlock.New(ctx, conf, 3)
2024-05-24 14:57:16 +00:00
if err != nil {
return nil, err
}
2024-05-24 17:09:00 +00:00
server := machinery.NewServer(conf, broker, backend, lock)
2024-05-19 12:24:36 +00:00
// Register tasks
tasksMap := map[string]interface{}{
"add": exampletasks.Add,
"multiply": exampletasks.Multiply,
"sum_ints": exampletasks.SumInts,
"sum_floats": exampletasks.SumFloats,
"concat": exampletasks.Concat,
"split": exampletasks.Split,
"panic_task": exampletasks.PanicTask,
"long_running_task": exampletasks.LongRunningTask,
}
return server, server.RegisterTasks(tasksMap)
}
func worker() error {
consumerTag := "machinery_worker"
cleanup, err := tracers.SetupTracer(consumerTag)
if err != nil {
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
}
defer cleanup()
server, err := startServer()
if err != nil {
return err
}
// The second argument is a consumer tag
// Ideally, each worker should have a unique tag (worker1, worker2 etc)
worker := server.NewWorker(consumerTag, 0)
// Here we inject some custom code for error handling,
// start and end of task hooks, useful for metrics for example.
errorHandler := func(err error) {
log.ERROR.Println("I am an error handler:", err)
}
preTaskHandler := func(signature *tasks.Signature) {
log.INFO.Println("I am a start of task handler for:", signature.Name)
}
postTaskHandler := func(signature *tasks.Signature) {
log.INFO.Println("I am an end of task handler for:", signature.Name)
}
worker.SetPostTaskHandler(postTaskHandler)
worker.SetErrorHandler(errorHandler)
worker.SetPreTaskHandler(preTaskHandler)
return worker.Launch()
}
func send() error {
cleanup, err := tracers.SetupTracer("sender")
if err != nil {
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
}
defer cleanup()
server, err := startServer()
if err != nil {
return err
}
var (
2024-05-25 11:42:37 +00:00
addTask0, addTask1, addTask2 tasks.Signature
multiplyTask0, multiplyTask1 tasks.Signature
sumIntsTask, sumFloatsTask, concatTask, splitTask tasks.Signature
panicTask tasks.Signature
longRunningTask tasks.Signature
2024-05-19 12:24:36 +00:00
)
var initTasks = func() {
addTask0 = tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 1,
},
{
Type: "int64",
Value: 1,
},
},
}
2024-05-25 11:42:37 +00:00
addTask1 = tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 2,
},
{
Type: "int64",
Value: 2,
},
},
}
addTask2 = tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 5,
},
{
Type: "int64",
Value: 6,
},
},
}
multiplyTask0 = tasks.Signature{
Name: "multiply",
Args: []tasks.Arg{
{
Type: "int64",
Value: 4,
},
},
}
multiplyTask1 = tasks.Signature{
Name: "multiply",
}
sumIntsTask = tasks.Signature{
Name: "sum_ints",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1, 2},
},
},
}
sumFloatsTask = tasks.Signature{
Name: "sum_floats",
Args: []tasks.Arg{
{
Type: "[]float64",
Value: []float64{1.5, 2.7},
},
},
}
concatTask = tasks.Signature{
Name: "concat",
Args: []tasks.Arg{
{
Type: "[]string",
Value: []string{"foo", "bar"},
},
},
}
splitTask = tasks.Signature{
Name: "split",
Args: []tasks.Arg{
{
Type: "string",
Value: "foo",
},
},
}
panicTask = tasks.Signature{
Name: "panic_task",
}
longRunningTask = tasks.Signature{
Name: "long_running_task",
}
2024-05-19 12:24:36 +00:00
}
/*
* Lets start a span representing this run of the `send` command and
* set a batch id as baggage so it can travel all the way into
* the worker functions.
*/
span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
defer span.Finish()
batchID := uuid.New().String()
span.SetBaggageItem("batch.id", batchID)
span.LogFields(opentracinglog.String("batch.id", batchID))
log.INFO.Println("Starting batch:", batchID)
/*
* First, let's try sending a single task
*/
initTasks()
2024-05-25 11:42:37 +00:00
log.INFO.Println("Single task:")
2024-05-19 12:24:36 +00:00
asyncResult, err := server.SendTaskWithContext(ctx, &addTask0)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
}
2024-05-25 11:42:37 +00:00
results, err := asyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
}
log.INFO.Printf("1 + 1 = %v\n", tasks.HumanReadableResults(results))
2024-05-19 12:24:36 +00:00
2024-05-25 11:42:37 +00:00
/*
* Try couple of tasks with a slice argument and slice return value
*/
asyncResult, err = server.SendTaskWithContext(ctx, &sumIntsTask)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
}
results, err = asyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
}
log.INFO.Printf("sum([1, 2]) = %v\n", tasks.HumanReadableResults(results))
asyncResult, err = server.SendTaskWithContext(ctx, &sumFloatsTask)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
}
results, err = asyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
}
log.INFO.Printf("sum([1.5, 2.7]) = %v\n", tasks.HumanReadableResults(results))
asyncResult, err = server.SendTaskWithContext(ctx, &concatTask)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
}
results, err = asyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
}
log.INFO.Printf("concat([\"foo\", \"bar\"]) = %v\n", tasks.HumanReadableResults(results))
asyncResult, err = server.SendTaskWithContext(ctx, &splitTask)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
}
results, err = asyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
}
log.INFO.Printf("split([\"foo\"]) = %v\n", tasks.HumanReadableResults(results))
/*
* Now let's explore ways of sending multiple tasks
*/
// Now let's try a parallel execution
initTasks()
log.INFO.Println("Group of tasks (parallel execution):")
group, err := tasks.NewGroup(&addTask0, &addTask1, &addTask2)
if err != nil {
return fmt.Errorf("Error creating group: %s", err.Error())
}
asyncResults, err := server.SendGroupWithContext(ctx, group, 10)
if err != nil {
return fmt.Errorf("Could not send group: %s", err.Error())
}
for _, asyncResult := range asyncResults {
results, err = asyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
}
log.INFO.Printf(
"%v + %v = %v\n",
asyncResult.Signature.Args[0].Value,
asyncResult.Signature.Args[1].Value,
tasks.HumanReadableResults(results),
)
}
// Now let's try a group with a chord
initTasks()
log.INFO.Println("Group of tasks with a callback (chord):")
group, err = tasks.NewGroup(&addTask0, &addTask1, &addTask2)
if err != nil {
return fmt.Errorf("Error creating group: %s", err.Error())
}
chord, err := tasks.NewChord(group, &multiplyTask1)
if err != nil {
return fmt.Errorf("Error creating chord: %s", err)
}
chordAsyncResult, err := server.SendChordWithContext(ctx, chord, 10)
if err != nil {
return fmt.Errorf("Could not send chord: %s", err.Error())
}
results, err = chordAsyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting chord result failed with error: %s", err.Error())
}
log.INFO.Printf("(1 + 1) * (2 + 2) * (5 + 6) = %v\n", tasks.HumanReadableResults(results))
// Now let's try chaining task results
initTasks()
log.INFO.Println("Chain of tasks:")
chain, err := tasks.NewChain(&addTask0, &addTask1, &addTask2, &multiplyTask0)
if err != nil {
return fmt.Errorf("Error creating chain: %s", err)
}
chainAsyncResult, err := server.SendChainWithContext(ctx, chain)
if err != nil {
return fmt.Errorf("Could not send chain: %s", err.Error())
}
results, err = chainAsyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting chain result failed with error: %s", err.Error())
}
log.INFO.Printf("(((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = %v\n", tasks.HumanReadableResults(results))
// Let's try a task which throws panic to make sure stack trace is not lost
initTasks()
asyncResult, err = server.SendTaskWithContext(ctx, &panicTask)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
2024-05-24 14:57:16 +00:00
}
2024-05-25 11:42:37 +00:00
_, err = asyncResult.Get(time.Millisecond * 5)
if err == nil {
return errors.New("Error should not be nil if task panicked")
}
log.INFO.Printf("Task panicked and returned error = %v\n", err.Error())
// Let's try a long running task
initTasks()
asyncResult, err = server.SendTaskWithContext(ctx, &longRunningTask)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
}
results, err = asyncResult.Get(time.Millisecond * 5)
if err != nil {
return fmt.Errorf("Getting long running task result failed with error: %s", err.Error())
}
log.INFO.Printf("Long running task returned = %v\n", tasks.HumanReadableResults(results))
2024-05-24 14:57:16 +00:00
2024-05-19 12:24:36 +00:00
return nil
}