Skip to content

Commit

Permalink
v2 Retrieval Client (#953)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Dec 7, 2024
1 parent 100eb3e commit dd42f16
Showing 1 changed file with 205 additions and 0 deletions.
205 changes: 205 additions & 0 deletions api/clients/retrieval_client_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package clients

import (
"context"
"errors"
"fmt"

grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/gammazero/workerpool"
)

// RetrievalClientV2 is an object that can retrieve blobs from the DA nodes.
// To retrieve a blob from the relay, use RelayClient instead.
type RetrievalClientV2 interface {
// GetBlob downloads chunks of a blob from operator network and reconstructs the blob.
GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error)
}

type retrievalClientV2 struct {
logger logging.Logger
ethClient core.Reader
indexedChainState core.IndexedChainState
verifier encoding.Verifier
numConnections int
}

// NewRetrievalClientV2 creates a new retrieval client.
func NewRetrievalClientV2(
logger logging.Logger,
ethClient core.Reader,
chainState core.IndexedChainState,
verifier encoding.Verifier,
numConnections int,
) RetrievalClientV2 {
return &retrievalClientV2{
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
indexedChainState: chainState,
verifier: verifier,
numConnections: numConnections,
}
}

func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
blobKey, err := blobHeader.BlobKey()
if err != nil {
return nil, err
}

commitmentBatch := []encoding.BlobCommitments{blobHeader.BlobCommitments}
err = r.verifier.VerifyCommitEquivalenceBatch(commitmentBatch)
if err != nil {
return nil, err
}

indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
operators, ok := indexedOperatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}

blobVersions, err := r.ethClient.GetAllVersionedBlobParams(ctx)
if err != nil {
return nil, err
}

blobParam, ok := blobVersions[blobHeader.BlobVersion]
if !ok {
return nil, fmt.Errorf("invalid blob version %d", blobHeader.BlobVersion)
}

encodingParams, err := blobHeader.GetEncodingParams(blobParam)
if err != nil {
return nil, err
}

assignments, err := corev2.GetAssignments(indexedOperatorState.OperatorState, blobParam, quorumID)
if err != nil {
return nil, errors.New("failed to get assignments")
}

// Fetch chunks from all operators
chunksChan := make(chan RetrievedChunks, len(operators))
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
opInfo := indexedOperatorState.IndexedOperators[opID]
pool.Submit(func() {
r.getChunksFromOperator(ctx, opID, opInfo, blobKey, quorumID, chunksChan)
})
}

var chunks []*encoding.Frame
var indices []encoding.ChunkNumber
// TODO(ian-shim): if we gathered enough chunks, cancel remaining RPC calls
for i := 0; i < len(operators); i++ {
reply := <-chunksChan
if reply.Err != nil {
r.logger.Error("failed to get chunks from operator", "operator", reply.OperatorID.Hex(), "err", reply.Err)
continue
}
assignment, ok := assignments[reply.OperatorID]
if !ok {
return nil, fmt.Errorf("no assignment to operator %s", reply.OperatorID.Hex())
}

assignmentIndices := make([]uint, len(assignment.GetIndices()))
for i, index := range assignment.GetIndices() {
assignmentIndices[i] = uint(index)
}

err = r.verifier.VerifyFrames(reply.Chunks, assignmentIndices, blobHeader.BlobCommitments, encodingParams)
if err != nil {
r.logger.Error("failed to verify chunks from operator", "operator", reply.OperatorID.Hex(), "err", err)
continue
} else {
r.logger.Info("verified chunks from operator", "operator", reply.OperatorID.Hex())
}

chunks = append(chunks, reply.Chunks...)
indices = append(indices, assignmentIndices...)
}

return r.verifier.Decode(
chunks,
indices,
encodingParams,
uint64(blobHeader.BlobCommitments.Length)*encoding.BYTES_PER_SYMBOL,
)
}

func (r *retrievalClientV2) getChunksFromOperator(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
blobKey corev2.BlobKey,
quorumID core.QuorumID,
chunksChan chan RetrievedChunks,
) {
conn, err := grpc.NewClient(
core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
defer func() {
err := conn.Close()
if err != nil {
r.logger.Error("failed to close connection", "err", err)
}
}()
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
}
return
}

n := grpcnode.NewRetrievalClient(conn)
request := &grpcnode.GetChunksRequest{
BlobKey: blobKey[:],
QuorumId: uint32(quorumID),
}

reply, err := n.GetChunks(ctx, request)
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
}
return
}

chunks := make([]*encoding.Frame, len(reply.GetChunks()))
for i, data := range reply.GetChunks() {
var chunk *encoding.Frame
chunk, err = new(encoding.Frame).DeserializeGnark(data)
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
}
return
}

chunks[i] = chunk
}
chunksChan <- RetrievedChunks{
OperatorID: opID,
Err: nil,
Chunks: chunks,
}
}

0 comments on commit dd42f16

Please sign in to comment.