rueidis

A Fast Golang Redis RESP3 client that does auto pipelining and supports client side caching.

Features

  • auto pipelining for non-blocking redis commands
  • connection pooling for blocking redis commands
  • opt-in client side caching
  • redis cluster, pub/sub, streams, TLS, RedisJSON, RedisBloom, RediSearch, RedisGraph, RedisTimeseries
  • IDE friendly redis command builder
  • Hash/RedisJSON Object Mapping with client side caching and optimistic locking
  • OpenTelemetry tracing and metrics

Requirement

  • Currently, only supports redis >= 6.x

Getting Started

package main

import (
	"context"
	"github.com/rueian/rueidis"
)

func main() {
	c, _ := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"},
	})
	defer c.Close()

	ctx := context.Background()

	// SET key val NX
	c.Do(ctx, c.B().Set().Key("key").Value("val").Nx().Build()).Error()
	// GET key
	c.Do(ctx, c.B().Get().Key("key").Build()).ToString()
}

Auto Pipeline

All non-blocking commands sent to a single redis instance are automatically pipelined through one tcp connection, which reduces overall round trip costs and increases throughput.
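
For example (a minimal sketch reusing the client c and ctx from the Getting Started snippet, plus the standard sync and strconv packages), commands issued concurrently from many goroutines are written to the same connection and flushed in batches:

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		// each call below is enqueued onto the shared connection and pipelined automatically
		c.Do(ctx, c.B().Set().Key("key"+strconv.Itoa(i)).Value("val").Build())
	}(i)
}
wg.Wait()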

Benchmark comparison with go-redis v8.11.4

Rueidis has higher throughput than go-redis v8.11.4 across 1, 8, and 64 parallelism settings.

It can even achieve ~14x the throughput of go-redis in a local benchmark (see parallelism(64)-key(16)-value(64)-10).

Single Client

[benchmark chart: client_test_set]

Cluster Client

[benchmark chart: cluster_test_set]

Benchmark source code: https://github.com/rueian/rueidis-benchmark

Client Side Caching

The Opt-In mode of server-assisted client side caching is always enabled, and can be used by calling DoCache() with an explicit client side TTL.

c.DoCache(ctx, c.B().Hmget().Key("myhash").Field("1", "2").Cache(), time.Minute).ToArray()

An explicit client side TTL is required because the redis server may not send an invalidation message in time when a key expires on the server. Please follow #6833 and #6867.

Although an explicit client side TTL is required, DoCache() still sends a PTTL command to the server to make sure that the client side TTL is not longer than the server side TTL.

Users can use IsCacheHit() to check whether the response came from client side memory.

c.DoCache(ctx, c.B().Hmget().Key("myhash").Field("1", "2").Cache(), time.Minute).IsCacheHit() == true

If OpenTelemetry is enabled via rueidisotel.WithClient(client), two additional metrics are instrumented:

  • rueidis_do_cache_miss
  • rueidis_do_cache_hits

Benchmark

[benchmark chart: client_test_get]

Benchmark source code: https://github.com/rueian/rueidis-benchmark

Supported Commands for Client Side Caching

  • bitcount
  • bitfieldro
  • bitpos
  • expiretime
  • geodist
  • geohash
  • geopos
  • georadiusro
  • georadiusbymemberro
  • geosearch
  • get
  • getbit
  • getrange
  • hexists
  • hget
  • hgetall
  • hkeys
  • hlen
  • hmget
  • hstrlen
  • hvals
  • lindex
  • llen
  • lpos
  • lrange
  • pexpiretime
  • pttl
  • scard
  • sismember
  • smembers
  • smismember
  • sortro
  • strlen
  • ttl
  • type
  • zcard
  • zcount
  • zlexcount
  • zmscore
  • zrange
  • zrangebylex
  • zrangebyscore
  • zrank
  • zrevrange
  • zrevrangebylex
  • zrevrangebyscore
  • zrevrank
  • zscore
  • jsonget
  • jsonstrlen
  • jsonarrindex
  • jsonarrlen
  • jsonobjkeys
  • jsonobjlen
  • jsontype
  • jsonresp
  • bfexists
  • bfinfo
  • cfexists
  • cfcount
  • cfinfo
  • cmsquery
  • cmsinfo
  • topkquery
  • topklist
  • topkinfo

Blocking Commands

The following blocking commands use a separate connection pool and do not share connections with non-blocking commands, so they will not block the pipeline (see the sketch after the list):

  • xread with block
  • xreadgroup with block
  • blpop
  • brpop
  • brpoplpush
  • blmove
  • blmpop
  • bzpopmin
  • bzpopmax
  • bzmpop
  • clientpause
  • migrate
  • wait
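
A minimal sketch, assuming the client c and ctx from the Getting Started snippet and that the generated builder follows the usual Blpop().Key().Timeout() naming:

// BLPOP mylist 5 is routed to the blocking connection pool,
// so it will not stall the pipelined connection used by non-blocking commands.
entry, err := c.Do(ctx, c.B().Blpop().Key("mylist").Timeout(5).Build()).AsStrSlice()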

Pub/Sub

To receive messages from channels, the message handler should be registered when creating the redis connection:

c, _ := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
    PubSubOption: rueidis.NewPubSubOption(func(prev error, client rueidis.DedicatedClient) {
        // Subscribe to channels in this PubSubSetup hook so that they are
        // re-subscribed automatically after a disconnection.
        // The "prev" error is the previous disconnect error.
        if err := client.Do(ctx, client.B().Subscribe().Channel("my_channel").Build()).Error(); err != nil {
            // handle the subscribe error
        }
    }, rueidis.PubSubHandler{
        OnMessage: func(channel, message string) {
            // handle the message
        },
    }),
})

CAS Pattern

To do a CAS operation (WATCH + MULTI + EXEC), a dedicated connection should be used, because there should be no unintentional write commands between WATCH and EXEC. Otherwise, the EXEC may not fail as expected.

The dedicated connection shares the same connection pool with blocking commands.

c.Dedicated(func(client rueidis.DedicatedClient) error {
    // watch keys first
    client.Do(ctx, client.B().Watch().Key("k1", "k2").Build())
    // perform read here
    client.Do(ctx, client.B().Mget().Key("k1", "k2").Build())
    // perform write with MULTI EXEC
    client.DoMulti(
        ctx,
        client.B().Multi().Build(),
        client.B().Set().Key("k1").Value("1").Build(),
        client.B().Set().Key("k2").Value("2").Build(),
        client.B().Exec().Build(),
    )
    return nil
})

However, occupying a connection is not good for throughput. It is better to use a Lua script to perform optimistic locking instead.

Lua Script

NewLuaScript or NewLuaScriptReadOnly creates a script which is safe for concurrent use.

When calling script.Exec, it first tries sending EVALSHA to the server, and if the server returns NOSCRIPT, it retries with EVAL.

script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
// the script.Exec is safe for concurrent call
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
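
As a rough sketch of the optimistic locking idea mentioned in the CAS section (the key name, values, and conflict handling below are placeholders, not part of the library):

// only set the new value if the key still holds the expected old value
casScript := rueidis.NewLuaScript(`
if redis.call('GET', KEYS[1]) == ARGV[1] then
  return redis.call('SET', KEYS[1], ARGV[2])
else
  return redis.error_reply('conflict')
end`)
// update "k" from "old" to "new" only if nobody changed it in between
err := casScript.Exec(ctx, client, []string{"k"}, []string{"old", "new"}).Error()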

Redis Cluster and Single Redis

To connect to a redis cluster, the NewClient should be used:

c, _ := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    ShuffleInit: true,
})

To connect to a single redis node, still use NewClient with one InitAddress.
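
This mirrors the Getting Started snippet, just with a single address:

c, _ := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})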

Command Builder

Redis commands are very complex and their formats are very different from each other.

This library provides a type safe command builder within client.B() that can be used as an entrypoint to construct a redis command. Once the command is completed, call Build() or Cache() to get the actual command, and then pass it to either Client.Do() or Client.DoCache().

c.Do(ctx, c.B().Set().Key("mykey").Value("myval").Ex(10).Nx().Build())

Once the command is passed to Client.Do() or Client.DoCache(), it is recycled and should not be reused.

The ClusterClient.B() also checks whether the command contains multiple keys belonging to different slots. If it does, it panics (see the hash tag sketch below for a way to keep multi-key commands in one slot).
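
A hedged illustration (the key names are hypothetical): keys sharing a hash tag such as {user} map to the same slot, so the following multi-key command is safe on a cluster, while mixing keys from different slots would panic:

c.Do(ctx, c.B().Mget().Key("{user}:name", "{user}:email").Build()).ToArray()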

Object Mapping

NewHashRepository and NewJSONRepository create an OM repository backed by redis hash or RedisJSON respectively.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/rueian/rueidis"
    "github.com/rueian/rueidis/om"
)

type Example struct {
    Key string `json:"key" redis:",key"` // the redis:",key" is required to indicate which field is the ULID key
    Ver int64  `json:"ver" redis:",ver"` // the redis:",ver" is required to do optimistic locking to prevent lost update
    Str string `json:"myStr"`            // both NewHashRepository and NewJSONRepository use json tag as field name
}

func main() {
    ctx := context.Background()
    c, _ := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
    // create the repo with NewHashRepository or NewJSONRepository
    repo := om.NewHashRepository("my_prefix", Example{}, c)

    exp := repo.NewEntity().(*Example)
    exp.Str = "mystr"
    fmt.Println(exp.Key) // output 01FNH4FCXV9JTB9WTVFAAKGSYB
    repo.Save(ctx, exp) // success

    // lookup "my_prefix:01FNH4FCXV9JTB9WTVFAAKGSYB" through client side caching
    cache, _ := repo.FetchCache(ctx, exp.Key, time.Second*5)
    exp2 := cache.(*Example)
    fmt.Println(exp2.Str) // output "mystr", which equals to exp.Str

    exp2.Ver = 0         // if someone changes the version during your GET then SET operation,
    repo.Save(ctx, exp2) // the save will fail with ErrVersionMismatch.
}

Object Mapping + RediSearch

If you have RediSearch, you can create an index for the repository and search against it.

if _, ok := repo.(*om.HashRepository); ok {
    repo.CreateIndex(ctx, func(schema om.FtCreateSchema) om.Completed {
        return schema.FieldName("myStr").Text().Build() // Note that the Example.Str field is mapped to myStr on redis by its json tag
    })
}

if _, ok := repo.(*om.JSONRepository); ok {
    repo.CreateIndex(ctx, func(schema om.FtCreateSchema) om.Completed {
        return schema.FieldName("$.myStr").Text().Build() // the field name of json index should be a json path syntax
    })
}

exp := repo.NewEntity().(*Example)
exp.Str = "foo"
repo.Save(ctx, exp)

n, records, _ := repo.Search(ctx, func(search om.FtSearchIndex) om.Completed {
    return search.Query("foo").Build() // you have full query capability by building the command from om.FtSearchIndex
})

fmt.Println("total", n) // n is total number of results matched in redis, which is >= len(records)

for _, v := range records.([]*Example) {
    fmt.Println(v.Str) // print "foo"
}

Object Mapping Limitation

NewHashRepository only accepts these field types (a sketch follows the list):

  • string, *string
  • int64, *int64
  • bool, *bool
  • []byte
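
For illustration only, a hypothetical entity using the accepted types (pointer types can be used for optional fields):

type Person struct {
    Key  string `redis:",key"`
    Ver  int64  `redis:",ver"`
    Name string
    Age  *int64
    VIP  bool
    Data []byte
}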

Field projection by RediSearch is not supported.

OpenTelemetry Tracing

Use rueidisotel.WithClient to create a client with OpenTelemetry Tracing enabled.

package main

import (
    "github.com/rueian/rueidis"
    "github.com/rueian/rueidis/rueidisotel"
)

func main() {
    client, _ := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
    client = rueidisotel.WithClient(client)
    defer client.Close()
}

Command Response Cheatsheet

It is hard to remember what message type is returned from redis and which parsing method should be used. So, here are some common examples:

// GET
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
// MGET
client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
// SET
client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error()
// INCR
client.Do(ctx, client.B().Incr().Key("k").Build()).ToInt64()
// HGET
client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString()
// HMGET
client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
// HGETALL
client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice()
// ZRANK
client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).ToInt64()
// ZSCORE
client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).ToFloat64()
// SCARD
client.Do(ctx, client.B().Scard().Key("k").Build()).ToInt64()
// SMEMBERS
client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice()
// LINDEX
client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
// LPOP
client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()

Not Yet Implemented

The following subjects are not yet implemented.

  • go-redis like api layer
  • RESP2

Comments
  • Proposal for a go-redis like high level api

    Proposal for a go-redis like high level api

    Hi, I plan to use rueidis for some of my next projects, and I must say that I like your "bottom-up" approach. But currently, writing commands is quite cumbersome. I would like to propose a design for a high-level API similar to what go-redis has.

    Rueidis' "Getting Started" section uses this:

    // SET key val NX
    c.Do(ctx, c.B().Set().Key("key").Value("val").Nx().Build()).Error()
    // GET key
    c.Do(ctx, c.B().Get().Key("key").Build()).ToString()
    

    What if we could turn that into:

    // SET key val NX
    c.SetNX(ctx, "key", "val").Error()
    // GET key
    c.Get(ctx, "key").ToString()
    

    Well maybe it's not hard to do so, this was the most straightforward thing I could think of. We basically wrap cmds.Builder:

    func (c *singleClient) SetNX(ctx context.Context, key, value string) (resp RedisResult) {
    	cmd := c.B().Set().Key(key).Value(value).Nx().Build()
    	resp = c.Do(ctx, cmd)
    	cmds.Put(cmd.CommandSlice())
    	return resp
    }
    
    func (c *singleClient) Get(ctx context.Context, key string) (resp RedisResult) {
    	cmd := c.B().Get().Key(key).Build()
    	resp = c.Do(ctx, cmd)
    	cmds.Put(cmd.CommandSlice())
    	return resp
    }
    

    The issue comes when trying to implement things like caching, how could the user specify to use client-side caching?

    We could create a separate function for each command? Each command would have two functions, * and *Cache (where * is the command).

    func (c *singleClient) SetNXCache(ctx context.Context, key, value string) (resp RedisResult) 
    
    func (c *singleClient) GetCache(ctx context.Context, key, value string) (resp RedisResult) 
    

    Or perhaps by some kind of hacky optional boolean?

    func (c *singleClient) SetNXCache(ctx context.Context, key, value string, cache ...bool) (resp RedisResult) 
    
    func (c *singleClient) GetCache(ctx context.Context, key, value string, cache ...bool) (resp RedisResult) 
    

    Or still something different?

    Let me know what you think about this. If we can decide on a concise design, I wouldn't mind implementing it.

  • RedisLabs Enterprise Support

    RedisLabs Enterprise Support

    When using the RedisLabs Enterprise Docker image, I'm not able to make a connection due to the HELLO command not being supported in the enterprise edition ...

    $ /opt/redislabs/bin/redis-cli -p 12000
    127.0.0.1:12000> HELLO
    (error) ERR unknown command 'HELLO'

    I don't see this called out on the compatibility list but confirmed with RedisLabs.

  • Flexible dynamic PUB/SUB API

    Flexible dynamic PUB/SUB API

    Hello again @rueian! I started experimenting with rueidis and here is what I found:

    • I am trying to replace redigo-based implementation with rueidis. Two main reasons: performance and the fact that with redigo I have to use 3 libraries to work with standalone Redis, Sentinel Redis, Redis Cluster.
    • For sync commands the migration seems straightforward and I am observing a 2x reduction in allocations, which is super cool. I am only looking at single-instance benchmarks at the moment; hopefully at some point I'll compare the cluster case too
    • But I can not fully replace PUB/SUB layer unfortunately and this issue is about it

    The reason is that PUB/SUB in rueidis does not provide API suitable for my use case.

    In the system I have there is a PUB/SUB layer. It's responsible for dynamically subscribing/unsubscribing to channels. Channels are unknown on the PUB/SUB start – they can appear/disappear during application lifetime. On connection drop system reconnects to previously subscribed channels in Redis. In this case I can not afford re-subscribing from scratch on every channel added or removed. The number of channels is supposed to be very large.

    This is a pretty valid pattern for Redis: i.e. sending subscribe/unsubscribe commands over a PUB/SUB connection.

    With redigo it's possible to implement:

    psc := redis.PubSubConn{Conn: c}
    psc.Subscribe("example")
    for {
        switch v := psc.Receive().(type) {
        case redis.Message:
            fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
        case redis.Subscription:
            fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
        case error:
            return v
        }
    }
    

    I.e. receiving messages is decoupled from Subscribe and Unsubscribe ops. So it's possible to control subscriptions in a desired way.

    With rueidis it's only possible to pass initial channels and receive updates from them. It's still possible to use a dedicated connection and Subscribe it to additional channels in a separate goroutine, for example – but when a push is received from Redis it's dropped at the rueidis level since the channel is not in the initial list passed to the initial Subscribe command. So the application can't process that message.

    Also, rueidis has some logic that Receive() exits when (from readme):

    when received any unsubscribe/punsubscribe message related to the provided subscribe command.

    This is not actually desired in my case: I want PUB/SUB Receive() to work until I close it explicitly, maybe by closing the dedicated connection (but it does not have a Close method btw). Or maybe until the Context passed to Receive(ctx) is done - I suppose that should already work. Just do not stop receiving when an unsubscribe for some channel is received - it's a normal situation in my case.

    If I have not missed sth obvious then what I am proposing here is to add some raw access to PUB/SUB API:

    1. Do not have any additional logic in rueidis to manage PUB/SUB subscriptions (like it currently has, keeping track of initial channels) – let the application do this. I.e. pass everything from Redis to the application's message handler. Possibly have a way to create a PubSubConnection similar to Dedicated but which has a Receive(ctx) without a subscribe command passed to it.
    2. Do not exit on receiving an unsubscribe for a previously subscribed channel - this is a normal situation. If implementing (1), I think this won't be the case anymore.
    3. Maybe add a method to close the Dedicated (or ) connection which will result in a return from the blocked Receive() - at least I was looking for one. In general I can control exit from Receive using the Context's cancel() I suppose. But closing a Pub/Sub connection which observed an error seems nice to have – I don't want to proceed with this connection actually. I think cancelling the context is sth that still leaves the broken connection alive (not sure though how it currently works).

    Hope this makes sense!

  • Deadlock spotted in v0.0.80

    Deadlock spotted in v0.0.80

    I am trying Rueidis before updating my PR here: https://github.com/thanos-io/thanos/pull/5593

    I have noticed that under heavy load, some goroutines get stuck (deadlock) in Rueidis code:


    I would expect the goroutines count to go down but it stays the same.

    Here's the goroutine dump from pprof: rueidis_dump.txt

    My guess is that under heavy load it tries to switch to a replica in Redis Cluster and then fails to timeout some operation somewhere? I don't know how to reproduce this ATM but maybe you'll be able to get some ideas from the dump?

  • issue detected. crash with examples below. pls try run them as custom server and client.

    issue detected. crash with examples below. pls try run them as custom server and client.

    issue detected. crash with examples below. pls try run them as custom server and client. realised I get... strange. HELLO CLIENT ?

    server

    package main
    
    import (
            "flag"
            "fmt"
            "log"
            "strings"
    
            "net/http"
            _ "net/http/pprof"
    
            "github.com/IceFireDB/redhub"
            "github.com/IceFireDB/redhub/pkg/resp"
    )
    
    func main() {
            //var mu sync.RWMutex
            //var items = make(map[string][]byte)
            var network string
            var addr string
            var multicore bool
            var reusePort bool
            var pprofDebug bool
            var pprofAddr string
            flag.StringVar(&network, "network", "tcp", "server network (default \"tcp\")")
            flag.StringVar(&addr, "addr", "127.0.0.1:1234", "server addr (default \":6380\")")
            flag.BoolVar(&multicore, "multicore", true, "multicore")
            flag.BoolVar(&reusePort, "reusePort", false, "reusePort")
            flag.BoolVar(&pprofDebug, "pprofDebug", false, "open pprof")
            flag.StringVar(&pprofAddr, "pprofAddr", ":8888", "pprof address")
            flag.Parse()
            if pprofDebug {
                    go func() {
                            http.ListenAndServe(pprofAddr, nil)
                    }()
            }
    
            protoAddr := fmt.Sprintf("%s://%s", network, addr)
            option := redhub.Options{
                    Multicore: multicore,
                    ReusePort: reusePort,
            }
    
            rh := redhub.NewRedHub(
                    func(c *redhub.Conn) (out []byte, action redhub.Action) {
                            return
                    },
                    func(c *redhub.Conn, err error) (action redhub.Action) {
                            return
                    },
                    func(cmd resp.Command, out []byte) ([]byte, redhub.Action) {
                            var status redhub.Action
                            switch strings.ToLower(string(cmd.Args[0])) {
                            default:
                                    log.Printf("FIRST COMMAND?! = %s\n",cmd.Args[0])
                                    out = resp.AppendError(out, "ERR unknown command '"+string(cmd.Args[0])+"'")
                            case "ping":
                                    out = resp.AppendString(out, "PONG")
                            }
                            return out, status
                    },
            )
            log.Printf("started redhub server at %s", addr)
            err := redhub.ListendAndServe(protoAddr, option, rh)
            if err != nil {
                    log.Fatal(err)
            }
    }
    
    
    
    package main
    
    import (
            "context"
    //      "fmt"
            "log"
    //      "net"
            //"os"
            //"runtime"
            //"strconv"
            //"sync"
            "time"
    
            "github.com/rueian/rueidis"
    )
    
    var (   
            ctx      = context.Background()
    )
    
    func main() {
    
    
            credis, err := rueidis.NewClient(rueidis.ClientOption{
                    InitAddress: []string{"0.0.0.0:1234"},
            })
    
    
            var fwResStr string
            credis = credis.(rueidis.Client)
            fwResStr, err = credis.Do(ctx, credis.B().Arbitrary("12321312").Build()).ToString()
            if err != nil {
                    log.Printf("fwRerr = %s", err)
            }
            log.Printf("grrr!!! = %s", fwResStr)
            time.Sleep(100 * time.Second)
    
    }
    
  • how to speed up further?

    how to speed up further?

    I've benchmarked one by one without concurrency / batching / goroutines... it's a bit slower than go-redis.

    1. How do I set keep-alive for the client side?
    2. How do I push it faster? As mentioned, I've customized the redis server (icefiredb redhub) and customized rueidis without sending the PING / HELLO etc. How to make it much, much faster?
  • Panic during reconnect to malfunctioning Redis Cluster

    Panic during reconnect to malfunctioning Redis Cluster

    Got the panic like this after reconnect to a Redis Cluster:

    panic: protocol bug, message handled out of order
    
    goroutine 1148 [running]:
    github.com/rueian/rueidis.(*pipe)._backgroundRead(0xc0014ae210)
      /Users/fz/go/pkg/mod/github.com/rueian/[email protected]/pipe.go:301 +0x725
    github.com/rueian/rueidis.(*pipe)._background(0xc0014ae210)
      /Users/fz/go/pkg/mod/github.com/rueian/[email protected]/pipe.go:135 +0x185
    created by github.com/rueian/rueidis.(*pipe).background.func1
      /Users/fz/go/pkg/mod/github.com/rueian/[email protected]/pipe.go:113 +0x5a
    exit status 2
    

    In my case I had a working Redis cluster, then stopped the cluster (I am using docker-compose from https://github.com/Grokzen/docker-redis-cluster) and ran it again. The newly created cluster was not properly functional due to a cluster setup error, so every operation on every node at this point returned:

    127.0.0.1:7005> set x 1
    (error) CLUSTERDOWN Hash slot not served
    

    I'll try to provide a minimal reproducer example soon. This may be a bit tricky to reproduce though.

  • Closed connection re-establishment behaviour

    Closed connection re-establishment behaviour

    Hello @rueian!

    I just came across one thing which I'd like to solve somehow. Let's imagine we have a Client and then the connection with Redis lost. For example, we can stop Redis server. Then quickly start it again.

    In this case we currently get the behaviour that even though Redis is back, our next command issued to Redis over rueidis.Client (for some specific slot, which may be issued an hour after the Redis restart happened, for example) always returns an EOF error. And only the second command issued to Redis is successful.

    It seems to me that the connection re-establishment mark happens inside the mux.pipeline method after we issue a command:

    if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) {
    	m.wire[slot].CompareAndSwap(wire, m.init)
    }
    

    At the same time in the pipe._backgroundRead:

    for {
    	if msg, err = readNextMessage(p.r); err != nil {
    		return
    	}
    

    – we get an error from the read when the connection is closed (upon Redis stop) but it seems it does not mark the connection for re-establishment, so we will still get EOF when trying to issue the next command over rueidis.Client, and no errors on subsequent commands.

    One more thing I do not understand: it seems that rueidis does not PING all connections, only some of them. So the fact that we have a closed connection cannot be discovered automatically soon after Redis is available again. Dedicated connections are periodically PINGed, but general pipeline connections seem not to be. So connections are not re-established automatically.

    This makes me think there are two possible ways to solve this:

    1. When we receive an error from readNextMessage mark the connection in a way so that the next attempt to write a command over it will result in re-establishing the connection, so that we could avoid extra error on first command from application after some time.
    2. PING all connections in background to detect closed ones and re-establish. So that from the application side temporary network loss works transparently. And internal connections of rueidis quickly come to live if Redis available again.
    3. Probably combine 1 and 2

    Theoretically I could add retries on the app level if I am getting EOF from rueidis – but I'm not sure this is really required if it can be handled internally by the library.

  • Does not exit when the ctx is cancelled for a stream read

    Does not exit when the ctx is cancelled for a stream read

    
    ctx1, cancel := context.WithCancel(ctx)
    go func() {
    	time.Sleep(3 * time.Second)
    	cancel()
    }()
    
    xReadGroup = c.XReadGroup(ctx1, XReadGroupArgs{
    	Group:    "group",
    	Consumer: "consumer",
    	Streams:  []string{key, ">"},
    	Count:    1,
    	Block:    0,
    })
    
  • How to make the response time of FT.Search as short as possible with 500 user requests per second?

    How to make the response time of FT.Search as short as possible with 500 user requests per second?

    The service scenarios are as follows:

    1. Each user request will result in 5-10 different ft.search queries
    2. After querying the RedisJSON that meets the conditions, the JSON data need to be modified
    3. A maximum of 500 users may request the system at the same time per second

    Go-redis (for example: res, err := r.Do("FT.SEARCH", "index", "@status:{0913|4912}", "LIMIT", 0, 1, "RETURN", 1, "$.Number").Result()) is currently used to implement steps 1-2 above. Each user request opens a separate redis connection and the connection is disconnected at the end of steps 1-2.

    Due to the excellent performance of RediSearch+RedisJSON, the average response time can be less than 200ms when there are only 5 to 10 user requests per second, but when there are more than 30~50 user requests per second, the average response time increases exponentially to 20 to 30 seconds. I think that redis is blocked because of the significant increase in the number of queries.

    Can rueidis significantly reduce response time when QPS exceeds 30 or even reaches 300 to 500? What is the correct way to use Rueidis to minimize response times for FT.Search queries?

  • Calls hang when redis stops

    Calls hang when redis stops

    • Spin up a server
    • Start making non-blocking requests to that server
    • Kill the server

    At this point, the calls start to hang with variable timeouts, without respecting context deadlines. The issue seems to be inside this function: https://github.com/rueian/rueidis/blob/master/mux.go#L85

    The error returned is ECONNREFUSED, the wire is nil, and the function just loops over the retry block.

    I'm not proficient in Go, but AFAIU there is no context check inside that pipe function, so the behaviour is unpredictable.

    Is there a way to catch this sooner and obtain some sort of fail-fast behaviour?

  • How to dynamically control parameters when using RedisSearch

    How to dynamically control parameters when using RedisSearch

    I need to dynamically control parameters such as Verbatim, Highlight, Slop... when using RediSearch, but it is difficult for me to define a type to hold the intermediate value, because the type returned by each method is different, and most importantly those types are not exported.

    This is how I expected it to be written:

    var c cmds.Completed
    c = cmds.Completed(r.B().FtSearch().Index(index).Query(text))
    if opt.Verbatim{
    	val,_ := c.(cmds.FtSearchQuery)
    	c = cmds.Completed(val.Verbatim())
    }
    if opt.Highlight{
    	val,_ := c.(cmds.FtSearchVerbatim)
    	c = cmds.Completed(val.Highlight())
    }
    ...
    
  • perf: improve user specified flush delay automatically by statistics

    perf: improve user specified flush delay automatically by statistics

    Choose flush delay automatically by statistics from previous flush histories.

    It generally achieves better throughput than just using MaxFlushDelay provided by users directly while still keeping reasonable CPU usage.


    Benchmark comparison to the old v0.0.89 by using the code https://github.com/FZambia/pipelines:

    PIPE_PARALLELISM=1 PIPE_MAX_FLUSH_DELAY=100

    ▶ PIPE_PARALLELISM=1 PIPE_MAX_FLUSH_DELAY=100 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > old-100us-p1.txt
    ▶ PIPE_PARALLELISM=1 PIPE_MAX_FLUSH_DELAY=100 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > new-100us-p1.txt
    ▶ benchstat old-100us-p1.txt new-100us-p1.txt
    name        old time/op    new time/op    delta
    Rueidis-10    12.0µs ± 0%     3.8µs ± 2%  -68.47%  (p=0.000 n=20+20)
    
    name        old alloc/op   new alloc/op   delta
    Rueidis-10     80.0B ± 0%     80.0B ± 0%     ~     (all equal)
    
    name        old allocs/op  new allocs/op  delta
    Rueidis-10      1.00 ± 0%      1.00 ± 0%     ~     (all equal)
    

    PIPE_PARALLELISM=128 PIPE_MAX_FLUSH_DELAY=100

    ▶ PIPE_PARALLELISM=128 PIPE_MAX_FLUSH_DELAY=100 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > old-100us-p128.txt
    ▶ PIPE_PARALLELISM=128 PIPE_MAX_FLUSH_DELAY=100 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > new-100us-p128.txt
    ▶ benchstat old-100us-p128.txt new-100us-p128.txt
    name        old time/op    new time/op    delta
    Rueidis-10     601ns ± 2%     589ns ± 2%  -1.89%  (p=0.000 n=20+19)
    
    name        old alloc/op   new alloc/op   delta
    Rueidis-10     80.0B ± 0%     80.0B ± 0%    ~     (all equal)
    
    name        old allocs/op  new allocs/op  delta
    Rueidis-10      1.00 ± 0%      1.00 ± 0%    ~     (all equal)
    

    PIPE_PARALLELISM=1 PIPE_MAX_FLUSH_DELAY=20

    ▶ PIPE_PARALLELISM=1 PIPE_MAX_FLUSH_DELAY=20 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > old-20us-p1.txt
    ▶ PIPE_PARALLELISM=1 PIPE_MAX_FLUSH_DELAY=20 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > new-20us-p1.txt
    ▶ benchstat old-20us-p1.txt new-20us-p1.txt
    name        old time/op    new time/op    delta
    Rueidis-10    4.43µs ± 1%    3.80µs ± 2%  -14.38%  (p=0.000 n=20+20)
    
    name        old alloc/op   new alloc/op   delta
    Rueidis-10     80.0B ± 0%     80.0B ± 0%     ~     (all equal)
    
    name        old allocs/op  new allocs/op  delta
    Rueidis-10      1.00 ± 0%      1.00 ± 0%     ~     (all equal)
    

    PIPE_PARALLELISM=128 PIPE_MAX_FLUSH_DELAY=20

    ▶ PIPE_PARALLELISM=128 PIPE_MAX_FLUSH_DELAY=20 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > old-20us-p128.txt
    ▶ PIPE_PARALLELISM=128 PIPE_MAX_FLUSH_DELAY=20 go test -run xxx -bench BenchmarkRueidis -benchtime=2s -count 20 > new-20us-p128.txt
    ▶ benchstat old-20us-p128.txt new-20us-p128.txt
    name        old time/op    new time/op    delta
    Rueidis-10     586ns ± 3%     587ns ± 3%   ~     (p=0.622 n=20+19)
    
    name        old alloc/op   new alloc/op   delta
    Rueidis-10     80.0B ± 0%     80.0B ± 0%   ~     (all equal)
    
    name        old allocs/op  new allocs/op  delta
    Rueidis-10      1.00 ± 0%      1.00 ± 0%   ~     (all equal)
    
  • Write batching strategy

    Write batching strategy

    Hey @rueian, this is me again.

    I was preparing Rueidis-based code for release and suddenly discovered an interesting thing. I did quite a lot of Go benchmarks to make sure the new implementation based on Rueidis produces a better operation latency and a better throughput. And it does.

    I also expected that migration to Rueidis would provide Centrifugo better CPU utilization since Rueidis produces fewer memory allocations. And here be dragons.

    Before making the release I decided to do macro-benchmarks and found that Centrifugo consumes more CPU than before under equal conditions. Moreover, the Rueidis-based implementation results in more CPU usage on the Redis instance than we had with the previous implementation. I did not expect that at all. To investigate, I made a repo: https://github.com/FZambia/pipelines.

    In that repo I implemented 3 benchmarks: for pipelined Redigo, pipelined Go-Redis and Rueidis.

    After running benchmarks I observed the following:

    https://user-images.githubusercontent.com/1196565/206257922-cc6a0cc6-69e8-4295-82f1-d4c6f2ba9009.mp4

    ❯ go test -run xxx -bench . -benchtime 10s
    goos: darwin
    goarch: arm64
    pkg: github.com/FZambia/pipelines
    BenchmarkRedigo-8    	12460600	       959.6 ns/op	     181 B/op	       4 allocs/op
    BenchmarkGoredis-8   	 8069197	      1534 ns/op	     343 B/op	       5 allocs/op
    BenchmarkRueidis-8   	19451470	       620.0 ns/op	      80 B/op	       1 allocs/op
    

    Here we can see that CPU usage is:

    |                    | Redigo | Goredis | Rueidis |
    | ------------------ | ------ | ------- | ------- |
    | Application CPU, % | 285    | 270     | 470     |
    | Redis CPU, %       | 56     | 34      | 80      |

    Nothing too special here – all numbers are +/- expected. Rueidis produced better throughput so it loaded Redis more and the price for the better throughput is application CPU utilization.

    But in the Centrifugo case I compared CPU usage of Redigo and Rueidis under equal conditions. So I added a rate limiter to the benchmarks in the https://github.com/FZambia/pipelines repo to generate the same load in all cases, limiting the load to 100 commands per millisecond (100k per second).

    https://user-images.githubusercontent.com/1196565/206260753-dbe8ee9a-70f8-443f-abf1-84bb5a787092.mp4

    ❯ PIPE_LIMITED=1 go test -run xxx -bench . -benchtime 10s
    goos: darwin
    goarch: arm64
    pkg: github.com/FZambia/pipelines
    BenchmarkRedigo-8    	 1000000	     10000 ns/op	     198 B/op	       5 allocs/op
    BenchmarkGoredis-8   	 1000000	     10000 ns/op	     350 B/op	       8 allocs/op
    BenchmarkRueidis-8   	 1000000	     10000 ns/op	     113 B/op	       2 allocs/op
    PASS
    ok  	github.com/FZambia/pipelines	30.629s
    

    |                    | Redigo | Goredis | Rueidis |
    | ------------------ | ------ | ------- | ------- |
    | Application CPU, % | 91     | 96      | 118     |
    | Redis CPU, %       | 36     | 34      | 45      |

    This is more interesting. We are generating the same load in all benchmarks, but both the app CPU and the Redis CPU usage are the worst in the Rueidis case.

    It turned out the difference here is the result of the different batch sizes we are sending to Redis. In the Redigo/Goredis case we have larger batches than in the Rueidis case. In the Rueidis case we have smaller batches and thus more syscalls both in the app and on the Redis side. As we can see, CPU usage is very sensitive to this.

    There is a project called Twemproxy which acts as a proxy between applications and Redis and makes automatic batches thus reducing load on Redis, so in general pipelining is known not only to increase throughput but to reduce CPU usage of Redis. As Redis is single threaded its capacity is quite limited actually.

    I tried to find a simple way to improve batching of Rueidis somehow. The simplest solution I found at this point is this one: https://github.com/rueian/rueidis/compare/main...FZambia:rueidis:GetWriterEachConn

    I.e. introducing an option to provide custom bufio.Writer. I used it like this:

    func rueidisClient() rueidis.Client {
    	options := rueidis.ClientOption{
    		InitAddress:  []string{":6379"},
    		DisableCache: true,
    	}
    	if os.Getenv("PIPE_DELAYED") != "" {
    		options.GetWriterEachConn = func(writer io.Writer) (*bufio.Writer, func()) {
    			mlw := newDelayWriter(bufio.NewWriterSize(writer, 1<<19), time.Millisecond)
    			w := bufio.NewWriterSize(mlw, 1<<19)
    			return w, func() { mlw.close() }
    		}
    	}
    	client, err := rueidis.NewClient(options)
    	if err != nil {
    		log.Fatal(err)
    	}
    	return client
    }
    
    
    type writeFlusher interface {
    	io.Writer
    	Flush() error
    }
    
    type delayWriter struct {
    	dst   writeFlusher
    	delay time.Duration // zero means to flush immediately
    
    	mu           sync.Mutex // protects tm, flushPending, and dst.Flush
    	tm           *time.Timer
    	err          error
    	flushPending bool
    }
    
    func newDelayWriter(dst writeFlusher, delay time.Duration) *delayWriter {
    	return &delayWriter{dst: dst, delay: delay}
    }
    
    func (m *delayWriter) Write(p []byte) (n int, err error) {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	if m.err != nil {
    		return 0, m.err
    	}
    	n, err = m.dst.Write(p)
    	if m.delay <= 0 {
    		err = m.dst.Flush()
    		return
    	}
    	if m.flushPending {
    		return
    	}
    	if m.tm == nil {
    		m.tm = time.AfterFunc(m.delay, m.delayedFlush)
    	} else {
    		m.tm.Reset(m.delay)
    	}
    	m.flushPending = true
    	return
    }
    
    func (m *delayWriter) delayedFlush() {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
    		return
    	}
    	err := m.dst.Flush()
    	if err != nil {
    		m.err = err
    	}
    	m.flushPending = false
    }
    
    func (m *delayWriter) close() {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	m.flushPending = false
    	if m.tm != nil {
    		m.tm.Stop()
    	}
    }
    

    The delayed writer code is inspired by Caddy's code. It basically delays flushes of writes to the connection.

    We sacrifice latency for fewer syscalls.

    https://user-images.githubusercontent.com/1196565/206261128-b0121112-6b3c-4de4-85a9-88da536efdf8.mp4

    ❯ PIPE_LIMITED=1 PIPE_DELAYED=1 go test -run xxx -bench . -benchtime 10s
    goos: darwin
    goarch: arm64
    pkg: github.com/FZambia/pipelines
    BenchmarkRedigo-8    	 1000000	     10000 ns/op	     198 B/op	       5 allocs/op
    BenchmarkGoredis-8   	 1000000	     10000 ns/op	     350 B/op	       8 allocs/op
    BenchmarkRueidis-8   	 1000000	     10002 ns/op	     114 B/op	       2 allocs/op
    PASS
    ok  	github.com/FZambia/pipelines	30.712s
    

    |                    | Redigo | Goredis | Rueidis |
    | ------------------ | ------ | ------- | ------- |
    | Application CPU, % | 91     | 96      | 51      |
    | Redis CPU, %       | 36     | 34      | 6       |

    From these results we can see that with better batching we can reduce both application and Redis CPU usage, as we make fewer read/write syscalls. For Rueidis, the CPU usage of the benchmark process dropped from 118% to 51%, and that of the Redis process from 45% to 6%. An extra millisecond of latency seems tolerable for such a huge resource reduction.


    Unfortunately, it may be that I missed sth – so it would be interesting to hear your opinion on whether you see potential issues with this approach. Actually, under different levels of parallelism results may differ – since batch sizes change. All libraries in the test may perform better or worse.

    I think resource reduction like this is great to have. In Centrifugo case users tend to add more Centrifugo nodes that work with single Redis instance - so possibility to keep Redis CPU as low as possible seems nice. Probably you may suggest a better approach to achieve this.

  • Readonly command on Replica servers only

    Readonly command on Replica servers only

    Hello, I'm currently starting to implement this library in our system, using the default cluster implementation with 3 masters and 3 replicas. One of the main things that I want to implement is using only replica servers for all readonly commands. Is this supported by the library? I can't seem to find it in the documentation/code.

    Not sure about the implementation, but in go-redis this can be enabled by setting a flag during initialization: https://github.com/go-redis/redis/blob/master/cluster.go#L40

  • Exported synonyms for internal Completed, Cacheable & Builder

    Exported synonyms for internal Completed, Cacheable & Builder

    It will be possible to create functions that return prepared Redis commands:

    type Object struct {
        ID        string
        Datetime time.Time
        Message string
    }
    
    func (o Object) SaveCmd(client rueidis.Client) rueidis.Completed {
        return client.B().
            Hset().
            Key("out:interval:"+o.ID).
            FieldValue().
            FieldValue("datetime", fmt.Sprint(o.Datetime.Unix())).
            FieldValue("message", o.Message).
            Build()
    }
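
    For example, such a prepared command could then be executed like this (a rough sketch, assuming the proposed exported rueidis.Completed type and the Object type above):

    obj := Object{ID: "42", Datetime: time.Now(), Message: "hello"}
    if err := client.Do(ctx, obj.SaveCmd(client)).Error(); err != nil {
        // handle the HSET error
    }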
    
  • VSCode appends `cmds.Completed` whenever `cmds.Builder` is accessed

    VSCode appends `cmds.Completed` whenever `cmds.Builder` is accessed

    I'm using the latest VSCode

    Whenever I use cmds.Builder and accept any of the suggested functions, VSCode appends cmds.Completed before the cmds.Builder.

    It's best to explain it visually: [screen recording: Code_Y06d4tSdJi]
