From 0e0a36e446af16bdfaeb7dff8367f768e7c7dcf1 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Wed, 5 Jun 2024 11:26:44 +0900 Subject: [PATCH] perf(sn): reuse buffer for ReplicateRequest unmarshaling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve unmarshaling performance by reusing buffers for ReplicateRequest in the backup replica. The protobuf message `github.com/kakao/varlog/proto/snpb.(ReplicateRequest)` has two slice fields—LLSN (`[]uint64`) and Data (`[][]byte`). The backup replica receives replicated log entries from the primary replica via the gRPC service `github.com/kakao/varlog/proto/snpb.(ReplicatorServer).Replicate`, which sends `ReplicateRequest` messages. Upon receiving a `ReplicateRequest`, the backup replica unmarshals the message, which involves growing slices for fields such as LLSN and Data. This growth causes copy overhead whenever the slice capacities need to expand. To address this, we introduce a new method, `ResetReuse`, for reusing slices instead of resetting them completely. The `ResetReuse` method shrinks the slice lengths while preserving their capacities, thus avoiding the overhead of reallocating memory. Example implementation: ```go type Message struct { Buffer []byte // Other fields } func (m *Message) Reset() { *m = Message{} } func (m *Message) ResetReuse() { s := m.Buffer[:0] *m = Message{} m.Buffer = s } ``` Risks: This approach has potential downsides. Since the heap space consumed by the slices is not reclaimed, the storage node's memory consumption may increase. Currently, there is no mechanism to shrink the heap usage. Additionally, this PR changes the generated code. The protobuf compiler can revert it, which is contrary to our intention. To catch this mistake, this PR includes a unit test (github.com/kakao/varlog/proto/snpb.TestReplicateRequest) to verify that the buffer backing the slices is reused. Resolves: #795 See also: #806 --- docker-d2hub-push.sh | 43 ++++++++ internal/storagenode/replication_server.go | 16 ++- proto/snpb/replicator.go | 11 ++ proto/snpb/replicator.pb.go | 109 ++++++++++++++++--- proto/snpb/replicator_test.go | 64 ++++++++++++ test.log | 116 +++++++++++++++++++++ 6 files changed, 337 insertions(+), 22 deletions(-) create mode 100644 docker-d2hub-push.sh create mode 100644 test.log diff --git a/docker-d2hub-push.sh b/docker-d2hub-push.sh new file mode 100644 index 000000000..7b6657ae3 --- /dev/null +++ b/docker-d2hub-push.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +set -e -o pipefail + +scriptdir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" > /dev/null && pwd )" + +IMAGE_REGISTRY="idock.daumkakao.io" +IMAGE_NAMESPACE=varlog +DOCKER_CONTEXT="${scriptdir}" +DOCKERFILE="${scriptdir}/build/Dockerfile" +VERSION=$(git describe --tags --abbrev=0)-$(git rev-parse --short HEAD) + + +if [ "${IMAGE_REGISTRY}" = "" ]; then + echo "no image registry" + exit 1 +fi + +function build_push() { + local target=$1 + local name=$2 + local tag=$3 + + echo "build: target=${target} tag=${tag}" + docker build --target "${target}" --file "${DOCKERFILE}" --tag "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}" "${DOCKER_CONTEXT}" + docker push "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}" +} + +function push() { + local target=$1 + local name=$2 + local tag=$3 + + echo "push: target=${target} tag=${tag}" + docker push "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}" +} + +for name in varlogctl varlogcli varlogmr varlogadm varlogsn; do + target="${name}" + tag="${VERSION}" + build_push ${target} ${name} ${tag} + build_push ${target} ${name} ${tag} +done diff --git a/internal/storagenode/replication_server.go b/internal/storagenode/replication_server.go index dfa9c776a..49b9f353e 100644 --- a/internal/storagenode/replication_server.go +++ b/internal/storagenode/replication_server.go @@ -93,15 +93,12 @@ type replicationServerTask struct { err error } -func newReplicationServerTask(req snpb.ReplicateRequest, err error) *replicationServerTask { - rst := replicationServerTaskPool.Get().(*replicationServerTask) - rst.req = req - rst.err = err - return rst +func newReplicationServerTask() *replicationServerTask { + return replicationServerTaskPool.Get().(*replicationServerTask) } func (rst *replicationServerTask) release() { - rst.req = snpb.ReplicateRequest{} + rst.req.ResetReuse() rst.err = nil replicationServerTaskPool.Put(rst) } @@ -113,11 +110,10 @@ func (rs *replicationServer) recv(ctx context.Context, stream snpb.Replicator_Re go func() { defer wg.Done() defer close(c) - req := &snpb.ReplicateRequest{} for { - req.Reset() - err := stream.RecvMsg(req) - rst := newReplicationServerTask(*req, err) + rst := newReplicationServerTask() + err := stream.RecvMsg(&rst.req) + rst.err = err select { case c <- rst: if err != nil { diff --git a/proto/snpb/replicator.go b/proto/snpb/replicator.go index 6d9800cf8..2c28ca8f6 100644 --- a/proto/snpb/replicator.go +++ b/proto/snpb/replicator.go @@ -6,6 +6,17 @@ import ( "github.com/kakao/varlog/pkg/types" ) +func (m *ReplicateRequest) ResetReuse() { + llsnSlice := m.LLSN[:0] + for i := range m.Data { + m.Data[i] = m.Data[i][:0] + } + dataSlice := m.Data[:0] + m.Reset() + m.LLSN = llsnSlice + m.Data = dataSlice +} + func InvalidSyncPosition() SyncPosition { return SyncPosition{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN} } diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go index 139a31eb6..e5105c631 100644 --- a/proto/snpb/replicator.pb.go +++ b/proto/snpb/replicator.pb.go @@ -10,6 +10,7 @@ import ( io "io" math "math" math_bits "math/bits" + "slices" strconv "strconv" _ "github.com/gogo/protobuf/gogoproto" @@ -24,9 +25,11 @@ import ( ) // Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +var ( + _ = proto.Marshal + _ = fmt.Errorf + _ = math.Inf +) // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. @@ -80,9 +83,11 @@ func (*ReplicateRequest) ProtoMessage() {} func (*ReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{0} } + func (m *ReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateRequest.Marshal(b, m, deterministic) @@ -95,12 +100,15 @@ func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *ReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateRequest.Merge(m, src) } + func (m *ReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateRequest.DiscardUnknown(m) } @@ -135,8 +143,7 @@ func (m *ReplicateRequest) GetData() [][]byte { return nil } -type ReplicateResponse struct { -} +type ReplicateResponse struct{} func (m *ReplicateResponse) Reset() { *m = ReplicateResponse{} } func (m *ReplicateResponse) String() string { return proto.CompactTextString(m) } @@ -144,9 +151,11 @@ func (*ReplicateResponse) ProtoMessage() {} func (*ReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{1} } + func (m *ReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateResponse.Marshal(b, m, deterministic) @@ -159,12 +168,15 @@ func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, e return b[:n], nil } } + func (m *ReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateResponse.Merge(m, src) } + func (m *ReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateResponse.DiscardUnknown(m) } @@ -182,9 +194,11 @@ func (*SyncPosition) ProtoMessage() {} func (*SyncPosition) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{2} } + func (m *SyncPosition) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPosition.Marshal(b, m, deterministic) @@ -197,12 +211,15 @@ func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPosition) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPosition.Merge(m, src) } + func (m *SyncPosition) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPosition) XXX_DiscardUnknown() { xxx_messageInfo_SyncPosition.DiscardUnknown(m) } @@ -239,9 +256,11 @@ func (*SyncRange) ProtoMessage() {} func (*SyncRange) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{3} } + func (m *SyncRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncRange.Marshal(b, m, deterministic) @@ -254,12 +273,15 @@ func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncRange) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncRange.Merge(m, src) } + func (m *SyncRange) XXX_Size() int { return m.ProtoSize() } + func (m *SyncRange) XXX_DiscardUnknown() { xxx_messageInfo_SyncRange.DiscardUnknown(m) } @@ -305,9 +327,11 @@ func (*SyncInitRequest) ProtoMessage() {} func (*SyncInitRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{4} } + func (m *SyncInitRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitRequest.Marshal(b, m, deterministic) @@ -320,12 +344,15 @@ func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, err return b[:n], nil } } + func (m *SyncInitRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitRequest.Merge(m, src) } + func (m *SyncInitRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitRequest.DiscardUnknown(m) } @@ -381,9 +408,11 @@ func (*SyncInitResponse) ProtoMessage() {} func (*SyncInitResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{5} } + func (m *SyncInitResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitResponse.Marshal(b, m, deterministic) @@ -396,12 +425,15 @@ func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *SyncInitResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitResponse.Merge(m, src) } + func (m *SyncInitResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitResponse.DiscardUnknown(m) } @@ -428,9 +460,11 @@ func (*SyncStatus) ProtoMessage() {} func (*SyncStatus) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{6} } + func (m *SyncStatus) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncStatus.Marshal(b, m, deterministic) @@ -443,12 +477,15 @@ func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncStatus) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncStatus.Merge(m, src) } + func (m *SyncStatus) XXX_Size() int { return m.ProtoSize() } + func (m *SyncStatus) XXX_DiscardUnknown() { xxx_messageInfo_SyncStatus.DiscardUnknown(m) } @@ -494,9 +531,11 @@ func (*SyncPayload) ProtoMessage() {} func (*SyncPayload) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{7} } + func (m *SyncPayload) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPayload.Marshal(b, m, deterministic) @@ -509,12 +548,15 @@ func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPayload) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPayload.Merge(m, src) } + func (m *SyncPayload) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPayload) XXX_DiscardUnknown() { xxx_messageInfo_SyncPayload.DiscardUnknown(m) } @@ -548,9 +590,11 @@ func (*SyncReplicateRequest) ProtoMessage() {} func (*SyncReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{8} } + func (m *SyncReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateRequest.Marshal(b, m, deterministic) @@ -563,12 +607,15 @@ func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte return b[:n], nil } } + func (m *SyncReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateRequest.Merge(m, src) } + func (m *SyncReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateRequest.DiscardUnknown(m) } @@ -613,9 +660,11 @@ func (*SyncReplicateResponse) ProtoMessage() {} func (*SyncReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{9} } + func (m *SyncReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateResponse.Marshal(b, m, deterministic) @@ -628,12 +677,15 @@ func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byt return b[:n], nil } } + func (m *SyncReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateResponse.Merge(m, src) } + func (m *SyncReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateResponse.DiscardUnknown(m) } @@ -738,6 +790,7 @@ func (x SyncState) String() string { } return strconv.Itoa(int(x)) } + func (this *ReplicateRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -781,6 +834,7 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { } return true } + func (this *SyncPosition) Equal(that interface{}) bool { if that == nil { return this == nil @@ -810,8 +864,10 @@ func (this *SyncPosition) Equal(that interface{}) bool { } // Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn +var ( + _ context.Context + _ grpc.ClientConn +) // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. @@ -1022,18 +1078,20 @@ type ReplicatorServer interface { } // UnimplementedReplicatorServer can be embedded to have forward compatible implementations. -type UnimplementedReplicatorServer struct { -} +type UnimplementedReplicatorServer struct{} func (*UnimplementedReplicatorServer) Replicate(srv Replicator_ReplicateServer) error { return status.Errorf(codes.Unimplemented, "method Replicate not implemented") } + func (*UnimplementedReplicatorServer) SyncInit(ctx context.Context, req *SyncInitRequest) (*SyncInitResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncInit not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicate(ctx context.Context, req *SyncReplicateRequest) (*SyncReplicateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncReplicate not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicateStream(srv Replicator_SyncReplicateStreamServer) error { return status.Errorf(codes.Unimplemented, "method SyncReplicateStream not implemented") } @@ -1612,6 +1670,7 @@ func encodeVarintReplicator(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } + func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateRequest { this := &ReplicateRequest{} this.TopicID = github_com_kakao_varlog_pkg_types.TopicID(r.Int31()) @@ -1659,6 +1718,7 @@ func randUTF8RuneReplicator(r randyReplicator) rune { } return rune(ru + 61) } + func randStringReplicator(r randyReplicator) string { v4 := r.Intn(100) tmps := make([]rune, v4) @@ -1667,6 +1727,7 @@ func randStringReplicator(r randyReplicator) string { } return string(tmps) } + func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []byte) { l := r.Intn(5) for i := 0; i < l; i++ { @@ -1679,6 +1740,7 @@ func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []b } return dAtA } + func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire int) []byte { key := uint32(fieldNumber)<<3 | uint32(wire) switch wire { @@ -1705,6 +1767,7 @@ func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire i } return dAtA } + func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { for v >= 1<<7 { dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80)) @@ -1713,6 +1776,7 @@ func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { dAtA = append(dAtA, uint8(v)) return dAtA } + func (m *ReplicateRequest) ProtoSize() (n int) { if m == nil { return 0 @@ -1881,9 +1945,11 @@ func (m *SyncReplicateResponse) ProtoSize() (n int) { func sovReplicator(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } + func sozReplicator(x uint64) (n int) { return sovReplicator(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } + func (this *SyncPayload) GetValue() interface{} { if this.CommitContext != nil { return this.CommitContext @@ -1905,6 +1971,7 @@ func (this *SyncPayload) SetValue(value interface{}) bool { } return true } + func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2025,7 +2092,8 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } elementCount = count if elementCount != 0 && len(m.LLSN) == 0 { - m.LLSN = make([]github_com_kakao_varlog_pkg_types.LLSN, 0, elementCount) + m.LLSN = slices.Grow(m.LLSN, elementCount) + m.LLSN = m.LLSN[:0] } for iNdEx < postIndex { var v github_com_kakao_varlog_pkg_types.LLSN @@ -2077,8 +2145,15 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) - copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) + if len(m.Data) < cap(m.Data) { + m.Data = m.Data[:len(m.Data)+1] + } else { + m.Data = append(m.Data, nil) + } + lastIndex := len(m.Data) - 1 + m.Data[lastIndex] = slices.Grow(m.Data[lastIndex], postIndex-iNdEx) + m.Data[lastIndex] = m.Data[lastIndex][:postIndex-iNdEx] + copy(m.Data[lastIndex], dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -2101,6 +2176,7 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2151,6 +2227,7 @@ func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPosition) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2239,6 +2316,7 @@ func (m *SyncPosition) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncRange) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2327,6 +2405,7 @@ func (m *SyncRange) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2514,6 +2593,7 @@ func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2597,6 +2677,7 @@ func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncStatus) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2765,6 +2846,7 @@ func (m *SyncStatus) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPayload) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2887,6 +2969,7 @@ func (m *SyncPayload) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3055,6 +3138,7 @@ func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3141,6 +3225,7 @@ func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func skipReplicator(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/proto/snpb/replicator_test.go b/proto/snpb/replicator_test.go index 437c06d5d..888a086e3 100644 --- a/proto/snpb/replicator_test.go +++ b/proto/snpb/replicator_test.go @@ -2,10 +2,74 @@ package snpb import ( "testing" + "unsafe" "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" ) +func getPointers[S ~[]E, E any](elems S) []uintptr { + ptrs := make([]uintptr, len(elems)) + for i := range elems { + p := uintptr(unsafe.Pointer(&elems[i])) + ptrs[i] = p + } + return ptrs +} + +func TestReplicateRequest(t *testing.T) { + reqLen1 := ReplicateRequest{ + LLSN: []types.LLSN{1}, + Data: [][]byte{[]byte("one")}, + } + data1, err := reqLen1.Marshal() + require.NoError(t, err) + _ = data1 + + reqLen2 := ReplicateRequest{ + LLSN: []types.LLSN{1, 2}, + Data: [][]byte{[]byte("one"), []byte("two")}, + } + data2, err := reqLen2.Marshal() + require.NoError(t, err) + _ = data2 + + t.Run("EqualLength", func(t *testing.T) { + var req ReplicateRequest + err := req.Unmarshal(data1) + require.NoError(t, err) + want1 := getPointers(req.LLSN) + want2 := getPointers(req.Data) + + req.ResetReuse() + err = req.Unmarshal(data1) + require.NoError(t, err) + got1 := getPointers(req.LLSN) + got2 := getPointers(req.Data) + + require.Equal(t, want1, got1) + require.Equal(t, want2, got2) + }) + + t.Run("GrowLength", func(t *testing.T) { + var req ReplicateRequest + err := req.Unmarshal(data1) + require.NoError(t, err) + want1 := getPointers(req.LLSN) + want2 := getPointers(req.Data) + + req.ResetReuse() + err = req.Unmarshal(data2) + require.NoError(t, err) + got1 := getPointers(req.LLSN) + got2 := getPointers(req.Data) + + require.NotEqual(t, want1, got1) + require.NotEqual(t, want2, got2) + }) +} + func TestSyncPositionInvalid(t *testing.T) { tcs := []struct { input SyncPosition diff --git a/test.log b/test.log new file mode 100644 index 000000000..dd5595e3b --- /dev/null +++ b/test.log @@ -0,0 +1,116 @@ +go test -race -failfast -count=1 -timeout=20m -p=1 ./... +? github.com/kakao/varlog/cmd/benchmark [no test files] +? github.com/kakao/varlog/cmd/mrtool [no test files] +? github.com/kakao/varlog/cmd/varlogadm [no test files] +? github.com/kakao/varlog/cmd/varlogcli [no test files] +? github.com/kakao/varlog/cmd/varlogctl [no test files] +? github.com/kakao/varlog/cmd/varlogmr [no test files] +? github.com/kakao/varlog/cmd/varlogsn [no test files] +ok github.com/kakao/varlog/internal/admin 3.658s +? github.com/kakao/varlog/internal/admin/admerrors [no test files] +? github.com/kakao/varlog/internal/admin/mrmanager [no test files] +? github.com/kakao/varlog/internal/admin/sfgkey [no test files] +ok github.com/kakao/varlog/internal/admin/snmanager 1.712s +ok github.com/kakao/varlog/internal/admin/snwatcher 1.617s +ok github.com/kakao/varlog/internal/admin/stats 1.495s +? github.com/kakao/varlog/internal/batchlet [no test files] +? github.com/kakao/varlog/internal/benchmark [no test files] +? github.com/kakao/varlog/internal/benchmark/database [no test files] +? github.com/kakao/varlog/internal/benchmark/initdb [no test files] +? github.com/kakao/varlog/internal/benchmark/model/execution [no test files] +? github.com/kakao/varlog/internal/benchmark/model/executiontrigger [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/macrobenchmark [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/metric [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/result [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/target [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/workload [no test files] +? github.com/kakao/varlog/internal/benchmark/server [no test files] +? github.com/kakao/varlog/internal/buildinfo [no test files] +ok github.com/kakao/varlog/internal/flags 1.483s +ok github.com/kakao/varlog/internal/metarepos 247.365s +? github.com/kakao/varlog/internal/reportcommitter [no test files] +ok github.com/kakao/varlog/internal/stopchannel 1.398s +ok github.com/kakao/varlog/internal/storage 8.096s +ok github.com/kakao/varlog/internal/storagenode 20.965s +ok github.com/kakao/varlog/internal/storagenode/client 2.759s +? github.com/kakao/varlog/internal/storagenode/errors [no test files] +ok github.com/kakao/varlog/internal/storagenode/executorsmap 1.689s +ok github.com/kakao/varlog/internal/storagenode/logstream 25.671s +? github.com/kakao/varlog/internal/storagenode/pprof [no test files] +ok github.com/kakao/varlog/internal/storagenode/telemetry 1.382s +ok github.com/kakao/varlog/internal/storagenode/volume 1.276s +? github.com/kakao/varlog/internal/varlogcli [no test files] +ok github.com/kakao/varlog/internal/varlogctl 1.674s +? github.com/kakao/varlog/internal/varlogctl/logstream [no test files] +? github.com/kakao/varlog/internal/varlogctl/metarepos [no test files] +? github.com/kakao/varlog/internal/varlogctl/storagenode [no test files] +? github.com/kakao/varlog/internal/varlogctl/topic [no test files] +ok github.com/kakao/varlog/pkg/mrc 1.468s +ok github.com/kakao/varlog/pkg/mrc/mrconnector 10.114s +ok github.com/kakao/varlog/pkg/rpc 1.397s +ok github.com/kakao/varlog/pkg/rpc/interceptors 1.284s +? github.com/kakao/varlog/pkg/rpc/interceptors/logging [no test files] +? github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc [no test files] +ok github.com/kakao/varlog/pkg/types 1.275s +ok github.com/kakao/varlog/pkg/util/container/set 1.249s +ok github.com/kakao/varlog/pkg/util/fputil 1.269s +ok github.com/kakao/varlog/pkg/util/jobqueue 1.392s +ok github.com/kakao/varlog/pkg/util/log 1.263s [no tests to run] +? github.com/kakao/varlog/pkg/util/mathutil [no test files] +ok github.com/kakao/varlog/pkg/util/netutil 1.309s +ok github.com/kakao/varlog/pkg/util/runner 1.794s +ok github.com/kakao/varlog/pkg/util/runner/first 1.250s +ok github.com/kakao/varlog/pkg/util/runner/stopwaiter 1.253s +ok github.com/kakao/varlog/pkg/util/stringsutil 1.259s +ok github.com/kakao/varlog/pkg/util/syncutil 1.256s +ok github.com/kakao/varlog/pkg/util/syncutil/atomicutil 1.254s [no tests to run] +? github.com/kakao/varlog/pkg/util/telemetry [no test files] +? github.com/kakao/varlog/pkg/util/testutil [no test files] +? github.com/kakao/varlog/pkg/util/testutil/conveyutil [no test files] +ok github.com/kakao/varlog/pkg/util/testutil/ports 1.265s +ok github.com/kakao/varlog/pkg/util/timeutil 1.248s [no tests to run] +ok github.com/kakao/varlog/pkg/util/units 1.270s +ok github.com/kakao/varlog/pkg/varlog 4.831s +? github.com/kakao/varlog/pkg/varlog/x/mlsa [no test files] +ok github.com/kakao/varlog/pkg/varlogtest 2.065s +ok github.com/kakao/varlog/pkg/verrors 1.295s +? github.com/kakao/varlog/pkg/vflag [no test files] +? github.com/kakao/varlog/proto/admpb [no test files] +? github.com/kakao/varlog/proto/errpb [no test files] +ok github.com/kakao/varlog/proto/mrpb 3.290s +? github.com/kakao/varlog/proto/mrpb/mock [no test files] +? github.com/kakao/varlog/proto/rpcbenchpb [no test files] +ok github.com/kakao/varlog/proto/snpb 1.425s +? github.com/kakao/varlog/proto/snpb/mock [no test files] +ok github.com/kakao/varlog/proto/varlogpb 1.278s +ok github.com/kakao/varlog/tests 1.600s +? github.com/kakao/varlog/tests/ee [no test files] +? github.com/kakao/varlog/tests/ee/cluster [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/client [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/nodelabel [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/podlabel [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/vault [no test files] +? github.com/kakao/varlog/tests/ee/cluster/local [no test files] +ok github.com/kakao/varlog/tests/ee/cluster/local/daemon 1.292s +? github.com/kakao/varlog/tests/ee/cluster/local/metarepos [no test files] +? github.com/kakao/varlog/tests/ee/cluster/local/storagenode [no test files] +? github.com/kakao/varlog/tests/ee/controller [no test files] +? github.com/kakao/varlog/tests/ee/grpc [no test files] +ok github.com/kakao/varlog/tests/it 35.095s +ok github.com/kakao/varlog/tests/it/cluster 341.877s +ok github.com/kakao/varlog/tests/it/failover 56.871s +ok github.com/kakao/varlog/tests/it/management 249.755s +ok github.com/kakao/varlog/tests/it/mrconnector 5.485s +? github.com/kakao/varlog/vtesting [no test files] +pytest +============================= test session starts ============================== +platform darwin -- Python 3.12.0, pytest-7.4.3, pluggy-1.3.0 +rootdir: /Users/ijsong/workspace/varlog +collected 6 items + +bin/tests/test_start_varlogmr.py . [ 16%] +bin/tests/test_start_varlogsn.py ... [ 66%] +bin/tests/test_vmr.py .. [100%] + +============================== 6 passed in 1.84s ===============================