Ordered-concurrently: a library for parallel processing with ordered output in Go

Ordered Concurrently

A library for parallel processing with ordered output in Go. This module processes work concurrently and returns the results on a channel in the order of the input. It is useful for processing items from a queue concurrently while still receiving the output in the order the queue provided them.
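
The idea of ordered output from concurrent workers can be sketched with the standard library alone. The following is a minimal illustration and not the library's actual implementation: `orderedProcess`, its parameters, and the per-item result channel are all hypothetical names chosen for this sketch. Each input gets its own one-slot result channel, the slots are queued in input order, and a collector reads the slots in that same order.

```go
package main

import "fmt"

// orderedProcess is a hypothetical helper (not part of ordered-concurrently).
// It applies work to every input using poolSize goroutines and sends the
// results on the returned channel in input order.
func orderedProcess(inputs <-chan int, poolSize int, work func(int) int) <-chan int {
	type job struct {
		in  int
		res chan int
	}
	jobs := make(chan job)
	// pending holds one result slot per in-flight item, in input order.
	pending := make(chan chan int, poolSize)
	out := make(chan int)

	// Workers: each result goes into the slot that belongs to its input.
	for i := 0; i < poolSize; i++ {
		go func() {
			for j := range jobs {
				j.res <- work(j.in) // never blocks: the slot has capacity 1
			}
		}()
	}

	// Dispatcher: register the slot first, then hand the job to a worker.
	go func() {
		for in := range inputs {
			res := make(chan int, 1)
			pending <- res
			jobs <- job{in: in, res: res}
		}
		close(jobs)
		close(pending)
	}()

	// Collector: wait for the slots in the order they were registered.
	go func() {
		for res := range pending {
			out <- <-res
		}
		close(out)
	}()
	return out
}

func main() {
	inputs := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			inputs <- i
		}
		close(inputs)
	}()
	for v := range orderedProcess(inputs, 3, func(n int) int { return n * 2 }) {
		fmt.Println(v) // prints 0, 2, 4, 6, 8, one per line
	}
}
```

A slow item at the head of the queue delays delivery of later results even when they are already computed; that is the cost of preserving input order.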

Usage

Get Module

go get github.com/tejzpr/ordered-concurrently/v3

Import Module in your source code

import concurrently "github.com/tejzpr/ordered-concurrently/v3" 

Create a work function by implementing the WorkFunction interface

// Create a type based on your input to the work function
type loadWorker int

// The work that needs to be performed
// The input type should implement the WorkFunction interface
func (w loadWorker) Run() interface{} {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
	return w * 2
}
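
When the work needs extra state (a client, a configuration value), one option is a small adapter type that turns a closure into a value with a Run method, in the same spirit as http.HandlerFunc. The sketch below is an assumption-laden illustration: `WorkFunc` is a hypothetical type that the library is not known to provide, and `WorkFunction` is redeclared locally, mirroring the `Run() interface{}` signature shown above, so that the block compiles on its own.

```go
package main

import "fmt"

// WorkFunction is a local stand-in that mirrors the single-method interface
// shown above; it is redeclared here only to keep the sketch self-contained.
type WorkFunction interface {
	Run() interface{}
}

// WorkFunc is a hypothetical adapter: any func() interface{} becomes a
// WorkFunction without declaring a new named type per job.
type WorkFunc func() interface{}

// Run calls the wrapped function.
func (f WorkFunc) Run() interface{} { return f() }

func main() {
	prefix := "item-" // state captured by the closures below
	var jobs []WorkFunction
	for i := 0; i < 3; i++ {
		n := i // copy the loop variable; required before Go 1.22
		jobs = append(jobs, WorkFunc(func() interface{} {
			return fmt.Sprintf("%s%d", prefix, n*2)
		}))
	}
	for _, j := range jobs {
		fmt.Println(j.Run()) // prints item-0, item-2, item-4
	}
}
```

With the real package, such an adapter would presumably be sent on the input channel like any other WorkFunction value, provided its method set matches the interface of the version in use.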

Run

Example - 1

func main() {
	max := 10
	inputChan := make(chan concurrently.WorkFunction)
	ctx := context.Background()
	output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
	go func() {
		for work := 0; work < max; work++ {
			inputChan <- loadWorker(work)
		}
		close(inputChan)
	}()
	for out := range output {
		log.Println(out.Value)
	}
}

Example - 2 - Process unknown number of inputs

func main() {
	inputChan := make(chan concurrently.WorkFunction, 10)
	ctx := context.Background()
	output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})

	ticker := time.NewTicker(100 * time.Millisecond)
	done := make(chan bool)
	wg := &sync.WaitGroup{}
	go func() {
		input := 0
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				// Count the item before sending it, so that wg.Done
				// can never run ahead of the matching wg.Add.
				wg.Add(1)
				inputChan <- loadWorker(input)
				input++
			}
		}
	}()

	var res []loadWorker
	go func() {
		for out := range output {
			res = append(res, out.Value.(loadWorker))
			wg.Done()
		}
	}()

	time.Sleep(1600 * time.Millisecond)
	ticker.Stop()
	done <- true
	close(inputChan)
	wg.Wait()

	// Check if output is sorted
	isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
		return res[i] < res[j]
	})
	if !isSorted {
		log.Println("output is not sorted")
	}
}
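
Example 2 tracks in-flight items with a WaitGroup. Because Example 1 shows that the output channel can be ranged over until it closes once the input is closed, an alternative is to let the consumer signal completion by closing a channel when its range loop ends. The sketch below uses a plain `chan int` as a stand-in for the library's output channel, and `collect` is a hypothetical helper introduced only for illustration.

```go
package main

import "fmt"

// collect starts a goroutine that appends every value received from out to a
// slice. It returns a wait function that blocks until out has been closed and
// drained, then returns the collected values. Hypothetical helper.
func collect(out <-chan int) (wait func() []int) {
	var res []int
	done := make(chan struct{})
	go func() {
		defer close(done) // runs after the final append
		for v := range out {
			res = append(res, v)
		}
	}()
	return func() []int {
		<-done // receiving from the closed channel orders the reads after the writes
		return res
	}
}

func main() {
	out := make(chan int) // stand-in for the library's output channel
	wait := collect(out)
	for i := 0; i < 3; i++ {
		out <- i * 2
	}
	close(out)
	fmt.Println(wait()) // prints [0 2 4]
}
```

This avoids pairing every send with a wg.Add and every receive with a wg.Done, at the cost of only learning that processing has finished once the output channel is closed.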

Credits

  1. u/justinisrael for inputs on improving resource usage.
  2. mh-cbon for identifying potential deadlocks.
Comments
  • deadlock

    hey,

    I believe there is a deadlock in your code.

    I was running benchmarks to measure and compare your code with mine.

    I was hit with

    $ go test -bench=. -benchmem
    goos: linux
    goarch: amd64
    pkg: test/concur
    BenchmarkMe-4     	    4653	    234443 ns/op	   10216 B/op	     205 allocs/op
    BenchmarkOrig-4   	fatal error: all goroutines are asleep - deadlock!
    
    goroutine 1 [chan receive]:
    testing.(*B).doBench(0xc000126480, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:277 +0x73
    testing.(*benchContext).processBench(0xc00000c0e0, 0xc000126480)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:570 +0x218
    testing.(*B).run(0xc000126480)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:268 +0x65
    testing.(*B).Run(0xc000126000, 0x548ce9, 0xd, 0x551778, 0x4c4300)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:655 +0x41b
    testing.runBenchmarks.func1(0xc000126000)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:534 +0x78
    testing.(*B).runN(0xc000126000, 0x1)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:191 +0xeb
    testing.runBenchmarks(0x548814, 0xb, 0xc00000c0c0, 0x623060, 0x2, 0x2, 0x62aa60)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:540 +0x3b5
    testing.(*M).Run(0xc000114000, 0x0)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/testing.go:1363 +0x56a
    main.main()
    	_testmain.go:45 +0x138
    
    goroutine 66550 [chan receive]:
    test/concur.runOrig()
    	/home/mh-cbon/gow/src/test/concur/main_test.go:59 +0x195
    test/concur.BenchmarkOrig(0xc000126480)
    	/home/mh-cbon/gow/src/test/concur/main_test.go:73 +0x2b
    testing.(*B).runN(0xc000126480, 0x64)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:191 +0xeb
    testing.(*B).launch(0xc000126480)
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:321 +0xea
    created by testing.(*B).doBench
    	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:276 +0x55
    
    goroutine 66622 [chan receive]:
    test/concur.runOrig.func1(0xc0002b2060, 0xc0000284e0)
    	/home/mh-cbon/gow/src/test/concur/main_test.go:44 +0x45
    created by test/concur.runOrig
    	/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc
    
    goroutine 66621 [chan send]:
    test/concur/orig.Process.func1(0xc000016160, 0xc0002b2060, 0x551790, 0xc0001d0018)
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:112 +0x285
    created by test/concur/orig.Process
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:36 +0xaf
    
    goroutine 66610 [select (no cases)]:
    test/concur/orig.Process.func1.1(0xc0001d0010, 0xc0002b2000, 0xc0000283c0)
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x2cf
    created by test/concur/orig.Process.func1
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c
    
    goroutine 66549 [chan send]:
    test/concur.runOrig.func1(0xc00013c000, 0xc000028240)
    	/home/mh-cbon/gow/src/test/concur/main_test.go:49 +0x66
    created by test/concur.runOrig
    	/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc
    
    goroutine 66623 [chan receive]:
    test/concur/orig.Process.func1.1(0xc0001d0020, 0xc0002b2060, 0xc000028600)
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x1c5
    created by test/concur/orig.Process.func1
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c
    
    goroutine 66609 [chan send]:
    test/concur.runOrig.func1(0xc0002b2000, 0xc0000282a0)
    	/home/mh-cbon/gow/src/test/concur/main_test.go:49 +0x66
    created by test/concur.runOrig
    	/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc
    
    goroutine 66574 [select (no cases)]:
    test/concur/orig.Process.func1.1(0xc0001d0000, 0xc00013c000, 0xc0001ce0c0)
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x2cf
    created by test/concur/orig.Process.func1
    	/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c
    exit status 2
    FAIL	test/concur	2.887s
    

    My test code is

    package main
    
    import (
    	"math/rand"
    	"test/concur/me"
    	"test/concur/orig"
    	"testing"
    	"time"
    )
    
    var max = 100
    var poolSize = 10
    var outputLen = 10
    
    var _ = time.Millisecond
    var _ = rand.Intn
    
    func runMe() {
    	input := make(chan func() interface{})
    	output := me.Process(input, me.Options{PoolSize: poolSize, OutChannelBuffer: outputLen})
    	go func() {
    		for work := 0; work < max; work++ {
    			value := work
    			input <- func() interface{} {
    				// time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
    				return value * 2
    			}
    		}
    		close(input)
    	}()
    	for val := range output {
    		// fmt.Println(val)
    		_ = val
    	}
    }
    
    func runOrig() {
    	inputChan := make(chan *orig.OrderedInput)
    	doneChan := make(chan bool)
    	outChan := orig.Process(inputChan, workFn, &orig.Options{PoolSize: poolSize, OutChannelBuffer: outputLen})
    	go func() {
    		for {
    			select {
    			case out, chok := <-outChan:
    				if chok {
    					// log.Println(out.Value)
    					_ = out
    				} else {
    					doneChan <- true
    				}
    			}
    		}
    	}()
    	for work := 0; work < max; work++ {
    		input := &orig.OrderedInput{Value: work}
    		inputChan <- input
    	}
    	close(inputChan)
    	<-doneChan
    }
    func workFn(val interface{}) interface{} {
    	// time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
    	return val.(int)
    }
    
    func BenchmarkMe(b *testing.B) {
    	for i := 0; i < b.N; i++ {
    		runMe()
    	}
    }
    func BenchmarkOrig(b *testing.B) {
    	for i := 0; i < b.N; i++ {
    		runOrig()
    	}
    }
    

    As you can see runOrig is very much like the main you have demonstrated at stackoverflow.

    Note that I commented out the sleep and print instructions to improve the measurements; they were polluting the results.

    The package orig is a copy paste of https://github.com/tejzpr/ordered-concurrently/blob/master/main.go in my local, i only renamed the package name for my tests.

    Though, i did not try to debug it, it lacks clarity imho, and that is the reason i did that test to begin with.

    I also tried to run the test with the race detector. Bad news: it appears your code also contains a data race.

    $ go test -race -bench=. -benchmem
    goos: linux
    goarch: amd64
    pkg: test/concur
    BenchmarkMe-4     	     382	   3095423 ns/op	    8830 B/op	     204 allocs/op
    BenchmarkOrig-4   	==================
    WARNING: DATA RACE
    Read at 0x00c0003fe020 by goroutine 84:
      test/concur/orig.Process.func1.2()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:87 +0x138
    
    Previous write at 0x00c0003fe020 by goroutine 49:
      test/concur/orig.Process.func1.1()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:69 +0x3b2
    
    Goroutine 84 (running) created at:
      test/concur/orig.Process.func1()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234
    
    Goroutine 49 (running) created at:
      test/concur/orig.Process.func1()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x1ba
    ==================
    ==================
    WARNING: DATA RACE
    Read at 0x00c000018108 by goroutine 34:
      internal/race.Read()
          /home/mh-cbon/.gvm/gos/go1.15.2/src/internal/race/race.go:37 +0x206
      sync.(*WaitGroup).Add()
          /home/mh-cbon/.gvm/gos/go1.15.2/src/sync/waitgroup.go:71 +0x219
      test/concur/orig.Process.func1.2()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:84 +0x84
    
    Previous write at 0x00c000018108 by goroutine 109:
      internal/race.Write()
          /home/mh-cbon/.gvm/gos/go1.15.2/src/internal/race/race.go:41 +0x125
      sync.(*WaitGroup).Wait()
          /home/mh-cbon/.gvm/gos/go1.15.2/src/sync/waitgroup.go:128 +0x126
      test/concur/orig.Process.func1.2()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:90 +0x186
    
    Goroutine 34 (running) created at:
      test/concur/orig.Process.func1()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234
    
    Goroutine 109 (running) created at:
      test/concur/orig.Process.func1()
          /home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234
    ==================
    ^Csignal: interrupt
    FAIL	test/concur	172.097s
    

    Maybe that has to do with the fact i commented the sleep instructions, idk.

    for completeness,

    • i run go1.15.something/probably latest
    • the code I compared with is available at https://play.golang.org/p/2o4W_BgaC4t though it is not really tested; I was just experimenting.
  • Fully automate dev setup with Gitpod

    This commit implements a fully-automated development setup using Gitpod.io, an online IDE for GitLab, GitHub, and Bitbucket that enables Dev-Environments-As-Code. This makes it easy for anyone to get a ready-to-code workspace for any branch, issue or pull request almost instantly with a single click.

  • Change WorkFunction to anonymous function

    Is your feature request related to a problem? Please describe.
    Since WorkFunction is currently declared as an interface with a single Run() method, I am unable to pass any other variables as part of the context.

    Describe the solution you'd like
    By using an anonymous function instead of an interface, we would not need to declare any new types and could use the execution context, for example (from the example code):

    func main() {
    	// init some external client
    	c := client.New(...)
    	max := 10
    	inputChan := make(chan concurrently.WorkFunction)
    	output := concurrently.Process(inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
    	go func() {
    		for work := 0; work < max; work++ {
    			inputChan <- func() interface{} {
    				// do some work using client
    				result := c.DoWork(work)
    				return result
    			}
    		}
    		close(inputChan)
    	}()
    	for out := range output {
    		log.Println(out.Value)
    	}
    }
    

    Describe alternatives you've considered N/A

    Additional context N/A
