定时任务cron

2022/04/30

最近需求中涉及到定时任务的执行,具体场景是这样的

  • 设置规则 rule1,比如: 某个广告过去两天消耗<1000
  • 规则 rule1 每 N 天比如:7 天检查一次
  • 如果满足规则 rule1 则可以进行下面操作:关停广告 或者 调整报价

我们首先想到的是用系统自带的 cron 服务或者公司的任务调度系统来实现,但它其实并不单纯是一个固定的定时任务(在某个固定时间执行某个任务就可以了)。这个场景要稍微复杂一些,它可能还会发生下列行为操作

  • rule1 可以随时关停
  • rule1 检查频次调整为 3 天检查一次
  • 可以随时新建 rule2
  • 定时任务执行时间不能完全根据服务开启的时间来设置。比如 rule1 创建时间是 5.1 号,频次是 7 天 1 次,正常来说下次任务执行时间应该是 5.8 号;如果我们 5.3 号开启服务,设置每隔 7 天来执行任务的话,下次执行时间就到了 5.10 号,跟我们预期不太一致

就需要动态调整定时任务的频次、新增或者删除定时任务了

系统自带 cron 不能完全满足我们需求的话,就要看看有没有第三方库来支持。github 上找到了 robfig/cron 能够很灵活地进行配置,并且格式什么的和系统自带 cron 也差不多

系统 cron

Linux 中使用内置 cron 计划任务服务,按照约定的时间定时执行特定的任务。cron 服务启动后会读取配置文件/etc/crontab,cron 服务根据命令和执行时间按时来调用工作任务。

cron 服务操作命令

# 启动crontab
$ service crond start

# 关闭crontab
$ service crond stop

# 重启crontab
$ service crond restart

# 重载crontab
$ service crond reload

cron 服务提供了 crontab 命令来设置计划任务

$ crontab [-u user] [-e] [-l] [-r]
参数 描述
crontab -u user 用于设定用户的定时服务,Linux 中每个用户对应着一份 crontab 任务清单。
crontab -e 编辑用户的 crontab 文件内容来设置定时任务
crontab -l 显示用户的 crontab 文件内容
crontab -r 从/var/spool/cron 目录中删除用户的 crontab 文件
crontab -i 在删除用户 crontab 时给出确认提示

crontab 命令可固定的间隔时间执行指定的系统指令或 shell script 脚本,时间间隔的单位是分钟、小时、日、月、周即以上的任意组合。

当使用 crontab 命令编辑完用户的计划任务后,cron 服务自动会在/var/spool/cron 文件夹下生成一个与此用户同名的文件,该文件记录对应用户的计划任务信息,此文件是禁止直接编辑的,只能通过 crontab -e 命令来编辑。cron 服务启动后每过 1 分钟会读取用户配置的计划任务文件,检查是否需要执行里面的命令,因此修改后无需重启 cron 服务。

cron 表达式基础语法

minute hour day month week command

时间 名称 取值范围 描述
minute 分钟 0~59 整数
hour 小时 0~23 整数
day 日期 1~31 一个月中的某一天
month 月份 1~12 整数
week 星期几 0~7 0 或 7 表示星期日
command 命令 - -
# .---------------- minute (0 - 59)
# | .------------- hour (0 - 23)
# | | .---------- day of month (1 - 31)
# | | | .------- month (1 - 12) OR jan,feb,mar,apr ...
# | | | | .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# | | | | |
# * * * * * user-name command to be executed

每个域都可以使用数字,但也可以使用下列特殊字符。

特殊字符 名称 描述
* 星号 星号
, 逗号 枚举值,使用逗号分隔指定的列表任务。
- 中杠 范围,整数之间使用中杠表示一个整数范围。
/ 正斜线 增长间隔,指定时间的间隔频率
? 问号 仅用于日和星期,可代替*星号。

例如:

表达式 分钟 小时 日期 月份 星期 描述
0 6 * * * cmd 0 6 * * * 每天早上 6:00 点执行一次任务
0 _/2 _ * * cmd 0 */2 * * * 每间隔 2 小时执行一次任务
_/5 _ * * * cmd */5 * * * * 每隔 5 分钟执行一次任务

⚠️ 注意: centos 下配置 crontab 和 linux 下略有不同,如果直接编辑 /etc/crontab,需要指定用户

vim /etc/crontab
加上用户名 root

*/30 * * * * root date && source /root/.bash_profile &&  /usr/local/bin/ruby /data/programs/appstore_connect/main.rb >> /data/programs/appstore_connect/login.log
55 17 * * * root /bin/bash /data/programs/appstore_connect/get_appstoreconnect_data.sh >> /data/programs/appstore_connect/get_appstoreconnect_data.log

robfig/cron

robfig/cron 包是 Go 的定时任务框架,实现了 cron 计划任务规范的解析器和任务运行器。不同之处在于 robfig/cron 不仅兼容了 Linux 标准的 Crontab 格式,而且扩展到秒级。

调用 demo

下载安装包

$ go get -u -v github.com/robfig/cron/v3

demo

func TestCronDemo(t *testing.T) {
  c := cron.New()
  // 通过AddFunc注册
  c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })
  c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
  c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
  c.AddFunc("@every 5m", func() { fmt.Println("every 5m, start 5m fron now") })
  // 通过AddJob注册
  // var cJob cronJobDemo
  //  c.AddJob("@every 5s", cJob)
  // 启动
  c.Start()
  // 停止
  c.Stop()
}
​
type cronJobDemo int
​
func (c cronJobDemo) Run() {
  fmt.Println("5s func trigger")
  return
}

主要逻辑

  1. 先用 cron.New()初始化一个实例
  2. 然后调用 AddFunc(spec string, cmd func()) 注册你希望调用的 func
  3. 最后通过 cron.Start()方法启动。

New()

cron.go 中的 New() 方法用来创建并返回一个 Corn 对象指针

这个函数接收一组可变的 Option 类型的参数,该类型实际上是一类函数:

type Option func(*Cron)

Corn 内置了一些 Option 类型的函数,都在 option.go 中,以 With 开头,用来改变 Cron 的默认行为,在 New() 中创建完 Cron 之后,会依次执行这些函数。

比如

Cron = cron.New(cron.WithParser(cron.NewParser(
		cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)))

AddFunc()

AddFunc() 用于向 Corn 中添加一个作业:

func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
    // 包装
	return c.AddJob(spec, FuncJob(cmd))
}

func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	schedule, err := c.parser.Parse(spec)
	if err != nil {
		return 0, err
	}
	return c.Schedule(schedule, cmd), nil
}

AddFunc() 相较于 AddJob() 帮用户省去了包装成 Job 类型的一步,在 AddJob() 中,调用了 standardParser.Parse() 将 cron 表达式解释成了 schedule 类型,最终,他们调用了 Schedule() 方法:

这个方法负责创建 Entry 结构体,并把它追加到 Cron 的 entries 列表中,如果 Cron 已经处于运行状态,会将这个创建好的 entry 发送到 Cron 的 add chan 中,在 run() 中会处理这种情况。

AddJob 后发生了什么? (主要的数据结构)

对于 Cron 的整体逻辑,最关键的两个数据结构就是 struct Entry 和 Cron。

每当你用 AddJob 注册一个定时调用策略,就会为这个策略生成一个唯一的 Entry,不难想象,Entry 里会存储被执行的时间、需要被调度执行的实体 Job。

生成 entry 后,再将 entry 放到 struct Cron 的 entry 列表里,Cron 的结构里,主要是一些用来和外部交互的 channel,比如通过 channel 添加、删除 entry 等。详见下面的代码。

// Entry 数据结构,每一个被调度实体一个
type Entry struct {
  // 唯一id,用于查询和删除
  ID EntryID
  // 本Entry的调度时间,不是绝对时间,在生成entry时会计算出来
  Schedule Schedule
  // 本entry下次需要执行的绝对时间,会一直被更新
  // 被封装的含义是Job可以多层嵌套,可以实现基于需要执行Job的额外处理
  // 比如抓取Job异常、如果Job没有返回下一个时间点的Job是还是继续执行还是delay
  Next time.Time
  // 上一次被执行时间,主要用来查询
  Prev time.Time
  // WrappedJob 是真实执行的Job实体
  WrappedJob Job
  // Job 主要给用户查询
  Job Job
}
// Cron 数据结构,为robfig/cron的运行实体使用的s数据结构
type Cron struct {
  entries   []*Entry          // 调度执行实体列表
   // chain 用来定义entry里的warppedJob使用什么逻辑(e.g. skipIfLastRunning)
   // 即一个cron里所有entry只有一个封装逻辑
  chain     Chain
  stop      chan struct{}     // 停止整个cron的channel
  add       chan *Entry       // 增加一个entry的channel
  remove    chan EntryID      // 移除一个entry的channel
  snapshot  chan chan []Entry // 获取entry整体快照的channel
  running   bool              // 代表是否已经在执行,是cron为使用者提供的动态修改entry的接口准备的
  logger    Logger            // 封装golang的log包
  runningMu sync.Mutex        // 用来修改运行中的cron数据,比如增加entry,移除entry
  location  *time.Location    // 地理位置
  parser    ScheduleParser    // 对时间格式的解析,为interface, 可以定制自己的时间规则。
  nextID    EntryID           // entry的全局ID,新增一个entry就加1
  jobWaiter sync.WaitGroup    // run job时会进行add(1), job 结束会done(),stop整个cron,以此保证所有job都能退出
}

需要注意的是,WrappedJob 和 chain 这两个成员,这是 Cron 实现的 Job 封装逻辑,目前是解决实际调度 Job 的异常处理。比如你希望自己的上一个时间点的 JobA 没有结束,下一个时间点的 JobA 就不执行,这个“不执行”的逻辑实现就定义在 chain,初始化时通过 chain 将 JobA 进行封装写入 WrappedJob,那么每次 JobA 调用前会先执行封装逻辑,进行判断。

Start()

Start() 用于开始执行 Cron:

func (c *Cron) Start() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

这个函数干了三件事:

  1. 获取锁
  2. 将 c.running 置为 true 表示 cron 已经在运行中了
  3. 开启一个 goroutine 执行 c.run(), run 中会一直轮循 c.entries 中的 entry, 如果一个 entry 允许执行了,就会开启单独的 goroutine 去执行这个作业

run 是整个 cron 的一个核心,它负责处理 cron 开始执行后的大部分事情,包括添加作业,删除作业,执行作业等,其结构如下:

func (c *Cron) run() {
	c.logger.Info("start")

    // 第一部分
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}

    // 第二部分
	for {
        // 2.1
		sort.Sort(byTime(c.entries))

        // 2.2
		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}

        // 2.3
		for {
			select {}
			break
		}
	}
}

大概包含下面这几部分:

  • 第一部分:遍历了 c.entries 列表,通过 schedule.Next() 计算出这个作业下一次执行的时间,并赋值给了 entry.Next 字段。

  • 第二部分是一个死循环,这一部分又可以分为三个部分:

    • 2.1:调用了 sort 的快排,其实是对 entries 中的元素按 Next 字段的时间线后顺序排序。

    • 2.2:这一部分是对定时器的一个初始化操作:如果没有可以执行的作业,定时器被设置为十万小时后触发(其实就是休眠),否则定时器会在第一个作业允许被执行时触发,定时器触发后, 2.3 部分会去做剩下的事。

    • 2.3:这又是整个 run 的核心,其主体是一个死循环(其实它会退出,不算是死循环),这个循环里面的核心又是一个 select 多路复用,这个多路复用里监听了五种信号,这五种信号是怎样发出的我们在上面其实已经说过了,他们分别是定时器触发信号 timer.C, 运行过程中添加作业的信号 c.add, 快照信号 c.snapshot, cron 停止的信号 c.stop, 移除作业的信号 c.remove。

Run()

和Start类似的还有的Run函数,不同的是start会开启一个goroutine来运行,而run是在当前goroutine执行的

c.runningMu.Lock()
	if c.running {
		c.runningMu.Unlock()
		return
	}
	c.running = true
	c.runningMu.Unlock()
	c.run()

我的实现

包括crontab.go、run.go 两个文件,通过crontab.go来启动

crontab.go

func main() {
	//启动
	go func() {
		defer func() {
			if r := recover(); r != nil {
				logger.Log.Errorf("start panic:%v stack:%s", r, string(debug.Stack()))
			}
		}()
		start()
	}()
	
	go func() {
		//定时检查是否有规则调整
		defer func() {
			if r := recover(); r != nil {
				logger.Log.Errorf("checkRules panic:%v stack:%s", r, string(debug.Stack()))
			}
		}()
		checkRules()
	}()
	
	for {
		select {}
	}
}

func start() {
	Cron = cron.New(cron.WithParser(cron.NewParser(
		cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)))
	
	//获取有效的规则
	ruleList, err := rule.GetRuleList("", []int{rule.StatusOpen})
	if err != nil {
		logger.Log.Errorf("get rule list err:%s", err.Error())
	} else {
		for _, rule := range ruleList {
			addTask(&rule)
		}
	}
	//开启任务
	Cron.Run()
}

func addTask(data *rule.RulesList) {
	if data == nil {
		return
	}
	task, ok := taskMap[data.RuleId]
	if data.Status != rule.StatusOpen {
		//删除或者暂停规则
		if ok {
			//如果已经在任务队列中,则移除
			Cron.Remove(task.CronId)
			delete(taskMap, data.RuleId)
		}
		return
	}
	//下面的都是开启规则的逻辑

	//如果该规则已经存在于任务中
	if ok {
		//上次开启任务后,规则没有改变
		if data.LastUpdateTime.Equal(task.RuleInfo.LastUpdateTime) {
			return
		}
		//有改变,则先删除旧的任务
		Cron.Remove(task.CronId)
		delete(taskMap, data.RuleId)
	}
	checkFrequency := &controllerRule.Frequency{}
	err := json.Unmarshal([]byte(data.CheckFrequency), checkFrequency)
	if err != nil {
		return
	}
	//根据检查频次来设置任务
	//for example "30 */3 * * * ?"
	//【second】:0-59、【minute】:0-59、【hour】:0-23、【day of month】:1-31 【month】:1-12 or JAN-DEC 【day of week】:0-6 orSUN-SAT
	specs := []string{"*", "*", "*", "*", "*", "?"}
	var nextTime time.Time

	if checkFrequency.Type == rule.FrequencyTypeDay {
		//正常
		specs[0] = strconv.Itoa(data.LastUpdateTime.Second())
		specs[1] = strconv.Itoa(data.LastUpdateTime.Minute())
		specs[2] = strconv.Itoa(data.LastUpdateTime.Hour())
		specs[3] = "*/" + strconv.Itoa(checkFrequency.Period)
	} else {
		//主要为了测试用
		specs[0] = strconv.Itoa(data.LastUpdateTime.Second())
		specs[1] = "*/" + strconv.Itoa(checkFrequency.Period)
	}
	newTask := &Task{
		RuleInfo: data,
	}
	spec := strings.Join(specs, " ")
	entryId, err := Cron.AddJob(spec, newTask)
	if err != nil {
		return
	}

	timeNow := time.Now()
	if data.LastUpdateTime.Before(timeNow) {
		subDuration := time.Now().Sub(data.LastUpdateTime)
		if checkFrequency.Type == rule.FrequencyTypeDay {
			//正常
			subMinute := subDuration.Hours() / 24
			tmp := int(math.Ceil(subMinute / float64(checkFrequency.Period)))
			nextTime = data.LastUpdateTime.Add(time.Duration(tmp*tmp*checkFrequency.Period) * 24 * time.Hour)
		} else {
			//主要为了测试
			subMinute := subDuration.Minutes()
			tmp := int(math.Ceil(subMinute / float64(checkFrequency.Period)))
			nextTime = data.LastUpdateTime.Add(time.Duration(tmp*checkFrequency.Period) * time.Minute)
		}

		////测试用
		//nextTime = timeNow.Add(2 * time.Second)

		Cron.SetNextTime(entryId, nextTime)
	}

	//添加到任务映射
	newTask.CronId = entryId
	taskMap[data.RuleId] = newTask

	return
}

run.go

type Task struct {
	RuleInfo *rule.RulesList
	CronId   cron.EntryID
}

func (t Task) Run() {
	defer func() {
		if r := recover(); r != nil {
			dingding.SendDdMessage("", fmt.Sprintf("run panic:%s", r))
		}
	}()
	logger.Log.Infof("cronId:%d ***** run start *****:%s", t.CronId, time.Now().Format("2006-01-02 15:04:05"))

	err := run(t.RuleInfo)
	if err != nil {
		dingding.SendDdMessage("", fmt.Sprintf("ruleId:%d run fail", t.RuleInfo.RuleId))
	}
	logger.Log.Infof("cronId:%d ***** run end *****:%s", t.CronId, time.Now().Format("2006-01-02 15:04:05"))
}

由于robfig计算下一次运行时间时默认时从当前时间算起的,而我们是需要从规则的创建时间来计算的,所以对其代码进行了微调

主要是在 robfig/cron/cron.go 中增加了修改nextTime的方法,主要是在start之前进行修改

func (c *Cron) SetNextTime(id EntryID, nextTime time.Time) {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		return
	}
	if nextTime.Before(time.Now()) {
		return
	}
	for _, e := range c.entries {
		if e.ID != id {
			continue
		}
		e.Next = nextTime
	}
}

参考资料

  1. github/robfig/cron/v3 使用与源码解析
  2. Golang robfig/cron 实现解析
  3. robfig/cron

Post Directory