Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: using msg hash in hex and adding store test #16

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions waku/common/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package common
import (
"encoding/json"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

Expand All @@ -13,13 +12,13 @@ import (
type Envelope interface {
Message() *pb.WakuMessage
PubsubTopic() string
Hash() pb.MessageHash
Hash() MessageHash
}

type envelopeImpl struct {
msg *pb.WakuMessage
topic string
hash pb.MessageHash
hash MessageHash
}

type tmpWakuMessageJson struct {
Expand All @@ -35,7 +34,7 @@ type tmpWakuMessageJson struct {
type tmpEnvelopeStruct struct {
WakuMessage tmpWakuMessageJson `json:"wakuMessage"`
PubsubTopic string `json:"pubsubTopic"`
MessageHash string `json:"messageHash"`
MessageHash MessageHash `json:"messageHash"`
}

// NewEnvelope creates a new Envelope from a json string generated in nwaku
Expand All @@ -46,7 +45,6 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) {
return nil, err
}

hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash)
if err != nil {
return nil, err
}
Expand All @@ -62,7 +60,7 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) {
RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof,
},
topic: tmpEnvelopeStruct.PubsubTopic,
hash: pb.ToMessageHash(hash),
hash: tmpEnvelopeStruct.MessageHash,
}, nil
}

Expand All @@ -74,6 +72,6 @@ func (e *envelopeImpl) PubsubTopic() string {
return e.topic
}

func (e *envelopeImpl) Hash() pb.MessageHash {
func (e *envelopeImpl) Hash() MessageHash {
return e.hash
}
38 changes: 38 additions & 0 deletions waku/common/message_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package common

import (
"encoding/hex"
"fmt"
)

// MessageHash represents an unique identifier for a message within a pubsub topic
type MessageHash string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not strictly correct. A message hash is array[byte, 32]
I'd happier if we rename it a little ;P

Suggested change
type MessageHash string
type MessageHashHexStr string


func ToMessageHash(val string) (MessageHash, error) {
if len(val) == 0 {
return "", fmt.Errorf("empty string not allowed")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no formatting is being used, you can use errors.New("empty string not allowed") here and line 17

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooh good catch! It was either a copy-paste mistake, or it was written by AI and didn't notice that mistake - don't remember lol

}

if len(val) < 2 || val[:2] != "0x" {
return "", fmt.Errorf("string must start with 0x")
}

// Remove "0x" prefix for hex decoding
hexStr := val[2:]

// Verify the remaining string is valid hex
_, err := hex.DecodeString(hexStr)
if err != nil {
return "", fmt.Errorf("invalid hex string: %v", err)
}

return MessageHash(val), nil
}

func (h MessageHash) String() string {
return string(h)
}

func (h MessageHash) Bytes() ([]byte, error) {
return hex.DecodeString(string(h))
}
28 changes: 28 additions & 0 deletions waku/common/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package common

type StoreQueryRequest struct {
RequestId string `json:"request_id"`
IncludeData bool `json:"include_data"`
PubsubTopic string `json:"pubsub_topic,omitempty"`
ContentTopics *[]string `json:"content_topics,omitempty"`
TimeStart *int64 `json:"time_start,omitempty"`
TimeEnd *int64 `json:"time_end,omitempty"`
MessageHashes *[]MessageHash `json:"message_hashes,omitempty"`
PaginationCursor MessageHash `json:"pagination_cursor,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be *MessageHash?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True! Forgot that it was optional. Updated it :)

PaginationForward bool `json:"pagination_forward"`
PaginationLimit *uint64 `json:"pagination_limit,omitempty"`
}

type StoreMessageResponse struct {
WakuMessage tmpWakuMessageJson `json:"message"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The messages are optional and only included if include_data is true, so WakuMessage should be a pointer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True! Changed it :)

PubsubTopic string `json:"pubsubTopic"`
MessageHash MessageHash `json:"messageHash"`
}

type StoreQueryResponse struct {
RequestId string `json:"requestId,omitempty"`
StatusCode *uint32 `json:"statusCode,omitempty"`
StatusDesc string `json:"statusDesc,omitempty"`
Messages *[]StoreMessageResponse `json:"messages,omitempty"`
PaginationCursor MessageHash `json:"paginationCursor,omitempty"`
}
23 changes: 11 additions & 12 deletions waku/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,13 @@ import (
"time"
"unsafe"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/waku-go-bindings/waku/common"
"go.uber.org/zap"
Expand All @@ -347,6 +345,7 @@ type WakuConfig struct {
Nodekey string `json:"nodekey,omitempty"`
Relay bool `json:"relay,omitempty"`
Store bool `json:"store,omitempty"`
LegacyStore bool `json:"legacyStore"`
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I didn't add the omitempty because if we set it manually to false, it is considered empty and not sent. But the default in nwaku is true, and therefore we need to explicitly send it whenever is set to false

Storenode string `json:"storenode,omitempty"`
StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"`
StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"`
Expand Down Expand Up @@ -827,7 +826,7 @@ func (n *WakuNode) Version() (string, error) {
return "", errors.New(errMsg)
}

func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) {
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQueryRequest, peerInfo peer.AddrInfo) (*common.StoreQueryResponse, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx)

b, err := json.Marshal(storeRequest)
Expand Down Expand Up @@ -856,24 +855,24 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQu

if C.getRet(resp) == C.RET_OK {
jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
storeQueryResponse := &storepb.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse)
storeQueryResponse := common.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponseStr), &storeQueryResponse)
if err != nil {
return nil, err
}
return storeQueryResponse, nil
return &storeQueryResponse, nil
}
errMsg := "error WakuStoreQuery: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}

func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (common.MessageHash, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx)

jsonMsg, err := json.Marshal(message)
if err != nil {
return pb.MessageHash{}, err
return common.MessageHash(""), err
}

wg := sync.WaitGroup{}
Expand All @@ -890,14 +889,14 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu
wg.Wait()
if C.getRet(resp) == C.RET_OK {
msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
msgHashBytes, err := hexutil.Decode(msgHash)
parsedMsgHash, err := common.ToMessageHash(msgHash)
if err != nil {
return pb.MessageHash{}, err
return common.MessageHash(""), err
}
return pb.ToMessageHash(msgHashBytes), nil
return parsedMsgHash, nil
}
errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return pb.MessageHash{}, errors.New(errMsg)
return common.MessageHash(""), errors.New(errMsg)
}

func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {
Expand Down
Loading