A library to simplify writing applications using TCP sockets to stream protobuf messages

BuffStreams

Streaming Protocol Buffers messages over TCP in Golang

What is BuffStreams?

BuffStreams is a set of abstractions over TCPConns for streaming connections that write data in a format involving the length of the message + the message payload itself (like Protocol Buffers, hence the name).

BuffStreams gives you a simple interface to start a (blocking or non-blocking) listener on a given port, which will stream arrays of raw bytes into a callback you provide. In this way, BuffStreams is not so much a daemon as a library to build networked services that communicate over TCP using Protocol Buffers messages.

Why BuffStreams?

I was writing a few different projects for fun in Golang, and kept writing code similar to what is in this library, but less organized. I decided to focus on the networking code, pulling it out and improving it so I knew it could be trusted to perform reliably across projects.

Ethos

There is nothing special or magical about BuffStreams, or the code in here. The idea isn't that it's a better, faster socket abstraction - it's to do as much of the boilerplate for you when handling streaming data like protobuf messages, with as little impact on performance as possible. Currently, BuffStreams is able to push over 1.1 million messages per second, at 110 bytes per message, on a single listening socket, which saturates a 1-gigabit NIC.

The idea of BuffStreams is to do the boring parts and handle common errors, enabling you to write systems on top of it, while adding as little overhead as possible.

How does it work?

Since protobuf messages lack any kind of natural delimiter, BuffStreams uses the strategy of prefixing each payload with a fixed header of bytes (the size of which is configurable) that describes the size of the actual payload. This is handled for you by the call to Write; you never need to prepend the size yourself.

On the server side, it will listen for these payloads, read the fixed header, and then the subsequent message. The server must be configured with the same maximum message size as the client for this to work. BuffStreams will then pass the byte slice to a callback you provided for handling messages received on that port. Deserializing the messages and interpreting their value is up to you.

One important note is that internally, BuffStreams does not actually use or rely on the Protocol Buffers library itself in any way. All serialization / deserialization is handled by the client prior to / after interactions with BuffStreams. In this way, you could theoretically use this library to stream any data over TCP that uses the same strategy of a fixed header of bytes + a subsequent message body.

Currently, I have only used it for Protocol Buffers messages.
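
For illustration, here is a minimal sketch of that generic length-prefix strategy in Go. This is not BuffStreams' internal code, and the varint header encoding is an assumption here (BuffStreams handles the header for you; you never write it yourself):

import (
	"bufio"
	"encoding/binary"
	"io"
)

// frame prepends a varint-encoded length header to the payload.
func frame(payload []byte) []byte {
	header := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(header, uint64(len(payload)))
	return append(header[:n], payload...)
}

// readFrame reads one length-prefixed message back off the wire.
func readFrame(r *bufio.Reader) ([]byte, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	msg := make([]byte, size)
	if _, err := io.ReadFull(r, msg); err != nil {
		return nil, err
	}
	return msg, nil
}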

Logging

You can optionally enable logging of errors, although this naturally comes with a performance penalty under extreme load.

Benchmarks

I've tried very hard to optimize BuffStreams as best as possible, striving to keep its average throughput above 1M messages per second, with no errors in transit.

See Bench

How do I use it?

Download the library

go get github.com/StabbyCutyou/buffstreams

Import the library

import "github.com/StabbyCutyou/buffstreams"

For a quick example of a complete end-to-end client and server, check out the examples in the test/ directory, namely test/client/test_client.go and test/server/test_server.go. These two files are designed to work together to demonstrate an end-to-end integration of BuffStreams in the simplest possible way.

Listening for connections

One of the core objects in BuffStreams is the TCPListener. This struct allows you to open a socket on a local port and begin waiting for clients to connect. Once a connection is made, each full message written by the client will be received by the listener, and a callback you define will be invoked with the message contents (a slice of bytes).

To begin listening, first create a TCPListenerConfig object to define how the listener should behave. A sample TCPListenerConfig might look like this:

cfg := buffstreams.TCPListenerConfig{
	EnableLogging:  false, // true will have log messages printed to stdout/stderr, via log
	MaxMessageSize: 4096,
	Callback:       func(data []byte) error { return nil }, // any function adhering to this signature; you'll need to deserialize in here if need be
	Address:        buffstreams.FormatAddress("", strconv.Itoa(5031)), // any address with the pattern ip:port. The FormatAddress helper is here for convenience. For listening, you normally don't want to provide an ip unless you have a reason.
}
btl, err := buffstreams.ListenTCP(cfg)

Once you've opened a listener this way, the socket is now in use, but the listener itself has not yet begun to accept connections.

To do so, you have two choices. By default, this operation will block the current goroutine. If you want to avoid that and use a fire-and-forget approach, you can call

err := btl.StartListeningAsync()

If there is an error while starting up, it will be returned by this method. Alternatively, if you want to handle running the call yourself, or don't care that it blocks, you can call

err := btl.StartListening()
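
If you take the blocking route, a minimal sketch is to run it in a goroutine yourself (assuming the btl listener from above and the standard log package):

go func() {
	// StartListening blocks until the listener stops or errors out.
	if err := btl.StartListening(); err != nil {
		log.Printf("listener stopped: %s", err)
	}
}()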

The ListenCallback

The way BuffStreams handles acting on incoming messages is to let you provide a callback to operate on the bytes. ListenCallback takes in a slice of bytes and returns an error.

type ListenCallback func([]byte) error

The callback will receive the raw bytes for a given protobuf message. The header containing the size will have been removed. It is the callback's responsibility to deserialize and act upon the message.

The Listener gets the message, your callback does the work.

A sample callback might start like so:

  func ListenCallbackExample(data []byte) error {
  	msg := &message.ImportantProtoBuffStreamingMessage{}
  	if err := proto.Unmarshal(data, msg); err != nil {
  		return err
  	}
  	// Now you do some stuff with msg
  	...
  }

The callback is currently run in its own goroutine, which also handles reading from the connection until the reader disconnects or there is an error. Any errors reading from an incoming connection are left up to the client to handle.

Writing messages

To begin writing messages to a new connection, you'll need to dial out using a TCPConnConfig

cfg := buffstreams.TCPConnConfig{
	EnableLogging:  false, // true will have log messages printed to stdout/stderr, via log
	MaxMessageSize: 4096, // you want this to match the MaxMessageSize the server expects for messages on that socket
	Address:        buffstreams.FormatAddress("127.0.0.1", strconv.Itoa(5031)), // any address with the pattern ip:port. The FormatAddress helper is here for convenience.
}

Once you have a configuration object, you can Dial out.

btc, err := buffstreams.DialTCP(cfg)

This will open a connection to the endpoint at the specified location. Additionally, the TCPConn that the TCPListener returns will also allow you to write data, using the same methods as below.

From there, you can write your data

  bytesWritten, err := btc.Write(msgBytes, true)
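
For context, a hedged sketch of what typically precedes that call - serializing a message with the protobuf runtime before handing BuffStreams the raw bytes (the message type is the hypothetical one from the callback example above):

msg := &message.ImportantProtoBuffStreamingMessage{} // hypothetical generated type
// ... populate msg ...
msgBytes, err := proto.Marshal(msg)
if err != nil {
	log.Fatal(err)
}
// BuffStreams prepends the size header; you only supply the payload bytes.
bytesWritten, err := btc.Write(msgBytes, true)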

If there is an error in writing, that connection will be closed and reopened on the next write. There is no guarantee whether the bytesWritten value will be > 0 in the event of an error that results in a reconnect.

Manager

There is a third option: the provided Manager. It gives you a simple but effective abstraction over dialing and listening on ports, managing the connections for you. You provide the normal configuration for dialing out or listening for incoming connections, and let the Manager hold onto the references. The Manager is considered threadsafe, as it internally uses locks to ensure consistency and coordination between concurrent accesses to the connections being held.

The Manager is not really a "pool", in that it doesn't open and hold X connections for you to re-use. However, it maintains many of the same behaviors as a pool, including caching and re-using connections, and as mentioned is threadsafe.

Creating a Manager

bm := buffstreams.NewManager()

Listening on a port. The Manager always makes this asynchronous and non-blocking

// Assuming you've got a configuration object cfg, see above
err := bm.StartListening(cfg)

Dialing out to a remote endpoint

// Assuming you've got a configuration object cfg, see above
err := bm.Dial(cfg)

Having opened a connection, you can write to it repeatedly

bytesWritten, err := bm.Write("127.0.0.1:5031", dataBytes)

The Manager keeps listening and dialed-out connections cached internally. Once you open one, it'll be kept open. The writer is matched by the write destination, so any time you write to the same address, the correct writer will be re-used. The listening connection simply remains open, waiting to receive requests.
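
Putting those pieces together, a minimal sketch of a Manager-based writer (assuming the TCPConnConfig cfg from the dialing section and the standard log package):

bm := buffstreams.NewManager()

// Dial once; the Manager caches the connection, keyed by its address.
if err := bm.Dial(cfg); err != nil {
	log.Fatal(err)
}

// Later writes to the same address re-use the cached writer.
if _, err := bm.Write("127.0.0.1:5031", dataBytes); err != nil {
	log.Printf("write failed: %s", err)
}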

You can forcibly close these connections, by calling either

err := bm.CloseListener("127.0.0.1:5031")

or

err := bm.CloseWriter("127.0.0.1:5031")

Thanks

Special thanks to those who have reported bugs or helped me improve BuffStreams

  • Judson White

Roadmap

  • Release proper set of benchmarks, including more real-world cases
  • Configurable retry for the client, and a configurable errored-message queue so users can define a failover process for messages that fail to send.
  • Optional channel based streaming approach instead of callbacks
  • Further library optimizations via tools such as pprof

LICENSE

Apache v2 - See LICENSE

Comments
  • WIP refactor away from manager, discrete classes for writer / listener.

    ping @lumost

    What do you think about this? It's totally untested and not ready for merging, but as a concept. Remove the manager, or at the very least, make it optional.

    Offer 2 abstractions over a socket - listener and writer. They do everything the manager did, but without the locking or keeping track of things. Maybe offer the Manager as an optional convenience for when you just don't care, but want the functionality.

  • Concurrent use is very unsafe

    panic: runtime error: slice bounds out of range
    
    goroutine 19 [running]:
    panic(0x104640, 0xc42000c130)
    	/usr/local/go/src/runtime/panic.go:500 +0x1a1
    github.com/barnettZQG/buffstreams.(*TCPConn).Read(0xc4200be060, 0xc4201d9000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    	/Users/qingguo/gopath/src/github.com/barnettZQG/buffstreams/tcpconn.go:214 +0x14e
    github.com/barnettZQG/buffstreams.(*TCPListener).readLoop(0xc42007a510, 0xc4200be060)
    	/Users/qingguo/gopath/src/github.com/barnettZQG/buffstreams/tcplistener.go:169 +0x148
    created by github.com/barnettZQG/buffstreams.(*TCPListener).blockListen
    	/Users/qingguo/gopath/src/github.com/barnettZQG/buffstreams/tcplistener.go:100 +0xa2
    exit status 2
    
  • Read with delimiter

    Currently the package only supports reading packets that specify a length; there are some scenarios where we read packets based on delimiters. I have modified the code to handle packets with delimiters.

  • Break manager apart into listening / writing socket abstractions

    This will allow people to optionally just use the socket abstractions, or take the easier road of using the manager.

    This is actively being worked on.

  • Feature/remove conn interface

    This fixes a bug in how persist is treated, and removes using any of the interface-level stuff for networking, going straight to TCP as was the intention.

  • Review locking strategies

    There is possibly some naivete in how I'm locking. Possible improvements:

    • Use 2 locks, one for listening, one for dialing
    • Move writes out of locks; break the lookup + write into 2 ops when writing to an endpoint, since conns are already threadsafe
    • Synchronize access to the conn map so less locking code is littered around map access

  • Readme is in terrible shape

    Readme is in terrible shape

    I tossed that readme up really quickly just to have something.

    Need to address this with a proper readme that has cleaned up language and a general re-organization of information.

  • Is order of packet receive guaranteed?

    Got a question - not so much an issue. I am working with protobuffers that I want to stream over TCP, and I came across your project here, which really sounds like the same thing. But what if I tried to use this as a base for the following:

    • a client streams a video to the server
    • the server sends the video stream to all connected clients (except the sender)
    • the clients will show the video

    The clients are C# applications.

    Would your project guarantee the order in which the packets are sent, or do I need to implement some ordering ID in the packet?

    Would you think it will be fast enough?

    Would you have any idea how I could 'broadcast' an incoming stream to all connected clients?

    And the fixes you committed, will they land in master soon?

  • readLoop may memory leak

    func (t *TCPListener) readLoop(conn *TCPConn) {
    	go func(c *TCPConn, s <-chan struct{}) {
    		<-s
    		c.Close()
    	}(conn, t.shutdownChannel)
    	for {
    		// if this returns, the goroutine above will never be stopped unless shutdownGroup is closed
    		// incomingHeaderBuffer or outgoingDataBuffer may leak memory
    	}
    }
    
  • Incorrect header length

    Headers are encoded with Varint encoding, as per the spec for varint: https://developers.google.com/protocol-buffers/docs/encoding

    Varint encoding uses the 8th bit of each byte as a continuation ("more") flag. As such, only 7 bits per byte actually contain data.

    This means that a 64 bit number now needs 80 bits to actually be encoded if the number is large enough.
