Celery Distributed Task Queue in Go


Go Client/Server for Celery Distributed Task Queue

Having been involved in several projects migrating servers from Python to Go, I have realized Go can improve performance of existing python web applications. As Celery distributed tasks are often used in such web applications, this library allows you to both implement celery workers and submit celery tasks in Go.

You can also use this library as pure go distributed task queue.

Go Celery Worker in Action


Supported Brokers/Backends

Now supporting both Redis and AMQP!!

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

Starting from version 4.0, Celery uses message protocol version 2 as default value. GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.

CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content


GoCelery GoDoc has good examples.
Also take a look at example directory for sample python code.

GoCelery Worker Example

Run Celery Worker implemented in Go

// create redis connection pool
redisPool := &redis.Pool{
  Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		return c, err

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	5, // number of workers

// task
add := func(a, b int) int {
	return a + b

// register task
cli.Register("worker.add", add)

// start workers (non-blocking call)

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)

Python Client Example

Submit Task from Python Client

from celery import Celery

app = Celery('tasks',

def add(x, y):
    return x + y

if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')

Python Worker Example

Run Celery Worker implemented in Python

from celery import Celery

app = Celery('tasks',

def add(x, y):
    return x + y
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

GoCelery Client Example

Submit Task from Go Client

// create redis connection pool
redisPool := &redis.Pool{
  Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		return c, err

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	&gocelery.RedisCeleryBackend{Pool: redisPool},

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))

Sample Celery Task Message

Celery Message Protocol Version 1

    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}


Please let us know if you use gocelery in your project!


You are more than welcome to make any contributions. Please create Pull Request for any changes.


The gocelery is offered under MIT license.

  • gocelery client crashes python celery workers (python 3.6, celery 4.2.1, redis broker/backend)

    gocelery client crashes python celery workers (python 3.6, celery 4.2.1, redis broker/backend)

    I'm trying this Go client:

    package main
    import (
    func main() {
    	// initialize celery client
    	url := fmt.Sprintf("amqp://platzi:%s@localhost/platzi",os.Getenv("RMQ_PASS"))
    	cli, _ := gocelery.NewCeleryClient(
    	// Prepara los comandos
    	taskName := "RegistraComandos.registra"
    	comandos := [3]string{"uno","dos","tres"}
    	i:= 0
    	for i < 1 {
    		_, err := cli.Delay(taskName, comandos[rand.Intn(3)])
    		if err != nil {

    But it crashes the Python worker with this error:

    [2019-03-06 10:23:31,621: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'NoneType' object has no attribute 'tzinfo'",)
    Traceback (most recent call last):
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
        return self.obj.start()
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 317, in start
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 593, in start
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/loops.py", line 88, in asynloop
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/common.py", line 417, in update
        return self.set(self.value)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/common.py", line 410, in set
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/tasks.py", line 47, in set_prefetch_count
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/messaging.py", line 558, in qos
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/channel.py", line 1812, in basic_qos
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/abstract_channel.py", line 59, in send_method
        return self.wait(wait, returns_tuple=returns_tuple)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/abstract_channel.py", line 79, in wait
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
        while not self.blocking_read(timeout):
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/connection.py", line 497, in blocking_read
        return self.on_inbound_frame(frame)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/method_framing.py", line 77, in on_frame
        callback(channel, msg.frame_method, msg.frame_args, msg)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/connection.py", line 501, in on_inbound_method
        method_sig, payload, content,
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/abstract_channel.py", line 128, in dispatch_method
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/channel.py", line 1597, in _on_basic_deliver
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
        return on_m(message) if on_m else self.receive(decoded, message)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 567, in on_task_received
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/strategy.py", line 153, in task_message_handler
        body=body, headers=headers, decoded=decoded, utc=utc,
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/request.py", line 140, in __init__
        self.eta = maybe_make_aware(eta, self.tzlocal)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/utils/time.py", line 327, in maybe_make_aware
        if is_naive(dt):
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/utils/time.py", line 282, in is_naive
        return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None
    AttributeError: 'NoneType' object has no attribute 'tzinfo'

    That particular worker works like a charm with Python and also node-celery clients, so far.

  • gocelery worker fails to decode message from python client (python 3.6, celery 4.2.1, redis backend)

    gocelery worker fails to decode message from python client (python 3.6, celery 4.2.1, redis backend)

    Since I couldn't get the RabbitMQ version to work (see #26 ) I have switched to Redis. Here's the program:

    import os
    from celery import Celery
    from dotenv import load_dotenv
    app = Celery('tasks',
    if __name__ == '__main__':
        ordenes =['tres', 'uno','uno','uno','dos','dos', 'tres']
        for i in ordenes:
            print( "Envía ", i )
            enviado = app.send_task("tasks.registra", [i], serializer='json')

    And the reception, which is pretty much the same, but for redis:

    package main
    // Estructura de https://github.com/gocelery/gocelery
    import (
    // Celery Task
    var comandos = make(map[string]int)
    func registra(comando string) {
    	fmt.Println( comando )
    	fmt.Println( comandos )
    func main() {
    	// Crea el broker y el backend
    	celeryBroker := gocelery.NewRedisCeleryBroker("redis://")
    	celeryBackend := gocelery.NewRedisCeleryBackend("redis://")
    	// Usa dos workers
    	celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 1)
    	// Registra la función
    	celeryClient.Register("tasks.registra", registra)
    	// Arranca el worker
    	fmt.Println( "Arranca el worker" )
    	go celeryClient.StartWorker()
    	// Espera y para 
    	time.Sleep(120 * time.Second)

    Messages are received allright, but it errors:

    Arranca el worker
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message

    The error is the same with or without the serializer='json' argument. Looking at the source of the error, it seems to be trying to decode some Base64 stuff. But I really have no idea. Any help?

  • Why does gocelery use BLPOP command in redis broker?

    Why does gocelery use BLPOP command in redis broker?

    I have the question about redis_broker.go https://github.com/gocelery/gocelery/blob/17631e11026a63f12751d8b4b0e0e2fe521e745b/redis_broker.go#L49-L83 We can find that it uses LPUSH when it sends and BLPOP when it gets.
    I assume it would be LIFO, not FIFO.
    I want to ask why it doesn't uses BRPOP which kombu uses.


  • Task should be an interface, not an anonymous function

    Task should be an interface, not an anonymous function

    type Task interface {
        Execute() (result interface{}, err error)

    This way you can add dependencies without accessing global state.

    For example

    type AddTask struct {
        A `json:"a"`
        B `json:"b"`
        Cache CachingService
  • Fix infinite loop bug in deliveryAck

    Fix infinite loop bug in deliveryAck

    Fix an infinite loop bug in https://github.com/gocelery/gocelery/issues/117 If RabbitMQ is down then deliveryAck will retry indefinitely and consume a lot of CPU resources.

  • running concurrent tests with multiple CeleryClient and multiple tasks fails

    running concurrent tests with multiple CeleryClient and multiple tasks fails

    As apparent from below log, TestWorkerClientArgs test is attempting to get registeredTasks map 0xc4200e48d0 instead of 0xc4200e5590 which is from TestWorkerClientKwargs test.

    • both tests have new CeleryClient of its own: 0xc4200e4900, 0xc4200e55c0
    • both tests pass if run individually.
    === RUN   TestWorkerClientKwargs
    2016/09/30 20:49:41 kwarg client 0xc4200e4900
    2016/09/30 20:49:41 registering task on 0xc4200e48d0 multiply_kwargs
    2016/09/30 20:49:41 registered tasks: 0xc4200e48d0 map[multiply_kwargs:0xc4200fabc0]
    2016/09/30 20:49:41 getting tasks: 0xc4200e48d0 map[multiply_kwargs:0xc4200fabc0]
    2016/09/30 20:49:41 kwarg client 0xc4200a1b00
    2016/09/30 20:49:41 registering task on 0xc4200a1ad0 multiply_kwargs
    2016/09/30 20:49:41 registered tasks: 0xc4200a1ad0 map[multiply_kwargs:0xc4200fadc0]
    2016/09/30 20:49:41 getting tasks: 0xc4200a1ad0 map[multiply_kwargs:0xc4200fadc0]
    --- PASS: TestWorkerClientKwargs (0.02s)
    === RUN   TestWorkerClientArgs
    2016/09/30 20:49:41 arg client 0xc4200e55c0
    2016/09/30 20:49:41 registering task on 0xc4200e5590 multiply
    2016/09/30 20:49:41 registered tasks: 0xc4200e5590 map[multiply:0x487510]
    2016/09/30 20:49:41 getting tasks: 0xc4200e48d0 map[multiply_kwargs:0xc4200fabc0]
    2016/09/30 20:49:41 task multiply_args is not registered
    2016/09/30 20:49:41 async result got
    --- FAIL: TestWorkerClientArgs (5.01s)
        gocelery_test.go:134: failed to get result: 5s timeout getting result for cdd00c82-e593-45a0-955a-4ac10f2b4b09
  • compilation fails on message.go: not enough arguments in call to uuid.

    compilation fails on message.go: not enough arguments in call to uuid.

    I got following error when trying to compile the go project referencing gocelery:

    github.com/gocelery/gocelery/message.go:XX:XX: not enough arguments in call to uuid.Must have (uuid.UUID) want (uuid.UUID, error)

    if you look into the latest code for https://github.com/satori/go.uuid uuid.Must function has extra argument "error"

    func Must(u UUID, err error) UUID {
    	if err != nil {
    	return u
  • Fix bug.  support for task execution without arguments

    Fix bug. support for task execution without arguments

    Now. can not execution no args tasks.

    def test_task():
        print("Hello, World!")
    func main() {
            cli, _ := gocelery.NewCeleryClient(
            asyncResult, err := cli.Delay(test_task)
            if err != nil {

    go run main.go

    [2019-06-26 17:37:44,256: ERROR/ForkPoolWorker-2] Task test_task[cd94a9db-cdec-43f4-be87-7474df4cca21] raised unexpected: TypeError('test_task object argument after * must be an iterable, not NoneType')
    Traceback (most recent call last):
      File "/$HOME.local/share/virtualenvs/test-python-repo--NH_gJ4S/lib/python3.7/site-packages/celery/app/trace.py", line 385, in trace_task
        R = retval = fun(*args, **kwargs)
    TypeError: test_task object argument after * must be an iterable, not NoneType

    https://github.com/gocelery/gocelery/blob/master/message.go#L171 Add below code. fmt.Printf(stirng(jsonData))

    Debug log. {"id":"81af5e46-b6dc-46c3-9057-d21202d9c2aa","task":"vulns.tasks.test_task","args":null,"kwargs":{},"retries":0,"eta":null}

    args is null.

  • Monitoring Tool/UI for Gocelery (FlowerUI)

    Monitoring Tool/UI for Gocelery (FlowerUI)

    Hi @sickyoon would you know if there is any Monitoring UI for gocelery just like how Flower UI can monitor celery? Or do you know if flower UI can be configured to server a gocelery application?

  • Making taskID in AsyncResult a public field.

    Making taskID in AsyncResult a public field.


    Hi @sickyoon, great project you have done! I was wondering what the reason was for setting the fields in AsyncResult as private fields especially taskID. I am hoping to retrieve the TaskID immediately after I submit the task to the queue even before the queue's been worked on by the worker. Would it be possible to change it to a public field? eg.

    type AsyncResult struct {
    	TaskID  string
    	backend CeleryBackend
    	result  *ResultMessage
  • Each worker burns one CPU core even if he is only waiting for new messages

    Each worker burns one CPU core even if he is only waiting for new messages

    The StartWorker() function runs into an loop and on each iteration it wants to get new messages from the broker.

    As far as i understood go channels normaly you want explizit the blocking behavior of channel reads and write so that the underling code ony gets executet if there is something on the channel.

    In the way you used the select satement brakes this pattern due to the default: case. Everytime when there is nothing on the channel your code does not stop. Instead it is looping over and over again and burns all the CPU for waiting on tasks.

    To be honest im only a GO beginner so I'm not able to fix your code at all. But I would appreciate it when you think about that design pattern.

    func (w *CeleryWorker) StartWorker() {
        w.stopChannel = make(chan struct{}, 1)
        for i := 0; i < w.numWorkers; i++ {
            go func(workerID int) {
                defer w.workWG.Done()
                for {
                    select {
                    case <-w.stopChannel:
                        // process messages
                        taskMessage, err := w.broker.GetTaskMessage()
                        if err != nil || taskMessage == nil {
                        //log.Printf("WORKER %d task message received: %v\n", workerID, taskMessage)
                        // run task
                        resultMsg, err := w.RunTask(taskMessage)
                        if err != nil {
                        defer releaseResultMessage(resultMsg)
                        // push result to backend
                        err = w.backend.SetResult(taskMessage.ID, resultMsg)
                        if err != nil {


    func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, error) {
        var taskMessage TaskMessage
        select {
        case delivery := <-b.consumingChannel:
            if err := json.Unmarshal(delivery.Body, &taskMessage); err != nil {
                return nil, err
            return &taskMessage, nil
            return nil, fmt.Errorf("consumingChannel is empty")

    Kind regards Reinhard Luediger

  • Sharing a fork

    Sharing a fork

    Hi! I've made a few changes in a fork https://github.com/marselester/gocelery, e.g., panic handling, spawning workers on demand, CELERY_IGNORE_RESULT capability, fixed a memory leak.

    Then I decided to rearrange the same ideas resulting in Gopher Celery https://github.com/marselester/gopher-celery. Maybe somebody else will find it useful.

  • Interest in a PostgreSQL result backend

    Interest in a PostgreSQL result backend


    I see that there is Celery/Redis backend support out of the box – but unfortunately I've been using PostgreSQL as a result backend for my own project until I can migrate to Redis.

    I plan to write a lightweight Postgres backend wrapper based off of the Celery one – https://github.com/celery/celery/blob/master/celery/backends/database/models.py . Since the interface provided by gocelery is simple (GetResult/SetResult), I imagine it would be pretty straightforward.

    Do you / folks think there would be interest if I were to open a PR with it here, even as a proof of concept?

  • connection.close() does not work.

    connection.close() does not work.

    Hi, I'm using gocelery to send celery task to broker (rabbitmq), and python as worker. However sometimes I received this error: "channel/connection is not open" that makes message cannot send to broker. The walkthrough solution is that I will check that connection is usable before send to to broker, if not then I will close old connection and make new connection.

    func (b *AMQPCeleryBroker) SendCeleryMessage(message *gocelery.CeleryMessage) error {
    	err := b.Send(message)
    	if b.ShouldReconnect(err) {
    		log.Printf("Error %#v, retrying", err)
    		return b.Send(message)
    	return err
    func (b *AMQPCeleryBroker) ShouldReconnect(err error) bool {
    	return err == amqp.ErrClosed || err == amqp.ErrChannelMax
    func (b *AMQPCeleryBroker) Reconnect() {
    	defer func() {
    		if err := recover(); err != nil {
    			fmt.Println(fmt.Sprintf("[ERROR] %v", err))
    	_ = b.Close()
    	conn, channel := NewAMQPConnection(b.host)
    	b.Channel = channel
    	b.connection = conn

    However the method b.Close() seems not working. Number of connections continuously increase, and cause connection leak. Please help. Thanks.

  • Keep getting this error: amqp_backend: failed to acknowledge result message : delivery not initialized

    Keep getting this error: amqp_backend: failed to acknowledge result message : delivery not initialized

    I've used gocelery for months and it has been working well but just yesterday when I deploy my app to a new GCP project I keep getting this error

    amqp_backend: failed to acknowledge result message: delivery not initialized

    but if I change to use redis as message broker it works like a charm Is there anything I need to modify? Probably pkg dependencies?

    RabbitMQ Version: rabbitmq:3.8.19-alpine

