diff --git a/docker-d2hub-push.sh b/docker-d2hub-push.sh new file mode 100644 index 000000000..7b6657ae3 --- /dev/null +++ b/docker-d2hub-push.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +set -e -o pipefail + +scriptdir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" > /dev/null && pwd )" + +IMAGE_REGISTRY="idock.daumkakao.io" +IMAGE_NAMESPACE=varlog +DOCKER_CONTEXT="${scriptdir}" +DOCKERFILE="${scriptdir}/build/Dockerfile" +VERSION=$(git describe --tags --abbrev=0)-$(git rev-parse --short HEAD) + + +if [ "${IMAGE_REGISTRY}" = "" ]; then + echo "no image registry" + exit 1 +fi + +function build_push() { + local target=$1 + local name=$2 + local tag=$3 + + echo "build: target=${target} tag=${tag}" + docker build --target "${target}" --file "${DOCKERFILE}" --tag "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}" "${DOCKER_CONTEXT}" + docker push "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}" +} + +function push() { + local target=$1 + local name=$2 + local tag=$3 + + echo "push: target=${target} tag=${tag}" + docker push "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}" +} + +for name in varlogctl varlogcli varlogmr varlogadm varlogsn; do + target="${name}" + tag="${VERSION}" + build_push ${target} ${name} ${tag} + build_push ${target} ${name} ${tag} +done diff --git a/internal/storagenode/replication_server.go b/internal/storagenode/replication_server.go index dfa9c776a..49b9f353e 100644 --- a/internal/storagenode/replication_server.go +++ b/internal/storagenode/replication_server.go @@ -93,15 +93,12 @@ type replicationServerTask struct { err error } -func newReplicationServerTask(req snpb.ReplicateRequest, err error) *replicationServerTask { - rst := replicationServerTaskPool.Get().(*replicationServerTask) - rst.req = req - rst.err = err - return rst +func newReplicationServerTask() *replicationServerTask { + return replicationServerTaskPool.Get().(*replicationServerTask) } func (rst *replicationServerTask) release() { - rst.req = snpb.ReplicateRequest{} + rst.req.ResetReuse() rst.err = nil replicationServerTaskPool.Put(rst) } @@ -113,11 +110,10 @@ func (rs *replicationServer) recv(ctx context.Context, stream snpb.Replicator_Re go func() { defer wg.Done() defer close(c) - req := &snpb.ReplicateRequest{} for { - req.Reset() - err := stream.RecvMsg(req) - rst := newReplicationServerTask(*req, err) + rst := newReplicationServerTask() + err := stream.RecvMsg(&rst.req) + rst.err = err select { case c <- rst: if err != nil { diff --git a/proto/snpb/replicator.go b/proto/snpb/replicator.go index 6d9800cf8..2c28ca8f6 100644 --- a/proto/snpb/replicator.go +++ b/proto/snpb/replicator.go @@ -6,6 +6,17 @@ import ( "github.com/kakao/varlog/pkg/types" ) +func (m *ReplicateRequest) ResetReuse() { + llsnSlice := m.LLSN[:0] + for i := range m.Data { + m.Data[i] = m.Data[i][:0] + } + dataSlice := m.Data[:0] + m.Reset() + m.LLSN = llsnSlice + m.Data = dataSlice +} + func InvalidSyncPosition() SyncPosition { return SyncPosition{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN} } diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go index 139a31eb6..e5105c631 100644 --- a/proto/snpb/replicator.pb.go +++ b/proto/snpb/replicator.pb.go @@ -10,6 +10,7 @@ import ( io "io" math "math" math_bits "math/bits" + "slices" strconv "strconv" _ "github.com/gogo/protobuf/gogoproto" @@ -24,9 +25,11 @@ import ( ) // Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +var ( + _ = proto.Marshal + _ = fmt.Errorf + _ = math.Inf +) // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. @@ -80,9 +83,11 @@ func (*ReplicateRequest) ProtoMessage() {} func (*ReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{0} } + func (m *ReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateRequest.Marshal(b, m, deterministic) @@ -95,12 +100,15 @@ func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *ReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateRequest.Merge(m, src) } + func (m *ReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateRequest.DiscardUnknown(m) } @@ -135,8 +143,7 @@ func (m *ReplicateRequest) GetData() [][]byte { return nil } -type ReplicateResponse struct { -} +type ReplicateResponse struct{} func (m *ReplicateResponse) Reset() { *m = ReplicateResponse{} } func (m *ReplicateResponse) String() string { return proto.CompactTextString(m) } @@ -144,9 +151,11 @@ func (*ReplicateResponse) ProtoMessage() {} func (*ReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{1} } + func (m *ReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateResponse.Marshal(b, m, deterministic) @@ -159,12 +168,15 @@ func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, e return b[:n], nil } } + func (m *ReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateResponse.Merge(m, src) } + func (m *ReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateResponse.DiscardUnknown(m) } @@ -182,9 +194,11 @@ func (*SyncPosition) ProtoMessage() {} func (*SyncPosition) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{2} } + func (m *SyncPosition) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPosition.Marshal(b, m, deterministic) @@ -197,12 +211,15 @@ func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPosition) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPosition.Merge(m, src) } + func (m *SyncPosition) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPosition) XXX_DiscardUnknown() { xxx_messageInfo_SyncPosition.DiscardUnknown(m) } @@ -239,9 +256,11 @@ func (*SyncRange) ProtoMessage() {} func (*SyncRange) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{3} } + func (m *SyncRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncRange.Marshal(b, m, deterministic) @@ -254,12 +273,15 @@ func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncRange) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncRange.Merge(m, src) } + func (m *SyncRange) XXX_Size() int { return m.ProtoSize() } + func (m *SyncRange) XXX_DiscardUnknown() { xxx_messageInfo_SyncRange.DiscardUnknown(m) } @@ -305,9 +327,11 @@ func (*SyncInitRequest) ProtoMessage() {} func (*SyncInitRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{4} } + func (m *SyncInitRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitRequest.Marshal(b, m, deterministic) @@ -320,12 +344,15 @@ func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, err return b[:n], nil } } + func (m *SyncInitRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitRequest.Merge(m, src) } + func (m *SyncInitRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitRequest.DiscardUnknown(m) } @@ -381,9 +408,11 @@ func (*SyncInitResponse) ProtoMessage() {} func (*SyncInitResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{5} } + func (m *SyncInitResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitResponse.Marshal(b, m, deterministic) @@ -396,12 +425,15 @@ func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *SyncInitResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitResponse.Merge(m, src) } + func (m *SyncInitResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitResponse.DiscardUnknown(m) } @@ -428,9 +460,11 @@ func (*SyncStatus) ProtoMessage() {} func (*SyncStatus) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{6} } + func (m *SyncStatus) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncStatus.Marshal(b, m, deterministic) @@ -443,12 +477,15 @@ func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncStatus) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncStatus.Merge(m, src) } + func (m *SyncStatus) XXX_Size() int { return m.ProtoSize() } + func (m *SyncStatus) XXX_DiscardUnknown() { xxx_messageInfo_SyncStatus.DiscardUnknown(m) } @@ -494,9 +531,11 @@ func (*SyncPayload) ProtoMessage() {} func (*SyncPayload) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{7} } + func (m *SyncPayload) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPayload.Marshal(b, m, deterministic) @@ -509,12 +548,15 @@ func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPayload) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPayload.Merge(m, src) } + func (m *SyncPayload) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPayload) XXX_DiscardUnknown() { xxx_messageInfo_SyncPayload.DiscardUnknown(m) } @@ -548,9 +590,11 @@ func (*SyncReplicateRequest) ProtoMessage() {} func (*SyncReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{8} } + func (m *SyncReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateRequest.Marshal(b, m, deterministic) @@ -563,12 +607,15 @@ func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte return b[:n], nil } } + func (m *SyncReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateRequest.Merge(m, src) } + func (m *SyncReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateRequest.DiscardUnknown(m) } @@ -613,9 +660,11 @@ func (*SyncReplicateResponse) ProtoMessage() {} func (*SyncReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{9} } + func (m *SyncReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateResponse.Marshal(b, m, deterministic) @@ -628,12 +677,15 @@ func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byt return b[:n], nil } } + func (m *SyncReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateResponse.Merge(m, src) } + func (m *SyncReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateResponse.DiscardUnknown(m) } @@ -738,6 +790,7 @@ func (x SyncState) String() string { } return strconv.Itoa(int(x)) } + func (this *ReplicateRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -781,6 +834,7 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { } return true } + func (this *SyncPosition) Equal(that interface{}) bool { if that == nil { return this == nil @@ -810,8 +864,10 @@ func (this *SyncPosition) Equal(that interface{}) bool { } // Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn +var ( + _ context.Context + _ grpc.ClientConn +) // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. @@ -1022,18 +1078,20 @@ type ReplicatorServer interface { } // UnimplementedReplicatorServer can be embedded to have forward compatible implementations. -type UnimplementedReplicatorServer struct { -} +type UnimplementedReplicatorServer struct{} func (*UnimplementedReplicatorServer) Replicate(srv Replicator_ReplicateServer) error { return status.Errorf(codes.Unimplemented, "method Replicate not implemented") } + func (*UnimplementedReplicatorServer) SyncInit(ctx context.Context, req *SyncInitRequest) (*SyncInitResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncInit not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicate(ctx context.Context, req *SyncReplicateRequest) (*SyncReplicateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncReplicate not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicateStream(srv Replicator_SyncReplicateStreamServer) error { return status.Errorf(codes.Unimplemented, "method SyncReplicateStream not implemented") } @@ -1612,6 +1670,7 @@ func encodeVarintReplicator(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } + func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateRequest { this := &ReplicateRequest{} this.TopicID = github_com_kakao_varlog_pkg_types.TopicID(r.Int31()) @@ -1659,6 +1718,7 @@ func randUTF8RuneReplicator(r randyReplicator) rune { } return rune(ru + 61) } + func randStringReplicator(r randyReplicator) string { v4 := r.Intn(100) tmps := make([]rune, v4) @@ -1667,6 +1727,7 @@ func randStringReplicator(r randyReplicator) string { } return string(tmps) } + func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []byte) { l := r.Intn(5) for i := 0; i < l; i++ { @@ -1679,6 +1740,7 @@ func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []b } return dAtA } + func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire int) []byte { key := uint32(fieldNumber)<<3 | uint32(wire) switch wire { @@ -1705,6 +1767,7 @@ func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire i } return dAtA } + func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { for v >= 1<<7 { dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80)) @@ -1713,6 +1776,7 @@ func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { dAtA = append(dAtA, uint8(v)) return dAtA } + func (m *ReplicateRequest) ProtoSize() (n int) { if m == nil { return 0 @@ -1881,9 +1945,11 @@ func (m *SyncReplicateResponse) ProtoSize() (n int) { func sovReplicator(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } + func sozReplicator(x uint64) (n int) { return sovReplicator(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } + func (this *SyncPayload) GetValue() interface{} { if this.CommitContext != nil { return this.CommitContext @@ -1905,6 +1971,7 @@ func (this *SyncPayload) SetValue(value interface{}) bool { } return true } + func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2025,7 +2092,8 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } elementCount = count if elementCount != 0 && len(m.LLSN) == 0 { - m.LLSN = make([]github_com_kakao_varlog_pkg_types.LLSN, 0, elementCount) + m.LLSN = slices.Grow(m.LLSN, elementCount) + m.LLSN = m.LLSN[:0] } for iNdEx < postIndex { var v github_com_kakao_varlog_pkg_types.LLSN @@ -2077,8 +2145,15 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) - copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) + if len(m.Data) < cap(m.Data) { + m.Data = m.Data[:len(m.Data)+1] + } else { + m.Data = append(m.Data, nil) + } + lastIndex := len(m.Data) - 1 + m.Data[lastIndex] = slices.Grow(m.Data[lastIndex], postIndex-iNdEx) + m.Data[lastIndex] = m.Data[lastIndex][:postIndex-iNdEx] + copy(m.Data[lastIndex], dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -2101,6 +2176,7 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2151,6 +2227,7 @@ func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPosition) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2239,6 +2316,7 @@ func (m *SyncPosition) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncRange) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2327,6 +2405,7 @@ func (m *SyncRange) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2514,6 +2593,7 @@ func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2597,6 +2677,7 @@ func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncStatus) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2765,6 +2846,7 @@ func (m *SyncStatus) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPayload) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2887,6 +2969,7 @@ func (m *SyncPayload) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3055,6 +3138,7 @@ func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3141,6 +3225,7 @@ func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func skipReplicator(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/proto/snpb/replicator_test.go b/proto/snpb/replicator_test.go index 437c06d5d..888a086e3 100644 --- a/proto/snpb/replicator_test.go +++ b/proto/snpb/replicator_test.go @@ -2,10 +2,74 @@ package snpb import ( "testing" + "unsafe" "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" ) +func getPointers[S ~[]E, E any](elems S) []uintptr { + ptrs := make([]uintptr, len(elems)) + for i := range elems { + p := uintptr(unsafe.Pointer(&elems[i])) + ptrs[i] = p + } + return ptrs +} + +func TestReplicateRequest(t *testing.T) { + reqLen1 := ReplicateRequest{ + LLSN: []types.LLSN{1}, + Data: [][]byte{[]byte("one")}, + } + data1, err := reqLen1.Marshal() + require.NoError(t, err) + _ = data1 + + reqLen2 := ReplicateRequest{ + LLSN: []types.LLSN{1, 2}, + Data: [][]byte{[]byte("one"), []byte("two")}, + } + data2, err := reqLen2.Marshal() + require.NoError(t, err) + _ = data2 + + t.Run("EqualLength", func(t *testing.T) { + var req ReplicateRequest + err := req.Unmarshal(data1) + require.NoError(t, err) + want1 := getPointers(req.LLSN) + want2 := getPointers(req.Data) + + req.ResetReuse() + err = req.Unmarshal(data1) + require.NoError(t, err) + got1 := getPointers(req.LLSN) + got2 := getPointers(req.Data) + + require.Equal(t, want1, got1) + require.Equal(t, want2, got2) + }) + + t.Run("GrowLength", func(t *testing.T) { + var req ReplicateRequest + err := req.Unmarshal(data1) + require.NoError(t, err) + want1 := getPointers(req.LLSN) + want2 := getPointers(req.Data) + + req.ResetReuse() + err = req.Unmarshal(data2) + require.NoError(t, err) + got1 := getPointers(req.LLSN) + got2 := getPointers(req.Data) + + require.NotEqual(t, want1, got1) + require.NotEqual(t, want2, got2) + }) +} + func TestSyncPositionInvalid(t *testing.T) { tcs := []struct { input SyncPosition diff --git a/test.log b/test.log new file mode 100644 index 000000000..dd5595e3b --- /dev/null +++ b/test.log @@ -0,0 +1,116 @@ +go test -race -failfast -count=1 -timeout=20m -p=1 ./... +? github.com/kakao/varlog/cmd/benchmark [no test files] +? github.com/kakao/varlog/cmd/mrtool [no test files] +? github.com/kakao/varlog/cmd/varlogadm [no test files] +? github.com/kakao/varlog/cmd/varlogcli [no test files] +? github.com/kakao/varlog/cmd/varlogctl [no test files] +? github.com/kakao/varlog/cmd/varlogmr [no test files] +? github.com/kakao/varlog/cmd/varlogsn [no test files] +ok github.com/kakao/varlog/internal/admin 3.658s +? github.com/kakao/varlog/internal/admin/admerrors [no test files] +? github.com/kakao/varlog/internal/admin/mrmanager [no test files] +? github.com/kakao/varlog/internal/admin/sfgkey [no test files] +ok github.com/kakao/varlog/internal/admin/snmanager 1.712s +ok github.com/kakao/varlog/internal/admin/snwatcher 1.617s +ok github.com/kakao/varlog/internal/admin/stats 1.495s +? github.com/kakao/varlog/internal/batchlet [no test files] +? github.com/kakao/varlog/internal/benchmark [no test files] +? github.com/kakao/varlog/internal/benchmark/database [no test files] +? github.com/kakao/varlog/internal/benchmark/initdb [no test files] +? github.com/kakao/varlog/internal/benchmark/model/execution [no test files] +? github.com/kakao/varlog/internal/benchmark/model/executiontrigger [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/macrobenchmark [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/metric [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/result [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/target [no test files] +? github.com/kakao/varlog/internal/benchmark/model/macro/workload [no test files] +? github.com/kakao/varlog/internal/benchmark/server [no test files] +? github.com/kakao/varlog/internal/buildinfo [no test files] +ok github.com/kakao/varlog/internal/flags 1.483s +ok github.com/kakao/varlog/internal/metarepos 247.365s +? github.com/kakao/varlog/internal/reportcommitter [no test files] +ok github.com/kakao/varlog/internal/stopchannel 1.398s +ok github.com/kakao/varlog/internal/storage 8.096s +ok github.com/kakao/varlog/internal/storagenode 20.965s +ok github.com/kakao/varlog/internal/storagenode/client 2.759s +? github.com/kakao/varlog/internal/storagenode/errors [no test files] +ok github.com/kakao/varlog/internal/storagenode/executorsmap 1.689s +ok github.com/kakao/varlog/internal/storagenode/logstream 25.671s +? github.com/kakao/varlog/internal/storagenode/pprof [no test files] +ok github.com/kakao/varlog/internal/storagenode/telemetry 1.382s +ok github.com/kakao/varlog/internal/storagenode/volume 1.276s +? github.com/kakao/varlog/internal/varlogcli [no test files] +ok github.com/kakao/varlog/internal/varlogctl 1.674s +? github.com/kakao/varlog/internal/varlogctl/logstream [no test files] +? github.com/kakao/varlog/internal/varlogctl/metarepos [no test files] +? github.com/kakao/varlog/internal/varlogctl/storagenode [no test files] +? github.com/kakao/varlog/internal/varlogctl/topic [no test files] +ok github.com/kakao/varlog/pkg/mrc 1.468s +ok github.com/kakao/varlog/pkg/mrc/mrconnector 10.114s +ok github.com/kakao/varlog/pkg/rpc 1.397s +ok github.com/kakao/varlog/pkg/rpc/interceptors 1.284s +? github.com/kakao/varlog/pkg/rpc/interceptors/logging [no test files] +? github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc [no test files] +ok github.com/kakao/varlog/pkg/types 1.275s +ok github.com/kakao/varlog/pkg/util/container/set 1.249s +ok github.com/kakao/varlog/pkg/util/fputil 1.269s +ok github.com/kakao/varlog/pkg/util/jobqueue 1.392s +ok github.com/kakao/varlog/pkg/util/log 1.263s [no tests to run] +? github.com/kakao/varlog/pkg/util/mathutil [no test files] +ok github.com/kakao/varlog/pkg/util/netutil 1.309s +ok github.com/kakao/varlog/pkg/util/runner 1.794s +ok github.com/kakao/varlog/pkg/util/runner/first 1.250s +ok github.com/kakao/varlog/pkg/util/runner/stopwaiter 1.253s +ok github.com/kakao/varlog/pkg/util/stringsutil 1.259s +ok github.com/kakao/varlog/pkg/util/syncutil 1.256s +ok github.com/kakao/varlog/pkg/util/syncutil/atomicutil 1.254s [no tests to run] +? github.com/kakao/varlog/pkg/util/telemetry [no test files] +? github.com/kakao/varlog/pkg/util/testutil [no test files] +? github.com/kakao/varlog/pkg/util/testutil/conveyutil [no test files] +ok github.com/kakao/varlog/pkg/util/testutil/ports 1.265s +ok github.com/kakao/varlog/pkg/util/timeutil 1.248s [no tests to run] +ok github.com/kakao/varlog/pkg/util/units 1.270s +ok github.com/kakao/varlog/pkg/varlog 4.831s +? github.com/kakao/varlog/pkg/varlog/x/mlsa [no test files] +ok github.com/kakao/varlog/pkg/varlogtest 2.065s +ok github.com/kakao/varlog/pkg/verrors 1.295s +? github.com/kakao/varlog/pkg/vflag [no test files] +? github.com/kakao/varlog/proto/admpb [no test files] +? github.com/kakao/varlog/proto/errpb [no test files] +ok github.com/kakao/varlog/proto/mrpb 3.290s +? github.com/kakao/varlog/proto/mrpb/mock [no test files] +? github.com/kakao/varlog/proto/rpcbenchpb [no test files] +ok github.com/kakao/varlog/proto/snpb 1.425s +? github.com/kakao/varlog/proto/snpb/mock [no test files] +ok github.com/kakao/varlog/proto/varlogpb 1.278s +ok github.com/kakao/varlog/tests 1.600s +? github.com/kakao/varlog/tests/ee [no test files] +? github.com/kakao/varlog/tests/ee/cluster [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/client [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/nodelabel [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/podlabel [no test files] +? github.com/kakao/varlog/tests/ee/cluster/k8s/vault [no test files] +? github.com/kakao/varlog/tests/ee/cluster/local [no test files] +ok github.com/kakao/varlog/tests/ee/cluster/local/daemon 1.292s +? github.com/kakao/varlog/tests/ee/cluster/local/metarepos [no test files] +? github.com/kakao/varlog/tests/ee/cluster/local/storagenode [no test files] +? github.com/kakao/varlog/tests/ee/controller [no test files] +? github.com/kakao/varlog/tests/ee/grpc [no test files] +ok github.com/kakao/varlog/tests/it 35.095s +ok github.com/kakao/varlog/tests/it/cluster 341.877s +ok github.com/kakao/varlog/tests/it/failover 56.871s +ok github.com/kakao/varlog/tests/it/management 249.755s +ok github.com/kakao/varlog/tests/it/mrconnector 5.485s +? github.com/kakao/varlog/vtesting [no test files] +pytest +============================= test session starts ============================== +platform darwin -- Python 3.12.0, pytest-7.4.3, pluggy-1.3.0 +rootdir: /Users/ijsong/workspace/varlog +collected 6 items + +bin/tests/test_start_varlogmr.py . [ 16%] +bin/tests/test_start_varlogsn.py ... [ 66%] +bin/tests/test_vmr.py .. [100%] + +============================== 6 passed in 1.84s ===============================