The official Go package for NSQ

go-nsq

The official Go package for NSQ.

Docs

See godoc and the main repo apps directory for examples of clients built using this package.

Tests

Tests are run via ./test.sh (which requires nsqd and nsqlookupd to be installed).

Owner
NSQ
A realtime distributed messaging platform
Comments
  • make lookup functionality pluggable

    We'd like to be able to override how a consumer queries from NSQLookupds. I think this approach would also allow others to switch NSQLookupd for etcd, zookeeper, etc.

    This is still a WIP but I hope you can see where I'm going with this. Suggestions and Criticism welcome.

    cc @oliver-bitly @jehiah @mreiferson
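
The idea of swapping nsqlookupd for another discovery backend can be sketched with a small interface. This is only an illustration under assumptions: `Discoverer`, `staticDiscoverer`, and `resolve` are hypothetical names and are not part of go-nsq or of the proposed change.

```go
package main

import (
	"errors"
	"fmt"
)

// Discoverer is a hypothetical interface (not part of go-nsq): anything that
// can turn a topic name into a list of nsqd TCP addresses.
type Discoverer interface {
	Lookup(topic string) ([]string, error)
}

// staticDiscoverer is a stand-in backend that serves addresses from a map.
// An etcd- or ZooKeeper-backed type could satisfy the same interface.
type staticDiscoverer map[string][]string

func (s staticDiscoverer) Lookup(topic string) ([]string, error) {
	addrs, ok := s[topic]
	if !ok {
		return nil, errors.New("no producers for topic " + topic)
	}
	return addrs, nil
}

// resolve shows a caller that depends only on the interface.
func resolve(d Discoverer, topic string) ([]string, error) {
	return d.Lookup(topic)
}

func main() {
	d := staticDiscoverer{"events": {"10.0.0.2:4150", "10.0.0.3:4150"}}
	addrs, err := resolve(d, "events")
	fmt.Println(addrs, err)
}
```

A consumer written against such an interface would not need to know whether the addresses came from nsqlookupd or from another store.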

  • panic: runtime error: close of closed channel

    I'm guessing this is user error, but I'm getting a panic when stopping a consumer as of this commit: 4e74fa1f8933064a4f04786e65d3ee7b611be598. Reading the comment in there, I'm trying to understand what's going on.

    My shutdown code looks something like this:

    func (n *NSQPeer) Teardown() {
        n.producer.Stop()
        if n.consumer != nil {
            n.consumer.DisconnectFromNSQD(n.host)
            n.consumer.Stop()
            <-n.consumer.StopChan
        }
    }
    

    Here's the error:

    runtime.panic(0x509600, 0x84c1d5)
            /usr/local/Cellar/go/1.3/libexec/src/pkg/runtime/panic.c:279 +0xf5
    github.com/bitly/go-nsq.(*Consumer).exit(0xc20800ef00)
            /Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:1082 +0x2e
    github.com/bitly/go-nsq.func·007()
            /Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:990 +0x29
    created by time.goFunc
            /usr/local/Cellar/go/1.3/libexec/src/pkg/time/sleep.go:121 +0x47
    

    Can anyone provide some insight on why I'm seeing this now?
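
For context, Go panics with "close of closed channel" whenever `close` runs twice on the same channel. A common guard is `sync.Once`. The sketch below uses a hypothetical `stopper` type and does not reproduce go-nsq's Consumer; it only shows the general pattern.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical type used for illustration; it is not go-nsq's
// Consumer.
type stopper struct {
	once     sync.Once
	StopChan chan struct{}
}

func newStopper() *stopper {
	return &stopper{StopChan: make(chan struct{})}
}

// Stop closes StopChan at most once, however many times it is called.
func (s *stopper) Stop() {
	s.once.Do(func() { close(s.StopChan) })
}

func main() {
	s := newStopper()
	s.Stop()
	s.Stop() // a bare close(s.StopChan) here would panic
	<-s.StopChan
	fmt.Println("stopped")
}
```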

  • consumer: sometimes got heartbeat but not message

    1. env

    nsqd --version nsqd v1.0.0-compat (built w/go1.8)

    go-nsq version commit b9762cdcb6d5cc5ac5287ca076354143d332cc97 Tue Feb 14 16:13:23 2017 -0800

    # go version go version go1.8 linux/amd64

    2. What I observe

    • The program runs, but the channel is not created (checked at http://127.0.0.1:4151/stats).
    • This happens only sometimes, and always at program start. When it happens, the program never receives any messages (I waited 2 days).
    • Once the program has started successfully, the problem does not seem to reproduce.
    • I listen on several topics. When the problem appears on one topic, the others may be unaffected.
    • With go-nsq debug logging on, the program prints [d.c2.cp#ephemeral/detect#ephemeral] (127.0.0.3:4150) heartbeat received. but does not seem to receive any other NSQ messages.
    # curl -XGET http://127.0.0.1:4151/stats; 
    [root@18-190 ptd]# curl -XGET http://127.0.0.1:4151/stats; 
    nsqd v1.0.0-compat (built w/go1.8)
    start_time 2017-03-27T19:07:02+08:00
    uptime 45m7.260003591s
    
    Health: OK
    
       [.raw.raw#ephemeral] depth: 0     be-depth: 0     msgs: 6656     e2e%: 
          [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 6656     e2e%: 
            [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 6656     re-q: 0        msgs: 6656     connected: 8s
    
       [d.c2.cp#ephemeral] depth: 10000 be-depth: 0     msgs: 791227   e2e%: 
    
       [d.c2.cp.done#ephemeral] depth: 0     be-depth: 0     msgs: 0        e2e%: 
          [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 0        e2e%: 
            [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 0        re-q: 0        msgs: 0        connected: 8s
    
       [d.c2.domain#ephemeral] depth: 10000 be-depth: 0     msgs: 208218   e2e%: 
    
    ...
    

    Code snippet:

    func nsqSubscribe(addr string, topic string, channel string, hdlr nsq.HandlerFunc) error {
        consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
        if err != nil {
            print("new consumer error: ", err, "\n")
            time.Sleep(1 * time.Second) //wait 1s
            panic(err)
        }   
        consumer.AddHandler(hdlr)
        err = consumer.ConnectToNSQD(addr)
        if err != nil {
            print("connect nsqd error: ", err, "\n")
            time.Sleep(1 * time.Second) //wait 1s
            panic(err)
        }   
        _ = <-consumer.StopChan
        panic("nsq conn dead topic=" + topic + " channel=" + channel)
        return nil 
    }
    
    func main(){
    producer, err := nsq.NewProducer(nsqConf.Local.Addr, nsq.NewConfig())
        if err != nil {
            panic(err)
        }
        go func() {
            detector := new(c2.C2Sdk)
    
            if detectorEnabled {
                err = detector.Init()
                if err != nil {
                    fmt.Println("Failed to init c2")
                    panic(err)
                }
            }
            nsqSubscribe(nsqConf.Local.Addr, "d.c2.cp#ephemeral", "detect#ephemeral",
                nsq.HandlerFunc(func(message *nsq.Message) error {
                    return handler_scan(message, detector,
                        producer, unmarshal_url, scan_url,
                        "d.c2.cp.done#ephemeral")
                }))
        }()
    go func() {
            detector := new(c2.C2Sdk)
    
            if detectorEnabled {
                err = detector.Init()
                if err != nil {
                    fmt.Println("Failed to init c2")
                    panic(err)
                }
            } 
            nsqSubscribe(nsqConf.Local.Addr, "d.c2.url#ephemeral", "detect#ephemeral",
                nsq.HandlerFunc(func(message *nsq.Message) error {
                    return handler_scan(message, detector,
                        producer, unmarshal_url, scan_url,
                        "d.c2.url.done#ephemeral")
                }))
        }()
    
    .....
    
    }
    
    

    3. How to reproduce

    • Connect the program to a busy nsq server.
    • Start and restart it several times (5-10 times). If the problem does not reproduce, retry.
  • consumer: unexpected re-connection attempts

    I'm currently investigating an issue where even after calling either DisconnectFromNSQLookupd or DisconnectFromNSQD on a host that is already disconnected, re-connection attempts still occur.

    In our scenario we are consuming from 3 hosts: 10.0.0.1/10.0.0.2/10.0.0.3. When host 10.0.0.1 becomes unhealthy (service stopped), we remove it from the list of nodes and call DisconnectFromNSQLookupd or DisconnectFromNSQD.

    As you can see from the following log output, the connection to host 10.0.0.1 was lost and re-connection attempts are made. The host is then removed from our list and DisconnectFromNSQLookupd is called. However, the re-connection attempts still occur:

    2016-04-29 14:35:26 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:35:26 [DBG] IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false} (conn.go 328)
    2016-04-29 14:35:29 [ERR] IO error - EOF (conn.go 471)
    2016-04-29 14:35:29 [INF] beginning close (conn.go 611)
    2016-04-29 14:35:29 [INF] readLoop exiting (conn.go 528)
    2016-04-29 14:35:29 [INF] breaking out of writeLoop (conn.go 535)
    2016-04-29 14:35:29 [INF] writeLoop exiting (conn.go 581)
    2016-04-29 14:35:29 [INF] finished draining, cleanup exiting (conn.go 660)
    2016-04-29 14:35:29 [INF] clean close complete (conn.go 668)
    2016-04-29 14:35:29 [DBG] there are 2 connections left alive (consumer.go 741)
    2016-04-29 14:35:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:35:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:35:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:35:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:35:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:35:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:35:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
    2016-04-29 14:36:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:26 [DBG] querying nsqlookupd http://10.0.0.2:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
    2016-04-29 14:37:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
    2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
    2016-04-29 14:37:32 [INF] Disconnecting from NSQ hosts: [10.0.0.1:4150] (subscriber.go 176)
    2016-04-29 14:37:32 [INF] Disconnecting from NSQ lookupds: [10.0.0.1:4161] (subscriber.go 187)
    2016-04-29 14:37:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:38:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:38:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:38:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:38:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
    2016-04-29 14:38:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:38:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:38:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    

    Is this intended/expected behaviour?
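
The behaviour the reporter expects can be sketched as a retry loop that consults a set of still-wanted addresses before redialing. This is a hypothetical illustration only: `wantedSet` and its methods are invented names, and nothing here describes how go-nsq actually schedules re-connections.

```go
package main

import (
	"fmt"
	"sync"
)

// wantedSet is a hypothetical registry of the addresses the caller still
// wants connections to. It is not part of go-nsq.
type wantedSet struct {
	mu    sync.Mutex
	addrs map[string]bool
}

func newWantedSet(addrs ...string) *wantedSet {
	w := &wantedSet{addrs: make(map[string]bool)}
	for _, a := range addrs {
		w.addrs[a] = true
	}
	return w
}

// Remove records that the caller disconnected from addr on purpose.
func (w *wantedSet) Remove(addr string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	delete(w.addrs, addr)
}

// ShouldReconnect is the check a retry timer would run before redialing.
func (w *wantedSet) ShouldReconnect(addr string) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.addrs[addr]
}

func main() {
	w := newWantedSet("10.0.0.1:4150", "10.0.0.2:4150", "10.0.0.3:4150")
	w.Remove("10.0.0.1:4150")
	fmt.Println(w.ShouldReconnect("10.0.0.1:4150")) // false
	fmt.Println(w.ShouldReconnect("10.0.0.2:4150")) // true
}
```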

  • Consumer stall after RequeueWithoutBackoff

    If a consumer is in backoff and attempts to recover with RDY 1, then receives a message but responds with RequeueWithoutBackoff, it will stall and not exit backoff. (failing test attached)

    cc: @georgicodes

  • producer: I/O timeout in PublishAsync benchmark

    I did some benchmarking of nsq and ran into a strange problem. Publishing 100000 msgs to nsq with go-nsq always fails, while a smaller count (like 10000 msgs) works. The go-nsq log shows write timeouts to nsqd, and the nsqd log says the producer's connection was reset. What is the problem in this situation?

    go-nsq log:

    2019/02/26 22:44:52 INF    1 (192.168.1.125:4150) connecting to nsqd
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) beginning close
    
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) readLoop exiting
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) breaking out of writeLoop
    
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) writeLoop exiting
    
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) finished draining, cleanup exiting
    
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) clean close complete
    
    2019/02/26 22:45:01 INF    1 exiting router
    
    

    nsqd log:

    [nsqd] 2019/02/26 22:39:55.590775 INFO: TCP: new client(192.168.1.99:4908)
    [nsqd] 2019/02/26 22:39:55.590882 INFO: CLIENT(192.168.1.99:4908): desired protocol magic '  V2'
    [nsqd] 2019/02/26 22:39:55.591299 INFO: [192.168.1.99:4908] IDENTIFY: {ClientID:HuangWei-PC Hostname:HuangWei-PC HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/1.0.7 MsgTimeout:0}
    [nsqd] 2019/02/26 22:40:04.104850 INFO: PROTOCOL(V2): [192.168.1.99:4908] exiting ioloop
    [nsqd] 2019/02/26 22:40:04.104955 ERROR: client(192.168.1.99:4908) - failed to send response - write tcp 192.168.1.125:4150->192.168.1.99:4908: write: connection reset by peer
    [nsqd] 2019/02/26 22:40:04.104980 INFO: PROTOCOL(V2): [192.168.1.99:4908] exiting messagePump
    

    go benchmark code:

    func BenchmarkProducer(b *testing.B) {
    	b.StopTimer()
    
    	config := nsq.NewConfig()
    	p, _ := nsq.NewProducer("192.168.1.125:4150", config)
    
    	body := make([]byte, 512)
    	startCh := make(chan struct{})
    	var wg sync.WaitGroup
    	parallel := runtime.GOMAXPROCS(0)
    	b.Log(parallel)
    	b.N = 100000
    	doneCh := make(chan *nsq.ProducerTransaction, parallel)
    	wg.Add(b.N)
    
    	for j := 0; j < parallel; j++ {
    		go func() {
    			<-startCh
    			for i := 0; i < b.N/parallel; i++ {
    				p.PublishAsync("test", body, doneCh)
    			}
    		}()
    
    		go func() {
    			for _ = range doneCh {
    				wg.Done()
    			}
    		}()
    	}
    
    	b.StartTimer()
    	close(startCh)
    	wg.Wait()
    }
    

    nsqd config:

    nsqd --lookupd-tcp-address=0.0.0.0:4160 --snappy=true --max-msg-size=67108864 --max-msg-timeout=1h --mem-queue-size=100000 
    
  • centralize logging and make it pluggable

    This request does two things. It centralizes logging in log.go to remove some code duplication. It also makes the logger an interface so that people can easily drop in their own logging structures.

    The structure simply needs to implement the following methods:

        Debugf(format string, v ...interface{})
        Infof(format string, v ...interface{})
        Warnf(format string, v ...interface{})
        Errorf(format string, v ...interface{})
    
  • consumer: expose a way to retrieve runtime stats

    Currently, the ConnectToNSQLookupd method on Consumer fails to return any error when invoked with an invalid nsqlookupd address. Instead the error is logged (and swallowed) in the queryLookupd func, right after the call to apiRequestNegotiateV1, since queryLookupd doesn't return anything. This leaves a consumer with no reliable way to tell that the lookupd address provided was invalid.

    To repro:

    func foo() {
        q, _ := nsq.NewConsumer("the-topic", "the-channel", nsq.NewConfig())
        handler := &SomeHandler{}
        q.AddHandler(handler)
        err := q.ConnectToNSQLookupd("1.2.3.4:9999")
        // err is nil!
    }
    
  • producer: unexpected io.EOF when calling Stop

    We're calling Stop on a NSQ producer, and we're getting the following output after a number of MultiPublishes. Is this io.EOF error expected? We're using the tag 1.0.6.

    INFO[0011] INF    1 stopping
    INFO[0011] ERR    1 (127.0.0.1:4150) IO error - EOF
    INFO[0011] INF    1 (127.0.0.1:4150) beginning close
    INFO[0011] INF    1 (127.0.0.1:4150) readLoop exiting
    
  • Performance Question

    Hey there, I'm using this code to make a producer that sends messages to NSQ. I'm using PublishAsync(), with a "responder" that basically does nothing:

    var respChan chan *nsq.ProducerTransaction
    
    func responder(respChan chan *nsq.ProducerTransaction) {
        for _ = range respChan {
            //Get rid of the messages or something... should probably respond, but not sure
            //how to do that yet...
        }
    }
    

    I'm sending as many messages as possible, calling PublishAsync() in a loop reading from memory. My code maxes out the CPU, and manages about 14,000 messages per second (the messages are quite small, a msgpack'd struct containing a < 16 char string).

    In doing a code profile, my code apparently spends 50.5% of its time in "ExternalCode" (no idea what that is).

    I've attached my code profile: cpu copy

    Any idea what is going on here?

  • unintuitive behavior when using &nsq.Config{}

    I got this on Ubuntu 12.04, go 1.2

    panic: non-positive interval for NewTicker
    
    goroutine 141 [running]:
    runtime.panic(0x815520, 0xc21010d230)
        /usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
    time.NewTicker(0x0, 0x3fd8a050fdd8d7f5)
        /usr/local/go/src/pkg/time/tick.go:22 +0x9f
    github.com/bitly/go-nsq.(*Consumer).lookupdLoop(0xc21005a8c0)
        /tmp/godep/rev/2b/9a8b5edc036235580256e9a5e5f118a0025151/src/github.com/bitly/go-nsq/consumer.go:228 +0x6e
    created by github.com/bitly/go-nsq.(*Consumer).ConnectToNSQLookupd
        /tmp/godep/rev/2b/9a8b5edc036235580256e9a5e5f118a0025151/src/github.com/bitly/go-nsq/consumer.go:216 +0x282
    

    This is https://github.com/bitly/go-nsq/commit/2b9a8b5edc036235580256e9a5e5f118a0025151

    I'm connecting to 8 nsqlookupd hosts (which might increase the chance of this happening)

    I'm not setting config.lookupdPollInterval in my code.

  • fix(consumer): remove old nsqd connections if addresses change

    Right now, in the event that nsqd addresses change, the old connections stay around in the consumer's connections map. This simply cleans up old connections from that map.

  • error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused?

    The nsq service is deployed on the public network (106.75.49.135:4161, 106.75.49.135:4150). Messages can be sent and received normally, but this error still appears: 'error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused'.

    • docker-compose :
      nsqlookupd:
        image: nsqio/nsq
        command: /nsqlookupd
        ports:
          - "4160:4160"
          - "4161:4161"
      nsqd:
        image: nsqio/nsq
        command:  /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160
        depends_on:
          - nsqlookupd
        ports:
          - "4151:4151"
          - "4150:4150"
    
    • Error message:
    2/10/12 11:21:15 INF    1 [Study/Chegg] (nsqd:4150) connecting to nsqd
    2022/10/12 11:21:15 ERR    1 [Study/Chegg] (nsqd:4150) error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused
    {"Address":["106.75.49.141:4150"],"level":"info","msg":"Receiving NSQ messages from addresses","time":"2022-10-12T11:21:15+08:00"}
    

  • Client doesn't know it's disconnected

    I have a system where I'm getting log messages from nsqd such as:

    INFO: PROTOCOL(V2): [127.0.0.1:53376] exiting ioloop
    ERROR: client(127.0.0.1:53376) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53376: i/o timeout
    INFO: PROTOCOL(v) [127.0.0.1:53376] exiting messagePump

    From looking at the /stats endpoint I can see after this message that the consumer is no longer connected (two producers are still connected fine). I can see that there were 2 messages in timeout which I presume is related to the disconnection of the consumer.

    However, there is no indication that the consumer knows that it has lost its connection, so there's no attempt to reconnect (or even to notify me that it has stopped). StopChan is never signalled, and Stats() on the consumer still reports a connection count of 1.

    Is this the correct behaviour? Have I missed something around how to detect this i/o timeout situation? Unfortunately I can't share the source code, but happy to try things and report back.

    Thanks

    Andre

  • Disconnects seem to be quite ungraceful

    Been playing around with NSQ a lot lately and I keep hitting walls when trying to write test suites for assembling various network topologies. Most of the issues seem to stem from NSQD not properly handling consumer disconnects (I'm using go-nsq). I don't even know where to start describing the strange things:

    • When stopping a consumer, sometimes the CLS message is sent to NSQD, sometimes it is not.
    • Even if the CLS does get to NSQD, sometimes it seems to not respond with CLOSE_WAIT, rather nukes the stream.
    • The logs are full of error messages on both consumer and broker side during shutdowns that one side or another tries to read/write but the stream is already dead (no graceful disconnect).
    • Disconnecting the last consumer doesn't seem to decrement the client count of a topic/channel.
    • Disconnecting a consumer doesn't seem to abort/reschedule the in-flight messages for that consumer.

    Seems to me that the entire shutdown pathway is very, very wrong, and that various timeouts just hack around the root cause. E.g. the client heartbeats (or lack thereof after a disconnect) are what trigger the cleanup of leftover client counts; the in-flight timeout is what reschedules messages not processed by a disconnected client.

    I'm unsure if I'm doing something weird here, but it seems that NSQ is very very prone to weird behavior when I have very short lived connections.

  • Singleton Producer

    Is it possible to use a singleton NSQ Producer? I'm running an API with an NSQ Producer and it seems like I have to open and close the connection every time I hit an endpoint.
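
A process-wide instance is usually built with `sync.Once`. The sketch below shows that pattern with a hypothetical `publisher` type; it assumes the real producer is safe to share across goroutines, which is worth confirming against the go-nsq documentation for the version in use.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for a long-lived producer handle.
type publisher struct{ addr string }

var (
	once   sync.Once
	shared *publisher
)

// sharedPublisher builds the publisher on first use and returns the same
// instance on every later call, including from concurrent request handlers.
func sharedPublisher() *publisher {
	once.Do(func() {
		shared = &publisher{addr: "127.0.0.1:4150"} // address is an example
	})
	return shared
}

func main() {
	a := sharedPublisher()
	b := sharedPublisher()
	fmt.Println(a == b) // true
}
```

Each endpoint handler would call `sharedPublisher()` instead of opening and closing its own connection.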

A tiny wrapper around NSQ topic and channel :rocket:

Event Bus NSQ A tiny wrapper around go-nsq topic and channel. Protect nsq calls with gobreaker. Installation go get -u github.com/rafaeljesus/nsq-even

Sep 27, 2022
Pause / Unpause NSQ Topics and Channels

Action pause unpause empty info check Worker Pool 1 <= n <= len(target) 0 for unlimited pool depend on how many the targets Target Array of topics or

Jun 29, 2022
Package notify provides an implementation of the Gnome DBus Notifications Specification.

go-notify Package notify provides an implementation of the Gnome DBus Notifications Specification. Examples Display a simple notification. ntf := noti

Dec 27, 2022
A simple pubsub package for go.

Package pubsub implements a simple multi-topic pub-sub library. Install pubsub with, go get github.com/cskr/pubsub This repository is a go module and

Dec 31, 2022
Simple synchronous event pub-sub package for Golang

event-go Simple synchronous event pub-sub package for Golang This is a Go language package for publishing/subscribing domain events. This is useful to

Jun 16, 2022
Scalable package delivery logistics simulator built using SingleStore and Vectorized Redpanda

Reference Architecture using SingleStore and Redpanda for global logistics ?? INFO: For the story behind this code (and epic dashboards), check out th

Oct 29, 2022
An opinionated package that helps you print user-friendly output messages from your Go command line applications.

github.com/eth-p/clout (Command Line Output) clout is a package that helps you print user-friendly output messages from your Go command line applicati

Jan 15, 2022
Redis as backend for Queue Package

redis Redis as backend for Queue package Setup start the redis server redis-server start the redis cluster, see the config # server 01 mkdir server01

Oct 16, 2022
Package htmltopdf implements wkhtmltopdf Go bindings.

htmltopdf Package htmltopdf implements wkhtmltopdf Go bindings. It can be used to convert HTML documents to PDF files. The package does not use the wk

Sep 19, 2022
NSQ as backend for Queue Package

NSQ as backend for Queue Package

Jul 4, 2022
Nsq http auth service for golang

nsq-auth nsq http auth service ./nsq-auth -h Usage: 2021/12/25 17:10:56 Usage:

Nov 21, 2022
Package set is a small wrapper around the official reflect package that facilitates loose type conversion and assignment into native Go types.

Package set is a small wrapper around the official reflect package that facilitates loose type conversion and assignment into native Go types. Read th

Dec 27, 2022
🤖 Automatically scrape PortableApps.com (or official release page) and convert into Edgeless plugin package

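Edgeless Auto Plugin Bot 2. Introduction: this project reimplements the Edgeless auto plugin bot in Golang. Features (WIP): fully compatible with the Edgeless auto plugin bot, including Tasks, for seamless migration; faster builds; better code structure; greater extensibility. Progress: as of 2021/11/28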

Sep 12, 2022
Mongo Go Models (mgm) is a fast and simple MongoDB ODM for Go (based on official Mongo Go Driver)

Mongo Go Models Important Note: We changed package name from github.com/Kamva/mgm/v3(uppercase Kamva) to github.com/kamva/mgm/v3(lowercase kamva) in v

Jan 2, 2023
Qmgo - The Go driver for MongoDB. It‘s based on official mongo-go-driver but easier to use like Mgo.

Qmgo English | 简体中文 Qmgo is a Go driver for MongoDB . It is based on MongoDB official driver, but easier to use like mgo (such as the chain call). Qmg

Dec 28, 2022
The Official Twilio SendGrid Led, Community Driven Golang API Library

NEW: Subscribe to email notifications for releases and breaking changes. The default branch name for this repository has been changed to main as of 07

Dec 15, 2022
GoThanks automatically stars Go's official repository and your go.mod github dependencies, providing a simple way to say thanks to the maintainers of the modules you use and the contributors of Go itself.

Give thanks (in the form of a GitHub ★) to your fellow Go modules maintainers. About GoThanks performs the following operations Sends a star to Go's r

Dec 24, 2022
Casbin-forum is the official forum for Casbin developers and users.

Casbin-forum is the official forum for Casbin developers and users. Link https://forum.casbin.com/ Architecture Casbin-forum contains 2 p

Jan 3, 2023