Skip to content

Commit

Permalink
Use docker image for the workers (#447)
Browse files Browse the repository at this point in the history
Allow to specify a dockerImageTag as global parameter, to be used by the
deployed worker (and maybe later starter, etc.)

This is to make sure that we use the same client version as the
deployed/target zeebe cluster.

closes #446
  • Loading branch information
ChrisKujawa authored Dec 6, 2023
2 parents 1a04a25 + fca064c commit 1549c1c
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go-chaos/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ The workers can be used as part of some chaos experiments to complete process in
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

err = k8Client.CreateWorkerDeployment()
err = k8Client.CreateWorkerDeployment(DockerImageTag)
ensureNoError(err)

internal.LogInfo("Worker successfully deployed to the current namespace: %s", k8Client.GetCurrentNamespace())
Expand Down
2 changes: 2 additions & 0 deletions go-chaos/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var Version = "development"
var Commit = "HEAD"
var Verbose bool
var JsonLogging bool
var DockerImageTag string = "zeebe"

func NewCmd() *cobra.Command {
flags := Flags{}
Expand All @@ -102,6 +103,7 @@ func NewCmd() *cobra.Command {
rootCmd.PersistentFlags().BoolVarP(&JsonLogging, "jsonLogging", "", false, "json logging output")
rootCmd.PersistentFlags().StringVar(&flags.kubeConfigPath, "kubeconfig", "", "path the the kube config that will be used")
rootCmd.PersistentFlags().StringVarP(&flags.namespace, "namespace", "n", "", "connect to the given namespace")
rootCmd.PersistentFlags().StringVarP(&DockerImageTag, "dockerImageTag", "", "", "use the given docker image tag for deployed resources, e.g. worker/starter")

AddBackupCommand(rootCmd, &flags)
AddBrokersCommand(rootCmd, &flags)
Expand Down
1 change: 1 addition & 0 deletions go-chaos/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func Test_ShouldBeAbleToRunExperiments(t *testing.T) {
vars := make(map[string]interface{})
vars["clusterPlan"] = "test" // specifies the cluster plan for which we read the experiments
vars["clusterId"] = "" // need to be set to empty string, otherwise we run into a SIGSEG
vars["zeebeImage"] = "gcr.io/zeebe-io/zeebe:SNAPSHOT"

commandStep3, err := zeebeClient.NewCreateInstanceCommand().BPMNProcessId("chaosToolkit").LatestVersion().VariablesFromMap(vars)
require.NoError(t, err)
Expand Down
16 changes: 9 additions & 7 deletions go-chaos/internal/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import (
"embed"
"errors"
"fmt"
"strings"

v12 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
"strings"
)

// k8Deployments holds our static k8 manifests, which are copied with the go:embed directive
Expand All @@ -49,8 +48,10 @@ func (c K8Client) getGatewayDeployment() (*v12.Deployment, error) {
}
return &deploymentList.Items[0], err
}

func (c K8Client) CreateWorkerDeployment() error {
func (c K8Client) CreateWorkerDeploymentDefault() error {
return c.CreateWorkerDeployment("zeebe")
}
func (c K8Client) CreateWorkerDeployment(dockerImageTag string) error {
workerBytes, err := k8Deployments.ReadFile("manifests/worker.yaml")
if err != nil {
return err
Expand All @@ -65,17 +66,18 @@ func (c K8Client) CreateWorkerDeployment() error {
return err
}

container := &deployment.Spec.Template.Spec.Containers[0]
if !c.SaaSEnv {
// We are in self-managed environment
// We have to update the service url such that our workers can connect
// We expect that the used helm release name is == to the namespace name

// JAVA_OPTIONS
envVar := deployment.Spec.Template.Spec.Containers[0].Env[0]
envVar := container.Env[0]
envVar.Value = strings.Replace(envVar.Value, "zeebe-service:26500", fmt.Sprintf("%s-zeebe-gateway:26500", c.GetCurrentNamespace()), 1)
deployment.Spec.Template.Spec.Containers[0].Env[0] = envVar
container.Env[0] = envVar
}

container.Image = strings.Replace(container.Image, "REPLACE", dockerImageTag, 1)
_, err = c.Clientset.AppsV1().Deployments(c.GetCurrentNamespace()).Create(context.TODO(), deployment, metav1.CreateOptions{})

if err != nil {
Expand Down
45 changes: 41 additions & 4 deletions go-chaos/internal/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func Test_ShouldDeployWorkerDeployment(t *testing.T) {
k8Client := CreateFakeClient()

// when
err := k8Client.CreateWorkerDeployment()
err := k8Client.CreateWorkerDeploymentDefault()

// then
require.NoError(t, err)
Expand All @@ -115,13 +115,31 @@ func Test_ShouldDeployWorkerDeployment(t *testing.T) {
assert.Contains(t, deploymentList.Items[0].Spec.Template.Spec.Containers[0].Env[0].Value, "-Dapp.brokerUrl=testNamespace-zeebe-gateway:26500")
}

func Test_ShouldDeployWorkerDeploymentWithDifferentDockerImage(t *testing.T) {
// given
k8Client := CreateFakeClient()

// when
err := k8Client.CreateWorkerDeployment("testTag")

// then
require.NoError(t, err)
deploymentList, err := k8Client.Clientset.AppsV1().Deployments(k8Client.GetCurrentNamespace()).List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)

assert.Equal(t, 1, len(deploymentList.Items))
assert.Equal(t, "worker", deploymentList.Items[0].Name)
assert.Contains(t, deploymentList.Items[0].Spec.Template.Spec.Containers[0].Env[0].Value, "-Dapp.brokerUrl=testNamespace-zeebe-gateway:26500")
assert.Contains(t, deploymentList.Items[0].Spec.Template.Spec.Containers[0].Image, "testTag")
}

func Test_ShouldNotReturnErrorWhenWorkersAlreadyDeployed(t *testing.T) {
// given
k8Client := CreateFakeClient()
_ = k8Client.CreateWorkerDeployment()
_ = k8Client.CreateWorkerDeploymentDefault()

// when
err := k8Client.CreateWorkerDeployment()
err := k8Client.CreateWorkerDeploymentDefault()

// then
require.NoError(t, err)
Expand All @@ -138,7 +156,25 @@ func Test_ShouldDeployWorkerInSaas(t *testing.T) {
k8Client.createSaaSCRD(t)

// when
err := k8Client.CreateWorkerDeployment()
err := k8Client.CreateWorkerDeploymentDefault()

// then
require.NoError(t, err)
deploymentList, err := k8Client.Clientset.AppsV1().Deployments(k8Client.GetCurrentNamespace()).List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)

assert.Equal(t, 1, len(deploymentList.Items))
assert.Equal(t, "worker", deploymentList.Items[0].Name)
assert.Contains(t, deploymentList.Items[0].Spec.Template.Spec.Containers[0].Env[0].Value, "-Dapp.brokerUrl=zeebe-service:26500")
}

func Test_ShouldDeployWorkerInSaasWithDifferentDockerImageTag(t *testing.T) {
// given
k8Client := CreateFakeClient()
k8Client.createSaaSCRD(t)

// when
err := k8Client.CreateWorkerDeployment("testTag")

// then
require.NoError(t, err)
Expand All @@ -148,4 +184,5 @@ func Test_ShouldDeployWorkerInSaas(t *testing.T) {
assert.Equal(t, 1, len(deploymentList.Items))
assert.Equal(t, "worker", deploymentList.Items[0].Name)
assert.Contains(t, deploymentList.Items[0].Spec.Template.Spec.Containers[0].Env[0].Value, "-Dapp.brokerUrl=zeebe-service:26500")
assert.Contains(t, deploymentList.Items[0].Spec.Template.Spec.Containers[0].Image, "testTag")
}
2 changes: 1 addition & 1 deletion go-chaos/internal/manifests/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ spec:
spec:
containers:
- name: worker
image: gcr.io/zeebe-io/worker:zeebe
image: gcr.io/zeebe-io/worker:REPLACE
imagePullPolicy: Always
env:
- name: JAVA_OPTIONS
Expand Down
14 changes: 13 additions & 1 deletion go-chaos/worker/chaos_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/camunda/zeebe/clients/go/v8/pkg/entities"
Expand Down Expand Up @@ -52,6 +53,9 @@ type ZbChaosVariables struct {
ClusterPlan *string
// the target cluster for our chaos experiment
ClusterId *string
// the zeebe docker image used for the chaos experiment
// used later for workers and starter to use the right client versions
ZeebeImage string
// the chaos provider, which contain details to the chaos experiment
Provider ChaosProvider
}
Expand Down Expand Up @@ -89,8 +93,16 @@ func HandleZbChaosJob(client worker.JobClient, job entities.Job, commandRunner C
clusterAccessArgs = append(clusterAccessArgs, "--namespace", *jobVariables.ClusterId+"-zeebe")
} // else we run local against our k8 context

dockerImageSplit := strings.Split(jobVariables.ZeebeImage, ":")
if len(dockerImageSplit) <= 1 {
errorMsg := fmt.Sprintf("%s. Error on running command. [key: %d, variables: %v].", "Expected to read a dockerImage and split on ':', but read '"+jobVariables.ZeebeImage+"'", job.Key, job.Variables)
internal.LogInfo(errorMsg)
_, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries - 1).ErrorMessage(errorMsg).Send(ctx)
return
}

commandArgs := append(clusterAccessArgs, jobVariables.Provider.Arguments...)
commandArgs = append(commandArgs, "--verbose", "--jsonLogging")
commandArgs = append(commandArgs, "--verbose", "--jsonLogging", "--dockerImageTag", dockerImageSplit[1])

err = commandRunner(commandArgs, commandCtx)
if err != nil {
Expand Down
42 changes: 39 additions & 3 deletions go-chaos/worker/chaos_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,35 @@ func Test_ShouldFailToHandleReadExperimentsJobWithoutPayload(t *testing.T) {
assert.Equal(t, 0, fakeJobClient.RetriesVal)
}

func Test_ShouldFailWhenDockerImageIsNotSet(t *testing.T) {
// given
fakeJobClient := &FakeJobClient{}
variables := createZbChaosVariables()
variables.ZeebeImage = ""
jsonString, err := json.Marshal(variables)
commandRunner := func(args []string, ctx context.Context) error {
return nil // success
}

require.NoError(t, err)
job := entities.Job{
ActivatedJob: &pb.ActivatedJob{
Key: 123,
ProcessInstanceKey: 456,
Retries: 1,
Variables: string(jsonString),
},
}

// when
HandleZbChaosJob(fakeJobClient, job, commandRunner)

// then
assert.True(t, fakeJobClient.Failed)
assert.Equal(t, 123, fakeJobClient.Key)
assert.Equal(t, 0, fakeJobClient.RetriesVal)
}

func Test_ShouldHandleCommand(t *testing.T) {
// given
fakeJobClient := &FakeJobClient{}
Expand Down Expand Up @@ -96,7 +125,9 @@ func Test_ShouldHandleCommand(t *testing.T) {
"disconnect", "gateway",
"--all",
"--verbose",
"--jsonLogging"}
"--jsonLogging",
"--dockerImageTag",
"test"}
assert.Equal(t, expectedArgs, appliedArgs)
}

Expand Down Expand Up @@ -133,7 +164,9 @@ func Test_ShouldHandleCommandForSelfManagedWhenNoClusterId(t *testing.T) {
"disconnect", "gateway",
"--all",
"--verbose",
"--jsonLogging"}
"--jsonLogging",
"--dockerImageTag",
"test"}
assert.Equal(t, expectedArgs, appliedArgs)
}

Expand Down Expand Up @@ -209,7 +242,9 @@ func Test_ShouldFailJobWhenHandleFails(t *testing.T) {
"disconnect", "gateway",
"--all",
"--verbose",
"--jsonLogging"}
"--jsonLogging",
"--dockerImageTag",
"test"}
assert.Equal(t, expectedArgs, appliedArgs)
}

Expand All @@ -231,6 +266,7 @@ func createZbChaosVariables() ZbChaosVariables {
Path: "zbchaos",
Arguments: []string{"disconnect", "gateway", "--all"},
},
ZeebeImage: "gcr.io/zeebe-io/zeebe:test",
}
return variables
}

0 comments on commit 1549c1c

Please sign in to comment.