An embeddable lightweight Go/Golang MQTT broker(server) for IoT.

Snple MQTT logo

PkgGoDev

Snple MQTT

简体中文

Note: The API of this library is still unstable and has not been sufficiently tested, please do not use it in production environments.

Features

  • MQTT 3.1.1 compatible.
  • Full MQTT Feature-set (QoS, Retained, $SYS)
  • Trie-based Subscription model.
  • Ring Buffer packet codec.
  • TCP, Websocket, (including SSL/TLS).
  • Interfaces for Client Authentication and Topic access control.
  • Bolt-backed persistence and storage interfaces.
  • Server Publish (Publish, PublishToClientByID, ...).
  • Event hooks (Recv, Send, ...), see hook.go.

Roadmap

  • Improve event hooks and server publish interface
  • MQTT v5 compatibility

Quick Start

import (
    "github.com/snple/mqtt"
    "log"
)

func main() {
    // Create the new MQTT Server.
    server := mqtt.New()

    // Create a TCP listener on a standard port.
    tcp := listener.NewTCP("t1", ":1883", &mqtt.AuthAllow{})

    // Add the listener to the server.
    err := server.AddListener(tcp)
    if err != nil {
        log.Fatal(err)
    }

    // Start the broker. Serve() is blocking - see examples folder
    // for usage ideas.
    err = server.Serve()
    if err != nil {
        log.Fatal(err)
    }
}

Examples of running the broker with various configurations can be found in the examples folder.

Authentication and ACL

Authentication and ACL may be configured on a per-listener basis by providing an Auth Controller to the listener configuration. Custom Auth Controllers should satisfy the Auth interface found in auth.go. Two default controllers are provided, AuthAllow, which allows all traffic, and AuthDisallow, which denies all traffic.

    tcp := listener.NewTCP("t1", ":1883", &mqtt.AuthAllow{})
    err := server.AddListener(tcp)

If no auth controller is provided in the listener configuration, the server will default to Allowing all traffic.

SSL/TLS

SSL/TLS may be configured on both the TCP and Websocket listeners.

    cert, err := tls.X509KeyPair(publicCertificate, privateKey)
    if err != nil {
        log.Fatal(err)
    }
    cfg := &tls.Config{Certificates: []tls.Certificate{cert}}

    tcp := listener.NewTCPWithTLS("t1", ":1883", &mqtt.AuthAllow{}, cfg)
    err := server.AddListener(tcp)

Note the mandatory inclusion of the Auth Controller!

Data Persistence

Snple MQTT provides a persistence.Store interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by Bolt and can be enabled by assigning a *bolt.Store to the server.

    // import "github.com/snple/mqtt/persistence/bolt"
    err = server.AddStore(bolt.New("mqtt.db", nil))
    if err != nil {
        log.Fatal(err)
    }

Persistence is on-demand (not flushed) and will potentially reduce throughput when compared to the standard in-memory store. Only use it if you need to maintain state through restarts.

Server publish

Snple MQTT provides interfaces such as Publish, PublishToClientByID etc. for publish messages directly from the server.

    server.Publish(
        "time", // topic
        []byte(fmt.Sprintf(`{"time": "%s"}`, time.Now().Format(time.RFC3339))), // payload
        1,     // qos
        false, // retain
    )

    server.PublishToClientByID(
        "mqtt_123456", // client id
        "time",        // topic
        []byte(fmt.Sprintf(`{"time": "%s"}`, time.Now().Format(time.RFC3339))), // payload
        1,     // qos
        false, // retain
    )

With PublishToClientByID, you can publish messages to specified client, even if the client is not subscribed. (It depends on whether your client will handle unsubscribed messages.)

Server Hook interface

Snple MQTT provides a Hook interface for extending server functionality.

type Hook interface {
	// When the client connects to the server
	// If the return is false, the client will be rejected.
	Connect(*Server, *Client) bool

	// When the client disconnects
	DisConnect(*Server, *Client, error)

	// When the server receives a packet.
	// If the return is false, it will cancel the operation.
	Recv(*Server, *Client, *packets.Packet) bool

	// When the server sends a packet.
	// If the return is false, it will cancel the operation.
	Send(*Server, *Client, *packets.Packet) bool

	// When the server receives a message from the client publish.
	// If the return is false, it will cancel the operation.
	Emit(*Server, *Client, *packets.Packet) bool

	// When the server pushes a message to the client
	// If the return is false, it will cancel the operation.
	Push(*Server, *Client, *packets.Packet) bool
}

With this interface, you can debug more easily, and:

func (*MyHook) Emit(server *mqtt.Server, client *mqtt.Client, pk *packets.Packet) bool {
    log.Printf("Client publish: %v, topic: %v, payload:%v", client.ID, pk.TopicName, pk.Payload)

    if pk.TopicName == "time" {
        server.PublishToClientByID(
            client.ID,  // client id
            "time_ack", // topic
            []byte(fmt.Sprintf(`{"time": "%s"}`, time.Now().Format(time.RFC3339))), // payload
            1,     // qos
            false, // retain
        )
    }

    return true
}

This code demonstrates that when a client sends a message with topic of "time" to the server, the server gives direct feedback to the client.

Contributions

Contributions and feedback are both welcomed and encouraged! Open an issue to report a bug, ask a question, or make a feature request.

Similar Resources

Courier Golang client library provides an opinionated wrapper over paho MQTT library to add features on top of it

Courier Golang Client Library Introduction Courier Golang client library provides an opinionated wrapper over paho MQTT library to add features on top

Nov 19, 2022

An Open-Source Platform for Quantified Self & IoT

An Open-Source Platform for Quantified Self & IoT

Heedy Note: Heedy is currently in alpha. You can try it out by downloading it from the releases page, but there is no guarantee that future versions w

Jan 1, 2023

Suite of libraries for IoT devices (written in Go), experimental for x/exp/io

Go libraries/drivers for IoT devices This repo contains a suite of libraries for IoT devices/sensors/actuators. The suite is meant to be as dependency

Sep 26, 2022

Make IoT a lot more fun with data.

Eywa What is Eywa? "Eywa is the guiding force and deity of Pandora and the Na'vi. All living things on Pandora connect to Eywa." -- Avatar Wiki Projec

Nov 28, 2022

A Go client for Google IoT Core

IoT A simple framework for implementing a Google IoT device. This package makes use of the context package to handle request cancelation, timeouts, an

Sep 26, 2022

Industrial IoT Messaging and Device Management Platform

Industrial IoT Messaging and Device Management Platform

Mainflux Mainflux is modern, scalable, secure, open-source, and patent-free IoT cloud platform written in Go. It accepts user and thing (sensor, actua

Dec 31, 2022

Next-generation IoT open source platform.

Next-generation IoT open source platform.

tKeel Next-generation IoT open source platform High performance, High security and easy to use tKeel is a strong and reusable IoT platform that helps

Dec 28, 2022

Whichip: discover (IoT) device's IP in local network

Whichip: discover (IoT) device's IP in local network

whichip: discover (IoT) device's IP in local network Install On (IoT) Device wget -O install.sh

Dec 8, 2021

A opinionated multi-tenant hyperscale Internet of Things platform to connect IoT devices fast and securely with minimal TCO

infinimesh IoT Platform infinimesh is a opinionated multi-tenant hyperscale Internet of Things platform to connect IoT devices fast and securely with

Feb 14, 2022
Hermes is a tiny MQTT compatible broker written in Go.

Hermes Hermes is a tiny MQTT compatible broker written in Go. The goals of the project are as below Easy to compile, and run Tiny footprint Extensible

Sep 9, 2022
IoT Manager: use IoT platforms with Mender

Mender: Azure IoT Manager: use Azure IoT with Mender General Mender is an open source over-the-air (OTA) software updater for embedded Linux devices.

Jan 10, 2022
Vallox RS-485 MQTT gateway to integrate Vallox RS485 ventilation device to Home Assistant via MQTT.
Vallox RS-485 MQTT gateway to integrate Vallox RS485 ventilation device to Home Assistant via MQTT.

Vallox RS-485 MQTT gateway to integrate Vallox RS485 ventilation device to Home Assistant via MQTT. Implements Home Assistant MQTT discovery but can also be used without Home Assistant.

Dec 26, 2021
Sensirion SCD30 CO2 sensor MQTT gateway with Home Assistant MQTT discovery

Sensirion SCD30 CO2 sensor MQTT gateway for Home Assistant Overview This gateway can be used to publish measurements SCD30 to mqtt. It supports Home A

Oct 10, 2022
Lg-ess-mqtt - MQTT Firmware Extension for 1st generation LG ESS BESS

lg-ess-mqtt This projects is a firmware extension for the 1st generation LG ESS

Oct 19, 2022
IP Camera Alarm Server to MQTT

IP Camera Alarm Server Universal Alarm Server for all your IP cameras in one place! Integrates well with Home Assistant, Node-Red, etc. Runs great on

Dec 8, 2022
handle multiple mqtt server/cluster based on paho client

pakhshi Introduction Consider you have an array of brokers but you want to publish and subscribe on all of them at the same time. Why you may need thi

Nov 9, 2022
Golang framework for robotics, drones, and the Internet of Things (IoT)
Golang framework for robotics, drones, and the Internet of Things (IoT)

Gobot (https://gobot.io/) is a framework using the Go programming language (https://golang.org/) for robotics, physical computing, and the Internet of

Dec 29, 2022
Gobot - Golang framework for robotics, drones, and the Internet of Things (IoT)
Gobot - Golang framework for robotics, drones, and the Internet of Things (IoT)

Gobot (https://gobot.io/) is a framework using the Go programming language (https://golang.org/) for robotics, physical computing, and the Internet of Things.

Jan 8, 2023
🐼 IoT worm written in pure golang.
🐼 IoT worm written in pure golang.

GoriaNet Most powerfull cross compiler (27arch). Kill process by port and check for duplicate instance. Killing process by port. Cross compiler. Infor

Oct 17, 2022