Asynq: simple, reliable, and efficient distributed task queue in Go

Overview

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started with.

High-level overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, giving you high availability and horizontal scaling.

[Task queue diagram]

Stability and Compatibility

Important Note: The current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

  • Scheduling of tasks (ProcessAt / ProcessIn)
  • Retries of failed tasks (MaxRetry)
  • Weighted priority queues
  • Timeout and deadline per task
  • De-duplication of tasks using the Unique option
  • CLI and Web UI for inspecting and managing queues and tasks

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    return &ImageProcessor{}
}

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique, etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background.
To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with "net/http" Handler.

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

For a more detailed walk-through of the library, see our Getting Started Guide.

To learn more about asynq features and APIs, see our Wiki and godoc.

Web UI

Asynqmon is a web-based tool for monitoring and administering Asynq queues and tasks. Please see the tool's README for details.
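
One common way to run Asynqmon locally is via its published Docker image (a sketch; it assumes Redis is reachable at the address you pass via --redis-addr):

$ docker run --rm -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379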

Here are a few screenshots of the web UI.

Queues view
[screenshot]

Tasks view
[screenshot]

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.

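With the default connection settings, that looks like this (output omitted):

$ asynq stats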

For details on how to use the tool, refer to the tool's README.

Installation

To install the asynq library, run the following command:

go get -u github.com/hibiken/asynq

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Requirements

Dependency    Version
Redis         v3.0+
Go            v1.13+

Contributing

We are open to, and grateful for, any contributions (GitHub issues/pull requests, feedback on the Gitter channel, etc.) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq: Many of the design ideas are taken from sidekiq and its Web UI.
  • RQ: Client APIs are inspired by the rq library.
  • Cobra: The Asynq CLI is built with cobra.

License

Asynq is released under the MIT license. See LICENSE.

Comments
  • [FEATURE REQUEST] Instrumentation/Tracing for asynq redis client

    Is your feature request related to a problem? Please describe.

    • I want to add tracing to the underlying redis client instance using dd-trace-go.
    • Is it possible to get access to the underlying redis instance so I can wrap the client with a function like the one below?
    import (
    	"github.com/go-redis/redis/v7"
    	ddRedis "gopkg.in/DataDog/dd-trace-go.v1/contrib/go-redis/redis.v7"
    )
    
    func TraceClient(rClient redis.UniversalClient) {
    	ddRedis.WrapClient(
    		rClient,
    		ddRedis.WithServiceName(config.ServiceName()+"-redis"),
    		ddRedis.WithAnalytics(config.Enabled()),
    	)
    }
    
    

    Describe alternatives you've considered

    • I am also looking into whether the way to go about this is creating a RedisConnOpt interface that returns a wrapped redis instance with Datadog tracing. What do you think? (A rough sketch of this idea follows below.)

    https://pkg.go.dev/gopkg.in/DataDog/[email protected]/contrib/go-redis/redis.v7#WrapClient
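
    One possible direction (a sketch only, not asynq's current API): if RedisConnOpt were an interface with a MakeRedisClient method, a small wrapper type could attach the tracing hook before asynq uses the client. The tracedConnOpt type below is hypothetical.

    import (
    	"github.com/go-redis/redis/v7"
    	ddRedis "gopkg.in/DataDog/dd-trace-go.v1/contrib/go-redis/redis.v7"
    )

    // tracedConnOpt is a hypothetical connection option that wraps the
    // underlying go-redis client with Datadog tracing before returning it.
    type tracedConnOpt struct {
    	addr string
    }

    func (o tracedConnOpt) MakeRedisClient() interface{} {
    	client := redis.NewClient(&redis.Options{Addr: o.addr})
    	ddRedis.WrapClient(client, ddRedis.WithServiceName("myapp-redis"))
    	return client
    }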

  • [BUG] The task is always active

    Describe the bug If the worker shuts down while a task is active, the task stays in the active state forever.

    To Reproduce Steps to reproduce the behavior:

    1. Add a sleep to the task handler:
    func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    	id, err := t.Payload.GetInt("user_id")
    	if err != nil {
    		return err
    	}
    	fmt.Printf("[%s] - [Start] Send Welcome Email to User %d\n", time.Now().String(), id)
    	time.Sleep(10*time.Second)
    	fmt.Printf("[%s] - [ End ] Send Welcome Email to User %d\n", time.Now().String(), id)
    	return nil
    }
    
    2. Start the worker.
    3. Start the scheduler.
    4. While the task is active, shut down the worker.
    5. The task stays active indefinitely.

    Expected behavior I think the task should change its state to failed or retry based on Timeout, MaxRetry, etc.

    Environment (please complete the following information):

    • OS: Windows
    • Version of asynq package: v0.17.2
  • [QUESTION] I think handling stop inside the Run function is not a good idea

    https://github.com/hibiken/asynq/blob/master/background.go#L253

    Normally, packages have two methods, Run and Stop, so I can clean up other resources before or after Stop(). Another scenario is that a user will run multiple background servers in main(), like a Kafka consumer.

  • [FEATURE REQUEST] Processed job's payload and result

    Is your feature request related to a problem? Please describe. Asynq currently removes successfully processed jobs from Redis. That means information about the processed jobs also vanishes, such as:

    • When did the client push the job into the queue?
    • When was the job completed?
    • Was a given job ID processed successfully?

    Describe the solution you'd like It would be nice to keep processed jobs for a given TTL and return a basic struct that contains something like id, queued_at, completed_at. I think to achieve that, the first step is to generate a unique ID for each task and keep them under a key like asynq:{default}:processed:{job_id} (a rough sketch follows below).
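
    A rough sketch of that bookkeeping (assuming go-redis v7; the ProcessedInfo struct and recordProcessed function are hypothetical names, and the key layout is the one suggested above):

    import (
    	"encoding/json"
    	"fmt"
    	"time"

    	"github.com/go-redis/redis/v7"
    )

    type ProcessedInfo struct {
    	ID          string    `json:"id"`
    	QueuedAt    time.Time `json:"queued_at"`
    	CompletedAt time.Time `json:"completed_at"`
    }

    // recordProcessed stores a summary of a completed job under a TTL.
    func recordProcessed(r *redis.Client, queue string, info ProcessedInfo, ttl time.Duration) error {
    	data, err := json.Marshal(info)
    	if err != nil {
    		return err
    	}
    	key := fmt.Sprintf("asynq:{%s}:processed:%s", queue, info.ID)
    	return r.Set(key, data, ttl).Err()
    }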

    Describe alternatives you've considered Python RQ or Laravel Horizon might be good examples to take a look at.

  • [Question] Are there TTL set for tasks? How long does it stay in Redis?

    Asking because I cannot find this in the wiki, and others might find it useful.

    Specifically, how long do archived tasks stay in Redis so we can keep track of them or retry manually?

  • [FEATURE REQUEST] WebUI of server states (like sidekiq's UI)

    We could offer a command for serving a comprehensive Web UI for management and monitoring.

    Describe the solution you'd like Maybe we can add a subcommand to asynq, e.g. asynq webui.

    e.g. https://raw.githubusercontent.com/mperham/sidekiq/master/examples/web-ui.png

  • [Proposal] Change Payload to bytes & Add Wrapper around Key-Value pair payload

    Is your feature request related to a problem? Please describe. I'm always frustrated when I need to call a GetX method for every value in Payload to get all the data (I also need to make sure that I don't make a typo in the payload key string). It'd be nice if asynq supported encoding/decoding protobuf or gob messages directly so that I can load the data into an object with one method call.

    Describe the solution you'd like initial proposal:

    type EmailTaskPayload struct {
        UserID int
    }
    
    // client side.
    func main() {
         c := asynq.NewClient(asynq.RedisClientOpt{Addr: ":6379"})
    
        t := asynq.NewTask("email:welcome", EmailTaskPayload{UserID: 42})
        err := c.Enqueue(t)
        if err != nil {
            log.Fatalf("could not enqueue task: %v", err)
        }
    }
    
    // background workers side
    func main() {
        bg := asynq.NewBackground(r, &asynq.Config{
            Concurrency: 10,
        })
    
        mux := asynq.NewMux()
        mux.HandleFunc("email:welcome", emailHandler)
    
        bg.Run(mux)
    }
    
    func emailHandler(ctx context.Context, t *asynq.Task) error {
        var p EmailTaskPayload
        err := t.ReadPayload(&p)
        if err != nil {
            return err
        }
        fmt.Printf("Send Email to User %d\n", p.UserID)
        return nil
    }
    

    Describe alternatives you've considered None for now.

    Additional context None for now.

  • [BUG] The high-priority queue is processed very slowly compared to the lower-priority queue

    Describe the bug The high-priority queue is processed very slowly compared to the lower-priority queue.

    To Reproduce Steps to reproduce the behavior (Code snippets if applicable):

    • Run a long-running task on the default queue. Many small but critical tasks are issued from it and added to the critical queue.
    • The default queue holds all of the worker pool's workers the whole time.
    • The critical queue is stuck, and no task is processed.

    Expected behavior

    • The critical queue's tasks need to be selected for processing first and cleared out quickly.

    Screenshots

    • Set up the server with 3 queues [screenshot]

    • Enqueue a critical task [screenshot]

    • The critical queue is stuck [screenshot]

    • The default queue is running on all the other workers [screenshot]

    Environment (please complete the following information):

    • OS: Ubuntu 18.04
    • Version of asynq package: 0.18.3

  • System Design Pattern | Messaging App

    Hi @hibiken,

    First of all, thank you for writing the lightest message queue in Go.

    I'm writing a messaging app. When the QR code is read, I start to process messages. I will use asynq for processing history messages and normal messages. I guess I have to create an asynq server for each number (instance)?

    Message Handler Case:

    • Number instance -> PhoneNumber (start an asynq server when the QR is read)

    • QR read -> Handle all history messages (average 2-3 million messages) -> Start worker queue (instance_id_history).

    • Normal Message (Schedule Message) -> Start worker queue (instance_id_normal)

    Should I always listen to 2 queues? Or can I stop listening to the history queue once the history messages are finished?

    I'll be listening to 2000 queues when there are 1000 numbers (instances). Do you think a Redis cluster (or Sentinel) is necessary to handle this traffic?

    What kind of path do you think we should follow with asynq?

    Sincerely :)

  • add UniqueKey option to extend Unique's behaviour

    Instead of (queue, type, payload), use any custom key. This allows enforcing unique tasks without being limited to the same payload and/or type (a usage sketch follows below).

    Tested with --redis_cluster flag.
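
    A hypothetical usage sketch of the proposed option (UniqueKey is not part of the released API; asynq.Unique is the existing uniqueness option):

    t := tasks.NewImageResizeTask("some/blobstore/path")
    // Unique within the next hour, keyed by a custom string instead of the payload.
    _, err := c.Enqueue(t, asynq.Unique(time.Hour), asynq.UniqueKey("resize:user:42"))
    if err != nil {
    	log.Fatalf("could not enqueue task: %v", err)
    }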

  • [FEATURE REQUEST] Distributed concurrency support per queue / taskType

    Is your feature request related to a problem? Please describe. N/A

    Describe the solution you'd like Is it possible to set concurrency individually for every queue, as is supported here?

    Describe alternatives you've considered Currently I have tried to wire up the server so that I calculate the total concurrency and set queue priorities, assuming the combination provides an effect similar to gocraft/work: storing the maxConcurrency and priority info per queue and using it to initialize the server when starting the adapter (a simple wrapper over asynq).

    type Adapter struct {
    	redisConnOpt  asynq.RedisConnOpt // connection options used by server() below
    	taskConfigMap map[string]taskConfig
    	tcMutex       sync.RWMutex
    }
    
    type taskConfig struct {
    	priority       int
    	maxConcurrency int
    }
    
    func (a *Adapter) server() *asynq.Server {
    	return asynq.NewServer(a.redisConnOpt, asynq.Config{
    		Concurrency:    a.concurrency(),
    		Queues:         a.queuePriorityMap(),
    	})
    }
    
    func (a *Adapter) queuePriorityMap() map[string]int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	m := make(map[string]int)
    
    	for tn, cfg := range a.taskConfigMap {
    		if cfg.priority == 0 {
    			m[tn] = 1
    			continue
    		}
    
    		m[tn] = cfg.priority
    	}
    
    	return m
    }
    
    func (a *Adapter) concurrency() int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	concurrency := 0
    	for _, cfg := range a.taskConfigMap {
    		if cfg.maxConcurrency == 0 {
    			concurrency++
    			continue
    		}
    
    		concurrency += cfg.maxConcurrency
    	}
    
    	if rtcpu := runtime.NumCPU(); concurrency < rtcpu {
    		return rtcpu
    	}
    
    	return concurrency
    }
    

    Additional context Ref: Priority Sampler from gocraft/work

  • [FEATURE REQUEST] Query tasks from redis

    Is your feature request related to a problem? Please describe. Our app has many tenants, and sometimes tasks fail for one reason or another. It would be helpful to let tenants see the jobs queued for them (emails, etc.) and show their status.

    Describe the solution you'd like Method(s) to query tasks from the Redis server.

    Describe alternatives you've considered I looked at re-implementing the query logic from the CLI myself, but this seems like it should be in the main library.

  • [FEATURE REQUEST] Do you consider to return the whole metadata struct?

    Hi. Thank you for the response to the previous issue.

    Do you consider returning the whole metadata struct? Something like this?

    // GetMetadata extracts metadata from a context, if any.
    func GetMetadata(ctx context.Context) (metadata taskMetadata, ok bool) {
    	metadata, ok = ctx.Value(metadataCtxKey).(taskMetadata)
    	return
    }

    Thank you

  • [BUG] markAsCompleteCmd&markAsCompleteUniqueCmd script typo

    asynq/internal/rdb/rdb.go#381-383,#418-420

    if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then
      redis.redis.error_reply("INTERNAL")   -- this line
    end
    
  • Missing username in parsing redis uri

    Hi! I was trying to monitor asynq jobs with asynqmon, and I realised the parseRedisURI method doesn't parse the username from the standard authentication section of the URI. https://github.com/hibiken/asynq/blob/0b8cfad7034159dadccf95e56b96305c0d2a4f7a/asynq.go#L462 I want to know whether this is intentional or just missing. If it wasn't intentional, I can add it myself.
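
    For reference, a Redis URI that carries both a username and a password looks like this (hypothetical credentials):

    redis://myuser:mypassword@localhost:6379/0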

  • [FEATURE REQUEST] Ability to hook lock check before dequeueing a task

    Is your feature request related to a problem? Please describe. I have worked on a distributed semaphore in the past. However, one issue is that the lock count is checked after the task is dequeued, and then, if the lock can't be acquired, the task is re-queued with a delay set by retryBackoff.

    This should be okay in normal circumstances, but in case of downtime on dependent upstreams, I am worried that this can put unnecessary pressure on Redis and de-queueing/re-queueing may also interfere with the order of tasks in the queue.

    Is there a way the dequeue routine can take this lock count into account before the dequeue even happens?

    Describe the solution you'd like Instead of checking for the lock after the de-queue, I would like to hook the lock checker before the de-queue begins.

    Describe alternatives you've considered I tried to see if I can use the pause-queue feature to help with this; however, it feels like just a workaround, and I'm not sure if it is okay to do so.
