Valet

Stateless Go server responsible for running tasks asynchronously and concurrently.

Overview

At its core, valet is a simple job queuing system and asynchronous task runner. A task is a user-defined function that the service runs as a callback.

Job

The implementation uses the notion of a job, which describes the work that needs to be done and carries information about the task that will run for that specific job. User-defined tasks are assigned to jobs. Each job can be assigned a different task, a JSON payload with the data required for the task to execute, and an optional timeout interval. Jobs can be scheduled to run at a specified time or queued for immediate execution.

After a task has been executed, its results, along with any errors, are stored in a storage system.

Pipeline

A pipeline is a sequence of jobs that need to be executed in a specified order, one by one. Each job in the pipeline can be assigned a different task and parameters, and each task callback can optionally use the results of the previous task in the job sequence. A pipeline can also be scheduled to run sometime in the future, or immediately.

Architecture

Your callback functions can live in any repo and should be registered to your own build of valet. To unlock this level of flexibility, the service is provided as a Go package rather than a command, so tasks can be registered before the executable is built.

Internally, the service consists of the following components.

  • Server - Exposes a RESTful API to enable communication with external services.
  • Job queue - A FIFO queue that supports the job queuing mechanism of the service.
  • Scheduler - Responsible for dispatching jobs from the job queue to the worker pool, and for scheduling tasks for future execution.
  • Worker pool - A number of available goroutines, responsible for the concurrent execution of jobs.
  • Storage - The storage system where jobs and their results persist.

The design is intended to follow the hexagonal architecture pattern and to support modularity and extensibility.

So far, valet provides the following interfaces and can be configured to work with any of the listed technologies.

API

  • HTTP
  • gRPC

Storage

  • In-memory key-value storage
  • MySQL
  • PostgreSQL
  • Redis

Job queue

  • In-memory job queue
  • RabbitMQ
  • Redis

Installation

  1. Download the package.
go get github.com/svaloumas/valet
  2. Define your own task functions in your repo by implementing the type func(...interface{}) (interface{}, error).
package somepkg

import (
	"github.com/svaloumas/valet"
)

// DummyParams is an example of a task params structure.
type DummyParams struct {
	URL string `json:"url,omitempty"`
}

// DummyTask is a dummy task callback.
func DummyTask(args ...interface{}) (interface{}, error) {
	dummyParams := &DummyParams{}
	var previousResultsMetadata string
	valet.DecodeTaskParams(args, dummyParams)
	valet.DecodePreviousJobResults(args, &previousResultsMetadata) // Applies to pipelines.

	metadata, err := downloadContent(dummyParams.URL)
	if err != nil {
		return nil, err
	}
	return metadata, nil
}

// downloadContent is a dummy helper that pretends to fetch content from the given URL.
func downloadContent(URL string) (string, error) {
	return "some metadata", nil
}

args[0] holds the task parameters as they were given through the API call that created the job or pipeline. args[1] holds the results of the previous task, and is set only during a pipeline execution. Prefer to access the arguments safely by using valet.DecodeTaskParams and valet.DecodePreviousJobResults, which decode them into your custom task param structs.
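
For instance, a downstream task in a pipeline might combine its own params with the previous task's output. Below is a minimal sketch that uses the decoding helpers exactly as in the example above; the task name, param struct, and JSON field are hypothetical.

package somepkg

import (
	"fmt"

	"github.com/svaloumas/valet"
)

// ConcatParams is a hypothetical params struct for a follow-up pipeline task.
type ConcatParams struct {
	Prefix string `json:"prefix,omitempty"`
}

// ConcatTask prepends a prefix to the previous task's result.
func ConcatTask(args ...interface{}) (interface{}, error) {
	params := &ConcatParams{}
	var previousResultsMetadata string
	valet.DecodeTaskParams(args, params)
	valet.DecodePreviousJobResults(args, &previousResultsMetadata) // Set only in pipelines.

	// Combine this task's own params with the upstream task's output.
	return fmt.Sprintf("%s: %s", params.Prefix, previousResultsMetadata), nil
}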

  3. Copy config.yaml from the repo and set a configuration according to your needs.

  4. Initialize valet in a main function under your repo, register your tasks with the service, and run it.

mkdir -p cmd/valetd/
touch cmd/valetd/main.go
// cmd/valetd/main.go
package main

import (
	"github.com/svaloumas/valet"
	"path/to/somepkg"
)

func main() {
	v := valet.New("/path/to/config.yaml")
	v.RegisterTask("mytask", somepkg.DummyTask)
	v.Run()
}
  5. Build and run the service.
  • To run the service and its dependencies as Docker containers, use the Dockerfile, docker-compose and Makefile files provided.

    docker-compose up -d --build
  • Build and run the service as a standalone binary.

    Optionally set the corresponding environment variables depending on your configuration options.

    export POSTGRES_DSN=
    export RABBITMQ_URI=
    export MYSQL_DSN=
    export REDIS_URL=
    go build -o valetd cmd/valetd/*.go
    ./valetd
    

Configuration

All configuration is set through config.yaml, which lives under the project's root directory.

# Server config section
server:
  protocol: http                    # string - options: http, grpc
  http:
    port: 8080                      # int
  grpc:
    port: 50051                     # int
# Job queue config section
job_queue:
  option: rabbitmq                  # string - options: memory, rabbitmq, redis
  memory_job_queue:
    capacity: 100                   # int
  rabbitmq:
    queue_params:
      queue_name: job               # string
      durable: false                # boolean
      deleted_when_unused: false    # boolean
      exclusive: false              # boolean
      no_wait: false                # boolean
    consume_params:
      name: rabbitmq-consumer       # string
      auto_ack: true                # boolean
      exclusive: false              # boolean
      no_local: false               # boolean
      no_wait: false                # boolean
    publish_params:
      exchange:                     # string
      routing_key: job              # string
      mandatory: false              # boolean
      immediate: false              # boolean
  redis:
    key_prefix: valet               # string
    min_idle_conns: 10              # int
    pool_size: 10                   # int
# Workerpool config section
worker_pool:
  workers: 4                        # int
  queue_capacity: 4                 # int
# Scheduler config section
scheduler:
  job_queue_polling_interval: 5     # int
  storage_polling_interval: 60      # int
# Storage config section
storage:
  option: memory                    # string - options: memory, mysql, postgres, redis
  mysql:
    connection_max_lifetime:        # int
    max_idle_connections:           # int
    max_open_connections:           # int
  postgres:
    connection_max_lifetime:        # int
    max_idle_connections:           # int
    max_open_connections:           # int
  redis:
    key_prefix: valet               # string
    min_idle_conns: 10              # int
    pool_size: 10                   # int
# Global config section
timeout_unit: second                # string - options: millisecond, second
logging_format: text                # string - options: text, json

Secrets

Depending on the configuration, the following secrets may be required: the MySQL DSN, the PostgreSQL DSN, the Redis URL, and the RabbitMQ URI. Each can be provided as an environment variable. Alternatively, if you choose to use the provided Docker compose files, you can create the corresponding Docker secrets.

Env variable   Docker secret
MYSQL_DSN      valet-mysql-dsn
POSTGRES_DSN   valet-postgres-dsn
REDIS_URL      valet-redis-url
RABBITMQ_URI   valet-rabbitmq-uri

Usage

Create a new job by making a POST HTTP call to /jobs, or via gRPC through the job.Job.Create service method. You can inject arbitrary parameters for your task to run by including them in the request body.

{
    "name": "a job",
    "description": "what this job is all about, but briefly",
    "task_name": "dummytask",
    "task_params": {
        "url": "www.some-fake-url.com"
    },
    "timeout": 10
}
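
As an illustration, the same payload can be posted from a Go program. This is a minimal sketch assuming the HTTP server runs locally on port 8080, the default in config.yaml; the same pattern applies to the scheduled-job and pipeline requests below.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	payload := []byte(`{
		"name": "a job",
		"task_name": "dummytask",
		"task_params": {"url": "www.some-fake-url.com"},
		"timeout": 10
	}`)

	// Assumes valet listens on localhost:8080 and exposes the /jobs endpoint described above.
	resp, err := http.Post("http://localhost:8080/jobs", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}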

To schedule a new job to run at a specific time, add the run_at field to the request body.

{
    "name": "a scheduled job",
    "description": "what this scheduled job is all about, but briefly",
    "task_name": "dummytask",
    "run_at": "2022-06-06T15:04:05.999",
    "task_params": {
        "url": "www.some-fake-url.com"
    },
    "timeout": 10
}

Create a new pipeline by making a POST HTTP call to /pipelines, or via gRPC through the pipeline.Pipeline.Create service method. You can inject arbitrary parameters for your tasks to run by including them in the request body. Optionally, each task can use the results of the previous task in the pipeline, creating a bash-like command pipeline. Pipelines can also be scheduled to run at a specific time by adding the run_at field to the request payload, just as with jobs.

{
    "name": "a scheduled pipeline",
    "description": "what this pipeline is all about",
    "run_at": "2022-06-06T15:04:05.999",
    "jobs": [
        {
            "name": "the first job",
            "description": "some job description",
            "task_name": "dummytask",
            "task_params": {
                "url": "www.some-fake-url.com"
            }
        },
        {
            "name": "a second job",
            "description": "some job description",
            "task_name": "anothertask",
            "task_params": {
                "url": "www.some-fake-url.com"
            },
            "use_previous_results": true,
            "timeout": 10
        },
        {
            "name": "the last job",
            "description": "some job description",
            "task_name": "dummytask",
            "task_params": {
                "url": "www.some-fake-url.com"
            },
            "use_previous_results": true
        }
    ]
}

Tests

Run the complete test suite.

docker-compose up -d
make test
Comments
  • feat: add pipelines

    • Adds support for pipeline execution
    • Adds adapters for MySQL, PostgreSQL, Redis, MemoryDB, HTTP, gRPC for pipeline functionality
    • Updates documentation (swagger & README)
    • Splits the Exec functions into two separate ones, one for the job execution and one for the pipeline.
    • Fixes bugs in worksrv (closing channels in case of an early exit, etc.)
    • Adds missing jobresult proto files
  • feat: add Kafka job queue implementation

    Is your feature request related to a problem? Please describe. Currently, there are three concrete implementations of the JobQueue interface (in-memory, RabbitMQ, and Redis).

    Describe the solution you'd like It would be nice to extend this flexibility by adding a Kafka FIFO job queue implementation.

    Additional context The interface to be implemented is the following.

    // JobQueue represents a driven actor queue interface.
    type JobQueue interface {
    	// Push adds a job to the queue.
    	Push(j *domain.Job) error
    
    	// Pop removes and returns the head job from the queue.
    	Pop() *domain.Job
    
    	// CheckHealth checks if the job queue is alive.
    	CheckHealth() bool
    
    	// Close liberates the bound resources of the job queue.
    	Close()
    }
    

    The config and the job queue factory method should also be updated to support the new functionality. README should also be updated accordingly.
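
    For orientation, here is a minimal, self-contained sketch of a type that satisfies this interface, backed by a buffered channel rather than Kafka. The Job type below is a local stand-in for valet's domain.Job; a real implementation would import the repo's domain package and a Kafka client instead.

    package memqueue

    import (
    	"errors"
    	"sync"
    )

    // Job is a local stand-in for valet's domain.Job type.
    type Job struct {
    	ID   string
    	Name string
    }

    // ChannelQueue satisfies the JobQueue interface using a buffered channel.
    type ChannelQueue struct {
    	jobs chan *Job
    	once sync.Once
    }

    // NewChannelQueue creates a queue with the given capacity.
    func NewChannelQueue(capacity int) *ChannelQueue {
    	return &ChannelQueue{jobs: make(chan *Job, capacity)}
    }

    // Push adds a job to the queue, failing fast when the queue is full.
    func (q *ChannelQueue) Push(j *Job) error {
    	select {
    	case q.jobs <- j:
    		return nil
    	default:
    		return errors.New("job queue is full")
    	}
    }

    // Pop removes and returns the head job, or nil when the queue is empty.
    func (q *ChannelQueue) Pop() *Job {
    	select {
    	case j := <-q.jobs:
    		return j
    	default:
    		return nil
    	}
    }

    // CheckHealth reports whether the queue is alive; always true for an in-memory queue.
    func (q *ChannelQueue) CheckHealth() bool {
    	return true
    }

    // Close liberates the bound resources of the queue.
    func (q *ChannelQueue) Close() {
    	q.once.Do(func() { close(q.jobs) })
    }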
