Create a Runtime SDK extension for Cluster API

Just as an admission controller lets you hook into requests for workload cluster objects (creation, update, deletion) and validate or mutate them, a Runtime Extension lets you hook into various cluster lifecycle events and make the necessary changes.

NOTE – The feature is currently experimental; to enable it you have to set the environment variable EXP_RUNTIME_SDK=true.

In general, the extension works as a webhook and can be written in any language of your preference, but to leverage the upstream CAPI packages we are going to use Golang here.

Here we are going to create a Runtime SDK extension that hooks into both AfterControlPlaneInitialized & BeforeClusterDelete and operates on ConfigMaps. Let’s create a project named runtimesdk and create a main.go file where we are doing the following –

  • Initializing the necessary command line flags.
  • Creating a Golang profiler server.
  • Getting the client for interacting with the Kubernetes API server.
  • Getting the handlers that we are going to implement next.
  • Initializing the webhook server.
  • Registering the BeforeClusterDelete and AfterControlPlaneInitialized hooks in the webhook server.
  • Running the webhook server.
package main

import (
	"flag"
	"net/http"
	"os"

	handler "github.com/aniruddha2000/runtime-sdk/handlers"
	"github.com/spf13/pflag"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/component-base/logs"
	logsv1 "k8s.io/component-base/logs/api/v1"
	"k8s.io/klog/v2"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	runtimecatalog "sigs.k8s.io/cluster-api/exp/runtime/catalog"
	runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
	"sigs.k8s.io/cluster-api/exp/runtime/server"
)

var (
	// catalog contains all information about RuntimeHooks.
	catalog = runtimecatalog.New()

	// Flags.
	profilerAddress string
	webhookPort     int
	webhookCertDir  string
	logOptions      = logs.NewOptions()
)

func init() {
	// Adds to the catalog all the RuntimeHooks defined in cluster API.
	_ = runtimehooksv1.AddToCatalog(catalog)
}

// InitFlags initializes the flags.
func InitFlags(fs *pflag.FlagSet) {
	// Initialize logs flags using Kubernetes component-base machinery.
	logsv1.AddFlags(logOptions, fs)

	// Add test-extension specific flags
	fs.StringVar(&profilerAddress, "profiler-address", "",
		"Bind address to expose the pprof profiler (e.g. localhost:6060)")

	fs.IntVar(&webhookPort, "webhook-port", 9443,
		"Webhook Server port")

	fs.StringVar(&webhookCertDir, "webhook-cert-dir", "/tmp/k8s-webhook-server/serving-certs/",
		"Webhook cert dir, only used when webhook-port is specified.")
}

func main() {
	// Creates a logger to be used during the main func.
	setupLog := ctrl.Log.WithName("main")

	// Initialize and parse command line flags.
	InitFlags(pflag.CommandLine)
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
	pflag.Parse()

	// Validates logs flags using Kubernetes component-base machinery and applies them
	if err := logsv1.ValidateAndApply(logOptions, nil); err != nil {
		setupLog.Error(err, "unable to start extension")
		os.Exit(1)
	}

	// Add the klog logger in the context.
	ctrl.SetLogger(klog.Background())

	// Initialize the golang profiler server, if required.
	if profilerAddress != "" {
		klog.Infof("Profiler listening for requests at %s", profilerAddress)
		go func() {
			klog.Info(http.ListenAndServe(profilerAddress, nil))
		}()
	}

	// Create a http server for serving runtime extensions
	webhookServer, err := server.New(server.Options{
		Catalog: catalog,
		Port:    webhookPort,
		CertDir: webhookCertDir,
	})
	if err != nil {
		setupLog.Error(err, "error creating webhook server")
		os.Exit(1)
	}

	// Lifecycle Hooks
	restConfig, err := ctrl.GetConfig()
	if err != nil {
		setupLog.Error(err, "error getting config for the cluster")
		os.Exit(1)
	}

	client, err := client.New(restConfig, client.Options{})
	if err != nil {
		setupLog.Error(err, "error creating client to the cluster")
		os.Exit(1)
	}

	lifecycleExtensionHandlers := handler.NewExtensionHandlers(client)

	// Register extension handlers.
	if err := webhookServer.AddExtensionHandler(server.ExtensionHandler{
		Hook:        runtimehooksv1.BeforeClusterDelete,
		Name:        "before-cluster-delete",
		HandlerFunc: lifecycleExtensionHandlers.DoBeforeClusterDelete,
	}); err != nil {
		setupLog.Error(err, "error adding handler")
		os.Exit(1)
	}

	if err := webhookServer.AddExtensionHandler(server.ExtensionHandler{
		Hook:        runtimehooksv1.AfterControlPlaneInitialized,
		Name:        "before-cluster-create",
		HandlerFunc: lifecycleExtensionHandlers.DoAfterControlPlaneInitialized,
	}); err != nil {
		setupLog.Error(err, "error adding handler")
		os.Exit(1)
	}

	// Setup a context listening for SIGINT.
	ctx := ctrl.SetupSignalHandler()

	// Start the https server.
	setupLog.Info("Starting Runtime Extension server")
	if err := webhookServer.Start(ctx); err != nil {
		setupLog.Error(err, "error running webhook server")
		os.Exit(1)
	}
}

Now it’s time to create the handlers for each event. Let’s create a file handlers/hooks.go, where we are doing the following –

  • DoAfterControlPlaneInitialized –
    • Checks whether a ConfigMap with the expected name exists in the cluster’s namespace.
    • If not, it creates one; otherwise it does nothing and the request passes.
  • DoBeforeClusterDelete –
    • Checks whether the ConfigMap exists for that name & namespace.
    • If yes, it deletes it before the workload cluster gets deleted; otherwise the request passes.
package handler

import (
	"context"
	"fmt"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type ExtensionHandler struct {
	client client.Client
}

func NewExtensionHandlers(client client.Client) *ExtensionHandler {
	return &ExtensionHandler{
		client: client,
	}
}

func (e *ExtensionHandler) DoBeforeClusterDelete(ctx context.Context, request *runtimehooksv1.BeforeClusterDeleteRequest, response *runtimehooksv1.BeforeClusterDeleteResponse) {
	log := ctrl.LoggerFrom(ctx)
	log.Info("DoBeforeClusterDelete is called")
	log.Info("Namespace:", request.Cluster.GetNamespace(), "ClusterName: ", request.Cluster.GetName())

	// Your implementation
	configMapName := fmt.Sprintf("%s-test-extension-hookresponse", request.Cluster.GetName())
	ok, err := e.checkConfigMap(ctx, &request.Cluster, configMapName)
	if err != nil {
		response.Status = runtimehooksv1.ResponseStatusFailure
		response.Message = err.Error()
		return
	}
	if ok {
		if err := e.deleteConfigMap(ctx, &request.Cluster, configMapName); err != nil {
			response.Status = runtimehooksv1.ResponseStatusFailure
			response.Message = err.Error()
			return
		}
	}
}

func (e *ExtensionHandler) DoAfterControlPlaneInitialized(ctx context.Context, request *runtimehooksv1.AfterControlPlaneInitializedRequest, response *runtimehooksv1.AfterControlPlaneInitializedResponse) {
	log := ctrl.LoggerFrom(ctx)
	log.Info("DoAfterControlPlaneInitialized is called")
	log.Info("Namespace:", request.Cluster.GetNamespace(), "ClusterName: ", request.Cluster.GetName())

	// Your implementation
	configMapName := fmt.Sprintf("%s-test-extension-hookresponse", request.Cluster.GetName())
	ok, err := e.checkConfigMap(ctx, &request.Cluster, configMapName)
	if err != nil {
		response.Status = runtimehooksv1.ResponseStatusFailure
		response.Message = err.Error()
		return
	}
	if !ok {
		if err := e.createConfigMap(ctx, &request.Cluster, configMapName); err != nil {
			response.Status = runtimehooksv1.ResponseStatusFailure
			response.Message = err.Error()
			return
		}
	}
}

func (e *ExtensionHandler) checkConfigMap(ctx context.Context, cluster *clusterv1.Cluster, configMapName string) (bool, error) {
	log := ctrl.LoggerFrom(ctx)
	log.Info("Checking for ConfigMap", configMapName)

	configMap := &corev1.ConfigMap{}
	nsName := client.ObjectKey{Namespace: cluster.GetNamespace(), Name: configMapName}
	if err := e.client.Get(ctx, nsName, configMap); err != nil {
		if apierrors.IsNotFound(err) {
			log.Info("ConfigMap not found")
			return false, nil
		}
		log.Error(err, "ConfigMap not found with an error")
		return false, errors.Wrapf(err, "failed to read the ConfigMap %s", klog.KRef(cluster.Namespace, configMapName))
	}
	log.Info("ConfigMap found")
	return true, nil
}

func (e *ExtensionHandler) createConfigMap(ctx context.Context, cluster *clusterv1.Cluster, configMapName string) error {
	log := ctrl.LoggerFrom(ctx)
	log.Info("Creating ConfigMap")

	configMap := e.getConfigMap(cluster, configMapName)
	if err := e.client.Create(ctx, configMap); err != nil {
		log.Error(err, "failed to create ConfigMap")
		return errors.Wrapf(err, "failed to create the ConfigMap %s", klog.KRef(cluster.Namespace, configMapName))
	}
	log.Info("configmap created successfully")
	return nil
}

func (e *ExtensionHandler) deleteConfigMap(ctx context.Context, cluster *clusterv1.Cluster, configMapName string) error {
	log := ctrl.LoggerFrom(ctx)
	log.Info("Deleting ConfigMap")

	if err := e.client.Delete(ctx, &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      configMapName,
			Namespace: cluster.GetNamespace(),
		},
	}); err != nil {
		log.Error(err, "failed to delete ConfigMap")
		return err
	}
	return nil
}

func (e *ExtensionHandler) getConfigMap(cluster *clusterv1.Cluster, configMapName string) *corev1.ConfigMap {
	return &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      configMapName,
			Namespace: cluster.GetNamespace(),
		},
		Data: map[string]string{
			"AfterControlPlaneInitialized-preloadedResponse": `{"Status": "Success"}`,
		},
	}
}

Implement the Kubernetes manifest

This is the most interesting part, and a few bits and pieces need to be taken care of, such as –

  • The Kubernetes ecosystem only supports webhooks served over TLS. For that, we are going to use cert-manager to automate issuing a self-signed certificate.
  • The extension must be registered through the ExtensionConfig CRD.
  • Don’t forget about RBAC: if you operate on any resources, make sure you define permissions for them.

NOTE – For this example, we are doing everything in runtimesdk namespace.

Let’s start with certificate.yaml

  • Creating a self-signed certificate using the Issuer
  • Defining the DNS Service name for the certificate
    • <service_name>.<namespace>.svc
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: runtime-sdk-selfsigned-issuer
  namespace: runtimesdk
spec:
  selfSigned: {}

---
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: serving-cert
  namespace: runtimesdk
spec:
  dnsNames:
    - test-runtime-sdk-svc.runtimesdk.svc
    - test-runtime-sdk-svc.runtimesdk.svc.cluster.local
    - localhost
  issuerRef:
    kind: Issuer
    name: runtime-sdk-selfsigned-issuer
  secretName: test-runtime-sdk-svc-cert

service.yaml

  • Defining the ClusterIP Service and its target deployment. The Service exposes port 443, the standard HTTPS port, and forwards to the webhook on port 9443.
apiVersion: v1
kind: Service
metadata:
  name: test-runtime-sdk-svc
  namespace: runtimesdk
spec:
  type: ClusterIP
  selector:
    app: test-runtime-sdk
  ports:
    - port: 443
      targetPort: 9443

deployment.yaml

  • Build your docker image and push it to the repository.
  • Mount the serving certificate Secret as a volume and point the container at it via the --webhook-cert-dir argument.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-runtime-sdk
  namespace: runtimesdk
spec:
  selector:
    matchLabels:
      app: test-runtime-sdk
  template:
    metadata:
      labels:
        app: test-runtime-sdk
    spec:
      serviceAccountName: test-runtime-sdk-sa
      containers:
        - name: test-runtime-sdk
          image: <image_name>:<image_tag>
          imagePullPolicy: Always
          args:
            - --webhook-cert-dir=/var/run/webhook/serving-cert/
          resources:
            limits:
              memory: "128Mi"
              cpu: "500m"
          ports:
            - containerPort: 9443
          volumeMounts:
            - mountPath: /var/run/webhook/serving-cert
              name: serving-cert
      volumes:
        - name: serving-cert
          secret:
            secretName: test-runtime-sdk-svc-cert

ServiceAccount, ClusterRole, ClusterRoleBinding –

  • Create your own service account.
  • Add get, list, create, and delete permissions.
  • Bind the role with the service account using role bindings.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: test-runtime-sdk-sa
  namespace: runtimesdk

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: test-runtime-sdk-role
rules:
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - get
      - list
      - create
      - delete

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: test-runtime-sdk-role-rolebinding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: test-runtime-sdk-role
subjects:
  - kind: ServiceAccount
    name: test-runtime-sdk-sa
    namespace: runtimesdk

Lastly, the most important piece, the ExtensionConfig CRD –

  • Inject the CA bundle from the serving certificate Secret through an annotation.
  • Specify where the Runtime Extension is deployed.
  • Specify which namespaces’ Clusters are allowed to use the Runtime Extension (via the namespaceSelector).
apiVersion: runtime.cluster.x-k8s.io/v1alpha1
kind: ExtensionConfig
metadata:
  annotations:
    runtime.cluster.x-k8s.io/inject-ca-from-secret: runtimesdk/test-runtime-sdk-svc-cert
  name: test-runtime-sdk-extensionconfig
spec:
  clientConfig:
    service:
      name: test-runtime-sdk-svc
      namespace: runtimesdk # Note: this assumes the test extension gets deployed in the runtimesdk namespace
      port: 443
  namespaceSelector:
    matchExpressions:
      - key: kubernetes.io/metadata.name
        operator: In
        values:
          - default # Note: this assumes the test extension is used by Cluster in the default namespace only

You can define the Dockerfile like this –

FROM golang:alpine3.17 as builder
WORKDIR /src
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build \
    --mount=type=cache,target=/go/pkg/mod \
    go build -o runtime-sdk

FROM alpine
WORKDIR /app
COPY --from=builder /src/runtime-sdk /app/runtime-sdk
ENTRYPOINT ["/app/runtime-sdk"]
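With the Dockerfile in place, building and pushing the image referenced in the Deployment above could look roughly like this (the image name and tag are placeholders, matching the Deployment manifest):

$ docker build -t <image_name>:<image_tag> .
$ docker push <image_name>:<image_tag>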

Let’s run the App in a Kind CAPD Cluster

  • Export the necessary environment variables and create a kind cluster –
$ cat > cluster.env << EOF
export CLUSTER_TOPOLOGY=true
export EXP_RUNTIME_SDK=true
export SERVICE_CIDR=["10.96.0.0/12"]
export POD_CIDR=["192.168.0.0/16"]
export SERVICE_DOMAIN="k8s.test"
EOF

$ source cluster.env

$ cat > kind-cluster-with-extramounts.yaml <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
networking:
  ipFamily: dual
name: extension-config-test
nodes:
- role: control-plane
  extraMounts:
    - hostPath: /var/run/docker.sock
      containerPath: /var/run/docker.sock
EOF

$ kind create cluster --config kind-cluster-with-extramounts.yaml
Creating cluster "extension-config-test" ...
 ✓ Ensuring node image (kindest/node:v1.27.1) đŸ–ŧ
 ✓ Preparing nodes đŸ“Ļ  
 ✓ Writing configuration 📜 
 ✓ Starting control-plane đŸ•šī¸ 
 ✓ Installing CNI 🔌 
 ✓ Installing StorageClass 💾 
Set kubectl context to "kind-extension-config-test"
You can now use your cluster with:

kubectl cluster-info --context kind-extension-config-test

Thanks for using kind! 😊
  • Create the runtimesdk namespace & initialize the management cluster –
$ kubectl create ns runtimesdk

$ clusterctl init --infrastructure docker
Fetching providers
Installing cert-manager Version="v1.11.1"
Waiting for cert-manager to be available...
Installing Provider="cluster-api" Version="v1.4.2" TargetNamespace="capi-system"
Installing Provider="bootstrap-kubeadm" Version="v1.4.2" TargetNamespace="capi-kubeadm-bootstrap-system"
Installing Provider="control-plane-kubeadm" Version="v1.4.2" TargetNamespace="capi-kubeadm-control-plane-system"
Installing Provider="infrastructure-docker" Version="v1.4.2" TargetNamespace="capd-system"

Your management cluster has been initialized successfully!

You can now create your first workload cluster by running the following:

  clusterctl generate cluster [name] --kubernetes-version [version] | kubectl apply -f -
  • Now apply all of the created manifests. There are two things you should check –
    • ExtensionConfig Deployment logs.
    • Status of the ExtensionConfig CRD.
$ k apply -f runtime-sdk/manifests/config/
extensionconfig.runtime.cluster.x-k8s.io/test-runtime-sdk-extensionconfig created
issuer.cert-manager.io/runtime-sdk-selfsigned-issuer created
certificate.cert-manager.io/serving-cert created
deployment.apps/test-runtime-sdk created
serviceaccount/test-runtime-sdk-sa created
clusterrole.rbac.authorization.k8s.io/test-runtime-sdk-role created
clusterrolebinding.rbac.authorization.k8s.io/test-runtime-sdk-role-rolebinding created
service/test-runtime-sdk-svc created
$ k get pods -n runtimesdk
NAME                                READY   STATUS    RESTARTS   AGE
test-runtime-sdk-5bc665d7b9-725hl   1/1     Running   0          12m

$ k logs -n runtimesdk test-runtime-sdk-5bc665d7b9-725hl --follow
I0524 07:30:59.714901       1 main.go:130] "main: Starting Runtime Extension server"
I0524 07:30:59.715180       1 server.go:149] "controller-runtime/webhook: Registering webhook" path="/hooks.runtime.cluster.x-k8s.io/v1alpha1/beforeclusterdelete/before-cluster-delete"
I0524 07:30:59.715261       1 server.go:149] "controller-runtime/webhook: Registering webhook" path="/hooks.runtime.cluster.x-k8s.io/v1alpha1/aftercontrolplaneinitialized/before-cluster-create"
I0524 07:30:59.715314       1 server.go:149] "controller-runtime/webhook: Registering webhook" path="/hooks.runtime.cluster.x-k8s.io/v1alpha1/discovery"
I0524 07:30:59.715340       1 server.go:217] "controller-runtime/webhook/webhooks: Starting webhook server"
I0524 07:30:59.716380       1 certwatcher.go:131] "controller-runtime/certwatcher: Updated current TLS certificate"
I0524 07:30:59.716757       1 certwatcher.go:85] "controller-runtime/certwatcher: Starting certificate watcher"
I0524 07:30:59.716918       1 server.go:271] "controller-runtime/webhook: Serving webhook server" host="" port=9443

The logs show that our app is running perfectly; now let’s look at the status,

$ k describe extensionconfig test-runtime-sdk-extensionconfig -n runtimesdk
Name:         test-runtime-sdk-extensionconfig
Namespace:    
Labels:       <none>
Annotations:  runtime.cluster.x-k8s.io/inject-ca-from-secret: runtimesdk/test-runtime-sdk-svc-cert
API Version:  runtime.cluster.x-k8s.io/v1alpha1
Kind:         ExtensionConfig
Metadata:
  Creation Timestamp:  2023-05-24T07:21:49Z
  Generation:          2
  Resource Version:    3939
  UID:                 62af95a7-d924-46f6-9c5a-4ba3f4407749
Spec:
  Client Config:
    Ca Bundle:  LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURIakNDQWdhZ0F3SUJBZ0lSQUx0b1VxQzlEdHBIVTl2TkJrU0xmV0l3RFFZSktvWklodmNOQVFFTEJRQXcKQURBZUZ3MHlNekExTWpRd056SXhORGxhRncweU16QTRNakl3TnpJeE5EbGFNQUF3Z2dFaU1BMEdDU3FHU0liMwpEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUJBUURBMUl0Mm1OdVdJMmpRUlY1cHRWTDZ3cGFHdWhObG9GWHV2b1poCkwzWHJRcktiWmRaRnJUbGlZSTI4TXlxVmhSNGh2U2MzVXp5TS8rUjdYVURCT01BNkFZeEtacXg0a3VPRk1ITXkKcUhDTTNuZTZUUCsxUS9CQkRWelMvdk9tRzdnNlF1V3VyMmFtbW4zeTI4dUpWZ0hVaUZQaHZLVHE4U0J4LzY0NQo3bEluQWVpSWVrc3JqTHFJRlFka3NnSlAvbUxSTjI4RTNPL0tVTEp5RWxsakxIelZZcmVXck5rUEh6OGVmZmFECmtmSnMxTTN0NFh3c1Jyd09QQXliUmtGcTNJbENpNEoyL3EyZHZTRlRXdy9EelRuSkE1OEt6N003MlN6aXlJRnkKM1U3ajRISkVqbG9paGU2dlJtUUxEZm5wV0xEdXhvbVJpdURMWU14dHU5VkxweEdIQWdNQkFBR2pnWkl3Z1k4dwpEZ1lEVlIwUEFRSC9CQVFEQWdXZ01Bd0dBMVVkRXdFQi93UUNNQUF3YndZRFZSMFJBUUgvQkdVd1k0SWpkR1Z6CmRDMXlkVzUwYVcxbExYTmtheTF6ZG1NdWNuVnVkR2x0WlhOa2F5NXpkbU9DTVhSbGMzUXRjblZ1ZEdsdFpTMXoKWkdzdGMzWmpMbkoxYm5ScGJXVnpaR3N1YzNaakxtTnNkWE4wWlhJdWJHOWpZV3lDQ1d4dlkyRnNhRzl6ZERBTgpCZ2txaGtpRzl3MEJBUXNGQUFPQ0FRRUFFSUsvOFJqeFBiYy80T2I4MWY4Z2h2dVN3Z0Y0V0dkK3dONVZpSndICngzVm5GWGJ6d1YvMHZreEJ5SDhFR2xLcnRjcTNVMDFvZ0taQVRadW9DYWxLVjZvUHYvNklNbXR4WHMzMk5EeWoKamwvU3FHOXJlMFhRMXBYa2xIVHpIMk9ha0ozWjZ1TUMxSzgrWS9YRUJMYzZibjhYSXpad3N5VDJkZ0RJeTkrNQpkMjZqek9EejZ4Y2h2TzBSNm1ZK2psazJpMzdwSHRiZWxrOExFeE9ObmFNWlZvWWIrYmtRWXZ5MEZQdEhsZ0NnClQycVBWQ3FISmV2cWxIakk3UFQ4YmVlNFVKcHc1Rld4L0FjbU9qd3BjTkZWbkMwaFFtZmNTazNvb2Z4bTViem0KUTd1d1ZaSzBmWDFaVjJvWGNrZEtPMUluNnZpVkpWSzRESzV3MXh3MnBMWHhGUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K
    Service:
      Name:       test-runtime-sdk-svc
      Namespace:  runtimesdk
      Port:       443
  Namespace Selector:
    Match Expressions:
      Key:       kubernetes.io/metadata.name
      Operator:  In
      Values:
        default
Status:
  Conditions:
    Last Transition Time:  2023-05-24T07:32:44Z
    Status:                True
    Type:                  Discovered
  Handlers:
    Failure Policy:  Fail
    Name:            before-cluster-delete.test-runtime-sdk-extensionconfig
    Request Hook:
      API Version:    hooks.runtime.cluster.x-k8s.io/v1alpha1
      Hook:           BeforeClusterDelete
    Timeout Seconds:  10
    Failure Policy:   Fail
    Name:             before-cluster-create.test-runtime-sdk-extensionconfig
    Request Hook:
      API Version:    hooks.runtime.cluster.x-k8s.io/v1alpha1
      Hook:           AfterControlPlaneInitialized
    Timeout Seconds:  10
Events:               <none>

If you look closely, it has fetched the CA bundle correctly via the annotation, and both of the hooks show up in the status.

  • Create a Workload Cluster Now –
$ clusterctl generate cluster extension-config-test --flavor development \
--kubernetes-version v1.27.1 \
--control-plane-machine-count=1 \
--worker-machine-count=1 \
> manifests/capi/capi-quickstart.yaml

$ k apply -f manifests/capi/capi-quickstart.yaml
clusterclass.cluster.x-k8s.io/quick-start created
dockerclustertemplate.infrastructure.cluster.x-k8s.io/quick-start-cluster created
kubeadmcontrolplanetemplate.controlplane.cluster.x-k8s.io/quick-start-control-plane created
dockermachinetemplate.infrastructure.cluster.x-k8s.io/quick-start-control-plane created
dockermachinetemplate.infrastructure.cluster.x-k8s.io/quick-start-default-worker-machinetemplate created
kubeadmconfigtemplate.bootstrap.cluster.x-k8s.io/quick-start-default-worker-bootstraptemplate created
cluster.cluster.x-k8s.io/extension-config-test created

Let’s check the logs and the ConfigMaps to see whether anything was created,

I0524 07:40:49.405854       1 hooks.go:52] "DoAfterControlPlaneInitialized is called"
I0524 07:40:49.406022       1 hooks.go:53] "Namespace:" default="ClusterName: " extension-config-test="(MISSING)"
I0524 07:40:49.406093       1 hooks.go:74] "Checking for ConfigMap" extension-config-test-test-extension-hookresponse="(MISSING)"
I0524 07:40:49.421562       1 hooks.go:80] "ConfigMap not found"
I0524 07:40:49.421596       1 hooks.go:92] "Creating ConfigMap"
I0524 07:40:49.437841       1 hooks.go:99] "configmap created successfully"
$ k get configmaps
NAME                                                DATA   AGE
extension-config-test-test-extension-hookresponse   1      76s
kube-root-ca.crt                                    1      26m

Yep, our ConfigMap is there now. Let’s test the delete path,

$ k delete -f manifests/capi/capi-quickstart.yaml

$ k logs -n runtimesdk test-runtime-sdk-5bc665d7b9-725hl --follow
I0524 07:44:08.266319       1 hooks.go:30] "DoBeforeClusterDelete is called"
I0524 07:44:08.266347       1 hooks.go:31] "Namespace:" default="ClusterName: " extension-config-test="(MISSING)"
I0524 07:44:08.268351       1 hooks.go:74] "Checking for ConfigMap" extension-config-test-test-extension-hookresponse="(MISSING)"
I0524 07:44:08.288940       1 hooks.go:86] "ConfigMap found"
I0524 07:44:08.289163       1 hooks.go:105] "Deleting ConfigMap"
$ k get configmaps
NAME               DATA   AGE
kube-root-ca.crt   1      29m

So now the ConfigMap is gone as well. Everything is working fine then 😉

Thanks for reading 🙂

Feedback is welcome!

Understanding go.mod and go.sum in Golang projects

In this post, I’ll primarily try to explain how dependency management works in Golang and how we use various commands and the go.mod file to handle dependencies.

What is the go.mod file?

go.mod is the root module definition for a Golang project, and all the modules required by the project are listed in this file. That means every external module we import in our code will show up here.

Running go mod init github.com/foo/bar creates a go.mod file, which (once dependencies are added) will include content like the following –

module github.com/foo/bar

go 1.17

require (
  github.com/gin-gonic/gin v1.8.1
  github.com/google/uuid v1.3.0
)

require (
  github.com/onsi/ginkgo v1.16.5 // indirect
  github.com/onsi/gomega v1.24.1 // indirect
)

Types of dependencies

  • Direct Dependency – a module that our project uses directly in its code.
  • Indirect Dependency – a module that our project does not use directly but that one of its dependencies uses (marked // indirect in the go.mod file above).

What is go mod tidy?

go mod tidy ensures that the go.mod file lists all of the project’s dependencies. It also removes any dependency listed in go.mod that the project no longer uses.

What is go mod vendor?

With go mod vendor, a vendor directory is created and all the dependencies are copied into it. On subsequent builds, instead of downloading modules from the internet, Go takes those dependencies from the vendor directory.
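A typical sequence of these commands looks like this (shown as a rough sketch) –

$ go mod tidy    # sync go.mod/go.sum with what the code actually imports
$ go mod vendor  # copy all dependencies into ./vendor
$ go build       # Go 1.14+ uses ./vendor automatically when it exists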

What is go.sum? Is it some kind of locking file?

The go.mod file contains all the information needed to build the project. go.sum, on the other hand, contains cryptographic checksums that verify the content of each module. It holds checksums for both direct and indirect dependencies, which is why go.sum is usually larger than go.mod.

If someone clones your repository, they will get an error if their downloaded module copies don’t match the entries in go.sum.

So go.sum is not a lock file; it’s a verification mechanism that works alongside go.mod.

In addition, go.sum is used to verify the locally cached module copies under $GOPATH/pkg/mod that are reused for later builds.
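For illustration, each module typically gets two entries in go.sum, one for the module contents and one for its go.mod file; the hashes below are placeholders, not real values –

github.com/gin-gonic/gin v1.8.1 h1:<hash of the module contents>
github.com/gin-gonic/gin v1.8.1/go.mod h1:<hash of the module's go.mod file>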

Test your MongoDB code with mtest in Golang

What is Unit Testing?

Unit testing is a method we follow to test the smallest piece of code that can be logically isolated in a project, often a function.

In Golang we have the standard library package testing. We use it to exercise a function across various scenarios and check that it behaves the way we want.

Why do we need Unit Testing?

That’s an interesting question: we use it for the convenience and maintainability of our project in the long term. Once the project grows, it becomes very hard for a developer to verify that every corner of the project still works whenever some part of the code changes. With tests in place, a developer who changes some code can simply run the test suite and see that everything still works.

What is Mocking?

Suppose you have written a web API or something that talks to a database; to test it, you would need a similar environment, like a running server or database. But it’s expensive and inconvenient to stand up a real server or database just to test the code. So we use mocking: a fake environment for testing that provides features similar to the real one.

Testing MongoDB

Now that you know what testing and mocking are, suppose you have written some awesome MongoDB code and want to test it. The MongoDB Go driver provides mtest for this. (P.S. – Its documentation states that “mtest is unstable and there is no backward compatibility guarantee. It is experimental and subject to change.”) So using it is a little tricky.

Creating Mock Environment

To create a mock deployment in mtest we do the following –

mt := mtest.New(t, mtest.NewOptions().DatabaseName("test-db").ClientType(mtest.Mock))

It will create and return a mock MongoDB deployment upon which we can run our tests.

Sample Go code to test

Suppose you have the following Go struct data

type Data struct {
	ID          primitive.ObjectID `json:"id" bson:"_id"`
	DataID      string             `json:"data_id" bson:"data_id"`
	Name        string             `json:"name" bson:"name"`
	PublishedAt time.Time          `json:"publishedAt" bson:"publishedAt"`
}

You have the following method for this data struct dealing with MongoDB.

func Create(ctx context.Context, data Data, col *mongo.Collection) (string, error) {
	_, err := col.InsertOne(ctx, data)
	if err != nil {
		return "", err
	}

	return "Data Created", nil
}
func Get(ctx context.Context, dataID string, col *mongo.Collection) (Data, error) {
	res := col.FindOne(ctx, bson.M{
		"data_id": dataID,
	})

	if res.Err() != nil {
		return Data{}, res.Err()
	}

	var data Data
	err := res.Decode(&data)
	if err != nil {
		return Data{}, err
	}

	return data, nil
}

Writing tests for the above code

Create

By default, when you run mtest against a mock deployment it creates a collection mt.Coll and deletes it after the test is done. Below, we write a typical success case –

func TestCreateData(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	mt := mtest.New(t, mtest.NewOptions().DatabaseName("test-db").ClientType(mtest.Mock))
	defer mt.Close()

	testCases := []struct {
		name         string
		data         Data
		prepareMock  func(mt *mtest.T)
		want         string
		wantErr      bool
	}{
		{
			name:         "create data successfully",
			data: Data{
				ID:           primitive.NewObjectID(),
				DataID:       uuid.New().String(),
				Name:         "John Doe",
				PublishedAt:  time.Now(),
			},
			prepareMock: func(mt *mtest.T) {
				mt.AddMockResponses(mtest.CreateSuccessResponse())
			},
			want:    "Data Created",
			wantErr: false,
		},
	}

	for _, tt := range testCases {
		mt.Run(tt.name, func(mt *mtest.T) {
			tt.prepareMock(mt)

			got, err := Create(ctx, tt.data, mt.Coll)

			if tt.wantErr {
				assert.Errorf(t, err, "Want error but got: %v", err)
			} else {
				assert.NoErrorf(t, err, "Not expecting error but got: %v", err)
			}

			assert.Equalf(t, tt.want, got, "want: %v, but got: %v", tt.want, got)
		})
	}
}

What we are doing here –

  • Creating the MongoDB mock deployment (and closing it when we’re done via defer).
  • Defining a slice of test cases with the data needed for each scenario and iterating over it, so that in the long term we can add more scenarios without repeating the same code.
  • prepareMock – this is where we register the mock response on the collection. When the target method is called, the mock response is returned, and we check whether our function handles that scenario correctly.
  • Lastly, we call the Create method and assert that the return values match the expected ones.

Thankfully, mtest provides a ready-made mock response for a successful insert: mtest.CreateSuccessResponse().
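mtest can also simulate failures. For example, a failing insert could be covered by appending a case like the following to testCases (a sketch using mtest.CreateWriteErrorsResponse; the error code and message are illustrative):

		{
			name: "create data fails with a write error",
			data: Data{
				ID:          primitive.NewObjectID(),
				DataID:      uuid.New().String(),
				Name:        "John Doe",
				PublishedAt: time.Now(),
			},
			prepareMock: func(mt *mtest.T) {
				// Simulate the server returning a write error (11000 is the duplicate key code).
				mt.AddMockResponses(mtest.CreateWriteErrorsResponse(mtest.WriteError{
					Index:   0,
					Code:    11000,
					Message: "duplicate key error",
				}))
			},
			want:    "",
			wantErr: true,
		},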

Get

If you look at the Get code, there are three scenarios to cover –

  • Get data successfully.
  • If no filter matches, it returns an error.
  • If the data inside the DB is malformed, it returns a decode error.
func TestGetData(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	mt := mtest.New(t, mtest.NewOptions().DatabaseName("test-db").ClientType(mtest.Mock))
	defer mt.Close()

	id := primitive.NewObjectID()
	dataid := uuid.New().String()
	publishedAt := time.Now().UTC().Truncate(time.Millisecond)

	testCases := []struct {
		name         string
		dataID       string
		prepareMock  func(mt *mtest.T)
		want         Data
		wantErr      bool
	}{
		{
			name:         "get Data Successfully",
			prepareMock: func(mt *mtest.T) {
				first := mtest.CreateCursorResponse(1, "foo.bar", mtest.FirstBatch, bson.D{
					{Key: "_id", Value: id},
					{Key: "data_id", Value: dataid},
					{Key: "name", Value: "John Doe"},
					{Key: "publishedAt", Value: publishedAt},
				})
				killCursor := mtest.CreateCursorResponse(0, "foo.bar", mtest.NextBatch)

				mt.AddMockResponses(first, killCursor)
			},
			dataID: dataid,
			want: Data{
				ID:           id,
				DataID:       dataid,
				Name:         "John Doe",
				PublishedAt:  publishedAt,
			},
			wantErr: false,
		},
		{
			name:         "get decode error",
			prepareMock: func(mt *mtest.T) {
                // The name expect a `string` but in the DB there is `integer` stored
				// So, while decoding it will throw an error.
				first := mtest.CreateCursorResponse(1, "foo.bar", mtest.FirstBatch, bson.D{
					{Key: "_id", Value: id},
					{Key: "data_id", Value: dataid},
					{Key: "name", Value: 1234},
					{Key: "publishedAt", Value: publishedAt},
				})
				killCursor := mtest.CreateCursorResponse(0, "foo.bar", mtest.NextBatch)

				mt.AddMockResponses(first, killCursor)
			},
			dataID:     dataid,
			want:       Data{},
			wantErr:    true,
		},
		{
			name:         "wrong data ID",
			prepareMock: func(mt *mtest.T) {
				mt.AddMockResponses(bson.D{
					{Key: "ok", Value: 1},
					{Key: "acknowledged", Value: true},
					{Key: "n", Value: 0},
				})
			},
			dataID: uuid.NewString(),
			want:       Data{},
			wantErr:    true,
		},
	}

	for _, tt := range testCases {
		mt.Run(tt.name, func(mt *mtest.T) {
			tt.prepareMock(mt)

			got, err := Get(ctx, tt.dataID, mt.Coll)

			if tt.wantErr {
				assert.Errorf(t, err, "Want error but got: %v", err)
			} else {
				assert.NoErrorf(t, err, "Not expecting error but got: %v", err)
			}

			assert.Equalf(t, tt.want, got, "want: %v, but got: %v", tt.want, got)
		})
	}
}

What we are doing here –

  • As usual, initializing the mock MongoDB deployment.
  • Defining a slice of test cases with the necessary fields. We also define some Data fields up front so that they stay consistent throughout the test cases.
  • Scenario 1 (Successful Get of Data) – Here we define a mock response for the collection –
first := mtest.CreateCursorResponse(1, "foo.bar", mtest.FirstBatch, bson.D{
					{Key: "_id", Value: id},
					{Key: "data_id", Value: dataid},
					{Key: "name", Value: "John Doe"},
					{Key: "publishedAt", Value: publishedAt},
				})
				killCursor := mtest.CreateCursorResponse(0, "foo.bar", mtest.NextBatch)

				mt.AddMockResponses(first, killCursor)

Once we call the Get function, the collection returns these mock responses, no error occurs, and the data is returned.

  • Scenario 2 (Decode Error) – Here we will return a mock response that will consist of an integer field in a place where it should be a string.
first := mtest.CreateCursorResponse(1, "foo.bar", mtest.FirstBatch, bson.D{
					{Key: "_id", Value: id},
					{Key: "data_id", Value: dataid},
					{Key: "name", Value: 1234},
					{Key: "publishedAt", Value: publishedAt},
				})
				killCursor := mtest.CreateCursorResponse(0, "foo.bar", mtest.NextBatch)

				mt.AddMockResponses(first, killCursor)

Here the Get function receives the response, fails to decode it, and returns a decode error.

  • Scenario 3 (No Filter Match) – Here we create a mock response that contains no matching documents.
mt.AddMockResponses(bson.D{
					{Key: "ok", Value: 1},
					{Key: "acknowledged", Value: true},
					{Key: "n", Value: 0},
				})

Here the Get method receives n = 0, meaning no document matched the filter, and returns an error, which is the correct behavior.

Thanks 🙂

Check for Kubernetes deployment with client-go library

For the past couple of days, I have been tinkering with the client-go library. It provides the interfaces and methods you need to manipulate Kubernetes cluster resources from your Go code. After exploring for a while I started working on a side project that checks deployments: if a deployment doesn’t have a certain environment variable, it deletes the deployment; otherwise it leaves it as it is.

Setup

In this blog, I am not going to cover how to set up a Go project.

First, create a directory named app and create another directory inside it called service. Now create a file named init.go inside the service directory.

package service

import (
	"log"
	"os"
	"path/filepath"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// Initializes the kube config clientset
func Init() *kubernetes.Clientset {
	config, err := rest.InClusterConfig()
	if err != nil {
		kubeconfig := filepath.Join("/home", "aniruddha", ".kube", "config")
		if envvar := os.Getenv("KUBECONFIG"); len(envvar) > 0 {
			kubeconfig = envvar
		}

		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			log.Fatalf("kubeconfig can't be loaded: %v\n", err)
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("error getting config client: %v\n", err)
	}

	return clientset
}

In the code above, we call InClusterConfig first, which returns the config object holding the common attributes passed to a Kubernetes client on initialization. If we can’t get an in-cluster config, we fall back to the kubeconfig in its default location on most Linux systems (or the path in the KUBECONFIG environment variable).

Once we have the config, it’s time to initialize a client. We do that with the NewForConfig function, which returns a clientset containing clients for each API group. For example, pods can be accessed through the CoreV1 group of the clientset.
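For example, a small standalone sketch that uses the clientset returned by Init() to list pods in the default namespace (not part of the project code, just an illustration):

package main

import (
	"context"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/aniruddha2000/yosemite/app/service"
)

func main() {
	clientset := service.Init()

	// List pods through the CoreV1 group of the clientset.
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.Fatalf("error listing pods: %v", err)
	}
	for _, pod := range pods.Items {
		log.Printf("pod: %s", pod.Name)
	}
}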

Check for deployments

Create another directory under the app dir named client.

package client

import (
	"fmt"
	"log"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

const (
	ENVNAME = "TEST_ENV_NAME"
)

// Check for Deployment and start a go routine if new deployment added
func (c *Client) CheckDeploymentEnv(ns string) {
	informerFactory := informers.NewSharedInformerFactory(c.C, 30*time.Second)

	deploymentInformer := informerFactory.Apps().V1().Deployments()
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			log.Println("Deployment added. Let's start checking!")

			ch := make(chan error, 1)
			done := make(chan bool)

			go c.check(ns, ch, done)

		loop:
			for {
				select {
				case err := <-ch:
					log.Fatalf("error checking envvar: %v", err)
				case <-done:
					break loop
				}
			}
		},
	})

	informerFactory.Start(wait.NeverStop)
	informerFactory.WaitForCacheSync(wait.NeverStop)
}

In the CheckDeploymentEnv method, we first create a shared informer factory with NewSharedInformerFactory, which gives us an interface for retrieving resources from a local cache of the cluster state. We can then handle events like add, update, and delete in the cluster and take action accordingly.

Then we add another function in the same file as above.

func (c *Client) check(namespace string, ch chan error, done chan bool) {
	deployments, err := ListDeploymentWithNamespace(namespace, c.C)
	if err != nil {
		ch <- fmt.Errorf("list deployment: %s", err.Error())
	}

	for _, deployment := range deployments.Items {
		var envSet bool
		for _, cntr := range deployment.Spec.Template.Spec.Containers {
			for _, env := range cntr.Env {
				if env.Name == ENVNAME {
					log.Printf("Deployment name: %s has envvar. All set to go!", deployment.Name)
					envSet = true
				}
			}
		}
		if !envSet {
			log.Printf("No envvar name %s - Deleting deployment with name %s\n", ENVNAME, deployment.Name)
			err = DeleteDeploymentWithNamespce(namespace, deployment.Name, c.C)
			if err != nil {
				ch <- err
			}
		}
	}
	done <- true
}

Here we list the deployments (covered next), and for every deployment we check its env variables and delete the deployment if the expected variable is missing. We send true on the done channel if everything succeeds; otherwise we send the error on the error channel.

Deployment Handler

Create another file named deployment.go in the client directory.

package client

import (
	"fmt"
	"log"

	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// List deployment resource with the given namespace
func ListDeploymentWithNamespace(ns string, clientset *kubernetes.Clientset) (*v1.DeploymentList, error) {
	deployment, err := clientset.AppsV1().Deployments(ns).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	return deployment, nil
}

// Delete deployment resource with the given namespace
func DeleteDeploymentWithNamespce(ns, name string, clientset *kubernetes.Clientset) error {
	err := clientset.AppsV1().Deployments(ns).Delete(ctx, name, metav1.DeleteOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			log.Printf("Deployment don't exists with name %s\n", name)
			return nil
		} else {
			return fmt.Errorf("delete Deployment: %v", err)
		}
	}
	log.Printf("Deployment deleted with name: %v\n", name)

	return nil
}

Here we have two functions: one for listing deployments and another for deleting them. In these we query the Kubernetes API server directly through the clientset, unlike the informer above, which reads from the local in-memory cache.
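For contrast, the same deployments can be read from the informer’s local cache via its lister instead of calling the API server. A sketch (it assumes the deploymentInformer from CheckDeploymentEnv, a started and synced factory, and the k8s.io/apimachinery/pkg/labels import):

// Read deployments from the informer's in-memory cache rather than the API server.
cached, err := deploymentInformer.Lister().Deployments(ns).List(labels.Everything())
if err != nil {
	log.Printf("error listing deployments from cache: %v", err)
}
for _, d := range cached {
	log.Printf("cached deployment: %s", d.Name)
}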

Now create another file named client.go in the client directory and use the code below.

package client

import (
	"context"

	"k8s.io/client-go/kubernetes"
)

var (
	ctx = context.TODO()
)

type Client struct {
	C *kubernetes.Clientset
}

// Return a new Client
func NewClient() *Client {
	return &Client{}
}

main.go

package main

import (
	"flag"
	"log"

	"github.com/aniruddha2000/yosemite/app/client"
	"github.com/aniruddha2000/yosemite/app/service"
)

func main() {
	var nameSpace string

	flag.StringVar(&nameSpace, "ns", "test-ns",
		"namespace name on which the checking is going to take place")
	// Parse the command line flags so the -ns value is actually read.
	flag.Parse()

	log.Printf("Checking Pods for namespace %s\n", nameSpace)
	c := client.NewClient()
	c.C = service.Init()

	c.CheckDeploymentEnv(nameSpace)
}

Here we read the namespace from the flag and wire together all the functions described throughout this article.

Run the app in the Kubernetes cluster

In order to run the app in the cluster, we have to set up a ClusterRole & ClusterRoleBinding for the pod’s default service account.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pod-namespace-clusterrole
rules:
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["list", "watch", "delete"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pod-namespace-clusterrolebinding
subjects:
  - kind: ServiceAccount
    name: default
    namespace: default
roleRef:
  kind: ClusterRole
  name: pod-namespace-clusterrole
  apiGroup: rbac.authorization.k8s.io

Then build the project, make a docker image out of it using the docker build, docker tag & docker push commands, and push it to a registry. Then create the deployment YAML shown below and apply it.

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: client
  name: client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: client
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: client
    spec:
      containers:
      - image: <YOUR DOCKER IMAGE>
        name: client-app
        resources: {}
status: {}

Here is my GitHub URL for the project – https://github.com/aniruddha2000/yosemite/

You can find how to run the project in the README of the mentioned GitHub URL above.

Advantages of Golang sync.RWMutex over sync.Mutex

First of all, let’s understand what a mutex is and why we use it. A mutex is a locking mechanism that protects shared data in a multi-threaded program where multiple threads access the data concurrently. Without a mutex, race conditions can occur and lead to inconsistent data throughout the program.
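As a quick illustration (separate from the examples below), a map written to by many goroutines needs a lock around every write; without it, the program can crash with a concurrent map write error or fail the race detector:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
		m  = map[int]int{}
	)

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			mu.Lock() // without this lock, concurrent writes corrupt the map
			m[i] = i * i
			mu.Unlock()
		}(i)
	}

	wg.Wait()
	fmt.Println("entries:", len(m))
}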

There are two types of Mutex in Golang –

  • sync.Mutex
    It protects the shared data for both reads & writes. If one thread is reading or writing, no other thread can read or write the data. Even when multiple threads only want to read, the reads happen one at a time.
package main

import (
	"fmt"
	"sync"
	"time"
)

type SyncData struct {
	lock sync.Mutex
	wg   sync.WaitGroup
}

func main() {
	// m := map[int]int{}

	var sc SyncData

	sc.wg.Add(7)

	go readLoop(&sc)
	go readLoop(&sc)
	go readLoop(&sc)
	go readLoop(&sc)
	go writeLoop(&sc)
	go writeLoop(&sc)
	go writeLoop(&sc)

	sc.wg.Wait()
}

func writeLoop(sc *SyncData) {
	sc.lock.Lock()
	time.Sleep(1 * time.Second)
	fmt.Println("Write lock")
	fmt.Println("Write unlock")
	sc.lock.Unlock()
	sc.wg.Done()
}

func readLoop(sc *SyncData) {
	sc.lock.Lock()
	time.Sleep(1 * time.Second)
	fmt.Println("Read lock")
	fmt.Println("Read unlock")
	sc.lock.Unlock()
	sc.wg.Done()
}

Playground

Here you can see that a write blocks both reads and writes, and a read also blocks both writes and other reads. (E.g. – you can see the delay between the read print statements.)

  • sync.RWMutex
    If the system is read-heavy, it is fine to let multiple threads read the data at the same time, since reads don’t conflict with each other. So we use RWMutex instead: any number of readers can hold the read lock simultaneously, but only one writer can hold the write lock at a time.
package main

import (
	"fmt"
	"sync"
	"time"
)

type SyncData struct {
	lock sync.RWMutex
	wg   sync.WaitGroup
}

func main() {
	// m := map[int]int{}

	var sc SyncData

	sc.wg.Add(7)

	go readLoop(&sc)
	go readLoop(&sc)
	go readLoop(&sc)
	go readLoop(&sc)
	go writeLoop(&sc)
	go writeLoop(&sc)
	go writeLoop(&sc)

	sc.wg.Wait()
}

func writeLoop(sc *SyncData) {
	sc.lock.Lock()
	time.Sleep(1 * time.Second)
	fmt.Println("Write lock")
	fmt.Println("Write unlock")
	sc.lock.Unlock()
	sc.wg.Done()
}

func readLoop(sc *SyncData) {
	sc.lock.RLock()
	time.Sleep(1 * time.Second)
	fmt.Println("Read lock")
	fmt.Println("Read unlock")
	sc.lock.RUnlock()
	sc.wg.Done()
}

Playground

Here you can see that a write still blocks reads & writes, but a read no longer blocks other reads; multiple threads are able to read at the same time. (E.g. – you can see a delay around the write print statements, but no delay between the read print statements.)

Create a GraphQL API using Golang – Part 1

For the past couple of days, I have been tinkering with GraphQL; I followed an awesome blog post and created my own GraphQL API. But that post only covers creating links and getting links. Once I understood all the components, I started extending the application to support Update & Delete as well.

You should complete that tutorial first and then follow this post to extend the app. Let’s start –

Get A Single Link

At first, add a query in the GraphQL Schema – graph/schema.graphqls

type Query {
  links: [Link!]!
  link(id: ID!): Link!
}

Then run $ go run github.com/99designs/gqlgen generate

You will see a resolver being created in schema.resolvers.go with the below function signature –

func (r *queryResolver) Link(ctx context.Context, id string) (*model.Link, error) {

Now go to internal/links/links.go and add a Get function that fetches the Link with the given id from the database.

func Get(id string) Links {
	var link Links
	stmt, err := database.Db.Prepare("SELECT ID, Title, Address FROM Links WHERE ID=?")
	if err != nil {
		log.Fatal(err)
	}
	defer stmt.Close()

	err = stmt.QueryRow(id).Scan(&link.ID, &link.Title, &link.Address)
	if err != nil {
		log.Fatal(err)
	}
	return link
}

Now it’s time for the resolver to come into the picture –

func (r *queryResolver) Link(ctx context.Context, id string) (*model.Link, error) {
	link := links.Get(id)
	return &model.Link{
		ID:      link.ID,
		Title:   link.Title,
		Address: link.Address,
	}, nil
}
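With the resolver in place you can try the new query from the GraphQL playground. Assuming the Link type from the original tutorial exposes id, title, and address, and that a link with ID 1 already exists, a query would look like –

query {
  link(id: "1") {
    id
    title
    address
  }
}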

Update A Link

Now it’s time to update an existing link. This time, since we are writing to the database, we have to use a mutation.

type Mutation {
   updateLink(id: ID!, input: NewLink!): Link!
}

Now let’s generate the same using $ go run github.com/99designs/gqlgen generate

And add the Update method in the links.go –

func (link Links) Update(id string) int64 {
	stmt, err := database.Db.Prepare("UPDATE Links SET Title=? , Address=? WHERE ID=?")
	if err != nil {
		log.Fatal(err)
	}
	defer stmt.Close()

	res, err := stmt.Exec(link.Title, link.Address, id)
	if err != nil {
		log.Fatal(err)
	}

	rowsAffected, err := res.RowsAffected()
	if err != nil {
		log.Fatal(err)
	}
	return rowsAffected
}

Here we take the id as input, run an update against that row, and return the affected row count.

Now it’s time for the resolvers –

func (r *mutationResolver) UpdateLink(ctx context.Context, id string, input model.NewLink) (*model.Link, error) {
	link := links.Links{
		Title:   input.Title,
		Address: input.Address,
	}
	rowsAffected := link.Update(id)
	if rowsAffected == 0 {
		return nil, errors.New("zero rows affected")
	}
	return &model.Link{
		ID:      id,
		Title:   link.Title,
		Address: link.Address,
	}, nil
}

Here we take the GraphQL input, perform the update, check that the affected row count is not zero, and return the updated value.

Delete A Link

Now as usual add the mutation first in the schema –

type Mutation {
    deleteLink(id: ID!): String!
}

Now let’s generate the same using $ go run github.com/99designs/gqlgen generate

Now add the code in the links.go to perform the delete operation –

func Delete(id string) int64 {
	stmt, err := database.Db.Prepare("DELETE FROM Links WHERE ID=?")
	if err != nil {
		log.Fatal(err)
	}
	defer stmt.Close()

	res, err := stmt.Exec(id)
	if err != nil {
		log.Fatal(err)
	}

	rowsAffected, err := res.RowsAffected()
	if err != nil {
		log.Fatal(err)
	}
	return rowsAffected
}

Here we run the delete query and get the rows affected count and return it.

Now it’s time to resolve it –

func (r *mutationResolver) DeleteLink(ctx context.Context, id string) (string, error) {
	rowsAffected := links.Delete(id)
	if rowsAffected == 0 {
		return "", errors.New("zero rows affected")
	}
	return fmt.Sprintf("%v rows affected", rowsAffected), nil
}

Here we call Delete with the desired id string as input, and if the deletion succeeds we return a string of the form “<number> rows affected”.

Create a key Value storage using Golang – Part 2

In the previous blog I discussed how to build a key-value store with in-memory storage. Now I am going to discuss how you can extend it to use filesystem storage.

Let’s first define our structure which is going to hold the information about the storage –

type DiskFS struct {
	FS             filesystem.Fs
	RootFolderName string
}

For this project we are going to create our own filesystem abstraction. We could use afero, but I decided to write my own implementation for the sake of learning.

First, create a directory called filesystem in the root folder, then create a file called fs.go inside it and write the following code –

package filesystem

import (
	"io"
	"os"
)

type FileSystem struct {
	Fs
}

type File interface {
	io.Closer
	io.Reader
	io.ReaderAt
	io.Seeker
	io.Writer
	io.WriterAt

	Name() string
	Readdir(count int) ([]os.FileInfo, error)
	Stat() (os.FileInfo, error)
	Sync() error
	WriteString(s string) (ret int, err error)
}

type Fs interface {
	Create(name string) (File, error)
	Mkdir(name string, perm os.FileMode) error
	Open(name string) (File, error)
	OpenFile(name string, flag int, perm os.FileMode) (File, error)
	Stat(name string) (os.FileInfo, error)
	Remove(name string) error
}

Now, since we are going to use the os package to implement the filesystem, create a file called osfs.go and write the code below –

package filesystem

import (
	"os"
)

type OsFs struct {
	Fs
}

// Return a File System for OS
func NewOsFs() Fs {
	return &OsFs{}
}

func (OsFs) Create(name string) (File, error) {
	file, err := os.Create(name)
	if err != nil {
		return nil, err
	}
	return file, nil
}

func (OsFs) Open(name string) (File, error) {
	file, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	return file, err
}

func (OsFs) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
	file, err := os.OpenFile(name, flag, perm)
	if err != nil {
		return nil, err
	}
	return file, nil
}

func (OsFs) Mkdir(name string, perm os.FileMode) error {
	return os.Mkdir(name, perm)
}

func (OsFs) Stat(name string) (os.FileInfo, error) {
	return os.Stat(name)
}

func (OsFs) Remove(name string) error {
	return os.Remove(name)
}

Now let’s create some utility functions to handle common situations. Create a utils.go file and write the following code –

package filesystem

import (
	"io"
	"os"
)

func DirExists(fs Fs, name string) (bool, error) {
	file, err := fs.Stat(name)
	if err == nil && file.IsDir() {
		return true, nil
	}
	if os.IsNotExist(err) {
		return false, nil
	}
	return false, err
}

func Exists(fs Fs, name string) (bool, error) {
	_, err := fs.Stat(name)
	if err == nil {
		return true, nil
	}
	if os.IsNotExist(err) {
		return false, nil
	}
	return false, err
}

func ReadDir(fs Fs, dirName string) ([]os.FileInfo, error) {
	dir, err := fs.Open(dirName)
	if err != nil {
		return nil, err
	}
	defer dir.Close()
	list, err := dir.Readdir(-1)
	if err != nil {
		return nil, err
	}
	return list, nil
}

func ReadFile(fs Fs, name string) ([]byte, error) {
	file, err := fs.Open(name)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	// Read through the opened File so the Fs abstraction is actually used.
	data, err := io.ReadAll(file)
	if err != nil {
		return nil, err
	}
	return data, nil
}

OK, our filesystem is done. Now it’s time to write the filesystem-based storage implementation. Write the code below inside the records.go file –

// Return the Disk structure file system
func NewDisk(rootFolder string) *DiskFS {
	diskFs := filesystem.NewOsFs()
	ok, err := filesystem.DirExists(diskFs, rootFolder)
	if err != nil {
		log.Fatalf("Dir exists: %v", err)
	}
	if !ok {
		err := diskFs.Mkdir(rootFolder, os.ModePerm)
		if err != nil {
			log.Fatalf("Create dir: %v", err)
		}
	}
	return &DiskFS{FS: diskFs, RootFolderName: rootFolder}
}

// Store key, value in the file system
func (d *DiskFS) Store(key, val string) {
	file, err := d.FS.Create(d.RootFolderName + "/" + key)
	if err != nil {
		log.Fatalf("Create file: %v", err)
	}
	defer file.Close()

	_, err = file.Write([]byte(val))
	if err != nil {
		log.Fatalf("Writing file: %v", err)
	}
}

func (d *DiskFS) List() map[string]string {
	m := make(map[string]string, 2)
	dir, err := filesystem.ReadDir(d.FS, d.RootFolderName)
	if err != nil {
		log.Fatalf("Error reading the directory: %v", err)
	}

	for _, fileName := range dir {
		content, err := filesystem.ReadFile(d.FS, d.RootFolderName+"/"+fileName.Name())
		if err != nil {
			log.Fatalf("Error reading the file: %v", err)
		}
		m[fileName.Name()] = string(content)
	}
	return m
}

func (d *DiskFS) Get(key string) (string, error) {
	ok, err := filesystem.Exists(d.FS, d.RootFolderName+"/"+key)
	if err != nil {
		log.Fatalf("File exist: %v", err)
	}

	if ok {
		file, err := filesystem.ReadFile(d.FS, d.RootFolderName+"/"+key)
		if err != nil {
			log.Fatalf("Error reading the file: %v", err)
		}
		return string(file), nil
	}
	return "", errors.New("key not found")
}

func (d *DiskFS) Delete(key string) error {
	ok, err := filesystem.Exists(d.FS, d.RootFolderName+"/"+key)
	if err != nil {
		log.Fatalf("File exist: %v", err)
	}
	if ok {
		err = d.FS.Remove(d.RootFolderName + "/" + key)
		if err != nil {
			log.Fatalf("Delete file err: %v", err)
		}
		return nil
	}
	return errors.New("key not found")
}

Now, if you run the app with the -storage-type=disk flag, all operations go through the filesystem-based storage: a directory called storage is created, each key becomes a file inside it, and the file’s content is the value.

In the next part I am going to write the tests for the application.

Create a key Value storage using Golang – Part 1

A few days back I interviewed at a company for a Golang developer intern position. They asked me a lot of questions; some I cracked and some I couldn’t. The things I failed miserably at were Go interfaces and Go concurrency (channels & the atomic package). I took it as motivation, learnt Go interfaces again, and wrote a blog on that. Then it was time to build a project so the knowledge would become more concrete. Nabarun gave me the idea to make a key-value store in Go that supports multiple storage types.

So I started building the project. First, create a directory api/models at the root of the project. Then create a file called records.go and write the following code –

type InMemory struct {
	Data map[string]string `json:"data"`
}

This struct is responsible for the in-memory storage of the key-value pairs. Now let’s define some interfaces that will help us later when we enhance the app to support more storage backends, such as the file system. Add the following code to records.go –

type StorageSetter interface {
	Store(string, string)
}

type StorageGetter interface {
	List() map[string]string
	Get(string) (string, error)
}

type StorageDestroyer interface {
	Delete(string) error
}

type Storage interface {
	StorageSetter
	StorageGetter
	StorageDestroyer
}

Now, why have I written multiple small interfaces like a Setter and a Getter instead of one big one? Because it is best practice to keep interfaces small: a small interface is easier to implement, and callers can depend only on the behaviour they actually need, which improves abstraction.
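
To see the benefit in practice, here is a small illustrative helper (not part of the project code): because it only needs read access, it asks for just StorageGetter and therefore works with any current or future backend.

// CountRecords only reads, so it depends on the smallest interface
// that satisfies it rather than the full Storage interface.
func CountRecords(g StorageGetter) int {
	return len(g.List())
}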

Now let’s define a helper function in records.go that returns the InMemory struct.

// NewCache returns an initialized InMemory store
func NewCache() *InMemory {
	return &InMemory{Data: make(map[string]string, 2)}
}

Let’s now create the methods in records.go that operate on the struct –

func (r *InMemory) Store(key, val string) {
	r.Data[key] = val
}

func (r *InMemory) List() map[string]string {
	return r.Data
}

func (r *InMemory) Get(key string) (string, error) {
	val, ok := r.Data[key]
	if !ok {
		return "", errors.New("key not found")
	}
	return val, nil
}

func (r *InMemory) Delete(key string) error {
	_, ok := r.Data[key]
	if !ok {
		return errors.New("key not found")
	}
	delete(r.Data, key)
	return nil
}
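
Before wiring this into a server, a quick illustrative check (a throwaway main, assuming the module path used later in this post) shows the in-memory store behaving as expected:

package main

import (
	"fmt"

	"github.com/aniruddha2000/goEtcd/api/models"
)

func main() {
	cache := models.NewCache()
	cache.Store("lang", "go")

	val, err := cache.Get("lang")
	fmt.Println(val, err) // go <nil>

	fmt.Println(cache.List())            // map[lang:go]
	fmt.Println(cache.Delete("missing")) // key not found
}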

Now let’s define our server. Create a directory api/controllers, create a base.go file inside it, and write the following code –

type Server struct {
	Router *http.ServeMux
	Cache  models.Storage
}

This Server struct holds the dependencies of the server, namely the router and the storage interface.

Now create a server.go inside the api directory and write the code –

package api

import (
	"flag"

	"github.com/aniruddha2000/goEtcd/api/controllers"
)

var server controllers.Server

// Initialize and run the server
func Run() {
	var storageType string

	flag.StringVar(&storageType, "storage-type", "in-memory",
		"Define the storage type that will be used in the server. By defaut the value is in-memory.")
	flag.Parse()

	server.Initialize(storageType)
	server.Run("8888")
}

Here you can see it reads the flag from the command line, passes it to the Initialize method, and calls the Run method to run the server on port 8888. Now let’s define these two methods, Initialize & Run, in the base.go file –

func (s *Server) Initialize(storageType string) {
	s.Router = http.NewServeMux()

	switch storageType {
	case "in-memory":
		s.Cache = models.NewCache()
	case "disk":
		s.Cache = models.NewDisk()
	default:
		log.Fatal("Use flags `in-memory` or `disk`")
	}

	log.Printf("Starting server with %v storage", storageType)

	s.initializeRoutes()
}

// Run the server on desired port and logs the status
func (s *Server) Run(addr string) {
	cert, err := tls.LoadX509KeyPair("localhost.crt", "localhost.key")
	if err != nil {
		log.Fatalf("Couldn't load the certificate: %v", cert)
	}

	server := &http.Server{
		Addr:    ":" + addr,
		Handler: s.Router,
		TLSConfig: &tls.Config{
			Certificates: []tls.Certificate{cert},
		},
	}

	fmt.Println("Listenning to port", addr)
	log.Fatal(server.ListenAndServeTLS("", ""))
}

Here you can see the Initialize method sets up the router for the server, picks the storage backend based on the flag, and at the end initializes the routes.

In the Run method it loads the TLS certificate and key (localhost.crt and localhost.key; a self-signed pair for localhost is enough for local testing), sets up the HTTP server with that TLS config, and starts it at the end.
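
For completeness, the entry point is not shown in this post; a minimal main.go at the project root could look like the sketch below (assuming this layout and that localhost.crt / localhost.key are present in the working directory).

package main

import "github.com/aniruddha2000/goEtcd/api"

func main() {
	// Parses the -storage-type flag and starts the TLS server on :8888.
	api.Run()
}

Then go run . -storage-type=in-memory starts the server.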

Now let’s define the initializeRoutes function that we called at the end of the Initialize method. Create a routes.go file inside api/controllers –

func (s *Server) initializeRoutes() {
	s.Router.HandleFunc("/record", s.Create)
	s.Router.HandleFunc("/records", s.List)
	s.Router.HandleFunc("/get/record", s.Get)
	s.Router.HandleFunc("/del/record", s.Delete)
}

Now we will see the implementation of the route controllers. Create a cache.go file inside api/controllers and paste the code below –

package controllers

import (
	"log"
	"net/http"

	j "github.com/aniruddha2000/goEtcd/api/json"
)

func (s *Server) Create(w http.ResponseWriter, r *http.Request) {
	if r.Method == "POST" {
		if err := r.ParseForm(); err != nil {
			j.JSON(w, r, http.StatusBadRequest, err.Error())
			return
		}
		key := r.Form["key"]
		val := r.Form["val"]

		for i := 0; i < len(key); i++ {
			s.Cache.Store(key[i], val[i])
		}

		j.JSON(w, r, http.StatusCreated, "Record created")
	} else {
		j.JSON(w, r, http.StatusBadRequest, "POST Request accepted")
	}
}

func (s *Server) List(w http.ResponseWriter, r *http.Request) {
	if r.Method == "GET" {
		records := s.Cache.List()
		j.JSON(w, r, http.StatusOK, records)
	} else {
		j.JSON(w, r, http.StatusBadRequest, "GET Request accepted")
	}
}

func (s *Server) Get(w http.ResponseWriter, r *http.Request) {
	if r.Method == "GET" {
		keys, ok := r.URL.Query()["key"]
		if !ok || len(keys[0]) < 1 {
			log.Println("Url Param 'key' is missing")
			return
		}
		key := keys[0]

		val, err := s.Cache.Get(key)
		if err != nil {
			j.JSON(w, r, http.StatusNotFound, err.Error())
			return
		}
		j.JSON(w, r, http.StatusOK, map[string]string{key: val})
	} else {
		j.JSON(w, r, http.StatusBadRequest, "POST Request accepted")
	}
}

func (s *Server) Delete(w http.ResponseWriter, r *http.Request) {
	if r.Method == "DELETE" {
		keys, ok := r.URL.Query()["key"]
		if !ok || len(keys[0]) < 1 {
			log.Println("Url Param 'key' is missing")
			return
		}
		key := keys[0]

		err := s.Cache.Delete(key)
		if err != nil {
			j.JSON(w, r, http.StatusNotFound, err.Error())
			return
		}
		j.JSON(w, r, http.StatusNoContent, map[string]string{"data": "delete"})
	} else {
		j.JSON(w, r, http.StatusBadRequest, "DELETE Request accepted")
	}
}

Here you can see the controllers that handle the different routes, call the storage methods, and perform the operations.

I have created a helper JSON function to reduce redundant code in the route controllers. Create an api/json directory and create a json.go file inside it. Paste the code below –

func JSON(w http.ResponseWriter, r *http.Request, statusCode int, data interface{}) {
	w.Header().Set("Location", fmt.Sprintf("%s%s", r.Host, r.RequestURI))
	w.WriteHeader(statusCode)
	json.NewEncoder(w).Encode(data)
}
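
With everything wired up, here is an illustrative way to exercise the API from Go (assuming the server is running on :8888 with the self-signed certificate, which is why TLS verification is skipped here):

package main

import (
	"crypto/tls"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// The server uses a self-signed certificate, so skip verification
	// for this local test only.
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}

	// Create a record: POST form data to /record.
	resp, err := client.PostForm("https://localhost:8888/record",
		url.Values{"key": {"name"}, "val": {"aniruddha"}})
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// Read it back: GET /get/record?key=name.
	resp, err = client.Get("https://localhost:8888/get/record?key=name")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // {"name":"aniruddha"}
}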

In the next part I will walk you through how to extend this application to support disk based storage alongside the in-memory storage.

Golang Interface Simplified

What is an interface?

An interface is used for abstraction. It contains one or more method signatures. Below is an example of how we define an interface.

type Human interface {
	speak()
}

Why do we use interfaces?

In simple terms, an interface is a contract for the methods that different struct types must implement. We use interfaces to improve code readability and maintainability. Let’s say there is a Person data type in my application.

type Person struct {
	First string
	Last  string
}

Now let’s say the speak method declared in the interface is implemented for the Person struct –

func (p Person) speak() {
	fmt.Println("I am Person ", p.First)
}

Now the interesting part: our software got a new requirement to add another data type called SecretAgent.

type SecretAgent struct {
	Person Person
	Org    string
}

Now we define another method speak() for the SecretAgent data type.

func (s SecretAgent) speak() {
	fmt.Println("I am secret agent ", s.Person.First)
}

Now we can take advantage of the interface and the power of abstraction. We define a function that takes the interface and calls the speak method.

func Earthling(h Human) {
	fmt.Println("Hey there I am from planet Earth")
	h.speak()
}

Understand what happened above? The Earthling function takes the Human interface and calls the speak method; we don’t have to specify which concrete data type speak will work on, Go’s interface dispatch handles that. This removes a lot of hard coding and makes our design ready to accept more data types in the future.

Let’s see the main function.

func main() {
	sa1 := SecretAgent{
		Person: Person{First: "James", Last: "Bond"},
		Org:    "MI6",
	}
	sa2 := SecretAgent{
		Person: Person{First: "Ajit", Last: "Doval"},
		Org:    "RAW",
	}
	p1 := Person{First: "Dr.", Last: "Strange"}

	Earthling(sa1)
	Earthling(sa2)
	Earthling(p1)
}
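
Running this should print something along these lines (Println puts a space between its arguments, hence the double space):

Hey there I am from planet Earth
I am secret agent  James
Hey there I am from planet Earth
I am secret agent  Ajit
Hey there I am from planet Earth
I am Person  Dr.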

How to get environment variable values with the help of struct tags?

Struct Tags

Go struct tags are annotations that appear after the field type in a Go struct declaration. Each tag is composed of short key strings associated with a corresponding value.

A struct tag looks like this, with the tag set off by backtick (`) characters:

type User struct {
	Name string `example:"name"`
}

We can write other Go code to examine the tags and do some cool stuff with them. The tags don’t change the behaviour of the struct itself.

How to get environment variable values via struct tags?

The classic way of populating struct fields from environment variables is to call os.Getenv() again and again and assign the values manually. But this approach is repetitive and error prone if the struct is large.

type EmailConfig struct {
	Email string `env:"EMAIL"`
}

func main() {
	cfg := EmailConfig{
		Email: os.Getenv("EMAIL")
	}
}

But can we improve on this? Yes, we can, by using reflection –

package main

import (
	"fmt"
	"os"
	"reflect"
)

type Config struct {
	Email    string `env:"EMAIL"`
}

const tagName = "env"

// LoadConfig fills the struct fields from environment variables named by their `env` tags
func LoadConfig(q *Config) {
	v := reflect.ValueOf(q).Elem()
	if v.Kind() == reflect.Struct {
		val := reflect.TypeOf(q).Elem()
		for i := 0; i < val.NumField(); i++ {
			field := val.Field(i)
			tag := field.Tag.Get(tagName)
			fmt.Printf("field : %v | tagName : %v\n", field.Name, tag)
			envVal := os.Getenv(tag)
			reflect.ValueOf(q).Elem().FieldByName(field.Name).Set(reflect.ValueOf(envVal))
		}
	}
}

func main() {
	var cfg Config
	LoadConfig(&cfg)
	fmt.Println(cfg)
}
  • Here we initialize an empty cfg struct and pass its reference into the LoadConfig function.
  • In LoadConfig we iterate over the struct fields.
  • For each field we extract the `env` tag and set the field’s value to the environment variable looked up by that tag name (see the usage example below).
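
To try it out, set the variable before running the program (illustrative value):

EMAIL=me@example.com go run main.go

which should print something like:

field : Email | tagName : EMAIL
{me@example.com}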