gue


Gue is a Golang queue on top of PostgreSQL that uses transaction-level locks.

Originally this project was a fork of bgentry/que-go, but because of some backward-compatibility-breaking changes, and because the original library author was not very responsive to PRs, I turned the fork into a standalone project. Version 2 breaks internal backward compatibility with the original project: the DB table and all of the internal logic (queries, algorithms) are completely rewritten.

The name Gue is yet another silly word transformation: Queue -> Que, Go + Que -> Gue.

Install

go get -u github.com/vgarvardt/gue/v2

Additionally, you need to apply the DB migration.
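
For example, here is a minimal sketch of applying it with database/sql; the DDL below is an illustrative subset based on the v2 schema, so treat the migration file shipped with the release you install as the authoritative source:

package main

import (
    "database/sql"
    "log"
    "os"

    _ "github.com/lib/pq" // register postgres driver
)

// Illustrative subset of the gue v2 jobs table; the migration file in the
// repository is authoritative and may differ.
const schema = `
CREATE TABLE IF NOT EXISTS gue_jobs
(
    job_id      BIGSERIAL   NOT NULL PRIMARY KEY,
    priority    SMALLINT    NOT NULL,
    run_at      TIMESTAMPTZ NOT NULL,
    job_type    TEXT        NOT NULL,
    args        JSON        NOT NULL,
    error_count INTEGER     NOT NULL DEFAULT 0,
    last_error  TEXT,
    queue       TEXT        NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL,
    updated_at  TIMESTAMPTZ NOT NULL
)`

func main() {
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    if _, err := db.Exec(schema); err != nil {
        log.Fatal(err)
    }
}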

Usage Example

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/jackc/pgx/v4/pgxpool"
    "golang.org/x/sync/errgroup"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv4"
)

type printNameArgs struct {
    Name string
}

func main() {
    printName := func(j *gue.Job) error {
        var args printNameArgs
        if err := json.Unmarshal(j.Args, &args); err != nil {
            return err
        }
        fmt.Printf("Hello %s!\n", args.Name)
        return nil
    }

    pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv4.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    wm := gue.WorkMap{
        "PrintName": printName,
    }
    // create a pool w/ 2 workers
    workers := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer"))

    ctx, shutdown := context.WithCancel(context.Background())

    // work jobs in goroutine
    g, gctx := errgroup.WithContext(ctx)
    g.Go(func() error {
        err := workers.Run(gctx)
        if err != nil {
            // In real-world applications, use a better way to shut down the
            // application on an unrecoverable error, e.g. fx.Shutdowner from
            // the go.uber.org/fx module.
            log.Fatal(err)
        }
        return err
    })

    args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
    if err != nil {
        log.Fatal(err)
    }

    j := &gue.Job{
        Type: "PrintName",
        Args: args,
    }
    if err := gc.Enqueue(context.Background(), j); err != nil {
        log.Fatal(err)
    }

    j = &gue.Job{
        Type:  "PrintName",
        RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
        Args:  args,
    }
    if err := gc.Enqueue(context.Background(), j); err != nil {
        log.Fatal(err)
    }

    time.Sleep(30 * time.Second) // wait for a while

    // send shutdown signal to worker
    shutdown()
    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
}

PostgreSQL drivers

The package supports several PostgreSQL drivers using an adapter interface internally. Currently, adapters for the following drivers are implemented:

pgx/v4

package main

import (
    "context"
    "log"
    "os"

    "github.com/jackc/pgx/v4/pgxpool"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv4"
)

func main() {
    pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv4.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    ...
}

pgx/v3

package main

import (
    "log"
    "os"

    "github.com/jackc/pgx"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv3"
)

func main() {
    pgxCfg, err := pgx.ParseURI(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    pgxPool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
        ConnConfig: pgxCfg,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv3.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    ...
}

lib/pq

package main

import (
    "database/sql"
    "log"
    "os"

    _ "github.com/lib/pq" // register postgres driver

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/libpq"
)

func main() {
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    poolAdapter := libpq.NewConnPool(db)

    gc := gue.NewClient(poolAdapter)
    ...
}

pg/v10

package main

import (
    "log"
    "os"

    "github.com/go-pg/pg/v10"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/gopgv10"
)

func main() {
    opts, err := pg.ParseURL(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    db := pg.Connect(opts)
    defer db.Close()

    poolAdapter := gopgv10.NewConnPool(db)

    gc := gue.NewClient(poolAdapter)
    ...
}

Logging

The package supports several logging libraries using an adapter interface internally. Currently, adapters for the following loggers are implemented:

  • NoOp (adapter.NoOpLogger) - the default adapter that does nothing, so it is basically a /dev/null logger
  • Stdlib log - adapter that uses the standard library log logger for logs output. Instantiate it with adapter.NewStdLogger(...).
  • Uber zap - adapter that uses the go.uber.org/zap logger for logs output. Instantiate it with adapter.zap.New(...); a wiring sketch follows this list.
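
A minimal sketch of wiring the zap adapter into a client, assuming a WithClientLogger client option (and a matching WithPoolLogger for worker pools); verify the exact option names and the adapter import path against the godoc:

package main

import (
    "database/sql"
    "log"
    "os"

    _ "github.com/lib/pq" // register postgres driver
    "go.uber.org/zap"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/libpq"
    zapadapter "github.com/vgarvardt/gue/v2/adapter/zap"
)

func main() {
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    zapLogger, err := zap.NewDevelopment()
    if err != nil {
        log.Fatal(err)
    }

    // zapadapter.New wraps *zap.Logger into gue's logger interface;
    // WithClientLogger is an assumption based on the adapter list above
    gc := gue.NewClient(
        libpq.NewConnPool(db),
        gue.WithClientLogger(zapadapter.New(zapLogger)),
    )
    _ = gc
}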

Testing

The linter and tests run for every Pull Request, but it is also possible to run them locally using docker and make.

Run the linter: make lint. This command runs the linter in a docker container with the project source code mounted.

Run the tests: make test. This command starts the project dependencies in docker containers if they are not running yet and runs the go tests with coverage.

Comments
  • Manually rescheduling a job

    I am trying to mutate the job.RunAt field inside a job to have it try to run itself at a later time. I want to control when to do this myself in a job and not as a policy.

    Currently when I do this it works, but I get the error: 2022/11/09 11:26:42 Got an error on deleting a job level=error worker-pool-id=8d9cd5 worker-id=8d9cd5/worker-1 job-id=3 job-type=ChallengeFinisher error=conn busy

    I have a wrapper around a job that checks whether this job's dependentIds have finished running; if they have not, the job should wait for a given interval and try again. I would very much like to keep the id of this job as it is and not report this as an error.
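
    A hedged sketch of an alternative that avoids mutating job.RunAt inside the handler, assuming the v2 WorkFunc signature from the usage example above: return an error from the WorkFunc and let gue's normal failure handling reschedule the job with a backoff (dependenciesFinished is a hypothetical helper); the trade-off is exactly the error reporting mentioned above:

    func challengeFinisher(j *gue.Job) error {
        finished, err := dependenciesFinished(j) // hypothetical helper
        if err != nil {
            return err
        }
        if !finished {
            // returning an error makes gue reschedule the job with a backoff,
            // but it also increments error_count and sets last_error
            return fmt.Errorf("dependencies not finished yet, retry later")
        }
        // ... actual work ...
        return nil
    }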

  • Example outbox is not working

    Hello,

    I tried the example merged in #98 but I get an error when I run the client task. After starting the client and putting messages in the db, I run the task client command.

    I get the following error:

     task worker
    task: [deps-up] docker compose up --detach --wait
    [+] Running 2/2
     ⠿ Container outbox-worker-kafka-redpanda-1  Healthy                                                                                                                                      0.5s
     ⠿ Container outbox-worker-kafka-postgres-1  Healthy                                                                                                                                      0.5s
    task: [_start_worker] go run ./... worker
    Error: could not create test topic: dial tcp [::1]:9092: connect: connection refused
    

    Looking in the docker logs I see outbox-worker-kafka-postgres-1 | 2022-10-17 12:17:08.136 UTC [76] FATAL: role "test" does not exist

  • panic recovery is broken

    panic recovery is broken

    defer recoverPanic(ctx, ll, j) in worker.go:206 doesn't log the panic and populates lastError with an empty string, even though it increases the errorCount value.

    I'm using the zap and pq adapters.

    I spent some time looking into the issue but I couldn't find the reason.

  • Question: Semantics on cancel of workerpool

    I am trying to implement logic where, when I cancel the context a workerPool runs with, the active jobs are allowed to finish but no new jobs are accepted. Is this a pattern that gue supports?

    I tried to experiment a little but I did not find an obvious way to do this.

  • Keeping finished jobs in the database

    Hi, first of all thanks for this project, it's exactly what I was looking for.

    I'm currently playing around with it and saw that finished jobs are removed from the table afterwards. Have you thought of adding an option to keep finished jobs in the database to get some sort of log?

    robin

  • Allow Client To Pass Context In WorkFunc

    Proposal to let clients pass their context to WorkFunc, which can then be used to extract metadata in the WorkFunc implementation. For example, this would allow chaining middlewares, where the context is used to populate metadata and is then passed through as the function's argument:

    func CtxMiddleware(handler gue.WorkFunc) gue.WorkFunc {
        return func(ctx context.Context, job *gue.Job) (err error) {
            // put some metadata info to context
            return handler(ctx, job)
        }
    }
    
  • Added Run method and deprecated Start

    This change deprecates the leaky Start method and suggests
    using the new Run method instead. The Run method blocks
    until all goroutines exit.

    The main issue with the previous implementation is that it's
    not possible to wait for the worker to exit. This change also fixes
    a time.After leak on context cancellation by explicitly managing
    and reusing a time.Timer.
    
    It’s also common to have a callback in Run method to signal
    that the startup has completed, e.g.
    
    	// This example simplifies things a bit but the general
    	// idea is to notify on a channel about successful start
    	// or error. Usually there is also another goroutine that
    	// selects on error channel even after startup to handle
    	// unexpected shutdowns.
    	errc := make(chan error, 1)
    	done := make(chan struct{}, 1)
    	grp.Go(func() error {
    		errc <- w.Run(ctx, func(ctx context.Context) error {
    			done <- struct{}{}
    			<-ctx.Done()
    			return nil
    		})
    		return nil
    	})
    	select {
    	case <-done:
    		fmt.Println("up and running")
    	case <-errc:
    		fmt.Println("failed to start")
    	}
    
    That’s not neccesary with Worker and WorkerPool since all
    interactions are done via Client and any startup error is
    considered unrecoverable. In terms of systemd, the pattern
    above corresponds to “notify” service type while we use
    the “exec” type for workers. And the previous behavior is
    like “forking” type without PID file, ouch.
    
  • Bump go.opentelemetry.io/otel from 1.10.0 to 1.11.0

    Bumps go.opentelemetry.io/otel from 1.10.0 to 1.11.0.

    Changelog

    Sourced from go.opentelemetry.io/otel's changelog.

    [1.11.0/0.32.3] 2022-10-12

    Added

    • Add default User-Agent header to OTLP exporter requests (go.opentelemetry.io/otel/exporters/otlptrace/otlptracegrpc and go.opentelemetry.io/otel/exporters/otlptrace/otlptracehttp). (#3261)

    Changed

    • span.SetStatus has been updated such that calls that lower the status are now no-ops. (#3214)
    • Upgrade golang.org/x/sys/unix from v0.0.0-20210423185535-09eb48e85fd7 to v0.0.0-20220919091848-fb04ddd9f9c8. This addresses GO-2022-0493. (#3235)

    [0.32.2] Metric SDK (Alpha) - 2022-10-11

    Added

    • Added an example of using metric views to customize instruments. (#3177)
    • Add default User-Agent header to OTLP exporter requests (go.opentelemetry.io/otel/exporters/otlpmetric/otlpmetricgrpc and go.opentelemetry.io/otel/exporters/otlpmetric/otlpmetrichttp). (#3261)

    Changed

    • Flush pending measurements with the PeriodicReader in the go.opentelemetry.io/otel/sdk/metric when ForceFlush or Shutdown are called. (#3220)
    • Update histogram default bounds to match the requirements of the latest specification. (#3222)

    Fixed

    • Use default view if instrument does not match any registered view of a reader. (#3224, #3237)
    • Return the same instrument every time a user makes the exact same instrument creation call. (#3229, #3251)
    • Return the existing instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251)
    • Log a warning when a conflicting instrument (e.g. description, unit, data-type) is created instead of returning an error. (#3251)
    • The OpenCensus bridge no longer sends empty batches of metrics. (#3263)

    [0.32.1] Metric SDK (Alpha) - 2022-09-22

    Changed

    • The Prometheus exporter sanitizes OpenTelemetry instrument names when exporting. Invalid characters are replaced with _. (#3212)

    Added

    • The metric portion of the OpenCensus bridge (go.opentelemetry.io/otel/bridge/opencensus) has been reintroduced. (#3192)
    • The OpenCensus bridge example (go.opentelemetry.io/otel/example/opencensus) has been reintroduced. (#3206)

    Fixed

    • Updated go.mods to point to valid versions of the sdk. (#3216)
    • Set the MeterProvider resource on all exported metric data. (#3218)

    [0.32.0] Revised Metric SDK (Alpha) - 2022-09-18

    ... (truncated)

  • Support other binary encoding for storing the Args

    Currently the library forces a Job's Args to be encoded as valid JSON by storing it in a JSON field in the database table and casting it to json.RawMessage in https://github.com/vgarvardt/gue/blob/667e4cb0c1ec1a08256ee38f67a967beb32f8d5d/client.go#L100

    But in my case I would like to transport the Job args as a Protobuf message (for forward compatibility). The database field is easy to change, of course, because I run the migration myself. But I currently can't change the json.RawMessage cast.

    It would make this project more flexible if developers could bring their own encoding format. Maybe just as an option on the Client, with the default set to JSON?

    Let me know if this would be interesting to you and I can probably set up a PR and discuss it a bit further.
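
    Until such an option exists, a possible workaround (a sketch, not part of the proposal): wrap the binary payload in a thin JSON envelope, since encoding/json stores []byte fields as base64 strings, which keeps the column valid JSON while transporting arbitrary binary payloads:

    // binaryArgs is a hypothetical envelope type; Payload may hold a
    // marshalled Protobuf message and is stored as a base64 JSON string
    type binaryArgs struct {
        Payload []byte `json:"payload"`
    }

    // enqueueBinary wraps arbitrary bytes in the envelope before enqueuing
    func enqueueBinary(ctx context.Context, gc *gue.Client, jobType string, payload []byte) error {
        args, err := json.Marshal(binaryArgs{Payload: payload})
        if err != nil {
            return err
        }
        return gc.Enqueue(ctx, &gue.Job{Type: jobType, Args: args})
    }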

  • Worker failed to lock a job

    I encountered a problem, why? Thank you for your answer. Details:

    [gue-logger.go:26::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Debug()] Feb 23 19:09:36.946 [D] Tried to enqueue a job
    [gue-logger.go:42::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Error()] Feb 23 19:09:36.946 [E] Worker failed to lock a job
    ......
    [gue-logger.go:42::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Error()] Feb 23 19:09:46.954 [E] Worker failed to lock a job
    [gue-logger.go:34::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Info()] Feb 23 19:10:26.946 [I] Worker finished
    [gue-logger.go:34::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Info()] Feb 23 19:10:26.946 [I] Worker pool finished

  • Add LockJobByID function

    Proposal to add a LockJobByID function. At the moment, clients cannot remove a specific job from the queue because there is no way to retrieve a specific job. With this new function, clients can call job.Delete() and job.Done() to remove a retrieved job from the queue.

  • Adding a subQueue or workIdentifier field

    Would it be possible to add a new string field to the job table that identifies a particular job as belonging to a particular run/batch of that job? The use case here is around checking for dependent items in the queue.

    • I cannot use Type as that is used to map to a function
    • I cannot use Queue as that is used to run separate workerPools
    • I could use Args and run a JSON query, but I am not sure how efficient that is

    If we had a field job_batch_id (tentative name) that could optionally be added to a job, checking this would be very easy. As part of the wrapper I use to check whether a job should run, I could check if there are any active jobs with a given subQueue and, if there are, reschedule the job to run later.

    Also, if a job enqueues other jobs, it could use the same subQueue as the parent, or at least use it as a prefix.

    I know this is pushing the boundaries of the project a little, but from the perspective of this project it is just a single added field.

    Btw, do you have a Patreon set up? You are doing amazing work and should be rewarded.

  • Example does not work: finished_jobs_log missing

    CREATE TABLE IF NOT EXISTS finished_jobs_log
    (
        job_id      BIGSERIAL   NOT NULL PRIMARY KEY,
        run_at      TIMESTAMPTZ NOT NULL,
        type        TEXT        NOT NULL,
        queue       TEXT        NOT NULL
    );
    
    
  • When using custom schema, relation "gue_jobs" does not exist

    Worker failed to lock a job level=error worker-pool-id=959ce5 worker-id=959ce5/worker-0 error=could not lock a job (rollback result: ): ERROR: relation "gue_jobs" does not exist (SQLSTATE 42P01)
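
    One likely cause (an assumption, not stated in the thread): gue queries the gue_jobs table unqualified, so a custom schema must be on the connection's search_path. With pgx/v4 that can be set via runtime parameters; myschema below is a placeholder:

    pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    // make the schema holding gue_jobs visible to unqualified queries
    pgxCfg.ConnConfig.RuntimeParams["search_path"] = "myschema"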

A Multi Consumer per Message Queue with persistence and Queue Stages.

CrimsonQ A Multi Consumer per Message Queue with persistence and Queue Stages. Under Active Development Crimson Queue allows you to have multiple cons

Jul 30, 2022
redisqueue provides a producer and consumer of a queue that uses Redis streams

redisqueue redisqueue provides a producer and consumer of a queue that uses Redis streams. Features A Producer struct to make enqueuing messages easy.

Dec 29, 2022
GTA(Go Task Async) is a lightweight reliable asynchronous task and transaction message library for Golang

GTA (Go Task Async) is a lightweight and reliable asynchronous task and transaction message library for Golang.

Jun 4, 2022
Golang Delay Queue

[gdq] Golang Delay Queue GDQ is a library that leverage db or cache to be setup as a delay queue. For current version, Only redis can adapt to this li

Jan 15, 2022
ZenQ - A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer

ZenQ A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer Features Much faster than native channels in both SPSC (single

Jan 8, 2023
A high-level RabbitMQ driver for Golang.

grabbitmq A high-level RabbitMQ driver for Golang. Import in your project: go get github.com/shaswata56/grabbitmq Usage Demo: package main import (

Aug 2, 2022
Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq Overview Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be sc

Dec 30, 2022
RapidMQ is a pure, extremely productive, lightweight and reliable library for managing of the local messages queue

RapidMQ RapidMQ is a pure, extremely productive, lightweight and reliable library for managing of the local messages queue in the Go programming langu

Sep 27, 2022
A single binary, simple, message queue.

MiniQueue A stupid simple, single binary message queue using HTTP/2. Most messaging workloads don't require enormous amounts of data, endless features

Nov 9, 2022
dque is a fast, embedded, durable queue for Go

dque - a fast embedded durable queue for Go dque is: persistent -- survives program restarts scalable -- not limited by your RAM, but by your disk spa

Jan 8, 2023
Queue with NATS Jetstream to remove all the erlangs from cloud

Saf in Persian means Queue. One of the problems, that we face on projects with queues is deploying RabbitMQ on the cloud which brings us many challenges for CPU load, etc. I want to see how NATS with Jetstream can work as the queue to replace RabbitMQ.

Dec 15, 2022
A fast durable queue for Go

pqueue - a fast durable queue for Go pqueue is thread-safety, serves environments where more durability is required (e.g., outages last longer than me

Oct 16, 2022
Redis as backend for Queue Package

redis Redis as backend for Queue package Setup start the redis server redis-server start the redis cluster, see the config # server 01 mkdir server01

Oct 16, 2022
NSQ as backend for Queue Package

NSQ as backend for Queue Package

Jul 4, 2022
Kudruk helps you to create queue channels and manage them gracefully.

kudruk Channels are widely used as queues. kudruk (means queue in Turkish) helps you to easily create queue with channel and manage the data in the qu

Feb 21, 2022
Chanman helps you to create queue channels and manage them gracefully.

chanman Channels are widely used as queues. chanman (Channel Manager) helps you to easily create queue with channel and manage the data in the queue.

Oct 16, 2021
A simple persistent directory-backed FIFO queue.

pqueue pqueue is a simple persistent directory-backed FIFO queue. It provides the typical queue interface Enqueue and Dequeue and may store any byte s

Dec 12, 2022
Persistent queue in Go based on BBolt

Persistent queue Persistent queue based on bbolt DB. Supposed to be used as embeddable persistent queue to any Go application. Features: messages are

Jun 30, 2022
A lightweight, distributed and reliable message queue based on Redis

nmq A lightweight, distributed and reliable message queue based on Redis Get Started Download go get github.com/inuggets/nmq Usage import "github.com

Nov 22, 2021