This website requires JavaScript.
0
Pull Requests
Explore
Active Time Tracker
#
0
Create…
New Repository
New Migration
anonymous
Signed in as
anonymous
Profile
Subscriptions
Settings
Help
Sign Out
common
/
machinery-plugins
Private
Watch
1
Fork
You've already forked machinery-plugins
Fork to a different account
0
Code
Pull Requests
Releases
Activity
machinery-plugins
/
backends
/
mysql
/
or
Cancel
Edit File
Preview
Preview Changes
// Package mysql provides an iface.Backend implementation that stores
// machinery task and group state through a MySQL connection.
//
// Most methods are still stubs that return zero values.
package mysql

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/RichardKnop/machinery/v2/backends/iface"
	"github.com/RichardKnop/machinery/v2/config"
	"github.com/RichardKnop/machinery/v2/tasks"
	"github.com/jmoiron/sqlx"
)

var (
	// ErrRedisLockFailed is returned when a lock cannot be acquired.
	//
	// NOTE(review): the name and message mention redis although this is the
	// mysql backend; kept as-is because the identifier is exported.
	ErrRedisLockFailed = errors.New("redis lock: failed to acquire lock")
)

// groupMetaTable is the table that holds encoded group metadata.
//
// TODO(review): the original queries carried an unformatted "%s" in place of
// the table name; this value is a placeholder — confirm it against the schema.
const groupMetaTable = "group_metas"

// mysqlBackend is the MySQL-backed implementation of iface.Backend.
type mysqlBackend struct {
	globalConf *config.Config
	db         *sqlx.DB
	retries    int
	interval   time.Duration
}

// New opens a MySQL connection and returns a backend that uses it.
//
// A connection failure is returned to the caller instead of terminating the
// process, so the application can decide how to react.
//
// NOTE(review): the DSN is still the hard-coded placeholder "xx", and no MySQL
// driver is imported by this file; a driver must be registered elsewhere
// (e.g. a blank import of a MySQL driver package) for Connect to succeed.
func New(cnf *config.Config) (iface.Backend, error) {
	db, err := sqlx.Connect("mysql", "xx")
	if err != nil {
		return nil, fmt.Errorf("connecting to mysql: %w", err)
	}

	return &mysqlBackend{
		globalConf: cnf,
		db:         db,
	}, nil
}

// Group related functions

// InitGroup stores the JSON-encoded metadata of a new group, stamped with the
// current UTC time.
func (b mysqlBackend) InitGroup(groupUUID string, taskUUIDs []string) error {
	groupMeta := &tasks.GroupMeta{
		GroupUUID: groupUUID,
		TaskUUIDs: taskUUIDs,
		CreatedAt: time.Now().UTC(),
	}

	encoded, err := json.Marshal(groupMeta)
	if err != nil {
		return err
	}

	// MySQL uses "?" bind variables; "$1"-style placeholders are rejected.
	query := fmt.Sprintf("insert into %s (uuid, encoded) values(?, ?)", groupMetaTable)
	_, err = b.db.Exec(query, groupUUID, encoded)
	return err
}

// GroupCompleted is not implemented yet; it always reports false.
func (b mysqlBackend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
	return false, nil
}

// getGroupMeta loads the metadata row for groupUUID and returns an error
// unless exactly one row matches.
//
// NOTE(review): this filters on column "group_uuid" while InitGroup inserts
// into column "uuid"; one of the two needs to change once the schema is fixed.
func (b mysqlBackend) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
	query := fmt.Sprintf("select * from %s where group_uuid = ?", groupMetaTable)

	var metas []*tasks.GroupMeta
	if err := b.db.Select(&metas, query, groupUUID); err != nil {
		return nil, err
	}
	if len(metas) != 1 {
		return nil, fmt.Errorf("not valid groupUUID")
	}

	return metas[0], nil
}

// GroupTaskStates is not implemented yet; it always returns nil.
func (b mysqlBackend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
	return nil, nil
}

// TriggerChord is not implemented yet; it always reports false.
func (b mysqlBackend) TriggerChord(groupUUID string) (bool, error) {
	return false, nil
}

// Setting / getting task state

// SetStatePending updates task state to PENDING
func (b *mysqlBackend) SetStatePending(signature *tasks.Signature) error {
	taskState := tasks.NewPendingTaskState(signature)
	return b.updateState(taskState)
}

// SetStateReceived updates task state to RECEIVED
func (b *mysqlBackend) SetStateReceived(signature *tasks.Signature) error {
	taskState := tasks.NewReceivedTaskState(signature)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateStarted updates task state to STARTED
func (b *mysqlBackend) SetStateStarted(signature *tasks.Signature) error {
	taskState := tasks.NewStartedTaskState(signature)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateRetry updates task state to RETRY
func (b *mysqlBackend) SetStateRetry(signature *tasks.Signature) error {
	taskState := tasks.NewRetryTaskState(signature)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateSuccess updates task state to SUCCESS
func (b *mysqlBackend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
	taskState := tasks.NewSuccessTaskState(signature, results)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// SetStateFailure updates task state to FAILURE
func (b *mysqlBackend) SetStateFailure(signature *tasks.Signature, err string) error {
	taskState := tasks.NewFailureTaskState(signature, err)
	b.mergeNewTaskState(taskState)
	return b.updateState(taskState)
}

// GetState is not implemented yet; it always returns a nil state and a nil
// error.
func (b mysqlBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
	return nil, nil
}

// IsAMQP reports whether this backend is AMQP based; it never is.
func (b mysqlBackend) IsAMQP() bool {
	return false
}

// mergeNewTaskState copies CreatedAt and TaskName from the stored state of the
// same task into newState. It is best effort: when the lookup fails or yields
// no state, newState is left untouched.
func (b *mysqlBackend) mergeNewTaskState(newState *tasks.TaskState) {
	state, err := b.GetState(newState.TaskUUID)
	// GetState may return (nil, nil), so the nil check is required to avoid a
	// nil pointer dereference.
	if err != nil || state == nil {
		return
	}

	newState.CreatedAt = state.CreatedAt
	newState.TaskName = state.TaskName
}

// Purging stored task states and group meta data

// PurgeState is not implemented yet; it does nothing.
func (b mysqlBackend) PurgeState(taskUUID string) error {
	return nil
}

// PurgeGroupMeta is not implemented yet; it does nothing.
func (b mysqlBackend) PurgeGroupMeta(groupUUID string) error {
	return nil
}

// getStates returns multiple task states
//
// It is not implemented yet: the result has one nil entry per requested UUID.
func (b *mysqlBackend) getStates(taskUUIDs ...string) ([]*tasks.TaskState, error) {
	taskStates := make([]*tasks.TaskState, len(taskUUIDs))
	return taskStates, nil
}

// updateState saves current task state
//
// It is not implemented yet and always returns nil.
func (b *mysqlBackend) updateState(taskState *tasks.TaskState) error {
	return nil
}
Loading…
Loading…
Commit Changes
Add a Signed-off-by trailer by the committer at the end of the commit log message.
Commit directly to the
main
branch.
Create a
new branch
for this commit and start a pull request.
Commit Changes
Cancel
Commit an empty file
The file you're about to commit is empty. Proceed?
Cancel
Commit Changes