dque is a fast, embedded, durable queue for Go

dque is:

  • persistent -- survives program restarts
  • scalable -- not limited by your RAM, but by your disk space
  • FIFO -- First In First Out
  • embedded -- compiled into your Go program
  • synchronized -- safe for concurrent usage
  • fast or safe, you choose -- turbo mode lets the OS decide when to write to disk
  • has a liberal license -- allows any use, commercial or personal

I love tools that do one thing well. Hopefully this fits that category.

I am indebted to Gabor Cselle who, years ago, inspired me with an example of an in-memory persistent queue written in Java. I was intrigued by the simplicity of his approach, which became the foundation of the "segment" part of this queue, which holds the head and the tail of the queue in memory and stores the segment files in between on disk.

performance

There are two performance modes: safe and turbo.

safe mode
  • safe mode is the default
  • forces an fsync to disk every time you enqueue or dequeue an item.
  • while this is the safest way to use dque with little risk of data loss, it is also the slowest.
turbo mode
  • can be enabled/disabled with a call to DQue.TurboOn() or DQue.TurboOff()
  • lets the OS batch up your changes to disk, which makes it a lot faster.
  • also allows you to flush changes to disk at opportune times. See DQue.TurboSync()
  • comes with a risk that a power failure could lose changes. By turning on Turbo mode you accept that risk.
  • run the benchmark to see the difference on your hardware.
  • there is a todo item to force flush changes to disk after a configurable amount of time to limit risk.
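
The trade-off between the two modes can be illustrated without dque itself. The following is a minimal sketch, assuming nothing about dque's internals: `appendRecord` and `writeAll` are hypothetical helpers that either call `Sync` after every write (the "safe" style) or leave flushing to the OS and sync once at the end (the "turbo" style with a single explicit flush).

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// appendRecord appends data to f. When safe is true it also calls Sync,
// asking the OS to flush the write to disk before returning.
func appendRecord(f *os.File, data []byte, safe bool) error {
	if _, err := f.Write(data); err != nil {
		return err
	}
	if safe {
		return f.Sync()
	}
	return nil
}

// writeAll writes n records to a temporary file in safe or turbo style and
// returns the resulting file size. In turbo style a single Sync at the end
// plays the role of an explicit flush.
func writeAll(n int, safe bool) (int64, error) {
	dir, err := os.MkdirTemp("", "sync-sketch")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	f, err := os.OpenFile(filepath.Join(dir, "segment.dat"),
		os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	for i := 0; i < n; i++ {
		if err := appendRecord(f, []byte("item\n"), safe); err != nil {
			return 0, err
		}
	}
	if !safe {
		if err := f.Sync(); err != nil {
			return 0, err
		}
	}
	info, err := f.Stat()
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}

func main() {
	for _, safe := range []bool{true, false} {
		size, err := writeAll(100, safe)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("safe=%v wrote %d bytes\n", safe, size)
	}
}
```

Both styles end up with the same bytes on disk; the difference is how many writes could be lost if power fails before the next sync.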

implementation

  • The queue is held in segments of a configurable size.
  • The queue is protected against re-opening from other processes.
  • Each in-memory segment corresponds with a file on disk. Think of the segment files as a bit like rolling log files. The oldest segment files are eventually deleted, not based on time, but whenever their items have all been dequeued.
  • Segment files are only appended to until they fill up, at which point a new segment is created. They are never modified (other than being appended to and deleted when each of their items has been dequeued).
  • If there is more than one segment, new items are enqueued to the last segment while dequeued items are taken from the first segment.
  • Because the encoding/gob package is used to store the struct to disk:
    • Only structs can be stored in the queue.
    • Only one type of struct can be stored in each queue.
    • Only public fields in a struct will be stored.
    • A function is required that returns a pointer to a new struct of the type stored in the queue. This function is used when loading segments into memory from disk. I'd love to find a way to avoid this function.
  • Queue segment implementation:
    • For nice visuals, see Gabor Cselle's documentation here. Note that Gabor's implementation kept the entire queue in memory as well as disk. dque keeps only the head and tail segments in memory.
    • Enqueueing an item adds it both to the end of the last segment file and to the in-memory item slice for that segment.
    • When a segment reaches its maximum size a new segment is created.
    • Dequeueing an item removes it from the beginning of the in-memory slice and appends a 4-byte "delete" marker to the end of the segment file. This allows the item to be left in the file until the number of delete markers matches the number of items, at which point the entire file is deleted.
    • When a segment is reconstituted from disk, each "delete" marker found in the file causes a removal of the first element of the in-memory slice.
    • When each item in the segment has been dequeued, the segment file is deleted and the next segment is loaded into memory.
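
The append-only segment with "delete" markers described above can be sketched in a few lines. This is an illustration under stated assumptions, not dque's actual code: the record layout (a 4-byte little-endian length followed by the payload, with a zero length acting as the delete marker), the `segment` type, and the `replay` function are all hypothetical, items are assumed to be non-empty, and a `bytes.Buffer` stands in for the segment file.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// segment is an append-only log plus the in-memory view of its live items.
type segment struct {
	log   bytes.Buffer // stands in for the segment file on disk
	items [][]byte     // items that have not been dequeued yet
}

// enqueue appends a length-prefixed record and mirrors it in memory.
func (s *segment) enqueue(item []byte) {
	var hdr [4]byte
	binary.LittleEndian.PutUint32(hdr[:], uint32(len(item)))
	s.log.Write(hdr[:])
	s.log.Write(item)
	s.items = append(s.items, item)
}

// dequeue removes the first in-memory item and appends a 4-byte delete
// marker; the item's bytes stay in the log untouched.
func (s *segment) dequeue() ([]byte, error) {
	if len(s.items) == 0 {
		return nil, errors.New("segment is empty")
	}
	item := s.items[0]
	s.items = s.items[1:]
	s.log.Write(make([]byte, 4)) // zero length == delete marker
	return item, nil
}

// replay rebuilds the in-memory slice from a log: each record is appended,
// each delete marker drops the first remaining item.
func replay(log []byte) ([][]byte, error) {
	r := bytes.NewReader(log)
	var items [][]byte
	for {
		var hdr [4]byte
		if _, err := io.ReadFull(r, hdr[:]); err != nil {
			if err == io.EOF {
				return items, nil // clean end of log
			}
			return nil, err // truncated header
		}
		n := binary.LittleEndian.Uint32(hdr[:])
		if n == 0 {
			if len(items) == 0 {
				return nil, errors.New("delete marker with no item to remove")
			}
			items = items[1:]
			continue
		}
		payload := make([]byte, n)
		if _, err := io.ReadFull(r, payload); err != nil {
			return nil, err // truncated payload
		}
		items = append(items, payload)
	}
}

func main() {
	var s segment
	s.enqueue([]byte("a"))
	s.enqueue([]byte("b"))
	s.enqueue([]byte("c"))
	if _, err := s.dequeue(); err != nil {
		fmt.Println("error:", err)
		return
	}
	items, err := replay(s.log.Bytes())
	fmt.Println(len(items), err)
}
```

Returning an error for a delete marker that has no matching item, rather than slicing an empty slice, is a design choice in this sketch: it turns a corrupted log into a reportable error instead of a panic.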

example

See the full example code here

Or a shortened version here:

package main

import (
    "log"

    "github.com/joncrlsn/dque"
)

// Item is what we'll be storing in the queue.  It can be any struct
// as long as the fields you want stored are public.
type Item struct {
    Name string
    Id   int
}

// ItemBuilder creates a new item and returns a pointer to it.
// This is used when we load a segment of the queue from disk.
func ItemBuilder() interface{} {
    return &Item{}
}

func main() {
    ExampleDQue_main()
}

// ExampleDQue_main shows how the queue works
func ExampleDQue_main() {
    qName := "item-queue"
    qDir := "/tmp"
    segmentSize := 50

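    // Create a new queue with segment size of 50
    q, err := dque.New(qName, qDir, segmentSize, ItemBuilder)
    if err != nil {
        log.Fatal("Error creating new dque ", err)
    }

    // Add an item to the queue
    if err = q.Enqueue(&Item{"Joe", 1}); err != nil {
        log.Fatal("Error enqueueing item ", err)
    }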

    // Properly close a queue
    q.Close()

    // You can reconstitute the queue from disk at any time
    q, err = dque.Open(qName, qDir, segmentSize, ItemBuilder)
    if err != nil {
        log.Fatal("Error opening existing dque ", err)
    }

    // Peek at the next item in the queue
    var iface interface{}
    if iface, err = q.Peek(); err != nil {
        if err != dque.ErrEmpty {
            log.Fatal("Error peeking at item ", err)
        }
    }

    // Dequeue the next item in the queue
    if iface, err = q.Dequeue(); err != nil {
        if err != dque.ErrEmpty {
            log.Fatal("Error dequeuing item ", err)
        }
    }

    // Dequeue the next item in the queue and block until one is available
    if iface, err = q.DequeueBlock(); err != nil {
        log.Fatal("Error dequeuing item ", err)
    }

    // Assert type of the response to an Item pointer so we can work with it
    item, ok := iface.(*Item)
    if !ok {
        log.Fatal("Dequeued object is not an Item pointer")
    }

    doSomething(item)
}

func doSomething(item *Item) {
    log.Println("Dequeued", item)
}

todo? (feel free to submit pull requests)

  • add option to enable turbo with a timeout that would ensure you would never lose more than n seconds of changes.
  • add Lock() and Unlock() methods so you can peek at the first item and then conditionally dequeue it without worrying that another goroutine has grabbed it out from under you. The use case is when you don't want to actually remove it from the queue until you know you were able to successfully handle it.
  • store the segment size in a config file inside the queue. Then it only needs to be specified on dque.New(...)

alternative tools

  • CurlyQ is a bit heavier (requires Redis) but has more background processing features.
Comments
  • protect the queue directory by a lockfile

    fixes #2

    While a queue is open (after calling New / Open / NewOrOpen), the queue directory is protected by a lockfile.

    As long as the current process is running, this lockfile ensures no other process can open the queue and potentially cause data corruption.

    The lockfile does not protect against double-opening of the same queue from within the same process.

    The public API gets extended by a Close() function - this releases a lockfile on the queue directory. If the user omits to call Close, the file remains inside the queue directory, but the operating system releases the lock - allowing a new process to acquire it (application crashed and restarts without locking issues).

  • DQue.Size and SegmentNumbers are not thread-safe

    First off, thanks for writing this package! I'm doing a brief code review on it to evaluate it for production use, and am filing issues along the way. Please let me know if you'd rather we fork it or submit patches for review.

    qSegment.size() is thread-safe, however DQue.Size is not since firstSegment and lastSegment may be updated asynchronously (ex. from Enqueue/Dequeue).
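
One common way to make such a size query safe is to take the same lock that guards the mutating operations. The sketch below is generic and does not reflect dque's internals; `countedQueue` and `fill` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// countedQueue guards its slice with a single mutex, so Size sees a
// consistent state even while other goroutines are enqueueing.
type countedQueue struct {
	mu    sync.Mutex
	items []int
}

// Enqueue appends v while holding the lock.
func (q *countedQueue) Enqueue(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, v)
}

// Size takes the same lock as Enqueue before reading the length.
func (q *countedQueue) Size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

// fill starts several goroutines that enqueue and query the size
// concurrently, then waits for all of them to finish.
func fill(q *countedQueue, workers, perWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				q.Enqueue(i)
				_ = q.Size() // safe to call concurrently with Enqueue
			}
		}()
	}
	wg.Wait()
}

func main() {
	q := &countedQueue{}
	fill(q, 8, 100)
	fmt.Println("final size:", q.Size())
}
```

Running a program like this with `go run -race` is a quick way to check that no unsynchronized access remains.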

  • Error when directory doesn't exist

    Was wondering if we could swap os.Mkdir with os.MkdirAll to allow for the creation of parent directories. At the moment, I have a folder local and set dque to create a queue in directory local/queues and I would expect that folder to be created.

    https://github.com/joncrlsn/dque/blob/v2/queue.go#L68-L80

    Would love to submit a PR if you agree too.

    Thank you
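
The difference between the two standard-library calls mentioned in this request can be seen in isolation. This sketch uses only the `os` package; `tryCreate` is a hypothetical helper and the `local/queues` path mirrors the one in the report.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// tryCreate attempts to create the nested directory local/queues under a
// fresh temporary directory, first with os.Mkdir and then with os.MkdirAll,
// and returns the error from each call.
func tryCreate() (mkdirErr, mkdirAllErr error) {
	base, err := os.MkdirTemp("", "mkdir-sketch")
	if err != nil {
		return err, err
	}
	defer os.RemoveAll(base)

	target := filepath.Join(base, "local", "queues")
	mkdirErr = os.Mkdir(target, 0o755)       // fails: parent "local" does not exist
	mkdirAllErr = os.MkdirAll(target, 0o755) // creates "local" and then "queues"
	return mkdirErr, mkdirAllErr
}

func main() {
	mkdirErr, mkdirAllErr := tryCreate()
	fmt.Println("os.Mkdir:   ", mkdirErr)
	fmt.Println("os.MkdirAll:", mkdirAllErr)
}
```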

  • What could cause a decode failure on the segmentSize boundary?

    I am trying a consumer/producer program and it reliably fails on the nth item (where n is the segmentSize). I can dequeue n-1 records with no issue, and the 0000000000001.dque file disappears as expected, but the nth item always gives me this error. The producer and consumer are separate goroutines. Any ideas before I try to make a simple example that reproduces the error?

    error creating new segment. Queue is in an inconsistent state: unable to load queue segment in /tmp/receiver_a_1_queue_28762: object in segment file /tmp/rmyqueue/0000000000002.dque cannot be decoded: failed to decode *main.Item: gob: type mismatch: no fields matched compiling decoder for Item

    Item is

    type RequestRec struct {
    	Topic string
    	Body  string
    	Count int // how many time we have resent that request
    }
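
One way a "no fields matched" error can arise with encoding/gob, independent of dque, is decoding into a struct type that shares no exported field names with the type that was encoded. The sketch below demonstrates only that mechanism and is not a diagnosis of this report; the `Item` struct and `roundTrip` helper are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// RequestRec matches the struct shown in the report above.
type RequestRec struct {
	Topic string
	Body  string
	Count int
}

// Item is a hypothetical struct with no field names in common with RequestRec.
type Item struct {
	Name string
	Id   int
}

// roundTrip gob-encodes a RequestRec and decodes the bytes into dst.
func roundTrip(dst interface{}) error {
	var buf bytes.Buffer
	rec := &RequestRec{Topic: "t", Body: "b", Count: 1}
	if err := gob.NewEncoder(&buf).Encode(rec); err != nil {
		return err
	}
	return gob.NewDecoder(&buf).Decode(dst)
}

func main() {
	fmt.Println("same type:     ", roundTrip(&RequestRec{}))
	fmt.Println("different type:", roundTrip(&Item{}))
}
```

If a queue's builder function returns a pointer to a different struct type than the one that was enqueued, a mismatch of this kind would only surface when a segment is loaded from disk.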
    
  • Segment files are not closed explicitly

    If the queue grows faster than it is consumed, Enqueue will append new segments. newQueueSegment opens segment files with os.O_APPEND|os.O_CREATE|os.O_WRONLY but they are only closed (a) if they become the firstSegment and get fully consumed and removed via delete in Dequeue, or (b) closed by the os.File finalizer. This may result in many open file descriptors until the garbage collector runs.

  • Windows & leaking filedescriptors

    This doesn't seem to work at all on Windows. Very few tests pass. This is mostly due to not closing segment files properly.

    --- FAIL: TestSegment (0.02s)
            segment_test.go:82: Error cleaning up directory from the TestSegment method with 'remove ./TestSegment\0000000000001.dque: The process cannot access the file because it is being used by another process.'
    --- FAIL: TestSegment_Turbo (0.00s)
            segment_test.go:111: Error creating directory in the TestSegment_Turbo method: mkdir ./TestSegment: Cannot create a file when that file already exists.
    --- FAIL: TestQueue_AddRemoveLoop (0.01s)
            queue_test.go:75: Error cleaning up the queue directory remove test1\0000000000002.dque: The process cannot access the file because it is being used by another process.
    --- FAIL: TestQueue_Add2Remove1 (0.00s)
            queue_test.go:88: Error removing queue directory remove test1\0000000000002.dque: The process cannot access the file because it is being used by another process.
    --- FAIL: TestQueue_Add9Remove8 (0.00s)
            queue_test.go:154: Error removing queue directory remove test1\0000000000002.dque: The process cannot access the file because it is being used by another process.
    --- FAIL: TestQueue_EmptyDequeue (0.00s)
            queue_test.go:240: Error cleaning up the queue directory: remove testEmptyDequeue\0000000000001.dque: The process cannot access the file because it is being used by another process.
    --- FAIL: TestQueue_NewOrOpen (0.00s)
            queue_test.go:262: Error cleaning up the queue directory: remove testNewOrOpen\0000000000001.dque: The process cannot access the file because it is being used by another process.
    --- FAIL: TestQueue_Turbo (0.00s)
            queue_test.go:269: Error removing queue directory: remove testNewOrOpen\0000000000001.dque: The process cannot access the file because it is being used by another process.
    2019/01/24 14:38:32 Error creating new dque the given queue directory is not valid (/tmp)
    exit status 1
    FAIL    github.com/joncrlsn/dque        0.103s
    

    The tests all seem to pass on *nix, but I'm guessing it is leaking file descriptors there as well.

  • Handle additional errors when loading corrupted segments

    • corrupted state files containing excess deletion records (zeros) which otherwise results in a panic
    • wrap gob errors

    The former is surprisingly the most common form of corruption we see in production on machines that can be power cycled at will.

  • Run unit tests and lint checks in CI

    I recommend running https://github.com/golangci/golangci-lint on all projects since it has a good default set of lint checks which frequently uncover real issues.

    Travis and Github Actions appear to have the best support for matrix builds (to run the tests on multiple go versions) so I'd recommend one of those. I'm interested in trying Actions. I also have a lot of experience using CircleCI but would mainly recommend it for teams with multiple repositories. Any preference?

  • We are still failing...

    jcarlson "If we still have to log into the servers in a year then it sounds like we'd be technically failing at devops." at 9AM Monday, March 4th, 2019.

    Also, hello!

  • Fix concurrency issues in DequeueBlock and PeekBlock

    The mutexEmptyCond mutex doesn't guard the condition that is being checked (i.e. that the queue is not empty), which makes the whole rendez-vous point ineffective.
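
For reference, the usual pattern with sync.Cond is that the condition variable's mutex is the same one that protects the state being waited on, and the waiter re-checks the condition in a loop. The following is a generic sketch of that pattern, not the patch itself; `blockingQueue` and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue uses one mutex both to protect items and as the Locker of
// the condition variable, so "queue is not empty" is always checked under
// the lock that Wait releases and reacquires.
type blockingQueue struct {
	mu       sync.Mutex
	notEmpty *sync.Cond
	items    []string
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.notEmpty = sync.NewCond(&q.mu)
	return q
}

// Enqueue adds an item under the lock and wakes one waiting consumer.
func (q *blockingQueue) Enqueue(s string) {
	q.mu.Lock()
	q.items = append(q.items, s)
	q.mu.Unlock()
	q.notEmpty.Signal()
}

// DequeueBlock waits until at least one item is available, then removes
// and returns the first one.
func (q *blockingQueue) DequeueBlock() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 { // re-check after every wake-up
		q.notEmpty.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item
}

func main() {
	q := newBlockingQueue()
	done := make(chan string)
	go func() { done <- q.DequeueBlock() }()
	q.Enqueue("hello")
	fmt.Println("dequeued:", <-done)
}
```

Because the emptiness check and the wait happen under the same lock that Enqueue takes, a wake-up cannot be lost between the check and the call to Wait.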

  • Blocking Dequeue & Peek functions

    The existing Dequeue() and Peek() functions simply return an ErrEmpty error if no items are in the queue.

    I found myself trying to replace a go channel with dque, but was missing a proper blocking Dequeue function that only returns once at least one item is available.

    Any thoughts on accepting such a feature request / PR?

  • Occasional: Queue is in an inconsistent state

    I've been trying to figure this out for a while but it's super hard to reproduce. Occasionally, I get the following error:

    Caused by: error deleting queue segment /tmp/ripple/600f5b80f62ab2ff24e82005/hb-queue/0000000000001.dque. Queue 
    is in an inconsistent state: error deleting file: /tmp/ripple/600f5b80f62ab2ff24e82005/hb-queue/0000000000001.dque: 
    remove /tmp/ripple/600f5b80f62ab2ff24e82005/hb-queue/0000000000001.dque: no such file or directory
    

    Are there any known causes for these inconsistencies that I might have missed? I'm not doing anything special except I am using the same instance of the queue in different goroutines. One does all the writing while the other one dequeues.

    Sometimes I go weeks without seeing this and then it just pops up.

  • Fix broken object while enqueueing

    If input objects share some underlying data structure, they may be broken while enqueueing. This patch fixes that case. However, enqueueing will be much slower than before because of the extra decode operation for each object. Maybe a switch should be added for this feature.

  • DQue should be an Interface

    Although the current state works great, consider changing the return type of New() from DQue to an interface. This makes it much easier for testing purposes. A dummy implementation of the interface would avoid any write/lock implications during testing.

    type DQue interface {
      Close() error
      Enqueue(obj interface{}) error
      Dequeue() (interface{}, error)
      Peek() (interface{}, error)
      DequeueBlock() (interface{}, error)
      PeekBlock() (interface{}, error)
      Size() int
      SizeUnsafe() int
      SegmentNumbers() (int, int)
      Turbo() bool
      TurboOn() error
      TurboOff() error
      TurboSync() error
    }
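
To show what such a dummy implementation might look like, here is a minimal sketch. It is not part of dque: `Queue` is a reduced, hypothetical subset of the interface proposed above, `memQueue` is an in-memory fake, `errEmpty` is a local stand-in for the library's empty-queue error, and `drain` represents application code written against the interface.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmpty is a local stand-in for the library's empty-queue error.
var errEmpty = errors.New("queue is empty")

// Queue is a reduced, hypothetical subset of the proposed interface.
type Queue interface {
	Enqueue(obj interface{}) error
	Dequeue() (interface{}, error)
	Size() int
}

// memQueue is an in-memory fake that touches neither disk nor lockfiles.
type memQueue struct {
	items []interface{}
}

func (m *memQueue) Enqueue(obj interface{}) error {
	m.items = append(m.items, obj)
	return nil
}

func (m *memQueue) Dequeue() (interface{}, error) {
	if len(m.items) == 0 {
		return nil, errEmpty
	}
	obj := m.items[0]
	m.items = m.items[1:]
	return obj, nil
}

func (m *memQueue) Size() int { return len(m.items) }

// drain is example application code that depends only on the interface:
// it dequeues until the queue reports empty and returns the item count.
func drain(q Queue) (int, error) {
	n := 0
	for {
		_, err := q.Dequeue()
		if errors.Is(err, errEmpty) {
			return n, nil
		}
		if err != nil {
			return n, err
		}
		n++
	}
}

func main() {
	q := &memQueue{}
	q.Enqueue("a")
	q.Enqueue("b")
	n, err := drain(q)
	fmt.Println(n, err, q.Size())
}
```

Even without a change to the library, application code can declare a small interface like this on its own side and pass in either the real queue or the fake.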
    
  • Corrupted data when opening existing queue

    I took example_test.go and ran two goroutines: one enqueueing consecutive integers, the other doing a blocking dequeue and printing them from time to time.

    Segment size 50, then switches to 100000.

    Interrupting the program and then starting it again causes it to read corrupted data:

    2022/02/23 15:26:49 Error creating new dque unable to create queue segment in /tmp/item-queue: unable to load queue segment in /tmp/item-queue: segment file /tmp/item-queue/0000000000041.dque is corrupted: error reading gob data from file: EOF exit status 1

    Source:

    package main
    
    import (
        "fmt"
        "log"
    
        "github.com/joncrlsn/dque"
    )
    
    func main() {
        ExampleDQue()
    }
    
    // Item is what we'll be storing in the queue.  It can be any struct
    // as long as the fields you want stored are public.
    type Item struct {
    	Name string
    	Id   int
    }
    
    // ItemBuilder creates a new item and returns a pointer to it.
    // This is used when we load a segment of the queue from disk.
    func ItemBuilder() interface{} {
    	return &Item{}
    }
    
    // ExampleDQue shows how the queue works
    func ExampleDQue() {
    	qName := "item-queue"
    	qDir := "/tmp"
    	segmentSize := 100000
    
    	q, err := dque.NewOrOpen(qName, qDir, segmentSize, ItemBuilder)
    	if err != nil {
    		log.Fatal("Error creating new dque ", err)
    	}
    
    	go func() {
    		i := 0
    		for {
    			err := q.Enqueue(&Item{"Joe", i})
    			if err != nil {
    				log.Fatal("Error enqueueing", err)
    			}
    
    			i++
    			//log.Println("Queue size:", q.Size())
    		}
    	}()
    
    	func() {
    		for {
    			var iface interface{}
    
    			// Dequeue the next item in the queue and block until one is available
    			if iface, err = q.DequeueBlock(); err != nil {
    				log.Fatal("Error dequeuing item ", err)
    			}
    
    			// Assert type of the response to an Item pointer so we can work with it
    			item, ok := iface.(*Item)
    			if !ok {
    				log.Fatal("Dequeued object is not an Item pointer")
    			}
    
    			doSomething(item)
    		}
    	}()
    }
    
    func doSomething(item *Item) {
    	if item.Id % 100000 == 0 {
    		fmt.Println("Dequeued:", item)
    	}
    }
    
  • Unable to import v2.x.x in go.mod

    Hi, I get the following error if I include the following in my go.mod file.

    github.com/joncrlsn/dque v2.2.0

    The error I get is

    #12 0.387 go: errors parsing go.mod:
    #12 0.387 /edge/go.mod:11:2: require github.com/joncrlsn/dque: version "v2.2.0" invalid: should be v0 or v1, not v2
    

    This is probably related to https://go.dev/blog/v2-go-modules. If that's true, can you please make the v2 code follow these guidelines?
