Skip to content

Commit

Permalink
feat(storagenode): change append rpc from unary to stream
Browse files Browse the repository at this point in the history
This PR changes the Append RPC lifestyle from a unary to a bidirectional stream. However, it neither
adds nor updates stream-styled Append. Therefore, this change is transparent for users.

This is starting point to support streaming Append API, and LocalOrderClient mentioned on #433.
  • Loading branch information
ijsong committed May 16, 2023
1 parent d0a3868 commit 2cea668
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 259 deletions.
20 changes: 18 additions & 2 deletions internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,30 @@ func (c *LogClient) reset(rpcConn *rpc.Conn, _ types.ClusterID, target varlogpb.
// The backup indicates the storage nodes that have backup replicas of that log stream.
// It returns valid GLSN if the append completes successfully.
func (c *LogClient) Append(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, data [][]byte) ([]snpb.AppendResult, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := c.rpcClient.Append(ctx)
if err != nil {
return nil, err
}
defer func() {
_ = stream.CloseSend()
}()

req := &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: data,
}
rsp, err := c.rpcClient.Append(ctx, req)
err = stream.Send(req)
if err != nil {
return nil, fmt.Errorf("logclient: %w", verrors.FromStatusError(err))
return nil, err
}

rsp, err := stream.Recv()
if err != nil {
return nil, err
}
return rsp.Results, nil
}
Expand Down
18 changes: 12 additions & 6 deletions internal/storagenode/client/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type storageNode struct {
llsn types.LLSN
logEntries map[types.GLSN][]byte
glsnToLLSN map[types.GLSN]types.LLSN
appendReq *snpb.AppendRequest
mu sync.Mutex
}

Expand All @@ -46,16 +47,19 @@ func newMockStorageNodeServiceClient(ctrl *gomock.Controller, sn *storageNode, t
mockClient := mock.NewMockLogIOClient(ctrl)

// Append
mockClient.EXPECT().Append(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).DoAndReturn(func(ctx context.Context, req *snpb.AppendRequest, opts ...grpc.CallOption) (*snpb.AppendResponse, error) {
mockAppendClient := mock.NewMockLogIO_AppendClient(ctrl)
mockAppendClient.EXPECT().Send(gomock.Any()).DoAndReturn(func(req *snpb.AppendRequest) error {
sn.mu.Lock()
defer sn.mu.Unlock()
sn.appendReq = req
return nil
}).AnyTimes()
mockAppendClient.EXPECT().Recv().DoAndReturn(func() (*snpb.AppendResponse, error) {
sn.mu.Lock()
defer sn.mu.Unlock()

rsp := &snpb.AppendResponse{}
for _, buf := range req.GetPayload() {
for _, buf := range sn.appendReq.GetPayload() {
sn.logEntries[sn.glsn] = buf
sn.glsnToLLSN[sn.glsn] = sn.llsn
rsp.Results = append(rsp.Results, snpb.AppendResult{
Expand All @@ -71,6 +75,8 @@ func newMockStorageNodeServiceClient(ctrl *gomock.Controller, sn *storageNode, t
}
return rsp, nil
}).AnyTimes()
mockAppendClient.EXPECT().CloseSend().Return(nil).AnyTimes()
mockClient.EXPECT().Append(gomock.Any(), gomock.Any()).Return(mockAppendClient, nil).AnyTimes()

// Read
mockClient.EXPECT().Read(
Expand Down
20 changes: 20 additions & 0 deletions internal/storagenode/client/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package client

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

func TestNewLogClient(t *testing.T, rpcClient snpb.LogIOClient, target varlogpb.StorageNode) *LogClient {
require.NotNil(t, rpcClient)
require.False(t, target.StorageNodeID.Invalid())
require.NotEmpty(t, target.Address)
return &LogClient{
rpcClient: rpcClient,
target: target,
}
}
62 changes: 39 additions & 23 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storagenode
import (
"context"
"errors"
"io"

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/multierr"
Expand All @@ -22,33 +23,48 @@ type logServer struct {

var _ snpb.LogIOServer = (*logServer)(nil)

func (ls logServer) Append(ctx context.Context, req *snpb.AppendRequest) (*snpb.AppendResponse, error) {
err := snpb.ValidateTopicLogStream(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
func (ls logServer) Append(stream snpb.LogIO_AppendServer) (err error) {
req, rsp := &snpb.AppendRequest{}, &snpb.AppendResponse{}

payload := req.GetPayload()
req.Payload = nil
lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return nil, status.Error(codes.NotFound, "no such log stream")
}
for {
req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
}
if err != nil {
return err
}

res, err := lse.Append(ctx, payload)
if err != nil {
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()
err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return status.Error(codes.NotFound, "no such log stream")
}
res, err := lse.Append(stream.Context(), req.Payload)
if err != nil {
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()
}
return status.Error(code, err.Error())
}

rsp.Results = res
err = stream.Send(rsp)
if err != nil {
return err
}
return nil, status.Error(code, err.Error())
}
return &snpb.AppendResponse{Results: res}, nil
}

func (ls logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
Expand Down
74 changes: 23 additions & 51 deletions internal/storagenode/storagenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,61 +456,45 @@ func TestStorageNode_Append(t *testing.T) {

tcs := []struct {
name string
testf func(t *testing.T, addr string, lc snpb.LogIOClient)
testf func(t *testing.T, addr string, lc *client.LogClient)
}{
{
name: "InvalidTopicID",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
testf: func(t *testing.T, _ string, lc *client.LogClient) {
const invalidTopicID = types.TopicID(0)
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: invalidTopicID,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(context.Background(), invalidTopicID, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.InvalidArgument, status.Code(err))
},
},
{
name: "InvalidLogStreamID",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
testf: func(t *testing.T, _ string, lc *client.LogClient) {
const invalidLogStreamID = types.LogStreamID(0)
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: invalidLogStreamID,
Payload: payload,
})
_, err := lc.Append(context.Background(), tpid, invalidLogStreamID, payload)
require.Error(t, err)
require.Equal(t, codes.InvalidArgument, status.Code(err))
},
},
{
name: "NoSuchTopic",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid + 1,
LogStreamID: lsid,
Payload: payload,
})
testf: func(t *testing.T, _ string, lc *client.LogClient) {
_, err := lc.Append(context.Background(), tpid+1, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.NotFound, status.Code(err))
},
},
{
name: "NoSuchLogStream",
testf: func(t *testing.T, _ string, lc snpb.LogIOClient) {
_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid + 1,
Payload: payload,
})
testf: func(t *testing.T, _ string, lc *client.LogClient) {
_, err := lc.Append(context.Background(), tpid, lsid+1, payload)
require.Error(t, err)
require.Equal(t, codes.NotFound, status.Code(err))
},
},
{
name: "NotPrimary",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())
Expand Down Expand Up @@ -538,34 +522,26 @@ func TestStorageNode_Append(t *testing.T) {
},
}, addr)

_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(context.Background(), tpid, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.Unavailable, status.Code(err))
},
},
{
name: "Sealed",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())

_, err := lc.Append(context.Background(), &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(context.Background(), tpid, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.FailedPrecondition, status.Code(err))
},
},
{
name: "DeadlineExceeded",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())
Expand All @@ -585,18 +561,14 @@ func TestStorageNode_Append(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
_, err := lc.Append(ctx, &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(ctx, tpid, lsid, payload)
require.Error(t, err)
require.Equal(t, codes.DeadlineExceeded, status.Code(err))
},
},
{
name: "Canceled",
testf: func(t *testing.T, addr string, lc snpb.LogIOClient) {
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())
Expand All @@ -623,11 +595,7 @@ func TestStorageNode_Append(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := lc.Append(ctx, &snpb.AppendRequest{
TopicID: tpid,
LogStreamID: lsid,
Payload: payload,
})
_, err := lc.Append(ctx, tpid, lsid, payload)
assert.Error(t, err)
assert.Equal(t, codes.Canceled, status.Code(err))
}()
Expand Down Expand Up @@ -663,8 +631,12 @@ func TestStorageNode_Append(t *testing.T) {
defer func() {
require.NoError(t, rpcConn.Close())
}()
lc := snpb.NewLogIOClient(rpcConn.Conn)

lc := client.TestNewLogClient(t, snpb.NewLogIOClient(rpcConn.Conn),
varlogpb.StorageNode{
StorageNodeID: snid,
Address: addr,
},
)
tc.testf(t, addr, lc)
})
}
Expand Down
Loading

0 comments on commit 2cea668

Please sign in to comment.