Go concurrent-safe, goroutine-safe, thread-safe queue

go.dev reference godoc reference version Build Status Go Report Card codecov CodeFactor Mentioned in Awesome Go

goconcurrentqueue - Concurrent safe queues

The package goconcurrentqueue offers a public interface Queue with methods for a queue. It comes with multiple Queue's concurrent-safe implementations, meaning they could be used concurrently by multiple goroutines without adding race conditions.

Topics

Installation

Execute

go get github.com/enriquebris/goconcurrentqueue

This package is compatible with the following golang versions:

  • 1.7.x
  • 1.8.x
  • 1.9.x
  • 1.10.x
  • 1.11.x
  • 1.12.x
  • 1.13.x
  • 1.14.x

Documentation

Visit goconcurrentqueue at go.dev

Classes diagram

goconcurrentqueue class diagram

Queues

FIFO

FIFO: concurrent-safe auto expandable queue.

pros

  • It is possible to enqueue as many items as needed.
  • Extra methods to get and remove enqueued items:
    • Get: returns an element's value and keeps the element at the queue
    • Remove: removes an element (using a given position) from the queue

cons

  • It is slightly slower than FixedFIFO.

FixedFIFO

FixedFIFO: concurrent-safe fixed capacity queue.

pros

  • FixedFIFO is, at least, 2x faster than FIFO in concurrent scenarios (multiple GR accessing the queue simultaneously).

cons

  • It has a fixed capacity meaning that no more items than this capacity could coexist at the same time.

Benchmarks FixedFIFO vs FIFO

The numbers for the following charts were obtained by running the benchmarks in a 2012 MacBook Pro (2.3 GHz Intel Core i7 - 16 GB 1600 MHz DDR3) with golang v1.12

Enqueue

concurrent-safe FixedFIFO vs FIFO . operation: enqueue

Dequeue

concurrent-safe FixedFIFO vs FIFO . operation: dequeue

Get started

FIFO queue simple usage

Live code - playground

package main

import (
	"fmt"

	"github.com/enriquebris/goconcurrentqueue"
)

type AnyStruct struct {
	Field1 string
	Field2 int
}

func main() {
	queue := goconcurrentqueue.NewFIFO()

	queue.Enqueue("any string value")
	queue.Enqueue(5)
	queue.Enqueue(AnyStruct{Field1: "hello world", Field2: 15})

	// will output: 3
	fmt.Printf("queue's length: %v\n", queue.GetLen())

	item, err := queue.Dequeue()
	if err != nil {
		fmt.Println(err)
		return
	}

	// will output "any string value"
	fmt.Printf("dequeued item: %v\n", item)

	// will output: 2
	fmt.Printf("queue's length: %v\n", queue.GetLen())

}

Wait until an element gets enqueued

Live code - playground

package main

import (
	"fmt"
	"time"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		fifo = goconcurrentqueue.NewFIFO()
		done = make(chan struct{})
	)

	go func() {
		fmt.Println("1 - Waiting for next enqueued element")
		value, _ := fifo.DequeueOrWaitForNextElement()
		fmt.Printf("2 - Dequeued element: %v\n", value)

		done <- struct{}{}
	}()

	fmt.Println("3 - Go to sleep for 3 seconds")
	time.Sleep(3 * time.Second)

	fmt.Println("4 - Enqueue element")
	fifo.Enqueue(100)

	<-done
}

Wait until an element gets enqueued with timeout

Live code - playground

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		fifo = goconcurrentqueue.NewFIFO()
		ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
	)
	defer cancel()

	fmt.Println("1 - Waiting for next enqueued element")
	_, err := fifo.DequeueOrWaitForNextElementContext(ctx)
    
	if err != nil {
		fmt.Printf("2 - Failed waiting for new element: %v\n", err)
		return
	}
}

Dependency Inversion Principle using concurrent-safe queues

High level modules should not depend on low level modules. Both should depend on abstractions. Robert C. Martin

Live code - playground

package main

import (
	"fmt"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		queue          goconcurrentqueue.Queue
		dummyCondition = true
	)

	// decides which Queue's implementation is the best option for this scenario
	if dummyCondition {
		queue = goconcurrentqueue.NewFIFO()
	} else {
		queue = goconcurrentqueue.NewFixedFIFO(10)
	}

	fmt.Printf("queue's length: %v\n", queue.GetLen())
	workWithQueue(queue)
	fmt.Printf("queue's length: %v\n", queue.GetLen())
}

// workWithQueue uses a goconcurrentqueue.Queue to perform the work
func workWithQueue(queue goconcurrentqueue.Queue) error {
	// do some work

	// enqueue an item
	if err := queue.Enqueue("test value"); err != nil {
		return err
	}

	return nil
}

History

v0.6.1

  • FixedFifo.Enqueue prevents to gets blocked trying to send the item over an invalid waitForNextElementChan channel

v0.6.0

  • Added DequeueOrWaitForNextElementContext()

v0.5.1

  • FIFO.DequeueOrWaitForNextElement() was modified to avoid deadlock when DequeueOrWaitForNextElement && Enqueue are invoked around the same time.
  • Added multiple goroutine unit testings for FIFO.DequeueOrWaitForNextElement()

v0.5.0

  • Added DequeueOrWaitForNextElement()

v0.4.0

  • Added QueueError (custom error)

v0.3.0

  • Added FixedFIFO queue's implementation (at least 2x faster than FIFO for multiple GRs)
  • Added benchmarks for both FIFO / FixedFIFO
  • Added GetCap() to Queue interface
  • Removed Get() and Remove() methods from Queue interface

v0.2.0

  • Added Lock/Unlock/IsLocked methods to control operations locking

v0.1.0

  • First In First Out (FIFO) queue added
Comments
  • fifo_queue consumers can stuck forever.

    fifo_queue consumers can stuck forever.

    Hi! It looks like fifo_queue is not really safe to use and can lock the app forever.

    Lets say we have length=0 and consumer calls for DequeueOrWaitForNextElementContext and reaches line 117

    Meanwhile producer tries to Enqueue a new object: It takes listener in line 46. but can't send object yet in line 49 because no one listen yet so it adds to the slice and forgets about listener

    Consumer start to consume waitChan in 126 but that will never happen . i guess the whole idea of the for loop is to resolve that. But there is no number of iterations and timeout values that can guarantee that it will be able to Dequeue anything in 129. Other consumers can always get there faster. So loop ends with no results and we end up trying to consume waitChan forever in 137

    It is hard to catch but very painful. Number of consumers > 1 required to catch that.

  • Support passing context into DequeueOrWaitForNextElement

    Support passing context into DequeueOrWaitForNextElement

    This change adds the ability to pass a context into DequeueOrWaitForNextElement by introducing DequeueOrWaitForNextElementContext. Whenever the context is canceled, the method returns immediately.

    Some background; when working in concurrent applications it is common to pass around a context that is canceled whenever the long-running process should exit. You can see libraries like http natively support context. I think it would make sense that these concurrent queues also support context passing.

    For now I implemented this function only in the FIFO struct because I would like to get some feedback first. If we agree that this is something we want I can also implement the same in FixedFIFO.

    I added a single test TestContextCanceled. It feels little, but i'm also not sure which other scenarios need to be tested. Let me know if you have ideas for other test cases.

  • Sometimes blocked at Enqueue?

    Sometimes blocked at Enqueue?

    FixedFIFO sometimes blocked at the Enqueue function when the queue is empty, and i use context.WithTimeout and DequeueOrWaitForNextElementContext to deque item in another routine:

    // Enqueue enqueues an element. Returns error if queue is locked or it is at full capacity.
    func (st *FixedFIFO) Enqueue(value interface{}) error {
    	if st.IsLocked() {
    		return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
    	}
    
    	// check if there is a listener waiting for the next element (this element)
    	select {
    	case listener := <-st.waitForNextElementChan:
    
                   //
    ---->          //  *** blocked here!!!
                   //
    		// send the element through the listener's channel instead of enqueue it
    		listener <- value
    
    	default:
    		// enqueue the element following the "normal way"
    		select {
    		case st.queue <- value:
    		default:
    			return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity")
    		}
    	}
    

    deque (not blocking):

    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    	defer cancel()
    	qItem, err := tp.ReqCacheQueue.DequeueOrWaitForNextElementContext(ctx)
    	if err != nil {
    		if err == context.DeadlineExceeded {
    			return
    		}
    		qErr, ok := err.(*goconcurrentqueue.QueueError)
    		if ok && qErr != nil {
    			if qErr.Code() == goconcurrentqueue.QueueErrorCodeEmptyQueue {
    				time.Sleep(100 * time.Millisecond)
    				return
    			}
    		}
    		log.Error("process task err: ", err)
    		return
    	}
    
  •  FixedFIFO use DequeueOrWaitForNextElement will can stuck forever

    FixedFIFO use DequeueOrWaitForNextElement will can stuck forever

    If I use 5 producers and one consumer, when I use DequeueOrWaitForNextElement to dequeue, the consumer will be stuck in ctx.Done() (cannot consume data). Then the producer will fill up the queue and keep reporting FixedFIFO queue is at full capacity. At this time waitForNextElementChan is empty code: `package main

    import ( "fmt" "math/rand" "os" "os/signal" "syscall" "time"

    "github.com/enriquebris/goconcurrentqueue"
    

    )

    type AnyStruct struct { Field1 string Field2 int }

    var ( queue *goconcurrentqueue.FixedFIFO )

    func main() {

    queue = goconcurrentqueue.NewFixedFIFO(4096)
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    done := make(chan bool, 1)
    // will output: 3
    fmt.Printf("queue's length: %v\n", queue.GetCap())
    
    for i := 0; i < 5; i++ {
    	go product()
    }
    
    go consume()
    
    go func() {
    	sig := <-sigs
    	fmt.Println()
    	fmt.Println(sig)
    	done <- true
    }()
    
    fmt.Println("awaiting signal")
    <-done
    fmt.Println("exiting")
    

    }

    func product() { rand.Seed(time.Now().UnixNano())

    for {
    	i := rand.Intn(1000000000)
    	val := &AnyStruct{Field1: fmt.Sprintf("hello world %d", i), Field2: i}
    	err := queue.Enqueue(val)
    	if err != nil {
    		//	fmt.Println(err)
    	}
    	time.Sleep(time.Microsecond * 100)
    }
    

    }

    func consume() { for { //ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*80)) //defer cancel() //val, err :=queue.DequeueOrWaitForNextElementContext(ctx) val, err := queue.DequeueOrWaitForNextElement() if err == nil { ele := val.(*AnyStruct) fmt.Println(ele.Field1, ", ", ele.Field2) } else { fmt.Println(err) time.Sleep(time.Second) } }

    } `

  • TestDequeueOrWaitForNextElementMultiGR stuck when executed on single proccess

    TestDequeueOrWaitForNextElementMultiGR stuck when executed on single proccess

    While testing #19 I noticed Travis testing is timing out. This problem also occurs on master, therefore this issues. I was able to replicate this problem on my computer by limiting the maximum processes: GOMAXPROCS=1 go test -test.v

    The test that gets stuck is TestFixedFIFOTestSuite/TestDequeueOrWaitForNextElementMultiGR

    I have tried debugging by adding print statements through the test but I can't figure out what's wrong. I did find that lowering WaitForNextElementChanCapacity from 1000 to 500 makes the test succeed, but I don't think this solves the root problem.

    Does anyone have an idea what could cause this?

    Thanks for this amazing package, I am having a lot of fun working in this nice codebase.

  • Add methods to FIFO Queue

    Add methods to FIFO Queue

    Hey there, I've been using this library (great work btw! it's super simple to use) for a project and I wanted to share some features that I've created for the FIFO Queue.

    1. GetAll(limit, offset *int) is a method that returns the underlying array, but has two arguments (limit and offset) to create a filtered slice from the original one.
    2. Swap(a int, b int) is a method that swaps A with B and vice versa.
    3. MoveFrontWithId(index int) & MoveBackWithId(index int) are two methods that let you change the position of a certain element to the front or the back.

    I think they are very useful methods, although they could be very specifics to certain situations. Hope you like it!

    Best, Marcos.

  • The resources consumed seems very high despite of dequeuing

    The resources consumed seems very high despite of dequeuing

    Hi, I enqueue job in main thread and dequeue it in other thread, however, the resource does not reduce after that. I don't know what did I do wrong. Here is my implementations:

    Main code:

    func main() {
    	initConfigs()
    	route := gin.Default()
    	route.Use(gzip.Gzip(gzip.DefaultCompression))
    
    	proxyAPI := proxy_reserver.NewProxyAPI()
    	proxyAPI.InitRoute(route, "/")
    
    	go proxyAPI.DequeueUploadJobs() // I started the dequeue process here
    
    	route.Run("0.0.0.0:" + viper.GetString("service.port"))
    }
    

    In the API handler I did:

    func (app *ProxyAPI) UploadInstance(c *gin.Context) {
    	cCp := c.Copy()
    	bytes, err := ioutil.ReadAll(cCp.Request.Body)
    	if err == nil {
                    if queueInstance == nil {
                            queueInstance = goconcurrentqueue.NewFIFO()
                    }
    		queueInstance.Enqueue(UploadJob{
    			Data:   bytes,
    			DatLen: c.Request.ContentLength,
    		})
    	}
    }
    

    And the DequeueUploadJobs():

    func (app *ProxyAPI) DequeueUploadJobs() {
    	fmt.Println("Start dequeuing instance jobs")
    	for true {
    		if app.queueInstance != nil && app.queueInstance.GetLen() > 0 {
    			app.Logger.Info(fmt.Sprintf("Upload queue size is %d", app.queueInstance.GetLen()))
    			job, err := app.queueInstance.Dequeue()
    			if err != nil {
    				app.Logger.Info(fmt.Sprint(err))
    				continue
    			}
    
    			c := job.(UploadJob)
                           //My job
    			// utils.LogInfo("Done execute meta " + key)
    		} else {
    			time.Sleep(1 * time.Second)
    		}
    	}
    }
    

    Can you tell my what's wrong with my code? Thank you so much!

  • Configure Renovate

    Configure Renovate

    WhiteSource Renovate

    Welcome to Renovate! This is an onboarding PR to help you understand and configure settings before regular Pull Requests begin.

    🚦 To activate Renovate, merge this Pull Request. To disable Renovate, simply close this Pull Request unmerged.


    Detected Package Files

    • go.mod (gomod)

    Configuration Summary

    Based on the default config's presets, Renovate will:

    • Start dependency updates only once this onboarding PR is merged
    • Enable Renovate Dependency Dashboard creation
    • If semantic commits detected, use semantic commit type fix for dependencies and chore for all others
    • Ignore node_modules, bower_components, vendor and various test/tests directories
    • Autodetect whether to pin dependencies or maintain ranges
    • Rate limit PR creation to a maximum of two per hour
    • Limit to maximum 10 open PRs at any time
    • Group known monorepo packages together
    • Use curated list of recommended non-monorepo package groupings
    • Fix some problems with very old Maven commons versions
    • Ignore spring cloud 1.x releases
    • Ignore web3j 5.0.0 release
    • Ignore http4s digest-based 1.x milestones
    • Use node versioning for @types/node
    • Limit concurrent requests to reduce load on Repology servers until we can fix this properly, see issue 10133
    • Do not upgrade from Alpine stable to edge

    🔡 Would you like to change the way Renovate is upgrading your dependencies? Simply edit the renovate.json in this branch with your custom config and the list of Pull Requests in the "What to Expect" section below will be updated the next time Renovate runs.


    What to Expect

    It looks like your repository dependencies are already up-to-date and no Pull Requests will be necessary right away.


    ❓ Got questions? Check out Renovate's Docs, particularly the Getting Started section. If you need any further assistance then you can also request help here.


    This PR has been generated by WhiteSource Renovate. View repository job log here.

  • Update FixedFIFO tests

    Update FixedFIFO tests

    The following tests were updated:

    • FixedFIFO.TestDequeueOrWaitForNextElementMultiGR
    • FixedFIFO.TestDequeueOrWaitForNextElementWithEmptyQueue

    Both tests have listener goroutines to dequeue items. This update guarantees the goroutines will be ready before the queue gets filled.

  • Add license scan report and status

    Add license scan report and status

    Your FOSSA integration was successful! Attached in this PR is a badge and license report to track scan status in your README.

    Below are docs for integrating FOSSA license checks into your CI:

  • Dequeue or wait for next element - load

    Dequeue or wait for next element - load

    • FIFO.DequeueOrWaitForNextElement() was modified to avoid deadlock when DequeueOrWaitForNextElement && Enqueue are invoked around the same time.

    • Added multiple goroutine unit testings for FIFO.DequeueOrWaitForNextElement()

    • Added links to playground live code (Get started section)

Package ring provides a high performance and thread safe Go implementation of a bloom filter.

ring - high performance bloom filter Package ring provides a high performance and thread safe Go implementation of a bloom filter. Usage Please see th

Nov 20, 2022
A thread safe map which has expiring key-value pairs

~ timedmap ~ A map which has expiring key-value pairs. go get github.com/zekroTJA/timedmap Intro This package allows to set values to a map which will

Dec 29, 2022
A Golang lock-free thread-safe HashMap optimized for fastest read access.

hashmap Overview A Golang lock-free thread-safe HashMap optimized for fastest read access. Usage Set a value for a key in the map: m := &HashMap{} m.S

Dec 30, 2022
A simple and efficient thread-safe sharded hashmap for Go

shardmap A simple and efficient thread-safe sharded hashmap for Go. This is an alternative to the standard Go map and sync.Map, and is optimized for w

Dec 17, 2022
Goroutine local storage

gls Goroutine local storage IMPORTANT NOTE It is my duty to point you to https://blog.golang.org/context, which is how Google solves all of the proble

Dec 22, 2022
Highly concurrent drop-in replacement for bufio.Writer

concurrent-writer Highly concurrent drop-in replacement for bufio.Writer. concurrent.Writer implements highly concurrent buffering for an io.Writer ob

Nov 20, 2022
skipmap is a high-performance concurrent sorted map based on skip list. Up to 3x ~ 10x faster than sync.Map in the typical pattern.
skipmap is a high-performance concurrent sorted map based on skip list. Up to 3x ~ 10x faster than sync.Map in the typical pattern.

Introduction skipmap is a high-performance concurrent map based on skip list. In typical pattern(one million operations, 90%LOAD 9%STORE 1%DELETE), th

Jan 8, 2023
Concurrent data structures for Go

xsync Concurrent data structures for Go. An extension for the standard sync package. This library should be considered experimental, so make sure to r

Jan 9, 2023
sync.WaitGroup for concurrent use

concwg Description This package provides a version of sync.WaitGroup that allows calling Add and Wait in different goroutines. Motivation sync.WaitGro

May 20, 2022
Goeland - A first-order concurrent automated theorem prover

Goéland Goéland is an automated theorem prover using the tableau method for firs

Sep 22, 2022
A highly optimized double-ended queue

Overview Deque is a highly optimized double-ended queue. Benchmark Benchmark_PushBack/Deque<harden> 100000000 10.3 ns/op 9 B/op

Dec 13, 2022
Fast ring-buffer deque (double-ended queue)

deque Fast ring-buffer deque (double-ended queue) implementation. For a pictorial description, see the Deque diagram Installation $ go get github.com/

Dec 26, 2022
Simple priority queue in Go

Priority Queue in Go ==================== This package provides a priority queue implementation and scaffold interfaces. Installation ------------ U

Apr 5, 2022
High-performance minimalist queue implemented using a stripped-down lock-free ringbuffer, written in Go (golang.org)

This project is no longer maintained - feel free to fork the project! gringo A high-performance minimalist queue implemented using a stripped-down loc

Oct 24, 2022
A Go queue manager on top of Redis

Queue A Go library for managing queues on top of Redis. It is based on a hiring exercise but later I found it useful for myself in a custom task proce

Aug 12, 2022
Cross-platform beanstalkd queue server admin console.
Cross-platform beanstalkd queue server admin console.

Overview aurora is a web-based Beanstalkd queue server console written in Go and works on macOS, Linux, and Windows machines. The main idea behind usi

Dec 30, 2022
Fast golang queue using ring-buffer

Queue A fast Golang queue using a ring-buffer, based on the version suggested by Dariusz Górecki. Using this instead of other, simpler, queue implemen

Jan 3, 2023
Go implementation of the van Emde Boas tree data structure: Priority queue for positive whole numbers in O(log log u) time.

vEB Go implementation of the van Emde Boas tree data structure: Priority queue for positive whole numbers in O(log log u) time. Supports the following

Mar 7, 2022
Null Types, Safe primitive type conversion and fetching value from complex structures.

Typ Typ is a library providing a powerful interface to impressive user experience with conversion and fetching data from built-in types in Golang Feat

Sep 26, 2022