A simple pubsub package for go.

PkgGoDev

Package pubsub implements a simple multi-topic pub-sub library.

Install pubsub with,

go get github.com/cskr/pubsub

This repository is a go module and contains tagged releases. Please pin a version for production use.

Use of this module is governed by a BSD-style license that can be found in the LICENSE file.

Comments
  • Panic when an channel is closed but expected to receive a message

    Panic when an channel is closed but expected to receive a message

    Hi,

    We apopted you tool a while ago to use as a central cornerstone in out internal event messaging system.

    We work with a lot of potentially volatile channels (~600-1000 real time connections), resulting an the luckily rare error that an channel has been closed and removed before a message may reach it. This causes a system wide panic even though it could be easily ignored and discarded.

    Nevertheless, this is a highly critical problem for us!

    Therefore, I wanted to ask you whether you can add additional error handling or a discard recovery into the publish methods

    I can provide a simple fix it you like.

  • Unsub can cause deadlock

    Unsub can cause deadlock

    Hey,

    I have encountered a deadlock with the library.

    Basically when a goroutine that was previously subbed stops listening (in our case when a client disconnects) and calls a defered function to unsub all. But between the time that it stops reading the channel and when it calls unsub something publish's to a channel its subscribed to (this has to publish more messages than the pubsub chan size).

    The steps would be (where A and B are different goroutines and R is the registry goroutine):

    A: x := ps.sub("1") // Which successfully registers
    B: ps.pub("testing", "1") // This writes down registry's cmd chan
    B: ps.pub("testing", "1") // This writes down registry's cmd chan - assuming buffer of 1
    A: unsub(x, "1") // or unsub(x) - which writes unsub down cmd chan
    R: reads the pub event
    R: send("1", x) // which blocks on ch <- msg
    B: ps.pub("testing1", "1") // next event to route
    

    And we are all asleep: A: waits for the registry to read its unsub op B: waits for the registry to accept its pub op R: waits for A to accept its pub.

  • Sending nil doesn't seem to work.

    Sending nil doesn't seem to work.

    I need a way to notify the subscribed clients that the channel is closed, but sending nil doesn't work, for some reason. I'm sending []byte at the moment, so I'm not sure how to even work around this.

  • Getting nil when SubOnce between time.sleep

    Getting nil when SubOnce between time.sleep

    I'm running a go routine which is continiously publishing structs. On the other hand I have a go routine which is SubOnce every couple of seconds. What I notice is that when I add a sleep, the subonce start to returning nil values.

    Any ideas why?

       for {
    	rgb := getImage(ps, dec) // in this function I do the SubOnce
    	..
    	matArray[2] = &rgb // this rgb is nil
    	.. 
    	time.Sleep(10 * time.Second)
    }
    
  • AddSub + Unsub closes my channels

    AddSub + Unsub closes my channels

    I'm using a pattern where I have a communication channel that I use for a goroutine, and that channel is used to subscribe to a topic:

    comms := make(chan interface{})
    go func() {
      for msg := range comms {
        // do some stuff
      }
    }()
    
    mySub.AddSub(comms, "some-topic")
    

    The problem is now and then I change the topic that I'm interested in (in my code, it's when users move from one room to another room):

    mySub.Unsub(comms, "some-topic")
    mySub.AddSub(comms, "some-other-topic")
    

    Unsub will close my channel if "some-topic" is the only topic that I'm listening to. Is there a way so that when unsubscribing, the pubsub lib doesn't automatically close my channel? I can't reverse the order of the Unsub/AddSub since in reality the code is a bit more asynchronous than the example above.

    The workaround for now is to create a dummy topic that all channels are subscribed to but never actually broadcasts anything.

  • Avoid duplication of empty topic maps, separate data into contexts

    Avoid duplication of empty topic maps, separate data into contexts

    Hi,

    I'm new to go and just found your extremely useful pubsub go-code and took the opportunity to improve a bit on it. As I understand it, all goroutine are closures, meaning they duplicate all data they need to run. Thus if you share your PubSub struct with many other goroutines, each one will have the same pointers to the channels but also a separate empty copy of the two topic maps.

    I though this could be avoided by (also) cleanly separating the topic maps (only needed in context of the start goroutine) from the chans (needed everywhere).

    In theory the topic maps could now be created on the stack of start instead of the heap with "new" and thus the cleanup up for-loop at the end of start could be removed. However I left it in there, since is also explicitly closes the chans, which otherwise might (?) not happen if other threads still have a pointer to those chans.

    kind regards, btittelbach

  • Add a simple example

    Add a simple example

    This lib looks promising but I am having a hard time picturing the client side flow. Could you add a simple example to make it clear what the user needs to handle?

  • SubOnceEach

    SubOnceEach

    This patch adds a new public method, SubOnceEach, which returns a channel on which callers receive, at most, one message for each topic. Utilized in go-ipfs.

  • Added SubUpdate function

    Added SubUpdate function

    Added a function called SubUpdate so you can add subscriptions to an existing channel.

    Also added a function to unsubscribe from all topics given a channel

    Please let me know if you have any issues with it and ill update it

  • Unsubbing on a closed topic seems to crash?

    Unsubbing on a closed topic seems to crash?

    I'm sorry if this is wrong, but I have my subscribed goroutines to defer Unsub(). However, I Close()d the topic and the Unsub() seems to fail.

    I can't parse Go tracebacks well yet, but I think this is relevant:

    panic: runtime error: deletion of entry in nil map
    
    goroutine 3 [running]:
    github.com/tuxychandru/pubsub.(*PubSub).remove(0xf840001870, 0xf8400408a0, 0x9, 0xf8400bb000, 0xf8400406f0, ...)
        src/github.com/tuxychandru/pubsub/pubsub.go:172 +0x66
    github.com/tuxychandru/pubsub.(*PubSub).start(0xf840001870, 0x0)
        src/github.com/tuxychandru/pubsub/pubsub.go:125 +0x24d
    created by github.com/tuxychandru/pubsub.New
        src/github.com/tuxychandru/pubsub/pubsub.go:56 +0x16b
    
  • Is topics thread safe?

    Is topics thread safe?

    Your code is very easy to understand, however is topics map here https://github.com/tuxychandru/pubsub/blob/master/pubsub.go#L41 thread safe?

    I might be wrong because you only start one gorouting per pubsub object and then inside of start() https://github.com/tuxychandru/pubsub/blob/master/pubsub.go#L111 you just go into a selective receive for commands to modify the topics map...

Simple synchronous event pub-sub package for Golang

event-go Simple synchronous event pub-sub package for Golang This is a Go language package for publishing/subscribing domain events. This is useful to

Jun 16, 2022
Package notify provides an implementation of the Gnome DBus Notifications Specification.

go-notify Package notify provides an implementation of the Gnome DBus Notifications Specification. Examples Display a simple notification. ntf := noti

Dec 27, 2022
The official Go package for NSQ

go-nsq The official Go package for NSQ. Docs See godoc and the main repo apps directory for examples of clients built using this package. Tests Tests

Jan 5, 2023
Scalable package delivery logistics simulator built using SingleStore and Vectorized Redpanda
Scalable package delivery logistics simulator built using SingleStore and Vectorized Redpanda

Reference Architecture using SingleStore and Redpanda for global logistics ?? INFO: For the story behind this code (and epic dashboards), check out th

Oct 29, 2022
An opinionated package that helps you print user-friendly output messages from your Go command line applications.

github.com/eth-p/clout (Command Line Output) clout is a package that helps you print user-friendly output messages from your Go command line applicati

Jan 15, 2022
Redis as backend for Queue Package
Redis as backend for Queue Package

redis Redis as backend for Queue package Setup start the redis server redis-server start the redis cluster, see the config # server 01 mkdir server01

Oct 16, 2022
NSQ as backend for Queue Package
NSQ as backend for Queue Package

NSQ as backend for Queue Package

Jul 4, 2022
Package htmltopdf implements wkhtmltopdf Go bindings.

htmltopdf Package htmltopdf implements wkhtmltopdf Go bindings. It can be used to convert HTML documents to PDF files. The package does not use the wk

Sep 19, 2022
Asynq: simple, reliable, and efficient distributed task queue in Go
Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq Overview Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be sc

Dec 30, 2022
Abstraction layer for simple rabbitMQ connection, messaging and administration
Abstraction layer for simple rabbitMQ connection, messaging and administration

Jazz Abstraction layer for quick and simple rabbitMQ connection, messaging and administration. Inspired by Jazz Jackrabbit and his eternal hatred towa

Dec 12, 2022
Go simple async message bus
Go simple async message bus

?? message-bus Go simple async message bus. ?? ABOUT Contributors: RafaƂ Lorenz Want to contribute ? Feel free to send pull requests! Have problems, b

Dec 29, 2022
A single binary, simple, message queue.

MiniQueue A stupid simple, single binary message queue using HTTP/2. Most messaging workloads don't require enormous amounts of data, endless features

Nov 9, 2022
A dead simple Go library for sending notifications to various messaging services.
A dead simple Go library for sending notifications to various messaging services.

A dead simple Go library for sending notifications to various messaging services. About Notify arose from my own need for one of my api server running

Jan 7, 2023
Simple, high-performance event streaming broker

Styx Styx is a simple and high-performance event streaming broker. It aims to provide teams of all sizes with a simple to operate, disk-persisted publ

Nov 24, 2022
Inspr is an application mesh for simple, fast and secure development of distributed applications.
Inspr is an application mesh for simple, fast and secure development of distributed applications.

Inspr is an engine for running distributed applications, using multiple communication patterns such as pub sub and more, focused on type consistency a

Jun 10, 2022
Chanify is a safe and simple notification tools. This repository is command line tools for Chanify.

Chanify is a safe and simple notification tools. For developers, system administrators, and everyone can push notifications with API.

Dec 29, 2022
A simple persistent directory-backed FIFO queue.

pqueue pqueue is a simple persistent directory-backed FIFO queue. It provides the typical queue interface Enqueue and Dequeue and may store any byte s

Dec 12, 2022
ntfy is a super simple pub-sub notification service. It allows you to send desktop notifications via scripts.

ntfy ntfy (pronounce: notify) is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications via scripts

Jan 9, 2023
ChizBroker is a fast and simple GRPC based implementation of kafka.
ChizBroker is a fast and simple GRPC based implementation of kafka.

Chiz Broker: a broker for fun ChizBroker is a fast and simple GRPC based implementation of kafka. Features: Ready to be deployed on kubernetes Prometh

Oct 30, 2022