Is there a way to stop jobs in mid-execution?
I switched workerpoolxt to use context but the job still runs even though the context has been cancelled... I'm not really sure how to fix this, or if it is even possible.
I have created a POC that reproduces this issue (code below) which you can also view/run on The Go Playground
Any help would be greatly appreciated!!
0 from job a
1 from job a
Job 'a' should have stopped here
2 from job a
3 from job a
4 from job a
[{a context deadline exceeded <nil>} {b <nil> from b}]
0 from job a
1 from job a
Job 'a' should have stopped here
[{a context deadline exceeded <nil>} {b <nil> from b}]
POC Code:
package main
import (
"context"
"fmt"
"time"
"github.com/gammazero/workerpool"
)
func main() {
runner := newRunner(context.Background(), 10)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second*3))
defer cancel()
runner.do(job{
Name: "a",
Context: ctx,
Task: func() jobResult {
for i := 0; i < 10000; i++ {
time.Sleep(time.Second * 1)
fmt.Println(i, "from job a")
}
return jobResult{Data: "from a"}
},
})
runner.do(job{
Name: "b",
Task: func() jobResult {
time.Sleep(time.Duration(time.Second * 6))
return jobResult{Data: "from b"}
},
})
results := runner.getjobResults()
fmt.Println(results)
}
type runner struct {
*workerpool.WorkerPool
defaultCtx context.Context
kill chan struct{}
result chan jobResult
results []jobResult
}
func (r *runner) processResults() {
for {
select {
case res, ok := <-r.result:
if !ok {
goto Done
}
r.results = append(r.results, res)
}
}
Done:
<-r.kill
}
func newRunner(ctx context.Context, numRunners int) *runner {
r := &runner{
WorkerPool: workerpool.New(numRunners),
kill: make(chan struct{}),
result: make(chan jobResult),
defaultCtx: ctx,
}
go r.processResults()
return r
}
func (r *runner) do(j job) {
r.Submit(r.wrap(&j))
}
func (r *runner) getjobResults() []jobResult {
r.StopWait()
close(r.result)
r.kill <- struct{}{}
return r.results
}
func (r *runner) wrap(job *job) func() {
return func() {
if job.Context == nil {
job.Context = r.defaultCtx
}
job.childContext, job.done = context.WithCancel(job.Context)
job.result = make(chan jobResult)
go job.Run()
r.result <- job.getResult()
}
}
type job struct {
Name string
Task func() jobResult
Context context.Context
result chan jobResult
childContext context.Context
stopped chan struct{}
done context.CancelFunc
}
func (j *job) Run() {
result := j.Task()
result.name = j.Name
j.result <- result
j.done()
}
func (j *job) getResult() jobResult {
select {
case r := <-j.result:
return r
case <-j.childContext.Done():
fmt.Printf("Job '%s' should have stopped here\n", j.Name)
switch j.childContext.Err() {
default:
return jobResult{name: j.Name, Error: j.childContext.Err()}
}
}
}
type jobResult struct {
name string
Error error
Data interface{}
}