Parapipe - paralleling pipeline

The library provides a zero-dependency, non-blocking, buffered FIFO pipeline for structuring your code and scaling your app vertically. Unlike the usual pipeline examples you may find on the internet, parapipe executes every step concurrently yet maintains the output order. It does this without any locks or mutexes - just pure channels.

When to use

  • the processed data can be divided into chunks (messages), and the flow may consist of one or more stages
  • the data should be processed concurrently (scaled vertically)
  • the order of the processed messages must be maintained

Installation

go get github.com/nazar256/parapipe

Usage

  1. Create a pipeline
cfg := parapipe.Config{
    ProcessErrors: false,	// messages implementing "error" interface will not be passed to subsequent workers
}
pipeline := parapipe.NewPipeline(cfg)
  2. Add pipes - call the Pipe() method one or more times
concurrency := 5    // how many messages to process concurrently for each pipe
pipeline.Pipe(concurrency, func(msg interface{}) interface{} {
    typedMsg := msg.(YourInputType)     // assert your type for the message
    // do something and generate a new value "someValue"
    return someValue
})
  1. Get "out" channel when all pipes are added and read results from it
for result := range pipeline.Out() {
    typedResut := result.(YourResultType)
    // do something with the result
}

It's important to read everything from "out" even when the pipeline won't produce any viable result. The pipeline will get stuck otherwise.

  4. Push values into the pipeline for processing. Push from a goroutine separate from the one that reads Out(), otherwise the pipeline can deadlock (see the complete example after this list):
pipeline.Push("something")
  5. Close the pipeline to free its resources and close its output channel after the last message. All internal channels and goroutines, including the Out() channel, will be closed in cascade. Closing the pipeline with defer is not recommended, because the output will hang until the deferred call is executed.
pipeline.Close()
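
Putting the steps together, a minimal end-to-end sketch could look like this (it assumes only the API shown above; the squaring worker and the input numbers are made up for illustration). Note that values are pushed from a separate goroutine: pushing and draining Out() in the same goroutine may deadlock once internal buffers fill up.

package main

import (
    "fmt"

    "github.com/nazar256/parapipe"
)

func main() {
    pipeline := parapipe.NewPipeline(parapipe.Config{ProcessErrors: false})

    pipeline.Pipe(4, func(msg interface{}) interface{} {
        n := msg.(int)
        return n * n // each message is squared concurrently
    })

    // Push from a separate goroutine so this one stays free to drain Out();
    // pushing and reading in the same goroutine can deadlock.
    go func() {
        for i := 1; i <= 10; i++ {
            pipeline.Push(i)
        }
        pipeline.Close() // closes Out() after the last result
    }()

    for result := range pipeline.Out() {
        fmt.Println(result.(int)) // prints 1, 4, 9, ... in input order
    }
}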

Error handling

To handle errors, just return them as results and then check for them when reading from Out(). By default, errors will not be processed by subsequent stages.

pipeline.Pipe(4, func(msg interface{}) interface{} {
    inputValue := msg.(YourInputType)     // assert your type for the message
    someValue, err := someOperation(inputValue)
    if err != nil {
        return err      // error can also be a result and can be returned from a pipeline stage (pipe)
    }
    return someValue
})
// ...
for result := range pipeline.Out() {
    if err, ok := result.(error); ok {
        // handle the error
        // you may want to stop sending new values to the pipeline in your own way and do close(pipeline.In())
        continue
    }
    typedResult := result.(YourResultType)
    // do something with the result
}

Optionally, you may allow errors to be passed to subsequent pipes - for example, when you do not wish to stop the pipeline on errors but rather process them in a later pipe.

cfg := parapipe.Config{
    ProcessErrors: true,	// messages implementing the "error" interface will be passed to subsequent workers like any other message
}
concurrency := 5    // how many messages to process concurrently for each pipe

pipeline := parapipe.NewPipeline(cfg).
    Pipe(concurrency, func(msg interface{}) interface{} {
        inputValue := msg.(YourInputType)     // assert your type for the message
        someValue, err := someOperation(inputValue)
        if err != nil {
            return err      // error can also be a result and can be returned from a pipeline stage (pipe)
        }
        return someValue
    }).
    Pipe(concurrency, func(msg interface{}) interface{} {
        switch inputValue := msg.(type) {
        case error:
            // process the error
            return inputValue
        case YourNormalExpectedType:
            // process the message normally
            return inputValue
        default:
            return msg
        }
    })

Limitations

  • the Out() method can be used only once on each pipeline, and any Pipe() call made after it will panic. When you need to stream values from the middle of the pipeline, just send them to your own channel from inside a pipe callback.
  • do not Push to the pipeline before the first Pipe is defined - it will panic
  • at the time of writing Go does not have generics, so you have to assert the type of incoming messages in pipes explicitly, which means message types can only be checked at runtime
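
Since message types are checked only at runtime, a plain type assertion panics on an unexpected message. A defensive sketch (YourInputType is a placeholder, as in the snippets above) uses the two-value assertion form and turns unexpected types into errors:

pipeline.Pipe(4, func(msg interface{}) interface{} {
    input, ok := msg.(YourInputType) // the two-value form never panics
    if !ok {
        return fmt.Errorf("unexpected message type %T", msg)
    }
    // process input as usual
    return input
})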

Performance

As mentioned above, parapipe makes use of interface{} and executes the callback in a separate goroutine for each message. This can have a significant performance impact because of heap allocations and goroutine creation. For instance, if you stream a slice of integers, each of them will be converted to an interface value and will likely be allocated on the heap. Moreover, if the execution time of each step is relatively small, goroutine creation may considerably decrease overall performance.

If performance is a priority, it is recommended to pack such messages into batches (i.e. slices) and stream those batches instead. It is then your responsibility to process each batch in the desired order inside the step (pipe) callback.

The recommendations for choosing a batch size are essentially the same as for any decision about allocating a slice of interfaces or spawning a new goroutine: the work done per batch should outweigh that overhead.
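
As an illustration of batching (the batch size, the source channel, and the squaring work are assumptions, not part of the library), a pipe can receive whole slices and iterate over them in order, paying the interface conversion and goroutine cost once per batch instead of once per element:

const batchSize = 1000 // tune: per-batch work should dwarf the per-message overhead

pipeline.Pipe(4, func(msg interface{}) interface{} {
    batch := msg.([]int)
    squared := make([]int, len(batch))
    for i, n := range batch { // processed in order inside the pipe
        squared[i] = n * n
    }
    return squared
})

// Push slices instead of individual values (source is an assumed <-chan int).
go func() {
    batch := make([]int, 0, batchSize)
    for n := range source {
        batch = append(batch, n)
        if len(batch) == batchSize {
            pipeline.Push(batch)
            batch = make([]int, 0, batchSize)
        }
    }
    if len(batch) > 0 {
        pipeline.Push(batch) // flush the last partial batch
    }
    pipeline.Close()
}()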

Examples

AMQP middleware

Parapipe can be handy when you need to process messages in the middle of a flow concurrently while maintaining their order.

See the working example of using parapipe in an AMQP client.

Other examples

With parapipe you can:

  • respond with a JSON feed as a stream: retrieve, enrich, and marshal each object concurrently, in maintained order, and return them to the client (see the sketch below)
  • fetch and merge entries from different sources into a single stream
  • structure your HTTP controllers
  • process heavy files efficiently
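
As a rough sketch of the JSON-feed case (feedHandler, loadItem, and fetchIDs are hypothetical; only the parapipe calls follow the API above), each object is loaded and marshalled concurrently yet written to the client in the original order:

// assumes imports: net/http, encoding/json, github.com/nazar256/parapipe
func feedHandler(w http.ResponseWriter, r *http.Request) {
    pipeline := parapipe.NewPipeline(parapipe.Config{ProcessErrors: false})

    pipeline.Pipe(8, func(msg interface{}) interface{} {
        item := loadItem(msg.(int)) // hypothetical loader: fetch and enrich one object
        data, err := json.Marshal(item)
        if err != nil {
            return err
        }
        return data
    })

    go func() {
        for _, id := range fetchIDs(r.Context()) { // fetchIDs is hypothetical
            pipeline.Push(id)
        }
        pipeline.Close()
    }()

    for result := range pipeline.Out() {
        if _, ok := result.(error); ok {
            continue // skip (or log) entries that failed to load
        }
        w.Write(result.([]byte)) // objects arrive already in the original order
        w.Write([]byte("\n"))
    }
}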
