hey,
I believe there is a deadlock in your code.
I was running benchmarks to measure and compare your code with mine.
I was hit with
$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMe-4 4653 234443 ns/op 10216 B/op 205 allocs/op
BenchmarkOrig-4 fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
testing.(*B).doBench(0xc000126480, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:277 +0x73
testing.(*benchContext).processBench(0xc00000c0e0, 0xc000126480)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:570 +0x218
testing.(*B).run(0xc000126480)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:268 +0x65
testing.(*B).Run(0xc000126000, 0x548ce9, 0xd, 0x551778, 0x4c4300)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:655 +0x41b
testing.runBenchmarks.func1(0xc000126000)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:534 +0x78
testing.(*B).runN(0xc000126000, 0x1)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:191 +0xeb
testing.runBenchmarks(0x548814, 0xb, 0xc00000c0c0, 0x623060, 0x2, 0x2, 0x62aa60)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:540 +0x3b5
testing.(*M).Run(0xc000114000, 0x0)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/testing.go:1363 +0x56a
main.main()
_testmain.go:45 +0x138
goroutine 66550 [chan receive]:
test/concur.runOrig()
/home/mh-cbon/gow/src/test/concur/main_test.go:59 +0x195
test/concur.BenchmarkOrig(0xc000126480)
/home/mh-cbon/gow/src/test/concur/main_test.go:73 +0x2b
testing.(*B).runN(0xc000126480, 0x64)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:191 +0xeb
testing.(*B).launch(0xc000126480)
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:321 +0xea
created by testing.(*B).doBench
/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:276 +0x55
goroutine 66622 [chan receive]:
test/concur.runOrig.func1(0xc0002b2060, 0xc0000284e0)
/home/mh-cbon/gow/src/test/concur/main_test.go:44 +0x45
created by test/concur.runOrig
/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc
goroutine 66621 [chan send]:
test/concur/orig.Process.func1(0xc000016160, 0xc0002b2060, 0x551790, 0xc0001d0018)
/home/mh-cbon/gow/src/test/concur/orig/lib.go:112 +0x285
created by test/concur/orig.Process
/home/mh-cbon/gow/src/test/concur/orig/lib.go:36 +0xaf
goroutine 66610 [select (no cases)]:
test/concur/orig.Process.func1.1(0xc0001d0010, 0xc0002b2000, 0xc0000283c0)
/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x2cf
created by test/concur/orig.Process.func1
/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c
goroutine 66549 [chan send]:
test/concur.runOrig.func1(0xc00013c000, 0xc000028240)
/home/mh-cbon/gow/src/test/concur/main_test.go:49 +0x66
created by test/concur.runOrig
/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc
goroutine 66623 [chan receive]:
test/concur/orig.Process.func1.1(0xc0001d0020, 0xc0002b2060, 0xc000028600)
/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x1c5
created by test/concur/orig.Process.func1
/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c
goroutine 66609 [chan send]:
test/concur.runOrig.func1(0xc0002b2000, 0xc0000282a0)
/home/mh-cbon/gow/src/test/concur/main_test.go:49 +0x66
created by test/concur.runOrig
/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc
goroutine 66574 [select (no cases)]:
test/concur/orig.Process.func1.1(0xc0001d0000, 0xc00013c000, 0xc0001ce0c0)
/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x2cf
created by test/concur/orig.Process.func1
/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c
exit status 2
FAIL test/concur 2.887s
My test code is
package main
import (
"math/rand"
"test/concur/me"
"test/concur/orig"
"testing"
"time"
)
var max = 100
var poolSize = 10
var outputLen = 10
var _ = time.Millisecond
var _ = rand.Intn
func runMe() {
input := make(chan func() interface{})
output := me.Process(input, me.Options{PoolSize: poolSize, OutChannelBuffer: outputLen})
go func() {
for work := 0; work < max; work++ {
value := work
input <- func() interface{} {
// time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
return value * 2
}
}
close(input)
}()
for val := range output {
// fmt.Println(val)
_ = val
}
}
func runOrig() {
inputChan := make(chan *orig.OrderedInput)
doneChan := make(chan bool)
outChan := orig.Process(inputChan, workFn, &orig.Options{PoolSize: poolSize, OutChannelBuffer: outputLen})
go func() {
for {
select {
case out, chok := <-outChan:
if chok {
// log.Println(out.Value)
_ = out
} else {
doneChan <- true
}
}
}
}()
for work := 0; work < max; work++ {
input := &orig.OrderedInput{Value: work}
inputChan <- input
}
close(inputChan)
<-doneChan
}
func workFn(val interface{}) interface{} {
// time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
return val.(int)
}
func BenchmarkMe(b *testing.B) {
for i := 0; i < b.N; i++ {
runMe()
}
}
func BenchmarkOrig(b *testing.B) {
for i := 0; i < b.N; i++ {
runOrig()
}
}
As you can see runOrig
is very much like the main
you have demonstrated at stackoverflow.
consider i commented sleep and print instructions to improve the measurements, they were polluting.
The package orig
is a copy paste of https://github.com/tejzpr/ordered-concurrently/blob/master/main.go in my local, i only renamed the package name for my tests.
Though, i did not try to debug it, it lacks clarity imho, and that is the reason i did that test to begin with.
I also tried to run the test with the race detector, bad news, it appears your code also contains a datarace
$ go test -race -bench=. -benchmem
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMe-4 382 3095423 ns/op 8830 B/op 204 allocs/op
BenchmarkOrig-4 ==================
WARNING: DATA RACE
Read at 0x00c0003fe020 by goroutine 84:
test/concur/orig.Process.func1.2()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:87 +0x138
Previous write at 0x00c0003fe020 by goroutine 49:
test/concur/orig.Process.func1.1()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:69 +0x3b2
Goroutine 84 (running) created at:
test/concur/orig.Process.func1()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234
Goroutine 49 (running) created at:
test/concur/orig.Process.func1()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x1ba
==================
==================
WARNING: DATA RACE
Read at 0x00c000018108 by goroutine 34:
internal/race.Read()
/home/mh-cbon/.gvm/gos/go1.15.2/src/internal/race/race.go:37 +0x206
sync.(*WaitGroup).Add()
/home/mh-cbon/.gvm/gos/go1.15.2/src/sync/waitgroup.go:71 +0x219
test/concur/orig.Process.func1.2()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:84 +0x84
Previous write at 0x00c000018108 by goroutine 109:
internal/race.Write()
/home/mh-cbon/.gvm/gos/go1.15.2/src/internal/race/race.go:41 +0x125
sync.(*WaitGroup).Wait()
/home/mh-cbon/.gvm/gos/go1.15.2/src/sync/waitgroup.go:128 +0x126
test/concur/orig.Process.func1.2()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:90 +0x186
Goroutine 34 (running) created at:
test/concur/orig.Process.func1()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234
Goroutine 109 (running) created at:
test/concur/orig.Process.func1()
/home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234
==================
^Csignal: interrupt
FAIL test/concur 172.097s
Maybe that has to do with the fact i commented the sleep instructions, idk.
for completeness,
- i run go1.15.something/probably latest
- the code i compared with is available at https://play.golang.org/p/2o4W_BgaC4t though, not really tested, i was just experiencing.