Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[finalizer] Update confirmation block number in finalizer #600

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion disperser/batcher/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,21 @@ func (f *finalizer) updateBlobs(ctx context.Context, metadatas []*disperser.Blob
continue
}

if confirmationBlockNumber != uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) {
// Confirmation block number has changed due to reorg. Update the confirmation block number in the metadata
err := f.blobStore.UpdateConfirmationBlockNumber(ctx, m, uint32(confirmationBlockNumber))
if err != nil {
f.logger.Error("error updating confirmation block number", "blobKey", blobKey.String(), "err", err)
f.metrics.IncrementNumBlobs("failed")
continue
}
}

// Leave as confirmed if the reorged confirmation block is after the latest finalized block (not yet finalized)
if uint64(confirmationBlockNumber) > lastFinalBlock {
continue
}

confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber = uint32(confirmationBlockNumber)
err = f.blobStore.MarkBlobFinalized(ctx, blobKey)
if err != nil {
f.logger.Error("error marking blob as finalized", "blobKey", blobKey.String(), "err", err)
Expand Down
24 changes: 24 additions & 0 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,30 @@ func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMet
return err
}

func (s *BlobMetadataStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error {
updated := *existingMetadata
if updated.ConfirmationInfo == nil {
return fmt.Errorf("failed to update confirmation block number because confirmation info is missing for blob key %s", existingMetadata.GetBlobKey().String())
}

updated.ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber
item, err := MarshalBlobMetadata(&updated)
if err != nil {
return err
}

_, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{
"BlobHash": &types.AttributeValueMemberS{
Value: existingMetadata.BlobHash,
},
"MetadataHash": &types.AttributeValueMemberS{
Value: existingMetadata.MetadataHash,
},
}, item)

return err
}

func (s *BlobMetadataStore) UpdateBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey, updated *disperser.BlobMetadata) error {
item, err := MarshalBlobMetadata(updated)
if err != nil {
Expand Down
15 changes: 12 additions & 3 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,19 @@ func (s *SharedBlobStore) MarkBlobDispersing(ctx context.Context, metadataKey di
}

func (s *SharedBlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) {
if existingMetadata == nil {
return nil, errors.New("metadata is nil")
}
newMetadata := *existingMetadata
newMetadata.BlobStatus = disperser.InsufficientSignatures
newMetadata.ConfirmationInfo = confirmationInfo
if confirmationInfo != nil {
newMetadata.ConfirmationInfo = confirmationInfo
}
return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata)
}

func (s *SharedBlobStore) MarkBlobFinalized(ctx context.Context, metadataKey disperser.BlobKey) error {
return s.blobMetadataStore.SetBlobStatus(ctx, metadataKey, disperser.Finalized)
func (s *SharedBlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.BlobKey) error {
return s.blobMetadataStore.SetBlobStatus(ctx, blobKey, disperser.Finalized)
}

func (s *SharedBlobStore) MarkBlobProcessing(ctx context.Context, metadataKey disperser.BlobKey) error {
Expand All @@ -185,6 +190,10 @@ func (s *SharedBlobStore) IncrementBlobRetryCount(ctx context.Context, existingM
return s.blobMetadataStore.IncrementNumRetries(ctx, existingMetadata)
}

func (s *SharedBlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error {
return s.blobMetadataStore.UpdateConfirmationBlockNumber(ctx, existingMetadata, confirmationBlockNumber)
}

func (s *SharedBlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) {
pool := workerpool.New(maxS3BlobFetchWorkers)
resultChan := make(chan blobResultOrError, len(metadata))
Expand Down
9 changes: 9 additions & 0 deletions disperser/common/blobstore/shared_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,17 @@ func TestSharedBlobStore(t *testing.T) {
assert.Nil(t, err)
assertMetadata(t, blobKey, blobSize, requestedAt, disperser.Confirmed, metadata1)

err = sharedStorage.UpdateConfirmationBlockNumber(ctx, metadata1, 151)
assert.Nil(t, err)
metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey)
assert.Nil(t, err)
assert.Equal(t, uint32(151), metadata1.ConfirmationInfo.ConfirmationBlockNumber)

err = sharedStorage.MarkBlobFinalized(ctx, blobKey)
assert.Nil(t, err)
metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey)
assert.Nil(t, err)
assert.Equal(t, disperser.Finalized, metadata1.BlobStatus)

metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey)
assert.Nil(t, err)
Expand Down
16 changes: 16 additions & 0 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -167,6 +168,21 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat
return nil
}

func (q *BlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
return disperser.ErrBlobNotFound
}

if q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo == nil {
return fmt.Errorf("cannot update confirmation block number for blob without confirmation info: %s", existingMetadata.GetBlobKey().String())
}

q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber
return nil
}

func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) {
q.mu.RLock()
defer q.mu.RUnlock()
Expand Down
5 changes: 5 additions & 0 deletions disperser/common/inmem/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ func TestBlobStore(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, disperser.Confirmed, updated.BlobStatus)

err = bs.UpdateConfirmationBlockNumber(ctx, updated, 151)
assert.Nil(t, err)

meta2, err = bs.GetBlobMetadata(ctx, blobKey2)
assert.Nil(t, err)
assert.Equal(t, meta2.BlobStatus, disperser.Confirmed)
assert.Equal(t, uint32(151), meta2.ConfirmationInfo.ConfirmationBlockNumber)

meta1, err = bs.GetBlobMetadata(ctx, blobKey1)
assert.Nil(t, err)
assert.Equal(t, meta1.BlobStatus, disperser.Processing)
Expand Down
2 changes: 2 additions & 0 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ type BlobStore interface {
MarkBlobFailed(ctx context.Context, blobKey BlobKey) error
// IncrementBlobRetryCount increments the retry count of a blob
IncrementBlobRetryCount(ctx context.Context, existingMetadata *BlobMetadata) error
// UpdateConfirmationBlockNumber updates the confirmation block number of a blob
UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *BlobMetadata, confirmationBlockNumber uint32) error
// GetBlobsByMetadata retrieves a list of blobs given a list of metadata
GetBlobsByMetadata(ctx context.Context, metadata []*BlobMetadata) (map[BlobKey]*core.Blob, error)
// GetBlobMetadataByStatus returns a list of blob metadata for blobs with the given status
Expand Down
Loading