A tiny wrapper around NSQ topic and channel :rocket:

Event Bus NSQ

  • A tiny wrapper around go-nsq topic and channel.
  • Protect nsq calls with gobreaker.

Installation

go get -u github.com/rafaeljesus/nsq-event-bus

Usage

The nsq-event-bus package exposes a interface for emitting and listening events.

Emitter

import "github.com/rafaeljesus/nsq-event-bus"

topic := "events"
emitter, err := bus.NewEmitter(bus.EmitterConfig{
  Address: "localhost:4150",
  MaxInFlight: 25,
})

e := event{}
if err = emitter.Emit(topic, &e); err != nil {
  // handle failure to emit message
}

// emitting messages on a async fashion
if err = emitter.EmitAsync(topic, &e); err != nil {
  // handle failure to emit message
}

Listener

import "github.com/rafaeljesus/nsq-event-bus"

if err = bus.On(bus.ListenerConfig{
  Topic:              "topic",
  Channel:            "test_on",
  HandlerFunc:        handler,
  HandlerConcurrency: 4,
}); err != nil {
  // handle failure to listen a message
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  if message.Attempts > MAX_DELIVERY_ATTEMPTS {
    message.Finish()
    return
  }

  err, _ = doWork(&e)
  if err != nil {
    message.Requeue(BACKOFF_TIME)
    return
  }

  message.Finish()
  return
}

Request (Request/Reply)

import "github.com/rafaeljesus/nsq-event-bus"

topic := "user_signup"
emitter, err = bus.NewEmitter(bus.EmitterConfig{})

e := event{Login: "rafa", Password: "ilhabela_is_the_place"}
if err = bus.Request(topic, &e, handler); err != nil {
  // handle failure to listen a message
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  reply = &Reply{}
  message.Finish()
  return
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

Badges

Build Status Go Report Card Go Doc


GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael

Owner
Rafael Jesus
fork() -> exec(...)
Rafael Jesus
Comments
  • Fix default value from EmitterConfig Address

    Fix default value from EmitterConfig Address

    Fixed default value to Producer's address.

    # before changes
    nsq-event-bus (fix-conf-address)$ go test -run TestNewEmitter
    --- FAIL: TestNewEmitter (0.00s)
    	emitter_test.go:19: Expected emitter address "new-address", got "" 
    FAIL
    exit status 1
    FAIL	github.com/rafaeljesus/nsq-event-bus	0.003s
    
    # after changes
    nsq-event-bus (fix-conf-address)$ go test -run TestNewEmitter
    PASS
    ok  	github.com/rafaeljesus/nsq-event-bus	0.004s
    
  • Change handlerfunc signature

    Change handlerfunc signature

    This PR exposes the bus.Message which wraps nsq.Message allowing developers to handle messages pretty like the nsq.Message

    closes https://github.com/rafaeljesus/nsq-event-bus/issues/7

  • Allows emitter and listener to be configurable

    Allows emitter and listener to be configurable

    Allows emitter and listener to be configurable. To achieve this both event bus methods must be totally separated. This PR separates them having each one with it's own configuration.

    • [x] Expose Emitter with Emit methods
    • [x] Expose Emitter with Request methods
    • [x] Expose Listener with On method

    closes https://github.com/rafaeljesus/nsq-event-bus/issues/3

  • go get URL wrong on README

    go get URL wrong on README

    When using the example from README to get the package, we get this error:

    go get -u https://github.com/rafaeljesus/nsq-event-bus
    package https:/github.com/rafaeljesus/nsq-event-bus: "https://" not allowed in import path
    

    There is no need for the https in the go get... ;)

  • nsq on client side (consumer)

    nsq on client side (consumer)

    I created a route for http request that handles all incoming requests then i push them on event bus by bus.NewEmitter() method, then return status code 200 to user. on the other side (consumer) what is the best approach to handle event bus ? waitGroup with large number like wg.Add(100000) or infinite loop ? or .. ?

    can you help me with example ...

  • Make message a nsq.message struct

    Make message a nsq.message struct

    In order to keep exposing the most important features from NSQ and give all flexibility the official driver does for developers, the event bus pkg should expose an bus.Message wrapping nsq.Message allowing developers to handle messages pretty like the same when with nsq.Message

  • Unable to receive messages, but nsq_tail works

    Unable to receive messages, but nsq_tail works

    I've set up a simple test of the event-bus library. Its a simple Listener for messages on a channel called SE on a Topic called hypatia-inbound. I use the following to_nsq command line to send messages

    echo "JOHN,CHERIE" | to_nsq -delimiter="," -topic="hypatia-inbound" -nsqd-tcp-address="127.0.0.1:4150" -rate=2
    

    When I run tail_nsq, I receive the messages. However, from a small utility built with this code

    type event struct{ Name string }
    
    var wg sync.WaitGroup
    
    func main() {
    
    	wg.Add(1)
    	if err := bus.On(bus.ListenerConfig{
    		Topic:               "hypatia-inbound",
    		Channel:             "se",
    		HandlerFunc:         recv_handler,
    		Lookup:              []string{"127.0.0.1:4161"},
    		HandlerConcurrency:  4,
    		DialTimeout:         time.Second * 5,
    		ReadTimeout:         time.Second * 60,
    		WriteTimeout:        time.Second * 1,
    		LookupdPollInterval: time.Second * 60,
    		LookupdPollJitter:   0.3,
    		MaxRequeueDelay:     time.Second * 5,
    		DefaultRequeueDelay: time.Second * 5,
    	}); err != nil {
    		fmt.Println("Error in main. err ", err)
    		fmt.Println("---------------------------------------")
    	}
    	wg.Wait()
    }
    
    func recv_handler(message *bus.Message) (reply interface{}, err error) {
    	e := event{}
    
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    	fmt.Println(message)
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    
    	if err = message.DecodePayload(&e); err != nil {
    		message.Finish()
    		wg.Done()
    		fmt.Println("Error decoding message ", err)
    		return
    	}
    
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    	fmt.Println(e)
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    
    	message.Finish()
    	wg.Done()
    
    	return
    }
    

    I receive the error shown below. My handler is never called. Have I mis-configured this?

    017/07/12 20:30:43 ERR    1 [hypatia-inbound/se] Handler returned error (invalid character 'C' looking for beginning of value) for msg 083dce4335207000
    
The official Go package for NSQ

go-nsq The official Go package for NSQ. Docs See godoc and the main repo apps directory for examples of clients built using this package. Tests Tests

Jan 5, 2023
NSQ as backend for Queue Package
NSQ as backend for Queue Package

NSQ as backend for Queue Package

Jul 4, 2022
Topictool - Batch replace, add or remove Github repository topic labels

Topictool CLI Tool to manage topic labels on Github repositories Installation go

Feb 3, 2022
A tiny wrapper over amqp exchanges and queues 🚌 ✨

Rabbus ?? ✨ A tiny wrapper over amqp exchanges and queues. In memory retries with exponential backoff for sending messages. Protect producer calls wit

Dec 18, 2022
The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which can provide developers with stable, reliable, and efficient push services.

Go-Push-API MiPush、JiPush、UMeng MiPush The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which c

Oct 20, 2022
A library for scheduling when to dispatch a message to a channel

gosd go-schedulable-dispatcher (gosd), is a library for scheduling when to dispatch a message to a channel. Implementation The implementation provides

Sep 27, 2022
replicate messages from streaming channel to jetstream

NATS Streaming/Jetstream Replicator [SJR] Introduction This project replicates messages from streaming channels to jetstream. but why? At Snapp when w

Dec 15, 2022
A wrapper of streadway/amqp that provides reconnection logic and sane defaults

go-rabbitmq Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ Supporte

Dec 28, 2022
goczmq is a golang wrapper for CZMQ.

goczmq Introduction A golang interface to the CZMQ v4.2 API. Install Dependencies libsodium libzmq czmq For CZMQ master go get github.com/zeromq/goczm

Jan 5, 2023
Golang API wrapper for MangaDex v5's MVP API.

mangodex Golang API wrapper for MangaDex v5's MVP API. Full documentation is found here. This API is still in Open Beta, so testing may not be complet

Oct 23, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Dec 21, 2022
High abstraction wrapper for Golang Rabbit MQ Client

GRMQ Go Rabbit MQ What are the typical use-cases of RabbitMQ broker ? We create

Nov 3, 2022
⚡️ A lightweight service that will build and store your go projects binaries, Integrated with Github, Gitlab, Bitbucket and Bitbucket Server.
⚡️ A lightweight service that will build and store your go projects binaries, Integrated with Github, Gitlab, Bitbucket and  Bitbucket Server.

Rabbit A lightweight service that will build and store your go projects binaries. Rabbit is a lightweight service that will build and store your go pr

Nov 19, 2022
Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.

Cadence Visit cadenceworkflow.io to learn about Cadence. This repo contains the source code of the Cadence server. To implement workflows, activities

Jan 9, 2023
May 11, 2023
⚡ HTTP/2 Apple Push Notification Service (APNs) push provider for Go — Send push notifications to iOS, tvOS, Safari and OSX apps, using the APNs HTTP/2 protocol.

APNS/2 APNS/2 is a go package designed for simple, flexible and fast Apple Push Notifications on iOS, OSX and Safari using the new HTTP/2 Push provide

Jan 1, 2023
Asynq: simple, reliable, and efficient distributed task queue in Go
Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq Overview Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be sc

Dec 30, 2022
💨 A real time messaging system to build a scalable in-app notifications, multiplayer games, chat apps in web and mobile apps.
💨 A real time messaging system to build a scalable in-app notifications, multiplayer games, chat apps in web and mobile apps.

Beaver A Real Time Messaging Server. Beaver is a real-time messaging server. With beaver you can easily build scalable in-app notifications, realtime

Jan 1, 2023
Build event-driven and event streaming applications with ease

Commander ?? Commander is Go library for writing event-driven applications. Enabling event sourcing, RPC over messages, SAGA's, bidirectional streamin

Dec 19, 2022