goim

goim v2.0


goim is an IM (instant messaging) server written in Go.

Features

  • Lightweight
  • High performance
  • Pure Go implementation
  • Supports single push, multi-user push, and broadcasting
  • Supports one key to multiple subscribers (configurable maximum subscriber count)
  • Supports heartbeats (application heartbeats, TCP keepalive, HTTP long polling)
  • Supports authentication (unauthenticated users can't subscribe)
  • Supports multiple protocols (WebSocket, TCP, HTTP)
  • Scalable architecture (unlimited dynamic job and logic modules)
  • Asynchronous push notification based on Kafka

Architecture

[architecture diagram]

Quick Start

Build

    make build

Run

    make run
    make stop

    # or run the binaries directly
    nohup target/logic -conf=target/logic.toml -region=sh -zone=sh001 -deploy.env=dev -weight=10 >target/logic.log 2>&1 &
    nohup target/comet -conf=target/comet.toml -region=sh -zone=sh001 -deploy.env=dev -weight=10 -addrs=127.0.0.1 >target/comet.log 2>&1 &
    nohup target/job -conf=target/job.toml -region=sh -zone=sh001 -deploy.env=dev >target/job.log 2>&1 &

Environment

    env:
    export REGION=sh
    export ZONE=sh001
    export DEPLOY_ENV=dev

    supervisor:
    environment=REGION=sh,ZONE=sh001,DEPLOY_ENV=dev

    go flag:
    -region=sh -zone=sh001 -deploy.env=dev
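
All three binaries read the same settings, so a command-line flag can simply default to the corresponding environment variable above. A minimal sketch of that pattern (the envOr helper and the defaults are illustrative, not goim's actual code):

    package main

    import (
    	"flag"
    	"fmt"
    	"os"
    )

    // envOr returns the value of the environment variable key, or def if it is unset.
    func envOr(key, def string) string {
    	if v := os.Getenv(key); v != "" {
    		return v
    	}
    	return def
    }

    var (
    	region    = flag.String("region", envOr("REGION", ""), "deployment region, e.g. sh")
    	zone      = flag.String("zone", envOr("ZONE", ""), "availability zone, e.g. sh001")
    	deployEnv = flag.String("deploy.env", envOr("DEPLOY_ENV", "dev"), "deploy environment")
    )

    func main() {
    	flag.Parse()
    	fmt.Printf("region=%s zone=%s deploy.env=%s\n", *region, *zone, *deployEnv)
    }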

Configuration

See the comments in target/comet.toml, target/logic.toml, and target/job.toml for the meaning of each configuration option.

Dependencies

Discovery

Kafka

Document

Protocol

English

Chinese
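
For a quick orientation before reading the full document: goim frames every message with a fixed-size big-endian header followed by the body. A minimal encoding sketch, assuming the field layout described in the linked protocol document (package length 4 bytes, header length 2, version 2, operation 4, sequence 4):

    package main

    import (
    	"bytes"
    	"encoding/binary"
    	"fmt"
    )

    const rawHeaderLen = 16 // 4 + 2 + 2 + 4 + 4 bytes

    // Proto mirrors the frame fields from the protocol document.
    type Proto struct {
    	Ver  uint16 // protocol version
    	Op   uint32 // operation code (auth, heartbeat, message, ...)
    	Seq  uint32 // client-assigned sequence id
    	Body []byte // optional payload, e.g. a JSON message
    }

    // Encode serializes one frame as: package length | header length | version |
    // operation | sequence | body, with all integers big-endian.
    func Encode(p *Proto) []byte {
    	buf := new(bytes.Buffer)
    	binary.Write(buf, binary.BigEndian, uint32(rawHeaderLen+len(p.Body)))
    	binary.Write(buf, binary.BigEndian, uint16(rawHeaderLen))
    	binary.Write(buf, binary.BigEndian, p.Ver)
    	binary.Write(buf, binary.BigEndian, p.Op)
    	binary.Write(buf, binary.BigEndian, p.Seq)
    	buf.Write(p.Body)
    	return buf.Bytes()
    }

    func main() {
    	frame := Encode(&Proto{Ver: 1, Op: 2, Seq: 1, Body: []byte(`{"test":1}`)})
    	fmt.Printf("encoded %d bytes\n", len(frame)) // 16-byte header + 10-byte body
    }

A client that does not frame its packets this way is typically rejected by comet with a "pack length error" (see the issues below).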

Examples

WebSocket: WebSocket Client Demo

Android: Android

iOS: iOS

Benchmark

[benchmark chart]

Benchmark Server

  • CPU: Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz
  • Memory: DDR3 32GB
  • OS: Debian GNU/Linux 8
  • Instance: 1

Benchmark Case

  • Online: 1,000,000
  • Duration: 15min
  • Push Speed: 40/s (broadcast room)
  • Push Message: {"test":1}
  • Received calc mode: sampled once per second, 30 samples in total

Benchmark Resource

  • CPU: 2000%~2300%
  • Memory: 14GB
  • GC Pause: 504ms
  • Network: Incoming(450MBit/s), Outgoing(4.39GBit/s)

Benchmark Result

  • Received: 35,900,000/s

Chinese

English

LICENSE

goim is distributed under the terms of the MIT License.

Comments
  • goim v2.0 Roadmap

    goim v2.0 Roadmap

    v2.0 Roadmap:

    • [x] Online state: the standalone router service has been replaced with Redis
    • [x] Nodes keep online heartbeats with Redis
    • [x] gRPC support and service discovery via Discovery
    • [x] Scheduling by node connection count and weight
    • [x] Scheduling nodes by region
    • [x] Operation (command) subscription
    • [x] Room switching on the current connection
    • [x] Multiple room types ({type}://{room_id})
    • [x] Room message aggregation
    • [x] Sending messages by device_id
    • [x] IPv6 support

    v2.1 Roadmap:

    • [ ] permessage-deflate compression for WebSocket
    • [ ] Message queue interface abstraction

    Structure:

    ├── cmd
    │   ├── comet
    │   ├── job
    │   ├── logic
    ├── api
    │   ├── comet
    │   ├── logic
    ├── internal
    │   ├── comet
    │   ├── logic
    │   ├── job
    ├── pkg
    │   ├── grpc
    │   ├── bufio
    │   ├── websocket
    │   ├── xxx
    ├── benchmarks
    ├── examples
    ├── scripts
    ├── docs
    

    branch: v2.0

  • room broadcast msg: json marshal error during websocket send in comet

    room broadcast msg: json marshal error during websocket send in comet

    Error printed in the log: dispatch websocket error(json: error calling MarshalJSON for type json.RawMessage: invalid character '\x00' looking for beginning of value)

    The case is the one from the docs: curl -d "{"test": 1}" http://127.0.0.1:7172/1/push/room?rid=1. The roomId is the one generated when I authed in client.js with an arbitrary userId in the body, so that logic takes the branch that returns roomId=1.

    I traced it with log statements: in the job module everything is still correct before proto.Body is written into the buffer, but after the bytes.Writer and the RPC call to comet the data is wrong (if I'm reading it right). It looks like the offset handling where proto.WriteTo builds the protocol packet truncates one character of the JSON; the proto Operation also changes from 5 to 11, 12 and the like, which very much looks like an offset problem. Is that what's happening?
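
    (For what it's worth, the logged error is what json.Marshal produces when a json.RawMessage starts on a stray byte, which fits an off-by-one offset when slicing a shared write buffer. A standalone illustration of that failure mode, not goim's code, with a made-up buffer layout:)

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    func main() {
    	// Pretend the body {"test":1} was written at offset 4 of a shared buffer.
    	buf := make([]byte, 32)
    	body := []byte(`{"test":1}`)
    	copy(buf[4:], body)

    	// Correct slice: exactly the bytes that were written.
    	good := json.RawMessage(buf[4 : 4+len(body)])
    	// Off-by-one offset: the slice starts on a zero byte.
    	bad := json.RawMessage(buf[3 : 3+len(body)])

    	if _, err := json.Marshal(good); err != nil {
    		fmt.Println("good:", err) // not reached
    	}
    	if _, err := json.Marshal(bad); err != nil {
    		// Fails with an "invalid character '\x00' looking for beginning
    		// of value" error, the same shape as the log above.
    		fmt.Println("bad:", err)
    	}
    }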

  • Questions about the router service

    Questions about the router service

    1. The router service exists to store the userId-to-serverId mapping, so why not replace router with memcache or Redis? Then the logic service would not need to implement the consistent-hashing logic at all.
    2. Even with the current architecture, on a long connection the path is comet -> logic -> router rather than comet -> router directly; of course the direct path would introduce consistency problems.
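
    (For reference, the v2.0 roadmap above took exactly this route: the router service was replaced with Redis. A minimal sketch of what storing the userId-to-serverId mapping in Redis could look like, using github.com/go-redis/redis/v8; the key prefix and TTL here are illustrative, not goim's actual schema.)

    package session

    import (
    	"context"
    	"time"

    	"github.com/go-redis/redis/v8"
    )

    const ttl = 5 * time.Minute // refreshed by heartbeats

    // Store records which comet server a user is connected to.
    type Store struct{ rdb *redis.Client }

    func New(addr string) *Store {
    	return &Store{rdb: redis.NewClient(&redis.Options{Addr: addr})}
    }

    // Online is called by logic after a successful connect/auth.
    func (s *Store) Online(ctx context.Context, userID, serverID string) error {
    	return s.rdb.Set(ctx, "mid:"+userID, serverID, ttl).Err()
    }

    // ServerOf answers "which comet holds this user?" for a push request.
    func (s *Store) ServerOf(ctx context.Context, userID string) (string, error) {
    	return s.rdb.Get(ctx, "mid:"+userID).Result()
    }

    // Offline removes the mapping when the connection is closed.
    func (s *Store) Offline(ctx context.Context, userID string) error {
    	return s.rdb.Del(ctx, "mid:"+userID).Err()
    }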

  • How to handle a router crash

    How to handle a router crash

    As I read the code, if a router goes down now, pushes can no longer be completed, because the routing information the crashed router kept in memory is lost. I also looked at the solutions proposed in issue 94:

    • Do a full re-pull. But comet does not expose an interface over its physical connections, and the timer that maintains the keys is not exposed for iteration either. Is that where the data would be pulled from?
    • Store the data in memcache or Redis. But that change is too large: the current design cannot even guarantee single-machine atomicity, so the rework would be substantial.

    I would like to know how Bilibili handles this problem in production.


    What I am considering is whether moving the whole router layer into comet would solve it:

    • After a physical connection comes in and has been authenticated through logic, comet itself maintains the user-to-connection mapping.
    • The logic layer stays unchanged.

    That removes the problem of physical connections and the maintained sessions getting out of sync across machines: since the physical connection and the user-connection mapping live on the same machine, losing one loses the other as well, so they stay in sync naturally.

    One problem then remains: when logic receives a push request, how does it find a user's connection? There could be a dispatch module: every client first asks dispatch, which applies a (signed) rule and directs the user's new connection to a specific comet (so comet can still scale horizontally). Underneath, dispatch needs a routing rule that guarantees all of a user's connections land on the same comet.

    After that, logic only has to query that routing rule to find which comet server a user is on.


    However, with this design the business data (what router used to hold) becomes coupled to the physical connections, so a deployment may affect live connections. So I would like to know how Bilibili thinks about and solves this in practice.
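
    (The core of the proposal, keeping the user-connection mapping on the same machine as the physical connection, amounts to a per-comet session table. A minimal sketch, with Conn standing in for whatever connection/channel type comet actually uses:)

    package comet

    import "sync"

    // Conn is a placeholder for comet's per-connection channel type.
    type Conn interface{ Push(msg []byte) error }

    // Sessions maps a user key to its live connections on this comet instance.
    // Losing the machine loses both the sockets and the mapping together,
    // so the two can never drift apart.
    type Sessions struct {
    	mu    sync.RWMutex
    	conns map[string][]Conn
    }

    func NewSessions() *Sessions {
    	return &Sessions{conns: make(map[string][]Conn)}
    }

    // Put registers a freshly authenticated connection under its user key.
    func (s *Sessions) Put(key string, c Conn) {
    	s.mu.Lock()
    	s.conns[key] = append(s.conns[key], c)
    	s.mu.Unlock()
    }

    // Push delivers a message to every connection held for the user key.
    func (s *Sessions) Push(key string, msg []byte) {
    	s.mu.RLock()
    	conns := s.conns[key]
    	s.mu.RUnlock()
    	for _, c := range conns {
    		_ = c.Push(msg) // errors handled by the connection's own lifecycle
    	}
    }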

  • Could the run instructions be more detailed? Can you show me how to use it?

    Could the run instructions be more detailed? Can you show me how to use it?

    First I run kafka.sh, as follows:

    bin/zookeeper-server-start.sh config/zookeeper.properties &
    bin/kafka-server-start.sh config/server.properties &
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaPushsTopic
    

    Then I run goim.sh, as follows:

    cd $GOPATH/bin
    nohup $GOPATH/bin/router -c $GOPATH/bin/router.conf 2>&1 > /home/dev/golang2/logs/goim/panic-router.log &
    nohup $GOPATH/bin/logic -c $GOPATH/bin/logic.conf 2>&1 > /home/dev/golang2/logs/goim/panic-logic.log &
    nohup $GOPATH/bin/comet -c $GOPATH/bin/comet.conf 2>&1 > /home/dev/golang2/logs/goim/panic-comet.log &
    nohup $GOPATH/bin/job -c $GOPATH/bin/job.conf 2>&1 > /home/dev/golang2/logs/goim/panic-job.log &
    

    Next I go to src/goim/examples/javascript and run

    go run main.go
    

    Then I open a browser

    firefox http://localhost:1999
    

    and the page shows the big "client demo" text.

    What should I do next to test? Push messages like below? (See also the sketch after the log excerpt further down.)

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic KafkaPushsTopic
    消息内容1
    消息内容2
    

    Is the message format wrong? Nothing happens in the browser and the WebSocket reports no error. panic-comet.log has some output:

    [06/10/16 20:38:12] [DEBG] ring wp: 8, idx: 0
    [06/10/16 20:38:12] [DEBG] ring rp: 8, idx: 0
    [06/10/16 20:38:22] [DEBG] ring wp: 9, idx: 1
    [06/10/16 20:38:22] [DEBG] ring rp: 9, idx: 1
    [06/10/16 20:38:32] [DEBG] ring wp: 10, idx: 2
    [06/10/16 20:38:32] [DEBG] ring rp: 10, idx: 2
    [06/10/16 20:38:42] [DEBG] ring wp: 11, idx: 3
    [06/10/16 20:38:42] [DEBG] ring rp: 11, idx: 3
    [06/10/16 20:38:52] [DEBG] ring wp: 12, idx: 4
    [06/10/16 20:38:52] [DEBG] ring rp: 12, idx: 4
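
    (Pushing raw text straight into the Kafka topic is unlikely to reach the browser, since job expects the serialized messages that logic produces. The push is normally triggered through logic's HTTP API, as in the curl command quoted in another issue above. A sketch of that call in Go, assuming the v1 endpoint and port 7172 from that example; adjust to your deployment:)

    package main

    import (
    	"fmt"
    	"io"
    	"net/http"
    	"strings"
    )

    func main() {
    	// Push {"test":1} to room 1 via the logic HTTP API
    	// (assumed default: logic listening on 127.0.0.1:7172).
    	resp, err := http.Post(
    		"http://127.0.0.1:7172/1/push/room?rid=1",
    		"application/json",
    		strings.NewReader(`{"test":1}`),
    	)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	body, _ := io.ReadAll(resp.Body)
    	fmt.Println(resp.Status, string(body))
    }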
    
  • WebSocket connection to comet fails

    WebSocket connection to comet fails

    Sending a message over "ws://127.0.0.1:3102/sub" produces the following error in the comet log: E0115 20:40:51.857977 17225 server_websocket.go:226] key: remoteIP: 127.0.0.1:49911 step: 4 ws handshake failed error(default server codec pack length error)

  • comet errors

    comet errors

    On the develop branch. Concurrency test: one room, 10k client connections, ab pushing 500 messages per second. The machine running comet is only at 30% load and the machines for the other components are basically idle:

    [2016/12/12 11:49:16 CST] [EROR] [main.(*Server).dispatchTCP:278] key: 1984_567 dispatch tcp error(write tcp4 10.10.16.183:8080->10.10.105.211:42620: write: broken pipe)
    [2016/12/12 12:11:36 CST] [EROR] [main.(*Server).serveTCP:126] key: handshake failed error(default server codec pack length error)
    [2016/12/12 13:09:01 CST] [EROR] [main.(*Server).serveTCP:126] key: handshake failed error(default server codec pack length error)
    [2016/12/12 13:11:53 CST] [EROR] [main.(*Server).serveTCP:126] key: handshake failed error(read tcp4 10.10.16.183:8080->45.33.10.137:57800: read: connection reset by peer)

    These errors appear very frequently, and even more so when the client count grows to 20k. Can this error simply be ignored?

  • job fails at startup

    job fails at startup

    I deployed this environment on a machine but found that pushed messages are never received. Checking the logs, panic-job.log contains the following error:

    [Sarama] 2016/06/01 09:48:45 [kafka_topic_push_group/43efe05adb1c] KafkaPushsTopic :: FAILED to get list of partitions: zk: node does not exist [06/01/16 09:48:45] [EROR] consumer error(kafka: error while consuming KafkaPushsTopic/-1: zk: node does not exist)

    How should I handle this?

  • data race

    data race

    Hi Mao, I've recently been testing goim with the data race detector and found several warnings. I haven't had time to study what impact each warning has on the business yet, so I'm opening this issue first to raise awareness. Here is one warning from comet: each client connection has two goroutines handling messages, both of which use ch.CliProto, and the methods on that Ring struct take no lock, which leads to a data race: (screenshot attached)
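
    (A stripped-down illustration of the reported pattern, not goim's actual Ring: one goroutine writes the slots while another reads them with no synchronization, and "go run -race" flags it. Guarding the slot accesses with a lock, or confining each position to a single goroutine, silences the warning.)

    package main

    import "sync"

    // ring is a toy stand-in for the per-channel proto ring buffer.
    type ring struct {
    	slots [8]int
    	wp    int // write position, advanced by the dispatch goroutine
    	rp    int // read position, advanced by the serve goroutine
    }

    func main() {
    	r := &ring{}
    	var wg sync.WaitGroup
    	wg.Add(2)
    	go func() { // writer goroutine
    		defer wg.Done()
    		for i := 0; i < 1000; i++ {
    			r.slots[r.wp%len(r.slots)] = i
    			r.wp++
    		}
    	}()
    	go func() { // reader goroutine
    		defer wg.Done()
    		for i := 0; i < 1000; i++ {
    			_ = r.slots[r.rp%len(r.slots)]
    			r.rp++
    		}
    	}()
    	wg.Wait()
    }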

  • 33_3 server websocket failed error(websocket: close 1001 (going away))

    33_3 server websocket failed error(websocket: close 1001 (going away))

    Using the JS WebSocket client from examples to connect to the test environment, I keep getting this error:

    [08/13/16 15:06:13] [EROR] key: 33_3 server websocket failed error(websocket: close 1001 (going away))
    [08/13/16 15:06:13] [DEBG] key: 33_3 wakeup exit dispatch goroutine
    [08/13/16 15:06:13] [DEBG] key: 33_3 dispatch goroutine exit
    [08/13/16 15:06:13] [DEBG] key: 33_3 server websocket goroutine exit

    During testing, though, subsequent communication still works fine and messages can still be pushed. What could be causing this?

  • Hello author, I've run into a discovery problem, please help take a look

    Hello author, I've run into a discovery problem, please help take a look

    After startup, comet reports the following error: client.go:587] discovery: client.Get(http://192.168.0.67:7171/discovery/polls?appid=infra.discovery&appid=goim.logic&env=dev&hostname=ecs-f8a3&latest_timestamp=1550021251882167963&latest_timestamp=0) get error code(-404). Checking the local discovery instance: registry.go:183] Polls zone() env(dev) appid(goim.comet) error(-404). What could be the cause?

  • discovery is not compatible with the M1 chip on macOS, so I put the whole environment into a Docker container; a series of problems, please take a look

    discovery is not compatible with the M1 chip on macOS, so I put the whole environment into a Docker container; a series of problems, please take a look

    2022/10/25 04:08:25 start watch filepath: ../conf/discovery.toml
    INFO 10/25-04:08:25.763 /usr/workspace/go/src/discovery/discovery/syncup.go:159 discovery changed nodes:[127.0.0.1:7171] zones:map[]
    INFO 10/25-04:08:25.764 /usr/workspace/go/src/discovery/registry/registry.go:229 Polls from(test1) new connection(1)
    INFO 10/25-04:08:25.769 /usr/workspace/go/pkg/mod/github.com/go-kratos/[email protected]/pkg/net/http/blademaster/server.go:97 blademaster: start http listen addr: 127.0.0.1:7171
    INFO 10/25-04:08:25.769 /usr/workspace/go/src/discovery/http/http.go:28 [HTTP] Listening on: 127.0.0.1:7171

    discovery is already running inside the container.

    Then I start comet, logic, and job. Running comet:

    target/comet -conf=target/comet.toml -region=sh -zone=sh001 host=test1 deploy.env=dev weight=10 addrs=127.0.0.1

    Running logic:

    target/logic -conf=target/logic.toml -region=sh -zone=sh001 host=test1 deploy.env=dev weight=10 addrs=127.0.0.1

    Running job:

    target/job -conf=target/job.toml -region=sh -zone=sh001 deploy.env=dev weight=10 addrs=127.0.0.1

    All of the container's ports are mapped to the host as well.

    comet error:

    false E1025 02:24:44.556011 1838 client.go:551] discovery: client.Get(http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=13cedd4558a6&latest_timestamp=1666664670575606167) error(Get "http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=13cedd4558a6&latest_timestamp=1666664670575606167": context canceled)

    job error:

    [root@13cedd4558a6 goim-master]# target/job -conf=target/job.toml -region=sh -zone=sh001 deploy.env=dev weight=10 addrs=127.0.0.1
    E1025 02:31:58.837223 1995 client.go:551] discovery: client.Get(http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=13cedd4558a6&latest_timestamp=1666664670575606167) error(Get "http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=13cedd4558a6&latest_timestamp=1666664670575606167": context canceled)

    Curling from inside the container works and returns a result:

    curl 'http://127.0.0.1:7171/discovery/fetch?zone=sh001&env=dev&appid=goim.comet&status=1'
    {"code":0,"message":"0","ttl":1,"data":{"instances":{"sh001":[{"region":"sh","zone":"sh001","env":"dev","appid":"goim.comet","hostname":"13cedd4558a6","addrs":["grpc://127.0.0.1:3109"],"version":"","metadata":{"addrs":"","conn_count":"0","ip_count":"0","offline":"false","weight":"0"},"status":1,"reg_timestamp":1666666040250462677,"up_timestamp":1666666050262881042,"renew_timestamp":1666666040250462677,"dirty_timestamp":1666666050262881042,"latest_timestamp":1666666050262881042}]},"latest_timestamp":1666666040250462677}}

    The error seems to come from src/discovery/naming/client.go:583:

    if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil {
    	if ctx.Err() != context.Canceled {
    		log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err)
    	}
    	return
    }

    Is this a network/namespace issue between my Docker container and the host, or something else? Any help appreciated!

  • Question about message backlog

    Question about message backlog

    // Consume messages, watch signals
    func (j *Job) Consume() {
    	for {
    		select {
    		case err := <-j.consumer.Errors():
    			log.Errorf("consumer error(%v)", err)
    		case n := <-j.consumer.Notifications():
    			log.Infof("consumer rebalanced(%v)", n)
    		case msg, ok := <-j.consumer.Messages():
    			if !ok {
    				return
    			}
    			j.consumer.MarkOffset(msg, "")
    			// process push message
    			pushMsg := new(pb.PushMsg)
    			if err := proto.Unmarshal(msg.Value, pushMsg); err != nil {
    				log.Errorf("proto.Unmarshal(%v) error(%v)", msg, err)
    				continue
    			}
    			if err := j.push(context.Background(), pushMsg); err != nil {
    				log.Errorf("j.push(%v) error(%v)", pushMsg, err)
    			}
    			log.Infof("consume: %s/%d/%d\t%s\t%+v", msg.Topic, msg.Partition, msg.Offset, msg.Key, pushMsg)
    		}
    	}
    }
    

    Here the job consumes Kafka synchronously; could that make consumption too slow and lead to a message backlog?
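
    One way to decouple consumption speed from push latency is to hand the decoded messages to a bounded worker pool, so a slow push does not stall the consumer loop. A hedged sketch reusing the identifiers from the snippet above (Job, pb.PushMsg, j.consumer, j.push), not goim's actual code; note that marking offsets before the push completes trades delivery guarantees for throughput:

    // consumeWithWorkers is a sketch: workers goroutines drain a buffered
    // channel of decoded messages while the loop below keeps consuming Kafka.
    func (j *Job) consumeWithWorkers(workers int) {
    	msgs := make(chan *pb.PushMsg, 1024) // buffered hand-off to the workers
    	for i := 0; i < workers; i++ {
    		go func() {
    			for m := range msgs {
    				if err := j.push(context.Background(), m); err != nil {
    					log.Errorf("j.push(%v) error(%v)", m, err)
    				}
    			}
    		}()
    	}
    	for msg := range j.consumer.Messages() {
    		j.consumer.MarkOffset(msg, "")
    		pushMsg := new(pb.PushMsg)
    		if err := proto.Unmarshal(msg.Value, pushMsg); err != nil {
    			log.Errorf("proto.Unmarshal(%v) error(%v)", msg, err)
    			continue
    		}
    		msgs <- pushMsg
    	}
    	close(msgs)
    }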