diff --git a/backend/src/apiserver/client/argo.go b/backend/src/apiserver/client/argo.go index 7c15aa1e34c..b068c2f2e10 100644 --- a/backend/src/apiserver/client/argo.go +++ b/backend/src/apiserver/client/argo.go @@ -23,7 +23,6 @@ import ( "github.com/golang/glog" "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/pkg/errors" - "k8s.io/client-go/rest" ) type ArgoClientInterface interface { @@ -41,7 +40,7 @@ func (argoClient *ArgoClient) Workflow(namespace string) argoprojv1alpha1.Workfl func NewArgoClientOrFatal(initConnectionTimeout time.Duration, clientParams util.ClientParameters) *ArgoClient { var argoProjClient argoprojv1alpha1.ArgoprojV1alpha1Interface operation := func() error { - restConfig, err := rest.InClusterConfig() + restConfig, err := util.GetKubernetesConfig() if err != nil { return errors.Wrap(err, "Failed to initialize the RestConfig") } diff --git a/backend/src/apiserver/client/swf.go b/backend/src/apiserver/client/swf.go index 9197f1f2d9d..742e7a89c33 100644 --- a/backend/src/apiserver/client/swf.go +++ b/backend/src/apiserver/client/swf.go @@ -25,7 +25,6 @@ import ( swfclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned" "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" "github.com/pkg/errors" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" ) @@ -46,7 +45,7 @@ func (swfClient *SwfClient) ScheduledWorkflow(namespace string) v1beta1.Schedule func NewScheduledWorkflowClientOrFatal(initConnectionTimeout time.Duration, clientParams util.ClientParameters) *SwfClient { var swfClient v1beta1.ScheduledworkflowV1beta1Interface operation := func() error { - restConfig, err := rest.InClusterConfig() + restConfig, err := util.GetKubernetesConfig() if err != nil { return err } diff --git a/backend/src/apiserver/client/util.go b/backend/src/apiserver/client/util.go index aa9ebe6fd85..2100b9851b0 100644 --- a/backend/src/apiserver/client/util.go +++ b/backend/src/apiserver/client/util.go @@ -15,15 +15,15 @@ package client import ( + "os" + "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/pkg/errors" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "os" ) func getKubernetesClientset(clientParams util.ClientParameters) (*kubernetes.Clientset, error) { - restConfig, err := rest.InClusterConfig() + restConfig, err := util.GetKubernetesConfig() if err != nil { return nil, errors.Wrap(err, "Failed to initialize kubernetes client") } diff --git a/backend/src/cache/client/kubernetes_core.go b/backend/src/cache/client/kubernetes_core.go index 5d734847b4f..f88a1f70cc5 100644 --- a/backend/src/cache/client/kubernetes_core.go +++ b/backend/src/cache/client/kubernetes_core.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" "k8s.io/client-go/kubernetes" v1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" ) type KubernetesCoreInterface interface { @@ -25,7 +24,7 @@ func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface { } func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) { - restConfig, err := rest.InClusterConfig() + restConfig, err := util.GetKubernetesConfig() if err != nil { return nil, errors.Wrap(err, "Failed to initialize kubernetes client.") } diff --git a/backend/src/common/util/execution_client.go b/backend/src/common/util/execution_client.go index 25d1a315a10..5662eb11604 100644 --- a/backend/src/common/util/execution_client.go +++ b/backend/src/common/util/execution_client.go @@ -28,7 +28,6 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" ) @@ -87,7 +86,7 @@ func NewExecutionClientOrFatal(execType ExecutionType, initConnectionTimeout tim case ArgoWorkflow: var argoProjClient *argoclient.Clientset operation := func() error { - restConfig, err := rest.InClusterConfig() + restConfig, err := GetKubernetesConfig() if err != nil { return errors.Wrap(err, "Failed to initialize the RestConfig") } @@ -106,8 +105,8 @@ func NewExecutionClientOrFatal(execType ExecutionType, initConnectionTimeout tim return &WorkflowClient{client: argoProjClient} case TektonPipelineRun: var prClient *prclientset.Clientset - var operation = func() error { - restConfig, err := rest.InClusterConfig() + operation := func() error { + restConfig, err := GetKubernetesConfig() if err != nil { return errors.Wrap(err, "Failed to initialize the RestConfig") } @@ -139,7 +138,7 @@ func NewExecutionInformerOrFatal(execType ExecutionType, namespace string, case ArgoWorkflow: var argoInformer argoinformer.SharedInformerFactory operation := func() error { - restConfig, err := rest.InClusterConfig() + restConfig, err := GetKubernetesConfig() if err != nil { return errors.Wrap(err, "Failed to initialize the RestConfig") } @@ -167,8 +166,8 @@ func NewExecutionInformerOrFatal(execType ExecutionType, namespace string, case TektonPipelineRun: var prInformer prinformer.SharedInformerFactory var prClient *prclientset.Clientset - var operation = func() error { - restConfig, err := rest.InClusterConfig() + operation := func() error { + restConfig, err := GetKubernetesConfig() if err != nil { return errors.Wrap(err, "Failed to initialize the RestConfig") } diff --git a/backend/src/common/util/service.go b/backend/src/common/util/service.go index 92c036a31bd..cf0f5379a38 100644 --- a/backend/src/common/util/service.go +++ b/backend/src/common/util/service.go @@ -50,6 +50,26 @@ func WaitForAPIAvailable(initializeTimeout time.Duration, basePath string, apiAd return errors.Wrapf(err, "Waiting for ml pipeline API server failed after all attempts.") } +// GetKubernetesConfig will first try an in-cluster configuration but fallback to using a kubeconfig. +func GetKubernetesConfig() (*rest.Config, error) { + restConfig, errInCluster := rest.InClusterConfig() + if errInCluster == nil { + return restConfig, nil + } + + // Fallback to using a kubeconfig + clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}, + ) + + restConfig, errKubeconfig := clientConfig.ClientConfig() + if errKubeconfig != nil { + return nil, fmt.Errorf("%w; %w", errInCluster, errKubeconfig) + } + + return restConfig, nil +} + func GetKubernetesClientFromClientConfig(clientConfig clientcmd.ClientConfig) ( *kubernetes.Clientset, *rest.Config, string, error, ) { diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 68899f14a5b..f9e782b8c63 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -21,6 +21,7 @@ import ( "strconv" "time" + "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "github.com/golang/glog" @@ -41,7 +42,6 @@ import ( k8sres "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" ) var dummyImages = map[string]string{ @@ -136,7 +136,7 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio // TODO(v2): in pipeline spec, rename GCS output directory to pipeline root. pipelineRoot := opts.RuntimeConfig.GetGcsOutputDirectory() - restConfig, err := rest.InClusterConfig() + restConfig, err := util.GetKubernetesConfig() if err != nil { return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err) } @@ -1922,7 +1922,7 @@ func deletePVC( func createK8sClient() (*kubernetes.Clientset, error) { // Initialize Kubernetes client set - restConfig, err := rest.InClusterConfig() + restConfig, err := util.GetKubernetesConfig() if err != nil { return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err) }