From 2d6ae0bccb8a9030ca953b2712ef5e4db2f754da Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 27 Dec 2024 15:10:08 -0800 Subject: [PATCH] Go: `XADD` (#2843) * `XADD` Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 35 ++++++++ go/api/command_options.go | 20 ++--- go/api/options/stream_options.go | 120 ++++++++++++++++++++++++++ go/api/stream_commands.go | 52 +++++++++++ go/integTest/glide_test_suite_test.go | 4 +- go/integTest/shared_commands_test.go | 61 ++++++++++++- 6 files changed, 277 insertions(+), 15 deletions(-) create mode 100644 go/api/options/stream_options.go create mode 100644 go/api/stream_commands.go diff --git a/go/api/base_client.go b/go/api/base_client.go index 6f9813337f..3de2a5170f 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -11,6 +11,7 @@ import "C" import ( "errors" + "fmt" "math" "strconv" "unsafe" @@ -27,6 +28,7 @@ type BaseClient interface { HashCommands ListCommands SetCommands + StreamCommands SortedSetCommands ConnectionManagementCommands GenericBaseCommands @@ -1253,6 +1255,39 @@ func (client *baseClient) Renamenx(key string, newKey string) (Result[bool], err return handleBooleanResponse(result) } +func (client *baseClient) XAdd(key string, values [][]string) (Result[string], error) { + return client.XAddWithOptions(key, values, options.NewXAddOptions()) +} + +func (client *baseClient) XAddWithOptions( + key string, + values [][]string, + options *options.XAddOptions, +) (Result[string], error) { + args := []string{} + args = append(args, key) + optionArgs, err := options.ToArgs() + if err != nil { + return CreateNilStringResult(), err + } + args = append(args, optionArgs...) + for _, pair := range values { + if len(pair) != 2 { + return CreateNilStringResult(), fmt.Errorf( + "array entry had the wrong length. Expected length 2 but got length %d", + len(pair), + ) + } + args = append(args, pair...) + } + + result, err := client.executeCommand(C.XAdd, args) + if err != nil { + return CreateNilStringResult(), err + } + return handleStringOrNullResponse(result) +} + func (client *baseClient) ZAdd( key string, membersScoreMap map[string]float64, diff --git a/go/api/command_options.go b/go/api/command_options.go index bbfaf982a0..d2934b869e 100644 --- a/go/api/command_options.go +++ b/go/api/command_options.go @@ -282,19 +282,7 @@ func (listDirection ListDirection) toString() (string, error) { // This base option struct represents the common set of optional arguments for the SCAN family of commands. // Concrete implementations of this class are tied to specific SCAN commands (`SCAN`, `SSCAN`). type BaseScanOptions struct { - /** - * The match filter is applied to the result of the command and will only include - * strings that match the pattern specified. If the sorted set is large enough for scan commands to return - * only a subset of the sorted set then there could be a case where the result is empty although there are - * items that match the pattern specified. This is due to the default `COUNT` being `10` which indicates - * that it will only fetch and match `10` items from the list. - */ match string - /** - * `COUNT` is a just a hint for the command for how many elements to fetch from the - * sorted set. `COUNT` could be ignored until the sorted set is large enough for the `SCAN` commands to - * represent the results as compact single-allocation packed encoding. - */ count int64 } @@ -302,11 +290,19 @@ func NewBaseScanOptionsBuilder() *BaseScanOptions { return &BaseScanOptions{} } +// The match filter is applied to the result of the command and will only include +// strings that match the pattern specified. If the sorted set is large enough for scan commands to return +// only a subset of the sorted set then there could be a case where the result is empty although there are +// items that match the pattern specified. This is due to the default `COUNT` being `10` which indicates +// that it will only fetch and match `10` items from the list. func (scanOptions *BaseScanOptions) SetMatch(m string) *BaseScanOptions { scanOptions.match = m return scanOptions } +// `COUNT` is a just a hint for the command for how many elements to fetch from the +// sorted set. `COUNT` could be ignored until the sorted set is large enough for the `SCAN` commands to +// represent the results as compact single-allocation packed encoding. func (scanOptions *BaseScanOptions) SetCount(c int64) *BaseScanOptions { scanOptions.count = c return scanOptions diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go new file mode 100644 index 0000000000..2a07c0ad2c --- /dev/null +++ b/go/api/options/stream_options.go @@ -0,0 +1,120 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package options + +import ( + "github.com/valkey-io/valkey-glide/go/glide/utils" +) + +type triStateBool int + +// Tri-state bool for use option builders. We cannot rely on the default value of an non-initialized variable. +const ( + triStateBoolUndefined triStateBool = iota + triStateBoolTrue + triStateBoolFalse +) + +// Optional arguments to `XAdd` in [StreamCommands] +type XAddOptions struct { + id string + makeStream triStateBool + trimOptions *XTrimOptions +} + +// Create new empty `XAddOptions` +func NewXAddOptions() *XAddOptions { + return &XAddOptions{} +} + +// New entry will be added with this `id“. +func (xao *XAddOptions) SetId(id string) *XAddOptions { + xao.id = id + return xao +} + +// If set, a new stream won't be created if no stream matches the given key. +func (xao *XAddOptions) SetDontMakeNewStream() *XAddOptions { + xao.makeStream = triStateBoolFalse + return xao +} + +// If set, add operation will also trim the older entries in the stream. +func (xao *XAddOptions) SetTrimOptions(options *XTrimOptions) *XAddOptions { + xao.trimOptions = options + return xao +} + +func (xao *XAddOptions) ToArgs() ([]string, error) { + args := []string{} + var err error + if xao.makeStream == triStateBoolFalse { + args = append(args, "NOMKSTREAM") + } + if xao.trimOptions != nil { + moreArgs, err := xao.trimOptions.ToArgs() + if err != nil { + return args, err + } + args = append(args, moreArgs...) + } + if xao.id != "" { + args = append(args, xao.id) + } else { + args = append(args, "*") + } + return args, err +} + +// Optional arguments for `XTrim` and `XAdd` in [StreamCommands] +type XTrimOptions struct { + exact triStateBool + limit int64 + method string + threshold string +} + +// Option to trim the stream according to minimum ID. +func NewXTrimOptionsWithMinId(threshold string) *XTrimOptions { + return &XTrimOptions{threshold: threshold, method: "MINID"} +} + +// Option to trim the stream according to maximum stream length. +func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions { + return &XTrimOptions{threshold: utils.IntToString(threshold), method: "MAXLEN"} +} + +// Match exactly on the threshold. +func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions { + xto.exact = triStateBoolTrue + return xto +} + +// Trim in a near-exact manner, which is more efficient. +func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { + xto.exact = triStateBoolFalse + return xto +} + +// Max number of stream entries to be trimmed for non-exact match. +func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { + xto.exact = triStateBoolFalse + xto.limit = limit + return xto +} + +func (xto *XTrimOptions) ToArgs() ([]string, error) { + args := []string{} + args = append(args, xto.method) + if xto.exact == triStateBoolTrue { + args = append(args, "=") + } else if xto.exact == triStateBoolFalse { + args = append(args, "~") + } + args = append(args, xto.threshold) + if xto.limit > 0 { + args = append(args, "LIMIT", utils.IntToString(xto.limit)) + } + var err error + return args, err +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go new file mode 100644 index 0000000000..1696a168c2 --- /dev/null +++ b/go/api/stream_commands.go @@ -0,0 +1,52 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package api + +import "github.com/valkey-io/valkey-glide/go/glide/api/options" + +// Supports commands and transactions for the "Stream" group of commands for standalone and cluster clients. +// +// See [valkey.io] for details. +// +// [valkey.io]: https://valkey.io/commands/#stream +type StreamCommands interface { + // Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // values - Field-value pairs to be added to the entry. + // + // Return value: + // The id of the added entry. + // + // For example: + // result, err := client.XAdd("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}}) + // result.IsNil(): false + // result.Value(): "1526919030474-55" + // + // [valkey.io]: https://valkey.io/commands/xadd/ + XAdd(key string, values [][]string) (Result[string], error) + + // Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // values - Field-value pairs to be added to the entry. + // options - Stream add options. + // + // Return value: + // The id of the added entry. + // + // For example: + // options := options.NewXAddOptions().SetId("100-500").SetDontMakeNewStream() + // result, err := client.XAddWithOptions("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}}, options) + // result.IsNil(): false + // result.Value(): "100-500" + // + // [valkey.io]: https://valkey.io/commands/xadd/ + XAddWithOptions(key string, values [][]string, options *options.XAddOptions) (Result[string], error) +} diff --git a/go/integTest/glide_test_suite_test.go b/go/integTest/glide_test_suite_test.go index 51efe6d7fd..eb80993d9d 100644 --- a/go/integTest/glide_test_suite_test.go +++ b/go/integTest/glide_test_suite_test.go @@ -114,9 +114,9 @@ func extractAddresses(suite *GlideTestSuite, output string) []api.NodeAddress { func runClusterManager(suite *GlideTestSuite, args []string, ignoreExitCode bool) string { pythonArgs := append([]string{"../../utils/cluster_manager.py"}, args...) - output, err := exec.Command("python3", pythonArgs...).Output() + output, err := exec.Command("python3", pythonArgs...).CombinedOutput() if len(output) > 0 { - suite.T().Logf("cluster_manager.py stdout:\n====\n%s\n====\n", string(output)) + suite.T().Logf("cluster_manager.py output:\n====\n%s\n====\n", string(output)) } if err != nil { diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index c812f93bcc..b508b69004 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -3810,7 +3810,7 @@ func (suite *GlideTestSuite) TestUnlink() { }) } -func (suite *GlideTestSuite) Test_Rename() { +func (suite *GlideTestSuite) TestRename() { suite.runWithDefaultClients(func(client api.BaseClient) { // Test 1 Check if the command successfully renamed key := "{keyName}" + uuid.NewString() @@ -3849,6 +3849,65 @@ func (suite *GlideTestSuite) TestRenamenx() { }) } +func (suite *GlideTestSuite) TestXAdd() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + // stream does not exist + res, err := client.XAdd(key, [][]string{{"field1", "value1"}, {"field1", "value2"}}) + assert.Nil(suite.T(), err) + assert.False(suite.T(), res.IsNil()) + // don't check the value, because it contains server's timestamp + + // adding data to existing stream + res, err = client.XAdd(key, [][]string{{"field3", "value3"}}) + assert.Nil(suite.T(), err) + assert.False(suite.T(), res.IsNil()) + + // incorrect input + _, err = client.XAdd(key, [][]string{}) + assert.NotNil(suite.T(), err) + _, err = client.XAdd(key, [][]string{{"1", "2", "3"}}) + assert.NotNil(suite.T(), err) + + // key is not a string + key = uuid.NewString() + client.Set(key, "abc") + _, err = client.XAdd(key, [][]string{{"f", "v"}}) + assert.NotNil(suite.T(), err) + }) +} + +func (suite *GlideTestSuite) TestXAddWithOptions() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + // stream does not exist + res, err := client.XAddWithOptions( + key, + [][]string{{"field1", "value1"}}, + options.NewXAddOptions().SetDontMakeNewStream(), + ) + assert.Nil(suite.T(), err) + assert.True(suite.T(), res.IsNil()) + + // adding data to with given ID + res, err = client.XAddWithOptions(key, [][]string{{"field1", "value1"}}, options.NewXAddOptions().SetId("0-1")) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "0-1", res.Value()) + + client.XAdd(key, [][]string{{"field2", "value2"}}) + // TODO run XLen there + // this will trim the first entry. + res, err = client.XAddWithOptions( + key, + [][]string{{"field3", "value3"}}, + options.NewXAddOptions().SetTrimOptions(options.NewXTrimOptionsWithMaxLen(2).SetExactTrimming()), + ) + assert.Nil(suite.T(), err) + assert.False(suite.T(), res.IsNil()) + // TODO run XLen there + }) +} + func (suite *GlideTestSuite) TestZAddAndZAddIncr() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String()