diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index c782c5666..6438fc64d 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -124,15 +124,30 @@ func (cursor *BlobFeedCursor) LessThan(other *BlobFeedCursor) bool { return false } -// ToRequestedAtBlobKey encodes the cursor into a string that preserves ordering. +// ToCursorKey encodes the cursor into a string that preserves ordering. // For any two cursors A and B: -// - A < B if and only if A.ToRequestedAtBlobKey() < B.ToRequestedAtBlobKey() -// - A == B if and only if A.ToRequestedAtBlobKey() == B.ToRequestedAtBlobKey() -func (cursor *BlobFeedCursor) ToRequestedAtBlobKey() string { - if cursor.BlobKey == nil { - return encodeRequestedAtBlobKey(cursor.RequestedAt, "") +// - A < B if and only if A.ToCursorKey() < B.ToCursorKey() +// - A == B if and only if A.ToCursorKey() == B.ToCursorKey() +func (cursor *BlobFeedCursor) ToCursorKey() string { + return encodeBlobFeedCursorKey(cursor.RequestedAt, cursor.BlobKey) +} + +// FromCursorKey decodes the cursor key string back to the cursor. +func (cursor *BlobFeedCursor) FromCursorKey(encoded string) (*BlobFeedCursor, error) { + requestedAt, blobKey, err := decodeBlobFeedCursorKey(encoded) + if err != nil { + return nil, err + } + if len(blobKey) == 0 { + return &BlobFeedCursor{ + RequestedAt: requestedAt, + }, nil + } - return encodeRequestedAtBlobKey(cursor.RequestedAt, cursor.BlobKey.Hex()) + return &BlobFeedCursor{ + RequestedAt: requestedAt, + BlobKey: blobKey, + }, nil } // BlobMetadataStore is a blob metadata storage backed by DynamoDB @@ -326,7 +341,7 @@ func (s *BlobMetadataStore) queryBucketMetadata( return metadata, nil } -// GetBlobMetadataByRequestedAt returns blobs (as BlobMetadata) that are in cusor position +// GetBlobMetadataByRequestedAt returns blobs (as BlobMetadata) that are in cursor position // range (start, end]. Blobs returned are in cursor order. // // If limit > 0, returns at most that many blobs. If limit <= 0, returns all blobs in range. @@ -342,8 +357,8 @@ func (s *BlobMetadataStore) GetBlobMetadataByRequestedAt( } startBucket, endBucket := s.getBucketIDRange(start.RequestedAt, end.RequestedAt) - startKey := start.ToRequestedAtBlobKey() - endKey := end.ToRequestedAtBlobKey() + startKey := start.ToCursorKey() + endKey := end.ToCursorKey() result := make([]*v2.BlobMetadata, 0) var lastProcessedCursor *BlobFeedCursor @@ -1092,7 +1107,7 @@ func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()} fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK} fields["RequestedAtBucket"] = &types.AttributeValueMemberS{Value: computeRequestedAtBucket(metadata.RequestedAt)} - fields["RequestedAtBlobKey"] = &types.AttributeValueMemberS{Value: encodeRequestedAtBlobKey(metadata.RequestedAt, blobKey.Hex())} + fields["RequestedAtBlobKey"] = &types.AttributeValueMemberS{Value: encodeBlobFeedCursorKey(metadata.RequestedAt, &blobKey)} return fields, nil } @@ -1369,17 +1384,50 @@ func computeRequestedAtBucket(requestedAt uint64) string { return fmt.Sprintf("%d", id) } -// encodeRequestedAtBlobKey encodes into string which +// encodeBlobFeedCursorKey encodes into string which // preserves the order. -func encodeRequestedAtBlobKey(requestedAt uint64, blobKey string) string { +func encodeBlobFeedCursorKey(requestedAt uint64, blobKey *corev2.BlobKey) string { result := make([]byte, 40) // 8 bytes for timestamp + 32 bytes for blobKey // Write timestamp binary.BigEndian.PutUint64(result[:8], requestedAt) - if blobKey != "" { - copy(result[8:], []byte(blobKey)) + if blobKey != nil { + copy(result[8:], blobKey[:]) } // Use hex encoding to preserve byte ordering return hex.EncodeToString(result) } + +// decodeBlobFeedCursorKey decodes the cursor key back to . +func decodeBlobFeedCursorKey(encoded string) (uint64, *corev2.BlobKey, error) { + // Decode hex string + bytes, err := hex.DecodeString(encoded) + if err != nil { + return 0, nil, fmt.Errorf("invalid hex encoding: %w", err) + } + + // Check length + if len(bytes) != 40 { // 8 bytes timestamp + 32 bytes blobKey + return 0, nil, fmt.Errorf("invalid length: expected 40 bytes, got %d", len(bytes)) + } + + // Get timestamp + requestedAt := binary.BigEndian.Uint64(bytes[:8]) + + // Check if the remaining bytes are all zeros + allZeros := true + for i := 8; i < len(bytes); i++ { + if bytes[i] != 0 { + allZeros = false + break + } + } + + if allZeros { + return requestedAt, nil, nil + } + var bk corev2.BlobKey + copy(bk[:], bytes[8:]) + return requestedAt, &bk, nil +} diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index 47df8c371..b1c841812 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -31,6 +31,152 @@ func checkBlobKeyEqual(t *testing.T, blobKey corev2.BlobKey, blobHeader *corev2. assert.Equal(t, blobKey, bk) } +func TestBlobFeedCursor_Equal(t *testing.T) { + bk1 := corev2.BlobKey([32]byte{1, 2, 3}) + bk2 := corev2.BlobKey([32]byte{2, 3, 4}) + tests := []struct { + cursor *blobstore.BlobFeedCursor + requestedAt uint64 + blobKey *corev2.BlobKey + expected bool + }{ + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + requestedAt: 1, + blobKey: &bk1, + expected: true, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil}, + requestedAt: 1, + blobKey: nil, + expected: true, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + requestedAt: 2, + blobKey: &bk1, + expected: false, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + requestedAt: 1, + blobKey: nil, + expected: false, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil}, + requestedAt: 1, + blobKey: &bk1, + expected: false, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + requestedAt: 1, + blobKey: &bk2, + expected: false, + }, + } + + for _, tt := range tests { + t.Run("Equal", func(t *testing.T) { + result := tt.cursor.Equal(tt.requestedAt, tt.blobKey) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestBlobFeedCursor_LessThan(t *testing.T) { + bk1 := corev2.BlobKey([32]byte{1, 2, 3}) + bk2 := corev2.BlobKey([32]byte{2, 3, 4}) + tests := []struct { + cursor *blobstore.BlobFeedCursor + otherCursor *blobstore.BlobFeedCursor + expected bool + }{ + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 2, BlobKey: &bk1}, + expected: true, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 2, BlobKey: &bk1}, + otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + expected: false, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + expected: false, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil}, + otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + expected: true, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil}, + expected: false, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk2}, + expected: true, + }, + { + cursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk2}, + otherCursor: &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk1}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run("LessThan", func(t *testing.T) { + result := tt.cursor.LessThan(tt.otherCursor) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestBlobFeedCursor_CursorKeyCodec(t *testing.T) { + bk := corev2.BlobKey([32]byte{1, 2, 3}) + cursors := []*blobstore.BlobFeedCursor{ + &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: nil}, + &blobstore.BlobFeedCursor{RequestedAt: 1, BlobKey: &bk}, + } + for _, cursor := range cursors { + encoded := cursor.ToCursorKey() + c, err := new(blobstore.BlobFeedCursor).FromCursorKey(encoded) + assert.Nil(t, err) + assert.Equal(t, uint64(1), c.RequestedAt) + assert.Equal(t, cursor.BlobKey, c.BlobKey) + } +} + +func TestBlobFeedCursor_OrderPreserving(t *testing.T) { + bk1 := corev2.BlobKey([32]byte{1, 2, 3}) + bk2 := corev2.BlobKey([32]byte{2, 3, 4}) + cursors := []*blobstore.BlobFeedCursor{ + {RequestedAt: 100, BlobKey: nil}, + {RequestedAt: 100, BlobKey: &bk1}, + {RequestedAt: 100, BlobKey: &bk2}, + {RequestedAt: 101, BlobKey: nil}, + {RequestedAt: 101, BlobKey: &bk1}, + } + + // Test that ordering is consistent between LessThan and ToCursorKey + for i := 0; i < len(cursors); i++ { + for j := 0; j < len(cursors); j++ { + if i != j { + cursorLessThan := cursors[i].LessThan(cursors[j]) + encodedLessThan := cursors[i].ToCursorKey() < cursors[j].ToCursorKey() + assert.Equal(t, encodedLessThan, cursorLessThan) + } + } + } +} + func TestBlobMetadataStoreOperations(t *testing.T) { ctx := context.Background() blobKey1, blobHeader1 := newBlob(t)