OpenDILab RL Kubernetes Custom Resource and Operator Lib

DI Orchestrator

DI Orchestrator is designed to manage DI (Decision Intelligence) jobs using Kubernetes Custom Resource and Operator.


Prerequisites

  • A well-prepared Kubernetes cluster. Follow the instructions to create a Kubernetes cluster, or create a local Kubernetes node with kind or minikube.
  • Cert-manager. To install it on Kubernetes, refer to the cert-manager docs, or install it with the following command.
kubectl create -f ./config/certmanager/cert-manager.yaml

Install DI Orchestrator

DI Orchestrator consists of three components: di-operator, di-webhook and di-server. Install them with the following command.

kubectl create -f ./config/di-manager.yaml

di-operator, di-webhook and di-server will be installed in the di-system namespace.

$ kubectl get pod -n di-system
NAME                               READY   STATUS    RESTARTS   AGE
di-operator-57cc65d5c9-5vnvn       1/1     Running   0          59s
di-server-7b86ff8df4-jfgmp         1/1     Running   0          59s
di-webhook-45jgi23fhc-9yght        1/1     Running   0          59s

Install AggregatorConfig

Since all DIJobs share the same aggregator configuration, the aggregator template is defined in AggregatorConfig. Install AggregatorConfig with the following command:

kubectl create -f config/samples/agconfig.yaml -n di-system

Submit DIJob

# submit DIJob
$ kubectl create -f config/samples/dijob-cartpole.yaml

# get pod and you will see coordinator is created by di-operator
# a few seconds later, you will see collectors and learners created by di-server
$ kubectl get pod

# get logs of coordinator
$ kubectl logs cartpole-dqn-coordinator

User Guide

Refer to the user-guide. For the Chinese version, refer to the Chinese manual (中文手册).


Refer to the developer-guide.

Contact us through [email protected]

Open sourced Decision Intelligence (DI), powered by SenseTime X-Lab & Shanghai AI Lab
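  • Add cluster information inside the Pod

    When a job is submitted as dijob replicas, every pod should be able to see the host information of the whole replica and its own start order. Add the following environment variables:

    1. The FQDNs of all pods in the replica, sorted by start order
    2. The FQDN of the current pod
    3. The ordinal index of the current pod

    DI-engine will set up the corresponding network connections based on these variables, so the attach-to generation logic can be removed from di-orchestrator.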
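
A minimal Go sketch of how a process inside the pod might consume such variables. The variable names `DI_NODES` (comma-separated FQDNs in start order) and the comma-separated format are assumptions made for illustration; only `DI_RANK` is mentioned elsewhere on this page, and its exact semantics are assumed here too. The pod's own FQDN is derived from the list and the rank rather than read from a third variable.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// ClusterInfo is what a pod learns about its replica group from the environment.
type ClusterInfo struct {
	Nodes []string // FQDNs of all pods, in start order
	Self  string   // FQDN of the current pod
	Rank  int      // ordinal index of the current pod
}

// parseClusterInfo builds a ClusterInfo from a comma-separated FQDN list
// and a rank string. Both input formats are assumptions of this sketch.
func parseClusterInfo(nodes, rank string) (ClusterInfo, error) {
	r, err := strconv.Atoi(rank)
	if err != nil {
		return ClusterInfo{}, fmt.Errorf("invalid rank %q: %w", rank, err)
	}
	var list []string
	for _, n := range strings.Split(nodes, ",") {
		if n = strings.TrimSpace(n); n != "" {
			list = append(list, n)
		}
	}
	if r < 0 || r >= len(list) {
		return ClusterInfo{}, fmt.Errorf("rank %d out of range for %d nodes", r, len(list))
	}
	return ClusterInfo{Nodes: list, Self: list[r], Rank: r}, nil
}

func main() {
	// DI_NODES is a hypothetical variable name used only in this sketch.
	info, err := parseClusterInfo(os.Getenv("DI_NODES"), os.Getenv("DI_RANK"))
	if err != nil {
		fmt.Println("cluster info unavailable:", err)
		return
	}
	fmt.Printf("rank %d of %d, self=%s\n", info.Rank, len(info.Nodes), info.Self)
}
```

With the ordered host list and its own index, each process can decide which peers to connect to without the orchestrator generating attach-to arguments.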

  • add tasks to dijob spec

    1. goal

    There is only one pod template defined in a dijob, so we cannot define different commands or resources for different components of di-engine such as collector, learner and evaluator. We therefore need a more general way to define the dijob custom resource.

    2. design

    Inspired by VolcanoJob, we define spec.tasks to describe the different components of di-engine. spec.tasks is a list, which allows us to define multiple tasks. Each task can set task.type to label itself as one of collector, learner, evaluator or none. none, the default value, means the task is a general task.

    After the change, a dijob can be defined as follows:

    kind: DIJob
    metadata:
      name: job-with-tasks
    spec:
      priority: "normal"  # job priority, which is a reserved field for allocator
      backoffLimit: 0  # restart count
      cleanPodPolicy: "Running"  # the policy to clean pods after job completion
      preemptible: false  # job is preemptible or not
      minReplicas: 2
      maxReplicas: 5
      tasks:
      - replicas: 1
        name: "learner"
        type: learner
        template:
          metadata:
            name: di
          spec:
            containers:
            - image:
              imagePullPolicy: IfNotPresent
              name: pydi
              env:
              - name: NCCL_DEBUG
                value: "INFO"
              command: ["/bin/bash", "-c"]
              args:
              - |
                ditask --label learner xxx
              resources:
                requests:
                  cpu: "1"
            restartPolicy: Never
      - replicas: 1
        name: "evaluator"
        type: evaluator
        template:
          metadata:
            name: di
          spec:
            containers:
            - image:
              imagePullPolicy: IfNotPresent
              name: pydi
              env:
              - name: NCCL_DEBUG
                value: "INFO"
              command: ["/bin/bash", "-c"]
              args:
              - |
                ditask --label evaluator xxx
            restartPolicy: Never
      - replicas: 2
        name: "collector"
        type: collector
        template:
          metadata:
            name: di
          spec:
            containers:
            - image:
              imagePullPolicy: IfNotPresent
              name: pydi
              env:
              - name: NCCL_DEBUG
                value: "INFO"
              command: ["/bin/bash", "-c"]
              args:
              - |
                ditask --label collector xxx
            restartPolicy: Never
    status:
      conditions:
      - lastTransitionTime: "2022-05-26T07:25:11Z"
        lastUpdateTime: "2022-05-26T07:25:11Z"
        message: job created.
        reason: JobPending
        status: "False"
        type: Pending
      - lastTransitionTime: "2022-05-26T07:25:11Z"
        lastUpdateTime: "2022-05-26T07:25:11Z"
        message: job is starting since all pods are created.
        reason: JobStarting
        status: "False"
        type: Starting
      phase: Starting
      profilings: {}
      readyReplicas: 0
      replicas: 4
      taskStatus:
        learner:
          Pending: 1
        evaluator:
          Pending: 1
        collector:
          Pending: 2
      reschedules: 0
      restarts: 0

    task definition:

    type Task struct {
    	Name string `json:"name,omitempty"`
    	Type TaskType `json:"type,omitempty"`
    	Replicas int32 `json:"replicas,omitempty"`
    	Template corev1.PodTemplateSpec `json:"template,omitempty"`
    }

    type TaskType string

    const (
    	TaskTypeLearner TaskType = "learner"
    	TaskTypeCollector TaskType = "collector"
    	TaskTypeEvaluator TaskType = "evaluator"
    	TaskTypeNone TaskType = "none"
    )
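
The defaulting rule described in the design (an unset type means none) can be sketched as a small normalization step. This is only an illustration: `normalizeTasks` is a hypothetical helper, the `Task` struct is a trimmed stand-in without the pod template, and the extra checks (unique names, at least one replica) are assumptions rather than rules stated in the proposal.

```go
package main

import (
	"errors"
	"fmt"
)

type TaskType string

const (
	TaskTypeLearner   TaskType = "learner"
	TaskTypeCollector TaskType = "collector"
	TaskTypeEvaluator TaskType = "evaluator"
	TaskTypeNone      TaskType = "none"
)

// Task is a trimmed stand-in for the Task struct (pod template omitted).
type Task struct {
	Name     string
	Type     TaskType
	Replicas int32
}

// normalizeTasks applies the default type and rejects unknown types.
// The duplicate-name and replica checks are assumptions of this sketch.
func normalizeTasks(tasks []Task) ([]Task, error) {
	if len(tasks) == 0 {
		return nil, errors.New("spec.tasks must not be empty")
	}
	seen := make(map[string]bool, len(tasks))
	out := make([]Task, len(tasks))
	for i, t := range tasks {
		if t.Type == "" {
			t.Type = TaskTypeNone // general task by default
		}
		switch t.Type {
		case TaskTypeLearner, TaskTypeCollector, TaskTypeEvaluator, TaskTypeNone:
		default:
			return nil, fmt.Errorf("task %q: unknown type %q", t.Name, t.Type)
		}
		if t.Replicas < 1 {
			return nil, fmt.Errorf("task %q: replicas must be at least 1", t.Name)
		}
		if seen[t.Name] {
			return nil, fmt.Errorf("duplicate task name %q", t.Name)
		}
		seen[t.Name] = true
		out[i] = t
	}
	return out, nil
}

func main() {
	tasks, err := normalizeTasks([]Task{
		{Name: "learner", Type: TaskTypeLearner, Replicas: 1},
		{Name: "worker", Replicas: 2}, // no type given, so it becomes "none"
	})
	fmt.Println(tasks, err)
}
```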

    status.taskStatus definition:

    type DIJobStatus struct {
      // Phase defines the observed phase of the job
      // +kubebuilder:default=Pending
      Phase Phase `json:"phase,omitempty"`
      // ...
      // map of task statuses, one TaskStatus value per task
      TaskStatus map[string]TaskStatus
      // ...
    }

    // count of different pod phases
    type TaskStatus map[corev1.PodPhase]int32
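
As a rough illustration of how such a status map could be filled, the sketch below counts pod phases per task. It is not the operator's actual code: `PodPhase` is a local stand-in for corev1.PodPhase so the example has no dependencies, and the `pod` type and `summarize` function are hypothetical.

```go
package main

import "fmt"

// PodPhase stands in for corev1.PodPhase to keep the sketch dependency-free.
type PodPhase string

// TaskStatus counts pods per phase, mirroring the definition above.
type TaskStatus map[PodPhase]int32

// pod is a hypothetical minimal view of a pod: its owning task and its phase.
type pod struct {
	Task  string
	Phase PodPhase
}

// summarize groups pods by task and counts how many are in each phase.
func summarize(pods []pod) map[string]TaskStatus {
	status := make(map[string]TaskStatus)
	for _, p := range pods {
		if status[p.Task] == nil {
			status[p.Task] = TaskStatus{}
		}
		status[p.Task][p.Phase]++
	}
	return status
}

func main() {
	pods := []pod{
		{"learner", "Pending"}, {"evaluator", "Pending"},
		{"collector", "Pending"}, {"collector", "Pending"},
	}
	fmt.Println(summarize(pods))
}
```

For the four pending pods in the sample job, this yields one Pending pod each for learner and evaluator and two for collector.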
  • new version for di-engine new architecture

    release notes


    • v1.0.0 for DI-engine new architecture
    • remove webhook
    • manage commands with cobra
    • refactor orchestrator architecture, inspired by adaptdl
    • use gin to rewrite di-server
    • update di-server http interface
  • v0.2.0


    • [x] split webhook and operator
    • [x] add
    • [x] update CleanPolicyALL to CleanPolicyAll
    • [x] remove k8s service related operations from server, and operator is responsible for managing services
    • [x] add e2e test
  • refactor job spec

    • refactor job spec definition and add spec.tasks to support multi tasks #20
    • add DI_RANK to pod env and remove engineFields in job.spec #16
    • add e2e test
    • add validator to validate the correctness of dijob spec
    • change job.phase to Pending when job replicas scaled to 0
    • implement a processor to process di-server requests
    • refactor project structure
  • Release/v1.0


  • fix: job failed submit when collector/learner missed

    Job submission failed when collector/learner was missing, because the webhook creates an empty dijob and the golang builder adds default values to some fields of collector/learner, which results in an invalid type error. Solved by making coordinator/collector/learner pointers.
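
The general Go behaviour behind this kind of fix can be shown with encoding/json: a struct-valued field is always serialized with its zero values, even with omitempty, while a nil pointer field is dropped. The sketch below is an illustration under that assumption, not the project's code; `RoleSpec`, `jobByValue` and `jobByPointer` are hypothetical types, and the actual failure path in the webhook may differ in detail.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RoleSpec is a hypothetical, trimmed stand-in for a collector/learner spec.
type RoleSpec struct {
	Replicas int `json:"replicas"`
}

// jobByValue embeds the optional role as a plain struct.
type jobByValue struct {
	Coordinator RoleSpec `json:"coordinator"`
	Collector   RoleSpec `json:"collector,omitempty"`
}

// jobByPointer makes the optional role a pointer, so nil means "absent".
type jobByPointer struct {
	Coordinator *RoleSpec `json:"coordinator"`
	Collector   *RoleSpec `json:"collector,omitempty"`
}

// encode marshals v to JSON and returns it as a string.
func encode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// The missing collector still appears, filled with zero values.
	fmt.Println(encode(jobByValue{Coordinator: RoleSpec{Replicas: 1}}))
	// The missing collector is omitted entirely.
	fmt.Println(encode(jobByPointer{Coordinator: &RoleSpec{Replicas: 1}}))
}
```

The first line printed contains a "collector" object with zero values, while the second contains only "coordinator", which is why pointer fields suit optional sub-specs.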

  • Feat/job create event

    • add event handler for dijob, and mark job as Created when job submitted
    • mark collector and learner as optional; only coordinator is required
    • mark job Failed when the submitted job is incorrect. This is hard to test: the client-go reflector decodes DIJob strictly, so we have no chance to handle the DIJob add event when an incorrect job is submitted
    • version -> v0.2.1
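  • Some questions about allocate


    1. In the current allocator logic, the initial allocation for a non-preemptible job only uses minreplicas to modify the replicas attribute. Is the node on which the job's pods are deployed then decided entirely by K8S? Also, the initial allocation for non-preemptible jobs in allocator.go of the Release1.13 code does not seem to be written yet.
    2. What exactly does it mean for a job to be preemptible? Is it equivalent to whether the job can be scheduled?
    3. The Allocate and Optimize methods of the FitPolicy scheduling policy are not implemented either. When can this part be added?
    4. Many places in the documentation do not match the latest code. For example, the DIJob.Spec.Group attribute has already been removed from the code, and the job.spec.minreplicas attribute mentioned in the documentation is not in the code either; it is in JobInfo instead. Could the documentation be updated?

    Thanks!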

