redisqueue

redisqueue provides a producer and consumer of a queue that uses Redis streams.

Features

  • A Producer struct to make enqueuing messages easy.
  • A Consumer struct to make processing messages concurrently easy.
  • Claiming and acknowledging messages if there's no error, so that if a consumer dies while processing, the message it was working on isn't lost. This guarantees at-least-once delivery.
  • A "visibility timeout" so that if a message isn't processed in a designated time frame, it will be processed by another consumer.
  • A max length on the stream so that it doesn't store the messages indefinitely and run out of memory.
  • Graceful handling of Unix signals (SIGINT and SIGTERM) to let in-flight messages complete.
  • A channel that will surface any errors so you can handle them centrally.
  • Graceful handling of panics to avoid crashing the whole process.
  • A concurrency setting to control how many goroutines are spawned to process messages.
  • A batch size setting to limit the total messages in flight.
  • Support for multiple streams.

Installation

redisqueue requires a Go version with Modules support and uses import versioning. So please make sure to initialize a Go module before installing redisqueue:

go mod init github.com/my/repo
go get github.com/robinjoseph08/redisqueue/v2

Import:

import "github.com/robinjoseph08/redisqueue/v2"

Example

Here's an example of a producer that inserts 1000 messages into a queue:

package main

import (
	"fmt"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
		StreamMaxLength:      10000,
		ApproximateMaxLength: true,
	})
	if err != nil {
		panic(err)
	}

	for i := 0; i < 1000; i++ {
		err := p.Enqueue(&redisqueue.Message{
			Stream: "redisqueue:test",
			Values: map[string]interface{}{
				"index": i,
			},
		})
		if err != nil {
			panic(err)
		}

		if i%100 == 0 {
			fmt.Printf("enqueued %d\n", i)
		}
	}
}

And here's an example of a consumer that reads the messages off of that queue:

package main

import (
	"fmt"
	"time"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		VisibilityTimeout: 60 * time.Second,
		BlockingTimeout:   5 * time.Second,
		ReclaimInterval:   1 * time.Second,
		BufferSize:        100,
		Concurrency:       10,
	})
	if err != nil {
		panic(err)
	}

	c.Register("redisqueue:test", process)

	go func() {
		for err := range c.Errors {
			// handle errors accordingly
			fmt.Printf("err: %+v\n", err)
		}
	}()

	fmt.Println("starting")
	c.Run()
	fmt.Println("stopped")
}

func process(msg *redisqueue.Message) error {
	fmt.Printf("processing message: %v\n", msg.Values["index"])
	return nil
}

Comments

  • Updated Go Redis to 7th version, error and testify update.

    Updated go-redis to version 7 (current stable version). Also updated errors and testify. This update is needed because a project that uses the Redis client itself (v7) would otherwise pull in both v6 (via the current redisqueue) and v7 simultaneously.

  • Accept a "universal" Redis client in Consumer/Producer Options

    This allows a Redis Cluster client to be used with the package.


    Some of our on premises users use clustered Redis; to my surprise, using a normal Redis client with a cluster is a sharp edge.

    This PR allows the consumer to pass in either a vanilla Redis client or a "universal" client.

    Unrelated: if this requires a module version bump (not sure? existing callers should work?), would you consider bumping to go-redis/v8 at the same time?

  • Add ability to inject *redis.Client into producer & consumer

    This adds a new field to both the ProducerOptions and ConsumerOptions structs, RedisClient, that allows you to use dependency injection for the Redis client instead of using options to create one internally.

    This allows consumers to use the same Redis connection pool for their queue, and any other operations they want to take elsewhere in their program. One use case might be wanting a heartbeat operation that restarts the program if things seem unhealthy.

    Internally, this splits the function that creates and validates the Redis client into two functions: one to create the client, and another to exercise the client to make sure it works and that Redis offers the features we need.

    Fixes #8
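
The create/validate split described above can be sketched roughly as follows. This is a standalone illustration, not the library's code: the client interface, options struct, fakeClient, and the newClient, preflight, and resolveClient functions are all hypothetical stand-ins so the sketch runs without a Redis server.

```go
package main

import (
	"errors"
	"fmt"
)

// client is a hypothetical stand-in for a Redis client; only the call
// needed by the preflight check is modeled.
type client interface {
	Ping() error
}

// options is a hypothetical stand-in for connection options.
type options struct {
	Addr string
}

// fakeClient lets the sketch run without a Redis server.
type fakeClient struct {
	addr    string
	healthy bool
}

func (f *fakeClient) Ping() error {
	if !f.healthy {
		return errors.New("ping failed")
	}
	return nil
}

// newClient only builds a client from options; it performs no checks.
func newClient(opts *options) client {
	if opts == nil {
		opts = &options{Addr: "localhost:6379"}
	}
	return &fakeClient{addr: opts.Addr, healthy: true}
}

// preflight exercises the client to confirm that it is usable.
func preflight(c client) error {
	if err := c.Ping(); err != nil {
		return fmt.Errorf("redis preflight: %w", err)
	}
	return nil
}

// resolveClient prefers an injected client and falls back to building one
// from options; either way the client is validated before being returned.
func resolveClient(injected client, opts *options) (client, error) {
	c := injected
	if c == nil {
		c = newClient(opts)
	}
	if err := preflight(c); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	shared := &fakeClient{addr: "shared-pool", healthy: true}
	c, err := resolveClient(shared, nil)
	fmt.Println(c == client(shared), err)
}
```

Keeping validation separate from construction means an injected client goes through the same checks as one built internally.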

  • Add support for registering consumer at specific Message ID

    Some consumers may not want to process old messages in the stream and would rather start consuming from the latest. This adds a RegisterWithID function that allows you to specify any message ID. Per the Redis documentation, 0 means the oldest message in the stream while $ means the latest.

    Fixes #6

  • Upgrade to go-redis/v8

    There have been a number of fixes to go-redis for Redis Cluster compatibility. Notably, [1], which impacts the node go-redis chooses to send a stream command to.

    We're continuing to see MOVED errors emitted from redisqueue, despite not overriding MaxRedirects when constructing our redis client. This suggests to me that the choice of initial node may be to blame.

    This commit updates redisqueue to use go-redis/v8. In order to avoid signature changes, we use context.TODO() liberally.

    [1] https://github.com/go-redis/redis/issues/1501

  • feat(redis): update to go-redis/v7 and switch to redisqueue/v2

    what

    carry of #5

    • update to github.com/robinjoseph08/redisqueue/v2
    • upgrade go-redis to v7
    • update testify and errors dependencies
    • make note in README about go modules
  • chore(makefile): update Makefile

    what

    • update golangci-lint to latest version
    • disable scopelint for tests because it was failing with:
      consumer_test.go:120:33  scopelint  Using the variable on range scope `tt` in function literal

      which isn't a problem in our case, since the t.Run functions block the range over tests

    • split out installing golangci-lint into its own target
    • fix setup command to install tools in the ./bin directory, to work with modules, and to not rely on realpath
  • Add ability to inject *redis.Client instead of *redis.Options

    I'd like to continually health check the Redis client by doing a simple SET / GET transaction every n seconds, and if it fails trigger a process restart.

    Unfortunately, I have no way to use the same Redis pool as the publisher/consumer which puts extra load on the Redis cluster and doesn't fully test the ability to use Redis.

    I was wondering if there'd be a strong objection to adding a *redis.Client to both ConsumerOptions and ProducerOptions. I'm thinking of retaining the *redis.Options for now, and to only use that if *redis.Client is nil.

  • Support creating groups from end of stream, not just beginning

    Because the message ID we're specifying when creating the group is always 0, consumers will need to process all messages in the stream before getting to the latest message. For a system I'm building, I'm looking to have the semantics be more lossy when a new consumer comes to life.

    Would it be possible to extend the API to include a way to specify whether we want all messages, or only new messages?

    Here is the relevant line of code:

    https://github.com/robinjoseph08/redisqueue/blob/d95817e5c69fb071b4cdb2232235854f872c7926/consumer.go#L156

  • fix(reclaim): increment ID when looping

    because we weren't incrementing the ID before setting it to start, and because XPENDING lists messages between start and end inclusively, it never got to the point where the length of the response was 0. this resulted in an infinite loop which ate cpu and prevented graceful termination. by incrementing the id, we can now get an empty response to signal when to stop paging through.
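
To make the paging fix concrete, here is a small self-contained sketch of the idea, assuming stream IDs of the form "<milliseconds>-<sequence>". It is not the library's code: streamID, pendingPage, and collectPending are hypothetical names, and pendingPage is an in-memory stand-in for an inclusive range query like the one described above.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// streamID models a stream ID of the form "<milliseconds>-<sequence>".
type streamID struct{ ms, seq uint64 }

// parseID parses "<ms>-<seq>" into a streamID.
func parseID(s string) (streamID, error) {
	parts := strings.SplitN(s, "-", 2)
	if len(parts) != 2 {
		return streamID{}, fmt.Errorf("malformed stream ID %q", s)
	}
	ms, err := strconv.ParseUint(parts[0], 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("malformed stream ID %q: %w", s, err)
	}
	seq, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("malformed stream ID %q: %w", s, err)
	}
	return streamID{ms, seq}, nil
}

func (id streamID) String() string { return fmt.Sprintf("%d-%d", id.ms, id.seq) }

func (id streamID) less(o streamID) bool {
	return id.ms < o.ms || (id.ms == o.ms && id.seq < o.seq)
}

// next returns the smallest ID greater than id (overflow of ms is ignored
// in this sketch).
func (id streamID) next() streamID {
	if id.seq == math.MaxUint64 {
		return streamID{id.ms + 1, 0}
	}
	return streamID{id.ms, id.seq + 1}
}

// pendingPage mimics an inclusive range query over a sorted list: it
// returns up to count IDs that are >= start.
func pendingPage(sorted []streamID, start streamID, count int) []streamID {
	var page []streamID
	for _, id := range sorted {
		if id.less(start) {
			continue
		}
		page = append(page, id)
		if len(page) == count {
			break
		}
	}
	return page
}

// collectPending pages through every ID. Restarting from the last ID itself
// would return that ID forever, because the range is inclusive; restarting
// from next() lets the final page come back empty and ends the loop.
func collectPending(sorted []streamID, count int) []streamID {
	var all []streamID
	start := streamID{0, 0}
	for {
		page := pendingPage(sorted, start, count)
		if len(page) == 0 {
			return all
		}
		all = append(all, page...)
		start = page[len(page)-1].next()
	}
}

func main() {
	ids := []streamID{{1, 0}, {1, 1}, {2, 0}, {3, 5}}
	fmt.Println(collectPending(ids, 2))
}
```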

  • Add ability to set up stream message TTL with XTRIM MINID

    This issue is a feature request. Redisqueue can already limit the size of a stream through the StreamMaxLength field of ProducerOptions:

    // StreamMaxLength sets the MAXLEN option when calling XADD. This creates a
    // capped stream to prevent the stream from taking up memory indefinitely.
    // It's important to note though that this isn't the maximum number of
    // completed messages, but the maximum number of total messages. This
    // means that if all consumers are down, but producers are still enqueuing,
    // and the maximum is reached, unprocessed messages will start to be dropped.
    // So ideally, you'll set this number to be as high as you can make it.
    // More info here: https://redis.io/commands/xadd#capped-streams.
    StreamMaxLength int64

    It works well, but it offers no way to limit a stream by time. The only option is a hard cap on the number of messages, and a cap set high enough to leave capacity for many consumers or heavy load can result in a very large stream when the messages themselves are big.

    Redis Streams messages cannot be given a TTL with the EXPIRE command (https://redis.io/commands/expire), but there is a thread with a related feature request for Redis: https://github.com/redis/redis/issues/4450#issuecomment-756857585. So we could emulate a TTL for Redis Streams messages with this logic:

    • the library exposes a method that takes a stream name, a time string that defines the TTL cutoff for stream messages (e.g. "-7 days" to keep only the last 7 days of messages in the stream), and an error callback
    • this function has a Redis client inside
    • it calls Redis with the XINFO GROUPS command, which returns the last delivered ID for each consumer group
    • it compares the IDs from the previous step with the TTL cutoff to check whether the stream still has undelivered messages older than the cutoff
    • if all of those messages have been delivered to consumers, it runs the XTRIM MINID command to remove the old messages (this only works with Redis 6.2 or later, see https://redis.io/commands/xtrim#history)
    • if some consumers have not yet received messages that are about to be deleted, it calls a special error callback function (with empty interface arguments), which lets the developer handle the situation (logging, alerting, etc.)

    We will soon need to build this functionality for our own services, but rather than reinvent the wheel, I think it would be great to have this code inside redisqueue.

  • Remove version check from Redis preflight

    This commit removes the INFO version check from the Redis connection preflighting.

    In our automated test suite we make extensive use of miniredis rather than depending on a full Redis server. Miniredis is quite capable, but does not implement the INFO command.

    Redis 5 was released in 2018 and was a significant upgrade to Redis. I haven't seen Redis < 5 in production since shortly after 5's release. Therefore I believe this to be a reasonable loosening of the constraints on redisqueue.


    @robinjoseph08 I understand if this isn't a change you want to pull in, just thought I'd see if we could continue tracking upstream :).
