Skip to content

Commit

Permalink
Merge pull request #691 from helixml/fix_subscription
Browse files Browse the repository at this point in the history
Fix subscription
  • Loading branch information
nessie993 authored Jan 9, 2025
2 parents 76c9af6 + d816535 commit 8b5ed99
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 18 deletions.
2 changes: 1 addition & 1 deletion api/pkg/openai/helix_openai_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (suite *HelixClientTestSuite) SetupTest() {
suite.ctx = context.Background()
suite.ctrl = gomock.NewController(suite.T())

pubsub, err := pubsub.NewInMemoryNats(suite.T().TempDir())
pubsub, err := pubsub.NewInMemoryNats()
suite.Require().NoError(err)

suite.pubsub = pubsub
Expand Down
2 changes: 1 addition & 1 deletion api/pkg/openai/helix_openai_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (suite *HelixOpenAiServerTestSuite) SetupTest() {
suite.ctx = context.Background()
suite.ctrl = gomock.NewController(suite.T())

pubsub, err := pubsub.NewInMemoryNats(suite.T().TempDir())
pubsub, err := pubsub.NewInMemoryNats()
suite.Require().NoError(err)

suite.pubsub = pubsub
Expand Down
41 changes: 36 additions & 5 deletions api/pkg/pubsub/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pubsub
import (
"context"
"fmt"
"os"
"sync"
"time"

Expand All @@ -12,6 +13,11 @@ import (
"github.com/rs/zerolog/log"
)

const (
scriptStreamName = "SCRIPTS_STREAM"
scriptsSubject = "SCRIPTS.*"
)

type Nats struct {
conn *nats.Conn
js jetstream.JetStream
Expand All @@ -22,13 +28,18 @@ type Nats struct {
consumer jetstream.Consumer
}

func NewInMemoryNats(storeDir string) (*Nats, error) {
func NewInMemoryNats() (*Nats, error) {
tmpDir, err := os.MkdirTemp(os.TempDir(), "helix-nats")
if err != nil {
return nil, fmt.Errorf("failed to create temp dir: %w", err)
}

opts := &server.Options{
Host: "127.0.0.1",
Port: server.RANDOM_PORT,
NoSigs: true,
JetStream: true,
StoreDir: storeDir,
StoreDir: tmpDir,
// Setting payload to 32 MB
MaxPayload: 32 * 1024 * 1024,
}
Expand Down Expand Up @@ -58,9 +69,12 @@ func NewInMemoryNats(storeDir string) (*Nats, error) {
return nil, fmt.Errorf("failed to create jetstream context: %w", err)
}

// Clean up old streams
gcJetStream(js)

stream, err := js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
Name: "SCRIPTS_STREAM",
Subjects: []string{"SCRIPTS.*"},
Name: scriptStreamName,
Subjects: []string{scriptsSubject},
Retention: jetstream.WorkQueuePolicy,
// Storage: jetstream.MemoryStorage,
Discard: jetstream.DiscardOld,
Expand All @@ -70,7 +84,7 @@ func NewInMemoryNats(storeDir string) (*Nats, error) {
// },
})
if err != nil {
return nil, fmt.Errorf("failed to create jetstream stream: %w", err)
return nil, fmt.Errorf("failed to create internal jetstream stream: %w", err)
}

ctx := context.Background()
Expand Down Expand Up @@ -337,3 +351,20 @@ type consumerWrapper struct{}
func (c *consumerWrapper) Unsubscribe() error {
return nil
}

// gcJetStream is a helper function to clean up old streams
func gcJetStream(js jetstream.JetStream) {
streams := js.ListStreams(context.Background())

for s := range streams.Info() {
log.Debug().
Str("name", s.Config.Name).
Strs("subjects", s.Config.Subjects).
Msg("checking stream for cleanup")
if s.Config.Subjects[0] == "SCRIPTS.*" {
if err := js.DeleteStream(context.Background(), s.Config.Name); err != nil {
log.Err(err).Str("name", s.Config.Name).Msg("failed to delete stream")
}
}
}
}
18 changes: 9 additions & 9 deletions api/pkg/pubsub/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestNatsPubsub(t *testing.T) {

t.Run("Subscribe", func(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -45,7 +45,7 @@ func TestNatsPubsub(t *testing.T) {
})

t.Run("Subscribe_Wildcard", func(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestNatsPubsub(t *testing.T) {
})

t.Run("Subscribe_Resubscribe", func(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestNatsPubsub(t *testing.T) {
}

func TestQueueMultipleSubs(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestQueueMultipleSubs(t *testing.T) {

func TestNatsStreaming(t *testing.T) {
t.Run("SubscribeLater", func(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestNatsStreaming(t *testing.T) {
}

func TestStreamRetries(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestStreamRetries(t *testing.T) {
}

func TestStreamMultipleSubs(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

// Leaving a little bit of time to go into inactive state,
Expand Down Expand Up @@ -433,7 +433,7 @@ func TestStreamMultipleSubs(t *testing.T) {
}

func TestStreamAfterDelay(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestStreamAfterDelay(t *testing.T) {
}

func TestStreamFailOne(t *testing.T) {
pubsub, err := NewInMemoryNats(t.TempDir())
pubsub, err := NewInMemoryNats()
require.NoError(t, err)

ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions api/pkg/pubsub/pubsub_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ const (
// TODO: NATS/Redis
)

func New(storeDir string) (PubSub, error) {
func New(_ string) (PubSub, error) {
// TODO: switch on the provider type
return NewInMemoryNats(storeDir)
return NewInMemoryNats()
}

type Config struct {
Expand Down

0 comments on commit 8b5ed99

Please sign in to comment.