diff --git a/go/api/base_client.go b/go/api/base_client.go index b954ddabb3..be3b98c198 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1483,3 +1483,23 @@ func (client *baseClient) Persist(key string) (Result[bool], error) { } return handleBooleanResponse(result) } + +func (client *baseClient) XTrim(key string, options *options.XTrimOptions) (Result[int64], error) { + xTrimArgs, err := options.ToArgs() + if err != nil { + return CreateNilInt64Result(), err + } + result, err := client.executeCommand(C.XTrim, append([]string{key}, xTrimArgs...)) + if err != nil { + return CreateNilInt64Result(), err + } + return handleLongResponse(result) +} + +func (client *baseClient) XLen(key string) (Result[int64], error) { + result, err := client.executeCommand(C.XLen, []string{key}) + if err != nil { + return CreateNilInt64Result(), err + } + return handleLongResponse(result) +} diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index 2a07c0ad2c..95a8c69d33 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -85,36 +85,34 @@ func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions { } // Match exactly on the threshold. -func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions { - xto.exact = triStateBoolTrue - return xto +func (xTrimOptions *XTrimOptions) SetExactTrimming() *XTrimOptions { + xTrimOptions.exact = triStateBoolTrue + return xTrimOptions } // Trim in a near-exact manner, which is more efficient. -func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { - xto.exact = triStateBoolFalse - return xto +func (xTrimOptions *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { + xTrimOptions.exact = triStateBoolFalse + return xTrimOptions } // Max number of stream entries to be trimmed for non-exact match. -func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { - xto.exact = triStateBoolFalse - xto.limit = limit - return xto +func (xTrimOptions *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { + xTrimOptions.exact = triStateBoolFalse + xTrimOptions.limit = limit + return xTrimOptions } -func (xto *XTrimOptions) ToArgs() ([]string, error) { - args := []string{} - args = append(args, xto.method) - if xto.exact == triStateBoolTrue { +func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) { + args := []string{xTrimOptions.method} + if xTrimOptions.exact == triStateBoolTrue { args = append(args, "=") - } else if xto.exact == triStateBoolFalse { + } else if xTrimOptions.exact == triStateBoolFalse { args = append(args, "~") } - args = append(args, xto.threshold) - if xto.limit > 0 { - args = append(args, "LIMIT", utils.IntToString(xto.limit)) + args = append(args, xTrimOptions.threshold) + if xTrimOptions.limit > 0 { + args = append(args, "LIMIT", utils.IntToString(xTrimOptions.limit)) } - var err error - return args, err + return args, nil } diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 1696a168c2..727d93b877 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -49,4 +49,8 @@ type StreamCommands interface { // // [valkey.io]: https://valkey.io/commands/xadd/ XAddWithOptions(key string, values [][]string, options *options.XAddOptions) (Result[string], error) + + XTrim(key string, options *options.XTrimOptions) (Result[int64], error) + + XLen(key string) (Result[int64], error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index df0568e3f4..4084e4991d 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4484,3 +4484,97 @@ func (suite *GlideTestSuite) TestPersist() { assert.False(t, resultInvalidKey.Value()) }) } + +func (suite *GlideTestSuite) Test_XAdd_XLen_XTrim() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key1 := uuid.NewString() + key2 := uuid.NewString() + field1 := uuid.NewString() + field2 := uuid.NewString() + t := suite.T() + xAddResult, err := client.XAddWithOptions( + key1, + [][]string{{field1, "foo"}, {field2, "bar"}}, + options.NewXAddOptions().SetDontMakeNewStream(), + ) + assert.Nil(t, err) + assert.True(t, xAddResult.IsNil()) + + xAddResult, err = client.XAddWithOptions( + key1, + [][]string{{field1, "foo1"}, {field2, "bar1"}}, + options.NewXAddOptions().SetId("0-1"), + ) + assert.Nil(t, err) + assert.Equal(t, xAddResult.Value(), "0-1") + + xAddResult, err = client.XAdd( + key1, + [][]string{{field1, "foo2"}, {field2, "bar2"}}, + ) + assert.Nil(t, err) + assert.False(t, xAddResult.IsNil()) + + xLenResult, err := client.XLen(key1) + assert.Nil(t, err) + assert.Equal(t, xLenResult.Value(), int64(2)) + + // Trim the first entry. + xAddResult, err = client.XAddWithOptions( + key1, + [][]string{{field1, "foo3"}, {field2, "bar2"}}, + options.NewXAddOptions().SetTrimOptions( + options.NewXTrimOptionsWithMaxLen(2).SetExactTrimming(), + ), + ) + assert.NotNil(t, xAddResult.Value()) + id := xAddResult.Value() + xLenResult, err = client.XLen(key1) + assert.Nil(t, err) + assert.Equal(t, xLenResult.Value(), int64(2)) + + // Trim the second entry. + xAddResult, err = client.XAddWithOptions( + key1, + [][]string{{field1, "foo4"}, {field2, "bar4"}}, + options.NewXAddOptions().SetTrimOptions( + options.NewXTrimOptionsWithMinId(id).SetExactTrimming(), + ), + ) + assert.NotNil(t, xAddResult.Value()) + xLenResult, err = client.XLen(key1) + assert.Nil(t, err) + assert.Equal(t, xLenResult.Value(), int64(2)) + + // Test xtrim to remove 1 element + xTrimResult, err := client.XTrim( + key1, + options.NewXTrimOptionsWithMaxLen(1).SetExactTrimming(), + ) + assert.Equal(t, xTrimResult.Value(), int64(1)) + xLenResult, err = client.XLen(key1) + assert.Nil(t, err) + assert.Equal(t, xLenResult.Value(), int64(1)) + + // Key does not exist - returns 0 + xTrimResult, err = client.XTrim( + key2, + options.NewXTrimOptionsWithMaxLen(1).SetExactTrimming(), + ) + assert.Equal(t, xTrimResult.Value(), int64(0)) + xLenResult, err = client.XLen(key2) + assert.Nil(t, err) + assert.Equal(t, xLenResult.Value(), int64(0)) + + // Throw Exception: Key exists - but it is not a stream + setResult, err := client.Set(key2, "xtrimtest") + assert.Equal(t, setResult.Value(), "OK") + _, err = client.XTrim(key2, options.NewXTrimOptionsWithMinId("0-1")) + assert.NotNil(t, err) + assert.IsType(t, &api.RequestError{}, err) + + _, err = client.XLen(key2) + assert.NotNil(t, err) + assert.IsType(t, &api.RequestError{}, err) + }) +}