machinery-plugins/examples/etcd/main.go

220 lines
5.2 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/example/tracers"
"github.com/RichardKnop/machinery/v2/log"
"github.com/RichardKnop/machinery/v2/tasks"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/urfave/cli"
etcdbackend "github.com/ifooth/machinery-plugins/backends/etcd"
etcdbroker "github.com/ifooth/machinery-plugins/brokers/etcd"
exampletasks "github.com/ifooth/machinery-plugins/examples/tasks"
etcdlock "github.com/ifooth/machinery-plugins/locks/etcd"
)
// app is the CLI application shared by init, which creates it, and main,
// which attaches the commands and runs it.
var app *cli.App
// init builds the CLI application and stores it in the package-level app
// variable so that main only has to attach commands and run it.
func init() {
	cliApp := cli.NewApp()
	cliApp.Name = "machinery"
	cliApp.Usage = "machinery worker and send example tasks with machinery send"
	cliApp.Version = "0.0.0"

	// Publish the fully configured app in one assignment.
	app = cliApp
}
// main registers the "worker" and "send" sub-commands on the CLI app and runs
// it with the process arguments. Any error returned by the app is printed to
// stderr and turned into a non-zero exit status.
func main() {
	app.Commands = []cli.Command{
		{
			Name:  "worker",
			Usage: "launch machinery worker",
			Action: func(c *cli.Context) error {
				// machinery.ErrWorkerQuitGracefully is deliberately not
				// treated as a failure; every other error exits with code 1.
				if err := worker(); err != nil && !errors.Is(err, machinery.ErrWorkerQuitGracefully) {
					return cli.NewExitError(err.Error(), 1)
				}
				return nil
			},
		},
		{
			Name:  "send",
			Usage: "send example tasks ",
			Action: func(c *cli.Context) error {
				if err := send(); err != nil {
					return cli.NewExitError(err.Error(), 1)
				}
				return nil
			},
		},
	}

	// Do not discard the result of Run: an error that reaches this point
	// would otherwise end the process silently with exit status 0.
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
// startServer builds a machinery server whose broker, result backend and lock
// are all backed by the etcd endpoint at http://127.0.0.1:2379, and registers
// the example tasks on it.
//
// It returns the ready-to-use server, or a nil server and an error that names
// the step (broker, backend, lock or task registration) that failed.
func startServer() (*machinery.Server, error) {
	conf := &config.Config{
		DefaultQueue:  "machinery_tasks",
		Broker:        "http://127.0.0.1:2379",
		ResultBackend: "http://127.0.0.1:2379",
		Lock:          "http://127.0.0.1:2379",
		// NOTE(review): presumably seconds (one hour) — confirm against the
		// machinery config documentation.
		ResultsExpireIn: 3600,
	}
	ctx := context.Background()

	// All three components talk to the same endpoint, so each error is
	// wrapped with the component name to make failures distinguishable.
	broker, err := etcdbroker.New(ctx, conf)
	if err != nil {
		return nil, fmt.Errorf("creating etcd broker: %w", err)
	}

	backend, err := etcdbackend.New(ctx, conf)
	if err != nil {
		return nil, fmt.Errorf("creating etcd result backend: %w", err)
	}

	// NOTE(review): the meaning of the trailing 3 is not visible here
	// (possibly a retry count) — confirm against etcdlock.New.
	lock, err := etcdlock.New(ctx, conf, 3)
	if err != nil {
		return nil, fmt.Errorf("creating etcd lock: %w", err)
	}

	server := machinery.NewServer(conf, broker, backend, lock)

	// Task names used by senders, mapped to their implementations.
	tasksMap := map[string]interface{}{
		"add":               exampletasks.Add,
		"multiply":          exampletasks.Multiply,
		"sum_ints":          exampletasks.SumInts,
		"sum_floats":        exampletasks.SumFloats,
		"concat":            exampletasks.Concat,
		"split":             exampletasks.Split,
		"panic_task":        exampletasks.PanicTask,
		"long_running_task": exampletasks.LongRunningTask,
	}

	// Never hand back a server alongside an error: a server whose task
	// registration failed is not usable.
	if err := server.RegisterTasks(tasksMap); err != nil {
		return nil, fmt.Errorf("registering tasks: %w", err)
	}
	return server, nil
}
// worker sets up tracing, builds the server and launches a single machinery
// worker with example error, pre-task and post-task hooks installed.
//
// It returns an error if the tracer or the server cannot be set up; otherwise
// it returns whatever Launch returns.
func worker() error {
	const consumerTag = "machinery_worker"

	cleanup, err := tracers.SetupTracer(consumerTag)
	if err != nil {
		// Return instead of calling Fatalln so the caller decides how to
		// exit, as it does for every other error from this function.
		return fmt.Errorf("unable to instantiate a tracer: %w", err)
	}
	defer cleanup()

	server, err := startServer()
	if err != nil {
		return err
	}

	// The first argument is the consumer tag. Ideally, each worker should
	// have a unique tag (worker1, worker2 etc).
	// NOTE(review): the second argument (0) is presumably the concurrency
	// setting — confirm against machinery's NewWorker documentation.
	w := server.NewWorker(consumerTag, 0)

	// Custom hooks for error handling and for the start and end of each
	// task; useful for metrics, for example.
	w.SetPostTaskHandler(func(signature *tasks.Signature) {
		log.INFO.Println("I am an end of task handler for:", signature.Name)
	})
	w.SetErrorHandler(func(err error) {
		log.ERROR.Println("I am an error handler:", err)
	})
	w.SetPreTaskHandler(func(signature *tasks.Signature) {
		log.INFO.Println("I am a start of task handler for:", signature.Name)
	})

	return w.Launch()
}
// send sets up tracing, builds the server, sends one delayed "add" task and
// then registers the same signature as a periodic task named "hello".
//
// It returns an error if the tracer or server cannot be set up, if the task
// cannot be sent, or if the periodic task cannot be registered. On success it
// blocks for a long time before returning nil (see the final sleep).
func send() error {
	cleanup, err := tracers.SetupTracer("sender")
	if err != nil {
		// Return instead of calling Fatalln so the caller decides how to
		// exit, as it does for every other error from this function.
		return fmt.Errorf("unable to instantiate a tracer: %w", err)
	}
	defer cleanup()

	server, err := startServer()
	if err != nil {
		return err
	}

	/*
	 * Lets start a span representing this run of the `send` command and
	 * set a batch id as baggage so it can travel all the way into
	 * the worker functions.
	 */
	span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
	defer span.Finish()

	batchID := uuid.New().String()
	span.SetBaggageItem("batch.id", batchID)
	span.LogFields(opentracinglog.String("batch.id", batchID))

	log.INFO.Println("Starting batch:", batchID)

	// The "add" task with arguments 1 and 1, scheduled 10 seconds from now.
	eta := time.Now().Add(10 * time.Second)
	addTask0 := tasks.Signature{
		Name: "add",
		Args: []tasks.Arg{
			{
				Type:  "int64",
				Value: 1,
			},
			{
				Type:  "int64",
				Value: 1,
			},
		},
		ETA: &eta,
	}

	/*
	 * First, let's try sending a single task
	 */
	asyncResult, err := server.SendTaskWithContext(ctx, &addTask0)
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying cause.
		return fmt.Errorf("could not send task: %w", err)
	}
	log.INFO.Println("Single task:", asyncResult.Signature.UUID)

	// Register the same signature under the cron spec "* * * * *".
	if err := server.RegisterPeriodicTask("* * * * *", "hello", &addTask0); err != nil {
		return err
	}

	// Block for 120000 seconds before returning.
	// NOTE(review): presumably this keeps the process alive so the periodic
	// task registered above can keep firing — confirm.
	time.Sleep(time.Second * 120000)
	return nil
}