jobbase.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package jobs
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/robfig/cron/v3"
  7. "device-manage/app/admin/models"
  8. "device-manage/common/global"
  9. "device-manage/pkg"
  10. "device-manage/pkg/cronjob"
  11. )
  12. var timeFormat = "2006-01-02 15:04:05"
  13. var retryCount = 3
  14. var jobList map[string]JobsExec
  15. var lock sync.Mutex
  // JobCore holds the fields shared by every job type in this file.
  type JobCore struct {
  	// InvokeTarget identifies what to run: HttpJob.Run passes it to pkg.Get,
  	// ExecJob.Run uses it as the key into jobList.
  	InvokeTarget string
  	// Name is the job's name; Setup copies it from SysJob.JobName.
  	Name string
  	// JobId is the job's identifier; Setup copies it from SysJob.JobId.
  	JobId uint
  	// EntryId is not assigned anywhere in this part of the file.
  	// NOTE(review): presumably the scheduler entry id — confirm.
  	EntryId int
  	// CronExpression is the schedule spec handed to GADMCron.AddJob.
  	CronExpression string
  	// Args is the argument string ExecJob.Run forwards to CallExec.
  	Args string
  }
  24. // 任务类型 http
  25. type HttpJob struct {
  26. JobCore
  27. }
  28. type ExecJob struct {
  29. JobCore
  30. }
  31. func (e *ExecJob) Run() {
  32. startTime := time.Now()
  33. var obj = jobList[e.InvokeTarget]
  34. if obj == nil {
  35. global.JobLogger.Warning(" ExecJob Run job nil", e)
  36. return
  37. }
  38. CallExec(obj.(JobsExec), e.Args)
  39. // 结束时间
  40. endTime := time.Now()
  41. // 执行时间
  42. latencyTime := endTime.Sub(startTime)
  43. //TODO: 待完善部分
  44. //str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
  45. //ws.SendAll(str)
  46. global.JobLogger.Info(time.Now().Format(timeFormat), " [INFO] JobCore ", e, "exec success , spend :", latencyTime)
  47. }
  48. //http 任务接口
  49. func (h *HttpJob) Run() {
  50. startTime := time.Now()
  51. var count = 0
  52. /* 循环 */
  53. LOOP:
  54. if count < retryCount {
  55. /* 跳过迭代 */
  56. str, err := pkg.Get(h.InvokeTarget)
  57. if err != nil {
  58. // 如果失败暂停一段时间重试
  59. fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
  60. fmt.Printf(time.Now().Format(timeFormat)+" [INFO] Retry after the task fails %d seconds! %s \n", time.Duration(count)*time.Second, str)
  61. time.Sleep(time.Duration(count) * time.Second)
  62. goto LOOP
  63. }
  64. count = count + 1
  65. }
  66. // 结束时间
  67. endTime := time.Now()
  68. // 执行时间
  69. latencyTime := endTime.Sub(startTime)
  70. //TODO: 待完善部分
  71. global.JobLogger.Info(time.Now().Format(timeFormat), " [INFO] JobCore ", h, "exec success , spend :", latencyTime)
  72. }
  73. // 初始化
  74. func Setup() {
  75. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Starting...")
  76. global.GADMCron = cronjob.NewWithSeconds()
  77. sysJob := models.SysJob{}
  78. jobList := make([]models.SysJob, 0)
  79. err := sysJob.GetList(&jobList)
  80. if err != nil {
  81. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore init error", err)
  82. }
  83. if len(jobList) == 0 {
  84. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
  85. }
  86. _, err = sysJob.RemoveAllEntryID()
  87. if err != nil {
  88. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
  89. }
  90. for i := 0; i < len(jobList); i++ {
  91. if jobList[i].JobType == 1 {
  92. j := &HttpJob{}
  93. j.InvokeTarget = jobList[i].InvokeTarget
  94. j.CronExpression = jobList[i].CronExpression
  95. j.JobId = jobList[i].JobId
  96. j.Name = jobList[i].JobName
  97. sysJob.EntryId, err = AddJob(j)
  98. } else if jobList[i].JobType == 2 {
  99. j := &ExecJob{}
  100. j.InvokeTarget = jobList[i].InvokeTarget
  101. j.CronExpression = jobList[i].CronExpression
  102. j.JobId = jobList[i].JobId
  103. j.Name = jobList[i].JobName
  104. j.Args = jobList[i].Args
  105. sysJob.EntryId, err = AddJob(j)
  106. }
  107. err = sysJob.Update(jobList[i].JobId)
  108. }
  109. // 其中任务
  110. global.GADMCron.Start()
  111. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
  112. // 关闭任务
  113. defer global.GADMCron.Stop()
  114. select {}
  115. }
  116. // 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
  117. func AddJob(job Job) (int, error) {
  118. if job == nil {
  119. fmt.Println("unknown")
  120. return 0, nil
  121. }
  122. return job.addJob()
  123. }
  124. func (h *HttpJob) addJob() (int, error) {
  125. id, err := global.GADMCron.AddJob(h.CronExpression, h)
  126. if err != nil {
  127. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  128. return 0, err
  129. }
  130. EntryId := int(id)
  131. return EntryId, nil
  132. }
  133. func (h *ExecJob) addJob() (int, error) {
  134. id, err := global.GADMCron.AddJob(h.CronExpression, h)
  135. if err != nil {
  136. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  137. return 0, err
  138. }
  139. EntryId := int(id)
  140. return EntryId, nil
  141. }
  142. // 移除任务
  143. func Remove(entryID int) chan bool {
  144. ch := make(chan bool)
  145. go func() {
  146. global.GADMCron.Remove(cron.EntryID(entryID))
  147. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Remove success ,info entryID :", entryID)
  148. ch <- true
  149. }()
  150. return ch
  151. }
  152. // 任务停止
  153. func Stop() chan bool {
  154. ch := make(chan bool)
  155. go func() {
  156. global.GADMCron.Stop()
  157. ch <- true
  158. }()
  159. return ch
  160. }