Batch processing library for Go that supports generics and return values

Aggregator

Aggregator is a batch processing library for Go that supports returning values. You can group a batch of tasks by key and process them in a single callback. Using it to group database queries or cache lookups can reduce the load on your database and network.

THIS PROJECT IS IN BETA

This project may contain bugs and has not been tested at all. Use it at your own risk, but feel free to test it, open pull requests, and help improve the project.

Features

  • Supports multiple Aggregators (using AggregatorList) for fallback.
  • Supports multiple workers to flush tasks.
  • Supports Go generics for query keys and result values.
  • Supports flushing by timeout only or by task limit only.
  • Supports singleflight (using singleflight-any).

Install

Go 1.18+ is currently required (for Go generics); backward compatibility is planned.

go get github.com/serkodev/aggregator@latest

Example

callback := func(keys []string) (map[string]Book, error) {
    results := db.Query(`SELECT * FROM books WHERE name IN ?`, keys)
    return results, nil
}
agg, _ := aggregator.New(callback, 100*time.Millisecond, 5).Run()

for _, name := range []string{"foo", "foo", "bar", "baz", "baz"} {
    go func(n string) {
        book, err := agg.Query(n).Get()
        if err == nil {
            // %v formats the price whatever its type; "+" cannot join a string and a number.
            fmt.Printf("%s:%v ", book.Name, book.Price)
        }
    }(name)
}

// Possible output (goroutine order may vary): foo:10 foo:10 bar:25 baz:30 baz:30

How it works

flowchart LR;
    subgraph A [Aggregator]
        direction TB
        subgraph cb ["Customize Process (example)"]
        direction TB
            input("Input
            []string{#quot;foo#quot;, #quot;bar#quot;, #quot;baz#quot;}")
            db[("Database

            SELECT price FROM books<br />WHERE name
            IN ('foo', 'bar', 'baz')")]
            output("return map[string]int{
                    #quot;foo#quot;: 10,
                    #quot;bar#quot;: 25,
                    #quot;baz#quot;: 30,
            }")
            input --> db --> output
            style output text-align:left
        end

        Wait -- Reach tasks limit / Timeout -->
        cb --> rt("Return value to each Request")
    end

    req1[Request 1] --> q_foo_("Query(#quot;foo#quot;)"):::bgFoo --> A
    req2[Request 2] --> q_foo2("Query(#quot;foo#quot;)"):::bgFoo --> A
    req3[Request 3] --> q_bar_("Query(#quot;bar#quot;)"):::bgBar --> A
    req4[Request 4] --> q_baz_("Query(#quot;baz#quot;)"):::bgBaz --> A
    req5[Request 5] --> q_baz2("Query(#quot;baz#quot;)"):::bgBaz --> A

    A --- rtn1("return 10"):::bgFoo --> req1_[Request 1]
    A --- rtn2("return 10"):::bgFoo --> req2_[Request 2]
    A --- rtn3("return 25"):::bgBar --> req3_[Request 3]
    A --- rtn4("return 30"):::bgBaz --> req4_[Request 4]
    A --- rtn5("return 30"):::bgBaz --> req5_[Request 5]

    classDef bgFoo fill:green;
    classDef bgBar fill:blue;
    classDef bgBaz fill:purple;
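
The flow in the diagram can be sketched with the standard library alone. This is a minimal illustration of the idea, not Aggregator's actual implementation: the `batcher`, `request`, `newBatcher`, `query` and `demo` names are hypothetical, and the sketch assumes a single worker that flushes when the task limit is reached or when the timeout expires after the first pending request.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// request is one pending query: the key asked for and a channel for its answer.
type request struct {
	key   string
	reply chan int
}

// batcher is a hypothetical, simplified stand-in for an aggregator.
type batcher struct {
	in chan request
}

// newBatcher starts one worker that flushes when `limit` requests are pending
// or when `wait` has elapsed since the first pending request.
func newBatcher(process func(keys []string) map[string]int, wait time.Duration, limit int) *batcher {
	b := &batcher{in: make(chan request)}
	go func() {
		var pending []request
		var timeout <-chan time.Time // nil while nothing is pending, so it never fires

		flush := func() {
			// Deduplicate keys so the callback sees each key only once.
			seen := map[string]bool{}
			var keys []string
			for _, r := range pending {
				if !seen[r.key] {
					seen[r.key] = true
					keys = append(keys, r.key)
				}
			}
			results := process(keys)
			// Hand each waiting request the value for its own key.
			for _, r := range pending {
				r.reply <- results[r.key]
			}
			pending, timeout = nil, nil
		}

		for {
			select {
			case r := <-b.in:
				pending = append(pending, r)
				if len(pending) == 1 {
					timeout = time.After(wait)
				}
				if len(pending) >= limit {
					flush()
				}
			case <-timeout:
				flush()
			}
		}
	}()
	return b
}

// query blocks until the batch containing key has been processed.
func (b *batcher) query(key string) int {
	r := request{key: key, reply: make(chan int, 1)}
	b.in <- r
	return <-r.reply
}

// demo sends the five queries from the diagram and reports how many times
// the callback ran and what each request received.
func demo() (calls int, prices []int) {
	process := func(keys []string) map[string]int {
		calls++ // only the single worker goroutine runs this callback
		all := map[string]int{"foo": 10, "bar": 25, "baz": 30}
		out := map[string]int{}
		for _, k := range keys {
			out[k] = all[k]
		}
		return out
	}
	b := newBatcher(process, time.Second, 5)
	names := []string{"foo", "foo", "bar", "baz", "baz"}
	prices = make([]int, len(names))
	var wg sync.WaitGroup
	for i, n := range names {
		wg.Add(1)
		go func(i int, n string) {
			defer wg.Done()
			prices[i] = b.query(n)
		}(i, n)
	}
	wg.Wait()
	return calls, prices
}

func main() {
	calls, prices := demo()
	fmt.Println(calls, prices) // prints: 1 [10 10 25 30 30]
}
```

Five requests reach the task limit of 5, so the callback runs once with the three distinct keys and every request gets its own value back.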

Advanced

AggregatorList

An AggregatorList contains a slice of Aggregators; you can create one with aggregator.NewList(...). If an earlier aggregator in the list cannot return data for some keys, AggregatorList queries the next aggregator for those keys as a fallback.

For example, if you create an AggregatorList with a cache aggregator and a database aggregator, any data that has not been cached is automatically queried from the database.

cacheAgg := aggregator.New(func(k []string) (map[string]string, error) {
    fmt.Println("fetch from cache...", k)
    return map[string]string{
        "key1": "val1",
        "key2": "val2",
    }, nil
}, 50*time.Millisecond, 10)

databaseAgg := aggregator.New(func(k []string) (map[string]string, error) {
    fmt.Println("fetch from database...", k)
    return map[string]string{
        "key1": "val1",
        "key2": "val2",
        "key3": "val3",
        "key4": "val4",
    }, nil
}, 50*time.Millisecond, 10)

list := aggregator.NewList(cacheAgg, databaseAgg).Run()
results := list.QueryMulti([]string{"key1", "key2", "key3", "key4"})

// fetch from cache... ["key1", "key2", "key3", "key4"]
// fetch from database... ["key3", "key4"]
// results: ["val1", "val2", "val3", "val4"]
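
The fallback rule itself is small enough to sketch without the library. The snippet below is an illustration under stated assumptions, not AggregatorList's code: `source` and `queryWithFallback` are hypothetical names, batching and timeouts are left out, and only the "pass unresolved keys to the next source" step is shown.

```go
package main

import "fmt"

// source is a hypothetical batch lookup: it returns values for the keys it knows.
type source func(keys []string) map[string]string

// queryWithFallback asks each source in order, passing along only the keys
// that earlier sources did not resolve.
func queryWithFallback(keys []string, sources ...source) map[string]string {
	results := make(map[string]string, len(keys))
	missing := keys
	for _, src := range sources {
		if len(missing) == 0 {
			break // everything resolved, later sources are not touched
		}
		found := src(missing)
		var still []string
		for _, k := range missing {
			if v, ok := found[k]; ok {
				results[k] = v
			} else {
				still = append(still, k)
			}
		}
		missing = still
	}
	return results
}

func main() {
	cache := func(keys []string) map[string]string {
		fmt.Println("fetch from cache...", keys)
		return map[string]string{"key1": "val1", "key2": "val2"}
	}
	database := func(keys []string) map[string]string {
		fmt.Println("fetch from database...", keys)
		return map[string]string{"key1": "val1", "key2": "val2", "key3": "val3", "key4": "val4"}
	}
	results := queryWithFallback([]string{"key1", "key2", "key3", "key4"}, cache, database)
	fmt.Println(len(results), results["key3"], results["key4"])
}
```

Here the database source is called only with key3 and key4, because the cache source already resolved key1 and key2.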

singleflight

In some use cases you may need to prevent cache breakdown. Aggregator works with singleflight through singleflight-any (which supports Go generics).
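
For readers unfamiliar with the pattern, here is a minimal standard-library sketch of what singleflight does: concurrent callers asking for the same key share one in-flight load instead of each hitting the backend. It does not use singleflight-any or reproduce its API; the `group`, `call`, `do` and `demoSingleflight` names are hypothetical, and the 100 ms sleep merely simulates a slow database read.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call tracks one in-flight load and its eventual result.
type call[V any] struct {
	wg  sync.WaitGroup
	val V
	err error
}

// group deduplicates concurrent loads that share the same key.
type group[K comparable, V any] struct {
	mu    sync.Mutex
	calls map[K]*call[V]
}

// do runs fn once per key at a time; callers that arrive while a load is
// in flight wait for it and receive the same result.
func (g *group[K, V]) do(key K, fn func() (V, error)) (V, error) {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[K]*call[V])
	}
	if c, ok := g.calls[key]; ok {
		g.mu.Unlock()
		c.wg.Wait() // another caller is already loading this key
		return c.val, c.err
	}
	c := new(call[V])
	c.wg.Add(1)
	g.calls[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	// Forget the finished call so a later request triggers a fresh load.
	g.mu.Lock()
	delete(g.calls, key)
	g.mu.Unlock()
	return c.val, c.err
}

// demoSingleflight fires ten concurrent requests for one key and reports how
// many loads actually ran and what each caller received.
func demoSingleflight() (int32, []string) {
	var g group[string, string]
	var loads int32
	values := make([]string, 10)
	var wg sync.WaitGroup
	for i := range values {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			values[i], _ = g.do("foo", func() (string, error) {
				atomic.AddInt32(&loads, 1)
				time.Sleep(100 * time.Millisecond) // simulate a slow database read
				return "val-foo", nil
			})
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt32(&loads), values
}

func main() {
	loads, values := demoSingleflight()
	fmt.Println("loads:", loads, "first value:", values[0])
}
```

As long as all ten goroutines start while the first load is still sleeping, only one load runs and every caller receives its result. This is what protects the backend when a hot cache entry expires.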

Inspiration

LICENSE

MIT License
