diff --git a/api/pkg/openai/helix_openai_client_test.go b/api/pkg/openai/helix_openai_client_test.go index eadb1764..447799e9 100644 --- a/api/pkg/openai/helix_openai_client_test.go +++ b/api/pkg/openai/helix_openai_client_test.go @@ -41,7 +41,7 @@ func (suite *HelixClientTestSuite) SetupTest() { suite.ctx = context.Background() suite.ctrl = gomock.NewController(suite.T()) - pubsub, err := pubsub.NewInMemoryNats(suite.T().TempDir()) + pubsub, err := pubsub.NewInMemoryNats() suite.Require().NoError(err) suite.pubsub = pubsub diff --git a/api/pkg/openai/helix_openai_server_test.go b/api/pkg/openai/helix_openai_server_test.go index 164fd5cf..65e704c3 100644 --- a/api/pkg/openai/helix_openai_server_test.go +++ b/api/pkg/openai/helix_openai_server_test.go @@ -32,7 +32,7 @@ func (suite *HelixOpenAiServerTestSuite) SetupTest() { suite.ctx = context.Background() suite.ctrl = gomock.NewController(suite.T()) - pubsub, err := pubsub.NewInMemoryNats(suite.T().TempDir()) + pubsub, err := pubsub.NewInMemoryNats() suite.Require().NoError(err) suite.pubsub = pubsub diff --git a/api/pkg/pubsub/nats.go b/api/pkg/pubsub/nats.go index a9279b7d..9eae9666 100644 --- a/api/pkg/pubsub/nats.go +++ b/api/pkg/pubsub/nats.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "fmt" + "os" "sync" "time" @@ -12,6 +13,11 @@ import ( "github.com/rs/zerolog/log" ) +const ( + scriptStreamName = "SCRIPTS_STREAM" + scriptsSubject = "SCRIPTS.*" +) + type Nats struct { conn *nats.Conn js jetstream.JetStream @@ -22,13 +28,18 @@ type Nats struct { consumer jetstream.Consumer } -func NewInMemoryNats(storeDir string) (*Nats, error) { +func NewInMemoryNats() (*Nats, error) { + tmpDir, err := os.MkdirTemp(os.TempDir(), "helix-nats") + if err != nil { + return nil, fmt.Errorf("failed to create temp dir: %w", err) + } + opts := &server.Options{ Host: "127.0.0.1", Port: server.RANDOM_PORT, NoSigs: true, JetStream: true, - StoreDir: storeDir, + StoreDir: tmpDir, // Setting payload to 32 MB MaxPayload: 32 * 1024 * 1024, } @@ -58,9 +69,12 @@ func NewInMemoryNats(storeDir string) (*Nats, error) { return nil, fmt.Errorf("failed to create jetstream context: %w", err) } + // Clean up old streams + gcJetStream(js) + stream, err := js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{ - Name: "SCRIPTS_STREAM", - Subjects: []string{"SCRIPTS.*"}, + Name: scriptStreamName, + Subjects: []string{scriptsSubject}, Retention: jetstream.WorkQueuePolicy, // Storage: jetstream.MemoryStorage, Discard: jetstream.DiscardOld, @@ -70,7 +84,7 @@ func NewInMemoryNats(storeDir string) (*Nats, error) { // }, }) if err != nil { - return nil, fmt.Errorf("failed to create jetstream stream: %w", err) + return nil, fmt.Errorf("failed to create internal jetstream stream: %w", err) } ctx := context.Background() @@ -337,3 +351,20 @@ type consumerWrapper struct{} func (c *consumerWrapper) Unsubscribe() error { return nil } + +// gcJetStream is a helper function to clean up old streams +func gcJetStream(js jetstream.JetStream) { + streams := js.ListStreams(context.Background()) + + for s := range streams.Info() { + log.Debug(). + Str("name", s.Config.Name). + Strs("subjects", s.Config.Subjects). + Msg("checking stream for cleanup") + if s.Config.Subjects[0] == "SCRIPTS.*" { + if err := js.DeleteStream(context.Background(), s.Config.Name); err != nil { + log.Err(err).Str("name", s.Config.Name).Msg("failed to delete stream") + } + } + } +} diff --git a/api/pkg/pubsub/nats_test.go b/api/pkg/pubsub/nats_test.go index e8806c1a..85587f4c 100644 --- a/api/pkg/pubsub/nats_test.go +++ b/api/pkg/pubsub/nats_test.go @@ -16,7 +16,7 @@ import ( func TestNatsPubsub(t *testing.T) { t.Run("Subscribe", func(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() @@ -45,7 +45,7 @@ func TestNatsPubsub(t *testing.T) { }) t.Run("Subscribe_Wildcard", func(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() @@ -74,7 +74,7 @@ func TestNatsPubsub(t *testing.T) { }) t.Run("Subscribe_Resubscribe", func(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() @@ -124,7 +124,7 @@ func TestNatsPubsub(t *testing.T) { } func TestQueueMultipleSubs(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() @@ -200,7 +200,7 @@ func TestQueueMultipleSubs(t *testing.T) { func TestNatsStreaming(t *testing.T) { t.Run("SubscribeLater", func(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() @@ -273,7 +273,7 @@ func TestNatsStreaming(t *testing.T) { } func TestStreamRetries(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() @@ -352,7 +352,7 @@ func TestStreamRetries(t *testing.T) { } func TestStreamMultipleSubs(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) // Leaving a little bit of time to go into inactive state, @@ -433,7 +433,7 @@ func TestStreamMultipleSubs(t *testing.T) { } func TestStreamAfterDelay(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() @@ -518,7 +518,7 @@ func TestStreamAfterDelay(t *testing.T) { } func TestStreamFailOne(t *testing.T) { - pubsub, err := NewInMemoryNats(t.TempDir()) + pubsub, err := NewInMemoryNats() require.NoError(t, err) ctx := context.Background() diff --git a/api/pkg/pubsub/pubsub_provider.go b/api/pkg/pubsub/pubsub_provider.go index dcf62b90..b6555fc5 100644 --- a/api/pkg/pubsub/pubsub_provider.go +++ b/api/pkg/pubsub/pubsub_provider.go @@ -9,9 +9,9 @@ const ( // TODO: NATS/Redis ) -func New(storeDir string) (PubSub, error) { +func New(_ string) (PubSub, error) { // TODO: switch on the provider type - return NewInMemoryNats(storeDir) + return NewInMemoryNats() } type Config struct {