| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- package jobs
- import (
- "fmt"
- "sync"
- "time"
- "github.com/robfig/cron/v3"
- "device-manage/app/admin/models"
- "device-manage/common/global"
- "device-manage/pkg"
- "device-manage/pkg/cronjob"
- )
- var timeFormat = "2006-01-02 15:04:05"
- var retryCount = 3
- var jobList map[string]JobsExec
- var lock sync.Mutex
- type JobCore struct {
- InvokeTarget string
- Name string
- JobId uint
- EntryId int
- CronExpression string
- Args string
- }
- // 任务类型 http
- type HttpJob struct {
- JobCore
- }
- type ExecJob struct {
- JobCore
- }
- func (e *ExecJob) Run() {
- startTime := time.Now()
- var obj = jobList[e.InvokeTarget]
- if obj == nil {
- global.JobLogger.Warning(" ExecJob Run job nil", e)
- return
- }
- CallExec(obj.(JobsExec), e.Args)
- // 结束时间
- endTime := time.Now()
- // 执行时间
- latencyTime := endTime.Sub(startTime)
- //TODO: 待完善部分
- //str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
- //ws.SendAll(str)
- global.JobLogger.Info(time.Now().Format(timeFormat), " [INFO] JobCore ", e, "exec success , spend :", latencyTime)
- }
- //http 任务接口
- func (h *HttpJob) Run() {
- startTime := time.Now()
- var count = 0
- /* 循环 */
- LOOP:
- if count < retryCount {
- /* 跳过迭代 */
- str, err := pkg.Get(h.InvokeTarget)
- if err != nil {
- // 如果失败暂停一段时间重试
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
- fmt.Printf(time.Now().Format(timeFormat)+" [INFO] Retry after the task fails %d seconds! %s \n", time.Duration(count)*time.Second, str)
- time.Sleep(time.Duration(count) * time.Second)
- goto LOOP
- }
- count = count + 1
- }
- // 结束时间
- endTime := time.Now()
- // 执行时间
- latencyTime := endTime.Sub(startTime)
- //TODO: 待完善部分
- global.JobLogger.Info(time.Now().Format(timeFormat), " [INFO] JobCore ", h, "exec success , spend :", latencyTime)
- }
- // 初始化
- func Setup() {
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Starting...")
- global.GADMCron = cronjob.NewWithSeconds()
- sysJob := models.SysJob{}
- jobList := make([]models.SysJob, 0)
- err := sysJob.GetList(&jobList)
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore init error", err)
- }
- if len(jobList) == 0 {
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
- }
- _, err = sysJob.RemoveAllEntryID()
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
- }
- for i := 0; i < len(jobList); i++ {
- if jobList[i].JobType == 1 {
- j := &HttpJob{}
- j.InvokeTarget = jobList[i].InvokeTarget
- j.CronExpression = jobList[i].CronExpression
- j.JobId = jobList[i].JobId
- j.Name = jobList[i].JobName
- sysJob.EntryId, err = AddJob(j)
- } else if jobList[i].JobType == 2 {
- j := &ExecJob{}
- j.InvokeTarget = jobList[i].InvokeTarget
- j.CronExpression = jobList[i].CronExpression
- j.JobId = jobList[i].JobId
- j.Name = jobList[i].JobName
- j.Args = jobList[i].Args
- sysJob.EntryId, err = AddJob(j)
- }
- err = sysJob.Update(jobList[i].JobId)
- }
- // 其中任务
- global.GADMCron.Start()
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
- // 关闭任务
- defer global.GADMCron.Stop()
- select {}
- }
- // 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
- func AddJob(job Job) (int, error) {
- if job == nil {
- fmt.Println("unknown")
- return 0, nil
- }
- return job.addJob()
- }
- func (h *HttpJob) addJob() (int, error) {
- id, err := global.GADMCron.AddJob(h.CronExpression, h)
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
- return 0, err
- }
- EntryId := int(id)
- return EntryId, nil
- }
- func (h *ExecJob) addJob() (int, error) {
- id, err := global.GADMCron.AddJob(h.CronExpression, h)
- if err != nil {
- fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
- return 0, err
- }
- EntryId := int(id)
- return EntryId, nil
- }
- // 移除任务
- func Remove(entryID int) chan bool {
- ch := make(chan bool)
- go func() {
- global.GADMCron.Remove(cron.EntryID(entryID))
- fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Remove success ,info entryID :", entryID)
- ch <- true
- }()
- return ch
- }
- // 任务停止
- func Stop() chan bool {
- ch := make(chan bool)
- go func() {
- global.GADMCron.Stop()
- ch <- true
- }()
- return ch
- }
|