diff --git a/client/events/EventClient.go b/client/events/EventClient.go
index a73cbbb0cb..921c2e9351 100644
--- a/client/events/EventClient.go
+++ b/client/events/EventClient.go
@@ -286,6 +286,7 @@ func (impl *EventRESTClientImpl) sendEvent(event Event) (bool, error) {
 		impl.logger.Errorw("error while UpdateJiraTransition request ", "err", err)
 		return false, err
 	}
+	defer resp.Body.Close()
 	impl.logger.Debugw("event completed", "event resp", resp)
 	return true, err
 }
diff --git a/internal/middleware/instrument.go b/internal/middleware/instrument.go
index 086eee1f5e..142154c14a 100644
--- a/internal/middleware/instrument.go
+++ b/internal/middleware/instrument.go
@@ -117,6 +117,16 @@ var DeploymentStatusCronDuration = promauto.NewHistogramVec(prometheus.Histogram
 	Name: "deployment_status_cron_process_time",
 }, []string{"cronName"})
 
+var TerminalSessionRequestCounter = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "initiate_terminal_session_request_counter",
+	Help: "count of requests for initiated, established and closed terminal sessions",
+}, []string{"sessionAction", "isError"})
+
+var TerminalSessionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
+	Name: "terminal_session_duration",
+	Help: "duration of each terminal session",
+}, []string{"podName", "namespace", "clusterId"})
+
 // prometheusMiddleware implements mux.MiddlewareFunc.
 func PrometheusMiddleware(next http.Handler) http.Handler {
 	// prometheus.MustRegister(requestCounter)
@@ -134,3 +144,11 @@ func PrometheusMiddleware(next http.Handler) http.Handler {
 		requestCounter.WithLabelValues(path, method, strconv.Itoa(d.Status())).Inc()
 	})
 }
+
+func IncTerminalSessionRequestCounter(sessionAction string, isError string) {
+	TerminalSessionRequestCounter.WithLabelValues(sessionAction, isError).Inc()
+}
+
+func RecordTerminalSessionDurationMetrics(podName, namespace, clusterId string, sessionDuration float64) {
+	TerminalSessionDuration.WithLabelValues(podName, namespace, clusterId).Observe(sessionDuration)
+}
diff --git a/pkg/clusterTerminalAccess/UserTerminalAccessService.go b/pkg/clusterTerminalAccess/UserTerminalAccessService.go
index 30f9968f2b..08dfe78b93 100644
--- a/pkg/clusterTerminalAccess/UserTerminalAccessService.go
+++ b/pkg/clusterTerminalAccess/UserTerminalAccessService.go
@@ -393,7 +393,7 @@ func (impl *UserTerminalAccessServiceImpl) closeAndCleanTerminalSession(accessSe
 }
 
 func (impl *UserTerminalAccessServiceImpl) closeSession(sessionId string) {
-	impl.terminalSessionHandler.Close(sessionId, 1, "Process exited")
+	impl.terminalSessionHandler.Close(sessionId, 1, terminal.ProcessExitedMsg)
 }
 
 func (impl *UserTerminalAccessServiceImpl) extractMetadataString(request *models.UserTerminalSessionRequest) string {
diff --git a/pkg/k8s/K8sCommonService.go b/pkg/k8s/K8sCommonService.go
index e75b5a26f3..ddd3acbc0f 100644
--- a/pkg/k8s/K8sCommonService.go
+++ b/pkg/k8s/K8sCommonService.go
@@ -220,7 +220,12 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque
 	defer cancel()
 	go func() {
 		ans := impl.getManifestsByBatch(ctx, requests)
-		ch <- ans
+		// Hand the result over unless ctx is done first. A one-off ctx check followed
+		// by a plain send could still block forever if the receiver leaves in between.
+		select {
+		case ch <- ans:
+		case <-ctx.Done():
+		}
 	}()
 	select {
 	case ans := <-ch:
diff --git a/pkg/terminal/constants.go b/pkg/terminal/constants.go
new file mode 100644
index 0000000000..44164e5f88
--- /dev/null
+++ b/pkg/terminal/constants.go
@@ -0,0 +1,6 @@
+package terminal
+
+const (
+	SessionTerminated = "SessionTerminated"
+	SessionInitiating = "SessionInitiating"
+)
diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go
index f53f24c767..eb59eefbcf 100644
--- a/pkg/terminal/terminalSesion.go
+++ b/pkg/terminal/terminalSesion.go
@@ -19,9 +19,11 @@ import (
 	"crypto/rand"
 	"encoding/hex"
 	"encoding/json"
+	errors2 "errors"
 	"fmt"
 	"github.com/caarlos0/env"
 	"github.com/devtron-labs/common-lib/utils/k8s"
+	"github.com/devtron-labs/devtron/internal/middleware"
 	"github.com/devtron-labs/devtron/pkg/argoApplication"
 	"github.com/devtron-labs/devtron/pkg/cluster"
 	"github.com/devtron-labs/devtron/pkg/cluster/repository"
@@ -31,6 +33,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"log"
 	"net/http"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -44,6 +47,7 @@ import (
 )
 
 const END_OF_TRANSMISSION = "\u0004"
+const ProcessExitedMsg = "Process exited"
 
 // PtyHandler is what remotecommand expects from a pty
 type PtyHandler interface {
@@ -54,11 +58,17 @@ type PtyHandler interface {
 
 // TerminalSession implements PtyHandler (using a SockJS connection)
 type TerminalSession struct {
-	id            string
-	bound         chan error
-	sockJSSession sockjs.Session
-	sizeChan      chan remotecommand.TerminalSize
-	doneChan      chan struct{}
+	id                string
+	bound             chan error
+	sockJSSession     sockjs.Session
+	sizeChan          chan remotecommand.TerminalSize
+	doneChan          chan struct{}
+	context           context.Context
+	contextCancelFunc context.CancelFunc
+	podName           string
+	namespace         string
+	clusterId         string
+	startedOn         time.Time
 }
 
 // TerminalMessage is the messaging protocol between ShellController and TerminalSession.
@@ -166,6 +176,15 @@ func (sm *SessionMap) Set(sessionId string, session TerminalSession) {
 	sm.Sessions[sessionId] = session
 }
 
+func (sm *SessionMap) SetTerminalSessionStartTime(sessionId string) {
+	sm.Lock.Lock()
+	defer sm.Lock.Unlock()
+	if session, ok := sm.Sessions[sessionId]; ok {
+		session.startedOn = time.Now()
+		sm.Sessions[sessionId] = session
+	}
+}
+
 // Close shuts down the SockJS connection and sends the status code and reason to the client
 // Can happen if the process exits or if there is an error starting up the process
 // For now the status code is unused and reason is shown to the user (unless "")
@@ -178,11 +197,24 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) {
 		if err != nil {
 			log.Println(err)
 		}
+		isErroredConnectionTermination := isConnectionClosedByError(status)
+		middleware.IncTerminalSessionRequestCounter(SessionTerminated, strconv.FormatBool(isErroredConnectionTermination))
+		// startedOn stays zero until SetTerminalSessionStartTime runs; skip the sample then.
+		if !terminalSession.startedOn.IsZero() {
+			middleware.RecordTerminalSessionDurationMetrics(terminalSession.podName, terminalSession.namespace, terminalSession.clusterId, time.Since(terminalSession.startedOn).Seconds())
+		}
+
 		close(terminalSession.doneChan)
+		terminalSession.contextCancelFunc()
 		delete(sm.Sessions, sessionId)
 	}
 }
 
+// isConnectionClosedByError reports whether status is 2, the code WaitForTerminal passes to Close on failure.
+func isConnectionClosedByError(status uint32) bool {
+	return status == 2
+}
+
 var terminalSessions = SessionMap{Sessions: make(map[string]TerminalSession)}
 
 // handleTerminalSession is Called by net/http for any new /api/sockjs connections
@@ -243,7 +275,7 @@ func CreateAttachHandler(path string) http.Handler {
 
 // startProcess is called by handleAttach
 // Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
-func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
+func startProcess(ctx context.Context, k8sClient kubernetes.Interface, cfg *rest.Config,
 	cmd []string, ptyHandler PtyHandler, sessionRequest *TerminalSessionRequest) error {
 	namespace := sessionRequest.Namespace
 	podName := sessionRequest.PodName
@@ -262,17 +294,18 @@ func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
 		TerminalSizeQueue: ptyHandler,
 		Tty:               true,
 	}
-
-	err = execWithStreamOptions(exec, streamOptions)
+	isErroredConnectionTermination := false
+	middleware.IncTerminalSessionRequestCounter(SessionInitiating, strconv.FormatBool(isErroredConnectionTermination))
+	terminalSessions.SetTerminalSessionStartTime(sessionRequest.SessionId)
+	err = execWithStreamOptions(ctx, exec, streamOptions)
 	if err != nil {
 		return err
 	}
-
 	return nil
 }
 
-func execWithStreamOptions(exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
-	return exec.Stream(streamOptions)
+func execWithStreamOptions(ctx context.Context, exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
+	return exec.StreamWithContext(ctx, streamOptions)
 }
 
 func getExecutor(k8sClient kubernetes.Interface, cfg *rest.Config, podName, namespace, containerName string, cmd []string, stdin bool, tty bool) (remotecommand.Executor, error) {
@@ -344,32 +377,41 @@ var validShells = []string{"bash", "sh", "powershell", "cmd"}
 
 // Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
 func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *TerminalSessionRequest) {
+	session := terminalSessions.Get(request.SessionId)
+	sessionCtx := session.context
+	timedCtx, cancelTimeout := context.WithTimeout(sessionCtx, 60*time.Second)
+	// Release the timeout context's resources once this function returns.
+	defer cancelTimeout()
 	select {
-	case <-terminalSessions.Get(request.SessionId).bound:
-		close(terminalSessions.Get(request.SessionId).bound)
+	case <-session.bound:
+		close(session.bound)
 
 		var err error
 
 		if isValidShell(validShells, request.Shell) {
 			cmd := []string{request.Shell}
-			err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
+			err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
 		} else {
 			// No Shell given or it was not valid: try some shells until one succeeds or all fail
 			// FIXME: if the first Shell fails then the first keyboard event is lost
 			for _, testShell := range validShells {
 				cmd := []string{testShell}
-				if err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil {
+				if err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil || errors2.Is(err, context.Canceled) {
 					break
 				}
 			}
 		}
 
-		if err != nil {
+		if err != nil && !errors2.Is(err, context.Canceled) {
 			terminalSessions.Close(request.SessionId, 2, err.Error())
 			return
 		}
 
-		terminalSessions.Close(request.SessionId, 1, "Process exited")
+		terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
+	case <-timedCtx.Done():
+		// handle case when connection has not been initiated from FE side within particular time
+		close(session.bound)
+		terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
 	}
 }
 
@@ -432,10 +474,17 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR
 		return statusCode, nil, err
 	}
 	req.SessionId = sessionID
+	sessionCtx, cancelFunc := context.WithCancel(context.Background())
 	terminalSessions.Set(sessionID, TerminalSession{
-		id:       sessionID,
-		bound:    make(chan error),
-		sizeChan: make(chan remotecommand.TerminalSize),
+		id:                sessionID,
+		bound:             make(chan error),
+		sizeChan:          make(chan remotecommand.TerminalSize),
+		doneChan:          make(chan struct{}),
+		context:           sessionCtx,
+		contextCancelFunc: cancelFunc,
+		podName:           req.PodName,
+		namespace:         req.Namespace,
+		clusterId:         strconv.Itoa(req.ClusterId),
 	})
 
 	config, client, err := impl.getClientConfig(req)
@@ -559,7 +608,7 @@ func (impl *TerminalSessionHandlerImpl) RunCmdInRemotePod(req *TerminalSessionRe
 	buf := &bytes.Buffer{}
 	errBuf := &bytes.Buffer{}
 	impl.logger.Debug("reached execWithStreamOptions method call")
-	err = execWithStreamOptions(exec, remotecommand.StreamOptions{
+	err = execWithStreamOptions(context.Background(), exec, remotecommand.StreamOptions{
 		Stdout: buf,
 		Stderr: errBuf,
 	})