Skip to content

Commit

Permalink
feat(zbchaos): port-forward to random local port (#465)
Browse files Browse the repository at this point in the history
All commands now provide `0` as the local port which results in a free
port picked at random. This makes it easier to run two zbchaos commands
at the same time without getting port conflicts.
  • Loading branch information
lenaschoenburg authored Dec 18, 2023
2 parents 1583862 + 95b043d commit 5596014
Show file tree
Hide file tree
Showing 12 changed files with 28 additions and 37 deletions.
3 changes: 1 addition & 2 deletions go-chaos/backend/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
)

func ConnectToZeebeCluster(k8Client internal.K8Client) (zbc.Client, func(), error) {
port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)

zbClient, err := internal.CreateZeebeClient(port)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions go-chaos/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ func takeBackup(flags *Flags) error {
return err
}

port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()
url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, flags.backupId)
resp, err := http.Post(url, "", nil)
Expand All @@ -217,8 +216,7 @@ func waitForBackup(flags *Flags) error {
panic(err)
}

port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()

for {
Expand Down
6 changes: 2 additions & 4 deletions go-chaos/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ func printCurrentTopology(flags *Flags) error {
return err
}

port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()

topology, err := QueryTopology(port)
Expand All @@ -90,8 +89,7 @@ func waitForChange(flags *Flags) error {
return err
}

port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()

interval := time.Second * 5
Expand Down
6 changes: 2 additions & 4 deletions go-chaos/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ Defaults to the later, which is useful for experimenting with deployment distrib
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand All @@ -62,8 +61,7 @@ Useful for experimenting with deployment distribution.`,
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down
6 changes: 2 additions & 4 deletions go-chaos/cmd/exporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func AddExportingCmds(rootCmd *cobra.Command, flags *Flags) {
}

func pauseExporting(k8Client internal.K8Client) error {
port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()
url := fmt.Sprintf("http://localhost:%d/actuator/exporting/pause", port)
resp, err := http.Post(url, "", nil)
Expand All @@ -69,8 +68,7 @@ func pauseExporting(k8Client internal.K8Client) error {
}

func resumeExporting(k8Client internal.K8Client) error {
port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()
url := fmt.Sprintf("http://localhost:%d/actuator/exporting/resume", port)
resp, err := http.Post(url, "", nil)
Expand Down
3 changes: 1 addition & 2 deletions go-chaos/cmd/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func AddPublishCmd(rootCmd *cobra.Command, flags *Flags) {
k8Client, err := createK8ClientWithFlags(flags)
panicOnError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down
3 changes: 1 addition & 2 deletions go-chaos/cmd/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func AddStressCmd(rootCmd *cobra.Command, flags *Flags) {
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down
3 changes: 1 addition & 2 deletions go-chaos/cmd/terminate.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ func AddTerminateCommand(rootCmd *cobra.Command, flags *Flags) {
// GracePeriod (in second) can be nil, which would mean using K8 default.
// Returns the broker which has been restarted
func restartBroker(k8Client internal.K8Client, nodeId int, partitionId int, role string, gracePeriod *int64) string {
port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down
3 changes: 1 addition & 2 deletions go-chaos/cmd/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func AddTopologyCmd(rootCmd *cobra.Command, flags *Flags) {
panic(err)
}

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

client, err := internal.CreateZeebeClient(port)
Expand Down
9 changes: 3 additions & 6 deletions go-chaos/cmd/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ Process instances are created until the required partition is reached.`,
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down Expand Up @@ -88,8 +87,7 @@ Process instances are created until the required partition is reached.`,
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down Expand Up @@ -118,8 +116,7 @@ Process instances are created until the required partition is reached.`,
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down
14 changes: 11 additions & 3 deletions go-chaos/internal/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,12 @@ func (c K8Client) RestartPod(podName string) error {

// MustGatewayPortForward creates a port forwarding to a zeebe gateway with the given port.
// Panics when port forwarding fails.
// localPort can be 0 to let the OS choose a random, free port.
// Returns the exposed local port and a function to close the port forwarding.
//
// https://github.com/gruntwork-io/terratest/blob/master/modules/k8s/tunnel.go#L187-L196
// https://github.com/kubernetes/client-go/issues/51#issuecomment-436200428
func (c K8Client) MustGatewayPortForward(localPort int, remotePort int) func() {
func (c K8Client) MustGatewayPortForward(localPort int, remotePort int) (int, func()) {
names, err := c.GetGatewayPodNames()
if err != nil {
panic(err)
Expand Down Expand Up @@ -246,8 +249,13 @@ func (c K8Client) MustGatewayPortForward(localPort int, remotePort int) func() {
LogVerbose("\nError starting port forwarding tunnel: %s", err)
panic(err)
case <-portForwarder.Ready:
LogVerbose("Successfully created port forwarding tunnel")
return func() {
ports, err := portForwarder.GetPorts()
if err != nil {
panic(err)
}
exposedLocalPort := ports[0].Local
LogVerbose("Successfully created port forwarding tunnel from %d (local) to %d (remote)", exposedLocalPort, remotePort)
return int(exposedLocalPort), func() {
portForwarder.Close()
}
}
Expand Down
3 changes: 1 addition & 2 deletions go-chaos/worker/chaos_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ func getTargetClusterVersion(namespace string) string {
return ""
}

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
port, closeFn := k8Client.MustGatewayPortForward(0, 26500)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
Expand Down

0 comments on commit 5596014

Please sign in to comment.