From 6e1c6d0b2cd3e3a875e5b1cf0c938d959786f6a7 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Tue, 7 Jan 2025 18:28:07 +0530 Subject: [PATCH] feat: store the informer events even if the agent is not connected Signed-off-by: Chetan Banavikalmutt --- agent/agent.go | 12 ++-- agent/connection.go | 16 +++-- agent/inbound.go | 16 +++++ agent/outbound.go | 21 ------- pkg/types/types.go | 5 +- principal/apis/eventstream/eventstream.go | 25 ++++++++ .../apis/eventstream/eventstream_test.go | 43 ++++++++++++- principal/apis/eventstream/mock/mock.go | 13 +++- principal/auth.go | 3 +- principal/callbacks.go | 60 +++++++++---------- principal/server.go | 34 +++++++++++ principal/server_test.go | 40 +++++++++++++ 12 files changed, 215 insertions(+), 73 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 84648cf5..20f05e01 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -224,6 +224,12 @@ func (a *Agent) Start(ctx context.Context) error { } }() + // Wait for the app informer to be synced + err := a.appManager.EnsureSynced(waitForSyncedDuration) + if err != nil { + return fmt.Errorf("failed to sync applications: %w", err) + } + if a.remote != nil { a.remote.SetClientMode(a.mode) // TODO: Right now, maintainConnection always returns nil. Revisit @@ -233,12 +239,6 @@ func (a *Agent) Start(ctx context.Context) error { a.emitter = event.NewEventSource(fmt.Sprintf("agent://%s", "agent-managed")) - // Wait for the app informer to be synced - err := a.appManager.EnsureSynced(waitForSyncedDuration) - if err != nil { - return fmt.Errorf("failed to sync applications: %w", err) - } - return err } diff --git a/agent/connection.go b/agent/connection.go index 05cfce8e..674aa372 100644 --- a/agent/connection.go +++ b/agent/connection.go @@ -36,12 +36,14 @@ func (a *Agent) maintainConnection() error { if err != nil { log().Warnf("Could not connect to %s: %v", a.remote.Addr(), err) } else { - err = a.queues.Create(a.remote.ClientID()) - if err != nil { - log().Warnf("Could not create agent queue pair: %v", err) - } else { - a.SetConnected(true) + if !a.queues.HasQueuePair(a.remote.ClientID()) { + err = a.queues.Create(a.remote.ClientID()) + if err != nil { + log().Warnf("Could not create agent queue pair: %v", err) + continue + } } + a.SetConnected(true) } } else { err = a.handleStreamEvents() @@ -214,10 +216,6 @@ func (a *Agent) handleStreamEvents() error { } log().WithField("component", "EventHandler").Info("Stream closed") - err = a.queues.Delete(a.remote.ClientID(), true) - if err != nil { - log().Errorf("Could not remove agent queue: %v", err) - } return nil } diff --git a/agent/inbound.go b/agent/inbound.go index b3ba7c9e..af8cfb2e 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -84,6 +84,14 @@ func (a *Agent) processIncomingApplication(ev *event.Event) error { logCtx.Errorf("Error creating application: %v", err) } case event.SpecUpdate: + if !exists { + logCtx.Debug("Received an Update event for an app that doesn't exists. Creating the incoming app") + if _, err := a.createApplication(incomingApp); err != nil { + return fmt.Errorf("could not create incoming app: %w", err) + } + return nil + } + if !sourceUIDMatch { logCtx.Debug("Source UID mismatch between the incoming app and existing app. Deleting the existing app") if err := a.deleteApplication(incomingApp); err != nil { @@ -150,6 +158,14 @@ func (a *Agent) processIncomingAppProject(ev *event.Event) error { logCtx.Errorf("Error creating appproject: %v", err) } case event.SpecUpdate: + if !exists { + logCtx.Debug("Received an Update event for an appProject that doesn't exists. Creating the incoming appProject") + if _, err := a.createAppProject(incomingAppProject); err != nil { + return fmt.Errorf("could not create incoming appProject: %w", err) + } + return nil + } + if !sourceUIDMatch { logCtx.Debug("Source UID mismatch between the incoming and existing appProject. Deleting the existing appProject") if err := a.deleteAppProject(incomingAppProject); err != nil { diff --git a/agent/outbound.go b/agent/outbound.go index ad7412a1..1ecc5da7 100644 --- a/agent/outbound.go +++ b/agent/outbound.go @@ -26,13 +26,6 @@ func (a *Agent) addAppCreationToQueue(app *v1alpha1.Application) { logCtx := log().WithField("event", "NewApp").WithField("application", app.QualifiedName()) logCtx.Debugf("New app event") - // If the agent is not connected, we ignore this event. It just makes no - // sense to fill up the send queue when we can't send. - if !a.IsConnected() { - logCtx.Trace("Agent is not connected, ignoring this event") - return - } - // Update events trigger a new event sometimes, too. If we've already seen // the app, we just ignore the request then. if a.appManager.IsManaged(app.QualifiedName()) { @@ -66,13 +59,6 @@ func (a *Agent) addAppUpdateToQueue(old *v1alpha1.Application, new *v1alpha1.App return } - // If the agent is not connected, we ignore this event. It just makes no - // sense to fill up the send queue when we can't send. - if !a.IsConnected() { - logCtx.Trace("Agent is not connected, ignoring this event") - return - } - // If the app is not managed, we ignore this event. if !a.appManager.IsManaged(new.QualifiedName()) { logCtx.Tracef("App is not managed") @@ -109,13 +95,6 @@ func (a *Agent) addAppDeletionToQueue(app *v1alpha1.Application) { logCtx := log().WithField("event", "DeleteApp").WithField("application", app.QualifiedName()) logCtx.Debugf("Delete app event") - // If the agent is not connected, we ignore this event. It just makes no - // sense to fill up the send queue when we can't send. - if !a.IsConnected() { - logCtx.Trace("Agent is not connected, ignoring this event") - return - } - if !a.appManager.IsManaged(app.QualifiedName()) { logCtx.Tracef("App is not managed") } diff --git a/pkg/types/types.go b/pkg/types/types.go index 00f0c8d9..c83eca22 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -66,4 +66,7 @@ func (k EventContextKey) String() string { return string(k) } -const ContextAgentIdentifier EventContextKey = "agent_name" +const ( + ContextAgentIdentifier EventContextKey = "agent_name" + ContextAgentMode EventContextKey = "agent_mode" +) diff --git a/principal/apis/eventstream/eventstream.go b/principal/apis/eventstream/eventstream.go index 2c02c3d8..88dcce68 100644 --- a/principal/apis/eventstream/eventstream.go +++ b/principal/apis/eventstream/eventstream.go @@ -25,6 +25,7 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/session" "github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi" + "github.com/argoproj-labs/argocd-agent/pkg/types" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" @@ -160,6 +161,17 @@ func (s *Server) newClientConnection(ctx context.Context, timeout time.Duration) return c, nil } +// agentMode gets the agent mode from the context ctx. Returns an error +// if no agent mode is found in the context +func agentMode(ctx context.Context) (string, error) { + agentMode, ok := ctx.Value(types.ContextAgentMode).(string) + if !ok { + return "", fmt.Errorf("invalid context: no agent mode") + } + + return agentMode, nil +} + // onDisconnect must be called whenever client c disconnects from the stream func (s *Server) onDisconnect(c *client) { c.lock.Lock() @@ -271,6 +283,19 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe return fmt.Errorf("panic: nil item in queue") } + mode, err := agentMode(c.ctx) + if err != nil { + return fmt.Errorf("unable to extract the agent mode from context") + } + + if types.AgentModeFromString(mode) != types.AgentModeManaged { + // Only Update events are valid for unmanaged agents + if ev.Type() != event.Update.String() { + logCtx.WithField("type", ev.Type()).Debug("Discarding event for unmanaged agent") + return nil + } + } + eventWriter := s.eventWriters.Get(c.agentName) if eventWriter == nil { return fmt.Errorf("panic: event writer not found for agent %s", c.agentName) diff --git a/principal/apis/eventstream/eventstream_test.go b/principal/apis/eventstream/eventstream_test.go index 58a968c2..46cee26b 100644 --- a/principal/apis/eventstream/eventstream_test.go +++ b/principal/apis/eventstream/eventstream_test.go @@ -20,10 +20,11 @@ import ( "testing" "time" - "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/queue" + "github.com/argoproj-labs/argocd-agent/pkg/types" "github.com/argoproj-labs/argocd-agent/principal/apis/eventstream/mock" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/sirupsen/logrus" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,7 +37,10 @@ func Test_Subscribe(t *testing.T) { qs := queue.NewSendRecvQueues() qs.Create("default") s := NewServer(qs) - st := &mock.MockEventServer{AgentName: "default"} + st := &mock.MockEventServer{ + AgentName: "default", + AgentMode: string(types.AgentModeManaged), + } st.AddRecvHook(func(s *mock.MockEventServer) error { log().WithField("component", "RecvHook").Tracef("Entry") ticker := time.NewTicker(500 * time.Millisecond) @@ -110,6 +114,41 @@ func Test_Subscribe(t *testing.T) { assert.Error(t, err) }) + t.Run("Test events being discarded for unmanaged agent", func(t *testing.T) { + qs := queue.NewSendRecvQueues() + qs.Create("default") + s := NewServer(qs) + st := &mock.MockEventServer{ + AgentName: "default", + AgentMode: string(types.AgentModeAutonomous), + } + st.AddRecvHook(func(s *mock.MockEventServer) error { + log().WithField("component", "RecvHook").Tracef("Entry") + ticker := time.NewTicker(500 * time.Millisecond) + <-ticker.C + ticker.Stop() + log().WithField("component", "RecvHook").Tracef("Exit") + return io.EOF + }) + emitter := event.NewEventSource("test") + qs.SendQ("default").Add(emitter.ApplicationEvent( + event.Create, + &v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "foo", Namespace: "test"}}, + )) + qs.SendQ("default").Add(emitter.ApplicationEvent( + event.Update, + &v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "foo", Namespace: "test"}}, + )) + qs.SendQ("default").Add(emitter.ApplicationEvent( + event.Delete, + &v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "foo", Namespace: "test"}}, + )) + err := s.Subscribe(st) + assert.Nil(t, err) + assert.Equal(t, 0, int(st.NumRecv.Load())) + assert.Equal(t, 1, int(st.NumSent.Load())) + }) + } func init() { diff --git a/principal/apis/eventstream/mock/mock.go b/principal/apis/eventstream/mock/mock.go index b2e71f43..9a5e58ca 100644 --- a/principal/apis/eventstream/mock/mock.go +++ b/principal/apis/eventstream/mock/mock.go @@ -38,6 +38,7 @@ type MockEventServer struct { grpc.ServerStream AgentName string + AgentMode string NumSent atomic.Uint32 NumRecv atomic.Uint32 Application v1alpha1.Application @@ -58,11 +59,17 @@ func (s *MockEventServer) AddRecvHook(hook RecvHook) { } func (s *MockEventServer) Context() context.Context { + ctx := context.TODO() + if s.AgentName != "" { - return context.WithValue(context.TODO(), types.ContextAgentIdentifier, s.AgentName) - } else { - return context.TODO() + ctx = context.WithValue(ctx, types.ContextAgentIdentifier, s.AgentName) + } + + if s.AgentMode != "" { + ctx = context.WithValue(ctx, types.ContextAgentMode, s.AgentMode) } + + return ctx } func (s *MockEventServer) Send(sub *eventstreamapi.Event) error { diff --git a/principal/auth.go b/principal/auth.go index 8e6db3d6..48569033 100644 --- a/principal/auth.go +++ b/principal/auth.go @@ -118,7 +118,8 @@ func (s *Server) authenticate(ctx context.Context) (context.Context, error) { // claims at this point is validated and we can propagate values to the // context. - authCtx := session.ClientIdToContext(ctx, agentInfo.ClientID) + authCtx := context.WithValue(session.ClientIdToContext(ctx, agentInfo.ClientID), + types.ContextAgentMode, agentInfo.Mode) if !s.queues.HasQueuePair(agentInfo.ClientID) { logCtx.Tracef("Creating a new queue pair for client %s", agentInfo.ClientID) if err := s.queues.Create(agentInfo.ClientID); err != nil { diff --git a/principal/callbacks.go b/principal/callbacks.go index 55c90e0c..9cd0f135 100644 --- a/principal/callbacks.go +++ b/principal/callbacks.go @@ -32,17 +32,12 @@ func (s *Server) newAppCallback(outbound *v1alpha1.Application) { "application_name": outbound.Name, }) - // Return early if no interested agent is connected if !s.queues.HasQueuePair(outbound.Namespace) { - logCtx.Debug("No agent is connected to this queue, discarding event") - return - } - - // New app events are only relevant for managed agents - mode := s.agentMode(outbound.Namespace) - if mode != types.AgentModeManaged { - logCtx.Tracef("Discarding event for unmanaged agent") - return + if err := s.queues.Create(outbound.Namespace); err != nil { + logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace") + return + } + logCtx.Trace("Created a new queue pair for the existing namespace") } q := s.queues.SendQ(outbound.Namespace) if q == nil { @@ -77,8 +72,11 @@ func (s *Server) updateAppCallback(old *v1alpha1.Application, new *v1alpha1.Appl return } if !s.queues.HasQueuePair(old.Namespace) { - logCtx.Tracef("No agent is connected to this queue, discarding event") - return + if err := s.queues.Create(old.Namespace); err != nil { + logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace") + return + } + logCtx.Trace("Created a new queue pair for the existing agent namespace") } q := s.queues.SendQ(old.Namespace) if q == nil { @@ -98,13 +96,11 @@ func (s *Server) deleteAppCallback(outbound *v1alpha1.Application) { "application_name": outbound.Name, }) if !s.queues.HasQueuePair(outbound.Namespace) { - logCtx.Tracef("No agent is connected to this queue, discarding event") - return - } - mode := s.agentMode(outbound.Namespace) - if !mode.IsManaged() { - logCtx.Tracef("Discarding event for unmanaged agent") - return + if err := s.queues.Create(outbound.Namespace); err != nil { + logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace") + return + } + logCtx.Trace("Created a new queue pair for the existing agent namespace") } q := s.queues.SendQ(outbound.Namespace) if q == nil { @@ -129,8 +125,11 @@ func (s *Server) newAppProjectCallback(outbound *v1alpha1.AppProject) { // Return early if no interested agent is connected if !s.queues.HasQueuePair(outbound.Namespace) { - logCtx.Debug("No agent is connected to this queue, discarding event") - return + if err := s.queues.Create(outbound.Namespace); err != nil { + logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace") + return + } + logCtx.Trace("Created a new queue pair for the existing namespace") } // New appproject events are only relevant for managed agents @@ -172,8 +171,11 @@ func (s *Server) updateAppProjectCallback(old *v1alpha1.AppProject, new *v1alpha return } if !s.queues.HasQueuePair(old.Namespace) { - logCtx.Tracef("No agent is connected to this queue, discarding event") - return + if err := s.queues.Create(old.Namespace); err != nil { + logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace") + return + } + logCtx.Trace("Created a new queue pair for the existing agent namespace") } q := s.queues.SendQ(old.Namespace) if q == nil { @@ -193,13 +195,11 @@ func (s *Server) deleteAppProjectCallback(outbound *v1alpha1.AppProject) { "appproject_name": outbound.Name, }) if !s.queues.HasQueuePair(outbound.Namespace) { - logCtx.Tracef("No agent is connected to this queue, discarding event") - return - } - mode := s.agentMode(outbound.Namespace) - if !mode.IsManaged() { - logCtx.Tracef("Discarding event for unmanaged agent") - return + if err := s.queues.Create(outbound.Namespace); err != nil { + logCtx.WithError(err).Error("failed to create a queue pair for an existing agent namespace") + return + } + logCtx.Trace("Created a new queue pair for the existing agent namespace") } q := s.queues.SendQ(outbound.Namespace) if q == nil { diff --git a/principal/server.go b/principal/server.go index 39d22d3f..41de0042 100644 --- a/principal/server.go +++ b/principal/server.go @@ -44,6 +44,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "google.golang.org/grpc" + kerrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -259,6 +260,8 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { } }() + go s.removeUnusedQueues(s.ctx) + s.events = event.NewEventSource(s.options.serverName) if err := s.appManager.EnsureSynced(waitForSyncedDuration); err != nil { @@ -274,6 +277,37 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { return nil } +// removeUnusedQueues removes the queues that are not associated with any namespace/agent. +func (s *Server) removeUnusedQueues(ctx context.Context) { + ticker := time.NewTimer(10 * time.Minute) + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + queues := s.queues.Names() + for _, queueName := range queues { + s.removeQueueIfUnused(ctx, queueName) + } + } + } +} + +func (s *Server) removeQueueIfUnused(ctx context.Context, agentName string) { + logCtx := log().WithField("agent", agentName) + _, err := s.kubeClient.CoreV1().Namespaces().Get(ctx, agentName, v1.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + logCtx.Trace("Agent namespace not found. Removing the queue") + if err := s.queues.Delete(agentName, true); err != nil { + logCtx.WithError(err).Error("failed to delete the unused queue") + } + return + } + logCtx.WithError(err).Error("failed to get namespace while inspecting for unused queues") + } +} + // Shutdown shuts down the server s. If no server is running, or shutting down // results in an error, an error is returned. func (s *Server) Shutdown() error { diff --git a/principal/server_test.go b/principal/server_test.go index c67c9e93..a60bac93 100644 --- a/principal/server_test.go +++ b/principal/server_test.go @@ -28,6 +28,8 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var certTempl = x509.Certificate{ @@ -95,6 +97,44 @@ func Test_NewServer(t *testing.T) { }) } +func TestRemoveQueueIfUnused(t *testing.T) { + t.Run("should remove the queue if the namespace is not found", func(t *testing.T) { + ctx := context.Background() + fakeClient := kube.NewKubernetesFakeClient() + + s, err := NewServer(context.TODO(), fakeClient, testNamespace, WithGeneratedTokenSigningKey()) + assert.Nil(t, err) + + err = s.queues.Create(testNamespace) + assert.Nil(t, err) + + s.removeQueueIfUnused(ctx, testNamespace) + assert.False(t, s.queues.HasQueuePair(testNamespace)) + }) + + t.Run("shouldn't remove the queue if the namespace is found", func(t *testing.T) { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNamespace, + }, + } + + ctx := context.Background() + fakeClient := kube.NewKubernetesFakeClient() + _, err := fakeClient.Clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) + assert.Nil(t, err) + + s, err := NewServer(context.TODO(), fakeClient, testNamespace, WithGeneratedTokenSigningKey()) + assert.Nil(t, err) + + err = s.queues.Create(testNamespace) + assert.Nil(t, err) + + s.removeQueueIfUnused(ctx, testNamespace) + assert.True(t, s.queues.HasQueuePair(testNamespace)) + }) +} + func init() { logrus.SetLevel(logrus.TraceLevel) }