Kubernetes Integration Testing

Summary

Testing infrastructure pipelines is essential for minimizing regressions, keeping systems healthy, and shortening mean time to recovery when patching. I have found integration tests especially useful for validating environments before deployment, to confirm they meet all technical and compliance requirements.

Use Case

For this test, I created a sample Flask app, deployed on EKS, that reaches out to S3 and grabs a file called "test". The flask-app manifest defines a readiness probe that confirms the /test endpoint (which grabs the file from S3) responds successfully. Our InSpec test suite then checks that the pod deploys and starts. The pytest suite validates Kubernetes concerns such as IAM authorization (through the readiness probe) and networking, and it could be expanded into end-to-end tests in the future.
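
To make the readiness-probe idea concrete, here is a minimal sketch of the check an HTTP probe effectively performs: a GET that counts any status from 200 up to, but not including, 400 as success. The stub server and the `probe` helper are hypothetical stand-ins rather than part of the sample app; the stub answers 403 on /test to mimic the app failing to read from S3.

```python
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class StubApp(BaseHTTPRequestHandler):
    """Hypothetical stand-in for the Flask app: '/' is healthy, '/test' is denied."""

    def do_GET(self):
        status = 200 if self.path == "/" else 403
        body = b"app is up" if status == 200 else b"Error getting file"
        self.send_response(status)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the example quiet


def probe(url, timeout=1.0):
    """Return True when the endpoint answers with a 2xx or 3xx status."""
    # Bypass any proxy settings so the local stub is reached directly.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(url, timeout=timeout) as response:
            return 200 <= response.status < 400
    except urllib.error.HTTPError as error:
        return 200 <= error.code < 400
    except OSError:
        return False  # connection refused, timeout, DNS failure, ...


def run_probes():
    """Start the stub on a free port and probe both endpoints."""
    server = HTTPServer(("127.0.0.1", 0), StubApp)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    base = f"http://127.0.0.1:{server.server_port}"
    try:
        return probe(base + "/"), probe(base + "/test")
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    liveness_ok, readiness_ok = run_probes()
    print("liveness:", liveness_ok, "readiness:", readiness_ok)
```

With the stub in this state the liveness check passes and the readiness check fails, which is the situation a pod is in when its IAM role cannot read the bucket: the container runs, but it never becomes ready.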


If you deploy this code, replace every <VAR> placeholder with an appropriate value for your environment.

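from os import environ

from boto3 import resource
from flask import Flask

app = Flask(__name__)


@app.route('/')
def init():
    # Liveness endpoint: fail if the bucket name was never configured.
    if 'S3_BUCKET' not in environ:
        return "environ S3_BUCKET variable not set", 500
    return "app is up"


@app.route('/test')
def homepage():
    # Readiness endpoint: read the "test" object to prove S3 access works.
    try:
        s3 = resource('s3')
        obj = s3.Object(environ['S3_BUCKET'], 'test')
        return obj.get()['Body'].read().decode('utf-8')
    except Exception:
        # Any failure (missing variable, denied access, missing object) maps to 403.
        return "Error getting file", 403


if __name__ == "__main__":
    # Listen on all interfaces so the kubelet probes can reach port 5000.
    app.run(host='0.0.0.0')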

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flask-app
  labels:
    app: flask
spec:
  selector:
    matchLabels:
      app: flask
  template:
    metadata:
      labels:
        app: flask
    spec:
      automountServiceAccountToken: true
      serviceAccountName: <SERVICE_ACCOUNT_NAME>
      containers:
        - name: flask
          env:
            - name: AWS_DEFAULT_REGION
              value: us-east-1
            - name: S3_BUCKET
              value: <NAME_OF_S3_BUCKET>
          image: <IMAGE_REPOSITORY>
          imagePullPolicy: Always
          livenessProbe:
            httpGet:
              path: /
              port: 5000
              scheme: HTTP
          ports:
            - containerPort: 5000
              protocol: TCP
          readinessProbe:
            httpGet:
              path: /test
              port: 5000
              scheme: HTTP
          resources:
            requests:
              cpu: 250m
              memory: 50Mi

---
apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    eks.amazonaws.com/role-arn: <ARN_OF_ROLE_FOR_TESTING>
  name: s3-reader

---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flask
  name: flask-app
spec:
  ports:
    - port: 80
      protocol: TCP
      targetPort: 5000
  selector:
    app: flask
  sessionAffinity: None
  type: ClusterIP
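
The manifests above contain placeholders such as <SERVICE_ACCOUNT_NAME>, and a forgotten one tends to surface only as a confusing apply or image-pull error. As a rough sketch of a pre-flight check, the helper below scans manifest text for any remaining <UPPER_CASE> token. The function name, the regular expression, and the sample text (including the made-up account ID) are my own assumptions, not part of the original project.

```python
import re

# Matches tokens such as <SERVICE_ACCOUNT_NAME> or <IMAGE_REPOSITORY>.
PLACEHOLDER = re.compile(r"<[A-Z][A-Z0-9_]*>")


def find_placeholders(manifest_text):
    """Return (line_number, placeholder) pairs for every unreplaced token."""
    hits = []
    for number, line in enumerate(manifest_text.splitlines(), start=1):
        for match in PLACEHOLDER.finditer(line):
            hits.append((number, match.group(0)))
    return hits


# Hypothetical excerpt: one value filled in, two placeholders left behind.
sample = """\
serviceAccountName: <SERVICE_ACCOUNT_NAME>
image: 123456789012.dkr.ecr.us-east-1.amazonaws.com/k8s-test
value: <NAME_OF_S3_BUCKET>
"""

if __name__ == "__main__":
    for number, token in find_placeholders(sample):
        print(f"line {number}: {token} has not been replaced")
```

Running a check like this over each file in the configs directory before applying it keeps placeholder mistakes out of the cluster entirely.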

Technologies

InSpec

Open source offerings such as inspec-k8s exist, but the backing SDK was written by a company that closed its doors in 2019. Depending on the runner you use for cluster creation, you can also hit dependency mismatches that prevent execution altogether (see the dry types PR). Ultimately it is possible, but getting started requires several workarounds.

To fully utilize InSpec, you have to deploy your application configuration first and install the Kubernetes train plugin. Alternatively, running from bgeesaman's inspec-k8s-runner Docker container gives you a nice starting point. Once installed, you can begin testing the various components within your namespace.

Below are some sample resources you can test against.

### Test that your pods exist and are all running
control "k8s-app-validate" do
  impact 1.0
  title "Validate K8s test Application"
  desc "The k8s-app test app should exist and be running"

  ### Test that the namespace exists
  describe k8sobject(api: 'v1', type: 'namespaces', name: 'default') do
    it { should exist }
  end
  ### Test that every pod in the deployment exists and is running
  k8sobjects(api: 'v1', type: 'pods', namespace: 'default', labelSelector: 'app=flask').items.each do |pod|
    describe "#{pod.namespace}/#{pod.name} pod" do
      subject { k8sobject(api: 'v1', type: 'pods', namespace: pod.namespace, name: pod.name) }
      it { should exist }
      it { should be_running }
    end
  end
  ### Test your service exists
  describe k8sobjects(api: 'v1', type: 'services', namespace: 'default', labelSelector: 'app=flask') do
    it { should exist }
  end
end
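
If the InSpec plugin chain will not install, roughly the same pod assertion can be made against the JSON printed by `kubectl get pods -l app=flask -o json`. The following is a minimal sketch, assuming that output has already been captured into a string; the function name and the sample pod names are made up for illustration.

```python
import json


def pods_not_running(pod_list_json):
    """Return 'namespace/name' for every pod whose phase is not Running."""
    pod_list = json.loads(pod_list_json)
    failing = []
    for pod in pod_list.get("items", []):
        meta = pod.get("metadata", {})
        phase = pod.get("status", {}).get("phase")
        if phase != "Running":
            failing.append(f"{meta.get('namespace')}/{meta.get('name')}")
    return failing


# Hypothetical, heavily trimmed kubectl output with one healthy and one pending pod.
sample = json.dumps({
    "items": [
        {"metadata": {"namespace": "default", "name": "flask-app-abc"},
         "status": {"phase": "Running"}},
        {"metadata": {"namespace": "default", "name": "flask-app-def"},
         "status": {"phase": "Pending"}},
    ]
})

if __name__ == "__main__":
    print(pods_not_running(sample))
```

Note that a Running phase only says the containers have started. It does not say the readiness probe has passed, which is the same gap the InSpec checks above leave open.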

Pytest

Vapor has a framework called kubetest, which deploys a configuration to your cluster in a unique namespace. The framework is meant for deploying and then testing, not necessarily for testing infrastructure that already exists. One advantage is that it has native readiness checks, whereas InSpec does not.

Once deployed, you can use standard pytest execution to assert against those resources. The scenarios below use the configurations provided above.

There are some limitations in the API objects supported; see API Resources.

from boto3 import client
from time import time

# Create And Test Service Account
def create_sa(kube, modifier):
    """
    A helper function to create service account
    """
    sa = kube.load_serviceaccount("configs/sa.yaml")
    account_id = client('sts').get_caller_identity()["Account"]
    role_arn = f"arn:aws:iam::{account_id}:role/test_role"
    sa.obj.metadata.annotations['eks.amazonaws.com/role-arn'] = role_arn
    return sa

def test_create_sa(kube, modifier):
    """
    A function to test the creation of a service account
    Goal: This will test the ability to interface with the k8s cli
    """
    sa = create_sa(kube, modifier)
    kube.create(sa)
    assert sa.is_ready()

# Create and Test Deployment
def create_deployment(kube, sa, modifier):
    """
    A helper function to create a deployment object
    """
    account_id = client('sts').get_caller_identity()["Account"]

    deployment = kube.load_deployment('configs/deployment.yaml')
    repository = f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/k8s-test"
    deployment.obj.spec.template.spec.containers[0].image = repository
    deployment.obj.spec.template.spec.service_account_name = sa.obj.metadata.name
    # env[1] is the S3_BUCKET entry in the deployment manifest
    deployment.obj.spec.template.spec.containers[0].env[1].value = "quackenbush-test-bucket"
    return deployment

def test_deployment(kube, modifier):
    """
    A function to test the creation of a deployment.
    Goal: If a pod becomes ready, that means it can successfully connect to S3
    """
    sa = create_sa(kube, modifier)
    kube.create(sa)

    deployment = create_deployment(kube, sa, modifier)
    kube.create(deployment)

    # Block until the deployment reports ready instead of busy-polling;
    # kubetest raises a TimeoutError if this takes longer than 60 seconds.
    deployment.wait_until_ready(timeout=60)

    pods = deployment.get_pods()

    for pod in pods:
        pod.wait_until_containers_start(timeout=60)
        # The readiness probe calls /test, so a ready pod proves S3 access works.
        pod.wait_until_ready(timeout=300)  # 5 minutes
        assert pod.is_ready()
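
Waiting for readiness against a deadline is the core of this test. As a rough sketch of that pattern, independent of kubetest, the helper below polls any zero-argument callable and sleeps between attempts so the API server is not hammered. The name `wait_until` and its parameters are hypothetical.

```python
import time


def wait_until(predicate, timeout, interval=1.0):
    """Poll predicate() until it returns True or timeout seconds elapse.

    Returns True on success and False on timeout. The predicate is always
    called at least once, even with a timeout of zero.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


if __name__ == "__main__":
    attempts = {"count": 0}

    def ready_on_third_call():
        # Stand-in for something like pod.is_ready
        attempts["count"] += 1
        return attempts["count"] >= 3

    print(wait_until(ready_on_third_call, timeout=5, interval=0.1))
```

In a test this would read as `assert wait_until(pod.is_ready, timeout=300)`, which fails with a plain assertion error once the deadline passes.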

Go

Another popular framework is Terratest. This codebase could be extended to use the same workflow as before, where our container communicates with S3. For simplicity, however, I will demonstrate just spinning up a simple web-based pod and then tunneling traffic to it to verify health.

package test

import (
	"crypto/tls"
	"fmt"
	"path/filepath"
	"testing"
	"time"

	http_helper "github.com/gruntwork-io/terratest/modules/http-helper"
	"github.com/gruntwork-io/terratest/modules/k8s"
)

func TestKubernetesDeployment(t *testing.T) {
	t.Run("test", func(t *testing.T) {
		validateK8s(t)
	})
}
func validateK8s(t *testing.T) {
	kubeResourcePath, err := filepath.Abs("./configs/")
	if err != nil {
		t.Fatal(err)
	}
	podName := "static-web"

	options := k8s.NewKubectlOptions("", "", "default")
	k8s.KubectlApply(t, options, kubeResourcePath)

	defer k8s.KubectlDelete(t, options, kubeResourcePath)
	k8s.WaitUntilPodAvailable(t, options, podName, 30, 5*time.Second)

	// Create Tunnel and validate success
	tunnel := k8s.NewTunnel(options, k8s.ResourceTypePod, podName, 0, 80)
	defer tunnel.Close()
	tunnel.ForwardPort(t)
	tlsConfig := tls.Config{}
	http_helper.HttpGetWithRetryWithCustomValidation(
		t,
		fmt.Sprintf("http://%s", tunnel.Endpoint()),
		&tlsConfig,
		10,
		5*time.Second,
		validateHealth,
	)
}

func validateHealth(statusCode int, body string) bool {
	if statusCode != 200 {
		return false
	}
	// We're going to ignore the body, but we could run some checks there too
	return true
}
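
The comment in validateHealth hints that the body could be checked as well. Here is a small sketch of that idea, written in Python only to stay consistent with the pytest examples earlier in the post; it is not Terratest API. The `make_validator` name and the expected text are assumptions for illustration.

```python
def make_validator(expected_status=200, required_text=None):
    """Build a validate(status_code, body) callable.

    The response passes only if the status matches and, when required_text
    is given, the body contains that text.
    """
    def validate(status_code, body):
        if status_code != expected_status:
            return False
        if required_text is not None and required_text not in body:
            return False
        return True

    return validate


if __name__ == "__main__":
    # Hypothetical expectation: the page served by the pod mentions "Hello".
    validate_health = make_validator(required_text="Hello")
    print(validate_health(200, "<h1>Hello from the pod</h1>"))
    print(validate_health(200, "<h1>Maintenance</h1>"))
```

The same shape carries over to the Go validator: keep the status check first and return false as soon as the body is missing the text you expect.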