GoBatch is a batch processing framework in Go like Spring Batch in Java

GoBatch

GoDoc Reference Go Report Card MIT license

GoBatch is a batch processing framework in Go like Spring Batch in Java. If you are familiar with Spring Batch, you will find GoBatch very easy to use.

Architecture

In GoBatch, a Job is divided into multiple Steps, which are executed in sequence. When a Job runs, GoBatch creates a JobExecution record and stores it in the database; likewise, it creates a StepExecution record each time a Step runs.

There are three types of step:

  • Simple Step executes the business logic defined in a Handler in a single thread.
  • Chunk Step processes data in chunks: it reads a chunk of data, processes it, then writes the output, and repeats until there is no more data to read.
  • Partition Step splits a task into multiple sub-tasks, executes them in parallel in sub-steps, and finally aggregates the results of the sub-steps.

Features

  • Modular construction for batch application
  • Serial and parallel process flows, as needed
  • Resume a job from its break point
  • Built-in file processing component
  • Listeners for job and step execution
  • Easy to extend

Install

go get -u github.com/chararch/gobatch

Usage Steps

  1. Create or choose a database, e.g. gobatch
  2. Create the tables in that database from sql/schema_mysql.sql
  3. Write your GoBatch code and run it

Code

Example

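package main

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/chararch/gobatch"
	// MySQL driver, required by sql.Open("mysql", ...) below
	_ "github.com/go-sql-driver/mysql"
)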

// simple task
func mytask() {
	fmt.Println("mytask executed")
}

//reader
type myReader struct {
}
func (r *myReader) Read(chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	curr, _ := chunkCtx.StepExecution.StepContext.GetInt("read.num", 0)
	if curr < 100 {
		chunkCtx.StepExecution.StepContext.Put("read.num", curr+1)
		return fmt.Sprintf("value-%v", curr), nil
	}
	return nil, nil
}

//processor
type myProcessor struct {
}
func (r *myProcessor) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	return fmt.Sprintf("processed-%v", item), nil
}

//writer
type myWriter struct {
}
func (r *myWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
	fmt.Printf("write: %v\n", items)
	return nil
}

func main() {
	//set db for gobatch to store job&step execution context
	db, err := sql.Open("mysql", "gobatch:gobatch123@tcp(127.0.0.1:3306)/gobatch?charset=utf8&parseTime=true")
	if err != nil {
		panic(err)
	}
	gobatch.SetDB(db)

	//build steps
	step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
	//step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
	step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()

	//build job
	job := gobatch.NewJob("my_job").Step(step1, step2).Build()

	//register job to gobatch
	gobatch.Register(job)

	//run
	//gobatch.StartAsync(context.Background(), job.Name(), "")
	gobatch.Start(context.Background(), job.Name(), "")
}

The full example is in test/example.go.

Write a Simple step

There are several ways to write the logic of a simple step:

// 1. write a function with one of the following signatures
func(execution *StepExecution) BatchError
func(execution *StepExecution)
func() error
func()

// 2. implement the Handler interface
type Handler interface {
	Handle(execution *StepExecution) BatchError
}

Once you have written the function or the Handler implementation, you can build a step like this:

step1 := gobatch.NewStep("step1").Handler(myfunction).Build()
step2 := gobatch.NewStep("step2").Handler(myHandler).Build()
//or
step1 := gobatch.NewStep("step1", myfunction).Build()
step2 := gobatch.NewStep("step2", myHandler).Build()
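
One way a builder could accept all of these function shapes is to normalize them with a type switch. The sketch below is an illustration under assumptions, not GoBatch's implementation: `stepExecution` and `toHandler` are hypothetical names, and plain `error` stands in for `BatchError`.

```go
package main

import (
	"errors"
	"fmt"
)

// stepExecution is a hypothetical placeholder for GoBatch's StepExecution.
type stepExecution struct{ stepName string }

// toHandler normalizes several function shapes into a single one.
// Plain error is used here in place of GoBatch's BatchError.
func toHandler(h interface{}) (func(*stepExecution) error, error) {
	switch fn := h.(type) {
	case func(*stepExecution) error:
		return fn, nil
	case func(*stepExecution):
		return func(e *stepExecution) error { fn(e); return nil }, nil
	case func() error:
		return func(*stepExecution) error { return fn() }, nil
	case func():
		return func(*stepExecution) error { fn(); return nil }, nil
	default:
		return nil, fmt.Errorf("unsupported handler type %T", h)
	}
}

func main() {
	handlers := []interface{}{
		func() { fmt.Println("plain function") },
		func() error { return errors.New("failed") },
		func(e *stepExecution) { fmt.Println("running", e.stepName) },
		42, // not a function, so it is rejected
	}
	for _, h := range handlers {
		run, err := toHandler(h)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("result:", run(&stepExecution{stepName: "step1"}))
	}
}
```

Rejecting unknown types when the step is built, rather than when it runs, surfaces a mistyped handler early.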

Write a Chunk step

To build a chunk step, implement the following interfaces. Only the Reader is required:

type Reader interface {
    // Read returns one data item per call, or a nil item when there is no more data.
    Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Processor interface {
    // Process takes an item from the reader and returns a result item.
    Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Writer interface {
    // Write writes the items that the processor produced for one chunk.
    Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}
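
The read, process, write cycle that drives these interfaces can be sketched as follows. This is a simplified illustration, not GoBatch's internal loop: the lowercase `reader`, `processor`, and `writer` interfaces drop the `ChunkContext` parameter and use plain `error`, and `runChunks`, `counterReader`, `doubler`, and `sliceWriter` are hypothetical names.

```go
package main

import "fmt"

// Simplified, hypothetical versions of the interfaces above: the ChunkContext
// parameter is dropped and plain error replaces BatchError.
type reader interface {
	Read() (interface{}, error)
}
type processor interface {
	Process(item interface{}) (interface{}, error)
}
type writer interface {
	Write(items []interface{}) error
}

// runChunks reads up to chunkSize items, processes each one, writes the chunk,
// and repeats until the reader returns a nil item.
func runChunks(r reader, p processor, w writer, chunkSize int) error {
	if chunkSize <= 0 {
		return fmt.Errorf("chunk size must be positive, got %d", chunkSize)
	}
	for {
		chunk := make([]interface{}, 0, chunkSize)
		done := false
		for len(chunk) < chunkSize {
			item, err := r.Read()
			if err != nil {
				return err
			}
			if item == nil { // no more data
				done = true
				break
			}
			out, err := p.Process(item)
			if err != nil {
				return err
			}
			chunk = append(chunk, out)
		}
		if len(chunk) > 0 {
			if err := w.Write(chunk); err != nil {
				return err
			}
		}
		if done {
			return nil
		}
	}
}

// counterReader yields the integers 0..limit-1, then nil.
type counterReader struct{ next, limit int }

func (c *counterReader) Read() (interface{}, error) {
	if c.next >= c.limit {
		return nil, nil
	}
	c.next++
	return c.next - 1, nil
}

// doubler multiplies each integer item by two.
type doubler struct{}

func (doubler) Process(item interface{}) (interface{}, error) {
	return item.(int) * 2, nil
}

// sliceWriter records every chunk it receives.
type sliceWriter struct{ chunks [][]interface{} }

func (s *sliceWriter) Write(items []interface{}) error {
	s.chunks = append(s.chunks, items)
	return nil
}

func main() {
	w := &sliceWriter{}
	if err := runChunks(&counterReader{limit: 7}, doubler{}, w, 3); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(w.chunks) // [[0 2 4] [6 8 10] [12]]
}
```

With seven items and a chunk size of three, the writer is called three times, and the last chunk holds the single remaining item.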

There is another interface named ItemReader, which you can use instead of Reader:

type ItemReader interface {
    // ReadKeys reads all the keys of the data to be processed.
    ReadKeys() ([]interface{}, error)
    // ReadItem reads the value for one key returned by ReadKeys.
    ReadItem(key interface{}) (interface{}, error)
}
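
To see how a key-based reader relates to the item-at-a-time Reader, here is a minimal adapter sketch. It is an assumption about how such an adapter could look, not GoBatch's code: `keyedReader` and `mapSource` are hypothetical, and `Read` omits the `ChunkContext` parameter.

```go
package main

import "fmt"

// itemReader mirrors the ItemReader interface shown above.
type itemReader interface {
	ReadKeys() ([]interface{}, error)
	ReadItem(key interface{}) (interface{}, error)
}

// keyedReader is a hypothetical adapter: it loads all keys on the first call
// to Read, then returns one item per call and nil once the keys run out.
type keyedReader struct {
	source itemReader
	keys   []interface{}
	loaded bool
	pos    int
}

func (k *keyedReader) Read() (interface{}, error) {
	if !k.loaded {
		keys, err := k.source.ReadKeys()
		if err != nil {
			return nil, err
		}
		k.keys, k.loaded = keys, true
	}
	if k.pos >= len(k.keys) {
		return nil, nil
	}
	item, err := k.source.ReadItem(k.keys[k.pos])
	if err != nil {
		return nil, err
	}
	k.pos++
	return item, nil
}

// mapSource is an in-memory itemReader used only for this example.
type mapSource map[int]string

func (m mapSource) ReadKeys() ([]interface{}, error) {
	keys := make([]interface{}, 0, len(m))
	for id := 1; id <= len(m); id++ { // assumes ids 1..len(m), for a stable order
		keys = append(keys, id)
	}
	return keys, nil
}

func (m mapSource) ReadItem(key interface{}) (interface{}, error) {
	id, ok := key.(int)
	if !ok {
		return nil, fmt.Errorf("unexpected key type %T", key)
	}
	return m[id], nil
}

func main() {
	r := &keyedReader{source: mapSource{1: "a", 2: "b", 3: "c"}}
	for {
		item, err := r.Read()
		if err != nil || item == nil {
			break
		}
		fmt.Println(item)
	}
}
```

Because the full key list is known up front, it can also be divided among several workers, which is what makes this style of reader convenient for partitioning.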

For convenience, you can implement the following interface along with Reader or Writer to do some initialization or cleanup:

type OpenCloser interface {
	Open(execution *StepExecution) BatchError
	Close(execution *StepExecution) BatchError
}

You can find a chunk step example under test/example2.

Write a Partition step

You can implement the Partitioner interface to split a step into multiple sub-steps. Optionally, you can also implement the Aggregator interface if you want to aggregate results after all sub-steps have completed:

type Partitioner interface {
	// Partition generates sub-step executions from the given step execution and partition count.
	Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
	// GetPartitionNames generates sub-step names from the given step execution and partition count.
	GetPartitionNames(execution *StepExecution, partitions uint) []string
}

type Aggregator interface {
    // Aggregate aggregates the results of all sub-step executions.
    Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}

If you already have a chunk step with an ItemReader, you can turn it into a partition step simply by specifying the partition count:

    step := gobatch.NewStep("partition_step").Handler(&ChunkHandler{db}).Partitions(10).Build()
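
The split, run in parallel, then aggregate pattern behind a partition step can be sketched with the standard library alone. This is an illustration, not the way GoBatch partitions work internally: `splitKeys` and `runPartitions` are hypothetical helpers, and contiguous near-equal ranges are just one possible splitting strategy.

```go
package main

import (
	"fmt"
	"sync"
)

// splitKeys divides keys into at most n contiguous, near-equal partitions.
func splitKeys(keys []int, n int) [][]int {
	if n <= 0 || len(keys) == 0 {
		return nil
	}
	if n > len(keys) {
		n = len(keys)
	}
	parts := make([][]int, 0, n)
	size, extra := len(keys)/n, len(keys)%n
	start := 0
	for i := 0; i < n; i++ {
		end := start + size
		if i < extra { // the first `extra` partitions take one more key
			end++
		}
		parts = append(parts, keys[start:end])
		start = end
	}
	return parts
}

// runPartitions processes every partition in its own goroutine and then
// aggregates the per-partition results into a single total.
func runPartitions(parts [][]int, work func(keys []int) int) int {
	results := make([]int, len(parts))
	var wg sync.WaitGroup
	for i, part := range parts {
		wg.Add(1)
		go func(i int, part []int) {
			defer wg.Done()
			results[i] = work(part) // each goroutine writes only its own slot
		}(i, part)
	}
	wg.Wait()
	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	keys := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	parts := splitKeys(keys, 3)
	fmt.Println(parts) // [[1 2 3 4] [5 6 7] [8 9 10]]
	sum := runPartitions(parts, func(keys []int) int {
		s := 0
		for _, k := range keys {
			s += k
		}
		return s
	})
	fmt.Println(sum) // 55
}
```

Each goroutine writes to its own element of `results`, so no mutex is needed; the aggregation runs only after `wg.Wait()` returns.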

Read & Write File

Suppose a file has the following content (fields separated by a '\t'):

trade_1	account_1	cash	1000	normal	2022-02-27 12:12:12
trade_2	account_2	cash	1000	normal	2022-02-27 12:12:12
trade_3	account_3	cash	1000	normal	2022-02-27 12:12:12
……

We want to read the content and insert each record into a database table named 't_trade'. We can do it this way:

type Trade struct {
    TradeNo   string    `order:"0"`
    AccountNo string    `order:"1"`
    Type      string    `order:"2"`
    Amount    float64   `order:"3"`
    TradeTime time.Time `order:"5"`
    Status    string    `order:"4"`
}

var tradeFile = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade.data",
    Type:          file.TSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}

type TradeWriter struct {
    db *gorm.DB
}

func (p *TradeWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
    models := make([]*Trade, len(items))
    for i, item := range items {
        models[i] = item.(*Trade)
    }
    e := p.db.Table("t_trade").Create(models).Error
    if e != nil {
        return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "save trade into db err", e)
    }
    return nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_import").ReadFile(tradeFile).Writer(&TradeWriter{db}).Partitions(10).Build()
    //...
    job := gobatch.NewJob("my_job").Step(...,step,...).Build()
    gobatch.Register(job)
    gobatch.Start(context.Background(), job.Name(), "{\"date\":\"20220202\"}")
}
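
To make the role of the `order` tag concrete, the sketch below maps one tab-separated line onto the Trade struct with reflection. It is a guess at the general technique, not GoBatch's file component: `parseLine` is a hypothetical helper, only string, float64, and time.Time fields are handled, and the time layout "2006-01-02 15:04:05" is an assumption based on the sample data.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"strings"
	"time"
)

// Trade repeats the struct above; `order` is the zero-based column index.
type Trade struct {
	TradeNo   string    `order:"0"`
	AccountNo string    `order:"1"`
	Type      string    `order:"2"`
	Amount    float64   `order:"3"`
	TradeTime time.Time `order:"5"`
	Status    string    `order:"4"`
}

// parseLine fills the struct pointed to by out from one tab-separated line,
// using each field's `order` tag as the column index.
func parseLine(line string, out interface{}) error {
	ptr := reflect.ValueOf(out)
	if ptr.Kind() != reflect.Ptr || ptr.IsNil() || ptr.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("out must be a non-nil pointer to a struct, got %T", out)
	}
	v := ptr.Elem()
	cols := strings.Split(line, "\t")
	for i := 0; i < v.NumField(); i++ {
		info := v.Type().Field(i)
		tag, ok := info.Tag.Lookup("order")
		if !ok || !v.Field(i).CanSet() {
			continue // untagged or unexported fields are skipped
		}
		idx, err := strconv.Atoi(tag)
		if err != nil || idx < 0 || idx >= len(cols) {
			return fmt.Errorf("field %s: no column for order %q", info.Name, tag)
		}
		field := v.Field(i)
		switch {
		case field.Kind() == reflect.String:
			field.SetString(cols[idx])
		case field.Kind() == reflect.Float64:
			f, err := strconv.ParseFloat(cols[idx], 64)
			if err != nil {
				return fmt.Errorf("field %s: %w", info.Name, err)
			}
			field.SetFloat(f)
		case field.Type() == reflect.TypeOf(time.Time{}):
			// Assumed layout, matching the sample file above.
			t, err := time.Parse("2006-01-02 15:04:05", cols[idx])
			if err != nil {
				return fmt.Errorf("field %s: %w", info.Name, err)
			}
			field.Set(reflect.ValueOf(t))
		default:
			return fmt.Errorf("field %s: unsupported type %s", info.Name, field.Type())
		}
	}
	return nil
}

func main() {
	var t Trade
	line := "trade_1\taccount_1\tcash\t1000\tnormal\t2022-02-27 12:12:12"
	if err := parseLine(line, &t); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", t)
}
```

Note that the field order in the struct does not have to match the column order in the file: TradeTime is declared before Status but reads column 5, while Status reads column 4.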

Suppose we want to export the data in 't_trade' to a CSV file. We can do it like this:

type Trade struct {
    TradeNo   string    `order:"0" header:"trade_no"`
    AccountNo string    `order:"1" header:"account_no"`
    Type      string    `order:"2" header:"type"`
    Amount    float64   `order:"3" header:"amount"`
    TradeTime time.Time `order:"5" header:"trade_time" format:"2006-01-02_15:04:05"`
    Status    string    `order:"4" header:"status"`
}

var tradeFileCsv = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade_export.csv",
    Type:          file.CSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}


type TradeReader struct {
    db *gorm.DB
}

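func (h *TradeReader) ReadKeys() ([]interface{}, error) {
    var ids []int64
    // return the query error instead of silently ignoring it
    if err := h.db.Table("t_trade").Select("id").Find(&ids).Error; err != nil {
        return nil, err
    }
    result := make([]interface{}, 0, len(ids))
    for _, id := range ids {
        result = append(result, id)
    }
    return result, nil
}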

func (h *TradeReader) ReadItem(key interface{}) (interface{}, error) {
    id := int64(0)
    switch r := key.(type) {
    case int64:
        id = r
    case float64:
        id = int64(r)
    default:
        return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key)
    }
    trade := &Trade{}
    result := h.db.Table("t_trade").Find(trade, "id = ?", id)
    if result.Error != nil {
        return nil, result.Error
    }
    return trade, nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_export").Reader(&TradeReader{db}).WriteFile(tradeFileCsv).Partitions(10).Build()
    //...
}

Listeners

There are different listeners for the lifecycle of job and step execution:

type JobListener interface {
	BeforeJob(execution *JobExecution) BatchError
	AfterJob(execution *JobExecution) BatchError
}

type StepListener interface {
	BeforeStep(execution *StepExecution) BatchError
	AfterStep(execution *StepExecution) BatchError
}

type ChunkListener interface {
	BeforeChunk(context *ChunkContext) BatchError
	AfterChunk(context *ChunkContext) BatchError
	OnError(context *ChunkContext, err BatchError)
}

type PartitionListener interface {
	BeforePartition(execution *StepExecution) BatchError
	AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
	OnError(execution *StepExecution, err BatchError)
}

You can specify listeners when building a step or a job:

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("my_step").Handler(handler,...).Listener(listener,...).Build()
    //...
    job := gobatch.NewJob("my_job").Step(step,...).Listener(listener,...).Build()
}

Global Settings

SetDB

GoBatch needs a database to store job and step execution contexts, so you must pass a *sql.DB instance to GoBatch before running a job.

    gobatch.SetDB(sqlDb)

SetTransactionManager

If you are building a chunk step, you must register a TransactionManager instance with GoBatch. The interface is:

type TransactionManager interface {
	BeginTx() (tx interface{}, err BatchError)
	Commit(tx interface{}) BatchError
	Rollback(tx interface{}) BatchError
}

GoBatch provides a DefaultTxManager. If you have set a DB but have not set a TransactionManager, GoBatch creates a DefaultTxManager instance for you.

SetMaxRunningJobs & SetMaxRunningSteps

GoBatch runs jobs and steps in internal TaskPools, so the number of concurrently running jobs and steps is limited by the pool sizes. By default, at most 10 jobs and 1000 steps can run at the same time. You can change these limits with:

    gobatch.SetMaxRunningJobs(100)
    gobatch.SetMaxRunningSteps(5000)