chore: add examples (#2)

Browse Source
main
git 2024-05-19 20:24:36 +08:00 committed by GitHub
parent f3e28e9b80
commit d626b2d623
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 307 additions and 2 deletions

203
examples/etcd/main.go Normal file
View File

@ -0,0 +1,203 @@
package main
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/example/tracers"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/log"
"github.com/RichardKnop/machinery/v2/tasks"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/urfave/cli"
etcdbroker "github.com/ifooth/machinery-plugins/brokers/etcd"
exampletasks "github.com/ifooth/machinery-plugins/examples/tasks"
)
var (
app *cli.App
)
func init() {
// Initialise a CLI app
app = cli.NewApp()
app.Name = "machinery"
app.Usage = "machinery worker and send example tasks with machinery send"
app.Version = "0.0.0"
}
func main() {
// Set the CLI app commands
app.Commands = []cli.Command{
{
Name: "worker",
Usage: "launch machinery worker",
Action: func(c *cli.Context) error {
if err := worker(); err != nil && !errors.Is(err, machinery.ErrWorkerQuitGracefully) {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
{
Name: "send",
Usage: "send example tasks ",
Action: func(c *cli.Context) error {
if err := send(); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
}
// Run the CLI app
_ = app.Run(os.Args)
}
func startServer() (*machinery.Server, error) {
cnf := &config.Config{
DefaultQueue: "machinery_tasks",
Broker: "redis://localhost:6379",
ResultBackend: "redis://localhost:6379",
ResultsExpireIn: 3600,
}
// Create server instance
// broker := redisbroker.NewGR(cnf, []string{"localhost:6379"}, 1)
broker, err := etcdbroker.New(cnf, "http://127.0.0.1:2379")
if err != nil {
return nil, err
}
// lock := redislock.New(cnf, []string{"localhost:6379"}, 3, 2)
lock := eagerlock.New()
backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 3)
server := machinery.NewServer(cnf, broker, backend, lock)
// Register tasks
tasksMap := map[string]interface{}{
"add": exampletasks.Add,
"multiply": exampletasks.Multiply,
"sum_ints": exampletasks.SumInts,
"sum_floats": exampletasks.SumFloats,
"concat": exampletasks.Concat,
"split": exampletasks.Split,
"panic_task": exampletasks.PanicTask,
"long_running_task": exampletasks.LongRunningTask,
}
return server, server.RegisterTasks(tasksMap)
}
func worker() error {
consumerTag := "machinery_worker"
cleanup, err := tracers.SetupTracer(consumerTag)
if err != nil {
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
}
defer cleanup()
server, err := startServer()
if err != nil {
return err
}
// The second argument is a consumer tag
// Ideally, each worker should have a unique tag (worker1, worker2 etc)
worker := server.NewWorker(consumerTag, 0)
// Here we inject some custom code for error handling,
// start and end of task hooks, useful for metrics for example.
errorHandler := func(err error) {
log.ERROR.Println("I am an error handler:", err)
}
preTaskHandler := func(signature *tasks.Signature) {
log.INFO.Println("I am a start of task handler for:", signature.Name)
}
postTaskHandler := func(signature *tasks.Signature) {
log.INFO.Println("I am an end of task handler for:", signature.Name)
}
worker.SetPostTaskHandler(postTaskHandler)
worker.SetErrorHandler(errorHandler)
worker.SetPreTaskHandler(preTaskHandler)
return worker.Launch()
}
func send() error {
cleanup, err := tracers.SetupTracer("sender")
if err != nil {
log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
}
defer cleanup()
server, err := startServer()
if err != nil {
return err
}
var (
addTask0 tasks.Signature
)
var initTasks = func() {
addTask0 = tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 1,
},
{
Type: "int64",
Value: 1,
},
},
}
}
/*
* Lets start a span representing this run of the `send` command and
* set a batch id as baggage so it can travel all the way into
* the worker functions.
*/
span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
defer span.Finish()
batchID := uuid.New().String()
span.SetBaggageItem("batch.id", batchID)
span.LogFields(opentracinglog.String("batch.id", batchID))
log.INFO.Println("Starting batch:", batchID)
/*
* First, let's try sending a single task
*/
initTasks()
now := time.Now().Add(time.Second * 10)
addTask0.ETA = &now
asyncResult, err := server.SendTaskWithContext(ctx, &addTask0)
if err != nil {
return fmt.Errorf("Could not send task: %s", err.Error())
}
log.INFO.Println("Single task:", asyncResult.Signature.UUID)
return nil
}

75
examples/tasks/tasks.go Normal file
View File

@ -0,0 +1,75 @@
package tasks
import (
"errors"
"strings"
"time"
"github.com/RichardKnop/machinery/v2/log"
)
// Add ...
func Add(args ...int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, nil
}
// Multiply ...
func Multiply(args ...int64) (int64, error) {
sum := int64(1)
for _, arg := range args {
sum *= arg
}
return sum, nil
}
// SumInts ...
func SumInts(numbers []int64) (int64, error) {
var sum int64
for _, num := range numbers {
sum += num
}
return sum, nil
}
// SumFloats ...
func SumFloats(numbers []float64) (float64, error) {
var sum float64
for _, num := range numbers {
sum += num
}
return sum, nil
}
// Concat ...
func Concat(strs []string) (string, error) {
var res string
for _, s := range strs {
res += s
}
return res, nil
}
// Split ...
func Split(str string) ([]string, error) {
return strings.Split(str, ""), nil
}
// PanicTask ...
func PanicTask() (string, error) {
panic(errors.New("oops"))
}
// LongRunningTask ...
func LongRunningTask() error {
log.INFO.Print("Long running task started")
for i := 0; i < 10; i++ {
log.INFO.Print(10 - i)
time.Sleep(1 * time.Second)
}
log.INFO.Print("Long running task finished")
return nil
}

15
go.mod
View File

@ -4,6 +4,9 @@ go 1.20
require (
github.com/RichardKnop/machinery/v2 v2.0.13
github.com/google/uuid v1.3.1
github.com/opentracing/opentracing-go v1.2.0
github.com/urfave/cli v1.22.5
go.etcd.io/etcd/client/v3 v3.5.13
golang.org/x/sync v0.7.0
)
@ -16,8 +19,12 @@ require (
cloud.google.com/go/pubsub v1.33.0 // indirect
github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae // indirect
github.com/aws/aws-sdk-go v1.37.16 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redsync/redsync/v4 v4.8.1 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
@ -25,15 +32,19 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.9.2 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.9.5 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
github.com/redis/go-redis/v9 v9.0.5 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect
go.etcd.io/etcd/api/v3 v3.5.13 // indirect

16
go.sum
View File

@ -58,12 +58,15 @@ github.com/aws/aws-sdk-go v1.37.16 h1:Q4YOP2s00NpB9wfmTDZArdcLRuG9ijbnoAwTW3ivle
github.com/aws/aws-sdk-go v1.37.16/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/bsm/ginkgo/v2 v2.5.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/gomega v1.20.0/go.mod h1:JifAceMQ4crZIWYUKrlGcmbN3bqHogVTADMD2ATsbwk=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
@ -80,10 +83,12 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@ -98,9 +103,13 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4=
github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-redsync/redsync/v4 v4.8.1 h1:rq2RvdTI0obznMdxKUWGdmmulo7lS9yCzb8fgDKOlbM=
github.com/go-redsync/redsync/v4 v4.8.1/go.mod h1:LmUAsQuQxhzZAoGY7JS6+dNhNmZyonMZiiEDY9plotM=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
@ -213,7 +222,9 @@ github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cU
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@ -269,6 +280,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
@ -276,7 +288,9 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
@ -298,9 +312,11 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU=
github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=