Skip to content

Commit

Permalink
Retry mechanism for streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
filiptronicek committed Nov 2, 2023
1 parent 9e5af99 commit a09ad5e
Showing 1 changed file with 39 additions and 27 deletions.
66 changes: 39 additions & 27 deletions components/local-app/pkg/common/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/bufbuild/connect-go"
v1 "github.com/gitpod-io/gitpod/components/public-api/go/experimental/v1"
"github.com/sirupsen/logrus"
)

type WorkspaceDisplayData struct {
Expand Down Expand Up @@ -49,7 +50,6 @@ func GetWorkspaceRepo(ws *v1.Workspace) string {
return repository
}

// workspace.Msg.Result.Status.Instance.Status.Url
func GetWorkspaceUrl(ws *v1.Workspace) string {
if ws == nil || ws.Status == nil ||
ws.Status.Instance == nil || ws.Status.Instance.Status == nil {
Expand Down Expand Up @@ -224,41 +224,53 @@ func ObserveWsUntilStarted(ctx context.Context, workspaceId string) error {
return fmt.Errorf("Workspace already running")
}

stream, err := gitpod.Workspaces.StreamWorkspaceStatus(ctx, connect.NewRequest(&v1.StreamWorkspaceStatusRequest{WorkspaceId: workspaceId}))

if err != nil {
return err
}

fmt.Println("Waiting for workspace to start...")

fmt.Println("Workspace " + HumanizeWorkspacePhase(wsInfo.Msg.GetResult()))

previousStatus := ""

for stream.Receive() {
msg := stream.Msg()
if msg == nil {
fmt.Println("No message received")
maxRetries := 4
retries := 0
for {
stream, err := gitpod.Workspaces.StreamWorkspaceStatus(ctx, connect.NewRequest(&v1.StreamWorkspaceStatusRequest{WorkspaceId: workspaceId}))
if err != nil {
if retries >= maxRetries {
return fmt.Errorf("Failed to stream workspace status after %d retries: %w", maxRetries, err)
}
retries++
logrus.WithFields(logrus.Fields{
"retry": retries,
"maxRetries": maxRetries,
}).Warn("Streaming failed, retrying")
continue
}

if msg.GetResult().Instance.Status.Phase == v1.WorkspaceInstanceStatus_PHASE_RUNNING {
fmt.Println("Workspace running")
return nil
}
previousStatus := ""
for stream.Receive() {
msg := stream.Msg()
if msg == nil {
fmt.Println("No message received")
continue
}

currentStatus := HumanizeWorkspacePhase(wsInfo.Msg.GetResult())
if msg.GetResult().Instance.Status.Phase == v1.WorkspaceInstanceStatus_PHASE_RUNNING {
fmt.Println("Workspace running")
return nil
}

if currentStatus != previousStatus {
fmt.Println("Workspace " + currentStatus)
previousStatus = currentStatus
currentStatus := HumanizeWorkspacePhase(wsInfo.Msg.GetResult())
if currentStatus != previousStatus {
fmt.Println("Workspace " + currentStatus)
previousStatus = currentStatus
}
}
}

if err := stream.Err(); err != nil {
return err
if err := stream.Err(); err != nil {
if retries >= maxRetries {
return fmt.Errorf("Workspace stream ended unexpectedly after %d retries: %w", maxRetries, err)
}
retries++
fmt.Printf("Stream ended unexpectedly, retrying %d/%d\n", retries, maxRetries)
} else {
return fmt.Errorf("Workspace stream ended unexpectedly")
}
}

return fmt.Errorf("Workspace stream ended unexpectedly")
}

0 comments on commit a09ad5e

Please sign in to comment.