Efficient and reliable background processing for Go

CurlyQ

CurlyQ provides a simple, easy-to-use interface for performing background processing in Go. It supports scheduled jobs, job deduplication, and configurable concurrent execution out of the box.

Quickstart

package main

import (
	"context"
	"log"

	cq "github.com/mcmathja/curlyq"
)

func main() {
	// Create a new producer
	producer := cq.NewProducer(&cq.ProducerOpts{
		Address: "localhost:6379",
		Queue: "testq",
	})

	// Use the producer to push a job to the queue
	producer.Perform(cq.Job{
		Data: []byte("Some data!"),
	})

	// Create a new consumer
	consumer := cq.NewConsumer(&cq.ConsumerOpts{
		Address: "localhost:6379",
		Queue: "testq",
	})

	// Consume jobs from the queue with the consumer
	consumer.Consume(func(ctx context.Context, job cq.Job) error {
		log.Println(string(job.Data))
		return nil
	})
}

The Basics

CurlyQ exposes three key types: Jobs, Producers, and Consumers.

Jobs

A Job wraps your data. In most cases, that's all you'll ever need to know about it:

job := cq.Job{
	Data: []byte("Some data."),
}

Every Job also exposes an ID field that uniquely identifies it among all jobs in the queue, and an Attempt field representing how many times it has been attempted so far.

Producers

A Producer pushes jobs on to the queue. Create one by providing it with the address of your Redis instance and a queue name:

producer := cq.NewProducer(&cq.ProducerOpts{
	Address: "my.redis.addr:6379",
	Queue: "queue_name",
})

You can also provide an existing go-redis instance if you would like to configure the queue to run on a more advanced Redis configuration or set up your own retry and timeout logic for network calls:

import "github.com/go-redis/redis/v7"

client := redis.NewClient(&redis.Options{
	Password: "p@55vvoRd",
	DB: 3,
	MaxRetries: 2,
})

producer := cq.NewProducer(&cq.ProducerOpts{
	Client: client,
	Queue: "queue_name",
})

Running producer.Perform(job) will add a job to the queue to be run asynchronously. You can also schedule a job to be enqueued at a particular time by running producer.PerformAt(time, job), or after a certain wait period by running producer.PerformAfter(duration, job). All of the Perform methods return the ID assigned to the job and an error if one occurred.

You can deduplicate jobs by pre-assigning them IDs:

job := cq.Job{
	ID: "todays_job",
}

// Enqueue the job
producer.PerformAfter(10 * time.Second, job)

// Does nothing, because a job with the same ID is already on the queue
producer.Perform(job)

Once a job has been acknowledged, its ID becomes available for reuse.
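
The ID rules above can be modelled without Redis. The following is a minimal in-memory sketch, not CurlyQ's implementation: `memQueue`, `enqueue`, and `ack` are hypothetical names invented for illustration, and a plain map stands in for the real queue.

```go
package main

import "fmt"

// memQueue is a hypothetical in-memory stand-in for the Redis-backed queue.
type memQueue struct {
	pending map[string][]byte // job ID -> data for jobs not yet acknowledged
}

func newMemQueue() *memQueue {
	return &memQueue{pending: make(map[string][]byte)}
}

// enqueue stores the job and reports true, unless a job with the same ID is
// still pending, in which case it does nothing and reports false.
func (q *memQueue) enqueue(id string, data []byte) bool {
	if _, exists := q.pending[id]; exists {
		return false
	}
	q.pending[id] = data
	return true
}

// ack marks the job as finished, which frees its ID for reuse.
func (q *memQueue) ack(id string) {
	delete(q.pending, id)
}

func main() {
	q := newMemQueue()
	fmt.Println(q.enqueue("todays_job", []byte("first")))  // true
	fmt.Println(q.enqueue("todays_job", []byte("second"))) // false: duplicate ID
	q.ack("todays_job")
	fmt.Println(q.enqueue("todays_job", []byte("third"))) // true: ID was released
}
```

The second call leaves the original payload untouched, which is why a stable ID such as a date or an entity key works as a deduplication key.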

See the documentation for ProducerOpts for more details about available configuration options.

Consumers

A Consumer pulls jobs off the queue and executes them using a provided handler function. Create one with the same basic options as a Producer:

consumer := cq.NewConsumer(&cq.ConsumerOpts{
	Queue: "queue_name",

	// With an address:
	Address: "my.redis.addr:6379",
	// With a preconfigured go-redis client:
	Client: redisClient,
})

You start a consumer by running its Consume method with a handler function:

consumer.Consume(func(ctx context.Context, job cq.Job) error {
	log.Printf("Job %s has been processed!", job.ID)
	return nil
})

If the provided handler function returns nil, the job is considered to have been processed successfully and is removed from the queue. If the handler returns an error or panics, the job is considered to have failed and will be retried or killed based on how many times it has been attempted.
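
As a rough illustration of that decision, the sketch below runs a handler once, treats a panic like a returned error, and picks the job's next state. It is a standalone model rather than CurlyQ's code: the `job` type is a local stand-in for cq.Job, and `maxAttempts`, `safeRun`, and `process` are hypothetical names chosen for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// job is a local stand-in for cq.Job with only the fields this sketch needs.
type job struct {
	ID      string
	Attempt int // attempts made so far, counting the current one
}

type handlerFunc func(ctx context.Context, j job) error

// safeRun calls h and converts a panic into an ordinary error.
func safeRun(ctx context.Context, h handlerFunc, j job) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return h(ctx, j)
}

// process runs the handler once and reports what happens to the job next.
// maxAttempts is a hypothetical limit chosen for this sketch.
func process(h handlerFunc, j job, maxAttempts int) string {
	err := safeRun(context.Background(), h, j)
	switch {
	case err == nil:
		return "done" // success: the job leaves the queue
	case j.Attempt < maxAttempts:
		return "retry" // failure with attempts remaining
	default:
		return "dead" // failure with no attempts remaining
	}
}

func succeed(context.Context, job) error { return nil }
func fail(context.Context, job) error    { return errors.New("boom") }
func explode(context.Context, job) error { panic("unexpected state") }

func main() {
	fmt.Println(process(succeed, job{ID: "a", Attempt: 1}, 3)) // done
	fmt.Println(process(fail, job{ID: "b", Attempt: 1}, 3))    // retry
	fmt.Println(process(explode, job{ID: "c", Attempt: 3}, 3)) // dead
}
```

Recovering inside `safeRun` keeps a single misbehaving handler from taking down the worker that called it.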

Consume will continue to process jobs until your application receives an interrupt signal or the consumer encounters a fatal error. Fatal errors only occur when the consumer is unable to communicate with Redis for an essential operation, such as updating the status of a job in flight.

See the documentation for ConsumerOpts for more details about available configuration options.

Comments
  • `go test -v` fails on Windows. Unix-only syscalls on consumer_test.go

    I have to say that I am really impressed. Good job, @mcmathja. The package is well written, with documentation and an easy-to-understand code flow. You follow Go best practices. I have a feeling that you will shine in the Go community; keep up the great work. You've got my star :)


    Now, consumer_test.go uses syscalls that are available only on Unix systems, not on Windows. You have two options:

    1. Add // +build !windows to consumer_test.go so that it is not built on Windows (not recommended).
    2. Replace all uses of syscall.Kill and remove all uses of syscall.SIGUSR1 from your test code.

    Here is the output I got when running go test -v on my Windows 10 machine:

    C:\mygopath\src\github.com\mcmathja\curlyq>go test -v
    # github.com/mcmathja/curlyq [github.com/mcmathja/curlyq.test]
    .\consumer_test.go:404:42: undefined: syscall.SIGUSR1
    .\consumer_test.go:421:5: undefined: syscall.Kill
    .\consumer_test.go:421:36: undefined: syscall.SIGUSR1
    .\consumer_test.go:437:5: undefined: syscall.Kill
    .\consumer_test.go:437:36: undefined: syscall.SIGUSR1
    

    Thanks, Gerasimos Maropoulos. Author of iris

  • Upgrade go-redis to v7

    This updates go-redis to v7, enabling some important features that should be available before stabilizing the API (such as compatibility with sentinel authentication).

    One major change in v7 is that the go-redis client now correctly respects context cancellations, preventing future requests and quitting those that are inflight. Because we need the client to stay alive until cleanup is finished, we need to provide it with a separate context from the main consumer context that isn't cancelled until cleanup is finished. This change allows us to shut down the client when we don't finish in the specified grace period, preventing unexpected changes to the queue after a forced shutdown.
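
The two-context arrangement described above can be sketched with the standard library alone. This is a simplified model under stated assumptions, not the code from the PR: `runConsumer`, `fastCleanup`, `slowCleanup`, `errGraceExceeded`, and `shortGrace` are hypothetical names, and the cleanup callbacks stand in for calls made through the Redis client.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errGraceExceeded = errors.New("cleanup did not finish within the grace period")

const shortGrace = 50 * time.Millisecond

// runConsumer simulates a shutdown request followed by cleanup. The client
// context is deliberately not derived from the consumer context, so that
// cancelling the consumer does not cut off the client during cleanup.
func runConsumer(grace time.Duration, cleanup func(clientCtx context.Context) error) error {
	consumerCtx, stopConsumer := context.WithCancel(context.Background())
	clientCtx, stopClient := context.WithCancel(context.Background())
	defer stopClient()

	stopConsumer() // shutdown requested: polling and handlers should stop
	<-consumerCtx.Done()

	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- cleanup(clientCtx) }()

	select {
	case err := <-done:
		return err
	case <-time.After(grace):
		stopClient() // forced shutdown: abort calls that are still in flight
		return errGraceExceeded
	}
}

// fastCleanup returns the client context's error, which is nil while the
// client is still usable.
func fastCleanup(ctx context.Context) error { return ctx.Err() }

// slowCleanup never finishes on its own; it only returns once cancelled.
func slowCleanup(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	fmt.Println(runConsumer(shortGrace, fastCleanup))
	fmt.Println(runConsumer(shortGrace, slowCleanup))
}
```

In the first call the client context is still live after the consumer context is cancelled, so cleanup succeeds. In the second, the grace period expires and cancelling the client context stops the stalled cleanup from touching the queue later.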

  • Notify of duplicates with an error

    This modifies the behavior of the Perform methods to return an error when they try to enqueue or schedule a duplicate job. Previously, no indication that the job already exists was provided.

    In cases where a duplicate is expected, this allows the client to act on that information, e.g. to proactively stop additional attempts to enqueue the same job. In cases where it's not expected, the error would serve as an important indication of a problem with how IDs are being assigned client-side.

    _, err := producer.Perform(job)
    if errors.As(err, &cq.ErrJobAlreadyExists{}) {
    	// Handle duplicate
    }
    
  • Don't reorder queue when polling

    Currently, we run BRPopLPush against the active jobs list to provide real-time notifications to consumers when new work hits the queue. Unfortunately, this has the side-effect of reordering the queue, so that the last item is moved to the front. In certain circumstances this could prevent jobs in the front of the queue from being executed in a timely fashion - e.g., if there was only one consumer, and for each job it processed it enqueued a new job on the same queue. CurlyQ doesn't provide any explicit guarantees about execution order because multiple jobs may be run concurrently, but I think it's reasonable to guarantee queue order to avoid scenarios like the above.

    The more common pattern is to use BRPopLPush to shift items one-at-a-time off of a queue and on to an inflight list. This is difficult to implement in our case, because we want to support batch job polling and constant time acks. We could shift one item onto an inflight list when polling, then combine this onto a set with other items during getJobs, but that increases complexity on the hot path and generates more keys to keep track of when cleaning up after dead consumers.

    This PR takes a different approach. We keep a separate signal list and block on that instead. If the list is empty when a job is added to the queue, we add a throwaway item to that list. This item gets recycled with each call to BRPopLPush, unblocking all consumers in the process. Then, the first time a consumer detects that there are no more jobs to process during polling, it just removes the throwaway item from the signal list, causing all future checks against the signal list to block.
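
The signal-list idea can be modelled in memory to show why it leaves job order alone. The sketch below is an illustration under assumptions, not the PR's Redis code: a boolean stands in for the throwaway item, a `sync.Cond` stands in for the blocking pop, and every name (`queue`, `push`, `wait`, `poll`, `drain`) is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is an in-memory model of the job list plus the separate signal list.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	jobs   []string // FIFO job list; waiting never touches its order
	signal bool     // true while the throwaway item sits on the signal list
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push appends a job and, if the signal list is empty, adds the throwaway item.
func (q *queue) push(job string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.jobs = append(q.jobs, job)
	if !q.signal {
		q.signal = true
		q.cond.Broadcast() // wake every blocked consumer
	}
}

// wait blocks until the throwaway item is present. The item is left in place,
// which mirrors popping it and pushing it straight back.
func (q *queue) wait() {
	q.mu.Lock()
	defer q.mu.Unlock()
	for !q.signal {
		q.cond.Wait()
	}
}

// poll returns the next job in FIFO order. When no jobs remain it removes the
// throwaway item so that later calls to wait block again.
func (q *queue) poll() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.jobs) == 0 {
		q.signal = false
		return "", false
	}
	job := q.jobs[0]
	q.jobs = q.jobs[1:]
	return job, true
}

// drain consumes until the queue reports empty and returns jobs in the order seen.
func drain(q *queue) []string {
	var seen []string
	for {
		q.wait()
		job, ok := q.poll()
		if !ok {
			return seen
		}
		seen = append(seen, job)
	}
}

func main() {
	q := newQueue()
	q.push("first")
	q.push("second")
	q.push("third")
	fmt.Println(drain(q)) // [first second third]
	fmt.Println(q.signal) // false
}
```

Because consumers block on the signal rather than on the job list itself, waiting for work never moves a job, and the list is only ever read from the front.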

  • Use cross-platform signal in tests

    Fixes https://github.com/mcmathja/curlyq/issues/1

    This modifies the test to use syscall.SIGALRM as its test signal instead of syscall.SIGUSR1. The latter is only available on unix systems, while the former is available on Windows as well. It changes the tests to use process.Signal instead of process.Kill for similar reasons.
