Skip to content

Commit

Permalink
Go: XADD (#2843)
Browse files Browse the repository at this point in the history
* `XADD`

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Dec 27, 2024
1 parent 994455a commit 2d6ae0b
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 15 deletions.
35 changes: 35 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import "C"

import (
"errors"
"fmt"
"math"
"strconv"
"unsafe"
Expand All @@ -27,6 +28,7 @@ type BaseClient interface {
HashCommands
ListCommands
SetCommands
StreamCommands
SortedSetCommands
ConnectionManagementCommands
GenericBaseCommands
Expand Down Expand Up @@ -1253,6 +1255,39 @@ func (client *baseClient) Renamenx(key string, newKey string) (Result[bool], err
return handleBooleanResponse(result)
}

func (client *baseClient) XAdd(key string, values [][]string) (Result[string], error) {
return client.XAddWithOptions(key, values, options.NewXAddOptions())
}

func (client *baseClient) XAddWithOptions(
key string,
values [][]string,
options *options.XAddOptions,
) (Result[string], error) {
args := []string{}
args = append(args, key)
optionArgs, err := options.ToArgs()
if err != nil {
return CreateNilStringResult(), err
}
args = append(args, optionArgs...)
for _, pair := range values {
if len(pair) != 2 {
return CreateNilStringResult(), fmt.Errorf(
"array entry had the wrong length. Expected length 2 but got length %d",
len(pair),
)
}
args = append(args, pair...)
}

result, err := client.executeCommand(C.XAdd, args)
if err != nil {
return CreateNilStringResult(), err
}
return handleStringOrNullResponse(result)
}

func (client *baseClient) ZAdd(
key string,
membersScoreMap map[string]float64,
Expand Down
20 changes: 8 additions & 12 deletions go/api/command_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,31 +282,27 @@ func (listDirection ListDirection) toString() (string, error) {
// This base option struct represents the common set of optional arguments for the SCAN family of commands.
// Concrete implementations of this class are tied to specific SCAN commands (`SCAN`, `SSCAN`).
type BaseScanOptions struct {
/**
* The match filter is applied to the result of the command and will only include
* strings that match the pattern specified. If the sorted set is large enough for scan commands to return
* only a subset of the sorted set then there could be a case where the result is empty although there are
* items that match the pattern specified. This is due to the default `COUNT` being `10` which indicates
* that it will only fetch and match `10` items from the list.
*/
match string
/**
* `COUNT` is a just a hint for the command for how many elements to fetch from the
* sorted set. `COUNT` could be ignored until the sorted set is large enough for the `SCAN` commands to
* represent the results as compact single-allocation packed encoding.
*/
count int64
}

func NewBaseScanOptionsBuilder() *BaseScanOptions {
return &BaseScanOptions{}
}

// The match filter is applied to the result of the command and will only include
// strings that match the pattern specified. If the sorted set is large enough for scan commands to return
// only a subset of the sorted set then there could be a case where the result is empty although there are
// items that match the pattern specified. This is due to the default `COUNT` being `10` which indicates
// that it will only fetch and match `10` items from the list.
func (scanOptions *BaseScanOptions) SetMatch(m string) *BaseScanOptions {
scanOptions.match = m
return scanOptions
}

// `COUNT` is a just a hint for the command for how many elements to fetch from the
// sorted set. `COUNT` could be ignored until the sorted set is large enough for the `SCAN` commands to
// represent the results as compact single-allocation packed encoding.
func (scanOptions *BaseScanOptions) SetCount(c int64) *BaseScanOptions {
scanOptions.count = c
return scanOptions
Expand Down
120 changes: 120 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

package options

import (
"github.com/valkey-io/valkey-glide/go/glide/utils"
)

type triStateBool int

// Tri-state bool for use option builders. We cannot rely on the default value of an non-initialized variable.
const (
triStateBoolUndefined triStateBool = iota
triStateBoolTrue
triStateBoolFalse
)

// Optional arguments to `XAdd` in [StreamCommands]
type XAddOptions struct {
id string
makeStream triStateBool
trimOptions *XTrimOptions
}

// Create new empty `XAddOptions`
func NewXAddOptions() *XAddOptions {
return &XAddOptions{}
}

// New entry will be added with this `id“.
func (xao *XAddOptions) SetId(id string) *XAddOptions {
xao.id = id
return xao
}

// If set, a new stream won't be created if no stream matches the given key.
func (xao *XAddOptions) SetDontMakeNewStream() *XAddOptions {
xao.makeStream = triStateBoolFalse
return xao
}

// If set, add operation will also trim the older entries in the stream.
func (xao *XAddOptions) SetTrimOptions(options *XTrimOptions) *XAddOptions {
xao.trimOptions = options
return xao
}

func (xao *XAddOptions) ToArgs() ([]string, error) {
args := []string{}
var err error
if xao.makeStream == triStateBoolFalse {
args = append(args, "NOMKSTREAM")
}
if xao.trimOptions != nil {
moreArgs, err := xao.trimOptions.ToArgs()
if err != nil {
return args, err
}
args = append(args, moreArgs...)
}
if xao.id != "" {
args = append(args, xao.id)
} else {
args = append(args, "*")
}
return args, err
}

// Optional arguments for `XTrim` and `XAdd` in [StreamCommands]
type XTrimOptions struct {
exact triStateBool
limit int64
method string
threshold string
}

// Option to trim the stream according to minimum ID.
func NewXTrimOptionsWithMinId(threshold string) *XTrimOptions {
return &XTrimOptions{threshold: threshold, method: "MINID"}
}

// Option to trim the stream according to maximum stream length.
func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions {
return &XTrimOptions{threshold: utils.IntToString(threshold), method: "MAXLEN"}
}

// Match exactly on the threshold.
func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions {
xto.exact = triStateBoolTrue
return xto
}

// Trim in a near-exact manner, which is more efficient.
func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions {
xto.exact = triStateBoolFalse
return xto
}

// Max number of stream entries to be trimmed for non-exact match.
func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions {
xto.exact = triStateBoolFalse
xto.limit = limit
return xto
}

func (xto *XTrimOptions) ToArgs() ([]string, error) {
args := []string{}
args = append(args, xto.method)
if xto.exact == triStateBoolTrue {
args = append(args, "=")
} else if xto.exact == triStateBoolFalse {
args = append(args, "~")
}
args = append(args, xto.threshold)
if xto.limit > 0 {
args = append(args, "LIMIT", utils.IntToString(xto.limit))
}
var err error
return args, err
}
52 changes: 52 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

package api

import "github.com/valkey-io/valkey-glide/go/glide/api/options"

// Supports commands and transactions for the "Stream" group of commands for standalone and cluster clients.
//
// See [valkey.io] for details.
//
// [valkey.io]: https://valkey.io/commands/#stream
type StreamCommands interface {
// Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// values - Field-value pairs to be added to the entry.
//
// Return value:
// The id of the added entry.
//
// For example:
// result, err := client.XAdd("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}})
// result.IsNil(): false
// result.Value(): "1526919030474-55"
//
// [valkey.io]: https://valkey.io/commands/xadd/
XAdd(key string, values [][]string) (Result[string], error)

// Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// values - Field-value pairs to be added to the entry.
// options - Stream add options.
//
// Return value:
// The id of the added entry.
//
// For example:
// options := options.NewXAddOptions().SetId("100-500").SetDontMakeNewStream()
// result, err := client.XAddWithOptions("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}}, options)
// result.IsNil(): false
// result.Value(): "100-500"
//
// [valkey.io]: https://valkey.io/commands/xadd/
XAddWithOptions(key string, values [][]string, options *options.XAddOptions) (Result[string], error)
}
4 changes: 2 additions & 2 deletions go/integTest/glide_test_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func extractAddresses(suite *GlideTestSuite, output string) []api.NodeAddress {

func runClusterManager(suite *GlideTestSuite, args []string, ignoreExitCode bool) string {
pythonArgs := append([]string{"../../utils/cluster_manager.py"}, args...)
output, err := exec.Command("python3", pythonArgs...).Output()
output, err := exec.Command("python3", pythonArgs...).CombinedOutput()
if len(output) > 0 {
suite.T().Logf("cluster_manager.py stdout:\n====\n%s\n====\n", string(output))
suite.T().Logf("cluster_manager.py output:\n====\n%s\n====\n", string(output))
}

if err != nil {
Expand Down
61 changes: 60 additions & 1 deletion go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3810,7 +3810,7 @@ func (suite *GlideTestSuite) TestUnlink() {
})
}

func (suite *GlideTestSuite) Test_Rename() {
func (suite *GlideTestSuite) TestRename() {
suite.runWithDefaultClients(func(client api.BaseClient) {
// Test 1 Check if the command successfully renamed
key := "{keyName}" + uuid.NewString()
Expand Down Expand Up @@ -3849,6 +3849,65 @@ func (suite *GlideTestSuite) TestRenamenx() {
})
}

func (suite *GlideTestSuite) TestXAdd() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
// stream does not exist
res, err := client.XAdd(key, [][]string{{"field1", "value1"}, {"field1", "value2"}})
assert.Nil(suite.T(), err)
assert.False(suite.T(), res.IsNil())
// don't check the value, because it contains server's timestamp

// adding data to existing stream
res, err = client.XAdd(key, [][]string{{"field3", "value3"}})
assert.Nil(suite.T(), err)
assert.False(suite.T(), res.IsNil())

// incorrect input
_, err = client.XAdd(key, [][]string{})
assert.NotNil(suite.T(), err)
_, err = client.XAdd(key, [][]string{{"1", "2", "3"}})
assert.NotNil(suite.T(), err)

// key is not a string
key = uuid.NewString()
client.Set(key, "abc")
_, err = client.XAdd(key, [][]string{{"f", "v"}})
assert.NotNil(suite.T(), err)
})
}

func (suite *GlideTestSuite) TestXAddWithOptions() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
// stream does not exist
res, err := client.XAddWithOptions(
key,
[][]string{{"field1", "value1"}},
options.NewXAddOptions().SetDontMakeNewStream(),
)
assert.Nil(suite.T(), err)
assert.True(suite.T(), res.IsNil())

// adding data to with given ID
res, err = client.XAddWithOptions(key, [][]string{{"field1", "value1"}}, options.NewXAddOptions().SetId("0-1"))
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), "0-1", res.Value())

client.XAdd(key, [][]string{{"field2", "value2"}})
// TODO run XLen there
// this will trim the first entry.
res, err = client.XAddWithOptions(
key,
[][]string{{"field3", "value3"}},
options.NewXAddOptions().SetTrimOptions(options.NewXTrimOptionsWithMaxLen(2).SetExactTrimming()),
)
assert.Nil(suite.T(), err)
assert.False(suite.T(), res.IsNil())
// TODO run XLen there
})
}

func (suite *GlideTestSuite) TestZAddAndZAddIncr() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
Expand Down

0 comments on commit 2d6ae0b

Please sign in to comment.