More effective network communication, two-way calling, notify and broadcast supported.

ARPC - More Effective Network Communication

Mentioned in Awesome Go

GoDoc MIT licensed Build Status Go Report Card Coverage Statusd

Contents

Features

  • Two-Way Calling
  • Two-Way Notify
  • Sync and Async Calling
  • Sync and Async Response
  • Batch Write | Writev | net.Buffers
  • Broadcast
  • Middleware
  • Pub/Sub
  • Opentracing
Pattern Interactive Directions Description
call two-way:
c -> s
s -> c
request and response
notify two-way:
c -> s
s -> c
request without response

Performance

  • simple echo load testing
Framework Protocol Codec Configuration Connection Num Number of Goroutines Per Connection Qps
arpc tcp/localhost encoding/json os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G
8 10 80-100k
grpc http2/localhost protobuf os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G
8 10 20-30k

Header Layout

  • LittleEndian
bodyLen reserved cmd flag methodLen sequence method body
4 bytes 1 byte 1 byte 1 bytes 1 bytes 8 bytes methodLen bytes bodyLen-methodLen bytes

Installation

  1. Get and install arpc
$ go get -u github.com/lesismal/arpc
  1. Import in your code:
import "github.com/lesismal/arpc"

Quick Start

package main

import "github.com/lesismal/arpc"

func main() {
	server := arpc.NewServer()

	// register router
	server.Handler.Handle("/echo", func(ctx *arpc.Context) {
		str := ""
		if err := ctx.Bind(&str); err == nil {
			ctx.Write(str)
		}
	})

	server.Run("localhost:8888")
}
package main

import (
	"log"
	"net"
	"time"

	"github.com/lesismal/arpc"
)

func main() {
	client, err := arpc.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	defer client.Stop()

	req := "hello"
	rsp := ""
	err = client.Call("/echo", &req, &rsp, time.Second*5)
	if err != nil {
		log.Fatalf("Call failed: %v", err)
	} else {
		log.Printf("Call Response: \"%v\"", rsp)
	}
}

API Examples

Register Routers

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

// message would be default handled one by one  in the same conn reader goroutine
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })

// this make message handled by a new goroutine
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)

Router Middleware

See router middleware, it's easy to implement middlewares yourself

import "github.com/lesismal/arpc/extension/middleware/router"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })

Coder Middleware

  • Coder Middleware is used for converting a message data to your designed format, e.g encrypt/decrypt and compress/uncompress
import "github.com/lesismal/arpc/extension/middleware/coder/gzip"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) { ... })

Client Call, CallAsync, Notify

  1. Call (Block, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  1. CallAsync (Nonblock, with callback and timeout/context)
request := &Echo{...}

timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
	response := &Echo{}
	ctx.Bind(response)
	...	
}, timeout)
  1. Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)

Server Call, CallAsync, Notify

  1. Get client and keep it in your application
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
	client = ctx.Client
	// release client
	client.OnDisconnected(func(c *arpc.Client){
		client = nil
	})
})

go func() {
	for {
		time.Sleep(time.Second)
		if client != nil {
			client.Call(...)
			client.CallAsync(...)
			client.Notify(...)
		}
	}
}()
  1. Then Call/CallAsync/Notify

Broadcast - Notify

var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

func broadcast() {
	var svr *arpc.Server = ... 
	msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
	mux.RLock()
	for client := range clientMap {
		client.PushMsg(msg, arpc.TimeZero)
	}
	mux.RUnlock()
}

Async Response

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

asyncResponse := true // default is true, or set false
handler.Handle("/echo", func(ctx *arpc.Context) {
	req := ...
	err := ctx.Bind(req)
	if err == nil {
		ctx.Write(data)
	}
}, asyncResponse)

Handle New Connection

// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

Handle Disconnected

// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

Handle Client's send queue overstock

// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

Custom Net Protocol

// server
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)

// client
dialer := func() (net.Conn, error) { 
	return ... 
}
client, err := arpc.NewClient(dialer)

Custom Codec

import "github.com/lesismal/arpc/codec"

var codec arpc.Codec = ...

// package
codec.Defaultcodec = codec

// server
svr := arpc.NewServer()
svr.Codec = codec

// client
client, err := arpc.NewClient(...)
client.Codec = codec

Custom Logger

import "github.com/lesismal/arpc/log"

var logger arpc.Logger = ...
log.SetLogger(logger) // log.DefaultLogger = logger

Custom operations before conn's recv and send

arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) {
	// ...
})

arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) {
	// ...
})

Custom arpc.Client's Reader by wrapping net.Conn

arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) {
	// ...
})

Custom arpc.Client's send queue capacity

arpc.DefaultHandler.SetSendQueueSize(4096)

JS Client

Web Chat Examples

Pub/Sub Examples

  • start a server
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	s := pubsub.NewServer()
	s.Password = password

	// server publish to all clients
	go func() {
		for i := 0; true; i++ {
			time.Sleep(time.Second)
			s.Publish(topicName, fmt.Sprintf("message from server %v", i))
		}
	}()

	s.Run(address)
}
  • start a subscribe client
import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func onTopic(topic *pubsub.Topic) {
	log.Info("[OnTopic] [%v] \"%v\", [%v]",
		topic.Name,
		string(topic.Data),
		time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	// subscribe topic
	if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
		panic(err)
	}

	<-make(chan int)
}
  • start a publish client
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	for i := 0; true; i++ {
		if i%5 == 0 {
			// publish msg to all clients
			client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		} else {
			// publish msg to only one client
			client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		}
		time.Sleep(time.Second)
	}
}

More Examples

Owner
lesismal
less is more.
lesismal
Comments
  • 问一下 server 调用 client代码的样例 感觉我没用对, 又不知道哪里没对, 最终导致异常了

    问一下 server 调用 client代码的样例 感觉我没用对, 又不知道哪里没对, 最终导致异常了

    我的想问问 当 client 和 server已经连通之后, 在什么样的时机或者机制可以 通过ctx.Client.Call(....) 我的实际场景是, 当server拿到客户端传过来的数据之后, 需要经过耗时的运算, 然后将server的数据再通过同一个tcp连接 传回去, 但是不太方便直接在 在 server的handler中直接 ctx.Write(). 所以才想到用server端去"call client"

    client端

    
    client, err := arpc.NewClient(func() (net.Conn, error) {
        return net.DialTimeout("tcp", "localhost:10001", time.Second*3)
    })
    	
    
    payload := Payload{}
    client.Call("/send_payload", &payload, &rsp, time.Second*2)
    
    client.Handler.Handle("/receive_payload", func(ctx *arpc.Context) {
    
        log.Print("server call received")
        var payload protocol.Payload
        
        if err := ctx.Bind(&payload); err == nil {
    	    log.Print("req payload: ", payload.ToString())
    	    ctx.Write([]byte("ok"))
        }
    })
    

    server端 出错的的案例

    server := arpc.NewServer()
        
        ready_chan := make(chan bool)
        server.Handler.Handle("/send_payload", func(ctx *arpc.Context) {
        var payload protocol.Payload
        
        if err := ctx.Bind(&payload); err == nil {
    	    log.Print("req payload: ", payload.ToString())
    	    client = ctx.Client
    	    ctx.Write([]byte("ok"))
    	    ready_chan <- true
        }
        })
        
        go func() {
        <-ready_chan
        log.Print("ctx.Client", client) // 这一行注释掉就崩了, 异常参见下一个段落
        res := ""
        
        payload := protocol.NewPayload(
    	    1,
    	    1,
    	    1,
    	    []byte("server call client"),
        )
        client.Call("/receive_payload", &payload, &res, time.Second*20)
        log.Print(fmt.Sprintf("server call client, req:%s, res:%s", payload.ToString(), res))
        
        }()
        
        server.Run("localhost:10001")
    
    
    

    上面写法异常也看不到程序本身的行数, 应该是有地方没 recover到

    2021/09/09 15:16:13.832 [ERR] runtime error: runtime error: invalid memory address or nil pointer dereference
    traceback:
    goroutine 25 [running]:
    runtime/debug.Stack(0xc000107c20, 0x1b6980, 0x2bf960)
            c:/go/src/runtime/debug/stack.go:24 +0xa5
    github.com/lesismal/arpc/util.Recover()
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/util/util.go:21 +0x5e
    panic(0x1b6980, 0x2bf960)
            c:/go/src/runtime/panic.go:969 +0x1c7
    github.com/lesismal/arpc.(*handler).OnMessage(0xc0000bc000, 0xc0000ba000, 0xc00001c200)
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/handler.go:596 +0x2f4
    github.com/lesismal/arpc.(*Client).recvLoop(0xc0000ba000)
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/client.go:735 +0x2bf
    github.com/lesismal/arpc/util.Safe(0xc00003e520)
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/util/util.go:28 +0x50
    created by github.com/lesismal/arpc.(*Client).run
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/client.go:680 +0x108
    
  • Benchmark test between v1.1.9 and v1.1.8

    Benchmark test between v1.1.9 and v1.1.8

    **Test environment **

    On the same machine, the Windows/Linux test results are relatively consistent.

    Use official documents:

    • arpc/examples/bench/client/client.go
    • arpc/examples/bench/server/server.go

    arpc v1.1.9 client.Call

    2021/08/17 09:47:32 [qps: 25756], [avg: 25796 / s], [total: 644904, 25 s]
    2021/08/17 09:47:33 [qps: 25208], [avg: 25773 / s], [total: 670112, 26 s]
    2021/08/17 09:47:34 [qps: 24706], [avg: 25734 / s], [total: 694818, 27 s]
    2021/08/17 09:47:35 [qps: 26389], [avg: 25757 / s], [total: 721207, 28 s]
    2021/08/17 09:47:36 [qps: 24529], [avg: 25715 / s], [total: 745736, 29 s]
    2021/08/17 09:47:37 [qps: 21087], [avg: 25560 / s], [total: 766823, 30 s]
    

    arpc v1.1.8 client.Call

    2021/08/17 09:48:59 [qps: 35669], [avg: 35430 / s], [total: 885762, 25 s]
    2021/08/17 09:49:00 [qps: 33920], [avg: 35372 / s], [total: 919682, 26 s]
    2021/08/17 09:49:01 [qps: 34806], [avg: 35351 / s], [total: 954488, 27 s]
    2021/08/17 09:49:02 [qps: 34576], [avg: 35323 / s], [total: 989064, 28 s]
    2021/08/17 09:49:03 [qps: 33994], [avg: 35277 / s], [total: 1023058, 29 s]
    2021/08/17 09:49:04 [qps: 34774], [avg: 35261 / s], [total: 1057832, 30 s]
    

    v1.1.9 client.Notify

    for k := 0; true; k++ {
    	rand.Read(data)
    	req := &HelloReq{Msg: base64.RawStdEncoding.EncodeToString(data)}
    	// rsp := &HelloRsp{}
    	err = client.Notify(method, req, time.Second*5)
    	if err != nil {
    		log.Printf("Call failed: %v", err)
    	// } else if rsp.Msg != req.Msg {
    	// 	log.Fatal("Call failed: not equal")
    	} else {
    		atomic.AddUint64(&qpsSec, 1)
    	}
    }
    
    2021/08/17 09:54:04 [qps: 6568], [avg: 63304 / s], [total: 1582624, 25 s]
    2021/08/17 09:54:05 [qps: 52315], [avg: 62882 / s], [total: 1634939, 26 s]
    2021/08/17 09:54:08 [qps: 51121], [avg: 62446 / s], [total: 1686060, 27 s]
    2021/08/17 09:54:08 [qps: 96364], [avg: 63658 / s], [total: 1782424, 28 s]
    2021/08/17 09:54:08 [qps: 15375], [avg: 61993 / s], [total: 1797799, 29 s]
    2021/08/17 09:54:09 [qps: 43706], [avg: 61383 / s], [total: 1841505, 30 s]
    

    v1.1.8 client.Notify

    for k := 0; true; k++ {
    	req := &HelloReq{Msg: "hello from client.Call"}
    	// rsp := &HelloRsp{}
    	err = client.Notify(method, req, time.Second*5)
    	if err != nil {
    		log.Printf("Call failed: %v", err)
    	} else {
    		//log.Printf("Call Response: \"%v\"", rsp.Msg)
    		atomic.AddUint64(&qpsSec, 1)
    	}
    }
    
    2021/08/17 09:51:12 [qps: 464933], [avg: 515004 / s], [total: 12875112, 25 s]
    2021/08/17 09:51:13 [qps: 340793], [avg: 508304 / s], [total: 13215905, 26 s]
    2021/08/17 09:51:14 [qps: 445335], [avg: 505971 / s], [total: 13661240, 27 s]
    2021/08/17 09:51:15 [qps: 498246], [avg: 505695 / s], [total: 14159486, 28 s]
    2021/08/17 09:51:16 [qps: 385566], [avg: 501553 / s], [total: 14545052, 29 s]
    2021/08/17 09:51:17 [qps: 504710], [avg: 501658 / s], [total: 15049762, 30 s]
    

    v1.1.9-server, v1.1.8-client client.Notify

    2021/08/17 09:55:59 [qps: 349016], [avg: 327355 / s], [total: 8183891, 25 s]
    2021/08/17 09:56:00 [qps: 335811], [avg: 327680 / s], [total: 8519702, 26 s]
    2021/08/17 09:56:01 [qps: 354877], [avg: 328688 / s], [total: 8874579, 27 s]
    2021/08/17 09:56:02 [qps: 302469], [avg: 327751 / s], [total: 9177048, 28 s]
    2021/08/17 09:56:03 [qps: 364375], [avg: 329014 / s], [total: 9541423, 29 s]
    2021/08/17 09:56:04 [qps: 341278], [avg: 329423 / s], [total: 9882701, 30 s]
    
  • issue with client

    issue with client

    你好,请问如果在连接成功的时候,客户端需要发送一条消息给服务端,是可以像以下代码这样发送消息么?

    client.Handler.HandleConnected(func(connectedClient *arpc.Client) {
    		req := ""
    		err := connectedClient.Call("/callAfterConnected","", &req, time.Second * 5)
    		if err == nil {
    			fmt.Println("Call After Connected Success.")
    		}else {
    			fmt.Println("Call After Connected Failed. ", err, "\n", req)
    		}
    	})
    

    我尝试了一下,发现很奇怪的现象,有时候这个 Handler 不会触发,如果触发该 Handler ,则必定错误,原因均为 timeout,此时 ARPC 会报 [WRN] [ARPC CLI] OnMessage: session not exist or expired 错误。

    备注:当前 client 并不需要执行 Stop 操作,所以确定不是因为 Stop 触发的。

  • 可以增加一个最大重试次数限制或者回调么?

    可以增加一个最大重试次数限制或者回调么?

    我在使用的过程中,发现通过 Client 建立的请求,当 Server 端意外失联之后,Client 端会不断的尝试重连,重试间隔 1s,无法中断,查看 client.go 的代码后发现有如下逻辑:

    https://github.com/lesismal/arpc/blob/b7b66250afb12b445295063631cb8072a35f0788/client.go#L752-L771

    期望: 是否可以在此处增加重连次数的回调?可以让调用者决定多少次之后放弃重新建立连接,转而处理其他业务? 如果考虑到 Handler 的调用次数可能过于频繁的话,那是否可以在 Client 的配置中增加个限制最大重试次数,超过之后直接走 Disconnected 的 Handler,这个 Disconnected 的 Handler 目前似乎只有在 Client 端主动 Stop 的时候才会执行。

    我尝试 fork 增加了部分相关代码,目前使用上是满足我的业务了,不过没有经过测试,也不敢提 pull request,所以在这里提出这个改进建议,谢谢阅读。

  • pubsub not removing disconnected clients

    pubsub not removing disconnected clients

    When subscribing client disconnects from server without unsubscribing first pupsub.Server.Publish() gives error: 2021/09/22 19:04:49.612 [ERR] [Publish] [topic: 'Broadcast'] failed client stopped, from Server to 127.0.0.1:35306

    From reading the code regarding pupsub.NewServer() svr.Handler.HandleDisconnected(svr.deleteClient) Publish() should no longer be trying to send to a disconnected client but this does not seem to be working?

    I do see the client disconnecting in log: 2021/09/22 19:04:30.206 [ERR] [APS SVR] 127.0.0.1:35306 Disconnected: EOF

    I tried also to do this manually but when I add to code:

    srv.Handler.HandleDisconnected(func (c *arpc.Client) {
        log.Error().Msg("HANDLE_DISCONNECTED_TEST")
    })
    

    Nothing happens either..

  • Bugs For arpc.js

    Bugs For arpc.js

    您好,请问一下,有关 websocket 的消息截断的问题,在 onMessage 函数下,通过 offset 去循环处理一段消息,这里使用 offset 的原因是什么?

    https://github.com/lesismal/arpc/blob/9ff9485288edf8f855b16f113061712e5737cf51/extension/jsclient/arpc.js#L179-L188

    我在使用的时候,发现 bodyLen 的长度计算不正确,例如我从服务器推送一条 Notify 的消息,在这里断点,得到的 event.data.byteLength 长度是 645,然而通过 bodyLen 计算出来的结果是 119,即使这是在 while 循环中,offset 会在下一次循环开始前偏移,但是消息已经在第一次循环里就发送给 handle 了

    https://github.com/lesismal/arpc/blob/9ff9485288edf8f855b16f113061712e5737cf51/extension/jsclient/arpc.js#L211-L214

    还有另一个问题是,第二次循环中,header 的内容并不存在,计算的 method 或者其他信息都是原来消息体的一部分信息,即使想要组装数据,似乎也没法完成。

    https://github.com/lesismal/arpc/blob/9ff9485288edf8f855b16f113061712e5737cf51/extension/jsclient/arpc.js#L190-L192

    以下是我使用的部分代码:

           //JS:
           client = new ArpcClient("ws://localhost:8888/ws", null)
           //订阅模式的 Handler
           client.handle("/broadcast/info", function(ctx) {
                console.log("[Info] ", ctx)
           });
           //请求订阅,成功之后通过上面的 Handler 获取服务器推送的消息
           client.call("/registerBroadcast", "xxxx", 5000, function(resp) {
                console.log("response ", resp)
           })
    
    
    //Server:
    func InitServer(port string)  {
    	ln, _ := websocket.Listen("localhost:"+port, nil)
    	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    		fmt.Println("url: %v", r.URL.String())
    		if r.URL.Path == "/" {
    			http.ServeFile(w, r, "chat.html")
    		} else if r.URL.Path == "/arpc.js" {
    			http.ServeFile(w, r, "arpc.js")
    		} else {
    			http.NotFound(w, r)
    		}
    	})
    
    	http.HandleFunc("/ws", ln.(*websocket.Listener).Handler)
    	go func() {
    		err := http.ListenAndServe("localhost:"+port, nil)
    		if err != nil {
    			fmt.Println("ListenAndServe: ", err)
    			panic(err)
    		}
    	}()
    
    	server := arpc.NewServer()
    
    	/*
    		Broadcast auth
    
    		request  :
    			AccessKey	string
    		response :
    			true or false
    	*/
    	server.Handler.Handle(pinApi.AddressForApi(pinApi.BROADCAST_AUTH), func(context *arpc.Context) {
    		var accessKey string
    		if err := context.Bind(&accessKey); err != nil {
    			context.Write(code.Failed)
    			return
    		}
    		if len(accessKey) > 0 && accessKey == default.AccessKey  {
    			BroadcastClients = append(BroadcastClients, context.Client)
    			context.Write(define.RPC_SUCCESS)
    		}else {
    			context.Write(code.Failed)
    		}
    	})
    
    	//broadcast info
    	go broadcastInfo()
    
    	server.Serve(ln)
    }
    func broadcastServerInfo() {
    	for true {
    		if len(BroadcastClients) > 0 {
    			log.Println(len(BroadcastClients), " clients listen Broad casting...")
                            // jsonData 是个较长的 json 数据
    			msg := RPCServer.NewMessage(arpc.CmdNotify,"/broadcast/info", jsonData)
    			for _, client := range BroadcastClients {
    				client.PushMsg(msg, arpc.TimeZero)
    			}
    		}
    		time.Sleep(time.Second * 1)
    	}
    }
    
    
  • timeout不工作的问题

    timeout不工作的问题

    如果在server里面call client的一个handle,然后client的handle超时后 server发给该client的一切call都直接timeout了 client自身的一切call server的操作也会直接timeout了

    此时应该怎么做?因为无法保证client的handle或者server的handle的处理时间 小于call时设置的timeout时间 不能影响其他请求吧?如果取消那个请求呢?

  • 例子里的micro下面的client.go的serviceManager.ClientBy获取不到服务

    例子里的micro下面的client.go的serviceManager.ClientBy获取不到服务

    package main
    
    import (
    	"net"
    	"time"
    
    	"github.com/lesismal/arpc/extension/micro"
    	"github.com/lesismal/arpc/extension/micro/etcd"
    	"github.com/lesismal/arpc/log"
    )
    
    func dialer(addr string) (net.Conn, error) {
    	return net.DialTimeout("tcp", addr, time.Second*3)
    }
    
    func main() {
    	var (
    		appPrefix = "app"
    		service   = "echo"
    
    		endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
    
    		serviceManager = micro.NewServiceManager(dialer)
    	)
    	discovery, err := etcd.NewDiscovery(endpoints, appPrefix, serviceManager)
    	if err != nil {
    		log.Error("NewDiscovery failed: %v", err)
    		panic(err)
    	}
    	defer discovery.Stop()
    
    		for {
    		time.Sleep(time.Second * 1)
    		client, err := serviceManager.ClientBy(service)
                    // 这里会报错,获取不到服务,必须要在上面先停留一段时间才能获取到。
    		if err != nil {
    			log.Error("get Client failed: %v", err)
    		} else {
    			req := "arden"
    			rsp := ""
    			err = client.Call("/echo", &req, &rsp, time.Second*5)
    			if err != nil {
    				log.Info("Call /echo failed: %v", err)
    			} else {
    				log.Info("Call /echo Response: \"%v\"", rsp)
    			}
    		}
    		time.Sleep(time.Second)
    	}
    }
    

    time.Sleep(time.Second * 1) client, err := serviceManager.ClientBy(service)' // 这里会报错,获取不到服务,必须要在上面先停留一段时间才能获取到。

  • extension/micro: feature request

    extension/micro: feature request

    Thanks for your work, arpc is awesome.

    1, In game scenario, sometimes a request from the frontend server must be transferred to a special server, not a random one.

    ref: https://github.com/lesismal/arpc/blob/master/extension/micro/service.go#L210

    2, RPCX has metadata called state which is very useful in production.

    ref: https://github.com/rpcxio/rpcx-examples/blob/master/state/client/client.go#L21

    Are you considering adding these above?

    Thanks!

  • Missing SetLogLevel

    Missing SetLogLevel

    Hello,

    Just performed an update to the latest version and noticed SetLogLevel function has disappeared. This code no longer compiles arpc.SetLogLevel(arpc.LogLevelError).

  • data race问题

    data race问题

    简单跑了一下,data race的问题还是很多的,不知是暂时没考虑 还是为了性能故意的race?

    像server的 github.com/lesismal/arpc/server.go:127 github.com/lesismal/arpc/server.go:128 github.com/lesismal/arpc/server.go:42 github.com/lesismal/arpc/server.go:182

    像client的 github.com/lesismal/arpc/client.go:726 github.com/lesismal/arpc/client.go:694 github.com/lesismal/arpc/handler.go:595 github.com/lesismal/arpc/handler.go:758 github.com/lesismal/arpc/client.go:694

    看起来好像不是prod ready的状态?

  • client 调用 server 附带的 values 会被原样再发送回来,是否可控不发送

    client 调用 server 附带的 values 会被原样再发送回来,是否可控不发送

    ctx.write的时候,会把收到的values原样回传,然而client.Call是拿不到最新的values的,因为接触不到完整的消息体,在on函数和middleware可以拿到,但是这些都是全局的

    c/s场景下单向调用的场景,可以认为是浪费了,values越大浪费越多,如果是server端因为需要还额外通过middleware写入了一下信息自己用,那回传的数据就更多了

    所以如果能让client可以方便的获取到values,这样可能部分场景能用到,另外就是可选关闭,我目前是通过middleware手动清空,但是感觉不够优雅

  • 怎么添加和获取message.values

    怎么添加和获取message.values

    client.go

    err = client.Call("/echo/sync", &req, &rsp, time.Second*5, map[interface{}]interface{}{
    		"x-arpc-context": "xxxxx",
    	})
    

    server.go

    fmt.Printf("message values: %+v", ctx.Message.Values())
    

    这种方式获取不到message.values

An implementation of a distributed KV store backed by Raft tolerant of node failures and network partitions 🚣
An implementation of a distributed KV store backed by Raft tolerant of node failures and network partitions 🚣

barge A simple implementation of a consistent, distributed Key:Value store which uses the Raft Concensus Algorithm. This project launches a cluster of

Nov 24, 2021
Parallel Digital Universe - A decentralized identity-based social network

Parallel Digital Universe Golang implementation of PDU. What is PDU? Usage Development Contributing PDU PDU is a decentralized identity-based social n

Nov 20, 2022
Like Go channels over the network

libchan: like Go channels over the network Libchan is an ultra-lightweight networking library which lets network services communicate in the same way

Dec 9, 2022
A Golang implementation of the Umee network, a decentralized universal capital facility in the Cosmos ecosystem.

Umee A Golang implementation of the Umee network, a decentralized universal capital facility in the Cosmos ecosystem. Umee is a Universal Capital Faci

Jan 3, 2023
Network connecter for storage.

database Quick fast connection to database use gorm. Installation $ go get -u github.com/coolstina/connecter Example package main import ( "fmt" "

Dec 4, 2021
Golang client library for adding support for interacting and monitoring Celery workers, tasks and events.

Celeriac Golang client library for adding support for interacting and monitoring Celery workers and tasks. It provides functionality to place tasks on

Oct 28, 2022
dht is used by anacrolix/torrent, and is intended for use as a library in other projects both torrent related and otherwise

dht Installation Install the library package with go get github.com/anacrolix/dht, or the provided cmds with go get github.com/anacrolix/dht/cmd/....

Dec 28, 2022
Dec 27, 2022
Take control of your data, connect with anything, and expose it anywhere through protocols such as HTTP, GraphQL, and gRPC.
Take control of your data, connect with anything, and expose it anywhere through protocols such as HTTP, GraphQL, and gRPC.

Semaphore Chat: Discord Documentation: Github pages Go package documentation: GoDev Take control of your data, connect with anything, and expose it an

Sep 26, 2022
A feature complete and high performance multi-group Raft library in Go.
A feature complete and high performance multi-group Raft library in Go.

Dragonboat - A Multi-Group Raft library in Go / 中文版 News 2021-01-20 Dragonboat v3.3 has been released, please check CHANGELOG for all changes. 2020-03

Dec 30, 2022
High performance, distributed and low latency publish-subscribe platform.
High performance, distributed and low latency publish-subscribe platform.

Emitter: Distributed Publish-Subscribe Platform Emitter is a distributed, scalable and fault-tolerant publish-subscribe platform built with MQTT proto

Jan 2, 2023
Fast, efficient, and scalable distributed map/reduce system, DAG execution, in memory or on disk, written in pure Go, runs standalone or distributedly.

Gleam Gleam is a high performance and efficient distributed execution system, and also simple, generic, flexible and easy to customize. Gleam is built

Jan 1, 2023
Simple, fast and scalable golang rpc library for high load

gorpc Simple, fast and scalable golang RPC library for high load and microservices. Gorpc provides the following features useful for highly loaded pro

Dec 19, 2022
🌧 BitTorrent client and library in Go
🌧 BitTorrent client and library in Go

rain BitTorrent client and library in Go. Running in production at put.io. Features Core protocol Fast extension Magnet links Multiple trackers UDP tr

Dec 28, 2022
A Go library for master-less peer-to-peer autodiscovery and RPC between HTTP services

sleuth sleuth is a Go library that provides master-less peer-to-peer autodiscovery and RPC between HTTP services that reside on the same network. It w

Dec 28, 2022
Full-featured BitTorrent client package and utilities

torrent This repository implements BitTorrent-related packages and command-line utilities in Go. The emphasis is on use as a library from other projec

Jan 4, 2023
Go Open Source, Distributed, Simple and efficient Search Engine

Go Open Source, Distributed, Simple and efficient full text search engine.

Dec 31, 2022