From dec3421ddf305ec161e274643089a5dcfb415b53 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 25 May 2023 14:59:16 +0900 Subject: [PATCH] feat(client): add LogStreamAppender This PR adds LogStreamAppender to the client, and it is a client to append asynchronously only a particular log stream. Resolves #433. --- internal/storagenode/client/log_client.go | 4 + pkg/varlog/config.go | 72 ++++++++ pkg/varlog/errors.go | 5 + pkg/varlog/log.go | 7 + pkg/varlog/log_mock.go | 20 +++ pkg/varlog/log_stream_appender.go | 201 ++++++++++++++++++++++ pkg/varlog/log_stream_appender_test.go | 1 + pkg/varlogtest/log.go | 96 ++++++++++- pkg/varlogtest/varlogtest_test.go | 75 ++++++++ tests/it/cluster/client_test.go | 160 +++++++++++++++++ 10 files changed, 640 insertions(+), 1 deletion(-) create mode 100644 pkg/varlog/errors.go create mode 100644 pkg/varlog/log_stream_appender.go create mode 100644 pkg/varlog/log_stream_appender_test.go diff --git a/internal/storagenode/client/log_client.go b/internal/storagenode/client/log_client.go index 6b90155bb..ebfa87e0c 100644 --- a/internal/storagenode/client/log_client.go +++ b/internal/storagenode/client/log_client.go @@ -70,6 +70,10 @@ func (c *LogClient) Append(ctx context.Context, tpid types.TopicID, lsid types.L return rsp.Results, nil } +func (c *LogClient) AppendStream(ctx context.Context) (snpb.LogIO_AppendClient, error) { + return c.rpcClient.Append(ctx) +} + // Subscribe gets log entries continuously from the storage node. It guarantees that LLSNs of log // entries taken are sequential. func (c *LogClient) Subscribe(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, begin, end types.GLSN) (<-chan SubscribeResult, error) { diff --git a/pkg/varlog/config.go b/pkg/varlog/config.go index 469664af9..83f1959be 100644 --- a/pkg/varlog/config.go +++ b/pkg/varlog/config.go @@ -3,6 +3,9 @@ package varlog import ( "context" "time" + + "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/proto/varlogpb" ) type adminConfig struct { @@ -91,3 +94,72 @@ func WithTimeout(timeout time.Duration) AdminCallOption { cfg.timeout.set = true }) } + +const ( + defaultPipelineSize = 2 + minPipelineSize = 1 + maxPipelineSize = 8 +) + +type logStreamAppenderConfig struct { + defaultBatchCallback BatchCallback + tpid types.TopicID + lsid types.LogStreamID + pipelineSize int +} + +func newLogStreamAppenderConfig(opts []LogStreamAppenderOption) logStreamAppenderConfig { + cfg := logStreamAppenderConfig{ + pipelineSize: defaultPipelineSize, + defaultBatchCallback: func([]varlogpb.LogEntryMeta, error) {}, + } + for _, opt := range opts { + opt.applyLogStreamAppender(&cfg) + } + cfg.ensureDefault() + return cfg +} + +func (cfg *logStreamAppenderConfig) ensureDefault() { + if cfg.pipelineSize < minPipelineSize { + cfg.pipelineSize = minPipelineSize + } + if cfg.pipelineSize > maxPipelineSize { + cfg.pipelineSize = maxPipelineSize + } +} + +type funcLogStreamAppenderOption struct { + f func(*logStreamAppenderConfig) +} + +func newFuncLogStreamAppenderOption(f func(config *logStreamAppenderConfig)) *funcLogStreamAppenderOption { + return &funcLogStreamAppenderOption{f: f} +} + +func (fo *funcLogStreamAppenderOption) applyLogStreamAppender(cfg *logStreamAppenderConfig) { + fo.f(cfg) +} + +// LogStreamAppenderOption configures a LogStreamAppender. +type LogStreamAppenderOption interface { + applyLogStreamAppender(config *logStreamAppenderConfig) +} + +// WithPipelineSize sets request pipeline size. The default pipeline size is +// two. Any value below one will be set to one, and any above eight will be +// limited to eight. +func WithPipelineSize(pipelineSize int) LogStreamAppenderOption { + return newFuncLogStreamAppenderOption(func(cfg *logStreamAppenderConfig) { + cfg.pipelineSize = pipelineSize + }) +} + +// WithDefaultBatchCallback sets the default callback function. The default callback +// function can be overridden by the argument callback of the AppendBatch +// method. +func WithDefaultBatchCallback(defaultBatchCallback BatchCallback) LogStreamAppenderOption { + return newFuncLogStreamAppenderOption(func(cfg *logStreamAppenderConfig) { + cfg.defaultBatchCallback = defaultBatchCallback + }) +} diff --git a/pkg/varlog/errors.go b/pkg/varlog/errors.go new file mode 100644 index 000000000..97bfc4ac3 --- /dev/null +++ b/pkg/varlog/errors.go @@ -0,0 +1,5 @@ +package varlog + +import "errors" + +var ErrClosed = errors.New("client: closed") diff --git a/pkg/varlog/log.go b/pkg/varlog/log.go index 39969576f..e4f7747f4 100644 --- a/pkg/varlog/log.go +++ b/pkg/varlog/log.go @@ -42,6 +42,9 @@ type Log interface { // replica. If none of the replicas' statuses is either appendable or // sealed, it returns an error. PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) + + // NewLogStreamAppender returns a new LogStreamAppender. + NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) } type AppendResult struct { @@ -177,6 +180,10 @@ func (v *logImpl) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty return v.peekLogStream(ctx, tpid, lsid) } +func (v *logImpl) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) { + return v.newLogStreamAppender(context.Background(), tpid, lsid, opts...) +} + func (v *logImpl) Close() (err error) { if v.closed.Load() { return diff --git a/pkg/varlog/log_mock.go b/pkg/varlog/log_mock.go index 7fe132c71..8a5189d77 100644 --- a/pkg/varlog/log_mock.go +++ b/pkg/varlog/log_mock.go @@ -89,6 +89,26 @@ func (mr *MockLogMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockLog)(nil).Close)) } +// NewLogStreamAppender mocks base method. +func (m *MockLog) NewLogStreamAppender(arg0 types.TopicID, arg1 types.LogStreamID, arg2 ...LogStreamAppenderOption) (LogStreamAppender, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "NewLogStreamAppender", varargs...) + ret0, _ := ret[0].(LogStreamAppender) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewLogStreamAppender indicates an expected call of NewLogStreamAppender. +func (mr *MockLogMockRecorder) NewLogStreamAppender(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewLogStreamAppender", reflect.TypeOf((*MockLog)(nil).NewLogStreamAppender), varargs...) +} + // PeekLogStream mocks base method. func (m *MockLog) PeekLogStream(arg0 context.Context, arg1 types.TopicID, arg2 types.LogStreamID) (varlogpb.LogSequenceNumber, varlogpb.LogSequenceNumber, error) { m.ctrl.T.Helper() diff --git a/pkg/varlog/log_stream_appender.go b/pkg/varlog/log_stream_appender.go new file mode 100644 index 000000000..8a4135c29 --- /dev/null +++ b/pkg/varlog/log_stream_appender.go @@ -0,0 +1,201 @@ +package varlog + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/puzpuzpuz/xsync/v2" + + "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/proto/snpb" + "github.com/kakao/varlog/proto/varlogpb" +) + +// LogStreamAppender is a client only to be able to append to a particular log +// stream. +type LogStreamAppender interface { + // AppendBatch appends dataBatch to the given log stream asynchronously. + // Users can call this method without being blocked until the pipeline of + // the LogStreamAppender is full. If the pipeline of the LogStreamAppender + // is already full, it may become blocked. However, the process will + // continue once a response is received from the storage node. + // On completion of AppendBatch, the argument callback provided by users + // will be invoked. All callback functions registered to the same + // LogStreamAppender will be called by the same goroutine sequentially. + // Therefore, the callback should be lightweight. If heavy work is + // necessary for the callback, it would be better to use separate worker + // goroutines. + // The only error from the AppendBatch is ErrClosed, which is returned when + // the LogStreamAppender is already closed. It returns nil even if the + // underlying stream is disconnected and notifies errors via callback. + // It is safe to have multiple goroutines calling AppendBatch + // simultaneously, but the order between them is not guaranteed. + AppendBatch(dataBatch [][]byte, callback BatchCallback) error + + // Close closes the LogStreamAppender client. Once the client is closed, + // calling AppendBatch will fail immediately. If AppendBatch still waits + // for room of pipeline, Close will be blocked. It also waits for all + // pending callbacks to be called. + // It's important for users to avoid calling Close within the callback + // function, as it may cause indefinite blocking. + Close() +} + +// BatchCallback is a callback function to notify the result of +// AppendBatch. +type BatchCallback func([]varlogpb.LogEntryMeta, error) + +type cbQueueEntry struct { + cb BatchCallback + err error +} + +func newCallbackQueueEntry() *cbQueueEntry { + return callbackQueueEntryPool.Get().(*cbQueueEntry) +} + +func (cqe *cbQueueEntry) Release() { + *cqe = cbQueueEntry{} + callbackQueueEntryPool.Put(cqe) +} + +var callbackQueueEntryPool = sync.Pool{ + New: func() any { + return &cbQueueEntry{} + }, +} + +type logStreamAppender struct { + logStreamAppenderConfig + stream snpb.LogIO_AppendClient + cancelFunc context.CancelCauseFunc + sema chan struct{} + cbq chan *cbQueueEntry + wg sync.WaitGroup + closed struct { + xsync.RBMutex + value bool + } +} + +var _ LogStreamAppender = (*logStreamAppender)(nil) + +func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) { + replicas, ok := v.replicasRetriever.Retrieve(tpid, lsid) + if !ok { + return nil, fmt.Errorf("client: log stream %d of topic %d does not exist", lsid, tpid) + } + + snid := replicas[0].StorageNodeID + addr := replicas[0].Address + cl, err := v.logCLManager.GetOrConnect(ctx, snid, addr) + if err != nil { + v.allowlist.Deny(tpid, lsid) + return nil, fmt.Errorf("client: %w", err) + } + + ctx, cancelFunc := context.WithCancelCause(ctx) + stream, err := cl.AppendStream(ctx) + if err != nil { + cancelFunc(err) + return nil, fmt.Errorf("client: %w", err) + } + + cfg := newLogStreamAppenderConfig(opts) + cfg.tpid = tpid + cfg.lsid = lsid + lsa := &logStreamAppender{ + logStreamAppenderConfig: cfg, + stream: stream, + sema: make(chan struct{}, cfg.pipelineSize), + cbq: make(chan *cbQueueEntry, cfg.pipelineSize), + cancelFunc: cancelFunc, + } + lsa.wg.Add(1) + go lsa.recvLoop() + return lsa, nil +} + +func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback BatchCallback) error { + rt := lsa.closed.RLock() + defer lsa.closed.RUnlock(rt) + if lsa.closed.value { + return ErrClosed + } + + lsa.sema <- struct{}{} + + qe := newCallbackQueueEntry() + qe.cb = callback + + err := lsa.stream.Send(&snpb.AppendRequest{ + TopicID: lsa.tpid, + LogStreamID: lsa.lsid, + Payload: dataBatch, + }) + if err != nil { + _ = lsa.stream.CloseSend() + qe.err = err + } + lsa.cbq <- qe + return nil +} + +func (lsa *logStreamAppender) Close() { + lsa.cancelFunc(nil) + + lsa.closed.Lock() + defer lsa.closed.Unlock() + if lsa.closed.value { + return + } + lsa.closed.value = true + + close(lsa.cbq) + lsa.wg.Wait() +} + +func (lsa *logStreamAppender) recvLoop() { + defer lsa.wg.Done() + + var err error + var meta []varlogpb.LogEntryMeta + var cb BatchCallback + rsp := &snpb.AppendResponse{} + + for qe := range lsa.cbq { + meta = nil + err = qe.err + if err != nil { + goto Call + } + + rsp.Reset() + err = lsa.stream.RecvMsg(rsp) + if err != nil { + goto Call + } + + meta = make([]varlogpb.LogEntryMeta, len(rsp.Results)) + for idx, res := range rsp.Results { + if len(res.Error) == 0 { + meta[idx] = res.Meta + continue + } + err = errors.New(res.Error) + break + } + Call: + if qe.cb != nil { + cb = qe.cb + } else { + cb = lsa.defaultBatchCallback + } + if cb != nil { + cb(meta, err) + } + <-lsa.sema + } +} diff --git a/pkg/varlog/log_stream_appender_test.go b/pkg/varlog/log_stream_appender_test.go new file mode 100644 index 000000000..d06ab7146 --- /dev/null +++ b/pkg/varlog/log_stream_appender_test.go @@ -0,0 +1 @@ +package varlog diff --git a/pkg/varlogtest/log.go b/pkg/varlogtest/log.go index 8cd860bfb..a09b5c8e0 100644 --- a/pkg/varlogtest/log.go +++ b/pkg/varlogtest/log.go @@ -28,7 +28,7 @@ func (c *testLog) lock() error { return nil } -func (c testLog) unlock() { +func (c *testLog) unlock() { c.vt.cond.L.Unlock() } @@ -297,6 +297,100 @@ func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty } +// NewLogStreamAppender returns a new fake LogStreamAppender for testing. It +// ignores options; the pipeline size is five, and the default callback has no +// operation. +func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, _ ...varlog.LogStreamAppenderOption) (varlog.LogStreamAppender, error) { + const pipelineSize = 5 + + lsa := &logStreamAppender{ + c: c, + tpid: tpid, + lsid: lsid, + pipelineSize: pipelineSize, + defaultCallback: func([]varlogpb.LogEntryMeta, error) {}, + } + lsa.queue.ch = make(chan *queueEntry, pipelineSize) + lsa.queue.cv = sync.NewCond(&lsa.queue.mu) + + lsa.wg.Add(1) + go func() { + defer lsa.wg.Done() + for qe := range lsa.queue.ch { + qe.callback(qe.result.Metadata, nil) + lsa.queue.cv.L.Lock() + lsa.queue.cv.Broadcast() + lsa.queue.cv.L.Unlock() + } + }() + + return lsa, nil +} + +type queueEntry struct { + callback varlog.BatchCallback + result varlog.AppendResult +} + +type logStreamAppender struct { + c *testLog + tpid types.TopicID + lsid types.LogStreamID + pipelineSize int + defaultCallback varlog.BatchCallback + + closed struct { + value bool + sync.Mutex + } + queue struct { + ch chan *queueEntry + cv *sync.Cond + mu sync.Mutex + } + + wg sync.WaitGroup +} + +var _ varlog.LogStreamAppender = (*logStreamAppender)(nil) + +func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback varlog.BatchCallback) error { + lsa.closed.Lock() + defer lsa.closed.Unlock() + + if lsa.closed.value { + return varlog.ErrClosed + } + + lsa.queue.cv.L.Lock() + defer lsa.queue.cv.L.Unlock() + + for len(lsa.queue.ch) >= lsa.pipelineSize { + lsa.queue.cv.Wait() + } + + qe := &queueEntry{ + callback: callback, + } + qe.result = lsa.c.AppendTo(context.Background(), lsa.tpid, lsa.lsid, dataBatch) + if qe.callback == nil { + qe.callback = lsa.defaultCallback + } + lsa.queue.ch <- qe + return nil +} + +func (lsa *logStreamAppender) Close() { + lsa.closed.Lock() + defer lsa.closed.Unlock() + if lsa.closed.value { + return + } + lsa.closed.value = true + close(lsa.queue.ch) + lsa.wg.Wait() +} + type errSubscriber struct { err error } diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go index a585b084c..39429681b 100644 --- a/pkg/varlogtest/varlogtest_test.go +++ b/pkg/varlogtest/varlogtest_test.go @@ -21,6 +21,81 @@ import ( "github.com/kakao/varlog/proto/varlogpb" ) +func TestVarlotTest_LogStreamAppender(t *testing.T) { + const ( + cid = types.ClusterID(1) + numLogs = 10 + replicationFactor = 3 + ) + + tcs := []struct { + name string + testf func(t *testing.T, lsa varlog.LogStreamAppender) + }{ + { + name: "Closed", + testf: func(t *testing.T, lsa varlog.LogStreamAppender) { + lsa.Close() + err := lsa.AppendBatch([][]byte{[]byte("foo")}, nil) + require.Equal(t, varlog.ErrClosed, err) + }, + }, + { + name: "AppendLogs", + testf: func(t *testing.T, lsa varlog.LogStreamAppender) { + cb := func(_ []varlogpb.LogEntryMeta, err error) { + assert.NoError(t, err) + } + for i := 0; i < numLogs; i++ { + err := lsa.AppendBatch([][]byte{[]byte("foo")}, cb) + require.NoError(t, err) + } + }, + }, + } + + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + vt := varlogtest.New(cid, replicationFactor) + adm := vt.Admin() + vlg := vt.Log() + defer func() { + require.NoError(t, vlg.Close()) + require.NoError(t, adm.Close()) + }() + + for i := 0; i < replicationFactor; i++ { + snid := types.StorageNodeID(i + 1) + addr := fmt.Sprintf("sn%03d", i+1) + snMetaDesc, err := adm.AddStorageNode(context.Background(), snid, addr) + require.NoError(t, err) + require.Equal(t, cid, snMetaDesc.ClusterID) + require.Empty(t, snMetaDesc.LogStreamReplicas) + require.Equal(t, varlogpb.StorageNodeStatusRunning, snMetaDesc.Status) + require.Equal(t, addr, snMetaDesc.StorageNode.Address) + require.NotEmpty(t, snMetaDesc.Storages) + } + + td, err := adm.AddTopic(context.Background()) + require.NoError(t, err) + require.Equal(t, varlogpb.TopicStatusRunning, td.Status) + + lsd, err := adm.AddLogStream(context.Background(), td.TopicID, nil) + require.NoError(t, err) + require.Equal(t, td.TopicID, lsd.TopicID) + require.Equal(t, varlogpb.LogStreamStatusRunning, lsd.Status) + require.Len(t, lsd.Replicas, replicationFactor) + + lsa, err := vlg.NewLogStreamAppender(td.TopicID, lsd.LogStreamID) + require.NoError(t, err) + defer lsa.Close() + + tc.testf(t, lsa) + }) + } +} + func TestVarlogTest(t *testing.T) { defer goleak.VerifyNone(t) diff --git a/tests/it/cluster/client_test.go b/tests/it/cluster/client_test.go index c87e0aa68..095f57457 100644 --- a/tests/it/cluster/client_test.go +++ b/tests/it/cluster/client_test.go @@ -12,6 +12,8 @@ import ( . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/testutil" @@ -707,3 +709,161 @@ func TestClientAppendWithAllowedLogStream(t *testing.T) { require.ErrorIs(t, err, io.EOF) require.NoError(t, subscriber.Close()) } + +func TestLogStreamAppender(t *testing.T) { + const ( + pipelineSize = 2 + calls = pipelineSize * 5 + batchSize = 2 + ) + + tcs := []struct { + name string + testf func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) + }{ + { + name: "CloseBeforeAppendLogs", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize)) + require.NoError(t, err) + lsa.Close() + err = lsa.AppendBatch([][]byte{[]byte("foo")}, nil) + require.Equal(t, varlog.ErrClosed, err) + }, + }, + { + name: "CloseAfterProcessingCallbacks", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + var ( + llsn types.LLSN + glsn types.GLSN + called atomic.Int32 + ) + cb := func(metas []varlogpb.LogEntryMeta, err error) { + called.Add(1) + assert.NoError(t, err) + assert.Len(t, metas, batchSize) + assert.Less(t, llsn, metas[batchSize-1].LLSN) + assert.Less(t, glsn, metas[batchSize-1].GLSN) + llsn = metas[batchSize-1].LLSN + glsn = metas[batchSize-1].GLSN + } + + lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize)) + require.NoError(t, err) + defer func() { + lsa.Close() + require.EqualValues(t, calls, called.Load()) + }() + + for i := 0; i < calls; i++ { + data := make([][]byte, batchSize) + for j := 0; j < batchSize; j++ { + data[j] = []byte(fmt.Sprintf("%d,%d", i, j)) + } + err := lsa.AppendBatch(data, cb) + require.NoError(t, err) + } + require.Eventually(t, func() bool { + return called.Load() == calls + }, 5*time.Second, 100*time.Millisecond) + }, + }, + { + name: "CloseWhileProcessingCallbacks", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + var ( + llsn types.LLSN + glsn types.GLSN + called atomic.Int32 + ) + cb := func(metas []varlogpb.LogEntryMeta, err error) { + called.Add(1) + if err != nil { + assert.Equal(t, codes.Canceled, status.Code(err)) + return + } + assert.NoError(t, err) + assert.Len(t, metas, batchSize) + assert.Less(t, llsn, metas[batchSize-1].LLSN) + assert.Less(t, glsn, metas[batchSize-1].GLSN) + llsn = metas[batchSize-1].LLSN + glsn = metas[batchSize-1].GLSN + } + + lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize)) + require.NoError(t, err) + defer func() { + lsa.Close() + require.EqualValues(t, calls, called.Load()) + }() + + for i := 0; i < calls; i++ { + data := make([][]byte, batchSize) + for j := 0; j < batchSize; j++ { + data[j] = []byte(fmt.Sprintf("%d,%d", i, j)) + } + err := lsa.AppendBatch(data, cb) + require.NoError(t, err) + } + require.Eventually(t, func() bool { + return called.Load() > 0 + }, time.Second, 10*time.Millisecond) + }, + }, + { + name: "AppendChain_DoNotDoThis", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + var called atomic.Int32 + + lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize)) + require.NoError(t, err) + defer func() { + lsa.Close() + require.EqualValues(t, calls, called.Load()) + }() + + dataBatch := [][]byte{[]byte("foo")} + callback := testCallbackGen(t, lsa, dataBatch, calls, &called) + err = lsa.AppendBatch(dataBatch, callback) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return called.Load() == calls + }, 5*time.Second, 100*time.Millisecond) + }, + }, + } + + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + clus := it.NewVarlogCluster(t, + it.WithReplicationFactor(3), + it.WithNumberOfStorageNodes(3), + it.WithNumberOfLogStreams(1), + it.WithNumberOfClients(1), + it.WithVMSOptions(it.NewTestVMSOptions()...), + it.WithNumberOfTopics(1), + ) + defer clus.Close(t) + + tpid := clus.TopicIDs()[0] + lsid := clus.LogStreamID(t, tpid, 0) + client := clus.ClientAtIndex(t, 0) + + tc.testf(t, tpid, lsid, client) + }) + } +} + +func testCallbackGen(t *testing.T, lsa varlog.LogStreamAppender, dataBatch [][]byte, limit int32, called *atomic.Int32) varlog.BatchCallback { + return func(metas []varlogpb.LogEntryMeta, err error) { + require.NoError(t, err) + called.Add(1) + if called.Load() < limit { + err = lsa.AppendBatch(dataBatch, testCallbackGen(t, lsa, dataBatch, limit, called)) + require.NoError(t, err) + } + } +}