Develop sample controller step by step

Sample Controller

Spec

  • Group: example.com
  • CR: Foo
  • Version: v1alpha1

Tools

0. Init module

go mod init github.com/nakamasato/sample-controller # replace with your own module path

1. Define CRD

  1. Create a directory.

    mkdir -p pkg/apis/example.com/v1alpha1
    
  2. Create pkg/apis/example.com/v1alpha1/doc.go.

    // +k8s:deepcopy-gen=package
    // +groupName=example.com
    
    package v1alpha1
  3. Create pkg/apis/example.com/v1alpha1/types.go.

    package v1alpha1
    
    import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    
    // These const variables are used in our custom controller.
    const (
        GroupName string = "example.com"
        Kind      string = "Foo"
        Version   string = "v1alpha1"
        Plural    string = "foos"
        Singular  string = "foo"
        ShortName string = "foo"
        Name      string = Plural + "." + GroupName
    )
    
    // +genclient
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    
    // Foo is a specification for a Foo resource
    type Foo struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`
    
        Spec   FooSpec   `json:"spec"`
        Status FooStatus `json:"status"`
    }
    
    // FooSpec is the spec for a Foo resource
    type FooSpec struct {
        DeploymentName string `json:"deploymentName"`
        Replicas       *int32 `json:"replicas"`
    }
    
    // FooStatus is the status for a Foo resource
    type FooStatus struct {
        AvailableReplicas int32 `json:"availableReplicas"`
    }
    
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    
    // FooList is a list of Foo resources
    type FooList struct {
        metav1.TypeMeta `json:",inline"`
        metav1.ListMeta `json:"metadata"`
    
        Items []Foo `json:"items"`
    }
  4. Create pkg/apis/example.com/v1alpha1/register.go.

    package v1alpha1
    
    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
    )
    
    var (
        // SchemeBuilder initializes a scheme builder
        SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
        // AddToScheme is a global function that registers this API group & version to a scheme
        AddToScheme = SchemeBuilder.AddToScheme
    )
    
    // SchemeGroupVersion is group version used to register these objects.
    var SchemeGroupVersion = schema.GroupVersion{
        Group:   GroupName,
        Version: Version,
    }
    
    func Resource(resource string) schema.GroupResource {
        return SchemeGroupVersion.WithResource(resource).GroupResource()
    }
    
    func addKnownTypes(scheme *runtime.Scheme) error {
        scheme.AddKnownTypes(SchemeGroupVersion,
            &Foo{},
            &FooList{},
        )
        metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
        return nil
    }

※ At this point, pkg/apis/example.com/v1alpha1/register.go will show an error: cannot use &(FooList literal) (value of type *FooList) as "k8s.io/apimachinery/pkg/runtime".Object value in argument to scheme.AddKnownTypes: missing method DeepCopyObject. This is expected, as DeepCopyObject will be generated in the next step.
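
For reference, the code generation in the next step produces zz_generated.deepcopy.go, which contains DeepCopy, DeepCopyInto, and DeepCopyObject methods for Foo and FooList. The DeepCopyObject method that register.go needs looks roughly like this (a sketch of generated code, not something to write by hand):

    // DeepCopyObject returns a generically typed copy of the object, satisfying
    // runtime.Object; code-generator emits this for both Foo and FooList.
    func (in *Foo) DeepCopyObject() runtime.Object {
        if c := in.DeepCopy(); c != nil {
            return c
        }
        return nil
    }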

2. Generate code

  1. Set execDir env var for code-generator.

    execDir=~/repos/kubernetes/code-generator
    

    If you have already cloned code-generator, point execDir at that directory.

  2. Clone code-generator if you haven't already.

    git clone https://github.com/kubernetes/code-generator.git $execDir
    
    generate-groups.sh Usage

    "${execDir}"/generate-groups.sh
    Usage: generate-groups.sh <generators> <output-package> <apis-package> <groups-versions> ...

      <generators>        the generators comma separated to run (deepcopy,defaulter,client,lister,informer) or "all".
      <output-package>    the output package name (e.g. github.com/example/project/pkg/generated).
      <apis-package>      the external types dir (e.g. github.com/example/api or github.com/example/project/pkg/apis).
      <groups-versions>   the groups and their versions in the format "groupA:v1,v2 groupB:v1 groupC:v2", relative
                          to <apis-package>.
      ...                 arbitrary flags passed to all generator binaries.

    Examples:
      generate-groups.sh all github.com/example/project/pkg/client github.com/example/project/pkg/apis "foo:v1 bar:v1alpha1,v1beta1"
      generate-groups.sh deepcopy,client github.com/example/project/pkg/client github.com/example/project/pkg/apis "foo:v1 bar:v1alpha1,v1beta1"
  3. Generate code (deepcopy functions, clientset, listers, and informers).

    ※ You need to replace github.com/nakamasato/sample-controller with your package name.

    "${execDir}"/generate-groups.sh all github.com/nakamasato/sample-controller/pkg/client github.com/nakamasato/sample-controller/pkg/apis example.com:v1alpha1 --go-header-file "${execDir}"/hack/boilerplate.go.txt
    
    files
    tree .
    .
    ├── README.md
    ├── go.mod
    ├── go.sum
    └── pkg
        ├── apis
        │   └── example.com
        │       └── v1alpha1
        │           ├── doc.go
        │           ├── register.go
        │           ├── types.go
        │           └── zz_generated.deepcopy.go
        └── client
            ├── clientset
            │   └── versioned
            │       ├── clientset.go
            │       ├── doc.go
            │       ├── fake
            │       │   ├── clientset_generated.go
            │       │   ├── doc.go
            │       │   └── register.go
            │       ├── scheme
            │       │   ├── doc.go
            │       │   └── register.go
            │       └── typed
            │           └── example.com
            │               └── v1alpha1
            │                   ├── doc.go
            │                   ├── example.com_client.go
            │                   ├── fake
            │                   │   ├── doc.go
            │                   │   ├── fake_example.com_client.go
            │                   │   └── fake_foo.go
            │                   ├── foo.go
            │                   └── generated_expansion.go
            ├── informers
            │   └── externalversions
            │       ├── example.com
            │       │   ├── interface.go
            │       │   └── v1alpha1
            │       │       ├── foo.go
            │       │       └── interface.go
            │       ├── factory.go
            │       ├── generic.go
            │       └── internalinterfaces
            │           └── factory_interfaces.go
            └── listers
                └── example.com
                    └── v1alpha1
                        ├── expansion_generated.go
                        └── foo.go
    
    21 directories, 29 files
    
  4. Run go mod tidy.

3. Create CRD yaml file

config/crd/example.com_foos.yaml:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: foos.example.com
spec:
  group: example.com
  names:
    kind: Foo
    listKind: FooList
    plural: foos
    singular: foo
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          properties:
            apiVersion:
              type: string
            kind:
              type: string
            metadata:
              type: object
            spec:
              type: object
              properties:
                deploymentName:
                  type: string
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 10
            status:
              type: object
              properties:
                availableReplicas:
                  type: integer

※ You can also use controller-gen, which is a subproject of the kubebuilder project, to generate CRD yaml.
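
If you go that route, controller-gen derives the schema from kubebuilder markers on the Go types. A minimal sketch of how FooSpec could be annotated to reproduce the replicas validation above (the markers are standard controller-gen validation markers, not used elsewhere in this repo):

    // FooSpec annotated for controller-gen; the validation markers mirror the
    // minimum/maximum constraints of the hand-written CRD above.
    type FooSpec struct {
        DeploymentName string `json:"deploymentName"`

        // +kubebuilder:validation:Minimum=1
        // +kubebuilder:validation:Maximum=10
        Replicas *int32 `json:"replicas"`
    }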

4. Checkpoint: Check custom resource and codes

What to check:

  • Create CRD
  • Create CR
  • Read the CR from sample-controller

Steps:

  1. Create main.go to retrieve custom resource Foo.

    main.go
    package main
    
    import (
        "context"
        "flag"
        "fmt"
        "log"
        "path/filepath"
    
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/client-go/util/homedir"
    
        client "github.com/nakamasato/sample-controller/pkg/client/clientset/versioned"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )
    
    func main() {
        var kubeconfig *string
    
        if home := homedir.HomeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional)")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to kubeconfig file")
        }
        flag.Parse()
    
        config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        if err != nil {
            log.Printf("Building config from flags, %s", err.Error())
        }
    
        clientset, err := client.NewForConfig(config)
        if err != nil {
            log.Printf("getting client set %s\n", err.Error())
        }
        fmt.Println(clientset)
    
        foos, err := clientset.ExampleV1alpha1().Foos("").List(context.Background(), metav1.ListOptions{})
        if err != nil {
            log.Printf("listing foos %s\n", err.Error())
        }
        fmt.Printf("length of foos is %d\n", len(foos.Items))
    }
  2. Build sample-controller

    go mod tidy
    go build
    
  3. Test sample-controller (main.go).

    1. Register the CRD.

      kubectl apply -f config/crd/example.com_foos.yaml
      
    2. Run sample-controller.

      ./sample-controller
      

      Result: no Foo exists

      &{0xc000498d20 0xc00048c480}
      length of foos is 0
      
    3. Create sample foo (custom resource) with config/sample/foo.yaml.

      apiVersion: example.com/v1alpha1
      kind: Foo
      metadata:
        name: foo-sample
      spec:
        deploymentName: foo-sample
        replicas: 1

      kubectl apply -f config/sample/foo.yaml
      
    4. Run the controller again.

      ./sample-controller
      &{0xc000496d20 0xc00048a480}
      length of foos is 1
      
    5. Clean up foo (custom resource).

      kubectl delete -f config/sample/foo.yaml
      

5. Write controller

5.1. Create Controller

  1. Create controller.

    What's inside the controller:

    1. Define Controller struct with sampleclientset, foosLister, foosSynced, and workqueue.
    2. Define NewController function
      1. Create Controller with the arguments sampleclientset and fooInformer, which will be passed in main.go.
      2. Add AddFunc and DeleteFunc event handlers to the informer.
      3. Return the controller.
    3. Define Run, which will be called in main.go.
      1. Wait until the cache is synced.
      2. Run c.worker repeatedly every second until the stop channel is closed.
    4. Define worker: just call processNextItem.
    5. Define processNextItem: always return true for now.
    pkg/controller/foo.go
    package controller
    
    import (
    	"log"
    	"time"
    
    	clientset "github.com/nakamasato/sample-controller/pkg/client/clientset/versioned"
    	informers "github.com/nakamasato/sample-controller/pkg/client/informers/externalversions/example.com/v1alpha1"
    	listers "github.com/nakamasato/sample-controller/pkg/client/listers/example.com/v1alpha1"
    
    	"k8s.io/apimachinery/pkg/util/wait"
    	"k8s.io/client-go/tools/cache"
    	"k8s.io/client-go/util/workqueue"
    )
    
    type Controller struct {
    	// sampleclientset is a clientset for our own API group
    	sampleclientset clientset.Interface
    
    	foosLister listers.FooLister    // lister for foo
    	foosSynced cache.InformerSynced // cache is synced for foo
    
    	// queue
    	workqueue workqueue.RateLimitingInterface
    }
    
    func NewController(sampleclientset clientset.Interface, fooInformer informers.FooInformer) *Controller {
    	controller := &Controller{
    		sampleclientset: sampleclientset,
    		foosSynced:      fooInformer.Informer().HasSynced,
    		foosLister:      fooInformer.Lister(),
    		workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
    	}
    
    	fooInformer.Informer().AddEventHandler(
    		cache.ResourceEventHandlerFuncs{
    			AddFunc:    controller.handleAdd,
    			DeleteFunc: controller.handleDelete,
    		},
    	)
    	return controller
    }
    
    func (c *Controller) Run(ch chan struct{}) error {
    	if ok := cache.WaitForCacheSync(ch, c.foosSynced); !ok {
    		log.Printf("cache is not synced")
    	}
    
    	go wait.Until(c.worker, time.Second, ch)
    
    	<-ch
    	return nil
    }
    
    func (c *Controller) worker() {
    	c.processNextItem()
    }
    
    func (c *Controller) processNextItem() bool {
    	return true
    }
    
    func (c *Controller) handleAdd(obj interface{}) {
    	log.Println("handleAdd was called")
    	c.workqueue.Add(obj)
    }
    
    func (c *Controller) handleDelete(obj interface{}) {
    	log.Println("handleDelete was called")
    	c.workqueue.Add(obj)
    }

    Although controller.go sits in the root directory of the upstream sample-controller, this repo creates the controller under the pkg/controller directory. You can also move it into the main package if you prefer.

  2. Update main.go to initialize a controller and run it.

     import (
    -       "context"
            "flag"
    -       "fmt"
            "log"
            "path/filepath"
    +       "time"
    
            "k8s.io/client-go/tools/clientcmd"
            "k8s.io/client-go/util/homedir"
    
    -       client "github.com/nakamasato/sample-controller/pkg/client/clientset/versioned"
    -       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    +       clientset "github.com/nakamasato/sample-controller/pkg/client/clientset/versioned"
    +       informers "github.com/nakamasato/sample-controller/pkg/client/informers/externalversions"
    +       "github.com/nakamasato/sample-controller/pkg/controller"
     )
    
     func main() {
    @@ -29,15 +29,16 @@ func main() {
                    log.Printf("Building config from flags, %s", err.Error())
            }
    
    -       clientset, err := client.NewForConfig(config)
    +       exampleClient, err := clientset.NewForConfig(config)
            if err != nil {
                    log.Printf("getting client set %s\n", err.Error())
            }
    -       fmt.Println(clientset)
    
    -       foos, err := clientset.ExampleV1alpha1().Foos("").List(context.Background(), metav1.ListOptions{})
    -       if err != nil {
    -               log.Printf("listing foos %s\n", err.Error())
    +       exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, 20*time.Minute)
    +       ch := make(chan struct{})
    +       controller := controller.NewController(exampleClient, exampleInformerFactory.Example().V1alpha1().Foos())
    +       exampleInformerFactory.Start(ch)
    +       if err = controller.Run(ch); err != nil {
    +               log.Printf("error occurred when running controller %s\n", err.Error())
            }
    -       fmt.Printf("length of foos is %d\n", len(foos.Items))
     }

    At the line exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, 20*time.Minute), the second argument specifies the ResyncPeriod, which defines the interval of resync (a resync delivers an update notification to the handlers for every object in the informer's local cache). For more detail, please read NewSharedIndexInformer.

    I'm not exactly sure why sample-controller specifies 30 seconds for the ResyncPeriod.

    main.go
    package main
    
    import (
    	"flag"
    	"log"
    	"path/filepath"
    	"time"
    
    	"k8s.io/client-go/tools/clientcmd"
    	"k8s.io/client-go/util/homedir"
    
    	clientset "github.com/nakamasato/sample-controller/pkg/client/clientset/versioned"
    	informers "github.com/nakamasato/sample-controller/pkg/client/informers/externalversions"
    	"github.com/nakamasato/sample-controller/pkg/controller"
    )
    
    func main() {
    	var kubeconfig *string
    
    	if home := homedir.HomeDir(); home != "" {
    		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional)")
    	} else {
    		kubeconfig = flag.String("kubeconfig", "", "absolute path to kubeconfig file")
    	}
    	flag.Parse()
    
    	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    	if err != nil {
    		log.Printf("Building config from flags, %s", err.Error())
    	}
    
    	exampleClient, err := clientset.NewForConfig(config)
    	if err != nil {
    		log.Printf("getting client set %s\n", err.Error())
    	}
    
    	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, 20*time.Minute)
    	ch := make(chan struct{})
    	controller := controller.NewController(exampleClient, exampleInformerFactory.Example().V1alpha1().Foos())
    	exampleInformerFactory.Start(ch)
    	if err = controller.Run(ch); err != nil {
    		log.Printf("error occurred when running controller %s\n", err.Error())
    	}
    }
  3. Build and run the controller.

    go build
    ./sample-controller
    
  4. Create and delete CR.

    kubectl apply -f config/sample/foo.yaml
    
    kubectl delete -f config/sample/foo.yaml
    
  5. Check the controller logs.

    2021/12/19 17:31:25 handleAdd was called
    2021/12/19 17:31:47 handleDelete was called
    

5.2. Fetch foo object

Implement the following logic:

  1. Get a workqueue item.
  2. Get the key for the item from the cache.
  3. Split the key into namespace and name.
  4. Get the Foo resource with namespace and name from the lister.

Steps:

  1. Define enqueueFoo to convert a Foo resource into a namespace/name string before putting it into the workqueue.

    func (c *Controller) handleAdd(obj interface{}) {
        log.Println("handleAdd was called")
        c.enqueueFoo(obj)
    }
    
    func (c *Controller) handleDelete(obj interface{}) {
        log.Println("handleDelete was called")
        c.enqueueFoo(obj)
    }
    
    // enqueueFoo takes a Foo resource and converts it into a namespace/name
    // string which is then put onto the work queue. This method should *not* be
    // passed resources of any type other than Foo.
    func (c *Controller) enqueueFoo(obj interface{}) {
        var key string
        var err error
        if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
            log.Printf("failed to get key from the cache %s\n", err.Error())
            return
        }
        c.workqueue.Add(key)
    }
  2. Update processNextItem.

    func (c *Controller) processNextItem() bool {
        obj, shutdown := c.workqueue.Get()
        if shutdown {
            return false
        }
    
        // wrap this block in a func to use defer c.workqueue.Done
        err := func(obj interface{}) error {
            // call Done to tell workqueue that the item was finished processing
            defer c.workqueue.Done(obj)
            var key string
            var ok bool
    
            if key, ok = obj.(string); !ok {
                // As the item in the workqueue is actually invalid, we call
                // Forget here else we'd go into a loop of attempting to
                // process a work item that is invalid.
                c.workqueue.Forget(obj)
                return nil
            }
    
            ns, name, err := cache.SplitMetaNamespaceKey(key)
            if err != nil {
                log.Printf("failed to split key into namespace and name %s\n", err.Error())
                return err
            }
    
            // temporary main logic
            foo, err := c.foosLister.Foos(ns).Get(name)
            if err != nil {
                log.Printf("failed to get foo resource from lister %s\n", err.Error())
                return err
            }
            log.Printf("Got foo %+v\n", foo.Spec)
    
            // Forget the queue item as it's successfully processed and
            // the item will not be requeued.
            c.workqueue.Forget(obj)
            return nil
        }(obj)
    
        if err != nil {
            // For now just log the error and move on; requeueing with
            // AddRateLimited is added in section 5.3.
            log.Printf("failed to process item %s\n", err.Error())
            return true
        }

        return true
    }
  3. Build and run the controller.

    go build
    ./sample-controller
    
  4. Create and delete CR.

    kubectl apply -f config/sample/foo.yaml
    
    kubectl delete -f config/sample/foo.yaml
    
  5. Check the controller logs.

    ./sample-controller
    2021/12/20 05:53:10 handleAdd was called
    2021/12/20 05:53:10 Got foo {DeploymentName:foo-sample Replicas:0xc0001a942c}
    2021/12/20 05:53:16 handleDelete was called
    2021/12/20 05:53:16 failed to get foo resource from lister foo.example.com "foo-sample" not found
    

5.3. Implement reconciliation logic - Enable to Create/Delete Deployment for Foo resource

At the end of this step, we'll be able to create Deployment for Foo resource.

  1. Add fields (kubeclientset, deploymentsLister, and deploymentsSynced) to Controller.

     type Controller struct {
    +       // kubeclientset is a standard kubernetes clientset
    +       kubeclientset kubernetes.Interface
            // sampleclientset is a clientset for our own API group
            sampleclientset clientset.Interface
    
    +       deploymentsLister appslisters.DeploymentLister
    +       deploymentsSynced cache.InformerSynced
    +
            foosLister listers.FooLister    // lister for foo
            foosSynced cache.InformerSynced // cache is synced for foo
    
    @@ -24,12 +39,19 @@ type Controller struct {
            workqueue workqueue.RateLimitingInterface
     }
  2. Update NewController as follows:

    -func NewController(sampleclientset clientset.Interface, fooInformer informers.FooInformer) *Controller {
    +func NewController(
    +       kubeclientset kubernetes.Interface,
    +       sampleclientset clientset.Interface,
    +       deploymentInformer appsinformers.DeploymentInformer,
    +       fooInformer informers.FooInformer) *Controller {
            controller := &Controller{
    -               sampleclientset: sampleclientset,
    -               foosSynced:      fooInformer.Informer().HasSynced,
    -               foosLister:      fooInformer.Lister(),
    -               workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
    +               kubeclientset:     kubeclientset,
    +               sampleclientset:   sampleclientset,
    +               deploymentsLister: deploymentInformer.Lister(),
    +               deploymentsSynced: deploymentInformer.Informer().HasSynced,
    +               foosLister:        fooInformer.Lister(),
    +               foosSynced:        fooInformer.Informer().HasSynced,
    +               workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
            }
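
    These diffs assume the following additional imports in pkg/controller/foo.go (the same aliases appear as context in the section 5.7 diff):

    import (
    	appsinformers "k8s.io/client-go/informers/apps/v1"
    	"k8s.io/client-go/kubernetes"
    	appslisters "k8s.io/client-go/listers/apps/v1"
    )

    Optionally, Run could also wait for the new cache with cache.WaitForCacheSync(ch, c.foosSynced, c.deploymentsSynced); this tutorial keeps waiting only for foosSynced.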
  3. Update main.go to pass the added arguments to NewController.

    import (
    ...
    +       kubeinformers "k8s.io/client-go/informers"
    +       "k8s.io/client-go/kubernetes"
    ...
    )
    func main() {
        ...
    +       kubeClient, err := kubernetes.NewForConfig(config)
    +       if err != nil {
    +               log.Printf("getting kubernetes client set %s\n", err.Error())
    +       }
    +
            exampleClient, err := clientset.NewForConfig(config)
            if err != nil {
                    log.Printf("getting client set %s\n", err.Error())
            }
    
    -       exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, 20*time.Minute)
    +       kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    +       exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
            ch := make(chan struct{})
    -       controller := controller.NewController(exampleClient, exampleInformerFactory.Example().V1alpha1().Foos())
    +       controller := controller.NewController(
    +               kubeClient,
    +               exampleClient,
    +               kubeInformerFactory.Apps().V1().Deployments(),
    +               exampleInformerFactory.Example().V1alpha1().Foos(),
    +       )
    +       kubeInformerFactory.Start(ch)
        ...
    }
  4. Create syncHandler and newDeployment.

    func (c *Controller) syncHandler(key string) error {
    	ns, name, err := cache.SplitMetaNamespaceKey(key)
    	if err != nil {
    		log.Printf("failed to split key into namespace and name %s\n", err.Error())
    		return err
    	}
    
    	foo, err := c.foosLister.Foos(ns).Get(name)
    	if err != nil {
    		log.Printf("failed to get foo resource from lister %s\n", err.Error())
    		if errors.IsNotFound(err) {
    			return nil
    		}
    		return err
    	}
    
    	deploymentName := foo.Spec.DeploymentName
    	if deploymentName == "" {
    		log.Printf("deploymentName must be specified %s\n", key)
    		return nil
    	}
    	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
    	if errors.IsNotFound(err) {
    		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
    	}
    
    	if err != nil {
    		return err
    	}
    
    	log.Printf("deployment %+v", deployment)
    
    	return nil
    }
    
    func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
    	labels := map[string]string{
    		"app":        "nginx",
    		"controller": foo.Name,
    	}
    	return &appsv1.Deployment{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:            foo.Spec.DeploymentName,
    			Namespace:       foo.Namespace,
    			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo"))},
    		},
    		Spec: appsv1.DeploymentSpec{
    			Replicas: foo.Spec.Replicas,
    			Selector: &metav1.LabelSelector{
    				MatchLabels: labels,
    			},
    			Template: corev1.PodTemplateSpec{
    				ObjectMeta: metav1.ObjectMeta{
    					Labels: labels,
    				},
    				Spec: corev1.PodSpec{
    					Containers: []corev1.Container{
    						{
    							Name:  "nginx",
    							Image: "nginx:latest",
    						},
    					},
    				},
    			},
    		},
    	}
    }
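
    syncHandler and newDeployment need a few more imports in pkg/controller/foo.go; a sketch of the additions (adjust the module path to your own):

    import (
    	"context"

    	appsv1 "k8s.io/api/apps/v1"
    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/apimachinery/pkg/api/errors"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    	samplev1alpha1 "github.com/nakamasato/sample-controller/pkg/apis/example.com/v1alpha1"
    )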
  5. Update processNextItem to call syncHandler for main logic.

    @@ -77,20 +99,12 @@ func (c *Controller) processNextItem() bool {
                            return nil
                    }
    
    -               ns, name, err := cache.SplitMetaNamespaceKey(key)
    -               if err != nil {
    -                       log.Printf("failed to split key into namespace and name %s\n", err.Error())
    -                       return err
    +               if err := c.syncHandler(key); err != nil {
    +                       // Put the item back on the workqueue to handle any transient errors.
    +                       c.workqueue.AddRateLimited(key)
    +                       return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
                    }
    
    -               // temporary main logic
    -               foo, err := c.foosLister.Foos(ns).Get(name)
    -               if err != nil {
    -                       log.Printf("failed to get foo resource from lister %s\n", err.Error())
    -                       return err
    -               }
    -               log.Printf("Got foo %+v\n", foo.Spec)
    -
                    // Forget the queue item as it's successfully processed and
                    // the item will not be requeued.
                    c.workqueue.Forget(obj)
  6. Delete the handleDelete function: deletion is handled via ownerReferences (cascading deletion, detailed in the next step), so the controller doesn't need its own delete handler.

            fooInformer.Informer().AddEventHandler(
                    cache.ResourceEventHandlerFuncs{
                            AddFunc:    controller.handleAdd,
    -                       DeleteFunc: controller.handleDelete,
                    },
            )
    -func (c *Controller) handleDelete(obj interface{}) {
    -       log.Println("handleDelete was called")
    -       c.enqueueFoo(obj)
    -}
  7. Test sample-controller.

    1. Build and run the controller.

      go build
      ./sample-controller
      
    2. Create Foo resource.

      kubectl apply -f config/sample/foo.yaml
      

      Check Deployment:

      kubectl get deploy
      NAME         READY   UP-TO-DATE   AVAILABLE   AGE
      foo-sample   0/1     1            0           3s
      

      Check sample-controller's logs:

      2021/12/20 19:58:30 handleAdd was called
      2021/12/20 19:58:30 deployment foo-sample exists
      
    3. Delete Foo resource.

      kubectl delete -f config/sample/foo.yaml
      

      Check Deployment:

      kubectl get deploy
      No resources found in default namespace.
      

      Check sample-controller's logs:

      2021/12/20 19:59:14 handleDelete was called
      2021/12/20 19:59:14 failed to get foo resource from lister foo.example.com "foo-sample" not found
      

      Deployment is deleted when the corresponding Foo is deleted thanks to OwnerReference's cascading deletion feature:

      Kubernetes checks for and deletes objects that no longer have owner references, like the pods left behind when you delete a ReplicaSet. When you delete an object, you can control whether Kubernetes deletes the object's dependents automatically, in a process called cascading deletion.

5.4. Implement reconciliation logic - Check and update Deployment if necessary

What needs to be done:

  • In syncHandler
    • Check if the found Deployment is managed by the sample-controller.
    • Check if the found Deployment's replicas equals the replicas specified in the Foo resource.
  • In NewController
    • Set UpdateFunc as an event handler on the informer so that syncHandler is called when a Foo resource is updated.

Steps:

  1. Update syncHandler:
    1. Check if the Deployment is managed by the controller.

          // If the Deployment is not controlled by this Foo resource, we should log
          // a warning to the event recorder and return error msg.
          if !metav1.IsControlledBy(deployment, foo) {
              msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
              log.Println(msg)
              return fmt.Errorf("%s", msg)
          }
    2. Check the replica and update Deployment object if replicas in Deployment and Foo differ.

          // If this number of the replicas on the Foo resource is specified, and the
          // number does not equal the current desired replicas on the Deployment, we
          // should update the Deployment resource.
          if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
              log.Printf("Foo %s replicas: %d, deployment replicas: %d\n", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
              deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
          }
          // If an error occurs during Update, we'll requeue the item so we can
          // attempt processing again later. This could have been caused by a
          // temporary network failure, or any other transient reason.
          if err != nil {
              return err
          }
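
      These snippets use a MessageResourceExists constant; if you haven't defined it yet, a minimal sketch matching the message shown as existing context in section 5.7:

          // MessageResourceExists is the message used for Events when a resource
          // fails to sync due to a Deployment already existing
          const MessageResourceExists = "Resource %q already exists and is not managed by Foo"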
  2. Update event handlers in NewController:
            fooInformer.Informer().AddEventHandler(
                    cache.ResourceEventHandlerFuncs{
    -                       AddFunc: controller.handleAdd,
    +                       AddFunc: controller.enqueueFoo,
    +                       UpdateFunc: func(old, new interface{}) {
    +                               controller.enqueueFoo(new)
    +                       },
                    },
            )
  3. Remove unused handleAdd function.
    -func (c *Controller) handleAdd(obj interface{}) {
    -       log.Println("handleAdd was called")
    -       c.enqueueFoo(obj)
    -}
  4. Test if sample-controller updates replicas.
    1. Apply Foo resource.

      kubectl apply -f config/sample/foo.yaml
      
      kubectl get deploy
      NAME         READY   UP-TO-DATE   AVAILABLE   AGE
      foo-sample   1/1     1            1           3h41m
      
    2. Increase replica to 2.

      kubectl patch foo foo-sample -p '{"spec":{"replicas": 2}}' --type=merge
      

      logs:

      2021/12/21 10:08:19 Foo foo-sample replicas: 2, deployment replicas: 1
      

      Replicas of Deployment increased.

      kubectl get deploy
      NAME         READY   UP-TO-DATE   AVAILABLE   AGE
      foo-sample   2/2     2            2           3h42m
      
    3. Delete Foo resource.

      kubectl delete -f config/sample/foo.yaml
      
  5. Test that sample-controller doesn't touch a Deployment that is not managed by the controller.
    1. Apply Deployment with name foo-sample.

      kubectl create deployment foo-sample --image=nginx
      
    2. Apply Foo resource with name foo-sample.

      kubectl apply -f config/sample/foo.yaml
      
    3. Log:

      2021/12/21 10:14:50 deployment foo-sample found
      2021/12/21 10:14:50 Resource "foo-sample" already exists and is not managed by Foo
      
    4. Clean up.

      kubectl delete -f config/sample/foo.yaml
      kubectl delete deploy foo-sample
      

5.5. Implement reconciliation logic - Update Foo status

  1. Create the updateFooStatus function and call it at the end of syncHandler.

    func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
        // NEVER modify objects from the store. It's a read-only, local cache.
        // You can use DeepCopy() to make a deep copy of original object and modify this copy
        // Or create a copy manually for better performance
        fooCopy := foo.DeepCopy()
        fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
        // If the CustomResourceSubresources feature gate is not enabled,
        // we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
        // UpdateStatus will not allow changes to the Spec of the resource,
        // which is ideal for ensuring nothing other than resource status has been updated.
        _, err := c.sampleclientset.ExampleV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{})
        return err
    }
    func (c *Controller) syncHandler(key string) error {
        ...
    
        // Finally, we update the status block of the Foo resource to reflect the
        // current state of the world
        err = c.updateFooStatus(foo, deployment)
        if err != nil {
            log.Printf("failed to update Foo status for %s", foo.Name)
            return err
        }
    
        return nil
    }
  2. Add subresources to CustomResourceDefinition.

          subresources:
            status: {}
            scale:
              specReplicasPath: .spec.replicas
              statusReplicasPath: .status.replicas
              labelSelectorPath: .status.labelSelector

    For more details, see subresources

  3. Test status

    1. Apply Foo
      kubectl apply -f config/sample/foo.yaml
      
    2. Check the status (not updated immediately; this will be fixed in the next section).
      kubectl get foo foo-sample -o jsonpath='{.status}'
      {"availableReplicas":0}%
      
      Currently, the informer only watches the Foo resource, so it cannot capture updates to Deployment.status.availableReplicas.
    3. Check status after a while
      kubectl get foo foo-sample -o jsonpath='{.status}'
      {"availableReplicas":1}%
      
  4. Test scale

    1. Scale.

      kubectl scale --replicas=3 foo foo-sample
      
      kubectl get deploy
      NAME         READY   UP-TO-DATE   AVAILABLE   AGE
      foo-sample   3/3     3            3           95s
      
      kubectl scale --replicas=1 foo foo-sample
      
      kubectl get deploy
      NAME         READY   UP-TO-DATE   AVAILABLE   AGE
      foo-sample   1/1     1            1           9m10s
      

5.6. Implement reconciliation logic - Capture the update of Deployment

  1. Add handleObject function.

    // handleObject will take any resource implementing metav1.Object and attempt
    // to find the Foo resource that 'owns' it. It does this by looking at the
    // objects metadata.ownerReferences field for an appropriate OwnerReference.
    // It then enqueues that Foo resource to be processed. If the object does not
    // have an appropriate OwnerReference, it will simply be skipped.
    func (c *Controller) handleObject(obj interface{}) {
        var object metav1.Object
        var ok bool
        if object, ok = obj.(metav1.Object); !ok {
            tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
            if !ok {
                return
            }
            object, ok = tombstone.Obj.(metav1.Object)
            if !ok {
                return
            }
            log.Printf("Recovered deleted object '%s' from tombstone", object.GetName())
        }
        log.Printf("Processing object: %s", object.GetName())
        if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
            // If this object is not owned by a Foo, we should not do anything more
            // with it.
            if ownerRef.Kind != "Foo" {
                return
            }
    
            foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
            if err != nil {
                log.Printf("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
                return
            }
    
            c.enqueueFoo(foo)
            return
        }
    }

    When a Deployment managed by a Foo is added, updated, or deleted, handleObject looks up the corresponding Foo and puts its key (namespace/name) onto the workqueue.

  2. Add event handlers to deploymentInformer in NewController.

        // Set up an event handler for when Deployment resources change. This
        // handler will lookup the owner of the given Deployment, and if it is
        // owned by a Foo resource then the handler will enqueue that Foo resource for
        // processing. This way, we don't need to implement custom logic for
        // handling Deployment resources. More info on this pattern:
        // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
        deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: controller.handleObject,
            UpdateFunc: func(old, new interface{}) {
                newDepl := new.(*appsv1.Deployment)
                oldDepl := old.(*appsv1.Deployment)
                if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                    // Periodic resync will send update events for all known Deployments.
                    // Two different versions of the same Deployment will always have different RVs.
                    return
                }
                controller.handleObject(new)
            },
            DeleteFunc: controller.handleObject,
        })
  3. Test the Foo's status after Deployment is updated.

    1. Create Foo resource.
      kubectl apply -f config/sample/foo.yaml
      
    2. Check Foo's status (now updated immediately).
      kubectl get foo foo-sample -o jsonpath='{.status}'
      {"availableReplicas":1}
      

5.7. Implement reconciliation logic - Create events for Foo resource

  1. Add necessary packages.
    @@ -13,20 +13,34 @@ import (
            "k8s.io/apimachinery/pkg/util/wait"
            appsinformers "k8s.io/client-go/informers/apps/v1"
            "k8s.io/client-go/kubernetes"
    +       typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
            appslisters "k8s.io/client-go/listers/apps/v1"
            "k8s.io/client-go/tools/cache"
    +       "k8s.io/client-go/tools/record"
            "k8s.io/client-go/util/workqueue"
    
            samplev1alpha1 "github.com/nakamasato/sample-controller/pkg/apis/example.com/v
    1alpha1"
            clientset "github.com/nakamasato/sample-controller/pkg/client/clientset/versio
    ned"
    +       "github.com/nakamasato/sample-controller/pkg/client/clientset/versioned/scheme
    "
            informers "github.com/nakamasato/sample-controller/pkg/client/informers/extern
    alversions/example.com/v1alpha1"
            listers "github.com/nakamasato/sample-controller/pkg/client/listers/example.com/v1alpha1"
  2. Add eventRecorder to Controller.
     type Controller struct {
    @@ -43,6 +57,10 @@ type Controller struct {
    
            // queue
            workqueue workqueue.RateLimitingInterface
    +
    +       // recorder is an event recorder for recording Event resources to the
    +       // Kubernetes API.
    +       recorder record.EventRecorder
     }
  3. Initialize eventBroadcaster.
     func NewController(
    @@ -50,6 +68,11 @@ func NewController(
            sampleclientset clientset.Interface,
            deploymentInformer appsinformers.DeploymentInformer,
            fooInformer informers.FooInformer) *Controller {
    +
    +       eventBroadcaster := record.NewBroadcaster()
    +       eventBroadcaster.StartStructuredLogging(0)
    +       eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    +       recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
            controller := &Controller{
                    kubeclientset:     kubeclientset,
                    sampleclientset:   sampleclientset,
    @@ -58,6 +81,7 @@ func NewController(
                    foosLister:        fooInformer.Lister(),
                    foosSynced:        fooInformer.Informer().HasSynced,
                workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
    +               recorder:          recorder,
            }
  4. Define constants.
     const (
    +       // SuccessSynced is used as part of the Event 'reason' when a Foo is synced
    +       SuccessSynced = "Synced"
    +       // ErrResourceExists is used as part of the Event 'reason' when a Foo fails
    +       // to sync due to a Deployment of the same name already existing.
    +       ErrResourceExists = "ErrResourceExists"
    +
            // MessageResourceExists is the message used for Events when a resource
            // fails to sync due to a Deployment already existing
            MessageResourceExists = "Resource %q already exists and is not managed by Foo"
    +       // MessageResourceSynced is the message used for an Event fired when a Foo
    +       // is synced successfully
    +       MessageResourceSynced = "Foo synced successfully"
    +
    +       controllerAgentName = "sample-controller"
     )
  5. Record events.
    @@ -199,6 +223,7 @@ func (c *Controller) syncHandler(key string) error {
            // a warning to the event recorder and return error msg.
            if !metav1.IsControlledBy(deployment, foo) {
                    msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
    +               c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
                    log.Println(msg)
                    return fmt.Errorf("%s", msg)
            }
    @@ -228,6 +253,7 @@ func (c *Controller) syncHandler(key string) error {
                    return err
            }
    
    +       c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
            return nil
     }
  6. Test event.
    1. Apply Foo.
    2. Check event.
      kubectl get event --field-selector involvedObject.kind=Foo
      LAST SEEN   TYPE     REASON   OBJECT           MESSAGE
      22s         Normal   Synced   foo/foo-sample   Foo synced successfully
      
