Skip to content

Commit

Permalink
Add support for TTLs and subject delete markers to memory store (#6400)
Browse files Browse the repository at this point in the history
This adds support for TTLs and subject delete markers into the memory
store. Much of this is just replicating and aligning the logic from the
filestore. None of the recovery/rebuilding/re-indexing code is needed as
the memory store does not recover from disk.

Some of the relevant JetStream unit tests have been updated to test both
`MemoryStorage` and `FileStorage`. The relevant filestore tests have
also been replicated into the memstore tests.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison authored Jan 23, 2025
2 parents dec670a + fdba068 commit 257319c
Show file tree
Hide file tree
Showing 3 changed files with 394 additions and 183 deletions.
312 changes: 168 additions & 144 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24935,43 +24935,47 @@ func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) {
}

func TestJetStreamMessageTTL(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

for i := 1; i <= 10; i++ {
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
for i := 1; i <= 10; i++ {
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 10)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 10)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 10)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 10)

time.Sleep(time.Second * 2)
time.Sleep(time.Second * 2)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)
require_Equal(t, si.State.FirstSeq, 11)
require_Equal(t, si.State.LastSeq, 10)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)
require_Equal(t, si.State.FirstSeq, 11)
require_Equal(t, si.State.LastSeq, 10)
})
}
}

func TestJetStreamMessageTTLRestart(t *testing.T) {
Expand Down Expand Up @@ -25088,31 +25092,35 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
}

func TestJetStreamMessageTTLInvalid(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set(JSMessageTTL, "500ms")
_, err := js.PublishMsg(msg)
require_Error(t, err)
msg.Header.Set(JSMessageTTL, "500ms")
_, err := js.PublishMsg(msg)
require_Error(t, err)

msg.Header.Set(JSMessageTTL, "something")
_, err = js.PublishMsg(msg)
require_Error(t, err)
msg.Header.Set(JSMessageTTL, "something")
_, err = js.PublishMsg(msg)
require_Error(t, err)
})
}
}

func TestJetStreamMessageTTLNotUpdatable(t *testing.T) {
Expand All @@ -25139,74 +25147,82 @@ func TestJetStreamMessageTTLNotUpdatable(t *testing.T) {
}

func TestJetStreamMessageTTLNeverExpire(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
MaxAge: time.Second,
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
MaxAge: time.Second,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

// The first message we publish is set to "never expire", therefore it
// won't age out with the MaxAge policy.
msg.Header.Set(JSMessageTTL, "never")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
// The first message we publish is set to "never expire", therefore it
// won't age out with the MaxAge policy.
msg.Header.Set(JSMessageTTL, "never")
_, err := js.PublishMsg(msg)
require_NoError(t, err)

// Following messages will be published as normal and will age out.
msg.Header.Del(JSMessageTTL)
for i := 1; i <= 10; i++ {
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
// Following messages will be published as normal and will age out.
msg.Header.Del(JSMessageTTL)
for i := 1; i <= 10; i++ {
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 11)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 11)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)

time.Sleep(time.Second * 2)
time.Sleep(time.Second * 2)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 1)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 1)
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
})
}
}

func TestJetStreamMessageTTLDisabled(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}
msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)
msg.Header.Set(JSMessageTTL, "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)
})
}
}

func TestJetStreamMessageTTLWhenSourcing(t *testing.T) {
Expand Down Expand Up @@ -25338,40 +25354,44 @@ func TestJetStreamMessageTTLWhenMirroring(t *testing.T) {
}

func TestJetStreamSubjectDeleteMarkers(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
MaxAge: time.Second,
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
SubjectDeleteMarkerTTL: "1s",
})
jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: storage,
Subjects: []string{"test"},
MaxAge: time.Second,
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
SubjectDeleteMarkerTTL: "1s",
})

sub, err := js.SubscribeSync("test")
require_NoError(t, err)
sub, err := js.SubscribeSync("test")
require_NoError(t, err)

for i := 0; i < 3; i++ {
_, err = js.Publish("test", nil)
require_NoError(t, err)
}
for i := 0; i < 3; i++ {
_, err = js.Publish("test", nil)
require_NoError(t, err)
}

for i := 0; i < 3; i++ {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_NoError(t, msg.AckSync())
}
for i := 0; i < 3; i++ {
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_NoError(t, msg.AckSync())
}

msg, err := sub.NextMsg(time.Second * 10)
require_NoError(t, err)
require_Equal(t, msg.Header.Get(JSAppliedLimit), "MaxAge")
require_Equal(t, msg.Header.Get(JSMessageTTL), "1s")
msg, err := sub.NextMsg(time.Second * 10)
require_NoError(t, err)
require_Equal(t, msg.Header.Get(JSAppliedLimit), "MaxAge")
require_Equal(t, msg.Header.Get(JSMessageTTL), "1s")
})
}
}

func TestJetStreamSubjectDeleteMarkersWithMirror(t *testing.T) {
Expand Down Expand Up @@ -25402,20 +25422,24 @@ func TestJetStreamSubjectDeleteMarkersWithMirror(t *testing.T) {
}

func TestJetStreamSubjectDeleteMarkersDefaultTTL(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()
nc, _ := jsClientConnect(t, s)
defer nc.Close()

sc, err := jsStreamCreate(t, nc, &StreamConfig{
Name: "Origin",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
})
require_NoError(t, err)
sc, err := jsStreamCreate(t, nc, &StreamConfig{
Name: "Origin",
Storage: storage,
Subjects: []string{"test"},
AllowMsgTTL: true,
SubjectDeleteMarkers: true,
})
require_NoError(t, err)

require_Equal(t, sc.SubjectDeleteMarkerTTL, subjectDeleteMarkerDefaultTTL)
require_Equal(t, sc.SubjectDeleteMarkerTTL, subjectDeleteMarkerDefaultTTL)
})
}
}
Loading

0 comments on commit 257319c

Please sign in to comment.