Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve blob feed cursor key and add unit test #1113

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 63 additions & 15 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,30 @@ func (cursor *BlobFeedCursor) LessThan(other *BlobFeedCursor) bool {
return false
}

// ToRequestedAtBlobKey encodes the cursor into a string that preserves ordering.
// ToCursorKey encodes the cursor into a string that preserves ordering.
// For any two cursors A and B:
// - A < B if and only if A.ToRequestedAtBlobKey() < B.ToRequestedAtBlobKey()
// - A == B if and only if A.ToRequestedAtBlobKey() == B.ToRequestedAtBlobKey()
func (cursor *BlobFeedCursor) ToRequestedAtBlobKey() string {
if cursor.BlobKey == nil {
return encodeRequestedAtBlobKey(cursor.RequestedAt, "")
// - A < B if and only if A.ToCursorKey() < B.ToCursorKey()
// - A == B if and only if A.ToCursorKey() == B.ToCursorKey()
func (cursor *BlobFeedCursor) ToCursorKey() string {
return encodeBlobFeedCursorKey(cursor.RequestedAt, cursor.BlobKey)
}

// FromCursorKey decodes the cursor key string back to the cursor.
func (cursor *BlobFeedCursor) FromCursorKey(encoded string) (*BlobFeedCursor, error) {
requestedAt, blobKey, err := decodeBlobFeedCursorKey(encoded)
if err != nil {
return nil, err
}
if len(blobKey) == 0 {
return &BlobFeedCursor{
RequestedAt: requestedAt,
}, nil

}
return encodeRequestedAtBlobKey(cursor.RequestedAt, cursor.BlobKey.Hex())
return &BlobFeedCursor{
RequestedAt: requestedAt,
BlobKey: blobKey,
}, nil
}

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
Expand Down Expand Up @@ -326,7 +341,7 @@ func (s *BlobMetadataStore) queryBucketMetadata(
return metadata, nil
}

// GetBlobMetadataByRequestedAt returns blobs (as BlobMetadata) that are in cusor position
// GetBlobMetadataByRequestedAt returns blobs (as BlobMetadata) that are in cursor position
// range (start, end]. Blobs returned are in cursor order.
//
// If limit > 0, returns at most that many blobs. If limit <= 0, returns all blobs in range.
Expand All @@ -342,8 +357,8 @@ func (s *BlobMetadataStore) GetBlobMetadataByRequestedAt(
}

startBucket, endBucket := s.getBucketIDRange(start.RequestedAt, end.RequestedAt)
startKey := start.ToRequestedAtBlobKey()
endKey := end.ToRequestedAtBlobKey()
startKey := start.ToCursorKey()
endKey := end.ToCursorKey()

result := make([]*v2.BlobMetadata, 0)
var lastProcessedCursor *BlobFeedCursor
Expand Down Expand Up @@ -1092,7 +1107,7 @@ func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error)
fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()}
fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK}
fields["RequestedAtBucket"] = &types.AttributeValueMemberS{Value: computeRequestedAtBucket(metadata.RequestedAt)}
fields["RequestedAtBlobKey"] = &types.AttributeValueMemberS{Value: encodeRequestedAtBlobKey(metadata.RequestedAt, blobKey.Hex())}
fields["RequestedAtBlobKey"] = &types.AttributeValueMemberS{Value: encodeBlobFeedCursorKey(metadata.RequestedAt, &blobKey)}
return fields, nil
}

Expand Down Expand Up @@ -1369,17 +1384,50 @@ func computeRequestedAtBucket(requestedAt uint64) string {
return fmt.Sprintf("%d", id)
}

// encodeRequestedAtBlobKey encodes <requestedAt, blobKey> into string which
// encodeBlobFeedCursorKey encodes <requestedAt, blobKey> into string which
// preserves the order.
func encodeRequestedAtBlobKey(requestedAt uint64, blobKey string) string {
func encodeBlobFeedCursorKey(requestedAt uint64, blobKey *corev2.BlobKey) string {
result := make([]byte, 40) // 8 bytes for timestamp + 32 bytes for blobKey

// Write timestamp
binary.BigEndian.PutUint64(result[:8], requestedAt)

if blobKey != "" {
copy(result[8:], []byte(blobKey))
if blobKey != nil {
copy(result[8:], blobKey[:])
}
// Use hex encoding to preserve byte ordering
return hex.EncodeToString(result)
}

// decodeBlobFeedCursorKey decodes the cursor key back to <requestedAt, blobKey>.
func decodeBlobFeedCursorKey(encoded string) (uint64, *corev2.BlobKey, error) {
// Decode hex string
bytes, err := hex.DecodeString(encoded)
if err != nil {
return 0, nil, fmt.Errorf("invalid hex encoding: %w", err)
}

// Check length
if len(bytes) != 40 { // 8 bytes timestamp + 32 bytes blobKey
return 0, nil, fmt.Errorf("invalid length: expected 40 bytes, got %d", len(bytes))
}

// Get timestamp
requestedAt := binary.BigEndian.Uint64(bytes[:8])

// Check if the remaining bytes are all zeros
allZeros := true
for i := 8; i < len(bytes); i++ {
if bytes[i] != 0 {
allZeros = false
break
}
}

if allZeros {
return requestedAt, nil, nil
}
var bk corev2.BlobKey
copy(bk[:], bytes[8:])
return requestedAt, &bk, nil
}
146 changes: 146 additions & 0 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,152 @@ func checkBlobKeyEqual(t *testing.T, blobKey corev2.BlobKey, blobHeader *corev2.
assert.Equal(t, blobKey, bk)
}

func TestBlobFeedCursor_Equal(t *testing.T) {
bk1 := corev2.BlobKey([32]byte{1, 2, 3})
bk2 := corev2.BlobKey([32]byte{2, 3, 4})
tests := []struct {
cursor *blobstore.BlobFeedCursor
requestedAt uint64
blobKey *corev2.BlobKey
expected bool
}{
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
requestedAt: 1,
blobKey: &bk1,
expected: true,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil},
requestedAt: 1,
blobKey: nil,
expected: true,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
requestedAt: 2,
blobKey: &bk1,
expected: false,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
requestedAt: 1,
blobKey: nil,
expected: false,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil},
requestedAt: 1,
blobKey: &bk1,
expected: false,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
requestedAt: 1,
blobKey: &bk2,
expected: false,
},
}

for _, tt := range tests {
t.Run("Equal", func(t *testing.T) {
result := tt.cursor.Equal(tt.requestedAt, tt.blobKey)
assert.Equal(t, tt.expected, result)
})
}
}

func TestBlobFeedCursor_LessThan(t *testing.T) {
bk1 := corev2.BlobKey([32]byte{1, 2, 3})
bk2 := corev2.BlobKey([32]byte{2, 3, 4})
tests := []struct {
cursor *blobstore.BlobFeedCursor
otherCursor *blobstore.BlobFeedCursor
expected bool
}{
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 2, BlobKey: &bk1},
expected: true,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 2, BlobKey: &bk1},
otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
expected: false,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
expected: false,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil},
otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
expected: true,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil},
expected: false,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk2},
expected: true,
},
{
cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk2},
otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1},
expected: false,
},
}

for _, tt := range tests {
t.Run("LessThan", func(t *testing.T) {
result := tt.cursor.LessThan(tt.otherCursor)
assert.Equal(t, tt.expected, result)
})
}
}

func TestBlobFeedCursor_CursorKeyCodec(t *testing.T) {
bk := corev2.BlobKey([32]byte{1, 2, 3})
cursors := []*blobstore.BlobFeedCursor{
&blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil},
&blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk},
}
for _, cursor := range cursors {
encoded := cursor.ToCursorKey()
c, err := new(blobstore.BlobFeedCursor).FromCursorKey(encoded)
assert.Nil(t, err)
assert.Equal(t, uint64(1), c.RequestedAt)
assert.Equal(t, cursor.BlobKey, c.BlobKey)
}
}

func TestBlobFeedCursor_OrderPreserving(t *testing.T) {
bk1 := corev2.BlobKey([32]byte{1, 2, 3})
bk2 := corev2.BlobKey([32]byte{2, 3, 4})
cursors := []*blobstore.BlobFeedCursor{
{RequestedAt: 100, BlobKey: nil},
{RequestedAt: 100, BlobKey: &bk1},
{RequestedAt: 100, BlobKey: &bk2},
{RequestedAt: 101, BlobKey: nil},
{RequestedAt: 101, BlobKey: &bk1},
}

// Test that ordering is consistent between LessThan and ToCursorKey
for i := 0; i < len(cursors); i++ {
for j := 0; j < len(cursors); j++ {
if i != j {
cursorLessThan := cursors[i].LessThan(cursors[j])
encodedLessThan := cursors[i].ToCursorKey() < cursors[j].ToCursorKey()
assert.Equal(t, encodedLessThan, cursorLessThan)
}
}
}
}

func TestBlobMetadataStoreOperations(t *testing.T) {
ctx := context.Background()
blobKey1, blobHeader1 := newBlob(t)
Expand Down
Loading