Pub/sub controller for Kafka, based on Sarama. Provides a simple control flow for action streaming and event-driven systems.

Psub

A helper for building streaming and event-driven systems on top of Kafka.

Install

 go get github.com/teng231/psub

Three environment variables configure the library:

  • KAFKA_NUM_PARTITION: number of partitions (default: 3)
  • KAFKA_REPLICATION_FACTOR: replication factor (default: -1)
  • KAFKA_VERSION: Kafka version passed to the Sarama config

Override these three variables to match your cluster configuration.

Usage

The library has two modules, publisher and consumer; you can use either one or both. Example setups:

  • Both setup
func NewPubSub() error {
	ps := &psub.PubSub{}
	brokers := strings.Split("localhost:9002,localhost:9003", ",")
	ps.InitPublisher(brokers...)
	err := ps.InitConsumerGroup(cf.PubsubSubscription, brokers...)
	if err != nil {
		return err
	}
	return nil
}
  • Publisher setup
func NewPubSub() error {
	ps := &psub.PubSub{}
	brokers := strings.Split("localhost:9002,localhost:9003", ",")
	// InitPublisher is called without an error check, as in the combined setup above.
	ps.InitPublisher(brokers...)
	return nil
}


// SendMessage publishes an event.
// ps is assumed to be accessible here (e.g. a package-level variable).
func SendMessage(msg string) {
	sms := map[string]interface{}{
		"Body": "hello everyone",
		"App":  "company.vn",
		"To":   []string{"0374140511"},
	}
	if err := ps.Publish("topic_send_example", sms); err != nil {
		log.Print(err)
	}
}
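
The README does not say how Publish encodes its payload. Assuming it is serialized as JSON, a typed struct can stand in for the map and keeps the field names checked at compile time. The `SMS` type and `encodeSMS` helper below are hypothetical names used only for illustration; the sketch shows the encoding step on its own, without Kafka.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SMS is a hypothetical payload type with the same fields as the map
// in the SendMessage example.
type SMS struct {
	Body string
	App  string
	To   []string
}

// encodeSMS returns the JSON encoding of an SMS payload.
func encodeSMS(s SMS) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeSMS(SMS{
		Body: "hello everyone",
		App:  "company.vn",
		To:   []string{"0374140511"},
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(out)
}
```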
  • Consumer setup
func main() {
	// onStreamRunning blocks the main goroutine while the stream is running.
	onStreamRunning(...)
}


func NewPubSub() error {
	ps := &psub.PubSub{}
	brokers := strings.Split("localhost:9002,localhost:9003", ",")
	err := ps.InitConsumerGroup(cf.PubsubSubscription, brokers...)
	if err != nil {
		return err
	}
	return nil
}

// SendMessage publishes an event and returns any publish error.
func SendMessage() error {
	sms := map[string]interface{}{
		"Body": "hello everyone",
		"App":  "company.vn",
		"To":   []string{"0374140511"},
	}
	if err := ps.Publish("topic_send_example", sms); err != nil {
		log.Print(err)
		return err
	}
	return nil
}



func onListenUserInsert(payload []byte, h *Account, fnCommit func()) {
	defer fnCommit()
	user := &User{}
	onCreateUser(user)
	...
}

type MsgHandler struct {
	f          func([]byte, *Account, func())
	AutoCommit bool
}

var (
	backend_user_insert_user = "backend_user_insert_user"
	numberOfConcurrents      = 1
	subsList                 = map[string]*MsgHandler{
		backend_user_insert_user: {onListenUserInsert, false},
	}
)

func onStreamRunning(ps *pubsub.PubSub, h *Account) {
	log.Print("Stream start")
	msgBuf := make(chan pubsub.Message, 1000)

	topics := make([]pubsub.Topic, 0)
	for topic, handler := range subsList {
		topics = append(topics, pubsub.Topic{Name: topic, AutoCommit: handler.AutoCommit})
	}

	// Dispatch each incoming message to the handler registered for its topic.
	go func() {
		for {
			psMessage := <-msgBuf
			msghandler := subsList[psMessage.Topic]
			// Skip topics without a registered handler to avoid a nil dereference.
			if msghandler != nil && msghandler.f != nil {
				msghandler.f(psMessage.Body, h, psMessage.Commit)
			}
		}
	}()
	err := ps.OnAsyncSubscribe(topics, numberOfConcurrents, msgBuf)
	if err != nil {
		log.Print(err)
	}
}
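
The consumer example routes messages from a buffered channel to per-topic handlers, and each handler calls a commit callback when it is done. The sketch below isolates that dispatch pattern so it can be run without Kafka. `Message`, `Handler`, and `dispatch` are stand-in names assumed for illustration, not the library's types.

```go
package main

import "fmt"

// Message is a stand-in for the library's message type (assumed shape:
// topic, raw body, and a commit callback).
type Message struct {
	Topic  string
	Body   []byte
	Commit func()
}

// Handler processes one message body and calls commit when done.
type Handler func(body []byte, commit func())

// dispatch reads messages from in until it is closed and routes each one
// to the handler registered for its topic. Messages for topics without a
// handler are skipped. It returns the number of messages handled.
func dispatch(in <-chan Message, handlers map[string]Handler) int {
	handled := 0
	for msg := range in {
		h, ok := handlers[msg.Topic]
		if !ok || h == nil {
			continue
		}
		h(msg.Body, msg.Commit)
		handled++
	}
	return handled
}

func main() {
	in := make(chan Message, 10)
	committed := 0
	commit := func() { committed++ }

	handlers := map[string]Handler{
		"backend_user_insert_user": func(body []byte, commit func()) {
			defer commit() // manual commit, as with AutoCommit set to false
			fmt.Printf("user insert: %s\n", body)
		},
	}

	in <- Message{Topic: "backend_user_insert_user", Body: []byte(`{"id":1}`), Commit: commit}
	in <- Message{Topic: "unknown_topic", Body: []byte("ignored"), Commit: commit}
	close(in)

	n := dispatch(in, handlers)
	fmt.Println(n, committed)
}
```

Ranging over the channel lets the loop end cleanly when the channel is closed, and the map lookup with `ok` keeps unknown topics from reaching a nil handler.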
Owner
Te Nguyen