Skip to content

Commit

Permalink
prune in batches to avoid writer starvation
Browse files Browse the repository at this point in the history
  • Loading branch information
draganm committed Jun 27, 2023
1 parent 14fa357 commit 0ad7c10
Showing 1 changed file with 44 additions and 30 deletions.
74 changes: 44 additions & 30 deletions server/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,55 @@ import (
"github.com/gofrs/uuid"
)

const batchSize = 10000

func (s Server) Prune(cutoffTime time.Time) (err error) {
return bolted.SugaredWrite(s.db, func(tx bolted.SugaredWriteTx) (err error) {
toDelete := []string{}
defer func() {
if err == nil {
s.log.Info("pruned state events", "count", len(toDelete))
}
}()
it := tx.Iterator(eventsPath)
for ; !it.IsDone(); it.Next() {
id, err := uuid.FromString(it.GetKey())
if err != nil {
return fmt.Errorf("could not parse uuid %s: %w", it.GetKey(), err)
}

ts, err := uuid.TimestampFromV6(id)
if err != nil {
return fmt.Errorf("could not get uuid timestamp: %w", err)
}
eventsDeleted := true

for eventsDeleted {
err = bolted.SugaredWrite(s.db, func(tx bolted.SugaredWriteTx) (err error) {
toDelete := []string{}
defer func() {
if err == nil {
s.log.Info("pruned state events", "count", len(toDelete))
}
}()
it := tx.Iterator(eventsPath)
for ; !it.IsDone(); it.Next() {
id, err := uuid.FromString(it.GetKey())
if err != nil {
return fmt.Errorf("could not parse uuid %s: %w", it.GetKey(), err)
}

ts, err := uuid.TimestampFromV6(id)
if err != nil {
return fmt.Errorf("could not get uuid timestamp: %w", err)
}

t, err := ts.Time()
if err != nil {
return fmt.Errorf("could not get time from uuid timestamp: %w", err)
}

if !t.Before(cutoffTime) {
break
}
toDelete = append(toDelete, it.GetKey())

if len(toDelete) >= batchSize {
break
}

t, err := ts.Time()
if err != nil {
return fmt.Errorf("could not get time from uuid timestamp: %w", err)
}

if !t.Before(cutoffTime) {
break
eventsDeleted = len(toDelete) > 0
for _, id := range toDelete {
tx.Delete(eventsPath.Append(id))
}
toDelete = append(toDelete, it.GetKey())

}
return nil
})
}

for _, id := range toDelete {
tx.Delete(eventsPath.Append(id))
}
return nil
})
return
}

0 comments on commit 0ad7c10

Please sign in to comment.