gev

中文 | English

gev is a lightweight, fast, non-blocking TCP network library based on the Reactor pattern.

It supports custom protocols, so you can quickly and easily build high-performance servers.

Features

  • High-performance event loop based on epoll and kqueue
  • Multi-core and multi-thread support
  • Read and write buffers with dynamic expansion, implemented as a ring buffer
  • Asynchronous reads and writes
  • SO_REUSEPORT port reuse
  • Automatic cleanup of idle connections
  • WebSocket/Protobuf support
  • Scheduled and delayed tasks
  • Custom protocol support

Network model

gev uses only a few goroutines: one listens for new connections, and the others (worker goroutines) handle the read and write events of connected clients. The number of worker goroutines is configurable and defaults to the number of CPU cores on the host.
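
For example, a fixed worker count can be set with the NumLoops option when creating the server (a fragment reusing the handler from the echo demo below):

// Run four worker event loops; when NumLoops is not set, gev uses the
// number of CPU cores on the host.
s, err := gev.NewServer(handler, gev.NumLoops(4))
if err != nil {
	panic(err)
}
s.Start()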

Performance Test

📈 Test chart

Test environment: Ubuntu 18.04 | 4 virtual CPUs | 4.0 GiB RAM

Throughput Test

Limit GOMAXPROCS=1 (single thread), 1 worker goroutine:


Limit GOMAXPROCS=4, 4 worker goroutines:


Other Tests

Speed Test

A simple performance comparison with similar libraries; the benchmark method is the same as in the evio project.

  • gnet
  • eviop
  • evio
  • net (StdLib)

Limit GOMAXPROCS=1, 1 worker goroutine:


Limit GOMAXPROCS=1, 4 worker goroutines:


Limit GOMAXPROCS=4, 4 worker goroutines:


Install

go get -u github.com/Allenxuxu/gev

Getting started

echo demo

package main

import (
	"flag"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	//log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}

Handler is an interface that programs must implement.

type Handler interface {
	OnConnect(c *connection.Connection)
	OnMessage(c *connection.Connection, ctx interface{}, data []byte) []byte
	OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error)

OnMessage is called back when a complete data frame arrives. Users can take the data, handle their business logic, and return the data that needs to be sent.

When data arrives, gev does not invoke OnMessage immediately; it first calls the UnPacket function. The execution logic is roughly as follows:

ctx, receivedData := c.protocol.UnPacket(c, buffer)
if ctx != nil || len(receivedData) != 0 {
	sendData := c.OnMessage(c, ctx, receivedData)
	if len(sendData) > 0 {
		return c.protocol.Packet(c, sendData)
	}
}

protocol

The UnPacket function checks whether the data in the ring buffer forms a complete data frame. If it does, UnPacket unpacks the frame and returns the payload; if not, it returns immediately.

The return values of UnPacket, (interface{}, []byte), are passed to OnMessage as ctx interface{} and data []byte. ctx is designed to carry extra information produced while parsing the data frame in UnPacket (complex frame protocols need this), and data carries the payload.

type Protocol interface {
	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
	Packet(c *Connection, data []byte) []byte
}

type DefaultProtocol struct{}

func (d *DefaultProtocol) UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
	ret := buffer.Bytes()
	buffer.RetrieveAll()
	return nil, ret
}

func (d *DefaultProtocol) Packet(c *Connection, data []byte) []byte {
	return data
}

As shown above, gev provides a default Protocol implementation that fetches all the data in the receive buffer (ringbuffer). In practice, applications usually have their own framing protocol; gev supports this in plug-in form, set via variadic options when creating the Server.

s, err := gev.NewServer(handler, gev.Protocol(&ExampleProtocol{}))

Check out the Protocol example for details.
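
For illustration, here is a minimal sketch of a length-prefixed ExampleProtocol. The framing and the ringbuffer import path are assumptions for this sketch, not the protocol used by the repository's example; it relies only on the Bytes and Read methods shown in the snippets above.

package protocol

import (
	"encoding/binary"

	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/ringbuffer"
)

// ExampleProtocol frames every message as a 4-byte big-endian payload
// length followed by the payload itself (an invented wire format).
type ExampleProtocol struct{}

func (p *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
	data := buffer.Bytes() // peek at buffered bytes without consuming them
	if len(data) < 4 {
		return nil, nil // header incomplete: wait for more data
	}
	length := int(binary.BigEndian.Uint32(data[:4]))
	if len(data) < 4+length {
		return nil, nil // frame incomplete: wait for more data
	}
	frame := make([]byte, 4+length)
	_, _ = buffer.Read(frame) // consume header + payload from the ring buffer
	return nil, frame[4:]     // hand only the payload to OnMessage
}

func (p *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
	out := make([]byte, 4+len(data))
	binary.BigEndian.PutUint32(out, uint32(len(data)))
	copy(out[4:], data)
	return out
}

Whatever OnMessage returns is then run back through Packet, so replies come out correctly framed.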

There is also a Send method for sending data. Send does not write immediately; it queues the data in the event loop, which then performs the actual send.

Check out the Server timing push example for details.

func (c *Connection) Send(buffer []byte) error
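
For example, pushing a message from outside the event loop (a fragment; the Server timing push example below does this from a timer callback):

// Queue data for the event loop, which packs it via the Protocol and
// writes it to the socket; Send fails if the connection is already closed.
if err := c.Send([]byte("hello\n")); err != nil {
	log.Println("send failed:", err)
}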

ShutdownWrite sets the connected status to false and shuts down the write side of the connection.

Check out the Maximum connections example for details.

func (c *Connection) ShutdownWrite() error

RingBuffer is a circular buffer with dynamic expansion.

WebSocket

The WebSocket protocol is built on top of TCP, so gev does not build it in; instead, support is provided as a plugin in the plugins/websocket directory.

type Protocol struct {
	upgrade *ws.Upgrader
}

func New(u *ws.Upgrader) *Protocol {
	return &Protocol{upgrade: u}
}

func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
	upgraded := c.Context()
	if upgraded == nil {
		var err error
		out, _, err = p.upgrade.Upgrade(buffer)
		if err != nil {
			log.Println("Websocket Upgrade :", err)
			return
		}
		c.SetContext(true)
	} else {
		header, err := ws.VirtualReadHeader(buffer)
		if err != nil {
			log.Println(err)
			return
		}
		if buffer.VirtualLength() >= int(header.Length) {
			buffer.VirtualFlush()

			payload := make([]byte, int(header.Length))
			_, _ = buffer.Read(payload)

			if header.Masked {
				ws.Cipher(payload, header.Mask, 0)
			}

			ctx = &header
			out = payload
		} else {
			buffer.VirtualRevert()
		}
	}
	return
}

func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
	return data
}

See the plugin source for the detailed implementation, and the websocket example for how to use it.

Example

echo server
package main

import (
	"flag"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	//log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}
Automatically clean up idle connections
package main

import (
	"flag"
	"strconv"
	"time"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/gev/log"
)

type example struct {
}

func (s *example) OnConnect(c *connection.Connection) {
	log.Info(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Infof("OnMessage from : %s", c.PeerAddr())
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Info("OnClose: ", c.PeerAddr())
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.IdleTime(5*time.Second))
	if err != nil {
		panic(err)
	}

	s.Start()
}
Maximum connections
package main

import (
	"log"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/toolkit/sync/atomic"
)

// Server example
type Server struct {
	clientNum     atomic.Int64
	maxConnection int64
	server        *gev.Server
}

// New server
func New(ip, port string, maxConnection int64) (*Server, error) {
	var err error
	s := new(Server)
	s.maxConnection = maxConnection
	s.server, err = gev.NewServer(s,
		gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}

	return s, nil
}

// Start server
func (s *Server) Start() {
	s.server.Start()
}

// Stop server
func (s *Server) Stop() {
	s.server.Stop()
}

// OnConnect callback
func (s *Server) OnConnect(c *connection.Connection) {
	s.clientNum.Add(1)
	log.Println(" OnConnect : ", c.PeerAddr())

	if s.clientNum.Get() > s.maxConnection {
		_ = c.ShutdownWrite()
		log.Println("Refused connection")
		return
	}
}

// OnMessage callback
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage")
	out = data
	return
}

// OnClose callback
func (s *Server) OnClose(c *connection.Connection) {
	s.clientNum.Add(-1)
	log.Println("OnClose")
}

func main() {
	s, err := New("", "1833", 1)
	if err != nil {
		panic(err)
	}
	defer s.Stop()

	s.Start()
}
Server timing push
package main

import (
	"container/list"
	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"log"
	"sync"
	"time"
)

// Server example
type Server struct {
	conn   *list.List
	mu     sync.RWMutex
	server *gev.Server
}

// New server
func New(ip, port string) (*Server, error) {
	var err error
	s := new(Server)
	s.conn = list.New()
	s.server, err = gev.NewServer(s,
		gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}

	return s, nil
}

// Start server
func (s *Server) Start() {
	s.server.RunEvery(1*time.Second, s.RunPush)
	s.server.Start()
}

// Stop server
func (s *Server) Stop() {
	s.server.Stop()
}

// RunPush push message
func (s *Server) RunPush() {
	var next *list.Element

	s.mu.RLock()
	defer s.mu.RUnlock()

	for e := s.conn.Front(); e != nil; e = next {
		next = e.Next()

		c := e.Value.(*connection.Connection)
		_ = c.Send([]byte("hello\n"))
	}
}

// OnConnect callback
func (s *Server) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())

	s.mu.Lock()
	e := s.conn.PushBack(c)
	s.mu.Unlock()
	c.SetContext(e)
}

// OnMessage callback
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage")
	out = data
	return
}

// OnClose callback
func (s *Server) OnClose(c *connection.Connection) {
	log.Println("OnClose")
	e := c.Context().(*list.Element)

	s.mu.Lock()
	s.conn.Remove(e)
	s.mu.Unlock()
}

func main() {
	s, err := New("", "1833")
	if err != nil {
		panic(err)
	}
	defer s.Stop()

	s.Start()
}
WebSocket
package main

import (
	"flag"
	"log"
	"math/rand"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/gev/plugins/websocket/ws"
	"github.com/Allenxuxu/gev/plugins/websocket/ws/util"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, data []byte) (messageType ws.MessageType, out []byte) {
	log.Println("OnMessage:", string(data))
	messageType = ws.MessageBinary
	switch rand.Int() % 3 {
	case 0:
		out = data
	case 1:
		msg, err := util.PackData(ws.MessageText, data)
		if err != nil {
			panic(err)
		}
		if err := c.Send(msg); err != nil {
			msg, err := util.PackCloseData(err.Error())
			if err != nil {
				panic(err)
			}
			if e := c.Send(msg); e != nil {
				panic(e)
			}
		}
	case 2:
		msg, err := util.PackCloseData("close")
		if err != nil {
			panic(err)
		}
		if e := c.Send(msg); e != nil {
			panic(e)
		}
	}
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := NewWebSocketServer(handler, &ws.Upgrader{},
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}
package main

import (
	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/plugins/websocket"
	"github.com/Allenxuxu/gev/plugins/websocket/ws"
)

// NewWebSocketServer creates a WebSocket server
func NewWebSocketServer(handler websocket.WebSocketHandler, u *ws.Upgrader, opts ...gev.Option) (server *gev.Server, err error) {
	opts = append(opts, gev.Protocol(websocket.New(u)))
	return gev.NewServer(websocket.NewHandlerWrap(u, handler), opts...)
}
protobuf
package main

import (
	"flag"
	"log"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	pb "github.com/Allenxuxu/gev/example/protobuf/proto"
	"github.com/Allenxuxu/gev/plugins/protobuf"
	"github.com/golang/protobuf/proto"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	msgType := ctx.(string)

	switch msgType {
	case "msg1":
		msg := &pb.Msg1{}
		if err := proto.Unmarshal(data, msg); err != nil {
			log.Println(err)
		}
		log.Println(msgType, msg)
	case "msg2":
		msg := &pb.Msg2{}
		if err := proto.Unmarshal(data, msg); err != nil {
			log.Println(err)
		}
		log.Println(msgType, msg)
	default:
		log.Println("unknown msg type")
	}

	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.Protocol(&protobuf.Protocol{}))
	if err != nil {
		panic(err)
	}

	log.Println("server start")
	s.Start()
}
package main

import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"net"
	"os"

	pb "github.com/Allenxuxu/gev/example/protobuf/proto"
	"github.com/Allenxuxu/gev/plugins/protobuf"
	"github.com/golang/protobuf/proto"
)

func main() {
	conn, e := net.Dial("tcp", ":1833")
	if e != nil {
		log.Fatal(e)
	}
	defer conn.Close()

	var buffer []byte
	for {
		reader := bufio.NewReader(os.Stdin)
		fmt.Print("Text to send: ")
		text, _ := reader.ReadString('\n')
		name := text[:len(text)-1]

		switch rand.Int() % 2 {
		case 0:
			msg := &pb.Msg1{
				Name: name,
				Id:   1,
			}

			data, err := proto.Marshal(msg)
			if err != nil {
				panic(err)
			}
			buffer = protobuf.PackMessage("msg1", data)
		case 1:
			msg := &pb.Msg2{
				Name:  name,
				Alias: "big " + name,
				Id:    2,
			}

			data, err := proto.Marshal(msg)
			if err != nil {
				panic(err)
			}
			buffer = protobuf.PackMessage("msg2", data)
		}

		_, err := conn.Write(buffer)
		if err != nil {
			panic(err)
		}
	}
}

Thanks

Thanks to JetBrains for the free open-source license.

Comments
  • Questions about how doPendingFunc is executed

    Questions about how doPendingFunc is executed

    Thanks for open-sourcing this high-performance networking framework. Reading the source, my understanding is that the execution of doPendingFunc is triggered via eventfd, so that everything can be scheduled uniformly through epollWait. I have two questions: 1. Why must everything be scheduled uniformly through epollWait? Couldn't we write directly to the connfd whenever we have data to send? What problems would that cause? Thanks. 2.

    func (l *EventLoop) doPendingFunc() {
    	l.mu.Lock()
    	pf := l.pendingFunc
    	l.pendingFunc = nil
    	l.mu.Unlock()
    
    	length := len(pf)
    	for i := 0; i < length; i++ { // iterate over all pending functions and run them
    		pf[i]()
    	}
    }
    

    doPendingFunc iterates over all pendingFuncs and runs them, so as soon as the eventfd written by any one pendingFunc is scheduled, every pendingFunc gets executed; on top of that, the timer also triggers this scheduling periodically. It feels like some asynchronous tasks end up being triggered not by themselves but by other tasks, and there can also be unnecessary read(eventfd) calls after multiple write(eventfd) calls (since all the pendingFuncs have already run). Am I misunderstanding something? Thanks.

  • epoll busy loop

    epoll busy loop

    I wrote a client that starts 500 goroutines to connect to the server and then close the connections immediately. Sometimes, after all the client-side connections are already closed, the server still hasn't released its resources, like this:

    top - 14:59:36 up 29 days, 18:10,  2 users,  load average: 1.01, 0.57, 0.26
    Tasks: 268 total,   1 running, 267 sleeping,   0 stopped,   0 zombie
    Cpu0  : 53.9%us, 46.1%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
    Cpu1  :  0.7%us,  0.7%sy,  0.0%ni, 98.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
    Cpu2  :  2.6%us,  0.3%sy,  0.0%ni, 97.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
    Cpu3  :  0.3%us,  0.7%sy,  0.0%ni, 99.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
    Mem:   1923764k total,  1737412k used,   186352k free,   191004k buffers
    Swap:  4128760k total,    81104k used,  4047656k free,   892780k cached
    
      PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
     5179 root      20   0  981m 6928 2960 S 100.5  0.4   3:11.67 mirserver
     2531 root      20   0  871m 362m 5408 S  2.6 19.3   1218:10 java
     5244 root      20   0  2704 1208  872 R  1.0  0.1   0:10.24 top
    

    The client code used for the test:

    package main

    import (
    	"log"
    	"net"
    	"os"
    	"strconv"
    	"sync"
    	"time"
    )

    func main() {
    	loops, err := strconv.Atoi(os.Args[1])
    	if err != nil {
    		log.Fatalln(err)
    	}

    	success, failed := 0, 0

    	wg := &sync.WaitGroup{}
    	for i := 0; i < loops; i++ {
    		wg.Add(1) // Add before starting the goroutine, not inside it
    		go func() {
    			defer wg.Done()

    			conn, err := net.DialTimeout("tcp", "login.afkplayer.com:7000", time.Second*60)
    			if err != nil {
    				failed++ // note: these counters race across goroutines
    				log.Println(err)
    				return
    			}
    			success++
    			conn.Close()
    		}()
    	}

    	wg.Wait()
    	log.Println("Test complete...")
    	log.Printf("Success: %d Failed: %d\n", success, failed)
    }
    

    The output looks like this:

    E:\HardGo\src>go run main.go 500
    2019/12/13 14:57:37 Test complete...
    2019/12/13 14:57:37 Success: 500 Failed: 0
    
  • RussellLuo/timingwheel panics on CentOS 6.5 (32-bit)

    RussellLuo/timingwheel panics on CentOS 6.5 (32-bit)

    panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x804a6bc]

    goroutine 1 [running]:
    runtime/internal/atomic.Xchg64(0xa1201ac, 0xc9fda7b8, 0x16d, 0x0, 0x38b)
    	/usr/lib/golang/src/runtime/internal/atomic/asm_386.s:151 +0xc
    github.com/Allenxuxu/timingwheel%2eV2.(*bucket).SetExpiration(...)
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/bucket.go:74
    github.com/Allenxuxu/timingwheel%2eV2.(*TimingWheel).add(0xa0ae0a0, 0xa0ade80, 0xa0ae0a0)
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/timingwheel.go:79 +0x167
    github.com/Allenxuxu/timingwheel%2eV2.(*TimingWheel).add(0xa0ae000, 0xa0ade80, 0xa0adea0)
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/timingwheel.go:106 +0x1f2
    github.com/Allenxuxu/timingwheel%2eV2.(*TimingWheel).addOrRun(0xa0ae000, 0xa0ade80)
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/timingwheel.go:113 +0x29
    github.com/Allenxuxu/timingwheel%2eV2.(*TimingWheel).EveryFunc(0xa0ae000, 0x3b9aca00, 0x0, 0xa0782c0, 0xa07c3a0)
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/timingwheel.go:185 +0x190
    github.com/Allenxuxu/gev.(*Server).RunEvery(...)
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/server.go:84
    main.(*Server).Start(0xa07c3a0)
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/example/pushmessage/main.go:33 +0x72
    main.main()
    	/home/gohome/pkg/mod/github.com/!allenxuxu/[email protected]/example/pushmessage/main.go:90 +0x85
    exit status 2

    System info: Linux develop 2.6.32-358.el6.i686 #1 SMP Thu Feb 21 21:50:49 UTC 2013 i686 i686 i386 GNU/Linux

  • Pod CPU maxed out on Kubernetes

    Pod CPU maxed out on Kubernetes

    I. Problem description

    I see this in two separate Kubernetes environments: however much CPU I allocate, the process ends up using all of it after running for a day or two, even with essentially no user traffic.

    1. Runtime environment

    # operating system
    alpine:3.11
    # gev version
    github.com/Allenxuxu/gev v0.1.9
    # golang version
    golang:1.15
    
    

    2. Partial deployment manifest

            livenessProbe:
              tcpSocket:
                port: 6061
              periodSeconds: 3
              initialDelaySeconds: 3
            resources:
              requests:
                cpu: 1
                memory: 2Gi
              limits:
                cpu: 1
                memory: 2Gi
    

    3. No new connections; there are only some connect/disconnect log entries, basically all like the following, roughly one batch every few seconds:

    [server_gev_handler.go:25::dm-im/im-gateway/server.(*gevHandler).OnConnect()] Mar 16 09:34:38.953 [D] OnConnect remote address is 172.31.19.178:11738
    [server_gev_handler.go:25::dm-im/im-gateway/server.(*gevHandler).OnConnect()] Mar 16 09:34:38.954 [D] OnConnect remote address is 172.31.28.106:46826
    [server_gev_handler.go:156::dm-im/im-gateway/server.(*gevHandler).OnClose()] Mar 16 09:34:38.954 [D] OnClose remote address is 172.31.19.178:11738
    [server_gev_handler.go:156::dm-im/im-gateway/server.(*gevHandler).OnClose()] Mar 16 09:34:38.954 [D] OnClose remote address is 172.31.28.106:46826
    

    4. Pod CPU usage (currently allocated 1000m; it also maxes out when allocated 2000m):

    im-gateway-857f97b9f-ld2kt             993m         16Mi
    

    5. Connection stats while the CPU is maxed out:

    /data # netstat -nat |awk '{print $6}'|sort|uniq -c|sort -rn
         19 ESTABLISHED
          8 CLOSE_WAIT
          4 LISTEN
          1 established)
          1 Foreign
    

    6. go pprof output:

    File: app
    Build ID: 053ed7dab2cea4aa2ece55e7125b24b2e6c1a958
    Type: cpu
    Time: Mar 16, 2021 at 9:21am (CST)
    Duration: 30.23s, Total samples = 29.46s (97.47%)
    Entering interactive mode (type "help" for commands, "o" for options)
    (pprof) top
    Showing nodes accounting for 24990ms, 84.83% of 29460ms total
    Dropped 74 nodes (cum <= 147.30ms)
    Showing top 10 nodes out of 28
          flat  flat%   sum%        cum   cum%
       14110ms 47.90% 47.90%    15080ms 51.19%  syscall.Syscall6
        4710ms 15.99% 63.88%     4950ms 16.80%  time.now
        1260ms  4.28% 68.16%     2660ms  9.03%  runtime.mapaccess2
        1190ms  4.04% 72.20%     9050ms 30.72%  github.com/Allenxuxu/gev/eventloop.(*EventLoop).handlerEvent
         770ms  2.61% 74.81%     1680ms  5.70%  github.com/Allenxuxu/gev/connection.(*Connection).HandleEvent
         680ms  2.31% 77.12%     1330ms  4.51%  github.com/Allenxuxu/gev/eventloop.(*EventLoop).doPendingFunc
         630ms  2.14% 79.26%      630ms  2.14%  github.com/Allenxuxu/toolkit/sync/spinlock.(*SpinLock).Lock
         630ms  2.14% 81.40%      690ms  2.34%  runtime.(*itabTableType).find
         510ms  1.73% 83.13%    24690ms 83.81%  github.com/Allenxuxu/gev/poller.(*Poller).Poll
         500ms  1.70% 84.83%      500ms  1.70%  sync.(*entry).load
    

    II. Expected behavior: CPU usage should rise and fall with the amount of traffic

  • Calling Connection.Send requires allocating a new buffer each time, which adds overhead

    Calling Connection.Send requires allocating a new buffer each time, which adds overhead

    func (c *Connection) Send(buffer []byte) error {
    	if !c.connected.Get() {
    		return ErrConnectionClosed
    	}

    	c.loop.QueueInLoop(func() {
    		if c.connected.Get() {
    			c.sendInLoop(c.protocol.Packet(c, buffer))
    		}
    	})
    	return nil
    }

    The buffer argument cannot be reused or released until sendInLoop has finished with it, but with the current API the caller of Send has no way of knowing when that is. So buffers cannot be reused through Connection's UserBuffer or a buffer pool; every send has to allocate a fresh buffer and leave it for the GC to reclaim, which is somewhat inefficient.

    Two possible approaches come to mind:

    1. Add an OnSendInLoopFinish callback to Connection.Callback that passes the buffer back out, so the caller can return it to a memory pool in the callback.
    2. Change the data parameter of Protocol's Packet method to an interface type, so callers can pass in unpacked objects such as protobuf messages, and a custom Packet implementation can use Connection's UserBuffer as the packing buffer.

    Would it be possible to support either of these?

  • Is there more detailed documentation?

    Is there more detailed documentation?

    First of all, I'm really glad you shared such a nice framework. But I'm still a beginner, and I have a few questions and small suggestions:

    1. The documentation is too rough.
    2. The directory layout feels arbitrary, and the *_test.go tests would be clearer under a test directory. From a quick look at the structure, could it be organized into event-loop, network-connection, protocol, config, example, and test directories, or otherwise simplified?
    3. TCP itself suffers from message sticking; could the protocol layer expose more built-in framings and extension points (text, frame, custom)?
    4. Could the overall flow be made clearer, perhaps with a video walkthrough?
  • Saving the WebSocket upgrade's HTTP headers

    Saving the WebSocket upgrade's HTTP headers

    During the WebSocket upgrade, the HTTP headers need to be saved, because the connection needs their data when handling later requests. This doesn't appear possible right now and would require framework changes to support.

    Originally posted by @lyfunny in https://github.com/Allenxuxu/gev/issues/4#issuecomment-613216254

  • chore: Refactor code quality issues

    chore: Refactor code quality issues

    Description

    Hey 👋, I'm a member of the Developer Outreach team at DeepSource and ran a DeepSource analysis on my fork of the repo. It found some interesting code quality improvements to consider.

    This PR fixes a few of the detected issues so you can assess whether it's right for you. Happy to provide the tweaks separately otherwise :)

    Important changes

    • Fix check for empty string
    • Remove unnecessary use of slice
    • Added .deepsource.toml file for continuous analysis on bug risks/performance/code-quality issues

    Type of change

    • [x] Antipattern
    • [ ] New feature (non-breaking change which adds functionality)

    Checklist:

    • [x] My code follows the style guidelines of this project
    • [ ] I have performed a self-review of my own code
  • fix: stop the timer when closing the connection

    fix: stop the timer when closing the connection

    If I set the idle time, the timer is not stopped when the connection closes. The timing wheel still holds a reference to the connection, so the connection will not be reclaimed until the timer expires, which may leak memory. This matters especially since the library is designed to handle large numbers of WebSocket connections, which means many connections and long idle times.

  • On handling EAGAIN in sendInLoop

    On handling EAGAIN in sendInLoop

    Here, when err == unix.EAGAIN the function returns directly, so whatever n is, the part of the data that Write() did not manage to send is effectively dropped. Why not store the unwritten remainder in c.outBuffer before returning? Just asking. https://github.com/Allenxuxu/gev/blob/8e27f6e802c5b940e09d00114a07ff2d18b53324/connection/connection.go#L267-L273
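
    For illustration, a hedged sketch of the change the question suggests (data, c.fd, c.outBuffer, c.handleClose, and c.loop.EnableReadWrite follow the naming style of the linked code but are assumptions about gev's internals, not its confirmed API):

    // Inside sendInLoop: on a short or failed write, keep the unwritten tail
    // in the out buffer instead of returning and dropping it.
    n, err := unix.Write(c.fd, data)
    if err != nil {
    	if err != unix.EAGAIN {
    		c.handleClose(c.fd) // a real error: tear the connection down
    		return
    	}
    	n = 0 // treat EAGAIN as zero bytes written (n may be negative on error)
    }
    if n < len(data) {
    	// buffer the remainder and wait for the fd to become writable again
    	_, _ = c.outBuffer.Write(data[n:])
    	_ = c.loop.EnableReadWrite(c.fd)
    }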

  • Advice on building an IM on top of gev

    Advice on building an IM on top of gev

    Thanks for sharing this project. I've been building an IM with gev and WebSocket recently and ran into a question: how to design routing. I followed the websocket example you wrote:

    type example struct{
    	router map[int64]*connection.Connection // userId -> conn
    }
    

    router maintains the mapping from each user to their connection. To get the target user ID, I carry it inside the second parameter data of OnMessage(c *connection.Connection, data []byte), which means the data must itself be structured; is there any problem with that? Another question: when OnClose(c *connection.Connection) closes a connection, we only have the conn, so how do I remove the owning user's entry from the router? Also, when a browser-based WebSocket client disconnects, OnClose doesn't seem to fire; is that a problem with my changes? Thanks.
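
    One way to handle the OnClose question, borrowing the SetContext pattern from the timing-push example earlier on this page: store the user ID on the connection once it is known, and read it back in OnClose. A sketch under that assumption (register and the locking are illustrative, not part of gev):

    package im

    import (
    	"sync"

    	"github.com/Allenxuxu/gev/connection"
    )

    type example struct {
    	mu     sync.RWMutex
    	router map[int64]*connection.Connection // userId -> conn
    }

    // register is called from OnMessage once the user ID has been parsed
    // out of the structured data payload.
    func (s *example) register(userID int64, c *connection.Connection) {
    	c.SetContext(userID) // remember the owning user on the connection
    	s.mu.Lock()
    	s.router[userID] = c
    	s.mu.Unlock()
    }

    // OnClose recovers the owning user from the connection's context and
    // removes the route.
    func (s *example) OnClose(c *connection.Connection) {
    	if ctx := c.Context(); ctx != nil {
    		userID := ctx.(int64)
    		s.mu.Lock()
    		delete(s.router, userID)
    		s.mu.Unlock()
    	}
    }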

  • Transfer efficiency for ~100 MB payloads

    Transfer efficiency for ~100 MB payloads

    I did a preliminary test based on the protobuf example in Example, with a modified proto file: message SendMessageReq { bytes Body = 1; } message SendMessageRsp { bool Ok = 1; } Each Body carries roughly 100 MB. I found that the first transfer on every connection takes close to a minute, which I currently suspect is the cost of growing the buffer. For a workload that occasionally transfers ~100 MB payloads, how should gev be used, or what are some ideas for optimizing its transfer efficiency?

  • Is wakeHandlerRead() in the EpollWait loop called too early?

    Is wakeHandlerRead() in the EpollWait loop called too early?

    https://github.com/Allenxuxu/gev/blob/e0a684e2ec1b6bfb3b1ab6a7b29011392795c7b5/poller/epoll.go#L158-L185

    1. Functionally, the needWake flag in eventLoopLocal exists so that within one epoll iteration, from the moment EpollWait returns a unix.EPOLLIN event for the EventFd until taskQueue is drained in doPendingFunc, no redundant write to the EventFd happens that would trigger an unnecessary wakeup next round.
    2. With the epollwait + for pattern, multiple writes to the EventFd before it is read are drained by a single read (the EventFd behaves a bit like edge-triggered here) and do not cause extra wakeups next round. So calling wakeHandlerRead(), which reads the EventFd, just before draining taskQueue would have exactly the same effect as the needWake flag.
    3. In the current code, the for loop that handles EpollWait results calls wakeHandlerRead() as soon as it encounters the EventFd, and the window from that call until taskQueue is drained in doPendingFunc can only be protected by the needWake flag to keep writes to the EventFd from happening. Wouldn't it be better to postpone wakeHandlerRead() until just before taskQueue is drained, making the needWake flag unnecessary?
  • What is the advantage of controlling EpollWait's msec?

    What is the advantage of controlling EpollWait's msec?

    https://github.com/Allenxuxu/gev/blob/16e310c2338c31a48ab627a2ba39097317e1dfe3/poller/epoll.go#L145-L156 A blocking EpollWait call should also yield the CPU while the thread is blocked, which looks equivalent to the runtime.Gosched() above. Or is there some clever trick here?
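
    For context, the pattern being asked about looks roughly like this (a hedged sketch of the linked code, not gev's exact implementation): after a round that delivered events, the loop polls again with msec=0 and yields with runtime.Gosched(), so bursts are picked up without repeatedly parking and unparking the OS thread inside a blocking EpollWait; only when a poll comes back empty does it fall back to msec=-1.

    package poller

    import (
    	"runtime"

    	"golang.org/x/sys/unix"
    )

    // poll sketches the adaptive-msec idea: spin with non-blocking waits
    // while events are flowing, and block only when idle.
    func poll(epfd int, handle func(unix.EpollEvent)) error {
    	events := make([]unix.EpollEvent, 128)
    	msec := -1
    	for {
    		n, err := unix.EpollWait(epfd, events, msec)
    		if err != nil && err != unix.EINTR {
    			return err
    		}
    		if n <= 0 {
    			msec = -1         // idle: block on the next wait
    			runtime.Gosched() // yield the thread to other goroutines
    			continue
    		}
    		msec = 0 // busy: poll again without blocking after this batch
    		for i := 0; i < n; i++ {
    			handle(events[i])
    		}
    	}
    }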
