Skip to content

Commit

Permalink
Go: XREADGROUP.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Jan 14, 2025
1 parent bdaf52a commit 5c9b9be
Show file tree
Hide file tree
Showing 5 changed files with 456 additions and 0 deletions.
123 changes: 123 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,129 @@ func (client *baseClient) XAddWithOptions(
return handleStringOrNullResponse(result)
}

// Reads entries from the given streams owned by a consumer group.
//
// Note:
//
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
//
// group - The consumer group name.
// consumer - The group consumer.
// keysAndIds - A map of keys and entry IDs to read from.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
//
// result, err := client.XReadGroup({"stream1": "0-0", "stream2": "0-1", "stream3": "0-1"})
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {
// "0-1": {{"field1", "value1"}},
// "0-2": {{"field2", "value2"}, {"field2", "value3"}},
// },
// "stream2": {
// "1526985676425-0": {{"name", "Virginia"}, {"surname", "Woolf"}},
// "1526985685298-0": nil, // entry was deleted
// }
// "stream3": {}, // stream is empty
// }
//
// [valkey.io]: https://valkey.io/commands/xreadgroup/
func (client *baseClient) XReadGroup(
group string,
consumer string,
keysAndIds map[string]string,
) (map[string]map[string][][]string, error) {
return client.XReadGroupWithOptions(group, consumer, keysAndIds, options.NewXReadGroupOptions())
}

// Reads entries from the given streams owned by a consumer group.
//
// Note:
//
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
//
// group - The consumer group name.
// consumer - The group consumer.
// keysAndIds - A map of keys and entry IDs to read from.
// options - Options detailing how to read the stream.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
//
// options := options.NewXReadGroupOptions().SetNoAck()
// result, err := client.XReadGroupWithOptions({"stream1": "0-0", "stream2": "0-1", "stream3": "0-1"}, options)
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {
// "0-1": {{"field1", "value1"}},
// "0-2": {{"field2", "value2"}, {"field2", "value3"}},
// },
// "stream2": {
// "1526985676425-0": {{"name", "Virginia"}, {"surname", "Woolf"}},
// "1526985685298-0": nil, // entry was deleted
// }
// "stream3": {}, // stream is empty
// }
//
// [valkey.io]: https://valkey.io/commands/xreadgroup/
func (client *baseClient) XReadGroupWithOptions(
group string,
consumer string,
keysAndIds map[string]string,
options *options.XReadGroupOptions,
) (map[string]map[string][][]string, error) {
args, err := createStreamCommandArgs([]string{"GROUP", group, consumer}, keysAndIds, options)
if err != nil {
return nil, err
}

result, err := client.executeCommand(C.XReadGroup, args)
if err != nil {
return nil, err
}

return handleXReadGroupResponse(result)
}

// Combine `args` with `keysAndIds` and `options` into arguments for a stream command
func createStreamCommandArgs(
args []string,
keysAndIds map[string]string,
options interface{ ToArgs() ([]string, error) },
) ([]string, error) {
optionArgs, err := options.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
// Note: this loop iterates in an indeterminate order, but it is OK for that case
keys := make([]string, 0, len(keysAndIds))
values := make([]string, 0, len(keysAndIds))
for key := range keysAndIds {
keys = append(keys, key)
values = append(values, keysAndIds[key])
}
args = append(args, "STREAMS")
args = append(args, keys...)
args = append(args, values...)
return args, nil
}

func (client *baseClient) ZAdd(
key string,
membersScoreMap map[string]float64,
Expand Down
45 changes: 45 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,48 @@ func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) {
}
return args, nil
}

// Optional arguments for `XReadGroup` in [StreamCommands]
type XReadGroupOptions struct {
count, block int64
noAck bool
}

// Create new empty `XReadOptions`
func NewXReadGroupOptions() *XReadGroupOptions {
return &XReadGroupOptions{-1, -1, false}
}

// The maximal number of elements requested. Equivalent to `COUNT` in the Valkey API.
func (xrgo *XReadGroupOptions) SetCount(count int64) *XReadGroupOptions {
xrgo.count = count
return xrgo
}

// If set, the request will be blocked for the set amount of milliseconds or until the server has
// the required number of entries. A value of `0` will block indefinitely. Equivalent to `BLOCK` in the Valkey API.
func (xrgo *XReadGroupOptions) SetBlock(block int64) *XReadGroupOptions {
xrgo.block = block
return xrgo
}

// If set, messages are not added to the Pending Entries List (PEL). This is equivalent to
// acknowledging the message when it is read.
func (xrgo *XReadGroupOptions) SetNoAck() *XReadGroupOptions {
xrgo.noAck = true
return xrgo
}

func (xrgo *XReadGroupOptions) ToArgs() ([]string, error) {
args := []string{}
if xrgo.count >= 0 {
args = append(args, "COUNT", utils.IntToString(xrgo.count))
}
if xrgo.block >= 0 {
args = append(args, "BLOCK", utils.IntToString(xrgo.block))
}
if xrgo.noAck {
args = append(args, "NOACK")
}
return args, nil
}
138 changes: 138 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "C"

import (
"fmt"
"reflect"
"unsafe"
)

Expand Down Expand Up @@ -482,3 +483,140 @@ func convertToResultStringArray(input []interface{}) ([]Result[string], error) {
}
return result, nil
}

// get type of T
func getType[T any]() reflect.Type {
var zero [0]T
return reflect.TypeOf(zero).Elem()
}

// convert (typecast) untyped response into a typed value
// for example, an arbitrary array `[]interface{}` into `[]string`
type responseConverter interface {
convert(data interface{}) (interface{}, error)
}

// convert maps, T - type of the value, key is string
type mapConverter[T any] struct {
next responseConverter
canBeNil bool
}

func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {
if data == nil {
if node.canBeNil {
return nil, nil
} else {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: nil, expected: map[string]%v", getType[T]())}
}
}
result := make(map[string]T)

for key, value := range data.(map[string]interface{}) {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", value, getType[T]())}
}
result[key] = valueT
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
if val == nil {
var null T
result[key] = null
continue
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", val, getType[T]())}
}
result[key] = valueT
}
}

return result, nil
}

// convert arrays, T - type of the value
type arrayConverter[T any] struct {
next responseConverter
canBeNil bool
}

func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) {
if data == nil {
if node.canBeNil {
return nil, nil
} else {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: nil, expected: []%v", getType[T]())}
}
}
arrData := data.([]interface{})
result := make([]T, 0, len(arrData))
for _, value := range arrData {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{
fmt.Sprintf("Unexpected type of array element: %T, expected: %v", value, getType[T]()),
}
}
result = append(result, valueT)
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
if val == nil {
var null T
result = append(result, null)
continue
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type of array element: %T, expected: %v", val, getType[T]())}
}
result = append(result, valueT)
}
}

return result, nil
}

// TODO: convert sets

func handleXReadGroupResponse(response *C.struct_CommandResponse) (map[string]map[string][][]string, error) {
data, err := parseMap(response)
if err != nil {
return nil, err
}
if data == nil {
return nil, nil
}

converters := mapConverter[map[string][][]string]{
mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
true,
},
false,
},
false,
}

res, err := converters.convert(data)
if err != nil {
return nil, err
}
if result, ok := res.(map[string]map[string][][]string); ok {
return result, nil
}
return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)}
}
9 changes: 9 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,13 @@ type StreamCommands interface {
//
// [valkey.io]: https://valkey.io/commands/xlen/
XLen(key string) (Result[int64], error)

XReadGroup(group string, consumer string, keysAndIds map[string]string) (map[string]map[string][][]string, error)

XReadGroupWithOptions(
group string,
consumer string,
keysAndIds map[string]string,
options *options.XReadGroupOptions,
) (map[string]map[string][][]string, error)
}
Loading

0 comments on commit 5c9b9be

Please sign in to comment.