213 lines
5.0 KiB
Go
213 lines
5.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/RichardKnop/machinery/v2"
|
|
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
|
|
"github.com/RichardKnop/machinery/v2/config"
|
|
"github.com/RichardKnop/machinery/v2/example/tracers"
|
|
"github.com/RichardKnop/machinery/v2/log"
|
|
"github.com/RichardKnop/machinery/v2/tasks"
|
|
"github.com/google/uuid"
|
|
"github.com/opentracing/opentracing-go"
|
|
opentracinglog "github.com/opentracing/opentracing-go/log"
|
|
"github.com/urfave/cli"
|
|
|
|
etcdbroker "github.com/ifooth/machinery-plugins/brokers/etcd"
|
|
exampletasks "github.com/ifooth/machinery-plugins/examples/tasks"
|
|
etcdlock "github.com/ifooth/machinery-plugins/locks/etcd"
|
|
)
|
|
|
|
var (
|
|
app *cli.App
|
|
)
|
|
|
|
func init() {
|
|
// Initialise a CLI app
|
|
app = cli.NewApp()
|
|
app.Name = "machinery"
|
|
app.Usage = "machinery worker and send example tasks with machinery send"
|
|
app.Version = "0.0.0"
|
|
}
|
|
|
|
func main() {
|
|
// Set the CLI app commands
|
|
app.Commands = []cli.Command{
|
|
{
|
|
Name: "worker",
|
|
Usage: "launch machinery worker",
|
|
Action: func(c *cli.Context) error {
|
|
if err := worker(); err != nil && !errors.Is(err, machinery.ErrWorkerQuitGracefully) {
|
|
return cli.NewExitError(err.Error(), 1)
|
|
}
|
|
return nil
|
|
},
|
|
},
|
|
{
|
|
Name: "send",
|
|
Usage: "send example tasks ",
|
|
Action: func(c *cli.Context) error {
|
|
if err := send(); err != nil {
|
|
return cli.NewExitError(err.Error(), 1)
|
|
}
|
|
return nil
|
|
},
|
|
},
|
|
}
|
|
|
|
// Run the CLI app
|
|
_ = app.Run(os.Args)
|
|
}
|
|
|
|
func startServer() (*machinery.Server, error) {
|
|
cnf := &config.Config{
|
|
DefaultQueue: "machinery_tasks",
|
|
Broker: "redis://localhost:6379",
|
|
ResultBackend: "redis://localhost:6379",
|
|
ResultsExpireIn: 3600,
|
|
}
|
|
|
|
// Create server instance
|
|
// broker := redisbroker.NewGR(cnf, []string{"localhost:6379"}, 1)
|
|
broker, err := etcdbroker.New(cnf, "http://127.0.0.1:2379")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// lock := redislock.New(cnf, []string{"localhost:6379"}, 3, 2)
|
|
lock, err := etcdlock.New(cnf, "http://127.0.0.1:2379")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 3)
|
|
server := machinery.NewServer(cnf, broker, backend, lock)
|
|
|
|
// Register tasks
|
|
tasksMap := map[string]interface{}{
|
|
"add": exampletasks.Add,
|
|
"multiply": exampletasks.Multiply,
|
|
"sum_ints": exampletasks.SumInts,
|
|
"sum_floats": exampletasks.SumFloats,
|
|
"concat": exampletasks.Concat,
|
|
"split": exampletasks.Split,
|
|
"panic_task": exampletasks.PanicTask,
|
|
"long_running_task": exampletasks.LongRunningTask,
|
|
}
|
|
|
|
return server, server.RegisterTasks(tasksMap)
|
|
}
|
|
|
|
func worker() error {
|
|
consumerTag := "machinery_worker"
|
|
|
|
cleanup, err := tracers.SetupTracer(consumerTag)
|
|
if err != nil {
|
|
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
|
|
}
|
|
defer cleanup()
|
|
|
|
server, err := startServer()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// The second argument is a consumer tag
|
|
// Ideally, each worker should have a unique tag (worker1, worker2 etc)
|
|
worker := server.NewWorker(consumerTag, 0)
|
|
|
|
// Here we inject some custom code for error handling,
|
|
// start and end of task hooks, useful for metrics for example.
|
|
errorHandler := func(err error) {
|
|
log.ERROR.Println("I am an error handler:", err)
|
|
}
|
|
|
|
preTaskHandler := func(signature *tasks.Signature) {
|
|
log.INFO.Println("I am a start of task handler for:", signature.Name)
|
|
}
|
|
|
|
postTaskHandler := func(signature *tasks.Signature) {
|
|
log.INFO.Println("I am an end of task handler for:", signature.Name)
|
|
}
|
|
|
|
worker.SetPostTaskHandler(postTaskHandler)
|
|
worker.SetErrorHandler(errorHandler)
|
|
worker.SetPreTaskHandler(preTaskHandler)
|
|
|
|
return worker.Launch()
|
|
}
|
|
|
|
func send() error {
|
|
cleanup, err := tracers.SetupTracer("sender")
|
|
if err != nil {
|
|
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
|
|
}
|
|
defer cleanup()
|
|
|
|
server, err := startServer()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var (
|
|
addTask0 tasks.Signature
|
|
)
|
|
|
|
var initTasks = func() {
|
|
addTask0 = tasks.Signature{
|
|
Name: "add",
|
|
Args: []tasks.Arg{
|
|
{
|
|
Type: "int64",
|
|
Value: 1,
|
|
},
|
|
{
|
|
Type: "int64",
|
|
Value: 1,
|
|
},
|
|
},
|
|
}
|
|
|
|
}
|
|
|
|
/*
|
|
* Lets start a span representing this run of the `send` command and
|
|
* set a batch id as baggage so it can travel all the way into
|
|
* the worker functions.
|
|
*/
|
|
span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
|
|
defer span.Finish()
|
|
|
|
batchID := uuid.New().String()
|
|
span.SetBaggageItem("batch.id", batchID)
|
|
span.LogFields(opentracinglog.String("batch.id", batchID))
|
|
|
|
log.INFO.Println("Starting batch:", batchID)
|
|
|
|
/*
|
|
* First, let's try sending a single task
|
|
*/
|
|
initTasks()
|
|
|
|
now := time.Now().Add(time.Second * 10)
|
|
addTask0.ETA = &now
|
|
asyncResult, err := server.SendTaskWithContext(ctx, &addTask0)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not send task: %s", err.Error())
|
|
}
|
|
|
|
log.INFO.Println("Single task:", asyncResult.Signature.UUID)
|
|
|
|
if err := server.RegisterPeriodicTask("* * * * *", "hello", &addTask0); err != nil {
|
|
return err
|
|
}
|
|
|
|
time.Sleep(time.Second * 120)
|
|
|
|
return nil
|
|
}
|