Gmqtt is a flexible, high-performance MQTT broker library that fully implements MQTT protocol V3.1.1 and V5 in Go.

Chinese documentation

News: MQTT V5 is now supported. Because of the new features in V5, there are a lot of breaking changes. If you run into any migration problems, feel free to raise an issue, or use the latest v3 broker.

Installation

$ go get -u github.com/DrmagicE/gmqtt

Features

  • Provides hook methods to customize broker behaviour (authentication, ACL, etc.). See server/hooks.go for details.
  • Supports TLS/SSL and websocket.
  • Provides a flexible pluggable mechanism. See server/plugin.go and /plugin for details.
  • Provides a Go interface for extensions to interact with the server. For example, extensions or plugins can publish messages or add/remove subscriptions through function calls. See the Server interface in server/server.go and admin for details.
  • Provides metrics through Prometheus. (plugin: prometheus)
  • Provides gRPC and REST APIs to interact with the server. (plugin: admin)
  • Provides session persistence, which means the broker can retrieve session data after a restart. Currently, only the Redis backend is supported.

Limitations

  • Cluster is not supported.

Get Started

The following command starts the gmqtt broker with the default configuration. The broker listens on port 1883 for TCP and on port 8883 for websocket, with the admin and prometheus plugins loaded.

$ cd cmd/gmqttd
$ go run . start -c default_config.yml

Configuration

Gmqtt uses the -c flag to set the configuration file path. If the flag is not set, Gmqtt reads $HOME/gmqtt.yml by default. If that file does not exist, Gmqtt starts with the default configuration.
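
The lookup order described above can be sketched as a small function. This is an illustration only, not gmqtt's actual code: `resolveConfigPath`, `fileExists` and their parameters are hypothetical names, and an empty result stands for "use the built-in default configuration".

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// resolveConfigPath is a hypothetical helper, not part of gmqtt.
// It returns the config file to load, or "" when the built-in
// default configuration should be used.
func resolveConfigPath(flagValue, homeDir string, exists func(string) bool) string {
	// 1. An explicit -c value always wins.
	if flagValue != "" {
		return flagValue
	}
	// 2. Otherwise try $HOME/gmqtt.yml.
	def := filepath.Join(homeDir, "gmqtt.yml")
	if exists(def) {
		return def
	}
	// 3. Nothing found: fall back to the default configuration.
	return ""
}

// fileExists reports whether the path can be stat'ed.
func fileExists(p string) bool {
	_, err := os.Stat(p)
	return err == nil
}

func main() {
	home, _ := os.UserHomeDir()
	fmt.Printf("config path: %q\n", resolveConfigPath("", home, fileExists))
}
```

Passing the existence check as a function keeps the lookup order easy to exercise without touching the file system.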

Session persistence

Gmqtt stores session data in memory by default, which is the recommended way because of its good performance. However, session data is lost when the broker restarts. You can use Redis as the backend storage to keep session data across restarts:

persistence:
  type: redis  
  redis:
    # redis server address
    addr: "127.0.0.1:6379"
    # the maximum number of idle connections in the redis connection pool
    max_idle: 1000
    # the maximum number of connections allocated by the redis connection pool at a given time.
    # If zero, there is no limit on the number of connections in the pool.
    max_active: 0
    # the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed
    idle_timeout: 240s
    password: ""
    # the number of the redis database
    database: 0

Authentication

Gmqtt provides a simple username/password authentication mechanism through the auth plugin. It is not enabled in the default configuration; to enable it, add auth to the plugin loading order:

# plugin loading orders
plugin_order:
  - auth
  - prometheus
  - admin

When the auth plugin is enabled, every client needs an account to connect. You can add accounts through the HTTP API:

# Create: username = user1, password = user1pass
$ curl -X POST -d '{"password":"user1pass"}' 127.0.0.1:8083/v1/accounts/user1
{}
# Query
$ curl 127.0.0.1:8083/v1/accounts/user1
{"account":{"username":"user1","password":"20a0db53bc1881a7f739cd956b740039"}}

API Doc swagger
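
The two curl calls above can also be issued from Go. The following is a minimal sketch using only the standard library; the endpoint path, the request body and the response shape are taken from the curl example, while the helper names (`accountURL`, `parseAccount`, `createAccount`, `getAccount`) and the treatment of any status below 300 as success are assumptions made for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// account mirrors the JSON object returned by GET /v1/accounts/{username}.
type account struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

// accountURL builds the endpoint for a single account.
func accountURL(base, username string) string {
	return base + "/v1/accounts/" + url.PathEscape(username)
}

// parseAccount decodes a response such as {"account":{...}}.
func parseAccount(data []byte) (account, error) {
	var out struct {
		Account account `json:"account"`
	}
	err := json.Unmarshal(data, &out)
	return out.Account, err
}

// createAccount sends the same request as the POST curl call.
func createAccount(c *http.Client, base, username, password string) error {
	body, err := json.Marshal(map[string]string{"password": password})
	if err != nil {
		return err
	}
	resp, err := c.Post(accountURL(base, username), "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 { // assumption: anything below 300 is success
		return fmt.Errorf("create account %q: %s", username, resp.Status)
	}
	return nil
}

// getAccount sends the same request as the query curl call.
func getAccount(c *http.Client, base, username string) (account, error) {
	resp, err := c.Get(accountURL(base, username))
	if err != nil {
		return account{}, err
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return account{}, err
	}
	return parseAccount(data)
}

func main() {
	const base = "http://127.0.0.1:8083" // address used in the curl example
	if err := createAccount(http.DefaultClient, base, "user1", "user1pass"); err != nil {
		fmt.Println("create failed:", err)
		return
	}
	acc, err := getAccount(http.DefaultClient, base, "user1")
	fmt.Println(acc.Username, err)
}
```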

Docker

$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p 8883:8883 -p 8082:8082 -p 8083:8083  -p 8084:8084  gmqtt

Documentation

godoc

Hooks

Gmqtt implements the following hooks:

| Name | Hooking point | Possible usages |
|------|---------------|-----------------|
| OnAccept | When a TCP connection is accepted (not supported for websocket) | Connection rate limiting, IP allow/block lists |
| OnStop | When the broker exits | |
| OnSubscribe | When a subscribe packet is received | Subscribe access control, modifying subscriptions |
| OnSubscribed | When a subscription succeeds | |
| OnUnsubscribe | When an unsubscribe packet is received | Unsubscribe access control, modifying the topics that are about to be unsubscribed |
| OnUnsubscribed | When an unsubscription succeeds | |
| OnMsgArrived | When a publish packet is received | Publish access control, modifying the message before delivery |
| OnBasicAuth | When a connect packet without the AuthMethod property is received | Authentication |
| OnEnhancedAuth | When a connect packet with the AuthMethod property is received (v5 clients only) | Authentication |
| OnReAuth | When an auth packet is received (v5 clients only) | Authentication |
| OnConnected | When the client has connected successfully | |
| OnSessionCreated | When a new session is created | |
| OnSessionResumed | When an old session is resumed | |
| OnSessionTerminated | When a session is terminated | |
| OnDelivered | When a message is delivered to the client | |
| OnClosed | When the client is closed | |
| OnMsgDropped | When a message is dropped for some reason | |

See /examples/hook for details.
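
Plugins typically contribute to a hook by wrapping the previous handler. The sketch below models that wrapper pattern with plain Go types so it can run on its own. The types `connectRequest`, `onBasicAuth` and `onBasicAuthWrapper` are simplified stand-ins, not the real gmqtt types, and the order in which `chain` applies the wrappers is an assumption for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified, hypothetical stand-ins for the real hook types.
type connectRequest struct{ Username, Password string }
type onBasicAuth func(req connectRequest) error
type onBasicAuthWrapper func(pre onBasicAuth) onBasicAuth

var (
	errBadCredentials = errors.New("bad username or password")
	errBlocked        = errors.New("user is blocked")
)

// chain builds one handler from several wrappers. The first wrapper in
// the list ends up outermost; the innermost handler accepts everyone.
func chain(wrappers ...onBasicAuthWrapper) onBasicAuth {
	h := onBasicAuth(func(connectRequest) error { return nil })
	for i := len(wrappers) - 1; i >= 0; i-- {
		h = wrappers[i](h)
	}
	return h
}

// passwordCheck rejects unknown users and wrong passwords.
func passwordCheck(accounts map[string]string) onBasicAuthWrapper {
	return func(pre onBasicAuth) onBasicAuth {
		return func(req connectRequest) error {
			// Run the wrapped handler first and stop on its error.
			if err := pre(req); err != nil {
				return err
			}
			if want, ok := accounts[req.Username]; !ok || want != req.Password {
				return errBadCredentials
			}
			return nil
		}
	}
}

// blockUser rejects one specific username.
func blockUser(name string) onBasicAuthWrapper {
	return func(pre onBasicAuth) onBasicAuth {
		return func(req connectRequest) error {
			if err := pre(req); err != nil {
				return err
			}
			if req.Username == name {
				return errBlocked
			}
			return nil
		}
	}
}

func main() {
	auth := chain(
		blockUser("mallory"),
		passwordCheck(map[string]string{"user1": "user1pass"}),
	)
	fmt.Println(auth(connectRequest{"user1", "user1pass"}))
	fmt.Println(auth(connectRequest{"user1", "wrong"}))
}
```

Each wrapper calls the handler it wraps before applying its own check, so an error from any layer stops the chain and rejects the connection.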

How to write plugins

How to write plugins

Contributing

Contributions are always welcome. See the Contribution Guide for details.

Test

Unit Test

$ go test -race ./...

Integration Test

paho.mqtt.testing.

TODO

  • Support bridge mode and cluster.

Breaking changes may occur when these new features are added.

Comments
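  • Potential bug (not sure)

    I'm not sure whether I'm doing something wrong, but when I tried to run the example it did not behave as I expected, mainly in the authentication part.

    I use this code for authentication: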

    //authentication
    	var onBasicAuth server.OnBasicAuth = func(ctx context.Context, client server.Client, req *server.ConnectRequest) error {
    		username := string(req.Connect.Username)
    		password := string(req.Connect.Password)
    		if validateUser(username, password) {
    			if username == "disable_shared" {
    				// disable shared subscription for this particular client
    				req.Options.SharedSubAvailable = false
    			}
    			return nil
    		}
    		log.Infof("client version: %s ", client.Version())
    		// check the client version, return a compatible reason code.
    		switch client.Version() {
    		case packets.Version5:
    			return codes.NewError(codes.BadUserNameOrPassword)
    		case packets.Version311:
    			return codes.NewError(codes.V3BadUsernameorPassword)
    		}
    		// return nil if pass authentication.
    		return nil
    	}
    

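    Logically, if the password is wrong the connection should be dropped right away. But right now a wrong password does raise an error, yet that error does not drop the connection, and the client can keep publishing messages. That is very strange.

    This is my client, using paho: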

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang"
    )
    
    var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	fmt.Printf("TOPIC: %s\n", msg.Topic())
    	fmt.Printf("MSG: %s\n", msg.Payload())
    }
    
    func main() {
    	mqtt.DEBUG = log.New(os.Stdout, "", 0)
    	mqtt.ERROR = log.New(os.Stdout, "", 0)
    	opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1884").SetClientID("fuckitsme")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetDefaultPublishHandler(f)
    	opts.SetPingTimeout(1 * time.Second)
    	opts.SetUsername("alice")
    	opts.SetPassword("1233")
    
    	c := mqtt.NewClient(opts)
    	if token := c.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	}
    
    	for i := 0; i < 50; i++ {
    		text := fmt.Sprintf("-- this is msg #%d!", i)
    		token := c.Publish("message/unique_channel_private", 0, false, text)
    		token.Wait()
    		time.Sleep(1 * time.Second)
    	}
    
    	c.Disconnect(250)
    }
    
    

    The username and password I set here are wrong, yet authentication still passes. Here is the log:

    INFO[0264] Validate conn user: username: alice, password: 1233 
    INFO[0264] client version: %!s(uint8=4)                 
    2021-10-31T17:05:58.078+0800	ERROR	server/client.go:273	connection lost	{"client_id": "", "remote_addr": "127.0.0.1:42768", "error": "operation error: Code = 4, reasonString: "}
    github.com/DrmagicE/gmqtt/server.(*client).setError.func1
    	/home/jintian/go/pkg/mod/github.com/!drmagic!e/[email protected]/server/client.go:273
    sync.(*Once).doSlow
    	/usr/lib/go-1.17/src/sync/once.go:68
    sync.(*Once).Do
    	/usr/lib/go-1.17/src/sync/once.go:59
    github.com/DrmagicE/gmqtt/server.(*client).setError
    	/home/jintian/go/pkg/mod/github.com/!drmagic!e/[email protected]/server/client.go:271
    github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut.func1
    	/home/jintian/go/pkg/mod/github.com/!drmagic!e/[email protected]/server/client.go:523
    github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut
    	/home/jintian/go/pkg/mod/github.com/!drmagic!e/[email protected]/server/client.go:599
    github.com/DrmagicE/gmqtt/server.(*client).serve
    	/home/jintian/go/pkg/mod/github.com/!drmagic!e/[email protected]/server/client.go:1461
    INFO[0264] Validate conn user: username: alice, password: 1233 
    INFO[0264] client version: %!s(uint8=3)                 
    2021-10-31T17:05:58.079+0800	INFO	server/server.go:492	logged in with new session	{"remote_addr": "127.0.0.1:42770", "client_id": "fuckitsme"}
    INFO[0314] client id:fuckitsme is closed with error:  <nil> 
    2021-10-31T17:06:48.119+0800	INFO	server/server.go:599	logged out and cleaning session	{"remote_addr": "127.0.0.1:42770", "client_id": "fuckitsme"}
    
    

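    As you can see, it raised an error, but immediately afterwards the login succeeded.

    I'm completely stumped. How is authentication supposed to work like this?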
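
    One possible reading of the log above: the first attempt uses protocol version 4 (MQTT 3.1.1) and is rejected with code 4, after which the client appears to retry with protocol version 3 (MQTT 3.1). Version 3 matches neither case in the hook's switch, so execution reaches the final return nil and the login succeeds. The sketch below models that control flow with plain Go values instead of the gmqtt types. The constants and error values are stand-ins defined here for illustration, and the point is only that a rejection path needs a default branch.

```go
package main

import (
	"errors"
	"fmt"
)

// Protocol levels as they appear in the log (uint8=4, uint8=3).
const (
	version31  byte = 3 // MQTT 3.1
	version311 byte = 4 // MQTT 3.1.1
	version5   byte = 5 // MQTT 5
)

// Stand-in errors; the real hook would return reason codes instead.
var (
	errV3BadCredentials = errors.New("v3: bad username or password")
	errV5BadCredentials = errors.New("v5: bad username or password")
	errUnsupported      = errors.New("unsupported protocol version")
)

// rejectFor always returns a non-nil error, whatever the version is.
func rejectFor(version byte) error {
	switch version {
	case version5:
		return errV5BadCredentials
	case version311, version31:
		return errV3BadCredentials
	default:
		// Never fall through to "return nil" on a failed login.
		return errUnsupported
	}
}

// authenticate accepts valid credentials and rejects everything else.
func authenticate(version byte, valid bool) error {
	if valid {
		return nil
	}
	return rejectFor(version)
}

func main() {
	for _, v := range []byte{version311, version31} {
		fmt.Printf("version %d, wrong password: %v\n", v, authenticate(v, false))
	}
}
```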

  • How to use the websocket server?

    Currently I need to listen on both a TCP address and a websocket address. What should I do here?

    ln, err := net.Listen("tcp", c.MQTTConfig.Port)
    	ws := &server.WsServer{
    		Server: &http.Server{Addr: "127.0.0.1:1885"},
    		Path:   v.Websocket.Path,
    	}
    
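  • After one node in the cluster goes down (becomes unavailable), the other nodes also become unavailable until the failed node rejoins the cluster

    The other nodes become unavailable in the following ways:

    1. Some client connections on the other nodes are dropped.
    2. Connections with a new ClientID fail on every node.
    3. Some ClientIDs that stay connected fail to publish messages, and new topic subscriptions fail.

    Joining the cluster through the {{baseUrl}}/v1/federation/join API succeeds, and cluster routing and forwarding work without problems. Question: after one node goes down, can the other nodes in the cluster keep subscribing and forwarding messages normally? Thx

    The configuration is as follows: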

    listeners:
      - address: ":1883"
      - address: ":8883"
        websocket:
          path: "/"

    api:
      grpc:
        - address: "tcp://127.0.0.1:8084" # change the IP to match your environment
      http:
        - address: "tcp://127.0.0.1:8083" # change the IP to match your environment
          map: "tcp://127.0.0.1:8084" # change the IP to match your environment

    mqtt:
      session_expiry: 2h
      session_expiry_check_timer: 20s
      message_expiry: 2h
      inflight_expiry: 30s
      max_packet_size: 268435456
      server_receive_maximum: 100
      max_keepalive: 300
      topic_alias_maximum: 10
      subscription_identifier_available: true
      wildcard_subscription_available: true
      shared_subscription_available: true
      maximum_qos: 2
      retain_available: true
      max_queued_messages: 1000
      max_inflight: 100
      queue_qos0_messages: true
      delivery_mode: onlyonce # overlap or onlyonce
      allow_zero_length_clientid: true

    persistence:
      type: redis
      redis:
        addr: "127.0.0.1:6379"
        max_idle: 1000
        max_active: 0
        idle_timeout: 240s
        password: "****"
        database: 0

    topic_alias_manager:
      type: fifo

    plugins:
      prometheus:
        path: "/metrics"
        listen_address: ":8082"
      auth:
        hash: md5
      federation:
        node_name: node88 # change the node name to match your environment
        fed_addr: :8901
        advertise_fed_addr: :8901
        gossip_addr: :8902
        advertise_gossip_addr: :8902
        #retry_join:
        #  - 127.0.0.1:8902
        rejoin_after_leave: false
        snapshot_path:

    plugin_order:
      - prometheus
      - admin
      - federation
    log:
      level: debug
      format: text
      dump_packet: false
  • Override req.Message.payload can not be sent

    I have a customized function:

    var onMsgArrived server.OnMsgArrived = GlobalInterceptors
    
    	OnClosed := func(ctx context.Context, client server.Client, err error) {
    		log.Infoln("client id:"+client.ClientOptions().ClientID+" is closed with error: ", err)
    	}
    	onStop := func(ctx context.Context) {
    		log.Infoln("stop")
    	}
    	OnDelivered := func(ctx context.Context, client server.Client, msg *gmqtt.Message) {
    		log.Infof("delivering message %s to client %s", msg.Payload, client.ClientOptions().ClientID)
    	}
    
    	hooks := server.Hooks{
    		OnBasicAuth:  onBasicAuth,
    		OnSubscribe:  onSubscribe,
    		OnMsgArrived: onMsgArrived,
    		OnClosed:     OnClosed,
    		OnStop:       onStop,
    		OnDelivered:  OnDelivered,
    	}
    

    It simply catches the arrived message and overrides the payload. But once I do this, the message is no longer forwarded.

    func GlobalInterceptors(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
    	// version := client.Version()
    	topic := req.Message.Topic
    	clientId := client.ClientOptions().ClientID
    	payload := string(req.Message.Payload)
            bytes, _ := json.Marshal(chatMsg)
            // override payload
            req.Message.Payload = bytes
    }
    

    The message cannot be sent when I override Payload. Why, and how can I fix it?

  • discussion: Cluster design

    Due to the architecture of the MQTT protocol, it is difficult, and may be impossible, to meet every requirement of the MQTT specification in a cluster. The key point of clustering MQTT brokers is the tradeoff between consistency and availability.

    Basic Idea

    The basic idea of the gmqtt cluster is the same as in other popular brokers such as EMQ, VerneMQ and HiveMQ. It is a masterless cluster in which all nodes are equal. Users can choose either server-side or client-side load balancing to connect to the cluster.

    Consistency & Availability

    Network Partition

    When network partition happens, we have 2 options:

    1. trade consistency for availability

    All nodes continue serving. Every message, subscribe, unsubscribe, connect and disconnect event is delivered once the network partition heals. This is an eventually consistent model. Of course, message order cannot be preserved in this situation.
    If the partition does not heal before a timeout is reached, or the event buffer becomes full, the cluster should remove the node automatically. This may cause message loss.

    2. trade availability for consistency

    All nodes return a negative ack for subscribe, unsubscribe and connect packets. For a publish packet:

    • If the client protocol is V3, the node closes the client, because the V3 protocol has no negative ack for publish packets.
    • If the client protocol is V5, the node returns a negative ack.

    When the partition heals, all nodes can rejoin the cluster automatically.
    If we trade availability for consistency, the cluster cannot provide fault tolerance: any node going down makes the whole cluster unavailable. Gmqtt will make this tradeoff strategy configurable, like VerneMQ does.

    reference: https://docs.vernemq.com/clustering/netsplits
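
    The two options can be summarised as a decision function. This is only a sketch of the rules listed above, not code from gmqtt; every type and name in it (strategy, packetKind, action, duringPartition) is hypothetical.

```go
package main

import "fmt"

// strategy is the configurable tradeoff during a network partition.
type strategy int

const (
	preferAvailability strategy = iota // option 1
	preferConsistency                  // option 2
)

type packetKind int

const (
	pktConnect packetKind = iota
	pktSubscribe
	pktUnsubscribe
	pktPublish
)

type action int

const (
	actServe       action = iota // keep serving, sync events after the partition heals
	actNegativeAck               // refuse the request with a negative ack
	actCloseClient               // no negative ack available, close the client
)

func (a action) String() string {
	switch a {
	case actServe:
		return "serve"
	case actNegativeAck:
		return "negative ack"
	default:
		return "close client"
	}
}

// duringPartition returns what a node does with a packet while the
// cluster is partitioned.
func duringPartition(s strategy, kind packetKind, isV5 bool) action {
	if s == preferAvailability {
		return actServe
	}
	// V3 has no negative ack for publish, so the client is closed.
	if kind == pktPublish && !isV5 {
		return actCloseClient
	}
	return actNegativeAck
}

func main() {
	fmt.Println(duringPartition(preferConsistency, pktPublish, false))
	fmt.Println(duringPartition(preferConsistency, pktPublish, true))
	fmt.Println(duringPartition(preferAvailability, pktPublish, false))
}
```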

    session state

    In a gmqtt cluster, session state will not be replicated to other nodes, which means a client cannot resume a session on a different node. The reason is that copying session state and keeping it consistent across nodes is very difficult and requires a lot of extra work. This comment https://github.com/emqx/emqx/issues/1623#issuecomment-395326544 elaborates on the difficulties. Because of this complexity, neither EMQ nor VerneMQ supports session migration yet.

    Inner-node Communication

    TODO

    Some reference:

    • https://docs.vernemq.com/clustering/introduction
    • https://docs.emqx.io/en/broker/latest/advanced/cluster.html
  • Memory leak using OnBasicAuthWrapper Hook

    Hi,

    We are currently experiencing a memory leak using gmqtt. The broker has around 20 clients that are publishing messages and no one is consuming. We are inserting data in the database using the OnMsgArrived hook and authenticating clients using OnBasicAuthWrapper hook.

    After some further investigation I found that the problem is in the OnBasicAuthWrapper hook. Apparently, when there is an invalid password, this error is thrown:

    2022-10-21T10:38:28.636+0200    ERROR   server/client.go:273    connection lost {"client_id": "", "remote_addr": "[::1]:52040", "error": "operation error: Code = 5, reasonString: "}
    github.com/DrmagicE/gmqtt/server.(*client).setError.func1
            /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:273
    sync.(*Once).doSlow
            /Users/xorduna/go/go1.19.1/src/sync/once.go:74
    sync.(*Once).Do
            /Users/xorduna/go/go1.19.1/src/sync/once.go:65
    github.com/DrmagicE/gmqtt/server.(*client).setError
            /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:271
    github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut.func1
            /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:523
    github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut
            /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:599
    github.com/DrmagicE/gmqtt/server.(*client).serve
            /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:1461
    

    If we check the serve() method in client.go and follow the execution with the debugger, we see that after sending the authentication error the goroutine calls client.wg.Wait(). That causes the goroutine to stay in a waiting state and never end.

    Is this normal behaviour?

    I followed the code in https://github.com/DrmagicE/gmqtt/blob/master/plugin/auth/hooks.go. Is something wrong with my code?

    This is the plugin load function:

    func (m *Myc) Load(service server.Server) error {
    	log = server.LoggerWithField(zap.String("plugin", Name))
    
    	enforce := casbin.Enforcer{}
    
    	// panic("implement me")
    	log.Info("Loading myc plugin")
    	log.Info("DB URI", zap.String("dbUri:", m.dbUri))
    
    	dataBase, err := database.Open(database.Config{DBPostgres: m.dbUri, MaxIdleConns: 0, MaxOpenConns: 10, ShowSQL: true}, 5)
    	if err != nil {
    		return fmt.Errorf("connecting to db: %w", err)
    	}
    	m.store = store.NewStore(log.Sugar(), dataBase)
    	log.Info("Loading ready")
    	m.core = core.NewCore(log.Sugar(), app_config.Config{}, m.store, enforce, nil, new(session.Session))
    
    	return nil
    }
    

    And this is the OnBasicAuthWrapper hook:

    func (m *Myc) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
    	return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) {
    		err = pre(ctx, client, req)
    		if err != nil {
    			return err
    		}
    		// fmt.Println("Authentication with:", string(req.Connect.Username), string(req.Connect.Password))
    		log.Info("Authentication with:", zap.String("username", string(req.Connect.Username)),
    			zap.String("password", string(req.Connect.Password)))
    
    		valid := m.core.Gateway.ValidateToken(string(req.Connect.Username))
    		if string(req.Connect.Password) != "" || !valid {
    			log.Info("authentication failed", zap.String("username", string(req.Connect.Username)))
    			switch client.Version() {
    			case packets.Version5:
    				return codes.NewError(codes.BadUserNameOrPassword)
    			case packets.Version311, packets.Version31:
    				return codes.NewError(codes.V3BadUsernameorPassword)
    			default:
    				return codes.NewError(codes.UnsupportedProtocolVersion)
    			}
    		}
    		return nil
    	}
    }
    

    I enabled pprof on the broker, and from what I see there are thousands of goroutines blocked and waiting. Is that normal?

    goroutine-stack.txt

    Here you can also see the space_inuse from the heap:

    heap-broker-6h

    Thank you very much!

    Xavi

  • Auth plugin bug

    After uncommenting auth in the example configuration and adding an account, no MQTT client can connect to the broker, with or without username and password, for publish or subscribe. This is what actually happens:

    2021-09-18T23:12:23.553+0430 ERROR server/client.go:273 connection lost {"client_id": "", "remote_addr": "192.168.43.129:2030", "error": "operation error: Code = 5, reasonString: "}
    github.com/DrmagicE/gmqtt/server.(*client).setError.func1
            gmqtt/server/client.go:273
    sync.(*Once).doSlow
            /usr/lib/go/src/sync/once.go:68
    sync.(*Once).Do
            /usr/lib/go/src/sync/once.go:59
    github.com/DrmagicE/gmqtt/server.(*client).setError
            gmqtt/server/client.go:271
    github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut.func1
            gmqtt/server/client.go:523
    github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut
            gmqtt/server/client.go:599
    github.com/DrmagicE/gmqtt/server.(*client).serve
            gmqtt/server/client.go:1461

    I just found out that the password was wrong, but the error message in the log is still bizarre.

  • Modify message before delivering to client

    Is it possible to modify the message payload before delivering it to the clients?

    I have a client that sends zlib compressed data to the MQTT broker, and I need to decompress it before delivering it to the clients which don't have the capability to do it themselves.

    I have tried to modify it in OnMsgArrived hook by:

    • Re-assigning msg to the new message I've constructed via gmqtt.NewMessage
    • Calling the arrived(ctx, client, newMessage) as return value

    Both seem to have no effect at all.

    Is there a way to achieve this?
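
    The decompression step itself can be written with the standard library, independent of which hook ends up calling it. The sketch below only covers inflating a zlib payload; how and where the result is assigned back to the message is not shown, and the helper names inflate and deflate are hypothetical.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"
)

// inflate decompresses a zlib-compressed payload.
func inflate(payload []byte) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewReader(payload))
	if err != nil {
		return nil, err // not a zlib stream
	}
	defer r.Close()
	return io.ReadAll(r)
}

// deflate compresses data with zlib; used here only to build test input.
func deflate(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	compressed, err := deflate([]byte("sensor reading"))
	if err != nil {
		fmt.Println("compress failed:", err)
		return
	}
	plain, err := inflate(compressed)
	fmt.Println(string(plain), err)
}
```

    Returning the error instead of passing the raw bytes through lets the caller decide whether a payload that is not valid zlib should be dropped or forwarded unchanged.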

  • Sessions do not persist for the built-in Publish method

    I have been using this library for quite a long time. Recently, however, I noticed that if I send a message through the gmqtt Publish method while a client is disconnected, the session is lost: when the client reconnects, it does not receive the message.

    On the other hand, if I send a message to the topic through a third-party client such as paho.mqtt.golang while the client is disconnected, gmqtt preserves the session, so the client receives the published message when it reconnects.

    Please correct me if I am wrong. Is this behaviour expected? My guess is that the built-in Publish method of gmqtt has no client ID, so it does not persist any session.

    I am also using a plugin to reschedule messages from a database through the Publish method when the gmqtt server restarts. I use the built-in Publish method for that.

  • persistence factory: memory not found

    	logger, _ := zap.NewDevelopment()
    	mqttServer := server.New(
    		server.WithTCPListener(tcpPort),
    		server.WithLogger(logger),
    		server.WithConfig(config.DefaultConfig()),
    	)
    	if err := mqttServer.Run(); err != nil {
    		return err
    	}
    
  • Intercepting published messages

    How can I intercept published messages?

    My init code:

    package main
    
    import (
    	"context"
    	"net"
    	"os"
    	"os/signal"
    	"syscall"
    	"log"
    	"github.com/DrmagicE/gmqtt"
    )
    func main() {
    	// listener
    	ln, err := net.Listen("tcp", ":1883")
    	if err != nil {
    		log.Fatalln(err.Error())
    		return
    	}
    
    	s := gmqtt.NewServer(
    		gmqtt.WithTCPListener(ln),
    	)
    
    	s.Run()
    	signalCh := make(chan os.Signal, 1)
    	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
    	<-signalCh
    	s.Stop(context.Background())
    }
    
  • Error in a 1000-client stress test: /gmqtt/persistence/queue/mem/mem.go:185 +0x118

    I am stress-testing gmqtt with 1000 simulated client connections. Each client reports data once per second, the data is written to InfluxDB, and everything is deployed in Docker containers. This error keeps showing up in the log and I am not sure why. Any direction or advice would be much appreciated.

    sync.runtime_notifyListWait(0xc0037be110, 0x0)
            /usr/local/go/src/runtime/sema.go:513 +0x13d
    sync.(*Cond).Wait(0x0?)
            /usr/local/go/src/sync/cond.go:56 +0x8c
    pulse-broker/gmqtt/persistence/queue/mem.(*Queue).Read(0xc000508fc0, {0xc003dad200, 0x64, 0xc001e1b780?})
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/persistence/queue/mem/mem.go:185 +0x118
    pulse-broker/gmqtt/server.(*client).pollNewMessages(0xc0031c3200, {0xc003dad200, 0x64, 0x80})
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/server/client.go:1390 +0x94
    pulse-broker/gmqtt/server.(*client).pollMessageHandler(0xc0031c3200)
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/server/client.go:1436 +0x10e
    pulse-broker/gmqtt/server.(*client).serve.func3()
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/server/client.go:1464 +0x25
    created by pulse-broker/gmqtt/server.(*client).serve
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/server/client.go:1463 +0x1b1

    goroutine 5441 [chan receive]:
    pulse-broker/gmqtt/server.(*client).readHandle(0xc0031c3200)
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/server/client.go:1299 +0x94
    pulse-broker/gmqtt/server.(*client).serve.func4()
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/server/client.go:1468 +0x25
    created by pulse-broker/gmqtt/server.(*client).serve
            /mnt/d/work/10_Git/golang/src/pulse-broker/gmqtt/server/client.go:1467 +0x1f4

    goroutine 5442 [sync.Cond.Wait]:

  • Fixed memory leak when authentication is wrong

    We had an MQTT client that was continuously attempting to log in to our broker with a wrong token, and memory usage kept climbing.

    After digging into the problem, we found that after the connection was closed, two goroutines were still running: writeLoop() and serve(). serve() was blocked in client.wg.Wait(), and that wait group was still waiting for writeLoop() to end. writeLoop() in turn was blocked waiting on the client.close channel. The defer in serve() closed client.closed, but not client.close.

    To date we still do not see the difference between client.closed and client.close, but closing client.close when there is an error was necessary to end both goroutines.

    In any case, I am open to the discussion.

    Thanks

  • "too many open files" error when running a single node

    Clients connect over websocket.

    [yucuix@jgjapp-dev ~]$ sudo lsof -a -itcp -p 14723
    COMMAND   PID USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
    bin     14723 root    3u  IPv4 448738745      0t0  TCP localhost:ibm-mqisdp (LISTEN)
    bin     14723 root    7u  IPv6 448739940      0t0  TCP *:secure-mqtt (LISTEN)
    bin     14723 root    8u  IPv6 448739942      0t0  TCP *:us-cli (LISTEN)
    bin     14723 root   10u  IPv4 448740762      0t0  TCP localhost:8084 (LISTEN)
    bin     14723 root   11u  IPv4 448740763      0t0  TCP localhost:us-srv (LISTEN)
    bin     14723 root   12u  IPv4 448740342      0t0  TCP localhost:56224->localhost:14003 (ESTABLISHED)
    bin     14723 root   16u  IPv6 448782688      0t0  TCP localhost:secure-mqtt->localhost:50428 (ESTABLISHED)
    bin     14723 root   20u  IPv6 448783484      0t0  TCP localhost:secure-mqtt->localhost:50324 (ESTABLISHED)
    bin     14723 root   21u  IPv6 448781930      0t0  TCP localhost:secure-mqtt->localhost:50360 (ESTABLISHED)
    bin     14723 root   30u  IPv6 448780932      0t0  TCP localhost:secure-mqtt->localhost:50126 (ESTABLISHED)
    bin     14723 root   31u  IPv6 448778284      0t0  TCP localhost:secure-mqtt->localhost:49816 (CLOSE_WAIT)
    bin     14723 root   32u  IPv6 448779220      0t0  TCP localhost:secure-mqtt->localhost:50232 (ESTABLISHED)
    bin     14723 root   33u  IPv6 448782040      0t0  TCP localhost:secure-mqtt->localhost:50434 (ESTABLISHED)
    

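    The port connections do not reveal much. There are only a few connections in the development environment, but the sockets of disconnected connections are never released.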

    [root@jgjapp-dev fd]# ls /proc/31816/fd -l
    总用量 0
    lr-x------ 1 root root 64 8月   2 14:24 0 -> /dev/null
    lrwx------ 1 root root 64 8月   2 14:24 1 -> socket:[448738742]
    lrwx------ 1 root root 64 8月   2 14:24 10 -> socket:[448740762]
    lrwx------ 1 root root 64 8月   2 14:24 11 -> socket:[448740763]
    lrwx------ 1 root root 64 8月   2 14:24 12 -> socket:[448740342]
    lrwx------ 1 root root 64 8月   2 14:24 13 -> socket:[448768168]
    lrwx------ 1 root root 64 8月   2 14:24 14 -> socket:[448742405]
    lrwx------ 1 root root 64 8月   2 14:24 15 -> socket:[448750807]
    lrwx------ 1 root root 64 8月   2 14:24 16 -> socket:[448800654]
    lrwx------ 1 root root 64 8月   2 14:24 17 -> socket:[448743237]
    lrwx------ 1 root root 64 8月   2 14:24 18 -> socket:[448743240]
    lrwx------ 1 root root 64 8月   2 14:24 19 -> socket:[448743301]
    lrwx------ 1 root root 64 8月   2 14:24 2 -> socket:[448738742]
    lrwx------ 1 root root 64 8月   2 14:24 20 -> socket:[448817234]
    lrwx------ 1 root root 64 8月   2 14:24 21 -> socket:[448816826]
    lrwx------ 1 root root 64 8月   2 14:24 22 -> socket:[448754806]
    lrwx------ 1 root root 64 8月   2 14:24 23 -> socket:[448753159]
    lrwx------ 1 root root 64 8月   2 14:24 24 -> socket:[448756526]
    lrwx------ 1 root root 64 8月   2 14:24 25 -> socket:[448761346]
    lrwx------ 1 root root 64 8月   2 14:24 26 -> socket:[448761448]
    lrwx------ 1 root root 64 8月   2 14:24 27 -> socket:[448768765]
    lrwx------ 1 root root 64 8月   2 14:24 28 -> socket:[448769909]
    lrwx------ 1 root root 64 8月   2 14:24 29 -> socket:[448775662]
    .........
    lrwx------ 1 root root 64 8月   3 10:22 99 -> socket:[449476728]
    lrwx------ 1 root root 64 8月   3 10:22 990 -> socket:[451674604]
    lrwx------ 1 root root 64 8月   3 10:22 991 -> socket:[451679321]
    lrwx------ 1 root root 64 8月   3 10:22 992 -> socket:[451676686]
    lrwx------ 1 root root 64 8月   3 10:22 993 -> socket:[451682537]
    lrwx------ 1 root root 64 8月   3 10:22 994 -> socket:[451680602]
    lrwx------ 1 root root 64 8月   3 10:22 995 -> socket:[451682682]
    lrwx------ 1 root root 64 8月   3 10:22 996 -> socket:[451684764]
    lrwx------ 1 root root 64 8月   3 10:22 997 -> socket:[451695183]
    lrwx------ 1 root root 64 8月   3 10:22 998 -> socket:[451686259]
    lrwx------ 1 root root 64 8月   3 10:22 999 -> socket:[451686262]
    
    