Skip to content

Commit

Permalink
async synchronize target reset
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Jogeleit <[email protected]>
  • Loading branch information
fjogeleit committed Sep 30, 2024
1 parent 99afcdb commit f7e8f00
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 3 deletions.
2 changes: 2 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func newRunCMD(version string) *cobra.Command {
k8sConfig.Burst = c.K8sClient.Burst

readinessProbe := config.NewReadinessProbe(c)
defer readinessProbe.Close()

resolver := config.NewResolver(c, k8sConfig)
logger, err := resolver.Logger()
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/readinessprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ func (r *ReadinessProbe) Ready() {
func (r *ReadinessProbe) Wait() {
if r.required() && !r.running {
r.running = <-r.ready
close(r.ready)
zap.L().Debug("readiness probe finished")
return
}
}

func (r *ReadinessProbe) Close() {
close(r.ready)
}

func (r *ReadinessProbe) Running() bool {
return r.running
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/listener/sync_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,25 @@ import (
const SendSyncResults = "send_sync_results_listener"

func NewSendSyncResultsListener(targets *target.Collection) report.SyncResultsListener {
targets.Reset(context.Background())
ready := make(chan bool)
ok := false
go func() {
ok = targets.Reset(context.Background())
if ok {
close(ready)
}
}()

return func(rep v1alpha2.ReportInterface) {
clients := targets.SyncClients()
if len(clients) == 0 {
return
}

if !ok {
<-ready
}

wg := &sync.WaitGroup{}
wg.Add(len(clients))

Expand Down
4 changes: 3 additions & 1 deletion pkg/target/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ func (c *Collection) Update(t *Target) {
c.mx.Unlock()
}

func (c *Collection) Reset(ctx context.Context) {
func (c *Collection) Reset(ctx context.Context) bool {
clients := c.SyncClients()

for _, c := range clients {
if err := c.Reset(ctx); err != nil {
zap.L().Error("failed to reset target", zap.String("type", c.Type()), zap.String("name", c.Name()))
}
}

return true
}

func (c *Collection) Targets() []*Target {
Expand Down

0 comments on commit f7e8f00

Please sign in to comment.