Flow - FBP / pipelines / workers pool

Package flow provides support for very basic FBP (flow-based programming) pipelines. It helps structure multistage processing as a set of independent handlers communicating via channels. The typical use case is ETL (extract, transform, load) processing. Package flow doesn't introduce any high-level abstraction and keeps everything in the hands of the user.

Package pool provides a simplified version of flow, suitable for single-handler flows.

Details about flow package

  • Each handler represents an async stage. It consumes data from an input channel and publishes results to an output channel.
  • Each handler runs in a separate goroutine.
  • The user must implement Handler functions and add them to the Flow.
  • Each handler usually creates an output channel, reads from the input channel, processes data, sends results to the output channel, and closes the output channel.
  • The processing sequence is determined by the order of those handlers.
  • Any Handler can run in multiple concurrent goroutines (workers) by using the Parallel decorator.
  • FanOut allows passing multiple handlers in broadcast mode, i.e., each handler gets every input record. Outputs from these handlers are merged into a single output channel.
  • A processing error is detected as a non-nil error returned from the user's handler func. Such an error gracefully interrupts all other running handlers and doesn't leave any goroutine running or leaking.
  • Each Flow object can be executed only once.
  • A handler should treat context cancellation as a termination signal.
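
The broadcast mode described in the FanOut bullet can be illustrated with the standard library alone. The following is a minimal sketch, not the package's implementation: the `handler` type is a local stand-in with the same shape as the handlers described above, and `mapper`, `broadcast` and `runBroadcast` are hypothetical names introduced only for this example. Handler errors are ignored here to keep the sketch short.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// handler is a local stand-in with the same shape as the handlers described above.
type handler func(ctx context.Context, in chan interface{}) (chan interface{}, func() error)

// mapper is a hypothetical helper building a handler that applies fn to every int record.
func mapper(fn func(int) int) handler {
	return func(ctx context.Context, in chan interface{}) (chan interface{}, func() error) {
		out := make(chan interface{})
		return out, func() error {
			defer close(out) // the handler owns its output channel and closes it
			for e := range in {
				select {
				case out <- fn(e.(int)):
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		}
	}
}

// broadcast copies every input record to each handler and merges all outputs into one channel.
func broadcast(ctx context.Context, in chan interface{}, hs ...handler) chan interface{} {
	merged := make(chan interface{})
	inputs := make([]chan interface{}, len(hs))
	var wg sync.WaitGroup
	for i, h := range hs {
		inputs[i] = make(chan interface{})
		out, run := h(ctx, inputs[i])
		wg.Add(2)
		go func() { // run the handler; its error is ignored in this sketch
			defer wg.Done()
			_ = run()
		}()
		go func() { // forward this handler's output to the merged channel
			defer wg.Done()
			for v := range out {
				select {
				case merged <- v:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	go func() { // duplicate each record to every handler's input
		defer func() {
			for _, c := range inputs {
				close(c)
			}
		}()
		for v := range in {
			for _, c := range inputs {
				select {
				case c <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	go func() { // close the merged channel once all handlers and forwarders finish
		wg.Wait()
		close(merged)
	}()
	return merged
}

// runBroadcast feeds 1, 2, 3 to a doubling and a negating handler and sums the merged output.
func runBroadcast() (count, sum int) {
	in := make(chan interface{}, 3)
	for i := 1; i <= 3; i++ {
		in <- i
	}
	close(in)
	double := mapper(func(v int) int { return v * 2 })
	negate := mapper(func(v int) int { return -v })
	for v := range broadcast(context.Background(), in, double, negate) {
		sum += v.(int)
		count++
	}
	return count, sum
}

func main() {
	count, sum := runBroadcast()
	fmt.Printf("records=%d sum=%d\n", count, sum)
}
```

Each of the three inputs reaches both handlers, so six records arrive on the merged channel. The order of merged records is not deterministic.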

Install and update

go get -u github.com/go-pkgz/flow

Example of the flow's handler

// ReaderHandler creates flow.Handler, reading strings from any io.Reader
func ReaderHandler(reader io.Reader) flow.Handler {
	return func(ctx context.Context, ch chan interface{}) (chan interface{}, func() error) {
		metrics := flow.GetMetrics(ctx) // metrics collects how many records read with "read" key.

		readerCh := make(chan interface{}, 1000)
		readerFn := func() error {
			defer close(readerCh)

			scanner := bufio.NewScanner(reader)
			for scanner.Scan() {

				select {
				case readerCh <- scanner.Text():
					metrics.Inc("read")
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return errors.Wrap(scanner.Err(), "scanner failed")
		}
		return readerCh, readerFn
	}
}

Usage of the flow package

For a complete example, see the package's example.

// flow illustrates the use of a Flow for concurrent pipeline running each handler in separate goroutine.
func ExampleFlow_flow() {

	f := New() // create new empty Flow
	f.Add(     // add handlers. Note: handlers can be added directly in New

		// first handler, generate 100 initial values.
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 100) // example of non-async handler
			for i := 1; i <= 100; i++ {
				out <- i
			}
			close(out)      // each handler has to close out channel
			return out, nil // no runnable function for non-async handler
		},

		// second handler - picks odd numbers only and multiplies them
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					if val%2 == 0 {
						continue
					}
					f.Metrics().Inc("passed") // increment user-defined metric "passed"

					// send result to the next stage with flow.Send helper. Also checks for cancellation
					if err := Send(ctx, out, val*rand.Int()); err != nil {
						return err
					}
				}
				return nil
			}
			return out, runFn
		},

		// final handler - sum all numbers
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 1)
			runFn = func() error {
				defer close(out)
				sum := 0
				for {
					select {
					case e, more := <-in:
						if !more {
							out <- sum // send result
							return nil
						}
						val := e.(int)
						sum += val

					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
			return out, runFn
		},
	)

	f.Go() // activate flow

	// wait for all handlers to complete
	if err := f.Wait(); err == nil {
		fmt.Printf("all done, result=%v, passed=%d", <-f.Channel(), f.Metrics().Get("passed"))
	}
}

// parallel illustrates the use of a Flow for a concurrent pipeline running some handlers in parallel.
func ExampleFlow_parallel() {

	f := New() // create new empty Flow

	// make flow with mixed singles and parallel handlers and activate
	f.Add(

		// generate 100 initial values in single handler
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 100) // example of non-async handler
			for i := 1; i <= 100; i++ {
				out <- i
			}
			close(out)      // each handler has to close out channel
			return out, nil // no runnable function for non-async handler
		},

		// multiply all numbers in 10 parallel handlers
		f.Parallel(10, func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					select {
					case out <- val * rand.Int(): // send result to the next stage
					case <-ctx.Done(): // check for cancellation
						return ctx.Err()
					}
				}
				return nil
			}
			return out, runFn
		}),

		// sum all numbers
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 1) // buffered, so the final send doesn't block
			runFn = func() error {
				defer close(out)
				sum := 0
				for e := range in {
					val := e.(int)
					sum += val
					select {
					case <-ctx.Done():
						return ctx.Err()
					default:
					}
				}
				out <- sum // send result
				return nil
			}
			return out, runFn
		},
	)

	f.Go() // activate flow

	// wait for all handlers to complete
	if err := f.Wait(); err == nil {
		fmt.Printf("all done, result=%v", <-f.Channel())
	}
}
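
The idea behind running one handler in several workers can also be sketched with the standard library only. This is an illustration under stated assumptions, not how the Parallel decorator is implemented: `parallel`, `square`, `sumSquares` and `errNegative` are hypothetical names. The sketch shows n goroutines sharing one input channel, merging results into one output channel, and cancelling the remaining workers on the first error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errNegative = errors.New("negative input")

// square is the per-record work; it fails on negative input to demonstrate error handling.
func square(v int) (int, error) {
	if v < 0 {
		return 0, errNegative
	}
	return v * v, nil
}

// parallel is a hypothetical helper: it runs n workers over a shared input channel,
// merges their results into one output channel and cancels the others on the first error.
func parallel(ctx context.Context, n int, in <-chan int, work func(int) (int, error)) (<-chan int, func() error) {
	ctx, cancel := context.WithCancel(ctx)
	out := make(chan int)
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case v, ok := <-in:
					if !ok {
						return // input exhausted
					}
					res, err := work(v)
					if err != nil {
						once.Do(func() { firstErr = err; cancel() })
						return
					}
					select {
					case out <- res:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	go func() { // close the merged output once every worker has returned
		wg.Wait()
		close(out)
	}()
	wait := func() error {
		wg.Wait()
		cancel() // release context resources
		return firstErr
	}
	return out, wait
}

// sumSquares squares all values with the given number of workers and sums the results.
func sumSquares(workers int, values []int) (int, error) {
	in := make(chan int, len(values))
	for _, v := range values {
		in <- v
	}
	close(in)
	out, wait := parallel(context.Background(), workers, in, square)
	sum := 0
	for v := range out {
		sum += v
	}
	if err := wait(); err != nil {
		return 0, err
	}
	return sum, nil
}

func main() {
	fmt.Println(sumSquares(10, []int{1, 2, 3, 4}))
}
```

The caller has to drain the output channel before calling the returned wait function, otherwise workers stay blocked on their sends.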

Details about pool package

Package pool provides a thin implementation of a worker pool. In addition to the default "run a func in multiple goroutines" mode, it also provides optional support for chunked workers. In this mode, each key, as detected by a user-provided func, is guaranteed to be processed by the same worker. This mode is needed for stateful flows, where each set of input records has to be processed sequentially and some state should be kept. The pool also allows defining a batch size. This is a simple performance optimization that collects input requests into a buffer and sends them to the worker channel in batches (slices) instead of one per submit call.
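
The two ideas in the paragraph above, key-based routing and batching, can be sketched as follows. This is only an illustration: hashing the key with FNV and taking it modulo the worker count is an assumption about how a key could be pinned to a worker, and `workerIndex`, `chunkKey` and `distribute` are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// workerIndex maps a chunk key to one of n workers; equal keys always map to the same worker.
func workerIndex(key string, n int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// chunkKey is a hypothetical chunk function: the key is the part of the record before ":".
func chunkKey(rec string) string {
	return strings.SplitN(rec, ":", 2)[0]
}

// distribute routes records to workers by chunk key and groups them into batches of
// at most batchSize records. It returns the batches each worker would receive, in order.
func distribute(records []string, workers, batchSize int) map[int][][]string {
	batches := make(map[int][][]string)
	buf := make([][]string, workers)
	for _, rec := range records {
		w := workerIndex(chunkKey(rec), workers)
		buf[w] = append(buf[w], rec)
		if len(buf[w]) >= batchSize {
			batches[w] = append(batches[w], buf[w])
			buf[w] = nil // start a fresh buffer so the flushed batch is not overwritten
		}
	}
	for w, rest := range buf { // flush incomplete batches at the end
		if len(rest) > 0 {
			batches[w] = append(batches[w], rest)
		}
	}
	return batches
}

func main() {
	records := []string{"a:1", "b:1", "a:2", "c:1", "b:2", "a:3"}
	for w, bs := range distribute(records, 4, 2) {
		fmt.Printf("worker %d: %v\n", w, bs)
	}
}
```

Because all records with the same key land in the same worker's buffer, their relative order is preserved, which is what makes per-key state safe to keep.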

Options:

  • ChunkFn - the function returning a string that identifies the chunk
  • Batch - sets the batch size (default 1)
  • ChanResSize - sets the size of the buffered output channel (default 1)
  • ChanWorkerSize - sets the size of the workers' buffered channel (default 1)
  • ContinueOnError - allows workers to continue after an error occurs
  • OnCompletion - sets a callback (for each worker) called on successful completion

Worker function

The worker function is passed by the user and runs in multiple workers (goroutines) concurrently. This is the function: type workerFn func(ctx context.Context, inp interface{}, sender SenderFn, store WorkerStore) error

It takes the inp parameter, does the job, and optionally sends result(s) with SenderFn to the common results channel. An error terminates all workers unless ContinueOnError is set.

Note: workerFn can be stateful. It can collect anything it needs and send 0 or more results by calling SenderFn zero or more times.
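
The contract described above (a worker that emits zero or more results per input, and an error that stops all workers unless told to continue) can be sketched with the standard library. The `sender` and `worker` types here are simplified local stand-ins rather than the package's own types, and `splitWords` and `run` are hypothetical names used only for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// sender and worker are simplified local stand-ins, not the package's own types.
type sender func(val interface{})
type worker func(ctx context.Context, inp interface{}, send sender) error

// splitWords sends one result per word, so a single input yields zero or more results.
func splitWords(_ context.Context, inp interface{}, send sender) error {
	line, ok := inp.(string)
	if !ok {
		return errors.New("incorrect input type")
	}
	for _, w := range strings.Fields(line) {
		send(w)
	}
	return nil
}

// run is a hypothetical runner: n goroutines call fn for each input. The first error
// stops all workers unless continueOnError is set. Results are sorted for stable output.
func run(n int, inputs []interface{}, fn worker, continueOnError bool) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan interface{})
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		results  []string
		firstErr error
	)
	send := func(val interface{}) { // collects results from all workers
		mu.Lock()
		defer mu.Unlock()
		results = append(results, fmt.Sprint(val))
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				if err := fn(ctx, v, send); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					if !continueOnError {
						cancel() // stop feeding the other workers
						return
					}
				}
			}
		}()
	}
feed:
	for _, v := range inputs {
		select {
		case in <- v:
		case <-ctx.Done():
			break feed
		}
	}
	close(in)
	wg.Wait()
	sort.Strings(results)
	return results, firstErr
}

func main() {
	res, err := run(3, []interface{}{"a b", "", "c"}, splitWords, false)
	fmt.Println(res, err)
}
```

With continueOnError set, a bad input is recorded as the error while the remaining inputs are still processed.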

Worker store

Each worker gets a WorkerStore, which can be used as thread-safe per-worker storage for any intermediate results.

type WorkerStore interface {
	Set(key string, val interface{})
	Get(key string) (interface{}, bool)
	GetInt(key string) int
	GetFloat(key string) float64
	GetString(key string) string
	GetBool(key string) bool
	Keys() []string
	Delete(key string)
}
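
To make the interface above more concrete, here is a minimal map-backed sketch that satisfies it. This is not the package's store: `memStore` and `newMemStore` are hypothetical names, the interface is repeated locally so the example compiles on its own, and returning the zero value from typed getters for a missing or mismatched key is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// workerStore repeats the interface listed above so the sketch compiles on its own.
type workerStore interface {
	Set(key string, val interface{})
	Get(key string) (interface{}, bool)
	GetInt(key string) int
	GetFloat(key string) float64
	GetString(key string) string
	GetBool(key string) bool
	Keys() []string
	Delete(key string)
}

// memStore is a hypothetical map-backed store guarded by a mutex.
type memStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

var _ workerStore = (*memStore)(nil) // compile-time interface check

func newMemStore() *memStore { return &memStore{data: map[string]interface{}{}} }

func (s *memStore) Set(key string, val interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = val
}

func (s *memStore) Get(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Typed getters return the zero value if the key is missing or has another type (assumption).
func (s *memStore) GetInt(key string) int {
	v, _ := s.Get(key)
	n, _ := v.(int)
	return n
}

func (s *memStore) GetFloat(key string) float64 {
	v, _ := s.Get(key)
	f, _ := v.(float64)
	return f
}

func (s *memStore) GetString(key string) string {
	v, _ := s.Get(key)
	str, _ := v.(string)
	return str
}

func (s *memStore) GetBool(key string) bool {
	v, _ := s.Get(key)
	b, _ := v.(bool)
	return b
}

// Keys returns all keys in sorted order for deterministic output.
func (s *memStore) Keys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.data))
	for k := range s.data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func (s *memStore) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
}

func main() {
	s := newMemStore()
	s.Set("count", 3)
	s.Set("name", "x")
	fmt.Println(s.GetInt("count"), s.GetString("name"), s.GetInt("missing"), s.Keys())
}
```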

Alternatively, state can be kept outside of the workers as a slice of values and accessed by worker ID.

Usage

    p := pool.New(8, func(ctx context.Context, v interface{}, sendFn pool.SenderFn, ws pool.WorkerStore) error {
        // worker function gets input v, processes it, and sends results with sendFn

        input, ok := v.(string) // in this case it gets string as input
        if !ok {
            return errors.New("incorrect input type")
        }
        // do something with input
        // ...
        _ = input

        n := ws.GetInt("something") // access thread-local var

        sendFn("foo", nil) // send "foo" and nil error
        sendFn("bar", nil) // send "bar" and nil error
        pool.Metrics(ctx).Inc("counter")
        ws.Set("something", n+1) // keep thread-local things
        return nil
    })
    
    cursor, err := p.Go(context.TODO()) // start all workers in 8 goroutines and get back result's cursor
    
    // submit values (consumer side)
    go func() {
        p.Submit("something")
        p.Submit("something else")
        p.Close() // indicates completion of all inputs
    }()   

    var v interface{}
    for cursor.Next(ctx, &v) {
        log.Print(v)  // print value
    }
    
    if cursor.Err() != nil { // error happened
        return cursor.Err()
    } 

    // alternatively read all from the cursor (response channel)
    res, err := cursor.All(ctx)

    // metrics the same as for flow
    metrics := pool.Metrics()
    log.Print(metrics.Get("counter"))