Skip to content

Commit

Permalink
support new event version (#171)
Browse files Browse the repository at this point in the history
* support new event version

* support new event version
  • Loading branch information
Gentleelephant authored Oct 9, 2024
1 parent 91e09c2 commit b97c9ce
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 14 deletions.
3 changes: 3 additions & 0 deletions cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&configFile, "config.file", "", "Event exporter configuration file path")
flag.StringVar(&util.EventVersion, "event.version", util.EventVersionOld, "event version, eventsv1 or corev1")
}

func main() {
Expand All @@ -42,6 +43,8 @@ func main() {
klog.Fatal("Error building kubernetes clientset: ", e)
}

go util.SetClusterName(kclient)

ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
k8s.io/apimachinery v0.27.4
k8s.io/client-go v12.0.0+incompatible
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.90.1
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -114,7 +115,6 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.26.1 // indirect
k8s.io/component-base v0.26.1 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
33 changes: 24 additions & 9 deletions pkg/exporter/kube_events_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"fmt"
"sync"

"github.com/kubesphere/kube-events/pkg/util"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubesphere/kube-events/pkg/config"
"github.com/kubesphere/kube-events/pkg/exporter/sinks"
"github.com/kubesphere/kube-events/pkg/exporter/types"
Expand Down Expand Up @@ -104,7 +109,7 @@ func (s *K8sEventSource) waitForCacheSync(stopc <-chan struct{}) error {
return nil
}

func (s *K8sEventSource) drainEvents() (evts []*corev1.Event, shutdown bool) {
func (s *K8sEventSource) drainEvents() (evts []client.Object, shutdown bool) {
var (
i = 0
m = s.workqueue.Len()
Expand All @@ -116,7 +121,7 @@ func (s *K8sEventSource) drainEvents() (evts []*corev1.Event, shutdown bool) {
var obj interface{}
obj, shutdown = s.workqueue.Get()
if obj != nil {
evts = append(evts, obj.(*corev1.Event))
evts = append(evts, obj.(client.Object))
}
i++
if i >= m {
Expand Down Expand Up @@ -150,7 +155,7 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) {
} else if numRequeues := s.workqueue.NumRequeues(evt); numRequeues >= maxRetries {
s.workqueue.Forget(evt)
klog.Infof("Dropping event %s/%s out of the queue because of failing %d times: %v\n",
evt.Namespace, evt.Name, numRequeues, err)
evt.GetNamespace(), evt.GetName(), numRequeues, err)
} else {
s.workqueue.AddRateLimited(evt)
}
Expand All @@ -162,7 +167,7 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) {
for _, e := range evts {
events.KubeEvents = append(events.KubeEvents, &types.ExtendedEvent{
Event: e,
Cluster: s.cluster,
Cluster: util.GetCluster(),
})
}

Expand All @@ -185,7 +190,8 @@ func (s *K8sEventSource) enqueueEvent(obj interface{}) {
if obj == nil {
return
}
evt, ok := obj.(*corev1.Event)

evt, ok := obj.(client.Object)
if ok {
evt.SetManagedFields(nil) // set it nil because it is quite verbose
s.workqueue.Add(evt)
Expand All @@ -197,17 +203,26 @@ func NewKubeEventSource(client *kubernetes.Clientset) *K8sEventSource {
client: client,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "events"),
}
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(),
"events", metav1.NamespaceAll, fields.Everything())
s.inf = cache.NewSharedIndexInformer(lw, &corev1.Event{}, 0, cache.Indexers{})
var eventType runtime.Object
var lw *cache.ListWatch
if util.EventVersion == util.EventVersionNew {
eventType = &eventsv1.Event{}
lw = cache.NewListWatchFromClient(client.EventsV1().RESTClient(),
"events", metav1.NamespaceAll, fields.Everything())
} else {
eventType = &corev1.Event{}
lw = cache.NewListWatchFromClient(client.CoreV1().RESTClient(),
"events", metav1.NamespaceAll, fields.Everything())
}
s.inf = cache.NewSharedIndexInformer(lw, eventType, 0, cache.Indexers{})
s.inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.enqueueEvent,
UpdateFunc: func(old, new interface{}) {
s.enqueueEvent(new)
},
})

s.cluster = s.getClusterName()
s.cluster = util.GetCluster()

return s
}
7 changes: 3 additions & 4 deletions pkg/exporter/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package types

import (
"context"

v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Events struct {
KubeEvents []*ExtendedEvent `json:"kubeEvents"`
}

type ExtendedEvent struct {
*v1.Event `json:",inline"`
Cluster string `json:"cluster,omitempty"`
Event client.Object `json:",inline"`
Cluster string `json:"cluster,omitempty"`
}

type Sinker interface {
Expand Down
53 changes: 53 additions & 0 deletions pkg/util/annoutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package util

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

const (
EventVersionOld = "corev1"
EventVersionNew = "eventsv1"
)

var cluster string
var EventVersion string

func SetClusterName(client *kubernetes.Clientset) {
setCluster(client)
t := time.NewTicker(60 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
if cluster != "" {
return
}
setCluster(client)
klog.Infof("current cluster is [%s]", GetCluster())
}
}

}

func setCluster(client *kubernetes.Clientset) {

ns, err := client.CoreV1().Namespaces().Get(context.Background(), "kubesphere-system", metav1.GetOptions{})
if err != nil {
klog.Errorf("get namespace kubesphere-system error: %s", err)
return
}

if ns.Annotations != nil {
cluster = ns.Annotations["cluster.kubesphere.io/name"]
}

}

func GetCluster() string {
return cluster
}

0 comments on commit b97c9ce

Please sign in to comment.