Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq

Overview

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started with.

High-level overview of how Asynq works:

  • Client puts tasks on a queue
  • Server pulls tasks off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
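
The flow above can be sketched with the standard library alone. This is only an in-process illustration of the client/queue/worker model, not how Asynq is implemented: a buffered channel stands in for the Redis-backed queue, and the names `task`, `processAll`, and `concurrency` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a queued unit of work.
type task struct {
	Type    string
	Payload string
}

// processAll drains the queue, starting one goroutine per task while a
// semaphore caps the number of tasks running at the same time.
// It returns how many tasks were handled.
func processAll(queue <-chan task, concurrency int, handle func(task)) int {
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0

	for t := range queue {
		sem <- struct{}{} // wait for a free worker slot
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			defer func() { <-sem }()
			handle(t)
			mu.Lock()
			processed++
			mu.Unlock()
		}(t)
	}
	wg.Wait()
	return processed
}

func main() {
	// The "client" side: put tasks on the queue.
	queue := make(chan task, 3)
	queue <- task{Type: "email:deliver", Payload: "user 42"}
	queue <- task{Type: "image:resize", Payload: "some/blobstore/path"}
	queue <- task{Type: "email:deliver", Payload: "user 7"}
	close(queue)

	// The "server" side: pull tasks off and process them concurrently.
	n := processAll(queue, 2, func(t task) {
		fmt.Printf("processing %s (%s)\n", t.Type, t.Payload)
	})
	fmt.Println("processed:", n)
}
```

The semaphore plays the role that a concurrency setting plays on a worker server: tasks still get their own goroutine, but only a bounded number run at once.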

Task Queue Diagram

Stability and Compatibility

Important Note: The current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that its signature matches asynq.HandlerFunc.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    // ... initialize any fields the processor needs
    return &ImageProcessor{}
}

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background.
To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with a "net/http" Handler.

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

For a more detailed walk-through of the library, see our Getting Started Guide.

To learn more about asynq features and APIs, see our Wiki and godoc.

Web UI

Asynqmon is a web-based tool for monitoring and administering Asynq queues and tasks. Please see the tool's README for details.

Here are a few screenshots of the web UI.

Queues view
Web UI QueuesView

Tasks view
Web UI TasksView

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.

Gif

For details on how to use the tool, refer to the tool's README.

Installation

To install the asynq library, run the following command:

go get -u github.com/hibiken/asynq

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Requirements

Dependency    Version
Redis         v3.0+
Go            v1.13+

Contributing

We are open to, and grateful for, any contributions (GitHub issues/pull requests, feedback on the Gitter channel, etc.) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq: Many of the design ideas are taken from Sidekiq and its Web UI
  • RQ: Client APIs are inspired by the RQ library
  • Cobra: Asynq CLI is built with Cobra

License

Asynq is released under the MIT license. See LICENSE.

Comments
  • [FEATURE REQUEST] Instrumentation/Tracing for asynq redis client

    Is your feature request related to a problem? Please describe.

    • I want to add tracing to the underlying redis client instance using dd-trace-go.
    • Is it possible to get access to the underlying redis instance so I can wrap the client with a function like the one below?
    import (
    	"github.com/go-redis/redis/v7"
    	ddRedis "gopkg.in/DataDog/dd-trace-go.v1/contrib/go-redis/redis.v7"
    )
    
    func TraceClient(rClient redis.UniversalClient) {
    	ddRedis.WrapClient(
    		rClient,
    		ddRedis.WithServiceName(config.ServiceName()+"-redis"),
    		ddRedis.WithAnalytics(config.Enabled()),
    	)
    }
    
    

    Describe alternatives you've considered

    • I am also wondering whether the way to go about this is to create a RedisConnOpt implementation that returns a wrapped Redis instance with Datadog tracing. What do you think?

    https://pkg.go.dev/gopkg.in/DataDog/[email protected]/contrib/go-redis/redis.v7#WrapClient

  • [BUG] The task is always active

    Describe the bug If the worker shuts down while a task is active, the task stays in the active state forever.

    To Reproduce Steps to reproduce the behavior:

    1. Set task sleep times.
    func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    	id, err := t.Payload.GetInt("user_id")
    	if err != nil {
    		return err
    	}
    	fmt.Printf("[%s] - [Start] Send Welcome Email to User %d\n", time.Now().String(), id)
    	time.Sleep(10*time.Second)
    	fmt.Printf("[%s] - [ End ] Send Welcome Email to User %d\n", time.Now().String(), id)
    	return nil
    }
    
    2. Start the worker.
    3. Start the scheduler.
    4. While the task is active, shut down the worker.
    5. The task stays active indefinitely.

    Expected behavior I think the task should change the state to failed based on Timeout, MaxRetry, etc.

    Screenshots If applicable, add screenshots to help explain your problem.

    Environment (please complete the following information):

    • OS: Windows
    • Version of asynq package: v0.17.2
  • [QUESTION] i think set stop in run function is not a good idea

    https://github.com/hibiken/asynq/blob/master/background.go#L253

    Normally, packages expose two methods, Run and Stop, so that I can clean up other resources before or after Stop(). Another scenario is that the user runs multiple background servers in main(), such as a Kafka consumer.
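
The lifecycle being asked for can be sketched as follows. This is a standard-library illustration only: the `worker` type and its `Start` and `Stop` methods are hypothetical and do not represent the asynq API.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical background processor with an explicit lifecycle.
type worker struct {
	name string
	quit chan struct{}
	wg   sync.WaitGroup
}

func newWorker(name string) *worker {
	return &worker{name: name, quit: make(chan struct{})}
}

// Start launches the processing loop and returns immediately.
func (w *worker) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		<-w.quit // a real loop would also pull and process tasks here
	}()
}

// Stop signals the loop to exit and waits for it to finish.
// It must be called at most once.
func (w *worker) Stop() {
	close(w.quit)
	w.wg.Wait()
}

func main() {
	tasks := newWorker("tasks")
	consumer := newWorker("kafka-consumer")
	tasks.Start()
	consumer.Start()

	// The caller decides when and in which order to shut things down,
	// and can release other resources before or after each Stop.
	consumer.Stop()
	tasks.Stop()
	fmt.Println("all workers stopped")
}
```

Because `Start` does not block, several background components can share one `main` and one shutdown sequence.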

  • [FEATURE REQUEST] Processed job's payload and result

    Is your feature request related to a problem? Please describe. Asynq currently removes successfully processed jobs from Redis. That means information about the processed jobs also vanishes, such as:

    • When the client pushed the job into the queue.
    • When the job completed.
    • Whether a given job ID was processed successfully.

    Describe the solution you'd like It would be nice to keep processed jobs until a given TTL and return a basic struct that contains something like id, queued_at, completed_at. I think to achieve that, the first thing is to generate a unique id for each task and keep them like asynq:{default}:processed:{job_id}

    Describe alternatives you've considered Python RQ or Laravel Horizon might be good examples to take a look at.

  • [Question] Are there TTL set for tasks? How long does it stay in Redis?

    Asking because I cannot find this in the wiki and others might find it useful.

    Specifically, how long do archived tasks stay in Redis so we can keep track of them or retry them manually?

  • [FEATURE REQUEST] WebUI of server states (like sidekiq's UI)

    We could offer a command for serving a comprehensive Web UI for management and monitoring.

    Describe the solution you'd like Maybe we can add a subcommand to asynq, e.g. asynq webui.

    Example: https://raw.githubusercontent.com/mperham/sidekiq/master/examples/web-ui.png

  • [Proposal] Change Payload to bytes & Add Wrapper around Key-Value pair payload

    Is your feature request related to a problem? Please describe. I'm always frustrated when I need to call GetX method for all the values in Payload to get all the data (also need to make sure that I don't make a typo in payload key string). It'd be nice if asynq supported en/decoding protobuf or gob message directly so that I can load that data to an object with one method call.

    Describe the solution you'd like initial proposal:

    type EmailTaskPayload struct {
        UserID int
    }
    
    // client side.
    func main() {
        c := asynq.NewClient(asynq.RedisClientOpt{Addr: ":6379"})
    
        t := asynq.NewTask("email:welcome", EmailTaskPayload{UserID: 42})
        err := c.Enqueue(t)
        if err != nil {
            log.Fatalf("could not enqueue task: %v", err)
        }
    }
    
    // background workers side
    func main() {
        bg := asynq.NewBackground(r, &asynq.Config{
            Concurrency: 10,
        })
    
        mux := asynq.NewMux()
        mux.HandleFunc("email:welcome", emailHandler)
    
        bg.Run(mux)
    }
    
    func emailHandler(ctx context.Context, t *asynq.Task) error {
        var p EmailTaskPayload
        err := t.ReadPayload(&p)
        if err != nil {
            return err
        }
        fmt.Printf("Send Email to User %d\n", p.UserID)
        return nil
    }
    

    Describe alternatives you've considered None for now.

    Additional context None for now.
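
The idea behind the proposal, a payload stored as bytes and loaded into a typed struct with one call, can be illustrated without asynq. The sketch below uses `encoding/json` in place of the protobuf or gob encodings mentioned above; `encodePayload` and `decodePayload` are hypothetical helpers, and the `json` tag is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EmailTaskPayload mirrors the struct from the proposal above.
type EmailTaskPayload struct {
	UserID int `json:"user_id"`
}

// encodePayload serializes a typed payload into bytes for storage.
func encodePayload(p EmailTaskPayload) ([]byte, error) {
	return json.Marshal(p)
}

// decodePayload restores the typed payload in a single call.
func decodePayload(data []byte) (EmailTaskPayload, error) {
	var p EmailTaskPayload
	err := json.Unmarshal(data, &p)
	return p, err
}

func main() {
	data, err := encodePayload(EmailTaskPayload{UserID: 42})
	if err != nil {
		panic(err)
	}
	p, err := decodePayload(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Send Email to User %d\n", p.UserID)
}
```

With this shape, a typo in a payload key surfaces in one struct definition rather than in every GetX call.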

  • [BUG] The high priority queue is process very slow compare to lower queue

    Describe the bug The high-priority queue is processed very slowly compared to the lower-priority queue.

    To Reproduce Steps to reproduce the behavior (Code snippets if applicable):

    • Run a long-running task on the default queue. Many small but critical tasks are issued from it and added to the critical queue.
    • The default queue holds all of the workers in the pool the whole time.
    • The critical queue is stuck, and none of its tasks are processed.

    Expected behavior

    • Tasks in the critical queue should be selected for processing first and cleared out quickly.

    Screenshots

    • Set up the server with 3 queues (screenshot)

    • Enqueue critical tasks like this (screenshot)

    • The critical queue is stuck (screenshot)

    • The default queue is running on all other workers (screenshot)

    Environment (please complete the following information):

    • OS: Ubuntu 18.4
    • Version of asynq package 0.18.3

  • System Design Pattern | Messaging App

    Hi @hibiken,

    First of all, thank you for writing the lightest message queue in Go.

    I'm writing a messaging app. When the QR code is read, I start processing messages. I will use asynq for processing both history messages and normal messages. I guess I have to create an asynq server for each number (instance)?

    Message Handler Case:

    • Number Instance -> PhoneNumber (start an asynq server when the QR code is read)

    • QR Read -> Handle all history messages (2-3 million messages on average) -> Start worker queue (instance_id_history).

    • Normal Message (Schedule Message) -> Start worker queue (instance_id_normal)

    Should I always listen to both queues? Or can I stop listening to the history queue once the history messages are finished?

    I'll be listening to 2000 queues when there are 1000 numbers (instances). Do you think a Redis cluster (Sentinel) is necessary to handle this traffic?

    What kind of path do you think we should follow with asynq?

    Sincerely :)

  • add UniqueKey option to extend Unique's behaviour

    Instead of (queue, type, payload), use any custom key. This makes it possible to enforce unique tasks that are not limited to the same payload and/or type.

    Tested with --redis_cluster flag.

  • [FEATURE REQUEST] Distributed concurrency support per queue / taskType

    Is your feature request related to a problem? Please describe. N/A

    Describe the solution you'd like Is it possible to set concurrency individually for every queue, as is supported here?

    Describe alternatives you've considered I tried wiring up the server so that I calculate the total concurrency and set the max priority, assuming that the combination will provide an effect similar to gocraft/work. The maxConcurrency and priority info is stored per queue and used to initialize the server when starting the adapter (a simple wrapper over asynq).

    type Adapter struct {
    	redisConnOpt  asynq.RedisConnOpt
    	taskConfigMap map[string]taskConfig
    	tcMutex       sync.RWMutex
    }
    
    type taskConfig struct {
    	priority       int
    	maxConcurrency int
    }
    
    func (a *Adapter) server() *asynq.Server {
    	return asynq.NewServer(a.redisConnOpt, asynq.Config{
    		Concurrency:    a.concurrency(),
    		Queues:         a.queuePriorityMap(),
    	})
    }
    
    func (a *Adapter) queuePriorityMap() map[string]int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	m := make(map[string]int)
    
    	for tn, cfg := range a.taskConfigMap {
    		if cfg.priority == 0 {
    			m[tn] = 1
    			continue
    		}
    
    		m[tn] = cfg.priority
    	}
    
    	return m
    }
    
    func (a *Adapter) concurrency() int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	concurrency := 0
    	for _, cfg := range a.taskConfigMap {
    		if cfg.maxConcurrency == 0 {
    			concurrency++
    			continue
    		}
    
    		concurrency += cfg.maxConcurrency
    	}
    
    	if rtcpu := runtime.NumCPU(); concurrency < rtcpu {
    		return rtcpu
    	}
    
    	return concurrency
    }
    

    Additional context Ref: Priority Sampler from gocraft/work
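
For comparison, a per-queue concurrency cap can be sketched in-process with one semaphore per queue. This is a standard-library illustration under the assumption of a single process; it is not distributed and is not part of asynq. `limiter`, `newLimiter`, and `run` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter caps how many tasks from each queue may run at once.
type limiter struct {
	slots map[string]chan struct{}
}

// newLimiter builds one semaphore per queue from a name -> limit map.
func newLimiter(limits map[string]int) *limiter {
	l := &limiter{slots: make(map[string]chan struct{})}
	for name, limit := range limits {
		l.slots[name] = make(chan struct{}, limit)
	}
	return l
}

// run blocks until the queue has a free slot, then executes fn.
// Queues without a configured limit run unrestricted.
func (l *limiter) run(queue string, fn func()) {
	if sem, ok := l.slots[queue]; ok {
		sem <- struct{}{}
		defer func() { <-sem }()
	}
	fn()
}

func main() {
	l := newLimiter(map[string]int{"email": 2})

	var running, peak int32
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.run("email", func() {
				n := atomic.AddInt32(&running, 1)
				// Record the highest number of tasks seen running together.
				for {
					p := atomic.LoadInt32(&peak)
					if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
						break
					}
				}
				atomic.AddInt32(&running, -1)
			})
		}()
	}
	wg.Wait()
	fmt.Println("peak concurrency within limit:", atomic.LoadInt32(&peak) <= 2)
}
```

Enforcing the same cap across several worker machines would need shared state, for example in Redis, which this sketch does not attempt.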

  • fix empty id

    When the network is bad, there can be tasks without msg fields (only a state field), so the task ID cannot be obtained. If the ID is empty, a default ID is set.

  • [FEATURE REQUEST] Query tasks from redis

    Is your feature request related to a problem? Please describe. Our app has many tenants, and sometimes tasks fail for one reason or another. It would be helpful to let tenants see the jobs queued for them (emails, etc.) and their status.

    Describe the solution you'd like Method(s) to query tasks from the Redis server.

    Describe alternatives you've considered I looked at just re-implementing the query logic from the cli myself, but this seems like it should be in the main library.

  • Missing username in parsing redis uri

    Hi! I was trying to monitor asynq jobs with asynqmon when I realised that the parseRedisURI function doesn't parse the username from the standard user:password authentication part of the URI. https://github.com/hibiken/asynq/blob/0b8cfad7034159dadccf95e56b96305c0d2a4f7a/asynq.go#L462 I'd like to know whether this is intentional or simply missing. If it isn't intentional, I can add it myself.
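
For reference, the standard library's `net/url` package already exposes both parts of the userinfo. The sketch below shows one way to read them from a redis:// URI; `redisAuth` is a hypothetical helper and is not the library's parseRedisURI.

```go
package main

import (
	"fmt"
	"net/url"
)

// redisAuth extracts the address, username, and password from a
// redis:// URI. A URI such as redis://:secret@host:6379 has an empty
// username and only a password.
func redisAuth(uri string) (addr, username, password string, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", "", "", err
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return "", "", "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	if u.User != nil {
		username = u.User.Username()
		password, _ = u.User.Password()
	}
	return u.Host, username, password, nil
}

func main() {
	addr, user, pass, err := redisAuth("redis://alice:secret@localhost:6379/0")
	if err != nil {
		panic(err)
	}
	fmt.Println(addr, user, pass)
}
```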

  • [FEATURE REQUEST] Ability to hook lock check before dequeueing a task

    Is your feature request related to a problem? Please describe. I have worked on a distributed semaphore in the past. However, one question that I have is that the lock count is checked after the task is dequeued from the queue, and then if the lock can't be acquired, it is again re-queued with a delay set by retryBackoff.

    This should be okay in normal circumstances, but in case of downtime on dependent upstreams, I am worried that this can put unnecessary pressure on Redis and de-queueing/re-queueing may also interfere with the order of tasks in the queue.

    Is there a way for the dequeue routine to take this lock count into account before the dequeue even happens?

    Describe the solution you'd like Instead of checking for the lock after the de-queue, I would like to hook the lock checker before the de-queue begins.

    Describe alternatives you've considered I tried to see if I can use the Pause queue feature to help with this, however, I kinda feel like it is just a workaround, and not sure if it is okay to do so.

  • Redis errors are not being returned to the caller on server run

    Describe the bug Errors are not bubbled up to the caller when starting server. For example:

    ERROR: cannot subscribe to cancelation channel: UNKNOWN: redis pubsub receive error: dial tcp: address redis.dev.xyz: missing port in address

    To Reproduce

    srv := asynq.NewServer(
    	asynq.RedisClientOpt{Addr: redisURL},
    	asynq.Config{
    		Queues: map[string]int{
    			"high":    6,
    			"default": 3,
    			"low":     1,
    		},
    		ErrorHandler: asynq.ErrorHandlerFunc(HandleErrorFunc),
    	},
    )
    
    mux := asynq.NewServeMux()
    mux.HandleFunc(TypeFirebaseSyncUser, HandleFirebaseSyncUserTask)
    
    if err := srv.Run(mux); err != nil {
    	// No error is ever returned here
    	log.Fatalf("could not run server: %v", err)
    }
    

    Expected behavior If an error happens such as not being able to connect to redis, the caller should be able to do what they need to do with that error. This is a problem because when I run this server in a container, the container continues to report as running with no issues, even though there are definitely issues.
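
Until such errors are returned, one possible workaround for this specific failure is to check the address before starting the server. The sketch below only validates the host:port shape using `net.SplitHostPort`; it does not open a connection, and `validateAddr` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net"
)

// validateAddr reports an error if addr is not in host:port form.
// It does not open a connection; it only checks the shape of the address.
func validateAddr(addr string) error {
	_, port, err := net.SplitHostPort(addr)
	if err != nil {
		return fmt.Errorf("invalid redis address: %w", err)
	}
	if port == "" {
		return fmt.Errorf("invalid redis address %q: port is required", addr)
	}
	return nil
}

func main() {
	for _, addr := range []string{"redis.dev.xyz", "127.0.0.1:6379"} {
		if err := validateAddr(addr); err != nil {
			fmt.Println("refusing to start:", err)
			continue
		}
		fmt.Println("address ok:", addr)
	}
}
```

Failing fast in `main` lets a container exit with a non-zero status instead of reporting as healthy.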
