Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: messageHash attaribute added in SQLite + migration script ready #2159

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
ALTER TABLE message RENAME TO message_backup;

CREATE TABLE IF NOT EXISTS message(
pubsubTopic BLOB NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
messageHash BLOB,
storedAt INTEGER NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)
) WITHOUT ROWID;

INSERT OR IGNORE INTO message(pubsubTopic, contentTopic, payload, version, timestamp, messageHash, storedAt)
SELECT pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt
FROM message_backup;

DROP TABLE message_backup;
6 changes: 3 additions & 3 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)

suite "Postgres driver":
Expand Down Expand Up @@ -87,10 +87,10 @@ suite "Postgres driver":
require:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
let (pubsubTopic, actualMsg, messageHash, storeTimestamp) = item
actualMsg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic and
toHex(computedDigest.data) == toHex(digest) and
toHex(computedDigest.data) == toHex(messageHash) and
toHex(actualMsg.payload) == toHex(msg.payload)

(await driver.close()).expect("driver to close")
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)

suite "Postgres driver - query by content topic":
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_driver_queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
cursor = Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data),
messageHash: MessageHash(data: data),
pubsubTopic: "test-pubsub-topic"
)

Expand Down
30 changes: 15 additions & 15 deletions tests/waku_archive/test_driver_queue_index.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,44 @@ suite "Queue Driver - index":

## Test vars
let
smallIndex1 = Index(digest: hashFromStr("1234"),
smallIndex1 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
smallIndex2 = Index(messageHash: hashFromStr("1234567"), # messageHash is less significant than senderTime
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
largeIndex1 = Index(digest: hashFromStr("1234"),
largeIndex1 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
largeIndex2 = Index(messageHash: hashFromStr("12345"), # only messageHash differs from smallIndex1
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
eqIndex1 = Index(digest: hashFromStr("0003"),
eqIndex1 = Index(messageHash: hashFromStr("0003"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex2 = Index(digest: hashFromStr("0003"),
eqIndex2 = Index(messageHash: hashFromStr("0003"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex3 = Index(digest: hashFromStr("0003"),
eqIndex3 = Index(messageHash: hashFromStr("0003"),
receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
senderTime: getNanosecondTime(54321))
diffPsTopic = Index(digest: hashFromStr("1234"),
diffPsTopic = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000),
pubsubTopic: "zzzz")
noSenderTime1 = Index(digest: hashFromStr("1234"),
noSenderTime1 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(1100),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime2 = Index(digest: hashFromStr("1234"),
noSenderTime2 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(10000),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime3 = Index(digest: hashFromStr("1234"),
noSenderTime3 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "aaaa")
noSenderTime4 = Index(digest: hashFromStr("0"),
noSenderTime4 = Index(messageHash: hashFromStr("0"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
Expand Down Expand Up @@ -156,8 +156,8 @@ suite "Queue Driver - index":

## Then
check:
index.digest.data.len != 0
index.digest.data.len == 32 # sha2 output length in bytes
index.messageHash.data.len != 0
index.messageHash.data.len == 32 # sha2 output length in bytes
index.receiverTime == ts2 # the receiver timestamp should be a non-zero value
index.senderTime == ts
index.pubsubTopic == DefaultContentTopic
Expand All @@ -177,4 +177,4 @@ suite "Queue Driver - index":

## Then
check:
index1.digest == index2.digest
index1.messageHash == index2.messageHash
6 changes: 3 additions & 3 deletions tests/waku_archive/test_driver_queue_pagination.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ proc getTestQueueDriver(numMessages: int): QueueDriver =
index: Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data)
messageHash: MessageHash(data: data)
)
)
discard testQueueDriver.add(msg)
Expand Down Expand Up @@ -156,7 +156,7 @@ procSuite "Queue driver - pagination":
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg, DefaultPubsubTopic)
messageHash: computeDigest(msg, DefaultPubsubTopic)
).toIndex()

let
Expand Down Expand Up @@ -337,7 +337,7 @@ procSuite "Queue driver - pagination":
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg, DefaultPubsubTopic)
messageHash: computeDigest(msg, DefaultPubsubTopic)
).toIndex()

let
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_driver_queue_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)


Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_driver_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, messageHash, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

Expand Down
46 changes: 45 additions & 1 deletion tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)


Expand Down Expand Up @@ -423,6 +423,50 @@ suite "SQLite driver - query by pubsub topic":

## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "pubSubTopic messageHash match":
## Given
const pubsubTopic1 = "test-pubsub-topic1"
const pubsubTopic2 = "test-pubsub-topic2"
# take 2 variables to hold the message hashes
var msgHash1: seq[byte]
var msgHash2: seq[byte]

let driver = newTestSqliteDriver()
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(1))
putFutures.add(driver.put(pubsubTopic1, msg1, computeDigest(msg1, pubsubTopic1), msg1.timestamp))

let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(2))
putFutures.add(driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp))

discard waitFor allFinished(putFutures)

# get the messages from the database
let storedMsg = (waitFor driver.getAllMessages()).tryGet()

check:
# there needs to be two messages
storedMsg.len > 0
storedMsg.len == 2

# get the individual messages and message hash values
@[storedMsg[0]].all do (item1: auto) -> bool:
let (gotPubsubTopic1, gotMsg1, messageHash1, timestamp1) = item1
msgHash1 = messageHash1
true

@[storedMsg[1]].all do (item2: auto) -> bool:
let (gotPubsubTopic2, gotMsg2, messageHash2, timestamp2) = item2
msgHash2 = messageHash2
true

# compare of the messge hashes, given the context, they should be different
msgHash1 != msgHash2

## Cleanup
(await driver.close()).expect("driver to close")


suite "SQLite driver - query by cursor":
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ suite "Waku Archive - Retention policy":
check:
storedMsg.len == capacity
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, messageHash, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_waku_archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)


Expand Down
6 changes: 3 additions & 3 deletions tests/waku_store/test_resume.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ procSuite "Waku Store - resume store":
]

for msg in msgList:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk()

store

Expand All @@ -76,7 +76,7 @@ procSuite "Waku Store - resume store":
]

for msg in msgList2:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk()

store

Expand Down Expand Up @@ -272,7 +272,7 @@ suite "WakuNode - waku store":
# Insert the same message in both node's store
let
receivedTime3 = now() + getNanosecondTime(10)
digest3 = computeDigest(msg3)
digest3 = computeDigest(msg3, DefaultPubsubTopic)
require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()

Expand Down
6 changes: 3 additions & 3 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: waku_archive.computeDigest(message, pubsubTopic)
messageHash: waku_archive.computeDigest(message, pubsubTopic)
)

procSuite "WakuNode - Store":
Expand All @@ -57,8 +57,8 @@ procSuite "WakuNode - Store":
let driver = newTestArchiveDriver()

for msg in msgListA:
let msg_digest = waku_archive.computeDigest(msg, DefaultPubsubTopic)
require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk()
let msg_hash = waku_archive.computeDigest(msg, DefaultPubsubTopic)
require (waitFor driver.put(DefaultPubsubTopic, msg, msg_hash, msg.timestamp)).isOk()

driver

Expand Down
22 changes: 11 additions & 11 deletions tests/wakunode_rest/test_rest_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ logScope:

proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
let
digest = waku_archive.computeDigest(message, pubsubTopic)
messageHash = waku_archive.computeDigest(message, pubsubTopic)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())

store.put(pubsubTopic, message, digest, receivedTime)
store.put(pubsubTopic, message, messageHash, receivedTime)

# Creates a new WakuNode
proc testWakuNode(): WakuNode =
Expand All @@ -53,19 +53,19 @@ proc testWakuNode(): WakuNode =
################################################################################
procSuite "Waku v2 Rest API - Store":

asyncTest "MessageDigest <-> string conversions":
# Validate MessageDigest conversion from a WakuMessage obj
asyncTest "MessageHash <-> string conversions":
# Validate MessageHash conversion from a WakuMessage obj
let wakuMsg = WakuMessage(
contentTopic: "Test content topic",
payload: @[byte('H'), byte('i'), byte('!')]
)

let messageDigest = waku_store.computeDigest(wakuMsg)
let restMsgDigest = some(messageDigest.toRestStringMessageDigest())
let messageHash = waku_store.computeDigest(wakuMsg, DefaultPubsubTopic)
let restMsgDigest = some(messageHash.toRestStringMessageDigest())
let parsedMsgDigest = restMsgDigest.parseMsgDigest().value

check:
messageDigest == parsedMsgDigest.get()
messageHash == parsedMsgDigest.get()

# Random validation. Obtained the raw values manually
let expected = some("ZjNhM2Q2NDkwMTE0MjMzNDg0MzJlMDdiZGI3NzIwYTc%3D")
Expand Down Expand Up @@ -129,7 +129,7 @@ procSuite "Waku v2 Rest API - Store":
"6", # end time
"", # sender time
"", # store time
"", # base64-encoded digest
"", # base64-encoded messageHash
"", # empty implies default page size
"true" # ascending
)
Expand Down Expand Up @@ -198,7 +198,7 @@ procSuite "Waku v2 Rest API - Store":
var reqPubsubTopic = DefaultPubsubTopic
var reqSenderTime = Timestamp(0)
var reqStoreTime = Timestamp(0)
var reqDigest = waku_store.MessageDigest()
var reqHash = waku_store.MessageHash()

for i in 0..<2:
let response =
Expand All @@ -210,7 +210,7 @@ procSuite "Waku v2 Rest API - Store":
"", # end time. Empty ignores the field.
encodeUrl($reqSenderTime), # sender time
encodeUrl($reqStoreTime), # store time
reqDigest.toRestStringMessageDigest(), # base64-encoded digest. Empty ignores the field.
reqHash.toRestStringMessageDigest(), # base64-encoded hash. Empty ignores the field.
"7", # page size. Empty implies default page size.
"true" # ascending
)
Expand All @@ -224,7 +224,7 @@ procSuite "Waku v2 Rest API - Store":
# populate the cursor for next page
if response.data.cursor.isSome():
reqPubsubTopic = response.data.cursor.get().pubsubTopic
reqDigest = response.data.cursor.get().digest
reqHash = response.data.cursor.get().messageHash
reqSenderTime = response.data.cursor.get().senderTime
reqStoreTime = response.data.cursor.get().storeTime

Expand Down
4 changes: 2 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery =
ArchiveQuery(
pubsubTopic: request.pubsubTopic,
contentTopics: request.contentTopics,
cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)),
cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)),
startTime: request.startTime,
endTime: request.endTime,
pageSize: request.pageSize.uint,
Expand All @@ -793,7 +793,7 @@ proc toHistoryResult*(res: ArchiveResult): HistoryResult =
let response = res.get()
ok(HistoryResponse(
messages: response.messages,
cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)),
cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)),
))

proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
Expand Down
Loading
Loading