From 09fcd7df05b56ff0fdda2f3796336fa6ca0e1051 Mon Sep 17 00:00:00 2001 From: jannfis Date: Wed, 18 Dec 2024 14:19:55 +0000 Subject: [PATCH] feat: Live resource view Signed-off-by: jannfis --- agent/agent.go | 21 +- agent/agent_test.go | 10 +- agent/connection.go | 11 +- agent/inbound.go | 98 ++++++- cmd/agent/main.go | 34 +-- cmd/cmdutil/fatal.go | 34 +++ cmd/cmdutil/kube.go | 43 +++ cmd/cmdutil/log.go | 88 ++++++ cmd/cmdutil/term.go | 37 +++ cmd/cmdutil/validators.go | 24 ++ cmd/cmdutil/version.go | 44 +++ cmd/principal/main.go | 142 ++++++++-- hack/dev-env/gen-tls.sh | 68 +++++ hack/dev-env/start-principal.sh | 12 +- internal/argocd/cluster/conversion.go | 79 ++++++ internal/argocd/cluster/informer.go | 134 +++++++++ internal/argocd/cluster/manager.go | 131 +++++++++ internal/argocd/cluster/manager_test.go | 139 ++++++++++ internal/argocd/cluster/mapping.go | 94 +++++++ internal/argocd/cluster/mapping_test.go | 38 +++ internal/config/constants.go | 17 ++ internal/event/event.go | 94 ++++++- internal/kube/client.go | 24 +- internal/resourceproxy/options.go | 152 +++++++++++ internal/resourceproxy/params.go | 33 +++ internal/resourceproxy/resourceproxy.go | 273 +++++++++++++++++++ internal/resourceproxy/resourceproxy_test.go | 84 ++++++ internal/resourceproxy/tracking.go | 84 ++++++ internal/tlsutil/kubernetes.go | 160 +++++++++++ internal/tlsutil/kubernetes_test.go | 257 +++++++++++++++++ internal/tlsutil/testdata/001_test_cert.pem | 21 ++ internal/tlsutil/testdata/001_test_key.pem | 28 ++ internal/tlsutil/tlsutil.go | 46 +++- principal/apis/eventstream/eventstream.go | 20 +- principal/event.go | 63 ++++- principal/event_test.go | 2 +- principal/listen.go | 82 +++--- principal/options.go | 14 + principal/resource.go | 167 ++++++++++++ principal/server.go | 130 ++++++++- test/e2e/e2e_test.go | 14 +- test/testutil/context.go | 1 + test/testutil/file.go | 19 ++ test/testutil/waitfor.go | 30 ++ 44 files changed, 2955 insertions(+), 141 deletions(-) create mode 100644 
cmd/cmdutil/fatal.go create mode 100644 cmd/cmdutil/kube.go create mode 100644 cmd/cmdutil/log.go create mode 100644 cmd/cmdutil/term.go create mode 100644 cmd/cmdutil/validators.go create mode 100644 cmd/cmdutil/version.go create mode 100755 hack/dev-env/gen-tls.sh create mode 100644 internal/argocd/cluster/conversion.go create mode 100644 internal/argocd/cluster/informer.go create mode 100644 internal/argocd/cluster/manager.go create mode 100644 internal/argocd/cluster/manager_test.go create mode 100644 internal/argocd/cluster/mapping.go create mode 100644 internal/argocd/cluster/mapping_test.go create mode 100644 internal/config/constants.go create mode 100644 internal/resourceproxy/options.go create mode 100644 internal/resourceproxy/params.go create mode 100644 internal/resourceproxy/resourceproxy.go create mode 100644 internal/resourceproxy/resourceproxy_test.go create mode 100644 internal/resourceproxy/tracking.go create mode 100644 internal/tlsutil/kubernetes.go create mode 100644 internal/tlsutil/kubernetes_test.go create mode 100644 internal/tlsutil/testdata/001_test_cert.pem create mode 100644 internal/tlsutil/testdata/001_test_key.pem create mode 100644 principal/resource.go create mode 100644 test/testutil/context.go create mode 100644 test/testutil/file.go create mode 100644 test/testutil/waitfor.go diff --git a/agent/agent.go b/agent/agent.go index 84648cf5..c6f49fd1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -25,6 +25,7 @@ import ( kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject" "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/informer" + "github.com/argoproj-labs/argocd-agent/internal/kube" "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/manager/application" "github.com/argoproj-labs/argocd-agent/internal/manager/appproject" @@ -38,7 +39,6 @@ import ( "k8s.io/apimachinery/pkg/watch" 
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" ) const waitForSyncedDuration = 10 * time.Second @@ -66,9 +66,10 @@ type Agent struct { emitter *event.EventSource // At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback. watchLock sync.RWMutex - version *version.Version eventWriter *event.EventWriter + version *version.Version + kubeClient *kube.KubernetesClient } const defaultQueueName = "default" @@ -86,7 +87,7 @@ type AgentOption func(*Agent) error // NewAgent creates a new agent instance, using the given client interfaces and // options. -func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace string, opts ...AgentOption) (*Agent, error) { +func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace string, opts ...AgentOption) (*Agent, error) { a := &Agent{ version: version.New("argocd-agent", "agent"), } @@ -105,6 +106,8 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s return nil, fmt.Errorf("remote not defined") } + a.kubeClient = client + // Initial state of the agent is disconnected a.connected.Store(false) @@ -127,10 +130,10 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s // appListFunc and watchFunc are anonymous functions for the informer appListFunc := func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) { - return appclient.ArgoprojV1alpha1().Applications(a.namespace).List(ctx, opts) + return client.ApplicationsClientset.ArgoprojV1alpha1().Applications(a.namespace).List(ctx, opts) } appWatchFunc := func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { 
- return appclient.ArgoprojV1alpha1().Applications(a.namespace).Watch(ctx, opts) + return client.ApplicationsClientset.ArgoprojV1alpha1().Applications(a.namespace).Watch(ctx, opts) } appInformerOptions := []informer.InformerOption[*v1alpha1.Application]{ @@ -160,10 +163,10 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s } projListFunc := func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) { - return appclient.ArgoprojV1alpha1().AppProjects(a.namespace).List(ctx, opts) + return client.ApplicationsClientset.ArgoprojV1alpha1().AppProjects(a.namespace).List(ctx, opts) } projWatchFunc := func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { - return appclient.ArgoprojV1alpha1().AppProjects(a.namespace).Watch(ctx, opts) + return client.ApplicationsClientset.ArgoprojV1alpha1().AppProjects(a.namespace).Watch(ctx, opts) } projInformerOptions := []informer.InformerOption[*v1alpha1.AppProject]{ @@ -178,7 +181,7 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s // The agent only supports Kubernetes as application backend a.appManager, err = application.NewApplicationManager( - kubeapp.NewKubernetesBackend(appclient, a.namespace, appInformer, true), + kubeapp.NewKubernetesBackend(client.ApplicationsClientset, a.namespace, appInformer, true), a.namespace, application.WithAllowUpsert(allowUpsert), application.WithRole(manager.ManagerRoleAgent), @@ -189,7 +192,7 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s } a.projectManager, err = appproject.NewAppProjectManager( - kubeappproject.NewKubernetesBackend(appclient, a.namespace, projInformer, true), + kubeappproject.NewKubernetesBackend(client.ApplicationsClientset, a.namespace, projInformer, true), a.namespace, appProjectManagerOption...) 
if err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index 75a2cb4a..ffef778a 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -18,26 +18,26 @@ import ( "context" "testing" - fakeappclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake" "github.com/sirupsen/logrus" "github.com/argoproj-labs/argocd-agent/pkg/client" + "github.com/argoproj-labs/argocd-agent/test/fake/kube" "github.com/stretchr/testify/require" ) func newAgent(t *testing.T) *Agent { t.Helper() - appc := fakeappclient.NewSimpleClientset() + kubec := kube.NewKubernetesFakeClient() remote, err := client.NewRemote("127.0.0.1", 8080) require.NoError(t, err) - agent, err := NewAgent(context.TODO(), appc, "argocd", WithRemote(remote)) + agent, err := NewAgent(context.TODO(), kubec, "argocd", WithRemote(remote)) require.NoError(t, err) return agent } func Test_NewAgent(t *testing.T) { - appc := fakeappclient.NewSimpleClientset() - agent, err := NewAgent(context.TODO(), appc, "agent", WithRemote(&client.Remote{})) + kubec := kube.NewKubernetesFakeClient() + agent, err := NewAgent(context.TODO(), kubec, "agent", WithRemote(&client.Remote{})) require.NotNil(t, agent) require.NoError(t, err) } diff --git a/agent/connection.go b/agent/connection.go index 05cfce8e..08de802f 100644 --- a/agent/connection.go +++ b/agent/connection.go @@ -74,13 +74,18 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error logCtx.Tracef("Queue shutdown in progress") return nil } - logCtx.Tracef("Grabbed an item") + logCtx.Trace("Grabbed an item") if ev == nil { // TODO: Is this really the right thing to do? 
return nil } - - logCtx.WithField("resource_id", event.ResourceID(ev)).WithField("event_id", event.EventID(ev)).Trace("Adding an event to the event writer") + logCtx = logCtx.WithFields(logrus.Fields{ + "event_target": ev.DataSchema(), + "event_type": ev.Type(), + "resource_id": event.ResourceID(ev), + "event_id": event.EventID(ev), + }) + logCtx.Trace("Adding an event to the event writer") a.eventWriter.Add(ev) return nil diff --git a/agent/inbound.go b/agent/inbound.go index 15c6852d..93151f8b 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -15,7 +15,10 @@ package agent import ( + "context" + "encoding/json" "fmt" + "time" "github.com/argoproj-labs/argocd-agent/internal/backend" "github.com/argoproj-labs/argocd-agent/internal/event" @@ -23,6 +26,10 @@ import ( "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" ) /* @@ -32,6 +39,8 @@ Inbound events are those coming through our gRPC interface, e.g. those that were received from a server. */ +const defaultResourceRequestTimeout = 5 * time.Second + func (a *Agent) processIncomingEvent(ev *event.Event) error { var err error switch ev.Target() { @@ -39,6 +48,8 @@ func (a *Agent) processIncomingEvent(ev *event.Event) error { err = a.processIncomingApplication(ev) case event.TargetAppProject: err = a.processIncomingAppProject(ev) + case event.TargetResource: + err = a.processIncomingResourceRequest(ev) default: err = fmt.Errorf("unknown event target: %s", ev.Target()) } @@ -46,6 +57,81 @@ func (a *Agent) processIncomingEvent(ev *event.Event) error { return err } +// processIncomingResourceRequest processes an incoming event that requests +// to retrieve information from the Kubernetes API. +// +// There can be multiple forms of requests. 
Currently supported are: +// +// - Request for a particular resource, both namespace and cluster scoped +// - Request for a list of resources of a particular kind (e.g. configmaps, +// pods, etc), both namespace and cluster scoped +// - Request for a list of available APIs + +func (a *Agent) processIncomingResourceRequest(ev *event.Event) error { + rreq, err := ev.ResourceRequest() + if err != nil { + return err + } + logCtx := log().WithFields(logrus.Fields{ + "method": "processIncomingEvents", + "uuid": rreq.UUID, + }) + logCtx.Tracef("Start processing %v", rreq) + + // Create a dynamic kubernetes client and retrieve the resource from the + // cluster. + dynClient, err := dynamic.NewForConfig(a.kubeClient.RestConfig) + if err != nil { + return fmt.Errorf("could not create a dynamic client: %w", err) + } + + // Some of GVR may be empty, that's ok + gvk := schema.GroupVersionResource{Group: rreq.Group, Version: rreq.Version, Resource: rreq.Resource} + rif := dynClient.Resource(gvk) + + ctx, cancel := context.WithTimeout(a.context, defaultResourceRequestTimeout) + defer cancel() + + var jsonres []byte + var unres *unstructured.Unstructured + var unlist *unstructured.UnstructuredList + status := "" + if rreq.Name != "" { + if rreq.Namespace != "" { + unres, err = rif.Namespace(rreq.Namespace).Get(ctx, rreq.Name, v1.GetOptions{}) + } else { + unres, err = rif.Get(ctx, rreq.Name, v1.GetOptions{}) + } + } else { + if rreq.Namespace != "" { + unlist, err = rif.Namespace(rreq.Namespace).List(ctx, v1.ListOptions{}) + } else { + unlist, err = rif.List(ctx, v1.ListOptions{}) + } + } + if err != nil { + logCtx.Errorf("could not request resource: %v", err) + status = "failure" + } else { + // Marshal the unstructured resource to JSON for submission + if unres != nil { + jsonres, err = json.Marshal(unres) + } else if unlist != nil { + jsonres, err = json.Marshal(unlist) + } + if err != nil { + return fmt.Errorf("could not marshal resource to json: %w", err) + } + 
logCtx.Tracef("marshaled resource") + } + + q := a.queues.SendQ(a.remote.ClientID()) + q.Add(a.emitter.NewResourceResponseEvent(rreq.UUID, status, string(jsonres))) + logCtx.Tracef("Emitted resource response") + + return nil +} + func (a *Agent) processIncomingApplication(ev *event.Event) error { logCtx := log().WithFields(logrus.Fields{ "method": "processIncomingEvents", @@ -172,7 +258,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App // In modes other than "managed", we don't process new application events // that are incoming. if a.mode != types.AgentModeManaged { - logCtx.Trace("Discarding this event, because agent is not in managed mode") + logCtx.Info("Discarding this event, because agent is not in managed mode") return nil, event.NewEventDiscardedErr("cannot create application: agent is not in managed mode") } @@ -181,7 +267,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App // // TODO(jannfis): Handle this situation properly instead of throwing an error. 
if a.appManager.IsManaged(incoming.QualifiedName()) { - logCtx.Trace("Discarding this event, because application is already managed on this agent") + logCtx.Info("Discarding this event, because application is already managed on this agent") return nil, event.NewEventDiscardedErr("application %s is already managed", incoming.QualifiedName()) } @@ -193,6 +279,10 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration") } + // Set target cluster to a sensible value + incoming.Spec.Destination.Server = "" + incoming.Spec.Destination.Name = "in-cluster" + created, err := a.appManager.Create(a.context, incoming) if apierrors.IsAlreadyExists(err) { logCtx.Debug("application already exists") @@ -217,6 +307,10 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App logCtx.Tracef("New resource version: %s", incoming.ResourceVersion) } + // Set target cluster to a sensible value + incoming.Spec.Destination.Server = "" + incoming.Spec.Destination.Name = "in-cluster" + logCtx.Infof("Updating application") var err error diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 5ffb16b6..e1a7f873 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -22,7 +22,7 @@ import ( "strings" "github.com/argoproj-labs/argocd-agent/agent" - "github.com/argoproj-labs/argocd-agent/cmd/cmd" + "github.com/argoproj-labs/argocd-agent/cmd/cmdutil" "github.com/argoproj-labs/argocd-agent/internal/auth" "github.com/argoproj-labs/argocd-agent/internal/auth/userpass" "github.com/argoproj-labs/argocd-agent/internal/env" @@ -56,7 +56,7 @@ func NewAgentRunCommand() *cobra.Command { Short: "Run the argocd-agent agent component", Run: func(c *cobra.Command, args []string) { if showVersion { - cmd.PrintVersion(version.New("argocd-agent", "agent"), versionFormat) + cmdutil.PrintVersion(version.New("argocd-agent", "agent"), versionFormat) os.Exit(0) } ctx, cancelFn := 
context.WithCancel(context.Background()) @@ -69,21 +69,21 @@ func NewAgentRunCommand() *cobra.Command { logLevel = "info" } if logLevel != "" { - lvl, err := cmd.StringToLoglevel(logLevel) + lvl, err := cmdutil.StringToLoglevel(logLevel) if err != nil { - cmd.Fatal("invalid log level: %s. Available levels are: %s", logLevel, cmd.AvailableLogLevels()) + cmdutil.Fatal("invalid log level: %s. Available levels are: %s", logLevel, cmdutil.AvailableLogLevels()) } logrus.SetLevel(lvl) } - if formatter, err := cmd.LogFormatter(logFormat); err != nil { - cmd.Fatal("%s", err.Error()) + if formatter, err := cmdutil.LogFormatter(logFormat); err != nil { + cmdutil.Fatal("%s", err.Error()) } else { logrus.SetFormatter(formatter) } if creds != "" { authMethod, authCreds, err := parseCreds(creds) if err != nil { - cmd.Fatal("Error setting up creds: %v", err) + cmdutil.Fatal("Error setting up creds: %v", err) } remoteOpts = append(remoteOpts, client.WithAuth(authMethod, authCreds)) } @@ -102,29 +102,29 @@ func NewAgentRunCommand() *cobra.Command { if serverAddress != "" && serverPort > 0 && serverPort < 65536 { remote, err = client.NewRemote(serverAddress, serverPort, remoteOpts...) 
if err != nil { - cmd.Fatal("Error creating remote: %v", err) + cmdutil.Fatal("Error creating remote: %v", err) } } if remote == nil { - cmd.Fatal("No remote specified") + cmdutil.Fatal("No remote specified") } if namespace == "" { - cmd.Fatal("namespace value is empty and must be specified") + cmdutil.Fatal("namespace value is empty and must be specified") } - kubeConfig, err := cmd.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext) + kubeConfig, err := cmdutil.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext) if err != nil { - cmd.Fatal("Could not load Kubernetes config: %v", err) + cmdutil.Fatal("Could not load Kubernetes config: %v", err) } agentOpts = append(agentOpts, agent.WithRemote(remote)) agentOpts = append(agentOpts, agent.WithMode(agentMode)) - ag, err := agent.NewAgent(ctx, kubeConfig.ApplicationsClientset, namespace, agentOpts...) + ag, err := agent.NewAgent(ctx, kubeConfig, namespace, agentOpts...) if err != nil { - cmd.Fatal("Could not create a new agent instance: %v", err) + cmdutil.Fatal("Could not create a new agent instance: %v", err) } if err := ag.Start(ctx); err != nil { - cmd.Fatal("Could not start agent: %v", err) + cmdutil.Fatal("Could not start agent: %v", err) } <-ctx.Done() }, @@ -215,10 +215,10 @@ func loadCreds(path string) (auth.Credentials, error) { } func main() { - cmd.InitLogging() + cmdutil.InitLogging() c := NewAgentRunCommand() err := c.Execute() if err != nil { - cmd.Fatal("ERROR: %v", err) + cmdutil.Fatal("ERROR: %v", err) } } diff --git a/cmd/cmdutil/fatal.go b/cmd/cmdutil/fatal.go new file mode 100644 index 00000000..ddf64623 --- /dev/null +++ b/cmd/cmdutil/fatal.go @@ -0,0 +1,34 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmdutil + +import ( + "fmt" + "os" +) + +func Error(msg string, args ...interface{}) { +} + +func Fatal(msg string, args ...interface{}) { + FatalWithExitCode(1, msg, args...) +} + +func FatalWithExitCode(code int, msg string, args ...interface{}) { + fmt.Fprintf(os.Stderr, "[FATAL]: ") + fmt.Fprintf(os.Stderr, msg, args...) + fmt.Fprintf(os.Stderr, "\n") + os.Exit(code) +} diff --git a/cmd/cmdutil/kube.go b/cmd/cmdutil/kube.go new file mode 100644 index 00000000..09320ec6 --- /dev/null +++ b/cmd/cmdutil/kube.go @@ -0,0 +1,43 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cmdutil + +import ( + "context" + "fmt" + "path/filepath" + + "github.com/argoproj-labs/argocd-agent/internal/kube" +) + +func GetKubeConfig(ctx context.Context, namespace string, kubeConfig string, kubecontext string) (*kube.KubernetesClient, error) { + var fullKubeConfigPath string + var kubeClient *kube.KubernetesClient + var err error + + if kubeConfig != "" { + fullKubeConfigPath, err = filepath.Abs(kubeConfig) + if err != nil { + return nil, fmt.Errorf("cannot expand path %s: %v", kubeConfig, err) + } + } + + kubeClient, err = kube.NewKubernetesClientFromConfig(ctx, namespace, fullKubeConfigPath, kubecontext) + if err != nil { + return nil, err + } + + return kubeClient, nil +} diff --git a/cmd/cmdutil/log.go b/cmd/cmdutil/log.go new file mode 100644 index 00000000..ac9a78dc --- /dev/null +++ b/cmd/cmdutil/log.go @@ -0,0 +1,88 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cmdutil + +import ( + "fmt" + "io" + "os" + "strings" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/writer" +) + +func StringToLoglevel(l string) (logrus.Level, error) { + switch strings.ToLower(l) { + case strings.ToLower(logrus.FatalLevel.String()): + return logrus.FatalLevel, nil + case strings.ToLower(logrus.ErrorLevel.String()): + return logrus.ErrorLevel, nil + case strings.ToLower(logrus.WarnLevel.String()): + return logrus.WarnLevel, nil + case strings.ToLower(logrus.InfoLevel.String()): + return logrus.InfoLevel, nil + case strings.ToLower(logrus.DebugLevel.String()): + return logrus.DebugLevel, nil + case strings.ToLower(logrus.TraceLevel.String()): + return logrus.TraceLevel, nil + default: + return 0, fmt.Errorf("unknown log level: %s", l) + } +} + +func AvailableLogLevels() string { + levels := make([]string, len(logrus.AllLevels)) + for i, l := range logrus.AllLevels { + levels[i] = l.String() + } + return strings.Join(levels, ", ") +} + +// InitLogging will initialize logrus with the setting and hooks we want it to +// use by default. 
+func InitLogging() { + logrus.SetOutput(io.Discard) // Send all logs to nowhere by default + logrus.SetLevel(logrus.DebugLevel) + logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.AddHook(&writer.Hook{ // Send logs with level higher than warning to stderr + Writer: os.Stderr, + LogLevels: []logrus.Level{ + logrus.PanicLevel, + logrus.FatalLevel, + logrus.ErrorLevel, + logrus.WarnLevel, + }, + }) + logrus.AddHook(&writer.Hook{ // Send info and debug logs to stdout + Writer: os.Stdout, + LogLevels: []logrus.Level{ + logrus.InfoLevel, + logrus.DebugLevel, + logrus.TraceLevel, + }, + }) +} + +func LogFormatter(format string) (logrus.Formatter, error) { + switch strings.ToLower(format) { + case "text": + return &logrus.TextFormatter{}, nil + case "json": + return &logrus.JSONFormatter{}, nil + default: + return nil, fmt.Errorf("invalid format '%s', must be one of text, json", format) + } +} diff --git a/cmd/cmdutil/term.go b/cmd/cmdutil/term.go new file mode 100644 index 00000000..96c221ac --- /dev/null +++ b/cmd/cmdutil/term.go @@ -0,0 +1,37 @@ +package cmdutil + +import ( + "bufio" + "fmt" + "os" + "strings" +) + +func ReadFromTerm(prompt string, maxtries int, valid func(string) bool) (string, error) { + tries := 0 + for { + tries += 1 + fmt.Printf("%s: ", prompt) + reader := bufio.NewReader(os.Stdin) + val, err := reader.ReadString('\n') + if err != nil { + return "", err + } + val = strings.TrimSuffix(val, "\n") + if valid != nil { + if valid(val) { + return val, nil + } else { + if maxtries == -1 { + continue + } else { + if tries > maxtries { + return "", fmt.Errorf("%s: invalid value", val) + } + } + } + } else { + return val, nil + } + } +} diff --git a/cmd/cmdutil/validators.go b/cmd/cmdutil/validators.go new file mode 100644 index 00000000..69b96357 --- /dev/null +++ b/cmd/cmdutil/validators.go @@ -0,0 +1,24 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file 
except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmdutil + +import "fmt" + +func ValidPort(num int) error { + if num < 0 || num > 65536 { + return fmt.Errorf("%d: not a valid port number", num) + } + return nil +} diff --git a/cmd/cmdutil/version.go b/cmd/cmdutil/version.go new file mode 100644 index 00000000..ad6730c3 --- /dev/null +++ b/cmd/cmdutil/version.go @@ -0,0 +1,44 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cmdutil + +import ( + "fmt" + + "github.com/argoproj-labs/argocd-agent/internal/version" +) + +const ( + VersionFormatText = "text" + VersionFormatJSONCompact = "json" + VersionFormatJSONIndent = "json-indent" + VersionFormatYAML = "yaml" +) + +func PrintVersion(v *version.Version, format string) { + switch format { + case VersionFormatText: + fmt.Println(v.Version()) + case VersionFormatJSONCompact: + fmt.Println(v.JSON(false)) + case VersionFormatJSONIndent: + fmt.Println(v.JSON(true)) + case VersionFormatYAML: + fmt.Println(v.YAML()) + default: + fmt.Printf("Warning: Unknown version format '%s', falling back to %s", format, VersionFormatText) + fmt.Println(v.Version()) + } +} diff --git a/cmd/principal/main.go b/cmd/principal/main.go index 0159b070..5b7dff78 100644 --- a/cmd/principal/main.go +++ b/cmd/principal/main.go @@ -16,21 +16,34 @@ package main import ( "context" + "crypto/tls" + "fmt" + "net/http" + _ "net/http/pprof" "os" "runtime" "time" - "github.com/argoproj-labs/argocd-agent/cmd/cmd" + "github.com/argoproj-labs/argocd-agent/cmd/cmdutil" "github.com/argoproj-labs/argocd-agent/internal/auth" "github.com/argoproj-labs/argocd-agent/internal/auth/userpass" + "github.com/argoproj-labs/argocd-agent/internal/config" "github.com/argoproj-labs/argocd-agent/internal/env" + "github.com/argoproj-labs/argocd-agent/internal/kube" "github.com/argoproj-labs/argocd-agent/internal/labels" + "github.com/argoproj-labs/argocd-agent/internal/tlsutil" "github.com/argoproj-labs/argocd-agent/principal" "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) +// proxyTlsCertName is the name of the secret containing the TLS certificate for kube-proxy +const proxyTlsCertName = "kube-proxy-tls" + +// proxyClientCAName is the name of the secret containing the TLS root CA certificate for kube-proxy +const proxyClientCAName = "kube-proxy-ca" + func NewPrincipalRunCommand() *cobra.Command { var ( listenHost string @@ -56,6 +69,11 @@ func NewPrincipalRunCommand() 
*cobra.Command { autoNamespacePattern string autoNamespaceLabels []string enableWebSocket bool + enableResourceProxy bool + enablePprof bool + resourceProxyCertPath string + resourceProxyKeyPath string + resourceProxyCAPath string ) var command = &cobra.Command{ Short: "Run the argocd-agent principal component", @@ -63,24 +81,30 @@ func NewPrincipalRunCommand() *cobra.Command { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() + if enablePprof { + go func() { + http.ListenAndServe("127.0.0.1:6060", nil) + }() + } + opts := []principal.ServerOption{} if logLevel != "" { - lvl, err := cmd.StringToLoglevel(logLevel) + lvl, err := cmdutil.StringToLoglevel(logLevel) if err != nil { - cmd.Fatal("invalid log level: %s. Available levels are: %s", logLevel, cmd.AvailableLogLevels()) + cmdutil.Fatal("invalid log level: %s. Available levels are: %s", logLevel, cmdutil.AvailableLogLevels()) } logrus.Printf("Setting loglevel to %s", logLevel) logrus.SetLevel(lvl) } - if formatter, err := cmd.LogFormatter(logFormat); err != nil { - cmd.Fatal("%s", err.Error()) + if formatter, err := cmdutil.LogFormatter(logFormat); err != nil { + cmdutil.Fatal("%s", err.Error()) } else { logrus.SetFormatter(formatter) } - kubeConfig, err := cmd.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext) + kubeConfig, err := cmdutil.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext) if err != nil { - cmd.Fatal("Could not load Kubernetes config: %v", err) + cmdutil.Fatal("Could not load Kubernetes config: %v", err) } opts = append(opts, principal.WithListenerAddress(listenHost)) @@ -91,7 +115,7 @@ func NewPrincipalRunCommand() *cobra.Command { !(len(autoNamespaceLabels) == 1 && autoNamespaceLabels[0] == "") { nsLabels, err = labels.StringsToMap(autoNamespaceLabels) if err != nil { - cmd.Fatal("Could not parse auto namespace labels: %v", err) + cmdutil.Fatal("Could not parse auto namespace labels: %v", err) } } opts = append(opts, 
principal.WithAutoNamespaceCreate(autoNamespaceAllow, autoNamespacePattern, nsLabels)) @@ -105,26 +129,37 @@ func NewPrincipalRunCommand() *cobra.Command { if tlsCert != "" && tlsKey != "" { opts = append(opts, principal.WithTLSKeyPairFromPath(tlsCert, tlsKey)) } else if (tlsCert != "" && tlsKey == "") || (tlsCert == "" && tlsKey != "") { - cmd.Fatal("Both --tls-cert and --tls-key have to be given") + cmdutil.Fatal("Both --tls-cert and --tls-key have to be given") } else if allowTlsGenerate { opts = append(opts, principal.WithGeneratedTLS("argocd-agent")) } else { - cmd.Fatal("No TLS configuration given and auto generation not allowed.") + cmdutil.Fatal("No TLS configuration given and auto generation not allowed.") } if rootCaPath != "" { opts = append(opts, principal.WithTLSRootCaFromFile(rootCaPath)) } + var proxyTls *tls.Config + if resourceProxyCertPath != "" && resourceProxyKeyPath != "" && resourceProxyCAPath != "" { + proxyTls, err = getKubeProxyTLSConfigFromFiles(resourceProxyCertPath, resourceProxyKeyPath, resourceProxyCAPath) + } else { + proxyTls, err = getKubeProxyTLSConfigFromKube(kubeConfig, namespace) + } + if err != nil { + cmdutil.Fatal("Error reading TLS config for resource proxy: %v", err) + } opts = append(opts, principal.WithRequireClientCerts(requireClientCerts)) opts = append(opts, principal.WithClientCertSubjectMatch(clientCertSubjectMatch)) + opts = append(opts, principal.WithKubeProxyEnabled(enableResourceProxy)) + opts = append(opts, principal.WithKubeProxyTLS(proxyTls)) if jwtKey != "" { opts = append(opts, principal.WithTokenSigningKeyFromFile(jwtKey)) } else if allowJwtGenerate { opts = append(opts, principal.WithGeneratedTokenSigningKey()) } else { - cmd.Fatal("No JWT signing key given and auto generation not allowed.") + cmdutil.Fatal("No JWT signing key given and auto generation not allowed.") } authMethods := auth.NewMethods() @@ -133,11 +168,11 @@ func NewPrincipalRunCommand() *cobra.Command { if userDB != "" { err = 
userauth.LoadAuthDataFromFile(userDB) if err != nil { - cmd.Fatal("Could not load user database: %v", err) + cmdutil.Fatal("Could not load user database: %v", err) } err = authMethods.RegisterMethod("userpass", userauth) if err != nil { - cmd.Fatal("Could not register userpass auth method") + cmdutil.Fatal("Could not register userpass auth method") } opts = append(opts, principal.WithAuthMethods(authMethods)) } @@ -153,12 +188,12 @@ func NewPrincipalRunCommand() *cobra.Command { s, err := principal.NewServer(ctx, kubeConfig, namespace, opts...) if err != nil { - cmd.Fatal("Could not create new server instance: %v", err) + cmdutil.Fatal("Could not create new server instance: %v", err) } errch := make(chan error) err = s.Start(ctx, errch) if err != nil { - cmd.Fatal("Could not start server: %v", err) + cmdutil.Fatal("Could not start server: %v", err) } <-ctx.Done() }, @@ -167,7 +202,7 @@ func NewPrincipalRunCommand() *cobra.Command { env.StringWithDefault("ARGOCD_PRINCIPAL_LISTEN_HOST", nil, ""), "Name of the host to listen on") command.Flags().IntVar(&listenPort, "listen-port", - env.NumWithDefault("ARGOCD_PRINCIPAL_LISTEN_PORT", cmd.ValidPort, 8443), + env.NumWithDefault("ARGOCD_PRINCIPAL_LISTEN_PORT", cmdutil.ValidPort, 8443), "Port the gRPC server will listen on") command.Flags().StringVar(&logLevel, "log-level", @@ -178,7 +213,7 @@ func NewPrincipalRunCommand() *cobra.Command { "The log format to use (one of: text, json)") command.Flags().IntVar(&metricsPort, "metrics-port", - env.NumWithDefault("ARGOCD_PRINCIPAL_METRICS_PORT", cmd.ValidPort, 8000), + env.NumWithDefault("ARGOCD_PRINCIPAL_METRICS_PORT", cmdutil.ValidPort, 8000), "Port the metrics server will listen on") command.Flags().BoolVar(&disableMetrics, "disable-metrics", env.BoolWithDefault("ARGOCD_PRINCIPAL_DISABLE_METRICS", false), @@ -211,7 +246,7 @@ func NewPrincipalRunCommand() *cobra.Command { "INSECURE: Generate and use temporary TLS cert and key") command.Flags().StringVar(&rootCaPath, 
"root-ca-path", env.StringWithDefault("ARGOCD_PRINCIPAL_TLS_SERVER_ROOT_CA_PATH", nil, ""), - "Path to a file containing root CA certificate for verifying client certs") + "Path to a file containing the root CA certificate for verifying client certs of agents") command.Flags().BoolVar(&requireClientCerts, "require-client-certs", env.BoolWithDefault("ARGOCD_PRINCIPAL_TLS_CLIENT_CERT_REQUIRE", false), "Whether to require agents to present a client certificate") @@ -219,6 +254,16 @@ func NewPrincipalRunCommand() *cobra.Command { env.BoolWithDefault("ARGOCD_PRINCIPAL_TLS_CLIENT_CERT_MATCH_SUBJECT", false), "Whether a client cert's subject must match the agent name") + command.Flags().StringVar(&resourceProxyCertPath, "resource-proxy-cert-path", + env.StringWithDefault("ARGOCD_PRINCIPAL_RESOURCE_PROXY_TLS_CERT_PATH", nil, ""), + "Path to a file containing the resource proxy's TLS certificate") + command.Flags().StringVar(&resourceProxyKeyPath, "resource-proxy-key-path", + env.StringWithDefault("ARGOCD_PRINCIPAL_RESOURCE_PROXY_TLS_KEY_PATH", nil, ""), + "Path to a file containing the resource proxy's TLS private key") + command.Flags().StringVar(&resourceProxyCAPath, "resource-proxy-ca-path", + env.StringWithDefault("ARGOCD_PRINCIPAL_RESOURCE_PROXY_TLS_CA_PATH", nil, ""), + "Path to a file containing the resource proxy's TLS CA data") + command.Flags().StringVar(&jwtKey, "jwt-key", env.StringWithDefault("ARGOCD_PRINCIPAL_JWT_KEY_PATH", nil, ""), "Use JWT signing key from path") @@ -234,8 +279,13 @@ func NewPrincipalRunCommand() *cobra.Command { env.BoolWithDefault("ARGOCD_PRINCIPAL_ENABLE_WEBSOCKET", false), "Principal will rely on gRPC over WebSocket to stream events to the Agent") + command.Flags().BoolVar(&enableResourceProxy, "enable-resource-proxy", + env.BoolWithDefault("ARGOCD_PRINCIPAL_ENABLE_RESOURCE_PROXY", true), + "Whether to enable the resource proxy") + command.Flags().StringVar(&kubeConfig, "kubeconfig", "", "Path to a kubeconfig file to use") 
command.Flags().StringVar(&kubeContext, "kubecontext", "", "Override the default kube context") + command.Flags().BoolVar(&enablePprof, "enable-pprof", false, "Enable pprof server") return command @@ -257,8 +307,62 @@ func observer(interval time.Duration) { }() } +// getKubeProxyTLSConfigFromKube reads the kubeproxy TLS configuration from the +// cluster and returns it. +// +// The secret names where the certificates are stored in are hard-coded at the +// moment. +func getKubeProxyTLSConfigFromKube(kubeClient *kube.KubernetesClient, namespace string) (*tls.Config, error) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + proxyCert, err := tlsutil.TLSCertFromSecret(ctx, kubeClient.Clientset, namespace, config.SecretNameProxyTls) + if err != nil { + return nil, fmt.Errorf("error getting proxy certificate: %w", err) + } + + clientCA, err := tlsutil.X509CertPoolFromSecret(ctx, kubeClient.Clientset, namespace, config.SecretNamePrincipalCA, "tls.crt") + if err != nil { + return nil, fmt.Errorf("error getting client CA certificate: %w", err) + } + logrus.Infof("Loaded %d certs into pool", len(clientCA.Subjects())) + + proxyTls := &tls.Config{ + Certificates: []tls.Certificate{ + proxyCert, + }, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: clientCA, + } + + return proxyTls, nil +} + +// getKubeProxyTLSConfigFromFile reads the kubeproxy TLS configuration from +// given files and returns it. 
+func getKubeProxyTLSConfigFromFiles(certPath, keyPath, caPath string) (*tls.Config, error) { + proxyCert, err := tlsutil.TlsCertFromFile(certPath, keyPath, false) + if err != nil { + return nil, err + } + + clientCA, err := tlsutil.X509CertPoolFromFile(caPath) + if err != nil { + return nil, err + } + + proxyTls := &tls.Config{ + Certificates: []tls.Certificate{ + proxyCert, + }, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: clientCA, + } + + return proxyTls, nil +} + func main() { - cmd.InitLogging() + cmdutil.InitLogging() cmd := NewPrincipalRunCommand() err := cmd.Execute() if err != nil { diff --git a/hack/dev-env/gen-tls.sh b/hack/dev-env/gen-tls.sh new file mode 100755 index 00000000..b12de9c2 --- /dev/null +++ b/hack/dev-env/gen-tls.sh @@ -0,0 +1,68 @@ +#!/bin/bash +# Copyright 2025 The argocd-agent Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +############################################################################## +# Script to generate TLS certs for development/e2e-tests of argocd-agent. +# +# WARNING: Development script. Do not use to produce production credentials. +# This script comes without any promises. It should only be used to generate +# certificates for your dev or demo environments. 
+############################################################################## + +set -eo pipefail + +SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" +cp_context=vcluster-control-plane +rp_tls_secret=resource-proxy-tls +rp_ca_secret=argocd-agent-ca + +creds_path=${SCRIPTPATH}/creds +test -d ${creds_path} || mkdir ${creds_path} +days=${TLS_VALID_DAYS:-30} +kubectl=$(which kubectl 2>/dev/null) || true +if test "x${kubectl}" = "x"; then + kubectl=$(which oc 2>/dev/null) || true +fi +if test "x${kubectl}" = "x"; then + echo "No kubectl or oc found in \$PATH" >&2 + exit 1 +fi +generate_rp_ca() { + echo "Generating CA" + openssl genrsa -out ${creds_path}/ca.key + openssl req -x509 -new -nodes -key ${creds_path}/ca.key -sha256 -days ${days} -out ${creds_path}/ca.crt -subj '/CN=DO NOT USE FOR PRODUCTION/O=resource-proxy' +} + +generate_rp_cert() { + echo "Generating ResourceProxy TLS server certificate" + openssl genrsa -out ${creds_path}/rp.key + openssl req -new -sha256 \ + -key ${creds_path}/rp.key \ + -subj "/CN=DO NOT USE FOR PRODUCTION/O=resource-proxy" \ + -reqexts SAN \ + -config <(cat /etc/ssl/openssl.cnf \ + <(printf "\n[SAN]\nsubjectAltName=DNS:localhost,IP:127.0.0.1")) \ + -out ${creds_path}/rp.csr + openssl x509 -req -in ${creds_path}/rp.csr -CA ${creds_path}/ca.crt -CAkey ${creds_path}/ca.key -CAcreateserial -out ${creds_path}/rp.crt -days ${days} -sha256 +} + +generate_rp_ca +generate_rp_cert + +echo "Generating secrets..." 
+kubectl --context=${cp_context} delete secret -n argocd ${rp_tls_secret} --ignore-not-found +kubectl --context=${cp_context} delete secret -n argocd ${rp_ca_secret} --ignore-not-found +kubectl --context=${cp_context} create secret -n argocd tls ${rp_tls_secret} --key=${creds_path}/rp.key --cert=${creds_path}/rp.crt +kubectl --context=${cp_context} create secret -n argocd generic ${rp_ca_secret} --from-file=ca.crt=${creds_path}/ca.crt \ No newline at end of file diff --git a/hack/dev-env/start-principal.sh b/hack/dev-env/start-principal.sh index 9a0ab0df..e942c497 100755 --- a/hack/dev-env/start-principal.sh +++ b/hack/dev-env/start-principal.sh @@ -20,4 +20,14 @@ if ! kubectl config get-contexts | tail -n +2 | awk '{ print $2 }' | grep -qE '^ exit 1 fi SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" -go run github.com/argoproj-labs/argocd-agent/cmd/principal --allowed-namespaces '*' --insecure-tls-generate --insecure-jwt-generate --kubecontext vcluster-control-plane --log-level trace --passwd ${SCRIPTPATH}/creds/users.control-plane $ARGS +go run github.com/argoproj-labs/argocd-agent/cmd/principal \ + --allowed-namespaces '*' \ + --insecure-tls-generate \ + --insecure-jwt-generate \ + --kubecontext vcluster-control-plane \ + --log-level trace \ + --namespace argocd \ + --passwd ${SCRIPTPATH}/creds/users.control-plane $ARGS + #--resource-proxy-ca-path ${SCRIPTPATH}/creds/ca.crt \ + #--resource-proxy-cert-path ${SCRIPTPATH}/creds/rp.crt \ + #--resource-proxy-key-path ${SCRIPTPATH}/creds/rp.key \ diff --git a/internal/argocd/cluster/conversion.go b/internal/argocd/cluster/conversion.go new file mode 100644 index 00000000..8b00525a --- /dev/null +++ b/internal/argocd/cluster/conversion.go @@ -0,0 +1,79 @@ +package cluster + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + v1 "k8s.io/api/core/v1" +) + +/* +The 
ClusterToSecret function has been copied from Argo CD util/db/cluster.go +and was slightly modified. It is a package-private function in Argo CD, so +we are not able to just use it from the package. + +TODO(jannfis): Submit PR to Argo CD to make this function public, so we can +just use it directly from github.com/argoproj/argo-cd/v2 package. +*/ + +// ClusterToSecret converts a cluster object to string data for serialization to a secret +func ClusterToSecret(c *v1alpha1.Cluster, secret *v1.Secret) error { + data := make(map[string][]byte) + data["server"] = []byte(strings.TrimRight(c.Server, "/")) + if c.Name == "" { + data["name"] = []byte(c.Server) + } else { + data["name"] = []byte(c.Name) + } + if len(c.Namespaces) != 0 { + data["namespaces"] = []byte(strings.Join(c.Namespaces, ",")) + } + configBytes, err := json.Marshal(c.Config) + if err != nil { + return err + } + data["config"] = configBytes + if c.Shard != nil { + data["shard"] = []byte(strconv.Itoa(int(*c.Shard))) + } + if c.ClusterResources { + data["clusterResources"] = []byte("true") + } + if c.Project != "" { + data["project"] = []byte(c.Project) + } + secret.Data = data + + secret.Labels = c.Labels + if c.Annotations != nil && c.Annotations[v1.LastAppliedConfigAnnotation] != "" { + return fmt.Errorf("annotation %s cannot be set", v1.LastAppliedConfigAnnotation) + } + secret.Annotations = c.Annotations + + if secret.Annotations == nil { + secret.Annotations = make(map[string]string) + } + + if c.RefreshRequestedAt != nil { + secret.Annotations[v1alpha1.AnnotationKeyRefresh] = c.RefreshRequestedAt.Format(time.RFC3339) + } else { + delete(secret.Annotations, v1alpha1.AnnotationKeyRefresh) + } + + if secret.Annotations == nil { + secret.Annotations = map[string]string{} + } + secret.Annotations[common.AnnotationKeyManagedBy] = LabelValueManagerName + + if secret.Labels == nil { + secret.Labels = map[string]string{} + } + secret.Labels[common.LabelKeySecretType] = common.LabelValueSecretTypeCluster + + 
return nil +} diff --git a/internal/argocd/cluster/informer.go b/internal/argocd/cluster/informer.go new file mode 100644 index 00000000..d5acdbda --- /dev/null +++ b/internal/argocd/cluster/informer.go @@ -0,0 +1,134 @@ +package cluster + +import ( + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/util/db" + v1 "k8s.io/api/core/v1" +) + +/* +The code in this file does not implement an actual informer, but the handlers +that are used for the Manager's informer on add/update/delete events. +*/ + +// onClusterAdded is called by the informer whenever the manager's informer is +// notified about a new cluster secret. This usually happens on two occasions: +// A new, correctly labeled secret is created on the cluster, or an existing +// unlabeled secret is being labeled correctly. +func (m *Manager) onClusterAdded(res *v1.Secret) { + log().Tracef("Executing cluster add handler for secret %s/%s", res.Namespace, res.Name) + m.mutex.Lock() + defer m.mutex.Unlock() + + // This should not happen, because our filter already ensures this. But we + // check again just to be sure. 
+ agent, ok := res.Labels[LabelKeyClusterAgentMapping] + if !ok || agent == "" { + return + } + + // Check if we already have a mapping for the requested agent + existing := m.mapping("agent") + if existing != nil { + log().Errorf("Agent %s is already mapped to cluster %s", agent, existing.Name) + return + } + + // Unmarshal the cluster from the secret + cluster, err := db.SecretToCluster(res) + if err != nil { + log().WithError(err).Error("Not a cluster secret or malformed data") + } + + // Map the cluster to the agent + err = m.mapCluster(agent, cluster) + if err != nil { + log().WithError(err).Infof("Error mapping cluster %s to agent %s", cluster.Name, agent) + return + } + + log().Infof("Mapped cluster %s to agent %s", cluster.Name, agent) +} + +// onClusterUpdated is called by the informer whenever there is a change to a +// secret that is watched by the informer. However, when the change involves +// the resource to be added or removed from the informer's cache (e.g. a label +// was added or removed), onClusterAdded or onClusterDeleted will be called +// respectively, instead. +func (m *Manager) onClusterUpdated(old *v1.Secret, new *v1.Secret) { + log().Tracef("Executing cluster update callback for secret %s/%s", old.Namespace, old.Name) + m.mutex.Lock() + defer m.mutex.Unlock() + + // This should never happen, but we check anyway + oldAgent, oldOk := old.Labels[LabelKeyClusterAgentMapping] + newAgent, newOk := new.Labels[LabelKeyClusterAgentMapping] + if !oldOk && !newOk { + log().Errorf("Could not update cluster mapping: Either secret is not labeled properly") + return + } + + c, err := db.SecretToCluster(new) + if err != nil { + log().WithError(err).Errorf("Could not update ") + return + } + + // The mapping label has changed value. Check existing mappings. 
+ if oldAgent != newAgent { + if !m.hasMapping(oldAgent) || m.hasMapping(newAgent) { + log().Errorf("Remapping secret from %s to %s not possible", oldAgent, newAgent) + return + } + } + + // Unmap cluster from old agent + log().Tracef("Unmapping cluster %s from agent %s", c.Name, oldAgent) + if err = m.unmapCluster(oldAgent); err != nil { + log().WithError(err).Errorf("Could not unmap cluster %s from agent %s", c.Name, oldAgent) + return + } + + // Map cluster to new agent + log().Tracef("Mapping cluster %s to agent %s", c.Name, newAgent) + if err = m.mapCluster(newAgent, c); err != nil { + log().WithError(err).Errorf("Could not map cluster %s to agent %s", c.Name, newAgent) + return + } + + log().Infof("Updated cluster mapping for agent %s", newAgent) +} + +// onClusterDeleted will be called by the informer on certain ocassions when a +// cluster secret is removed from the informer. Usually, this happens in the +// following cases: The cluster secret is deleted from the cluster, or one of +// the labels that are required are removed from a cluster secret. +func (m *Manager) onClusterDeleted(res *v1.Secret) { + log().Trace("Executing cluster delete callback") + m.mutex.Lock() + defer m.mutex.Unlock() + + // This situation should not happen, but we test anyway. + agent, ok := res.Labels[LabelKeyClusterAgentMapping] + if !ok { + log().Errorf("Secret %s should have agent mapping label, but does not", res.Name) + return + } + + // Log out if there is no mapping for the requested agent. This indicates + // an informer cache out of sync or some other problem that needs to be + // investigated. 
+ var c *v1alpha1.Cluster + if c = m.mapping(agent); c == nil { + log().Errorf("There should be a cluster mapped to agent %s, but there is not.", agent) + return + } + + // Finally, unmap the cluster + if err := m.unmapCluster(agent); err != nil { + log().WithError(err).Errorf("Could not unmap cluster %s from agent %s", c.Name, agent) + return + } + + log().Infof("Unmapped cluster from agent %s", agent) +} diff --git a/internal/argocd/cluster/manager.go b/internal/argocd/cluster/manager.go new file mode 100644 index 00000000..5cad06fb --- /dev/null +++ b/internal/argocd/cluster/manager.go @@ -0,0 +1,131 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package cluster implements various functions for working with Argo CD cluster +configuration. + +Its main component is the ClusterManager, which essentially manages Argo CD +cluster secrets and maps them to agents. 
+*/ +package cluster + +import ( + "context" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + + "github.com/argoproj-labs/argocd-agent/internal/filter" + "github.com/argoproj-labs/argocd-agent/internal/informer" + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/sirupsen/logrus" +) + +const LabelKeyClusterAgentMapping = "argocd-agent.argoproj-labs.io/agent-name" + +const LabelValueManagerName = "argocd-agent" + +// syncTimeout is the duration to wait until the manager's informer has synced +const syncTimeout = 30 * time.Second + +// Manager manages Argo CD cluster secrets on the principal +type Manager struct { + mutex sync.RWMutex + ctx context.Context + clusters map[string]*v1alpha1.Cluster + informer *informer.Informer[*v1.Secret] + namespace string + kubeclient kubernetes.Interface + filters *filter.Chain[*v1.Secret] +} + +// NewManager instantiates and initializes a new Manager. +func NewManager(ctx context.Context, namespace string, kubeclient kubernetes.Interface) (*Manager, error) { + var err error + m := &Manager{ + clusters: make(map[string]*v1alpha1.Cluster), + namespace: namespace, + kubeclient: kubeclient, + ctx: ctx, + filters: filter.NewFilterChain[*v1.Secret](), + } + + // We are only interested in secrets that have both, Argo CD's label for + // cluster secrets and the label that holds our agent name. 
+ m.filters.AppendAdmitFilter(func(c *v1.Secret) bool { + if v, ok := c.Labels[common.LabelKeySecretType]; !ok || v != common.LabelValueSecretTypeCluster { + log().Tracef("No label secret-type of cluster found on secret %s/%s", c.Namespace, c.Name) + return false + } + if v, ok := c.Labels[LabelKeyClusterAgentMapping]; !ok || v == "" { + log().Tracef("No label for agent-mapping found on secret %s/%s", c.Namespace, c.Name) + return false + } + return true + }) + + // Create the informer for our cluster secrets + m.informer, err = informer.NewInformer[*v1.Secret](ctx, + informer.WithListHandler[*v1.Secret](func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return kubeclient.CoreV1().Secrets(m.namespace).List(ctx, opts) + }), + informer.WithWatchHandler[*v1.Secret](func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kubeclient.CoreV1().Secrets(m.namespace).Watch(ctx, opts) + }), + informer.WithAddHandler(m.onClusterAdded), + informer.WithUpdateHandler(m.onClusterUpdated), + informer.WithDeleteHandler(m.onClusterDeleted), + informer.WithFilters(m.filters), + ) + if err != nil { + return nil, err + } + + return m, nil +} + +// Start starts the manager m and its informer. Start waits for the informer +// to be synced before returning. If the informer could not sync before the +// timeout expires, Start will return an error. 
+func (m *Manager) Start() error { + log().Info("Starting cluster manager") + go func() { + err := m.informer.Start(m.ctx) + if err != nil { + log().WithError(err).Error("Could not start informer") + } + }() + ctx, cancel := context.WithTimeout(m.ctx, syncTimeout) + defer cancel() + return m.informer.WaitForSync(ctx) +} + +func (m *Manager) Stop() error { + log().Info("Stopping cluster manager") + m.mutex.Lock() + defer m.mutex.Unlock() + m.clusters = make(map[string]*v1alpha1.Cluster) + return m.informer.Stop() +} + +func log() *logrus.Entry { + return logrus.WithField("component", "ClusterManager") +} diff --git a/internal/argocd/cluster/manager_test.go b/internal/argocd/cluster/manager_test.go new file mode 100644 index 00000000..1e886439 --- /dev/null +++ b/internal/argocd/cluster/manager_test.go @@ -0,0 +1,139 @@ +package cluster + +import ( + "context" + "testing" + "time" + + "github.com/argoproj-labs/argocd-agent/test/fake/kube" + "github.com/argoproj-labs/argocd-agent/test/testutil" + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const secretName = "cluster-123" + +func newClusterSecret(t *testing.T, agentName string) (*v1alpha1.Cluster, *v1.Secret) { + t.Helper() + labels := map[string]string{ + common.LabelKeySecretType: common.LabelValueSecretTypeCluster, + } + if agentName != "" { + labels[LabelKeyClusterAgentMapping] = agentName + } + c := &v1alpha1.Cluster{ + Labels: labels, + Name: "my-cluster", + Server: "127.0.0.1:8080", + } + s := &v1.Secret{} + err := ClusterToSecret(c, s) + require.NoError(t, err) + s.Namespace = "argocd" + s.Name = secretName + return c, s +} + +func Test_onClusterAdd(t *testing.T) { + clt := kube.NewFakeClientsetWithResources() + m, err := NewManager(context.TODO(), "argocd", clt) 
+ require.NoError(t, err) + require.NotNil(t, m) + err = m.Start() + require.NoError(t, err) + + t.Run("Cluster is mapped successfully when secret is added", func(t *testing.T) { + c, s := newClusterSecret(t, "agent-1") + _, err := clt.CoreV1().Secrets("argocd").Create(context.TODO(), s, metav1.CreateOptions{}) + require.NoError(t, err) + var nc *v1alpha1.Cluster + testutil.WaitForChange(t, 1*time.Second, func() bool { + nc = m.Mapping("agent-1") + return nc != nil + }) + assert.NotNil(t, nc) + assert.Equal(t, c.Name, nc.Name) + assert.Equal(t, c.Server, nc.Server) + }) + + t.Run("Cluster is mapped when existing secret is labeled", func(t *testing.T) { + require.False(t, m.HasMapping("agent-2")) + c, s := newClusterSecret(t, "") + s.Name = "test-234" + _, err := clt.CoreV1().Secrets("argocd").Create(context.TODO(), s, metav1.CreateOptions{}) + require.NoError(t, err) + // Let the informer catch and ignore the event + time.Sleep(500 * time.Millisecond) + s.Labels[LabelKeyClusterAgentMapping] = "agent-2" + _, err = clt.CoreV1().Secrets("argocd").Update(context.TODO(), s, metav1.UpdateOptions{}) + require.NoError(t, err) + testutil.WaitForChange(t, 1*time.Second, func() bool { + if m.HasMapping("agent-2") { + return m.Mapping("agent-2").Server == c.Server + } + return false + }) + }) + + t.Run("Cluster is remapped when agent label changes", func(t *testing.T) { + s, err := clt.CoreV1().Secrets("argocd").Get(context.TODO(), "test-234", metav1.GetOptions{}) + require.NoError(t, err) + require.NotEmpty(t, s.Labels) + require.Equal(t, "agent-2", s.Labels[LabelKeyClusterAgentMapping]) + require.True(t, m.HasMapping("agent-2")) + s.Labels[LabelKeyClusterAgentMapping] = "agent-3" + _, err = clt.CoreV1().Secrets("argocd").Update(context.TODO(), s, metav1.UpdateOptions{}) + require.NoError(t, err) + testutil.WaitForChange(t, 1*time.Second, func() bool { + return !m.HasMapping("agent-2") && m.HasMapping("agent-3") + }) + }) + + t.Run("Cluster mapping is updated when secret is 
updated", func(t *testing.T) { + s, err := clt.CoreV1().Secrets("argocd").Get(context.TODO(), secretName, metav1.GetOptions{}) + require.NoError(t, err) + s.Data["server"] = []byte("127.0.0.1:8081") + _, err = clt.CoreV1().Secrets("argocd").Update(context.TODO(), s, metav1.UpdateOptions{}) + assert.NoError(t, err) + testutil.WaitForChange(t, 1*time.Second, func() bool { + c := m.Mapping("agent-1") + if c == nil { + return false + } + return c.Server == "127.0.0.1:8081" + }) + }) + + t.Run("Cluster mapping is deleted when label is removed", func(t *testing.T) { + s, err := clt.CoreV1().Secrets("argocd").Get(context.TODO(), secretName, metav1.GetOptions{}) + require.NoError(t, err) + delete(s.Labels, LabelKeyClusterAgentMapping) + _, err = clt.CoreV1().Secrets("argocd").Update(context.TODO(), s, metav1.UpdateOptions{}) + assert.NoError(t, err) + // Cluster should be unmapped + testutil.WaitForChange(t, 1*time.Second, func() bool { + return !m.HasMapping("agent-1") + }) + }) + + t.Run("Cluster mapping is deleted when secret is deleted", func(t *testing.T) { + _, err := clt.CoreV1().Secrets("argocd").Get(context.TODO(), "test-234", metav1.GetOptions{}) + require.NoError(t, err) + require.True(t, m.HasMapping("agent-3")) + err = clt.CoreV1().Secrets("argocd").Delete(context.TODO(), "test-234", metav1.DeleteOptions{}) + require.NoError(t, err) + testutil.WaitForChange(t, 1*time.Second, func() bool { + return !m.HasMapping("agent-3") + }) + }) + +} + +func init() { + logrus.SetLevel(logrus.TraceLevel) +} diff --git a/internal/argocd/cluster/mapping.go b/internal/argocd/cluster/mapping.go new file mode 100644 index 00000000..e810f0fb --- /dev/null +++ b/internal/argocd/cluster/mapping.go @@ -0,0 +1,94 @@ +package cluster + +import ( + "errors" + + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" +) + +// ErrAlreadyMapped indicates that an agent has a cluster mapped to it already +var ErrAlreadyMapped = errors.New("agent already mapped") + +// ErrNotMapped 
indicates that an agent has no cluster mapped to it +var ErrNotMapped = errors.New("agent not mapped") + +// Mapping returns the cluster mapping for an agent with the given name. If the +// agent has no cluster mapped to it, Mapping returns nil. +func (m *Manager) Mapping(agent string) *v1alpha1.Cluster { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.mapping(agent) +} + +// mapping returns the cluster mapping for an agent with the given name. If the +// agent has no cluster mapped to it, mapping returns nil. +// +// This function is not thread safe, unless the caller holds the manager's +// mutex in read mode. +func (m *Manager) mapping(agent string) *v1alpha1.Cluster { + c, ok := m.clusters[agent] + if !ok { + return nil + } + return c +} + +// HasMapping returns true when the manager has a cluster mapping for an agent +// with the given name. +func (m *Manager) HasMapping(agent string) bool { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.hasMapping(agent) +} + +// hasMapping returns true when the manager has a cluster mapping for an agent +// with the given name. +// +// This function is not thread safe, unless the caller holds the manager's +// mutex in read mode. +func (m *Manager) hasMapping(agent string) bool { + _, ok := m.clusters[agent] + return ok +} + +// MapCluster maps the given cluster to the agent with the given name. It may +// return an error if the agent has a cluster mapped to it already. +func (m *Manager) MapCluster(agent string, cluster *v1alpha1.Cluster) error { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.mapCluster(agent, cluster) +} + +// mapCluster maps the given cluster to the agent with the given name. It may +// return an error if the agent has a cluster mapped to it already. +// +// This function is not thread safe, unless the caller holds the manager's +// mutex in write mode. 
+func (m *Manager) mapCluster(agent string, cluster *v1alpha1.Cluster) error { + if m.hasMapping(agent) { + return ErrAlreadyMapped + } + m.clusters[agent] = cluster + return nil +} + +// UnmapCluster removes the mapping for an agent with the given name. If there +// is no mapping for the agent, UnmapCluster will return an error. +func (m *Manager) UnmapCluster(agent string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.unmapCluster(agent) +} + +// unmapCluster removes the mapping for an agent with the given name. If there +// is no mapping for the agent, unmapCluster will return an error. +// +// This function is not thread safe, unless the caller holds the manager's +// mutex in write mode. +func (m *Manager) unmapCluster(agent string) error { + if !m.hasMapping(agent) { + return ErrNotMapped + } + delete(m.clusters, agent) + return nil +} diff --git a/internal/argocd/cluster/mapping_test.go b/internal/argocd/cluster/mapping_test.go new file mode 100644 index 00000000..28ba878b --- /dev/null +++ b/internal/argocd/cluster/mapping_test.go @@ -0,0 +1,38 @@ +package cluster + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" +) + +func Test_ClusterMappings(t *testing.T) { + m := &Manager{ + clusters: make(map[string]*v1alpha1.Cluster), + } + t.Run("Map cluster to agent", func(t *testing.T) { + err := m.MapCluster("agent", &v1alpha1.Cluster{Name: "cluster"}) + require.NoError(t, err) + }) + t.Run("Cluster is mapped successfully", func(t *testing.T) { + require.True(t, m.HasMapping("agent")) + require.Equal(t, "cluster", m.Mapping("agent").Name) + }) + t.Run("Agent cannot be mapped again", func(t *testing.T) { + err := m.MapCluster("agent", &v1alpha1.Cluster{}) + require.ErrorIs(t, err, ErrAlreadyMapped) + }) + t.Run("Mapping can be deleted", func(t *testing.T) { + err := m.UnmapCluster("agent") + require.NoError(t, err) + }) + t.Run("Mapping has been deleted", func(t 
// SecretNamePrincipalCA is the name of the secret containing the TLS
// configuration for the principal's Certificate Authority.
const SecretNamePrincipalCA = "argocd-agent-ca"

// SecretNameGrpcTls is the name of the secret containing the TLS
// configuration for the principal's gRPC service.
const SecretNameGrpcTls = "argocd-agent-grpc-tls"

// SecretNameProxyTls is the name of the secret containing the TLS
// configuration for the principal's resource proxy.
const SecretNameProxyTls = "resource-proxy-tls"
+const SecretNameProxyTls = "resource-proxy-tls" diff --git a/internal/event/event.go b/internal/event/event.go index 133718cd..6f67d8ce 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -24,11 +24,11 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/grpcutil" "github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/google/uuid" "github.com/sirupsen/logrus" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - _ "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -41,7 +41,8 @@ type EventTarget string const TypePrefix = "io.argoproj.argocd-agent.event" -// Supported EventTypes that are sent agent <-> principal +// Supported EventTypes that are sent agent <-> principal. Note that not every +// EventType is supported by every EventTarget. 
const ( Ping EventType = TypePrefix + ".ping" Pong EventType = TypePrefix + ".pong" @@ -52,6 +53,8 @@ const ( StatusUpdate EventType = TypePrefix + ".status-update" OperationUpdate EventType = TypePrefix + ".operation-update" EventProcessed EventType = TypePrefix + ".processed" + GetRequest EventType = TypePrefix + ".get" + GetResponse EventType = TypePrefix + ".response" ) const ( @@ -59,6 +62,7 @@ const ( TargetApplication EventTarget = "application" TargetAppProject EventTarget = "appproject" TargetEventAck EventTarget = "eventProcessed" + TargetResource EventTarget = "resource" ) const ( @@ -67,8 +71,10 @@ const ( ) var ( - ErrEventDiscarded error = errors.New("event discarded") - ErrEventNotAllowed error = errors.New("event not allowed in this agent mode") + ErrEventDiscarded error = errors.New("event discarded") + ErrEventNotAllowed error = errors.New("event not allowed in this agent mode") + ErrEventNotSupported error = errors.New("event not supported by target") + ErrEventUnknown error = errors.New("unknown event") ) func (t EventType) String() string { @@ -153,6 +159,64 @@ func (evs EventSource) AppProjectEvent(evType EventType, appProject *v1alpha1.Ap return &cev } +type ResourceRequest struct { + UUID string `json:"uuid"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name"` + v1.GroupVersionResource +} + +type ResourceResponse struct { + UUID string `json:"uuid"` + Status string `json:"status"` + Resource string `json:"resource,omitempty"` +} + +// NewResourceRequestEvent creates a cloud event for requesting a resource from +// an agent. The resource will be specified by the GroupVersionResource id in +// gvr, and its name and namespace. If namespace is empty, the request will +// be for a cluster-scoped resource. 
+// +// The event's resource ID and event ID will be set to a randomly generated +// UUID, because we'll have +func (evs EventSource) NewResourceRequestEvent(gvr v1.GroupVersionResource, namespace string, name string) (*cloudevents.Event, error) { + reqUUID := uuid.NewString() + rr := &ResourceRequest{ + UUID: reqUUID, + Namespace: namespace, + Name: name, + GroupVersionResource: gvr, + } + cev := cloudevents.NewEvent() + cev.SetSource(evs.source) + cev.SetSpecVersion(cloudEventSpecVersion) + cev.SetType(GetRequest.String()) + cev.SetDataSchema(TargetResource.String()) + cev.SetExtension(resourceID, reqUUID) + cev.SetExtension(eventID, reqUUID) + _ = cev.SetData(cloudevents.ApplicationJSON, rr) + return &cev, nil +} + +func (evs EventSource) NewResourceResponseEvent(reqUUID string, status string, data string) *cloudevents.Event { + resUUID := uuid.NewString() + rr := &ResourceResponse{ + UUID: reqUUID, + Status: status, + Resource: data, + } + cev := cloudevents.NewEvent() + cev.SetSource(evs.source) + cev.SetSpecVersion(cloudEventSpecVersion) + cev.SetType(GetResponse.String()) + cev.SetDataSchema(TargetResource.String()) + cev.SetExtension(resourceID, resUUID) + // eventid must be set to the requested resource's uuid + cev.SetExtension(eventID, reqUUID) + _ = cev.SetData(cloudevents.ApplicationJSON, rr) + return &cev +} + func (evs EventSource) ProcessedEvent(evType EventType, ev *Event) *cloudevents.Event { cev := cloudevents.NewEvent() cev.SetSource(evs.source) @@ -190,6 +254,8 @@ func Target(raw *cloudevents.Event) EventTarget { return TargetApplication case TargetAppProject.String(): return TargetAppProject + case TargetResource.String(): + return TargetResource case TargetEventAck.String(): return TargetEventAck } @@ -242,6 +308,20 @@ func (ev Event) AppProject() (*v1alpha1.AppProject, error) { return proj, err } +// ResourceRequest gets the resource request payload from an event +func (ev Event) ResourceRequest() (*ResourceRequest, error) { + req := 
&ResourceRequest{} + err := ev.event.DataAs(req) + return req, err +} + +// ResourceResponse gets the resource response payload from an event +func (ev Event) ResourceResponse() (*ResourceResponse, error) { + resp := &ResourceResponse{} + err := ev.event.DataAs(resp) + return resp, err +} + type streamWriter interface { Send(*eventstreamapi.Event) error Context() context.Context @@ -395,8 +475,10 @@ func (ew *EventWriter) sendEvent(resID string) { }() logCtx := ew.log.WithFields(logrus.Fields{ - "resource_id": resID, - "event_id": EventID(eventMsg.event), + "resource_id": resID, + "event_id": EventID(eventMsg.event), + "event_target": eventMsg.event.DataSchema(), + "event_type": eventMsg.event.Type(), }) // Check if it is time to resend the event. diff --git a/internal/kube/client.go b/internal/kube/client.go index 953a5c24..429fae84 100644 --- a/internal/kube/client.go +++ b/internal/kube/client.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -37,6 +38,7 @@ type KubernetesClient struct { ApplicationsClientset versioned.Interface Context context.Context Namespace string + RestConfig *rest.Config } func NewKubernetesClient(ctx context.Context, client kubernetes.Interface, applicationsClientset versioned.Interface, namespace string) *KubernetesClient { @@ -83,7 +85,27 @@ func NewKubernetesClientFromConfig(ctx context.Context, namespace string, kubeco return nil, err } - return NewKubernetesClient(ctx, clientset, applicationsClientset, namespace), nil + cl := NewKubernetesClient(ctx, clientset, applicationsClientset, namespace) + cl.RestConfig = config + return cl, nil +} + +func NewRestConfig(config string, context string) (*rest.Config, error) { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig + 
loadingRules.ExplicitPath = config + overrides := clientcmd.ConfigOverrides{} + if context != "" { + overrides.CurrentContext = context + } + clientConfig := clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin) + + restCfg, err := clientConfig.ClientConfig() + if err != nil { + return nil, err + } + + return restCfg, nil } // IsRetryableError is a helper method to see whether an error returned from the dynamic client diff --git a/internal/resourceproxy/options.go b/internal/resourceproxy/options.go new file mode 100644 index 00000000..19cd2a3f --- /dev/null +++ b/internal/resourceproxy/options.go @@ -0,0 +1,152 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package resourceproxy + +import ( + "crypto/tls" + "fmt" + "net/http" + "net/url" + "regexp" + + "github.com/argoproj-labs/argocd-agent/internal/tlsutil" + "k8s.io/client-go/rest" +) + +// ResourceProxyOption is an option setting callback function +type ResourceProxyOption func(p *ResourceProxy) error + +// podMatcher is the regular expression used to match a request path for pods +const podMatcher = `^/api/v1/namespaces/([^\/]+)/pods/([^\/]+)$` + +// logMatcher is the regular expression used to match a request path for logs +const logMatcher = `^/api/v1/namespaces/([^\/]+)/pods/([^\/]+)/log$` + +// execMatcher is the regular expression used to match a request path for exec +const execMatcher = `^/api/v1/namespaces/([^\/]+)/pods/([^\/]+)/exec$` + +// WithRestConfig configures the proxy to use information from the given REST +// config for connecting to upstream. +func WithRestConfig(config *rest.Config) ResourceProxyOption { + return func(p *ResourceProxy) error { + tr, err := tlsutil.TransportFromConfig(config) + if err != nil { + return err + } + p.upstreamTransport = tr + u, err := url.Parse(config.Host) + if err != nil { + return fmt.Errorf("error parsing upstream server: %w", err) + } + p.upstreamAddr = fmt.Sprintf("%s:%s", u.Hostname(), u.Port()) + p.upstreamScheme = u.Scheme + return nil + } +} + +func WithTLSConfig(t *tls.Config) ResourceProxyOption { + return func(p *ResourceProxy) error { + p.tlsConfig = t + return nil + } +} + +// WithUpstreamTransport sets the transport to use when connecting to the +// upstream API server. 
+func WithUpstreamTransport(t *http.Transport) ResourceProxyOption { + return func(p *ResourceProxy) error { + p.upstreamTransport = t + return nil + } +} + +// WithUpstreamAddress sets the address for our upstream Kube API +func WithUpstreamAddress(host string, scheme string) ResourceProxyOption { + return func(p *ResourceProxy) error { + p.upstreamAddr = host + p.upstreamScheme = scheme + return nil + } +} + +// WithRequestMatcher adds a request matcher to the proxy. The handler fn will +// be executed when pattern matches on the request URI's path. +func WithRequestMatcher(pattern string, methods []string, fn HandlerFunc) ResourceProxyOption { + return func(p *ResourceProxy) error { + rm, err := matcher(pattern, methods, fn) + if err != nil { + return err + } + p.interceptors = append(p.interceptors, rm) + return nil + } +} + +// WithPodMatcher registers a handler for calls to pods +func WithPodMatcher(fn HandlerFunc) ResourceProxyOption { + return func(p *ResourceProxy) error { + rm, err := matcher(podMatcher, []string{"namespace", "podname"}, fn) + if err != nil { + return err + } + p.interceptors = append(p.interceptors, rm) + return nil + } +} + +// WithLogMatcher registers a handler for calls to pod logs +func WithLogMatcher(fn HandlerFunc) ResourceProxyOption { + return func(p *ResourceProxy) error { + rm, err := matcher(logMatcher, []string{"namespace", "podname"}, fn) + if err != nil { + return err + } + p.interceptors = append(p.interceptors, rm) + return nil + } +} + +// WithExecMatcher registers a handler for calls to pod exec +func WithExecMatcher(fn HandlerFunc) ResourceProxyOption { + return func(p *ResourceProxy) error { + rm, err := matcher(execMatcher, []string{"namespace", "podname"}, fn) + if err != nil { + return err + } + p.interceptors = append(p.interceptors, rm) + return nil + } +} + +// matcher creates and returns a new request matcher for the given pattern. 
// Params is a typed map for storing interceptor parameters in.
type Params map[string]string

// Set stores value under key. Note that Set is first-write-wins: if key is
// already present, the existing value is kept and value is discarded. (The
// previous doc comment claimed an unconditional set, which the code never
// did.) Callers that need overwrite semantics must delete the key first.
func (p Params) Set(key string, value string) {
	if _, ok := p[key]; !ok {
		p[key] = value
	}
}

// Get retrieves the param with the given key. If no such param exists, the
// empty string is returned; use Has to distinguish a missing key from a key
// holding an empty value.
func (p Params) Get(key string) string {
	// A map access on a missing key yields the zero value "", which is
	// exactly the documented behavior.
	return p[key]
}

// Has returns true if the given param exists.
func (p Params) Has(key string) bool {
	_, ok := p[key]
	return ok
}

// NewParams returns a new, empty set of Params.
func NewParams() Params {
	return make(Params)
}
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package resourceproxy implements a very specific, non-general purpose HTTP +proxy server for intercepting a configurable list of calls to the Kubernetes +API. + +If a requests matches any of the configured patterns, +*/ +package resourceproxy + +import ( + "context" + "crypto/tls" + "fmt" + golog "log" + "net" + "net/http" + "net/http/httputil" + "net/netip" + "regexp" + "runtime" + "strings" + "time" + + "github.com/sirupsen/logrus" +) + +// defaultUpstreamHost is the default upstream host to use +const defaultUpstreamHost = "kubernetes.default.svc:6443" + +// defaultUpstreamScheme is the default scheme to use with upstream +const defaultUpstreamScheme = "https" + +const defaultIdleTimeout = 5 * time.Second +const defaultReadTimeout = 5 * time.Second + +// ResourceProxy represents a reverse proxy with interceptors for the Kubernetes +// API. Always Use New to get a new instance. +type ResourceProxy struct { + // addr specifies the address where this proxy listens on. Must be in the + // format [address:port]. 
+ addr string + // tlsConfig is the TLS configuration for the proxy's listener + tlsConfig *tls.Config + + // upstreamAddr is the address of our upstream + upstreamAddr string + // upstreamScheme is the protocol scheme to use for talking to the upstream + upstreamScheme string + // upstreamTransport is the transport to use when talking to the upstream + upstreamTransport *http.Transport + + idleTimeout time.Duration + readTimeout time.Duration + + // mux is the proxy's HTTP serve mux + mux *http.ServeMux + + // server is the proxy's HTTP server + server *http.Server + + // proxy is the proxy's reverse proxy configuration + proxy *httputil.ReverseProxy + + // interceptors is a list of interceptors for intercepting requests + interceptors []requestMatcher + + // state holds state information about requests + statemap requestState +} + +// HandlerFunc is a parameterized HTTP handler function +type HandlerFunc func(w http.ResponseWriter, r *http.Request, params Params) + +// requestMatcher holds information for matching requests against +type requestMatcher struct { + pattern string + matcher *regexp.Regexp + methods []string + fn HandlerFunc +} + +// New returns a new instance of ResourceProxy for connecting to the given upstream +// with the given options. The proxy will listen on host, which must be given +// in the form of [address:port]. +// +// The upstream will be defined by the supplied Kubernetes client REST config, +// which needs to hold all properties required to connect to the upstream API. 
+func New(addr string, options ...ResourceProxyOption) (*ResourceProxy, error) { + // Just to make sure addr is valid before passing it further down + _, err := netip.ParseAddrPort(addr) + if err != nil { + return nil, fmt.Errorf("invalid listener definition: %w", err) + } + p := &ResourceProxy{ + addr: addr, + idleTimeout: defaultIdleTimeout, + readTimeout: defaultReadTimeout, + } + + // Options may return error, which aborts instantiation + for _, o := range options { + if err := o(p); err != nil { + return nil, err + } + } + + // If not upstream host was set, we use the default + if p.upstreamAddr == "" { + p.upstreamAddr = defaultUpstreamHost + } + + // If not upstream scheme was set, we use the default + if p.upstreamScheme == "" { + p.upstreamScheme = defaultUpstreamScheme + } + + p.mux = http.NewServeMux() + + // Configure the reverse proxy that proxies most of the requests to our + // upstream Kube API. Anything that's not explicitly handled otherwise + // will be routed to the upstream as-is. 
+ p.proxy = &httputil.ReverseProxy{ + ErrorLog: nil, + Director: func(r *http.Request) { + r.URL.Host = p.upstreamAddr + r.URL.Scheme = p.upstreamScheme + }, + Transport: p.upstreamTransport, + ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { + log().WithError(err).Errorf("Could not connect to upstream '%s://%s'", p.upstreamScheme, p.upstreamAddr) + w.WriteHeader(http.StatusBadGateway) + }, + } + + // proxyHandler is the main HTTP handler, used to process every request + p.mux.HandleFunc("/", p.proxyHandler) + + httpLogger := log().WriterLevel(logrus.ErrorLevel) + + // Configure the HTTP server + p.server = &http.Server{ + Addr: p.addr, + Handler: p.mux, + TLSConfig: p.tlsConfig, + ErrorLog: golog.New(httpLogger, "", 0), + IdleTimeout: p.idleTimeout, + ReadHeaderTimeout: p.readTimeout, + } + + p.statemap = requestState{requests: make(map[string]*requestWrapper)} + return p, nil +} + +// Start starts the proxy in the background and immediately returns. The caller +// may read an error from the returned channel. The channel will only be +// written to in case of a start-up error, or when the proxy has shut down. +func (p *ResourceProxy) Start(ctx context.Context) (<-chan error, error) { + log().Infof("Starting ResourceProxy on %s (upstream=%s)", p.addr, p.upstreamAddr) + errCh := make(chan error) + var l net.Listener + var err error + + addr, err := netip.ParseAddrPort(p.addr) + if err != nil { + return nil, fmt.Errorf("invalid listener address: %w", err) + } + + // We support both, IPv4 and IPv6 for the proxy + var network string + if addr.Addr().Is4() { + network = "tcp" + } else if addr.Addr().Is6() { + network = "tcp6" + } else { + return nil, fmt.Errorf("could not figure out address type for %s", p.addr) + } + + // Although we really should only support TLS, we do support plain text + // connections too. But at least, we print a fat warning in that case. 
+ if p.tlsConfig != nil { + l, err = tls.Listen(network, p.addr, p.tlsConfig) + } else { + log().Warn("INSECURE: kube-proxy is listening in non-TLS mode") + l, err = net.Listen(network, p.addr) + } + if err != nil { + return nil, err + } + + // Start the HTTP server in the background + go func() { + errCh <- p.server.Serve(l) + }() + + return errCh, nil +} + +// Stop can be used to gracefully shut down the proxy server. +func (p *ResourceProxy) Stop(ctx context.Context) error { + return p.server.Shutdown(ctx) +} + +// proxyHandler is a HTTP request handler that inspects incoming requests. By +// default, every request will be passed down to the reverse proxy. +func (p *ResourceProxy) proxyHandler(w http.ResponseWriter, r *http.Request) { + log().Debugf("Processing URI %s %s (goroutines:%d)", r.Method, r.RequestURI, runtime.NumGoroutine()) + + // Loop through all registered matchers and match them against the request + // URI's path. First match wins. This is obviously not the most efficient + // nor performant way to do it, but we need regexp matching with submatch + // extraction. + for _, m := range p.interceptors { + matches := m.matcher.FindStringSubmatch(r.URL.Path) + if matches == nil { + continue + } else { + validMethod := false + for _, method := range m.methods { + if strings.EqualFold(r.Method, method) { + validMethod = true + } + } + // We must have a callback function defined. Also, method must be + // allowed. + if !validMethod || m.fn == nil { + w.WriteHeader(http.StatusForbidden) + return + } + + // uriParams will hold the named matches from the regexp + uriParams := NewParams() + for i, name := range m.matcher.SubexpNames() { + if i != 0 && name != "" { + uriParams.Set(name, matches[i]) + } + } + + // Call the handler with our params. The connection will stay open + // until the handler returns. 
+ log().Tracef("Executing callback for %v", uriParams) + m.fn(w, r, uriParams) + return + } + } + + // Finally, if we had no handler match, push the request through the proxy + // so that it ends up at the real Kubernetes API server. + if p.upstreamTransport != nil { + log().Debugf("No interceptor matched %s %s - proxying to upstream", r.Method, r.RequestURI) + p.proxy.ServeHTTP(w, r) + } else { + log().Debugf("No interceptor matched %s %s - no upstream configured", r.Method, r.RequestURI) + w.WriteHeader(http.StatusForbidden) + } +} + +func log() *logrus.Entry { + return logrus.WithField("module", "proxy") +} diff --git a/internal/resourceproxy/resourceproxy_test.go b/internal/resourceproxy/resourceproxy_test.go new file mode 100644 index 00000000..b7cdc7cf --- /dev/null +++ b/internal/resourceproxy/resourceproxy_test.go @@ -0,0 +1,84 @@ +package resourceproxy + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_NewProxy(t *testing.T) { + t.Run("It can be instantiated without options", func(t *testing.T) { + p, err := New("127.0.0.1:8080") + assert.NoError(t, err) + assert.NotNil(t, p) + + // Assert some defaults + assert.Equal(t, defaultUpstreamHost, p.upstreamAddr) + assert.Equal(t, defaultUpstreamScheme, p.upstreamScheme) + + // Interceptors should be empty + assert.Len(t, p.interceptors, 0) + }) + + t.Run("It requires a valid listener address", func(t *testing.T) { + p, err := New("127.0.0.1") + assert.ErrorContains(t, err, "invalid listener") + assert.Nil(t, p) + }) +} + +func Test_proxyHandler(t *testing.T) { + t.Run("It routes to the proxy", func(t *testing.T) { + proxied := false + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + proxied = true + })) + defer s.Close() + p, err := New("127.0.0.1:8080", + WithUpstreamAddress(s.Listener.Addr().String(), "http"), + WithUpstreamTransport(&http.Transport{}), + ) + 
require.NoError(t, err) + require.NotNil(t, p) + rec := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/test", nil) + p.proxyHandler(rec, r) + assert.True(t, proxied) + }) + t.Run("It intercepts correctly", func(t *testing.T) { + proxied := false + intercepted := true + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + proxied = true + })) + defer s.Close() + p, err := New("127.0.0.1:8080", + WithUpstreamAddress(s.Listener.Addr().String(), "http"), + WithUpstreamTransport(&http.Transport{}), + WithRequestMatcher("^/test/foo$", nil, func(w http.ResponseWriter, r *http.Request, params Params) { + intercepted = true + }), + ) + require.NoError(t, err) + require.NotNil(t, p) + rec := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/test", nil) + p.proxyHandler(rec, r) + assert.True(t, proxied) + proxied = false + + r = httptest.NewRequest(http.MethodGet, "/test/foo", nil) + p.proxyHandler(rec, r) + assert.False(t, proxied) + assert.True(t, intercepted) + intercepted = false + r = httptest.NewRequest(http.MethodGet, "/test/foo/bar", nil) + p.proxyHandler(rec, r) + assert.True(t, proxied) + assert.False(t, intercepted) + + }) +} diff --git a/internal/resourceproxy/tracking.go b/internal/resourceproxy/tracking.go new file mode 100644 index 00000000..55b708d2 --- /dev/null +++ b/internal/resourceproxy/tracking.go @@ -0,0 +1,84 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package resourceproxy + +import ( + "fmt" + "sync" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/sirupsen/logrus" +) + +// requestWrapper holds information about a given request to be tracked. +type requestWrapper struct { + // agentName is the name of the agent for which a resource is tracked + agentName string + // evCh is a channel where the requester expects the response to its + // requests is written to. + evCh chan *cloudevents.Event +} + +// requestState holds all currently tracked requests +type requestState struct { + mutex sync.RWMutex + requests map[string]*requestWrapper +} + +// Tracked returns the tracked event identified by resId +func (p *ResourceProxy) Tracked(eventId string) (string, chan *cloudevents.Event) { + p.statemap.mutex.RLock() + defer p.statemap.mutex.RUnlock() + r, ok := p.statemap.requests[eventId] + if !ok || r == nil { + return "", nil + } + return r.agentName, r.evCh +} + +// Track starts tracking a resource request identified by its UUID for an agent +// with the given name. It will return a channel the caller can use to read the +// response event from. +func (p *ResourceProxy) Track(eventId string, agentName string) (<-chan *cloudevents.Event, error) { + p.statemap.mutex.Lock() + defer p.statemap.mutex.Unlock() + log().WithFields(logrus.Fields{"event_id": eventId, "agent": agentName}).Trace("Tracking new request") + _, ok := p.statemap.requests[eventId] + if ok { + return nil, fmt.Errorf("resource with ID %s already tracked", eventId) + } + ch := make(chan *cloudevents.Event, 1) + p.statemap.requests[eventId] = &requestWrapper{agentName: agentName, evCh: ch} + return ch, nil +} + +// StopTracking will stop tracking a particular resource and close any event +// channel that may still be open. 
+func (p *ResourceProxy) StopTracking(eventId string) { + p.statemap.mutex.Lock() + defer p.statemap.mutex.Unlock() + r, ok := p.statemap.requests[eventId] + if ok { + close(r.evCh) + logCtx := log().WithFields(logrus.Fields{ + "event_id": eventId, + "agent": r.agentName, + }) + logCtx.Trace("Finished tracking request") + delete(p.statemap.requests, eventId) + } else { + log().WithField("event_id", eventId).Warn("Resource not tracked -- is this a bug?") + } +} diff --git a/internal/tlsutil/kubernetes.go b/internal/tlsutil/kubernetes.go new file mode 100644 index 00000000..24c43020 --- /dev/null +++ b/internal/tlsutil/kubernetes.go @@ -0,0 +1,160 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tlsutil + +/* +This file holds all TLS utility functions that interact with Kubernetes +*/ + +import ( + "context" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const tlsCertFieldName = "tls.crt" +const tlsKeyFieldName = "tls.key" +const tlsTypeLabelValue = "kubernetes.io/tls" + +// TLSCertFromSecret reads a Kubernetes TLS secrets, and parses its data into +// a tls.Certificate. 
+func TLSCertFromSecret(ctx context.Context, kube kubernetes.Interface, namespace, name string) (tls.Certificate, error) { + secret, err := kube.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return tls.Certificate{}, fmt.Errorf("could not read TLS secret %s/%s: %w", namespace, name, err) + } + if secret.Type != tlsTypeLabelValue { + return tls.Certificate{}, fmt.Errorf("%s/%s: not a TLS secret", namespace, name) + } + if len(secret.Data) == 0 { + return tls.Certificate{}, fmt.Errorf("%s/%s: empty secret", namespace, name) + } + crt := secret.Data[tlsCertFieldName] + key := secret.Data[tlsKeyFieldName] + if crt == nil || key == nil { + return tls.Certificate{}, fmt.Errorf("either cert or key is missing in the secret") + } + cert, err := tls.X509KeyPair(crt, key) + if err != nil { + return tls.Certificate{}, fmt.Errorf("invalid cert or key data: %w", err) + } + + return cert, nil +} + +// TLSCertToSecret writes a TLS certificate to a Kubernetes TLS secret. The data +// in the TLS certificate will be converted to PEM prior to being written out. 
+func TLSCertToSecret(ctx context.Context, kube kubernetes.Interface, namespace, name string, tlsCert tls.Certificate) error { + rsaKey, ok := tlsCert.PrivateKey.(*rsa.PrivateKey) + if !ok { + return fmt.Errorf("invalid private key format") + } + certPem, err := CertDataToPEM(tlsCert.Certificate[0]) + if err != nil { + return err + } + keyPem, err := KeyDataToPEM(rsaKey) + if err != nil { + return err + } + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Type: tlsTypeLabelValue, + Data: map[string][]byte{ + tlsCertFieldName: []byte(certPem), + tlsKeyFieldName: []byte(keyPem), + }, + } + _, err = kube.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) + if err != nil { + return err + } + return nil +} + +// X509CertPoolFromSecret reads certificate data from a Kubernetes secret and +// appends the data to a X509 cert pool to be returned. If field is given, +// only data from this field will be parsed into the cert pool. Otherwise, if +// field is the empty string, all fields in the secret are expected to have +// valid certificate data and will be parsed. 
+func X509CertPoolFromSecret(ctx context.Context, kube kubernetes.Interface, namespace, name, field string) (*x509.CertPool, error) { + secret, err := kube.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("could not read secret: %w", err) + } + if len(secret.Data) == 0 { + return nil, fmt.Errorf("%s/%s: empty secret", namespace, name) + } + + pool := x509.NewCertPool() + certsInPool := 0 + for f, crtBytes := range secret.Data { + if field == "" || f == field { + ok := pool.AppendCertsFromPEM(crtBytes) + if !ok { + return nil, fmt.Errorf("%s/%s: field %s does not hold valid certificate data", namespace, name, f) + } + certsInPool += 1 + } + } + + return pool, nil +} + +// TransportFromConfig creates an HTTP transport that is configured to use the +// TLS credentials and configuration from the given REST config. +func TransportFromConfig(config *rest.Config) (*http.Transport, error) { + pool := x509.NewCertPool() + pool.AppendCertsFromPEM(config.CAData) + ccert, err := GetKubeConfigClientCert(config) + if err != nil { + return nil, err + } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + Certificates: append([]tls.Certificate{}, *ccert), + RootCAs: pool, + }, + } + return tr, nil +} + +// GetKubeConfigClientCert extracts the client certificate and private key from +// a given Kubernetes configuration. It supports both, loading the data from a +// file, as well as using base64-encoded embedded data. 
+func GetKubeConfigClientCert(conf *rest.Config) (*tls.Certificate, error) { + var cert tls.Certificate + var err error + if conf.CertFile != "" && conf.KeyFile != "" { + cert, err = TlsCertFromFile(conf.CertFile, conf.KeyFile, true) + } else if len(conf.CertData) > 0 && len(conf.KeyData) > 0 { + cert, err = tls.X509KeyPair(conf.CertData, conf.KeyData) + } else { + return nil, fmt.Errorf("invalid TLS config in configuration") + } + + return &cert, err +} diff --git a/internal/tlsutil/kubernetes_test.go b/internal/tlsutil/kubernetes_test.go new file mode 100644 index 00000000..18f248a7 --- /dev/null +++ b/internal/tlsutil/kubernetes_test.go @@ -0,0 +1,257 @@ +package tlsutil + +import ( + "context" + "testing" + + "github.com/argoproj-labs/argocd-agent/test/fake/kube" + "github.com/argoproj-labs/argocd-agent/test/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/rest" +) + +func Test_TLSCertFromSecret(t *testing.T) { + t.Run("Secret not found", func(t *testing.T) { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tls-one", + Namespace: "argocd", + }, + Type: tlsTypeLabelValue, + Data: map[string][]byte{ + tlsCertFieldName: testutil.MustReadFile("testdata/001_test_cert.pem"), + tlsKeyFieldName: testutil.MustReadFile("testdata/001_test_key.pem"), + }, + } + kcl := kube.NewFakeClientsetWithResources(secret) + cert, err := TLSCertFromSecret(context.TODO(), kcl, "argocd", "tls-two") + assert.ErrorContains(t, err, "not found") + assert.NotNil(t, cert) + }) + t.Run("Valid secret", func(t *testing.T) { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tls-one", + Namespace: "argocd", + }, + Type: tlsTypeLabelValue, + Data: map[string][]byte{ + tlsCertFieldName: testutil.MustReadFile("testdata/001_test_cert.pem"), + tlsKeyFieldName: 
testutil.MustReadFile("testdata/001_test_key.pem"), + }, + } + kcl := kube.NewFakeClientsetWithResources(secret) + cert, err := TLSCertFromSecret(context.TODO(), kcl, "argocd", "tls-one") + assert.NoError(t, err) + assert.NotNil(t, cert) + }) + t.Run("Missing data in secret", func(t *testing.T) { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tls-one", + Namespace: "argocd", + }, + Type: tlsTypeLabelValue, + Data: nil, + } + kcl := kube.NewFakeClientsetWithResources(secret) + cert, err := TLSCertFromSecret(context.TODO(), kcl, "argocd", "tls-one") + assert.ErrorContains(t, err, "empty secret") + assert.NotNil(t, cert) + }) + t.Run("Missing cert in secret", func(t *testing.T) { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tls-one", + Namespace: "argocd", + }, + Type: tlsTypeLabelValue, + Data: map[string][]byte{ + tlsKeyFieldName: testutil.MustReadFile("testdata/001_test_key.pem"), + }, + } + kcl := kube.NewFakeClientsetWithResources(secret) + cert, err := TLSCertFromSecret(context.TODO(), kcl, "argocd", "tls-one") + assert.ErrorContains(t, err, "either cert or key is missing") + assert.NotNil(t, cert) + + }) + t.Run("Missing key in secret", func(t *testing.T) { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tls-one", + Namespace: "argocd", + }, + Type: tlsTypeLabelValue, + Data: map[string][]byte{ + tlsCertFieldName: testutil.MustReadFile("testdata/001_test_cert.pem"), + }, + } + kcl := kube.NewFakeClientsetWithResources(secret) + cert, err := TLSCertFromSecret(context.TODO(), kcl, "argocd", "tls-one") + assert.ErrorContains(t, err, "either cert or key is missing") + assert.NotNil(t, cert) + + }) + + t.Run("Not a TLS secret", func(t *testing.T) { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tls-one", + Namespace: "argocd", + }, + Data: map[string][]byte{ + tlsKeyFieldName: testutil.MustReadFile("testdata/001_test_key.pem"), + tlsCertFieldName: 
testutil.MustReadFile("testdata/001_test_cert.pem"), + }, + } + kcl := kube.NewFakeClientsetWithResources(secret) + cert, err := TLSCertFromSecret(context.TODO(), kcl, "argocd", "tls-one") + assert.ErrorContains(t, err, "not a TLS secret") + assert.NotNil(t, cert) + + }) + + t.Run("Invalid data in secret", func(t *testing.T) { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tls-one", + Namespace: "argocd", + }, + Type: tlsTypeLabelValue, + Data: map[string][]byte{ + tlsKeyFieldName: []byte("something"), + tlsCertFieldName: []byte("someother"), + }, + } + kcl := kube.NewFakeClientsetWithResources(secret) + cert, err := TLSCertFromSecret(context.TODO(), kcl, "argocd", "tls-one") + assert.ErrorContains(t, err, "invalid cert or key data") + assert.NotNil(t, cert) + + }) + +} + +func Test_TlsCertToSecret(t *testing.T) { + keyPem := testutil.MustReadFile("testdata/001_test_key.pem") + certPem := testutil.MustReadFile("testdata/001_test_cert.pem") + + t.Run("Successfully create a secret", func(t *testing.T) { + kcl := kube.NewFakeClientsetWithResources() + cert, err := TlsCertFromFile("testdata/001_test_cert.pem", "testdata/001_test_key.pem", false) + require.NoError(t, err) + err = TLSCertToSecret(context.TODO(), kcl, "argocd", "tls-one", cert) + assert.NoError(t, err) + s, err := kcl.CoreV1().Secrets("argocd").Get(context.TODO(), "tls-one", metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.Equal(t, keyPem, s.Data[tlsKeyFieldName]) + assert.Equal(t, certPem, s.Data[tlsCertFieldName]) + }) + + t.Run("Incomplete certificate", func(t *testing.T) { + kcl := kube.NewFakeClientsetWithResources() + cert, err := TlsCertFromFile("testdata/001_test_cert.pem", "testdata/001_test_key.pem", false) + require.NoError(t, err) + cert.PrivateKey = nil + err = TLSCertToSecret(context.TODO(), kcl, "argocd", "tls-one", cert) + assert.ErrorContains(t, err, "invalid private key") + _, err = kcl.CoreV1().Secrets("argocd").Get(context.TODO(), 
"tls-one", metav1.GetOptions{}) + assert.True(t, errors.IsNotFound(err)) + }) + +} + +func Test_X509CertPoolFromSecret(t *testing.T) { + certPem := testutil.MustReadFile("testdata/001_test_cert.pem") + s := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ca-certs", + Namespace: "argocd", + }, + Data: map[string][]byte{ + "ca.crt": []byte(certPem), + }, + } + kcl := kube.NewFakeClientsetWithResources(s) + t.Run("Read pool from single field", func(t *testing.T) { + pool, err := X509CertPoolFromSecret(context.TODO(), kcl, "argocd", "ca-certs", "ca.crt") + assert.NoError(t, err) + assert.NotNil(t, pool) + }) + t.Run("Read pool from all fields", func(t *testing.T) { + pool, err := X509CertPoolFromSecret(context.TODO(), kcl, "argocd", "ca-certs", "") + assert.NoError(t, err) + assert.NotNil(t, pool) + }) + t.Run("Read pool from non-existing field", func(t *testing.T) { + pool, err := X509CertPoolFromSecret(context.TODO(), kcl, "argocd", "ca-certs", "") + assert.NoError(t, err) + assert.NotNil(t, pool) + }) + + t.Run("Read pool from non-existing secret", func(t *testing.T) { + pool, err := X509CertPoolFromSecret(context.TODO(), kcl, "argocd", "ca-certs-2", "") + assert.True(t, errors.IsNotFound(err)) + assert.Nil(t, pool) + }) +} + +func Test_TransportFromConfig(t *testing.T) { + keyPem := testutil.MustReadFile("testdata/001_test_key.pem") + certPem := testutil.MustReadFile("testdata/001_test_cert.pem") + t.Run("Valid embedded client certificate data", func(t *testing.T) { + restConf := &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + CertData: []byte(certPem), + KeyData: []byte(keyPem), + }, + } + ht, err := TransportFromConfig(restConf) + assert.NoError(t, err) + assert.Len(t, ht.TLSClientConfig.Certificates, 1) + }) + t.Run("Invalid embedded client certificate data", func(t *testing.T) { + restConf := &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + CertData: []byte("foo"), + KeyData: []byte("bar"), + }, + } + ht, err := 
TransportFromConfig(restConf) + assert.Error(t, err) + assert.Nil(t, ht) + }) + t.Run("Valid client certificate data in file", func(t *testing.T) { + restConf := &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + CertFile: "testdata/001_test_cert.pem", + KeyFile: "testdata/001_test_key.pem", + }, + } + ht, err := TransportFromConfig(restConf) + assert.NoError(t, err) + assert.Len(t, ht.TLSClientConfig.Certificates, 1) + }) + t.Run("Client certificate data from non-existing file", func(t *testing.T) { + restConf := &rest.Config{ + TLSClientConfig: rest.TLSClientConfig{ + CertFile: "testdata/00x_test_cert.pem", + KeyFile: "testdata/00x_test_key.pem", + }, + } + ht, err := TransportFromConfig(restConf) + assert.Error(t, err) + assert.Nil(t, ht) + }) + t.Run("No client certificate data specified", func(t *testing.T) { + ht, err := TransportFromConfig(&rest.Config{}) + assert.Error(t, err) + assert.Nil(t, ht) + }) +} diff --git a/internal/tlsutil/testdata/001_test_cert.pem b/internal/tlsutil/testdata/001_test_cert.pem new file mode 100644 index 00000000..dee7cb09 --- /dev/null +++ b/internal/tlsutil/testdata/001_test_cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDZTCCAk2gAwIBAgIUFAk2kiIEyjS0nCZd/ymtM1qKlfYwDQYJKoZIhvcNAQEL +BQAwQjELMAkGA1UEBhMCWFgxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoGA1UE +CgwTRGVmYXVsdCBDb21wYW55IEx0ZDAeFw0yNDEyMTcxNDE5MDhaFw0yNTEyMTcx +NDE5MDhaMEIxCzAJBgNVBAYTAlhYMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAa +BgNVBAoME0RlZmF1bHQgQ29tcGFueSBMdGQwggEiMA0GCSqGSIb3DQEBAQUAA4IB +DwAwggEKAoIBAQCiKV5G7Pu9aC/ZapHuu3yAC6XrVNAgcPHVK0/oQZWCn9/w+3gO +8LZKo9x0rgz7zrlNjg+lR2lQIQ37Zk6b2tVBhl2O9Qm8HShoY17etBuJJI2a7Ru7 +j4IW+GPDON2POiQwFQxoXo3AKyCEzohV7vJ0ShF0M5viJA6Y5jcVSzkQXrDqt1m6 +hLA3mVxkFZ76qCSnOzMoA80+hHWpI64rgZKsKlWQPWe9fE+oIp/JyKU4TXhP/Kua +6ulkQ31dP0gzpANWaROvU3NUy0O+pxyGqc6lpFD94DqqTV8EVh1CknA3KO3pfmX+ +skHm5jL4DYdPn8ciNibRETh+8AhWjOQcNXnFAgMBAAGjUzBRMB0GA1UdDgQWBBQL +rIsnR3OoNtm7InHaaaJhYhFG2zAfBgNVHSMEGDAWgBQLrIsnR3OoNtm7InHaaaJh 
+YhFG2zAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQB/EECtLv5D +7gFkz3NxcaJfjvGPO2vTSJx79T0ndYKwXqqGisOXfY2SVkq3B03HMRCeTNWJmEie +Gd5zxdBw/lVYFmhJmBzGchD7t4eDx0Yavm8kyc2oU1HAWgZBxtye54ZjAYqBFsrw +qYRiJH5Zr1+8Xacc4usoNGxeGDZWtNrz/qku7pb1pdBGQzUADI+d3DG8+XfhMZtr +jpuq+kZfR/GvFKxHMwyjRw1vwRDL0FYbDtcIbqqJTsR9lxYLhtYbOSzCkGqjmbpQ +rUZ4rsIYeXh+N7vbIVqYjjbmA6fB1N0S3/svVuHtzXxfbuzxBcdDdPrZr3VmoZIh +et6RvI7KUqx+ +-----END CERTIFICATE----- diff --git a/internal/tlsutil/testdata/001_test_key.pem b/internal/tlsutil/testdata/001_test_key.pem new file mode 100644 index 00000000..e29d93aa --- /dev/null +++ b/internal/tlsutil/testdata/001_test_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCiKV5G7Pu9aC/Z +apHuu3yAC6XrVNAgcPHVK0/oQZWCn9/w+3gO8LZKo9x0rgz7zrlNjg+lR2lQIQ37 +Zk6b2tVBhl2O9Qm8HShoY17etBuJJI2a7Ru7j4IW+GPDON2POiQwFQxoXo3AKyCE +zohV7vJ0ShF0M5viJA6Y5jcVSzkQXrDqt1m6hLA3mVxkFZ76qCSnOzMoA80+hHWp +I64rgZKsKlWQPWe9fE+oIp/JyKU4TXhP/Kua6ulkQ31dP0gzpANWaROvU3NUy0O+ +pxyGqc6lpFD94DqqTV8EVh1CknA3KO3pfmX+skHm5jL4DYdPn8ciNibRETh+8AhW +jOQcNXnFAgMBAAECggEADOfH61MZkcWvO9lXCy3RRwt7mpKhtwM7a8tKTQydznXU +x66WVnINFTxHOONP1eEU2Y7lwIdCDVt5a7mE+12wvR8+u42AHIAhjXU4a/bffiGc +7L3UCDBFRSNiz9BAabv8fKB1iqouaSMa2mw+vhVfJvwXZC0QnzBewl0H+IzdBL1e +g4JLylJk1k0YIStzI1qDP32t78mXuNvHPeQqDzZaFPHcAP2okMCCVMiCspnfa4RO +lvpW6IC1rr0cJYmYqkTFDft2ny/OaSKNc7bw9/OKwP+SaZywXQcZfMeS3iDpHdgs +1PbXpTjGayPU1zFLac4ChdxuPMIKZ1bWD8pUC1dWvQKBgQDjCvYhNGZn7zf3dBH8 +hIzPauDcR/Bawcxqd8Wgbvu/zTbicxdLni0ECrRz625eJatkG9GdRGvtRlWzZUbC +FYSEV1SnfJkmaCl3tcY3AZpnb/aEgb+kHQl0vMUgciCOsDjYTYz47is+7Yp1qXor +QmSK0kTcE3KyjVipukEuSImvLwKBgQC22AJObwHv9wEfIPRBrfbAxrRrDuQ0V6tk +S2TlHCpFCh00ksBuj7jmvDq92hqAZhqSeuZtZx8EyFam6HT0l/EYzTgG9RMxaHhP +PIshry5S0FK2pnsZIE+vpdXRr1uFFVRPqtj4HP17jvvEW3pQXXYNtMs3FPkfTyGq +XR+PZRGJSwKBgCDng8hIKddCSiAoyDqKk0W0PaZvHpxondGITjH0I7Qmb5/eAjBJ +WkjNrF1ob3RhjTdS+MwMEIAww1behKS4LZ5ocbJcUm3Ihsn8pB9wsgnvphCKJVYJ 
+h0dN3FvZbnJ/g52Fj7q7+bSDBKAM0dHXK28bDjO+9c5+wazHe47ToHCtAoGAZqZk +vRXzN34rogdFOe5pnpavyX7lvUEO1tLBBSNH09S2ysIsyKVlgBxiuh1NTZKFDoFz +Bi6jqnKyuye8KWl4EJ19++Hw8YceLBXoYnPQBOwx05spdtS+B/WJUhwpvFBaMhPP +lZPo90oxrG5S//VIhq9ee0EKD3rEgrmfM0jhjHsCgYAH7V5L3HnhRdhMPb5Ggvxm +dH00H5t52OnJTxiCjqOFl02BdsRWkweyPFSecACC64oEDJ2OEZvTP9tAj+OUy3Sf +/pPeVFagWYvijo5fOjKreqViwB8ipFk6syZTLhCyfN9um1qfjuad1IjGwU3Cwklh +0RiX8twiB5IDY3fDvuBB1Q== +-----END PRIVATE KEY----- diff --git a/internal/tlsutil/tlsutil.go b/internal/tlsutil/tlsutil.go index 2936a1f1..87f16ef0 100644 --- a/internal/tlsutil/tlsutil.go +++ b/internal/tlsutil/tlsutil.go @@ -25,6 +25,7 @@ import ( "crypto/x509" "encoding/pem" "fmt" + "os" "time" ) @@ -87,10 +88,43 @@ func TlsCertFromX509(cert *x509.Certificate, key crypto.PrivateKey) (tls.Certifi return tlsCert, nil } -// func X509CertFromFile(path string) (*x509.Certificate, error) { -// b, err := os.ReadFile(path) -// if err != nil { -// return nil, err -// } +func X509CertPoolFromFile(path string) (*x509.CertPool, error) { + b, err := os.ReadFile(path) + if err != nil { + return nil, err + } + pool := x509.NewCertPool() + ok := pool.AppendCertsFromPEM(b) + if !ok { + return nil, fmt.Errorf("%s: invalid PEM data", path) + } + + return pool, nil +} + +func CertDataToPEM(b []byte) (string, error) { + certPem := new(bytes.Buffer) + err := pem.Encode(certPem, &pem.Block{ + Type: "CERTIFICATE", + Bytes: b, + }) + return certPem.String(), err +} -// } +func KeyDataToPEM(k crypto.PrivateKey) (string, error) { + keyPem := new(bytes.Buffer) + var keyBytes []byte + var err error + + if keyBytes, err = x509.MarshalPKCS8PrivateKey(k); err != nil { + return "", err + } + + err = pem.Encode(keyPem, &pem.Block{ + Type: "PRIVATE KEY", + Bytes: keyBytes, + }) + + return keyPem.String(), err + +} diff --git a/principal/apis/eventstream/eventstream.go b/principal/apis/eventstream/eventstream.go index 2c02c3d8..aa86e81f 100644 --- a/principal/apis/eventstream/eventstream.go +++ 
b/principal/apis/eventstream/eventstream.go @@ -208,18 +208,30 @@ func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe if streamEvent == nil || streamEvent.Event == nil { return fmt.Errorf("invalid wire transmission") } + app := &v1alpha1.Application{} + proj := &v1alpha1.AppProject{} + resResp := &event.ResourceResponse{} incomingEvent, err := format.FromProto(streamEvent.Event) if err != nil { return fmt.Errorf("could not unserialize event from wire: %w", err) } logCtx = logCtx.WithFields(logrus.Fields{ - "resource_id": event.ResourceID(incomingEvent), - "event_id": event.EventID(incomingEvent), + "resource_id": event.ResourceID(incomingEvent), + "event_id": event.EventID(incomingEvent), + "event_target": incomingEvent.DataSchema(), + "event_type": incomingEvent.Type(), }) - err = incomingEvent.DataAs(app) + switch event.Target(incomingEvent) { + case event.TargetApplication: + err = incomingEvent.DataAs(app) + case event.TargetAppProject: + err = incomingEvent.DataAs(proj) + case event.TargetResource: + err = incomingEvent.DataAs(resResp) + } if err != nil { return fmt.Errorf("could not unserialize app data from wire: %w", err) } @@ -266,10 +278,10 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe if shutdown { return fmt.Errorf("sendq shutdown in progress") } - logCtx.Tracef("Grabbed an item") if ev == nil { return fmt.Errorf("panic: nil item in queue") } + logCtx.WithField("event_target", ev.DataSchema()).WithField("event_type", ev.Type()).Trace("Grabbed an item") eventWriter := s.eventWriters.Get(c.agentName) if eventWriter == nil { diff --git a/principal/event.go b/principal/event.go index fb71c9c1..662dccfb 100644 --- a/principal/event.go +++ b/principal/event.go @@ -20,6 +20,7 @@ import ( "time" "github.com/argoproj-labs/argocd-agent/internal/backend" + "github.com/argoproj-labs/argocd-agent/internal/checkpoint" "github.com/argoproj-labs/argocd-agent/internal/event" 
"github.com/argoproj-labs/argocd-agent/internal/kube" "github.com/argoproj-labs/argocd-agent/internal/namedlock" @@ -39,17 +40,19 @@ import ( // server's backend. func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workqueue.TypedRateLimitingInterface[*cloudevents.Event]) (*cloudevents.Event, error) { ev, _ := q.Get() - agentMode := s.agentMode(agentName) - incoming := &v1alpha1.Application{} + logCtx := log().WithFields(logrus.Fields{ - "module": "QueueProcessor", - "client": agentName, - "mode": agentMode.String(), - "event": ev.Type(), - "incoming": incoming.QualifiedName(), + "module": "QueueProcessor", + "client": agentName, + "event_target": ev.DataSchema(), + "event_type": ev.Type(), }) logCtx.Debugf("Processing event") + + // Start measuring time for event processing + cp := checkpoint.NewCheckpoint("process_recv_queue") + var err error target := event.Target(ev) switch target { @@ -57,13 +60,24 @@ func (s *Server) processRecvQueue(ctx context.Context, agentName string, q workq err = s.processApplicationEvent(ctx, agentName, ev) case event.TargetAppProject: err = s.processAppProjectEvent(ctx, agentName, ev) + case event.TargetResource: + err = s.processResourceEvent(ctx, agentName, ev) default: - err = fmt.Errorf("unable to process event with unknown target %s", target) + err = fmt.Errorf("unknown target: '%s'", target) } + + // Mark event as processed q.Done(ev) + + // Stop and log checkpoint information + cp.End() + logCtx.Debug(cp.String()) + return ev, err } +// processApplicationEvent processes an incoming event that has an application +// target. func (s *Server) processApplicationEvent(ctx context.Context, agentName string, ev *cloudevents.Event) error { incoming := &v1alpha1.Application{} err := ev.DataAs(incoming) @@ -204,6 +218,39 @@ func (s *Server) processAppProjectEvent(ctx context.Context, agentName string, e return nil } +// processResourceEvent will process a response to a resource request event. 
+func (s *Server) processResourceEvent(ctx context.Context, agentName string, ev *cloudevents.Event) error { + UUID := event.EventID(ev) + // We need to make sure that a) the event is tracked at all, and b) the + // event is for the currently processed agent. + trAgent, evChan := s.resourceProxy.Tracked(UUID) + if evChan == nil { + return fmt.Errorf("resource response not tracked") + } + if trAgent != agentName { + return fmt.Errorf("agent mismatch between event and tracking") + } + + // Get the resource response body from the event + resReq := &event.ResourceResponse{} + err := ev.DataAs(resReq) + if err != nil { + return err + } + + // There should be someone at the receiving end of the channel to read and + // process the event. Typically, this will be the resource proxy. However, + // we will not wait forever for the response to be read. + select { + case evChan <- ev: + err = nil + case <-ctx.Done(): + err = fmt.Errorf("error waiting for response to be read: %w", ctx.Err()) + } + + return err +} + +// eventProcessor is the main loop to process events from the receiver queue, // i.e. events coming from the connect agents. 
It will process events from // different agents in parallel, but it will not parallelize processing of diff --git a/principal/event_test.go b/principal/event_test.go index ed58a0e8..e96fa926 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -39,7 +39,7 @@ func Test_InvalidEvents(t *testing.T) { s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) - assert.ErrorContains(t, err, "unable to process event with unknown target") + assert.ErrorContains(t, err, "unknown target") assert.Equal(t, ev, *got) }) diff --git a/principal/listen.go b/principal/listen.go index a82a25c5..c7a2ca02 100644 --- a/principal/listen.go +++ b/principal/listen.go @@ -85,6 +85,7 @@ func addrToListener(l net.Listener) (*Listener, error) { return &Listener{host: host, port: port, l: l}, nil } +// Listen configures and starts the server's TCP listener. func (s *Server) Listen(ctx context.Context, backoff wait.Backoff) error { var c net.Listener var err error @@ -98,13 +99,14 @@ func (s *Server) Listen(ctx context.Context, backoff wait.Backoff) error { if try == 1 { log().Debugf("Starting TCP listener on %s", bind) } - // Even though we load TLS configuration here, we will not use create + // Even though we load TLS configuration here, we will not yet create // a TLS listener. TLS will be setup using the appropriate grpc-go API // functions. s.tlsConfig, lerr = s.loadTLSConfig() if lerr != nil { return false, lerr } + // Start the TCP listener and bail out on errors. 
c, lerr = net.Listen("tcp", bind) if lerr != nil { log().WithError(err).Debugf("Retrying to start TCP listener on %s (retry %d/%d)", bind, try, listenerRetries) @@ -136,34 +138,31 @@ func (s *Server) serveGRPC(ctx context.Context, errch chan error) error { return fmt.Errorf("could not start listener: %w", err) } - s.grpcServer = grpc.NewServer( + grpcOpts := []grpc.ServerOption{ + // Global interceptors for gRPC streams grpc.ChainStreamInterceptor( - streamRequestLogger(), - // logging.StreamServerInterceptor(InterceptorLogger(logrus.New()), - // logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), - // ), - s.streamAuthInterceptor, - // grpc_auth.StreamServerInterceptor(func(ctx context.Context) (context.Context, error) { - // return s.authenticate(ctx) - // }), + streamRequestLogger(), // logging + s.streamAuthInterceptor, // auth ), + // Global interceptors for gRPC unary calls grpc.ChainUnaryInterceptor( - unaryRequestLogger(), - // logging.UnaryServerInterceptor(InterceptorLogger(logrus.New()), - // logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), - // ), - s.unaryAuthInterceptor, + unaryRequestLogger(), // logging + s.unaryAuthInterceptor, // auth ), + // TLS credentials grpc.Creds(credentials.NewTLS(s.tlsConfig)), - ) - authSrv, err := auth.NewServer(s.queues, s.authMethods, s.issuer) - if err != nil { - return fmt.Errorf("could not create new auth server: %w", err) } - authapi.RegisterAuthenticationServer(s.grpcServer, authSrv) - versionapi.RegisterVersionServer(s.grpcServer, version.NewServer(s.authenticate)) - eventstreamapi.RegisterEventStreamServer(s.grpcServer, eventstream.NewServer(s.queues)) + // Instantiate server with given opts + s.grpcServer = grpc.NewServer(grpcOpts...) 
+ + // Register all gRPC services with the server + if err := s.registerGrpcServices(); err != nil { + return fmt.Errorf("could not register gRPC services: %w", err) + } + + // If the server is configured with HTTP/1 support enabled, configured and + // start the downgrading proxy. Otherwise, start the gRPC server. if s.enableWebSocket { opts := []grpchttp1server.Option{grpchttp1server.PreferGRPCWeb(true)} @@ -188,31 +187,6 @@ func (s *Server) serveGRPC(ctx context.Context, errch chan error) error { return nil } -// func (s *Server) ServeHTTP(ctx context.Context, errch chan error) error { -// err := s.Listen(ctx, listenerBackoff) -// if err != nil { -// return fmt.Errorf("could not start listener: %w", err) -// } -// mux := http.NewServeMux() -// mux.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) { -// w.WriteHeader(200) -// w.Write([]byte("hello workd")) -// }) -// s.server = &http.Server{ -// BaseContext: func(l net.Listener) context.Context { -// return s.listener.ctx -// }, -// TLSConfig: s.tlsConfig, -// ErrorLog: golog.New(log.New().WriterLevel(log.WarnLevel), "", 0), -// Handler: mux, -// } -// go func() { -// err = s.server.Serve(s.listener.l) -// errch <- err -// }() -// return nil -// } - func (l *Listener) Host() string { return l.host } @@ -224,3 +198,17 @@ func (l *Listener) Port() int { func (l *Listener) Address() string { return fmt.Sprintf("%s:%d", l.host, l.port) } + +// registerGrpcServices registers all required gRPC services to the server s. +// This method should be called after the server is configured, and has all +// required configuration properties set. 
+func (s *Server) registerGrpcServices() error { + authSrv, err := auth.NewServer(s.queues, s.authMethods, s.issuer) + if err != nil { + return fmt.Errorf("could not create new auth server: %w", err) + } + authapi.RegisterAuthenticationServer(s.grpcServer, authSrv) + versionapi.RegisterVersionServer(s.grpcServer, version.NewServer(s.authenticate)) + eventstreamapi.RegisterEventStreamServer(s.grpcServer, eventstream.NewServer(s.queues)) + return nil +} diff --git a/principal/options.go b/principal/options.go index 37cffd9f..03945c03 100644 --- a/principal/options.go +++ b/principal/options.go @@ -363,3 +363,17 @@ func WithWebSocket(enableWebSocket bool) ServerOption { return nil } } + +func WithKubeProxyEnabled(enabled bool) ServerOption { + return func(o *Server) error { + o.resourceProxyEnabled = enabled + return nil + } +} + +func WithKubeProxyTLS(tlsConfig *tls.Config) ServerOption { + return func(o *Server) error { + o.resourceProxyTlsConfig = tlsConfig + return nil + } +} diff --git a/principal/resource.go b/principal/resource.go new file mode 100644 index 00000000..2cbb8157 --- /dev/null +++ b/principal/resource.go @@ -0,0 +1,167 @@ +// Copyright 2024 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package principal + +import ( + "context" + "net/http" + "time" + + "github.com/argoproj-labs/argocd-agent/internal/event" + "github.com/argoproj-labs/argocd-agent/internal/resourceproxy" + "k8s.io/apimachinery/pkg/api/validation" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// resourceRequestRegexp is the regexp used to match requests for retrieving a +// resource or a list of resources from the server. It makes use of named +// capture groups. +const resourceRequestRegexp = `^/(?:api|apis/(?P[^\/]+))/(?Pv[^\/]+)(?:/(?:namespaces/(?P[^\/]+)/)?)?(?:(?P[^\/]+)(?:/(?P[^\/]+))?)?$` + +// requestTimeout is the timeout that's being applied to requests for any live +// resource. +const requestTimeout = 10 * time.Second + +// resourceRequester is being executed by the resource proxy once it received a +// request for a specific resource. It will encapsulate this request into an +// event and add this event to the target agent's event queue. It will then +// wait for a response from the agent, which comes in asynchronously. +func (s *Server) resourceRequester(w http.ResponseWriter, r *http.Request, params resourceproxy.Params) { + logCtx := log().WithField("function", "resourceRequester") + + // Make sure our request carries a client certificate + if len(r.TLS.PeerCertificates) < 1 { + logCtx.Errorf("Unauthenticated request from client %s", r.RemoteAddr) + w.WriteHeader(http.StatusUnauthorized) + return + } + + // For now, we can only validate the first client cert presented by the + // client. Under normal circumstances, the client is the Argo CD API, + // which uses the client certificate of the cluster secret, which is + // usually configured by us. 
+ cert := r.TLS.PeerCertificates[0] + + // Make sure the agent name in the certificate is good + agentName := cert.Subject.CommonName + errs := validation.NameIsDNSLabel(agentName, false) + if len(errs) > 0 { + logCtx.Errorf("CRITICAL: Invalid agent name in client certificate: %v", errs) + w.WriteHeader(http.StatusUnauthorized) + return + } + + logCtx = logCtx.WithField("agent", cert.Subject.CommonName) + + // Agent is not connected. Return early. + if !s.queues.HasQueuePair(agentName) { + logCtx.Debugf("Agent is not connected, stop proxying") + w.WriteHeader(http.StatusBadGateway) + return + } + + q := s.queues.SendQ(agentName) + if q == nil { + logCtx.Errorf("Help! Queue disappeared") + w.WriteHeader(http.StatusInternalServerError) + return + } + + gvr := metav1.GroupVersionResource{ + Group: params.Get("group"), + Resource: params.Get("resource"), + Version: params.Get("version"), + } + + // Create the event + sentEv, err := s.events.NewResourceRequestEvent(gvr, params.Get("namespace"), params.Get("name")) + if err != nil { + logCtx.Errorf("Could not create event: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // Remember the resource ID of the sent event + sentUuid := event.EventID(sentEv) + + // Start tracking the event, so we can later get the response + eventCh, err := s.resourceProxy.Track(sentUuid, agentName) + if err != nil { + logCtx.Errorf("Could not track event %s: %v", sentUuid, err) + } + defer s.resourceProxy.StopTracking(sentUuid) + + // Submit the event to the queue + logCtx.Tracef("Submitting event: %v", sentEv) + q.Add(sentEv) + + logCtx.Infof("Proxying request for resource %s %s/%s", params.Get("resource"), params.Get("namespace"), params.Get("name")) + + // Wait for the event from the agent + ctx, cancel := context.WithTimeout(s.ctx, requestTimeout) + defer cancel() + defer func() { + if err := r.Body.Close(); err != nil { + logCtx.WithError(err).Error("Uh oh") + } + }() + + // The response is being read through a 
channel that is kept open and + // written to by the resource proxy. + for { + select { + case <-ctx.Done(): + log().Infof("Timeout communicating to the agent, closing proxy connection.") + w.WriteHeader(http.StatusGatewayTimeout) + w.Write([]byte("Timeout communicating to agent.\n")) + return + case rcvdEv, ok := <-eventCh: + // Channel was closed. Bail out. + if !ok { + log().Info("EventQueue has closed the channel") + w.WriteHeader(http.StatusGatewayTimeout) + return + } + + // Make sure that we have the right response event. This should + // usually not happen, because the resource proxy has a mapping + // of request to response, but we'll be vigilant. + rcvdUuid := event.EventID(rcvdEv) + if rcvdUuid != sentUuid { + log().Error("Received mismatching UUID in response") + w.WriteHeader(http.StatusForbidden) + return + } + + // Get the resource out of the event + resp := &event.ResourceResponse{} + err = rcvdEv.DataAs(resp) + if err != nil { + logCtx.WithError(err).Error("Could not get data from event") + w.WriteHeader(http.StatusInternalServerError) + return + } + + // We are good to send the response to the caller + log().Info("Writing resource to caller") + w.Header().Set("Content-type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(resp.Resource)) + return + default: + time.Sleep(100 * time.Millisecond) + } + } +} diff --git a/principal/server.go b/principal/server.go index 39d22d3f..913d3031 100644 --- a/principal/server.go +++ b/principal/server.go @@ -17,12 +17,15 @@ package principal import ( context "context" "crypto/tls" + "encoding/json" "fmt" "net/http" "regexp" + goruntime "runtime" "sync" "time" + "github.com/argoproj-labs/argocd-agent/internal/argocd/cluster" "github.com/argoproj-labs/argocd-agent/internal/auth" kubeapp "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/application" kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject" @@ -36,6 +39,7 @@ import ( 
"github.com/argoproj-labs/argocd-agent/internal/manager/appproject" "github.com/argoproj-labs/argocd-agent/internal/metrics" "github.com/argoproj-labs/argocd-agent/internal/queue" + "github.com/argoproj-labs/argocd-agent/internal/resourceproxy" "github.com/argoproj-labs/argocd-agent/internal/tlsutil" "github.com/argoproj-labs/argocd-agent/internal/version" "github.com/argoproj-labs/argocd-agent/pkg/types" @@ -44,7 +48,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "google.golang.org/grpc" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -92,6 +96,18 @@ type Server struct { // The Principal will rely on gRPC over WebSocket for bi-directional streaming. This option could be enabled // when there is an intermediate component that is HTTP/2 incompatible and downgrades the incoming request to HTTP/1.1 enableWebSocket bool + + // resourceProxyEnabled indicates whether the resource proxy should be enabled + resourceProxyEnabled bool + // resourceProxy intercepts requests to the agent Kubernetes APIs + resourceProxy *resourceproxy.ResourceProxy + // resourceProxyListenAddr is the listener address for the resource proxy + resourceProxyListenAddr string + // resourceProxyTlsConfig is the TLS configuration for the resource proxy + resourceProxyTlsConfig *tls.Config + + // clusterManager manages Argo CD cluster secrets and their mappings to agents + clusterMgr *cluster.Manager } // noAuthEndpoints is a list of endpoints that are available without the need @@ -103,6 +119,9 @@ var noAuthEndpoints = map[string]bool{ const waitForSyncedDuration = 60 * time.Second +// defaultKubeProxyListenerAddr is the default listener address for kube-proxy +const defaultKubeProxyListenerAddr = "0.0.0.0:9090" + func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace string, opts 
...ServerOption) (*Server, error) { s := &Server{ options: defaultOptions(), @@ -140,10 +159,10 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace appFilters := s.defaultAppFilterChain() appInformerOpts := []informer.InformerOption[*v1alpha1.Application]{ - informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) { + informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.ApplicationsClientset.ArgoprojV1alpha1().Applications("").List(ctx, opts) }), - informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.ApplicationsClientset.ArgoprojV1alpha1().Applications("").Watch(ctx, opts) }), informer.WithAddHandler[*v1alpha1.Application](s.newAppCallback), @@ -158,10 +177,10 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace } projInformerOpts := []informer.InformerOption[*v1alpha1.AppProject]{ - informer.WithListHandler[*v1alpha1.AppProject](func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) { + informer.WithListHandler[*v1alpha1.AppProject](func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.ApplicationsClientset.ArgoprojV1alpha1().AppProjects("").List(ctx, opts) }), - informer.WithWatchHandler[*v1alpha1.AppProject](func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + informer.WithWatchHandler[*v1alpha1.AppProject](func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.ApplicationsClientset.ArgoprojV1alpha1().AppProjects("").Watch(ctx, opts) }), informer.WithAddHandler[*v1alpha1.AppProject](s.newAppProjectCallback), @@ -212,9 
+231,84 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace "argocd": types.AgentModeAutonomous, } + if s.resourceProxyListenAddr == "" { + s.resourceProxyListenAddr = defaultKubeProxyListenerAddr + } + + // Instantiate our KubeProxy to intercept Kubernetes requests from Argo CD's + // API server. + if s.resourceProxyEnabled { + s.resourceProxy, err = resourceproxy.New(s.resourceProxyListenAddr, + // For matching resource requests from the Argo CD API + resourceproxy.WithRequestMatcher( + // `^/api(?:(?:/(?P[^\/]+))?/(?Pv[^\/]+)/namespaces/(?P[^\/]+)/(?P[^\/]+)/(?P[^\/]+))?$`, + resourceRequestRegexp, + []string{"get"}, + s.resourceRequester, + ), + // Fake version output + resourceproxy.WithRequestMatcher( + `^/version$`, + []string{"get"}, + s.proxyVersion, + ), + resourceproxy.WithTLSConfig(s.resourceProxyTlsConfig), + ) + if err != nil { + return nil, err + } + } + + // Instantiate the cluster manager to handle Argo CD cluster secrets for + // agents. 
+ s.clusterMgr, err = cluster.NewManager(s.ctx, s.namespace, s.kubeClient) + if err != nil { + return nil, err + } + return s, nil } +func (s *Server) apiResources(w http.ResponseWriter, r *http.Request, params resourceproxy.Params) { + w.Header().Add("Content-type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte("{}")) +} + +func (s *Server) proxyVersion(w http.ResponseWriter, r *http.Request, params resourceproxy.Params) { + var kubeVersion = struct { + Major string + Minor string + GitVersion string + GitCommit string + GitTreeState string + BuildDate string + GoVersion string + Compiler string + Platform string + }{ + Major: "1", + Minor: "28", + GitVersion: "1.28.5+argocd-agent", + GitCommit: "841856557ef0f6a399096c42635d114d6f2cf7f4", + GitTreeState: "clean", + BuildDate: "n/a", + GoVersion: goruntime.Version(), + Compiler: goruntime.Compiler, + Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH), + } + + version, err := json.MarshalIndent(kubeVersion, "", " ") + if err != nil { + log().Errorf("Could not marshal JSON: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Add("Content-type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(version) +} + // Start starts the Server s and its listeners in their own go routines. Any // error during startup, before the go routines are running, will be returned // immediately. Errors during the runtime will be propagated via errch. 
@@ -271,6 +365,22 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { } log().Infof("AppProject informer synced and ready") + // Start resource proxy if it is enabled + if s.resourceProxy != nil { + _, err = s.resourceProxy.Start(s.ctx) + if err != nil { + return fmt.Errorf("unable to start KubeProxy: %w", err) + } + log().Infof("Resource proxy started") + } else { + log().Infof("Resource proxy is disabled") + } + + err = s.clusterMgr.Start() + if err != nil { + return err + } + return nil } @@ -279,6 +389,16 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { func (s *Server) Shutdown() error { var err error + if s.resourceProxy != nil { + if err = s.resourceProxy.Stop(s.ctx); err != nil { + return err + } + } + + if err = s.clusterMgr.Stop(); err != nil { + return err + } + log().Debugf("Shutdown requested") // Cancel server-wide context s.ctxCancel() diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index acf4542e..b87f6863 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -240,13 +240,13 @@ func Test_AgentServer(t *testing.T) { } sctx, scancel := context.WithTimeout(context.Background(), 20*time.Second) actx, acancel := context.WithTimeout(context.Background(), 20*time.Second) - fakeAppcServer := fakeappclient.NewSimpleClientset() + fakeAppcServer := fakekube.NewKubernetesFakeClient() am := auth.NewMethods() up := userpass.NewUserPassAuthentication("") err := am.RegisterMethod("userpass", up) require.NoError(t, err) up.UpsertUser("client", "insecure") - s, err := principal.NewServer(sctx, fakekube.NewKubernetesFakeClient(), "server", + s, err := principal.NewServer(sctx, fakeAppcServer, "server", principal.WithGRPC(true), principal.WithListenerPort(0), principal.WithServerName("control-plane"), @@ -267,7 +267,7 @@ func Test_AgentServer(t *testing.T) { client.WithAuth("userpass", auth.Credentials{userpass.ClientIDField: "client", userpass.ClientSecretField: "insecure"}), ) require.NoError(t, err) - 
fakeAppcAgent := fakeappclient.NewSimpleClientset() + fakeAppcAgent := fakekube.NewKubernetesFakeClient() a, err := agent.NewAgent(actx, fakeAppcAgent, "client", agent.WithRemote(remote), agent.WithMode(types.AgentModeManaged.String()), @@ -280,10 +280,10 @@ func Test_AgentServer(t *testing.T) { time.Sleep(100 * time.Millisecond) } log().Infof("Creating application") - _, err = fakeAppcServer.ArgoprojV1alpha1().Applications("client").Create(sctx, app, v1.CreateOptions{}) + _, err = fakeAppcServer.ApplicationsClientset.ArgoprojV1alpha1().Applications("client").Create(sctx, app, v1.CreateOptions{}) require.NoError(t, err) for i := 0; i < 5; i += 1 { - app, err = fakeAppcServer.ArgoprojV1alpha1().Applications("client").Get(actx, "testapp", v1.GetOptions{}) + app, err = fakeAppcServer.ApplicationsClientset.ArgoprojV1alpha1().Applications("client").Get(actx, "testapp", v1.GetOptions{}) if err == nil { break } @@ -293,7 +293,7 @@ func Test_AgentServer(t *testing.T) { require.NotNil(t, app) app.Spec.Project = "hulahup" time.Sleep(1 * time.Second) - _, err = fakeAppcServer.ArgoprojV1alpha1().Applications("client").Update(actx, app, v1.UpdateOptions{}) + _, err = fakeAppcServer.ApplicationsClientset.ArgoprojV1alpha1().Applications("client").Update(actx, app, v1.UpdateOptions{}) require.NoError(t, err) <-sctx.Done() @@ -363,7 +363,7 @@ func Test_WithHTTP1WebSocket(t *testing.T) { startAgent := func(t *testing.T, ctx context.Context, remote *client.Remote) (*client.Remote, *agent.Agent) { t.Helper() - fakeAppcAgent := fakeappclient.NewSimpleClientset() + fakeAppcAgent := fakekube.NewKubernetesFakeClient() a, err := agent.NewAgent(ctx, fakeAppcAgent, "client", agent.WithRemote(remote), agent.WithMode(types.AgentModeManaged.String()), diff --git a/test/testutil/context.go b/test/testutil/context.go new file mode 100644 index 00000000..110b2e6a --- /dev/null +++ b/test/testutil/context.go @@ -0,0 +1 @@ +package testutil diff --git a/test/testutil/file.go b/test/testutil/file.go 
new file mode 100644
index 00000000..a3d715a2
--- /dev/null
+++ b/test/testutil/file.go
@@ -0,0 +1,19 @@
+package testutil
+
+import (
+	"io"
+	"os"
+)
+
+func MustReadFile(path string) []byte {
+	f, err := os.Open(path)
+	if err != nil {
+		panic(err)
+	}
+	defer f.Close()
+	b, err := io.ReadAll(f)
+	if err != nil {
+		panic(err)
+	}
+	return b
+}
diff --git a/test/testutil/waitfor.go b/test/testutil/waitfor.go
new file mode 100644
index 00000000..499f3997
--- /dev/null
+++ b/test/testutil/waitfor.go
@@ -0,0 +1,30 @@
+package testutil
+
+import (
+	"testing"
+	"time"
+)
+
+// WaitForChange is a test helper that waits for some kind of change to
+// happen. The condition is given as callback f, which must return either
+// true or false. The callback f is executed repeatedly until it returns true
+// or the duration d has been reached. In the latter case, the helper will fail.
+//
+// This helper is intended to be used for testing asynchronous events.
+func WaitForChange(t *testing.T, d time.Duration, f func() bool) {
+	t.Helper()
+	tmr := time.NewTimer(d)
+	for {
+		select {
+		case <-tmr.C:
+			t.Fatalf("Timeout waiting for condition to be true after %.02fs", d.Seconds())
+		default:
+			ok := f()
+			if ok {
+				tmr.Stop()
+				return
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+}