A lightweight job scheduler based on a priority queue, with timeout, retry, replica calls, context cancellation and easy semantics for job chaining. Built for Go web apps.

Table of Contents

  1. Introduction
    1. What is RIO?
    2. Concern
      1. An asynchronous job processor
      2. Easy management of these goroutines and chaining them

Introduction


What is RIO?

Rio is a lightweight job scheduler and job chaining library. It is mainly built for Go web apps, but it can easily be molded to serve any application that needs job scheduling. The library is an asynchronous job processor which makes all backend calls asynchronously, with retry, timeout and context cancellation functionality. It also provides very easy semantics to join multiple data sources based on their output and input types, while keeping the data sources fully decoupled. This makes creating new APIs, or resolvers for GraphQL APIs, a breeze.

Concern

Many times we write web apps that connect to different data sources, combine the data obtained from them, and then do some more work. In the process, we write a lot of boilerplate to transform one data type into another. Also, in the absence of a proper job scheduler, we spawn goroutines abruptly and without proper management. This produces unmanageable code, which is even harder to update later, when a new member joins the team.

Rio tries to solve this problem by introducing two concepts.

An asynchronous job processor

This is the piece which runs multiple jobs asynchronously (based on the Rob Pike talk: Google I/O 2010). It has a priority queue (balancer.go and pool.go) which hands off incoming requests to a set of managed workers. The balancer hands off each new job to the most lightly loaded worker.

Easy management of these goroutines and chaining them

How many times do we do this:

call service 1 in goroutine 1
wait and get response from goroutine 1
call service 2 in goroutine 2, taking piece of data from service call 1
wait and get response from goroutine 2
call service 3 in goroutine 3, taking piece of data from service call 2
wait and get response from goroutine 3

You get the idea: this only delays things further and causes a lot of context switching. Rio helps here by chaining multiple calls together using closures and function types, and running them in a single goroutine.
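The single-goroutine chaining idea can be sketched like this. The `task` type and `chain` helper are illustrative stand-ins, not rio's actual API: each step consumes the previous step's output, so running the steps back to back in one goroutine loses no parallelism and avoids the hand-off overhead.

```go
package main

import "fmt"

// task is a stand-in for rio's callback type: it takes the previous
// step's output and produces the next step's input.
type task func(in interface{}) (interface{}, error)

// chain runs dependent tasks back to back in a single goroutine.
// Each task can only start once its input (the previous result)
// exists, so separate goroutines would add scheduling cost
// without buying any parallelism.
func chain(seed interface{}, tasks ...task) (interface{}, error) {
	out := seed
	var err error
	for _, t := range tasks {
		if out, err = t(out); err != nil {
			return nil, err // a failed step stops the chain
		}
	}
	return out, nil
}

func main() {
	svc1 := func(in interface{}) (interface{}, error) { return in.(int) + 1, nil }
	svc2 := func(in interface{}) (interface{}, error) { return in.(int) * 10, nil }
	res, err := chain(1, svc1, svc2)
	fmt.Println(res, err) // 20 <nil>
}
```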

Some might think this would be slower than making multiple goroutine calls. It is not; it will be faster. Think of the previous example: if you do not get a response from service 1, can you invoke service 2? If service 2 fails, can you call service 3? No, because there is a data dependency between these calls.

Rio chains dependent jobs together by introducing this pattern.

request := context,
          (<callback of service 1>.WithTimeOut(100 ms).WithRetry(3))
          .FollowedBy(<function which can transform data from service 1 response to request or partial request of 2>,
                      <callback of service 2>)
          .FollowedBy(<function which can transform data from service 2 response to request or partial request of 3>,
                                  <callback of service 3>)

In the example in examples/web.go the chaining pattern looks like this:

request := rio.BuildRequests(context.Background(),
      rio.NewFutureTask(callback1).WithMilliSecondTimeout(10).WithRetry(3), 2).
      FollowedBy(Call1ToCall2, rio.NewFutureTask(callback2).WithMilliSecondTimeout(20))

Once the chaining is done, post the job to the load balancer:

balancer.PostJob(request)
<-request.CompletedChannel

Once the call chain completes, the request comes back with the responses of all these calls in a slice, and you can do this:

  1. Only one job response

    request.GetOnlyResponse()
    

    or

  2. Multiple job response

    request.GetResponse(index) // index: 0, 1, 2
    

    If any job fails, its response will be an empty response, specifically rio.EMPTY_CALLBACK_RESPONSE.

Comments
  • Add replica calls to services and pick the correct one based on some condition

    Think of a backend with 3 data centers that are not responding uniformly: some are slow and some are fast.

    or

    There are multiple data centers with sync issues: some data centers get updated first and some later.

    Now, if a backend call is made to the slow-responding DC, the response will take longer; if it is made to the DC holding stale data, the response will be faulty.

    Replica calls can help in this context. If the backend allows it, Rio should be able to select one response based on some condition, perhaps response time or data quality.

  • BuildSingleRequest has nil CompletedChannel


    func BuildRequests(context context.Context, task *FutureTask, size int) *Request {
    
    	tasks := make([]*FutureTask, 0, size)
    	tasks = append(tasks, task)
    	return &Request{Ctx: context, Tasks: tasks, TaskCount: size, CompletedChannel: make(chan bool)}
    }
    

    compared to

    func BuildSingleRequest(context context.Context, task *FutureTask) *Request {
    	tasks := make([]*FutureTask, 1)
    	tasks = append(tasks, task)
    	return &Request{Ctx: context, Tasks: tasks}
    }
    
  • The BridgeConnection type has to be a struct with error


    Currently, the BridgeConnection type is like this:

    type BridgeConnection chan interface{}

    It would have to become:

    type BridgeConnection struct {
        Data  chan interface{}
        Error error
    }

    This will help in managing errors well between chained tasks.

  • Change the worker implementation to be buffered

    Currently, the worker request channels are unbuffered. When a new task comes in, it is started in a separate goroutine, so we are not really using the priority queue properly and there are stray goroutines in the background. If the system crashes, many goroutines will fail per worker. If the worker's requests channel is buffered, we will have fewer failed tasks.

    w := &Worker{ requests: make(chan *Request), pending: 0, index: i, Name: fmt.Sprintf("Worker-%d", i), done: b.done}
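    The proposed change can be sketched like this (the `backlog` size and the `Worker` fields beyond the snippet above are assumptions): giving the requests channel a buffer lets up to `backlog` requests queue on the worker itself instead of each spawning a stray goroutine.

```go
package main

import "fmt"

// Request is a stand-in for rio's Request type.
type Request struct{ id int }

// Worker mirrors the fields in the snippet above; the only change is
// how its requests channel is created.
type Worker struct {
	requests chan *Request
	pending  int
	index    int
	Name     string
}

func main() {
	const backlog = 8 // illustrative buffer size; would come from configuration

	// Buffered: up to `backlog` requests queue on the worker itself.
	w := &Worker{
		requests: make(chan *Request, backlog),
		Name:     fmt.Sprintf("Worker-%d", 0),
	}

	// The balancer can now hand off without blocking until the buffer fills.
	for i := 0; i < 3; i++ {
		w.requests <- &Request{id: i}
	}
	fmt.Println(w.Name, "queued:", len(w.requests)) // Worker-0 queued: 3
}
```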

  • Support for single call followed by multiple parallel calls

    It would be cool to have a fanout of dependent calls based on one parent call's response, like this:

    Call 1 --> Bridge data ==> Call 2 & Call 3 in parallel

  • Add default worker wait time to abandon request, via configuration

    If:

    • the timeout is unset
    • the context is not cancelled
    • the network call is not responding

    then the worker should abandon the request and continue, closing the request's CompletedChannel with the error "request abandoned".
  • Write test case for checking the count of goroutines before and after the test

    We need test cases that measure resource utilization and check whether there are any unaccounted goroutine leaks. Use the following function to count the number of goroutines:

    runtime.NumGoroutine()
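    Such a test case could take the following shape (the `checkLeak` helper is an illustration, not existing rio code): snapshot the goroutine count, run the workload, give finished goroutines a moment to exit, and compare.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// checkLeak runs fn and reports whether the goroutine count returned
// to its starting value afterwards.
func checkLeak(fn func()) bool {
	before := runtime.NumGoroutine()
	fn()
	time.Sleep(50 * time.Millisecond) // give finished goroutines time to exit
	return runtime.NumGoroutine() <= before
}

func main() {
	// A well-behaved workload: every goroutine is waited on.
	clean := func() {
		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func() { defer wg.Done() }()
		}
		wg.Wait()
	}
	fmt.Println("no leak:", checkLeak(clean)) // no leak: true
}
```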

  • Write proper test case for the pool

    We have to write proper test cases for the pool.go file. It is a priority queue based on the heap interface. The existing pool_test.go file does not have enough test cases to exercise it under load.

  • Job posting to balancer and waiting for response should be abstracted in a single method call

    Currently, we post a new job to the balancer and wait on the request's CompletedChannel from the user side, like this:

    balancer.PostJob(request)
    <-request.CompletedChannel

    We should merge this together and have one method, which will either:

    • Fail immediately if the request validation fails
    • Block for the whole duration of the job, meaning the method itself does the wait on the CompletedChannel. The reason: if a user of the API forgets to wait on the channel, the worker for that request will block indefinitely and the balancer will have one fewer worker. The balancer, unaware, will keep queuing new work for that worker, and when its buffer fills up it will block as well.