
Casbin Kafka Watcher

Casbin watcher for kafka

This watcher library enables users to dynamically update Casbin policies through Kafka messages.

Influenced by https://github.com/rusenask/casbin-go-cloud-watcher

Installation

go get -u github.com/wgarunap/casbin-kafka-watcher

Architecture

(Diagram: Casbin Kafka watcher architecture and message structure)

Usage

Configuration is passed from the application without requiring any environment variables. To initialize the watcher, pass the Kafka broker array, the topic name, and a callback function.

On each restart, every message in the Kafka topic is replayed and synced into the application policy, rebuilding the latest state.

The Kafka message key, value, and headers can use any preferred encoding, such as []byte, string, JSON, Avro, or Protobuf.

Message decoding and loading the policy into the enforcer are done inside the callback function; this logic has to be written manually by the user.

Example:

Sample Kafka Messages to Produce

Message1
    Key:policy1
    Value:p, alice, data1, read
    
Message2
    Key:role1
    Value:g,alice,admin

Message3
    Key:policy2
    Value:p, admin, data2, read

Sample Kafka Messages to Delete Policies - produce a null (tombstone) value for the same policy key

Message4
    Key:policy1
    Value:

Message5
    Key:role1
    Value:
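
These messages can be produced with any Kafka client. The sketch below uses the segmentio/kafka-go client (an assumption made for illustration only; the watcher is client-agnostic on the producing side), with the broker address as a placeholder:

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Writer pointed at the policy topic; broker address is a placeholder.
	w := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "casbin-policy-updates",
	}
	defer w.Close()

	err := w.WriteMessages(context.Background(),
		// Add or update a policy: the key identifies the rule,
		// the value carries the CSV policy fields.
		kafka.Message{Key: []byte("policy1"), Value: []byte("p, alice, data1, read")},
		// Delete the same policy later: a nil value is a tombstone for that key.
		kafka.Message{Key: []byte("policy1"), Value: nil},
	)
	if err != nil {
		log.Fatal(err)
	}
}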

Code:

package main

import (
	"context"
	"fmt"
	"github.com/casbin/casbin/v2"
	"github.com/casbin/casbin/v2/model"
	"github.com/tryfix/kstream/data"
	"github.com/tryfix/log"
	"github.com/wgarunap/casbin-kafka-watcher"
	"github.com/wgarunap/casbin-kafka-watcher/adaptor"
	"strings"
	"time"
)

var modelStr = `
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[role_definition]
g = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
`

func main() {
	ctx := context.Background()
	brokers := []string{"localhost:9092"}
	topic := `casbin-policy-updates`
	log.StdLogger = log.StdLogger.NewLog(log.WithLevel(log.DEBUG), log.WithColors(false))

	enforcer, err := casbin.NewSyncedEnforcer()
	if err != nil {
		log.Fatal(err)
	}

	// StartAutoLoadPolicy reloads policies from the adaptor at the given interval
	// enforcer.StartAutoLoadPolicy(10 * time.Second)

	mdl, err := model.NewModelFromString(modelStr)
	if err != nil {
		log.Fatal(err)
	}

	memAdaptor := adaptor.NewAdapter()

	err = enforcer.InitWithModelAndAdapter(mdl, memAdaptor)
	if err != nil {
		log.Fatal(err)
	}

	cfg := watcher.NewConfig(topic, brokers,
		func(record *data.Record) error {
			// Kafka message data for the topic is received here
			fmt.Println(record.Offset, `key:`, string(record.Key), `value:`, string(record.Value), `length:`, len(record.Value))

			if len(record.Value) == 0 {
				// enforcer.RemovePolicy() cannot be used since it only allows p-type policies
				_ = memAdaptor.RemovePolicy(string(record.Key), "", nil)
			} else {
				policies := strings.Split(string(record.Value), ",")
				for i := range policies {
					// trim whitespace so values like `p, alice, data1, read` match enforce requests
					policies[i] = strings.TrimSpace(policies[i])
				}
				if len(policies) > 2 {
					// enforcer.AddPolicy() cannot be used since it only allows p-type policies
					_ = memAdaptor.AddPolicy(string(record.Key), policies[0], policies[1:])
				} else {
					return fmt.Errorf(`policy must contain more than 2 fields`)
				}
			}

			// reload policies into the enforcer immediately after a change is received
			err := enforcer.LoadPolicy()
			if err != nil {
				return err
			}

			return nil
		})
	cfg.SetLogger(
		log.NewLog(
			log.FileDepth(2),
			log.WithLevel(log.INFO),
			log.WithColors(false),
			log.Prefixed(`casbin-consumer`),
		).Log(),
	)

	_, err = watcher.New(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}

	enforcer.EnableLog(true)

	fmt.Println(`starting debug loop`)
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		sub, obj, act := `alice`, `data2`, `read`
		if ok, err := enforcer.Enforce(sub, obj, act); ok {
			log.Info(`request allowed`)
		} else {
			if err != nil {
				log.Error(err.Error())
			}
			log.Info(`request denied`)
		}
	}
}

Suggested Kafka Topic Configuration

Enable log compaction so that, for each key, only the latest message is retained. With this configuration the topic keeps its data indefinitely instead of deleting it by retention time.

cleanup.policy = compact

(This is the per-topic setting; the broker-wide default is log.cleanup.policy.)

If you are sending the whole Casbin policy in a single Kafka message, remember to set the max.message.bytes config on the topic. Its default value is about 1MB.

Keep the Kafka topic partition count at 1. Otherwise, parallel updates across partitions will result in an incorrect final policy state. This library assumes the policy topic has a single partition and consumes only from partition 0.

If you are running Kafka as a cluster, keep the replication factor at 3 or more. A topic with these settings can be created as sketched below.
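
The sketch below creates such a topic with the segmentio/kafka-go admin connection (an assumed client chosen for illustration; kafka-topics.sh with the equivalent flags works just as well). The broker address and max.message.bytes value are placeholders:

package main

import (
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial any broker, then connect to the cluster controller,
	// which is the broker that handles topic creation.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		log.Fatal(err)
	}
	ctrl, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		log.Fatal(err)
	}
	defer ctrl.Close()

	err = ctrl.CreateTopics(kafka.TopicConfig{
		Topic:             "casbin-policy-updates",
		NumPartitions:     1, // single partition: the watcher consumes partition 0 only
		ReplicationFactor: 3, // for clustered Kafka; use 1 on a single broker
		ConfigEntries: []kafka.ConfigEntry{
			{ConfigName: "cleanup.policy", ConfigValue: "compact"},
			// raise this if one message carries the whole policy (placeholder: 10MB)
			{ConfigName: "max.message.bytes", ConfigValue: "10485760"},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
}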

Contact

Aruna Prabhashwara
[email protected]
