ZenQ - A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer

Features

  • Much faster than native channels in both SPSC (single-producer-single-consumer) and MPSC (multi-producer-single-consumer) modes in terms of time/op
  • More resource efficient in terms of memory_allocation/op and num_allocations/op evident while benchmarking large batch size inputs
  • Handles the case where NUM_WRITER_GOROUTINES > NUM_CPU_CORES much better than native channels
  • Selection from multiple ZenQs just like golang's select{} ensuring fair selection and no starvation

Benchmarks supporting the above claims are in the Benchmarks section below

Installation

You need Golang 1.18.x or above since this package uses generics

$ go get github.com/alphadose/[email protected]

Usage

  1. Simple Read/Write
package main

import (
	"fmt"

	"github.com/alphadose/zenq"
)

type payload struct {
	alpha int
	beta  string
}

func main() {
	zq := zenq.New[payload]()

	for j := 0; j < 5; j++ {
		go func() {
			for i := 0; i < 20; i++ {
				zq.Write(payload{
					alpha: i,
					beta:  fmt.Sprint(i),
				})
			}
		}()
	}

	for i := 0; i < 100; i++ {
		var data payload = zq.Read()
		fmt.Printf("%+v\n", data)
	}
}
  2. Selection from multiple ZenQs just like golang's native select{}. The selection process is fair, i.e. no single ZenQ gets starved
package main

import (
	"fmt"

	"github.com/alphadose/zenq"
)

type custom1 struct {
	alpha int
	beta  string
}

type custom2 struct {
	gamma int
}

var (
	zq1 = zenq.New[int]()
	zq2 = zenq.New[string]()
	zq3 = zenq.New[custom1]()
	zq4 = zenq.New[*custom2]()
)

func main() {
	go looper(intProducer)
	go looper(stringProducer)
	go looper(custom1Producer)
	go looper(custom2Producer)

	for i := 0; i < 40; i++ {

		// Selection occurs here
		switch data := zenq.Select(zq1, zq2, zq3, zq4).(type) {
		case int:
			fmt.Printf("Received int %d\n", data)
		case string:
			fmt.Printf("Received string %s\n", data)
		case custom1:
			fmt.Printf("Received custom data type number 1 %#v\n", data)
		case *custom2:
			fmt.Printf("Received pointer %#v\n", data)
		}
	}
}

func intProducer(ctr int) { zq1.Write(ctr) }

func stringProducer(ctr int) { zq2.Write(fmt.Sprint(ctr * 10)) }

func custom1Producer(ctr int) { zq3.Write(custom1{alpha: ctr, beta: fmt.Sprint(ctr)}) }

func custom2Producer(ctr int) { zq4.Write(&custom2{gamma: 1 << ctr}) }

func looper(producer func(ctr int)) {
	for i := 0; i < 10; i++ {
		producer(i)
	}
}
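
For comparison, here is a sketch of the same program written with native channels and select{}. It uses only the standard library and is not taken from the ZenQ repository; the helper name run and the per-channel counters are additions for illustration. When several cases are ready, Go's select picks one of them pseudo-randomly.

```go
package main

import "fmt"

type custom1 struct {
	alpha int
	beta  string
}

type custom2 struct {
	gamma int
}

// run starts four producers (10 values each) and receives all 40 values
// with a native select. It returns how many values arrived per channel.
// The name and the counters are illustrative, not part of ZenQ.
func run() [4]int {
	c1 := make(chan int)
	c2 := make(chan string)
	c3 := make(chan custom1)
	c4 := make(chan *custom2)

	go func() {
		for i := 0; i < 10; i++ {
			c1 <- i
		}
	}()
	go func() {
		for i := 0; i < 10; i++ {
			c2 <- fmt.Sprint(i * 10)
		}
	}()
	go func() {
		for i := 0; i < 10; i++ {
			c3 <- custom1{alpha: i, beta: fmt.Sprint(i)}
		}
	}()
	go func() {
		for i := 0; i < 10; i++ {
			c4 <- &custom2{gamma: 1 << i}
		}
	}()

	var counts [4]int
	for i := 0; i < 40; i++ {
		// Selection occurs here; each case has its own static type,
		// so no type switch is needed.
		select {
		case v := <-c1:
			counts[0]++
			fmt.Printf("Received int %d\n", v)
		case v := <-c2:
			counts[1]++
			fmt.Printf("Received string %s\n", v)
		case v := <-c3:
			counts[2]++
			fmt.Printf("Received custom data type number 1 %#v\n", v)
		case v := <-c4:
			counts[3]++
			fmt.Printf("Received pointer %#v\n", v)
		}
	}
	return counts
}

func main() {
	fmt.Println(run())
}
```

The visible difference is that zenq.Select returns a single value that is narrowed with a type switch, while the native form binds a typed variable per case.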

Benchmarks

Benchmarking code is available in the repository under benchmarks/

Note that if you run the benchmarks with the --race flag, ZenQ will perform slower because the --race flag slows down atomic operations in golang. Under normal circumstances, ZenQ will outperform golang native channels.

Hardware Specs

❯ neofetch
                    'c.          [email protected]
                 ,xNMM.          ----------------------
               .OMMMMo           OS: macOS 12.3 21E230 arm64
               OMMM0,            Host: MacBookAir10,1
     .;loddo:' loolloddol;.      Kernel: 21.4.0
   cKMMMMMMMMMMNWMMMMMMMMMM0:    Uptime: 6 hours, 41 mins
 .KMMMMMMMMMMMMMMMMMMMMMMMWd.    Packages: 86 (brew)
 XMMMMMMMMMMMMMMMMMMMMMMMX.      Shell: zsh 5.8
;MMMMMMMMMMMMMMMMMMMMMMMM:       Resolution: 1440x900
:MMMMMMMMMMMMMMMMMMMMMMMM:       DE: Aqua
.MMMMMMMMMMMMMMMMMMMMMMMMX.      WM: Rectangle
 kMMMMMMMMMMMMMMMMMMMMMMMMWd.    Terminal: iTerm2
 .XMMMMMMMMMMMMMMMMMMMMMMMMMMk   Terminal Font: FiraCodeNerdFontComplete-Medium 16 (normal)
  .XMMMMMMMMMMMMMMMMMMMMMMMMK.   CPU: Apple M1
    kMMMMMMMMMMMMMMMMMMMMMMd     GPU: Apple M1
     ;KMMMMMMMWXXWMMMMMMMk.      Memory: 1370MiB / 8192MiB
       .cooc,.    .,coo:.

Terminology

  • NUM_WRITERS -> The number of goroutines concurrently writing to ZenQ/Channel
  • INPUT_SIZE -> The number of input payloads to be passed through ZenQ/Channel from producers to consumer
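
To make the two terms concrete, here is a minimal sketch of the native-channel side of such a transfer. It is not the repository's benchmark code: the function name chanTransfer, the buffer size, and the assumption that INPUT_SIZE divides evenly by NUM_WRITERS are all choices made for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// chanTransfer pushes inputSize payloads through a native channel using
// numWriters producer goroutines and a single consumer (the caller).
// It returns the number of payloads the consumer received.
// Assumes inputSize is divisible by numWriters.
func chanTransfer(numWriters, inputSize int) int {
	ch := make(chan int, 1<<12) // buffer size is an arbitrary choice for this sketch
	perWriter := inputSize / numWriters

	var wg sync.WaitGroup
	for w := 0; w < numWriters; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				ch <- i
			}
		}()
	}

	// Close the channel once every writer is done so the consumer loop ends.
	go func() {
		wg.Wait()
		close(ch)
	}()

	received := 0
	for range ch {
		received++
	}
	return received
}

func main() {
	start := time.Now()
	n := chanTransfer(3, 60000) // NUM_WRITERS=3, INPUT_SIZE=60000
	fmt.Printf("transferred %d payloads in %v\n", n, time.Since(start))
}
```

A real benchmark would call such a function inside a testing.B loop so that go test -bench and benchstat can aggregate the timings.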

Computed from benchstat over 30 runs of each benchmark via go test -benchmem -bench=. benchmarks/simple/*.go

name                                     time/op
_Chan_NumWriters1_InputSize600-8          23.2µs ± 1%
_ZenQ_NumWriters1_InputSize600-8          18.1µs ± 1%
_Chan_NumWriters3_InputSize60000-8        5.52ms ± 3%
_ZenQ_NumWriters3_InputSize60000-8        2.67ms ± 6%
_Chan_NumWriters8_InputSize6000000-8       680ms ± 1%
_ZenQ_NumWriters8_InputSize6000000-8       308ms ± 4%
_Chan_NumWriters100_InputSize6000000-8     1.56s ± 6%
_ZenQ_NumWriters100_InputSize6000000-8     519ms ± 2%
_Chan_NumWriters1000_InputSize7000000-8    1.98s ± 1%
_ZenQ_NumWriters1000_InputSize7000000-8    441ms ±11%
_Chan_Million_Blocking_Writers-8           10.4s ± 3%
_ZenQ_Million_Blocking_Writers-8           8.56s ±24%

name                                     alloc/op
_Chan_NumWriters1_InputSize600-8           0.00B
_ZenQ_NumWriters1_InputSize600-8           0.00B
_Chan_NumWriters3_InputSize60000-8          110B ±68%
_ZenQ_NumWriters3_InputSize60000-8        23.6B ±107%
_Chan_NumWriters8_InputSize6000000-8       585B ±234%
_ZenQ_NumWriters8_InputSize6000000-8       411B ±299%
_Chan_NumWriters100_InputSize6000000-8    44.7kB ±35%
_ZenQ_NumWriters100_InputSize6000000-8    19.7kB ±78%
_Chan_NumWriters1000_InputSize7000000-8    483kB ±10%
_ZenQ_NumWriters1000_InputSize7000000-8  1.13kB ±602%
_Chan_Million_Blocking_Writers-8           553MB ± 0%
_ZenQ_Million_Blocking_Writers-8          95.5MB ± 0%

name                                     allocs/op
_Chan_NumWriters1_InputSize600-8            0.00
_ZenQ_NumWriters1_InputSize600-8            0.00
_Chan_NumWriters3_InputSize60000-8          0.00
_ZenQ_NumWriters3_InputSize60000-8          0.00
_Chan_NumWriters8_InputSize6000000-8       2.20 ±218%
_ZenQ_NumWriters8_InputSize6000000-8       0.90 ±344%
_Chan_NumWriters100_InputSize6000000-8       163 ±18%
_ZenQ_NumWriters100_InputSize6000000-8      47.0 ±79%
_Chan_NumWriters1000_InputSize7000000-8    1.79k ± 6%
_ZenQ_NumWriters1000_InputSize7000000-8    2.00 ±550%
_Chan_Million_Blocking_Writers-8           2.00M ± 0%
_ZenQ_Million_Blocking_Writers-8            995k ± 0%

The above results show that ZenQ is more efficient than channels in all 3 metrics, i.e. time/op, mem_alloc/op and num_allocs/op, for the following tested cases:

  1. SPSC
  2. MPSC with NUM_WRITER_GOROUTINES < NUM_CPU_CORES
  3. MPSC with NUM_WRITER_GOROUTINES > NUM_CPU_CORES
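
As a small illustration of how a given writer count maps onto these cases, the sketch below classifies NUM_WRITERS against the core count. The function classify and its labels are hypothetical helpers, not part of ZenQ. The -8 suffix in the benchmark names above is the GOMAXPROCS value that go test appends.

```go
package main

import (
	"fmt"
	"runtime"
)

// classify names the producer/consumer mode for a given writer count.
// Hypothetical helper for illustration only.
func classify(numWriters, numCPU int) string {
	switch {
	case numWriters == 1:
		return "SPSC"
	case numWriters < numCPU:
		return "MPSC, writers < cores"
	case numWriters == numCPU:
		return "MPSC, writers == cores"
	default:
		return "MPSC, writers > cores"
	}
}

func main() {
	cores := runtime.NumCPU()
	// Writer counts taken from the benchmark names above.
	for _, w := range []int{1, 3, 8, 100, 1000} {
		fmt.Printf("NUM_WRITERS=%d on %d cores: %s\n", w, cores, classify(w, cores))
	}
}
```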

Cherry on the Cake

In SPSC mode, ZenQ is faster than channels by 98 seconds for an input size of 6 * 10^8

❯ go run benchmarks/simple/main.go

With Input Batch Size: 60 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 64.875µs
ZenQ Runner completed transfer in: 9µs
====================================================================

With Input Batch Size: 600 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 70.958µs
ZenQ Runner completed transfer in: 44.958µs
====================================================================

With Input Batch Size: 6000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 967.417µs
ZenQ Runner completed transfer in: 518.916µs
====================================================================

With Input Batch Size: 6000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1.191589458s
ZenQ Runner completed transfer in: 144.895583ms
====================================================================

With Input Batch Size: 600000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1m52.671809708s
ZenQ Runner completed transfer in: 14.356517042s
====================================================================

For a select{} based transfer experiment, these are the results:

❯ go run benchmarks/selector/main.go

Chan Select Runner completed transfer in: 2m42.313942333s
ZenQ Select Runner completed transfer in: 41.938121583s

Code is available in benchmarks/selector/main.go

Owner
Anish Mukherjee
A wanderer in the world of code
Comments
  • fatal error: casgstatus: waiting for Gwaiting but is Grunnable

    A fatal error occurs during benchmarking

    fatal error: casgstatus: waiting for Gwaiting but is Grunnable

    Benchmark Code: https://github.com/lemon-mint/golang-q-benchmark

    go version go1.18.4 windows/amd64

    goos: windows
    goarch: amd64
    pkg: github.com/lemon-mint/golang-q-benchmark
    cpu: Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz
    BenchmarkZenQ-8   	fatal error: casgstatus: waiting for Gwaiting but is Grunnable
    
    runtime stack:
    runtime.throw({0x930baf?, 0xc000084000?})
    	C:/Program Files/Go/src/runtime/panic.go:992 +0x76
    runtime.casgstatus(0xc0002844e0, 0x4, 0x1)
    	C:/Program Files/Go/src/runtime/proc.go:978 +0x385
    runtime.ready(0xc0002844e0, 0xc0002844e0?, 0x10?)
    	C:/Program Files/Go/src/runtime/proc.go:857 +0x71
    runtime.goready.func1()
    	C:/Program Files/Go/src/runtime/proc.go:372 +0x26
    runtime.systemstack()
    	C:/Program Files/Go/src/runtime/asm_amd64.s:469 +0x4e
    
    goroutine 87 [running]:
    runtime.systemstack_switch()
    	C:/Program Files/Go/src/runtime/asm_amd64.s:436 fp=0xc0003f1df8 sp=0xc0003f1df0 pc=0x8531e0
    runtime.goready(0x85df1e?, 0xc00040fce0?)
    	C:/Program Files/Go/src/runtime/proc.go:371 +0x47 fp=0xc0003f1e28 sp=0xc0003f1df8 pc=0x82a547
    github.com/alphadose/zenq/v2.safe_ready(0x8f1785?)
    	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/lib_runtime_linkage.go:176 +0x54 fp=0xc0003f1e50 sp=0xc0003f1e28 pc=0x8efe54
    github.com/alphadose/zenq/v2.(*ThreadParker[...]).Ready(0xc000442860?)
    	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/thread_parker.go:64 +0x95 fp=0xc0003f1eb8 sp=0xc0003f1e50 pc=0x8f1855
    github.com/alphadose/zenq/v2.(*ZenQ[...]).Read(0xc000072640)
    	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/zenq.go:188 +0xfa fp=0xc0003f1f20 sp=0xc0003f1eb8 pc=0x8f1a3a
    github.com/lemon-mint/golang-q-benchmark.BenchmarkZenQ.func1(0xc00020c020)
    	g:/work/golang-q-benchmark/main_test.go:24 +0xb8 fp=0xc0003f1f80 sp=0xc0003f1f20 pc=0x8f0958
    testing.(*B).RunParallel.func1()
    	C:/Program Files/Go/src/testing/benchmark.go:788 +0xcb fp=0xc0003f1fe0 sp=0xc0003f1f80 pc=0x8ab0cb
    runtime.goexit()
    	C:/Program Files/Go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc0003f1fe8 sp=0xc0003f1fe0 pc=0x855541
    created by testing.(*B).RunParallel
    	C:/Program Files/Go/src/testing/benchmark.go:781 +0x105
    
    goroutine 1 [chan receive]:
    testing.(*B).doBench(0xc00013c240)
    	C:/Program Files/Go/src/testing/benchmark.go:285 +0x7f
    testing.(*benchContext).processBench(0xc0000040f0, 0x238?)
    	C:/Program Files/Go/src/testing/benchmark.go:589 +0x3aa
    testing.(*B).run(0xc00013c240?)
    	C:/Program Files/Go/src/testing/benchmark.go:276 +0x67
    testing.(*B).Run(0xc00013c000, {0x9280ec?, 0x2f82bb486508?}, 0x932a68)
    	C:/Program Files/Go/src/testing/benchmark.go:677 +0x453
    testing.runBenchmarks.func1(0xc00013c000?)
    	C:/Program Files/Go/src/testing/benchmark.go:550 +0x6e
    testing.(*B).runN(0xc00013c000, 0x1)
    	C:/Program Files/Go/src/testing/benchmark.go:193 +0x102
    testing.runBenchmarks({0x92f7db, 0x28}, 0xa69b60?, {0xa0f3e0, 0x4, 0x40?})
    	C:/Program Files/Go/src/testing/benchmark.go:559 +0x3f2
    testing.(*M).Run(0xc0000701e0)
    	C:/Program Files/Go/src/testing/testing.go:1726 +0x811
    main.main()
    	_testmain.go:53 +0x1aa
    


  • Questions regarding struct fields order

    Could you please explain:

    1. Why is there padding at the beginning of the ZenQ structure?
    	// ZenQ is the CPU cache optimized ringbuffer implementation
    	ZenQ[T any] struct {
    		// The padding members 0 to 4 below are here to ensure each item is on a separate cache line.
    		// This prevents false sharing and hence improves performance.
    		_           cacheLinePadding
    		writerIndex atomic.Uint32
    		_           [constants.CacheLinePadSize - unsafe.Sizeof(atomic.Uint32{})]byte
    		readerIndex atomic.Uint32
    		_           [constants.CacheLinePadSize - unsafe.Sizeof(atomic.Uint32{})]byte
    		metaQ
    		_ [constants.CacheLinePadSize - unsafe.Sizeof(metaQ{})]byte
    		selectFactory[T]
    		_ [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory[T]{})]byte
    	}
    
    2. The field ordering of the slot type. I think that on a 64-bit system, 32 bits would be wasted when item is smaller than 32 bits, because a 64-bit pointer must be 64-bit aligned. Moving writeParker to the top of the structure would solve the issue.
    	slot[T any] struct {
    		atomic.Uint32
    		writeParker *ThreadParker[T]
    		item        T
    	}
    
  • *zenq.ZenQ.Read() eat 100% CPU

    I have a goroutine that reads values from a ZenQ queue in a for loop, but the Read() function uses the whole CPU.

    When I comment out the Read() call and recompile, the CPU usage drops to zero.

    I tried profiling the program; the top10 cum output was attached as an image.

    Please tell me how to solve this problem, thank you!

  • Running with --race causes a crash although no race conditions were detected

    Most likely due to handling the goroutine pointer (stored as unsafe.Pointer) without

    	if race.Enabled {
    		race.Acquire(ptr)
    	}
    

    but the race internal package cannot be imported as a module

  • More Benchmarks for your feedback

    Hi, here are some results from a different architecture for your feedback: Ubuntu 22.04, AMD Ryzen Threadripper 3960X, 128 GB RAM

    Simple

    ~/go/pkg/mod/github.com/alphadose/[email protected]$ go run benchmarks/simple/main.go
    With Input Batch Size: 60 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 11.462µs
    ZenQ Runner completed transfer in: 4.739µs
    ====================================================================
    
    With Input Batch Size: 600 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 32.522µs
    ZenQ Runner completed transfer in: 28.795µs
    ====================================================================
    
    With Input Batch Size: 6000 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 449.323µs
    ZenQ Runner completed transfer in: 254.583µs
    ====================================================================
    
    With Input Batch Size: 6000000 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 365.257669ms
    ZenQ Runner completed transfer in: 447.14544ms
    ====================================================================
    
    With Input Batch Size: 600000000 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 38.346896012s
    ZenQ Runner completed transfer in: 48.59434176s
    ====================================================================
    
    

    Selector

    Chan Select Runner completed transfer in: 4m32.220766401s
    ZenQ Select Runner did not complete transfer after 30min+ (tested both Ubuntu and Windows)
    