distributed-cron: a distributed scheduled-task library

dcron

A distributed scheduled-task (cron) library.

How it works

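Nodes synchronize their data through Redis, which acts as a simple service registry. Each task name is then mapped by consistent hashing to the node elected to run that task.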
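The election step can be sketched as a small consistent-hash ring. This is only an illustration under stated assumptions, not dcron's actual code: the `ring` type, the CRC32 hash, and the number of virtual points per node are all choices made for this example.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
)

// ring is a minimal consistent-hash ring (illustrative, not dcron's implementation).
type ring struct {
	points []uint32          // sorted hashes of the virtual points
	owner  map[uint32]string // virtual point hash -> node ID
}

// newRing places `replicas` virtual points per node on the ring.
// Nodes are sorted first so that every node builds an identical ring.
func newRing(nodes []string, replicas int) *ring {
	sorted := append([]string(nil), nodes...)
	sort.Strings(sorted)
	r := &ring{owner: make(map[uint32]string)}
	for _, n := range sorted {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s#%d", n, i)))
			r.points = append(r.points, h)
			r.owner[h] = n
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// pick returns the node that owns jobName: the first virtual point at or
// after the job's hash, wrapping around to the start of the ring.
func (r *ring) pick(jobName string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(jobName))
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}
```

Because every node builds the ring from the same node list, each node can decide locally whether it owns a task, for example with `newRing(liveNodes, 50).pick("test1") == selfID`, without any further coordination.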

Flow chart

[Figure: dcron flow chart]

Features

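  • Load balancing: tasks are distributed evenly across nodes based on the task data and the node data.
  • Seamless scale-out: if the task nodes are overloaded, simply start a new server; some tasks migrate to the new node automatically.
  • Failover: if a single node fails, its tasks are moved automatically to other healthy nodes after 10 s.
  • Task uniqueness: within one service, each task runs as a single instance and is never executed twice.
  • Custom storage: implement the driver interface to add another storage backend for node data.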

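Usage

1. Create a redisDriver instance, specify a service name, and initialize dcron. A service name identifies a unit of nodes that run the same set of tasks.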

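  // Connect the Redis driver; handle the error instead of discarding it.
  drv, err := redis.NewDriver(&redis.Conf{
  	Host: "127.0.0.1",
  	Port: 6379,
  })
  if err != nil {
  	log.Fatal(err)
  }
  dcron := NewDcron("server1", drv)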

2. Add tasks using cron syntax. Every task needs a name, which serves as its unique identifier and must therefore be unique.

  dcron.AddFunc("test1", "*/3 * * * *", func() {
  	fmt.Println("running task test1", time.Now().Format("15:04:05"))
  })

3. Start the tasks.

  dcron.Start()

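About service names

A service name only identifies a group of nodes that run the same set of tasks. At startup each node generates a UUID and binds it to the service, so several nodes can use the same service name without conflict.

For example, a service called "classroom service" may contain scheduled tasks such as "class starts" and "class ends". You can then run n different nodes for it (on the same machine or on different machines), all using the service name "classroom service".

Other notes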

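Note: when the scheduler runs in the same program as an HTTP server, no special handling is needed. If the program contains only the scheduled tasks, block the main goroutine so that the process does not exit immediately.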

Comments
  • Data race bug

    Calling Dcron.Start() (start the distributed scheduler) and later Dcron.Stop() (stop the scheduler) produces a data race.

    Code:

        // Distributed scheduled-task routes
        Dcron.Start() // start the distributed scheduler

        quit := make(chan os.Signal, 1) // buffered, as signal.Notify requires
        signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
        sign := <-quit // block until a termination signal arrives
        utils.Logger().Info().Msgf("%v %v %v %+v", "SERVER_RUN", "server_close_sig", "", sign)

        // Stop the scheduled tasks
        Dcron.Stop()

    Race detector output:

        ==================
        WARNING: DATA RACE
        Read at 0x00c000594038 by goroutine 46:
          github.com/libi/dcron.(*NodePool).tickerUpdatePool()
              C:/Users/yunwe/go/pkg/mod/github.com/libi/[email protected]/node_pool.go:77 +0xfb
          github.com/libi/dcron.(*NodePool).StartPool.func1()
              C:/Users/yunwe/go/pkg/mod/github.com/libi/[email protected]/node_pool.go:57 +0x39

        Previous write at 0x00c000594038 by main goroutine:
          github.com/libi/dcron.(*Dcron).Stop()
              C:/Users/yunwe/go/pkg/mod/github.com/libi/[email protected]/dcron.go:159 +0x39
          main.main()
              D:/code_qifan/jump_live/main.go:81 +0x486

        Goroutine 46 (running) created at:
          github.com/libi/dcron.(*NodePool).StartPool()
              C:/Users/yunwe/go/pkg/mod/github.com/libi/[email protected]/node_pool.go:57 +0x1e5
          github.com/libi/dcron.(*Dcron).Start()
              C:/Users/yunwe/go/pkg/mod/github.com/libi/[email protected]/dcron.go:134 +0x84
          main.main()
              D:/code_qifan/jump_live/main.go:69 +0x271

  • The cron tab will stop after a while.

    If two machines start the cron tab at the same time, it stops after a while. The code is as follows:

        func main() {
        	drv, _ := dredis.NewDriver(&dredis.Conf{
        		Host:     "10.203.169.19",
        		Port:     1841,
        		Password: "005f4ce0b221abf",
        	}, redis.DialConnectTimeout(time.Second*10))
        	dcronDemo := dcron.NewDcron("server1", drv, cron.WithSeconds())

        	// Add several tasks; when multiple nodes are started, the tasks are spread evenly across them.
        	dcronDemo.AddFunc("s1 test1", "0 */1 * * * *", func() {
        		fmt.Println("running service1 test1", time.Now().Format("15:04:05"))
        	})
        	dcronDemo.AddFunc("s1 test2", "0 */1 * * * *", func() {
        		fmt.Println("running service1 test2", time.Now().Format("15:04:05"))
        	})
        	dcronDemo.AddFunc("s1 test3", "0 */1 * * * *", func() {
        		fmt.Println("running service1 test3", time.Now().Format("15:04:05"))
        	})
        	dcronDemo.Start()

        	// For testing: exit after 120 minutes.
        	time.Sleep(120 * time.Minute)
        }

  • Suggestion: make Duration configurable

    Please consider making Duration a configurable option. Otherwise

        func (np *NodePool) tickerUpdatePool() {
        	tickers := time.NewTicker(time.Second * defaultDuration)
        	for range tickers.C {
        		if np.dcron.isRun {
        			np.updatePool()
        		}
        	}
        }

    scans Redis in a loop at a fixed interval, which affects performance.

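  • A newly joined node may cause tasks to run twice

    When a new node starts, tasks whose interval is shorter than 10 s (defaultDuration) may run on two nodes at the same time.

    Proposed fix: in dcron.Start(), wait at least defaultDuration before calling d.cr.Start(). The drawback is that some tasks may pause for a while.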

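  • Dcron development background

    Background

    As the number of scheduled tasks in our projects grew, we tried two approaches to keep tasks from running more than once:

    • Run only a single node.
    • Use a fixed polling interval together with a distributed lock.

    The first approach has no fault tolerance: when the single node goes down, no scheduled task can run.

    The second approach cannot express schedules as flexibly as cron. To run something at 1:00 every day, for example, it has to poll a database or some other store, which is very inefficient.

    After comparing the mainstream distributed scheduling libraries, I found them either too heavyweight, too complicated to use, or hard to integrate seamlessly with Go. That led to the idea of writing a distributed scheduled-task library.

    The main pain points to address:

    1. Strong consistency and reliability.
    2. Reuse of storage already present in the architecture, such as Redis or MySQL.
    3. Easy to adopt and simple to use.

    How it works

    Reading the gojob source suggested a new approach:

    All nodes are stored in shared storage (Redis support came first, since almost every project already uses Redis as a cache). A consistent-hash algorithm then elects the node that runs each task, which guarantees uniqueness. Every node schedules all tasks according to their cron expressions, and at the entry point of each task the consistent hash decides whether the current node should actually run it.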
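The "check at the task entry point" idea can be sketched as a wrapper around the task function. The `picker` type and the `guard` helper are names made up for this example; in the design described above, the picker would be the consistent-hash lookup over the current node list.

```go
package main

// picker returns the ID of the node that currently owns a job. In the design
// described above this would be a consistent-hash lookup over the live nodes.
type picker func(jobName string) string

// guard wraps job so that it runs only when pick elects selfID for jobName.
// Every node registers the same wrapped job with its local cron scheduler;
// on each tick, only the elected node runs the body.
func guard(selfID, jobName string, pick picker, job func()) func() {
	return func() {
		if pick(jobName) != selfID {
			return // another node owns this job right now
		}
		job()
	}
}
```

Because ownership is evaluated on every tick, a change in the node list takes effect on the next run without re-registering any task.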

  • leaseRespChan is full: lease keepalive response queue is full; dropping response send

    Version: master, tested locally.

    leaseRespChan fills up.

        func (e *EtcdDriver) SetHeartBeat(nodeID string) {
        	leaseID, err := e.putKeyWithLease(nodeID, nodeID)
        	if err != nil {
        		log.Printf("putKeyWithLease error: %v", err)
        		return
        	}

        	leaseRespChan, err := e.cli.KeepAlive(context.Background(), leaseID)
        	if err != nil {
        		log.Printf("keepalive error: %v", err)
        		return
        	}

        	// Attempted fix: keep reading from the channel so it never fills up.
        	go func() {
        		for {
        			select {
        			case resp := <-leaseRespChan:
        				if resp == nil {
        					log.Printf("etcd cli keepalive unexpected nil")
        				}
        			case <-time.After(businessTimeout):
        				log.Printf("etcd cli keepalive timeout")
        			}
        		}
        	}()
        }
    
  • start dcron error

    My code:

        drvRedis, _ := redis.NewDriver(&redis.Conf{
        	Addr:     conf.RedisSetting.RedisHost,
        	Password: conf.RedisSetting.RedisPassword,
        	//Wait: true,
        })
        if err := drvRedis.Ping(); err != nil {
        	log.Fatal(err.Error())
        	return
        }
        log.Println("init redis ======")
        dCronEngine := dcron.NewDcron("cron_server1", drvRedis, cron.WithSeconds())

        tg := new(product_platform.TaskGroup)
        // Run once per hour.
        //dcr.AddFunc("TestTask", "0 0 * * * *", tg.TestTask)
        dCronEngine.AddFunc("GetBipData", "0 0 * * * *", tg.GetBipData)
        dCronEngine.AddFunc("GetTeaData", "0 0 * * * *", tg.GetTeaData)
        dCronEngine.Start()

    Error log:

        2022-05-27 20:55:06.386 INFO runtime/proc.go:255 init redis ======
        [dcron] 2022/05/27 20:55:06 INFO: addJob 'GetBipData' : 0 0 * * * *
        [dcron] 2022/05/27 20:55:06 INFO: addJob 'GetTeaData' : 0 0 * * * *
        [dcron] 2022/05/27 20:55:06 ERR: dcron start node pool error ERR unkown command or protocal error

    The error appears only after deploying to a Linux server; everything works locally. What is causing this? It fails as soon as Start is called.

  • About adding tasks in batches

    To load scheduled tasks from MySQL in bulk, do I just wrap the call in a for loop, like this?

        for (⋯⋯) {
        	dcron.AddFunc("test n+1", "*/3 * * * *", func() {
        		fmt.Println("running task test n+1", time.Now().Format("15:04:05"))
        	})
        }
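A loop like that works in principle, provided every task gets a unique name and the closure captures a per-iteration copy of the row. The sketch below assumes a hypothetical `taskRow` type for rows loaded from MySQL and a small `adder` interface. The `AddFunc` argument order (name, spec, function) follows the usage shown in this README; the error return is an assumption. `memScheduler` is a stand-in used only to make the example self-contained.

```go
package main

import "fmt"

// adder is the part of a scheduler used here. The error return is an assumption.
type adder interface {
	AddFunc(name, spec string, cmd func()) error
}

// taskRow is a hypothetical row loaded from MySQL.
type taskRow struct {
	Name string // unique task name
	Spec string // cron expression
}

// addAll registers one job per row and stops at the first failure.
func addAll(s adder, rows []taskRow, run func(taskRow)) error {
	for _, row := range rows {
		row := row // per-iteration copy; needed before Go 1.22
		if err := s.AddFunc(row.Name, row.Spec, func() { run(row) }); err != nil {
			return fmt.Errorf("add %q: %w", row.Name, err)
		}
	}
	return nil
}

// memScheduler is a stand-in scheduler that records jobs and rejects
// duplicate names, mirroring the "task names must be unique" rule.
type memScheduler struct {
	jobs map[string]func()
}

func (m *memScheduler) AddFunc(name, spec string, cmd func()) error {
	if _, dup := m.jobs[name]; dup {
		return fmt.Errorf("duplicate job name %q", name)
	}
	if m.jobs == nil {
		m.jobs = make(map[string]func())
	}
	m.jobs[name] = cmd
	return nil
}
```

Deriving each name from a stable key such as the row's primary key keeps names unique across restarts and identical on every node, which the name-based election depends on.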

  • Some fixes for spelling errors, mutex usage, and code style

    1. Fix a misspelled filename.
    2. Change the log interface.
    3. Use RWMutex in place of Mutex.
    4. Use an independent interface, dlog.Logger, instead of interface {Print ... }.
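  • Feature ideas for discussion

    Since this is a distributed timer library, I think the idea could be taken further.

    1. Add an HTTP or other RPC interface that supports registering scheduled tasks and their parameters remotely and calling the relevant endpoints on schedule.
    2. Make it selectable whether periodic tasks run on a fixed schedule or with each run scheduled relative to the previous one.
    3. Many production scenarios are not periodic; tasks that fire once after a delay make up the vast majority. Can these be supported?
    4. Handling of very large numbers of tasks could be optimized further, for example by caching only the tasks due within the next 10 minutes to reduce memory pressure.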
Easy and fluent Go cron scheduling

goCron: A Golang Job Scheduling Package. goCron is a Golang job scheduling package which lets you run Go functions periodically at pre-determined inte

Jan 8, 2023
gron, Cron Jobs in Go.

gron Gron provides a clear syntax for writing and deploying cron jobs. Goals Minimalist APIs for scheduling jobs. Thread safety. Customizable Job Type

Dec 20, 2022
a cron library for go

cron Cron V3 has been released! To download the specific tagged release, run: go get github.com/robfig/cron/[email protected] Import it in your program as: im

Dec 25, 2022
Run Jobs on a schedule, supports fixed interval, timely, and cron-expression timers; Instrument your processes and expose metrics for each job.

A simple process manager that allows you to specify a Schedule that execute a Job based on a Timer. Schedule manage the state of this job allowing you to start/stop/restart in concurrent safe way. Schedule also instrument this Job and gather metrics and optionally expose them via uber-go/tally scope.

Dec 8, 2022
Lightweight, fast and dependency-free Cron expression parser (due checker) for Golang (tested on v1.13 and above)

adhocore/gronx gronx is Golang cron expression parser ported from adhocore/cron-expr. Zero dependency. Very fast because it bails early in case a segm

Dec 30, 2022
Chadburn is a scheduler alternative to cron, built on Go and designed for Docker environments.

Chadburn - a job scheduler Chadburn is a modern and low footprint job scheduler for docker environments, written in Go. Chadburn aims to be a replacem

Dec 6, 2022
A scheduled-task queue based on Redis and Cron

RTask RTask is a scheduled-task queue for Golang based on Redis and Cron. Quick start: import the RTask package with Go Modules. go get -u github.com/avtion/rtask Tutorial: package main import ( "con

Oct 27, 2021
A cron-like strategy plugin for HashiCorp Nomad Autoscaler

Nomad Autoscaler Cron Strategy A cron-like strategy plugin, where task groups are scaled based on a predefined scheduled. job "webapp" { ... group

Feb 14, 2022
Go-based runner for Cron Control

Cron Control Runner A Go-based runner for processing WordPress cron events, via Cron Control interfaces. Installation & Usage Clone the repo, and cd i

Jul 19, 2022
This package provides the way to get the previous timestamp or the next timestamp that satisfies the cron expression.

Cron expression parser Given a cron expression, you can get the previous timestamp or the next timestamp that satisfies the cron expression. I have us

May 3, 2022
Graceful shutdown with repeating "cron" jobs (running at a regular interval) in Go

Graceful shutdown with repeating "cron" jobs (running at a regular interval) in Go Illustrates how to implement the following in Go: run functions ("j

May 30, 2022
Zdpgo cron - use cron expressions and run scheduled tasks in Golang

zdpgo_cron Use cron expressions and run scheduled tasks in Golang. Project URL: https://github.com/zhangdapeng520/zdpgo

Feb 16, 2022
Cloud-native, enterprise-level cron job platform for Kubernetes

Furiko Furiko is a cloud-native, enterprise-level cron and adhoc job platform for Kubernetes. The main website for documentation and updates is hosted

Dec 30, 2022
A programmable, observable and distributed job orchestration system.

Overview Odin is a programmable, observable and distributed job orchestration system which allows for the scheduling, management and unattended bac

Dec 21, 2022
Distributed Task Scheduling System

Crocodile Distributed Task Scheduling System Introduction A distributed task scheduling system based on Golang that supports http request

Jan 5, 2023
Celery Distributed Task Queue in Go

gocelery Go Client/Server for Celery Distributed Task Queue Why? Having been involved in several projects migrating servers from Python to Go, I have

Jan 1, 2023
Machinery is an asynchronous task queue/job queue based on distributed message passing.

Machinery Machinery is an asynchronous task queue/job queue based on distributed message passing. V2 Experiment First Steps Configuration Lock Broker

Dec 24, 2022
high performance distributed task scheduling system, Support multi protocol scheduling tasks

Dec 2, 2022
YTask is an asynchronous task queue for handling distributed jobs in golang

Dec 24, 2022