diff --git a/migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql b/migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql new file mode 100644 index 0000000000..fd8c4aded2 --- /dev/null +++ b/migrations/message_store/00008_updatePrimaryKey_rm_id.up.sql @@ -0,0 +1,18 @@ +ALTER TABLE message RENAME TO message_backup; + +CREATE TABLE IF NOT EXISTS message( + pubsubTopic BLOB NOT NULL, + contentTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + messageHash BLOB, + storedAt INTEGER NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash) +) WITHOUT ROWID; + +INSERT OR IGNORE INTO message(pubsubTopic, contentTopic, payload, version, timestamp, messageHash, storedAt) + SELECT pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt + FROM message_backup; + +DROP TABLE message_backup; \ No newline at end of file diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 149c43aa05..8f90dc86ef 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -19,7 +19,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message, pubsubTopic) + messageHash: computeDigest(message, pubsubTopic) ) suite "Postgres driver": @@ -87,10 +87,10 @@ suite "Postgres driver": require: storedMsg.len == 1 storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, actualMsg, digest, storeTimestamp) = item + let (pubsubTopic, actualMsg, messageHash, storeTimestamp) = item actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and - toHex(computedDigest.data) == toHex(digest) and + toHex(computedDigest.data) == toHex(messageHash) and toHex(actualMsg.payload) == toHex(msg.payload) (await driver.close()).expect("driver to close") diff --git 
a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index ea174eab05..ea043815e2 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -37,7 +37,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message, pubsubTopic) + messageHash: computeDigest(message, pubsubTopic) ) suite "Postgres driver - query by content topic": diff --git a/tests/waku_archive/test_driver_queue.nim b/tests/waku_archive/test_driver_queue.nim index 852697ea0b..d38416451b 100644 --- a/tests/waku_archive/test_driver_queue.nim +++ b/tests/waku_archive/test_driver_queue.nim @@ -23,7 +23,7 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage = cursor = Index( receiverTime: Timestamp(i), senderTime: Timestamp(i), - digest: MessageDigest(data: data), + messageHash: MessageHash(data: data), pubsubTopic: "test-pubsub-topic" ) diff --git a/tests/waku_archive/test_driver_queue_index.nim b/tests/waku_archive/test_driver_queue_index.nim index d5b1136464..3fcee31904 100644 --- a/tests/waku_archive/test_driver_queue_index.nim +++ b/tests/waku_archive/test_driver_queue_index.nim @@ -31,44 +31,44 @@ suite "Queue Driver - index": ## Test vars let - smallIndex1 = Index(digest: hashFromStr("1234"), + smallIndex1 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000)) - smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime + smallIndex2 = Index(messageHash: hashFromStr("1234567"), # messageHash is less significant than senderTime receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000)) - largeIndex1 = Index(digest: hashFromStr("1234"), + largeIndex1 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(0), 
senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1 - largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1 + largeIndex2 = Index(messageHash: hashFromStr("12345"), # only messageHash differs from smallIndex1 receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000)) - eqIndex1 = Index(digest: hashFromStr("0003"), + eqIndex1 = Index(messageHash: hashFromStr("0003"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(54321)) - eqIndex2 = Index(digest: hashFromStr("0003"), + eqIndex2 = Index(messageHash: hashFromStr("0003"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(54321)) - eqIndex3 = Index(digest: hashFromStr("0003"), + eqIndex3 = Index(messageHash: hashFromStr("0003"), receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons senderTime: getNanosecondTime(54321)) - diffPsTopic = Index(digest: hashFromStr("1234"), + diffPsTopic = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(0), senderTime: getNanosecondTime(1000), pubsubTopic: "zzzz") - noSenderTime1 = Index(digest: hashFromStr("1234"), + noSenderTime1 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(1100), senderTime: getNanosecondTime(0), pubsubTopic: "zzzz") - noSenderTime2 = Index(digest: hashFromStr("1234"), + noSenderTime2 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(10000), senderTime: getNanosecondTime(0), pubsubTopic: "zzzz") - noSenderTime3 = Index(digest: hashFromStr("1234"), + noSenderTime3 = Index(messageHash: hashFromStr("1234"), receiverTime: getNanosecondTime(1200), senderTime: getNanosecondTime(0), pubsubTopic: "aaaa") - noSenderTime4 = Index(digest: hashFromStr("0"), + noSenderTime4 = Index(messageHash: hashFromStr("0"), receiverTime: getNanosecondTime(1200), senderTime: getNanosecondTime(0), pubsubTopic: "zzzz") @@ -156,8 +156,8 @@ suite "Queue Driver - 
index": ## Then check: - index.digest.data.len != 0 - index.digest.data.len == 32 # sha2 output length in bytes + index.messageHash.data.len != 0 + index.messageHash.data.len == 32 # sha2 output length in bytes index.receiverTime == ts2 # the receiver timestamp should be a non-zero value index.senderTime == ts index.pubsubTopic == DefaultContentTopic @@ -177,4 +177,4 @@ suite "Queue Driver - index": ## Then check: - index1.digest == index2.digest + index1.messageHash == index2.messageHash diff --git a/tests/waku_archive/test_driver_queue_pagination.nim b/tests/waku_archive/test_driver_queue_pagination.nim index e073c1f451..8fb41875e7 100644 --- a/tests/waku_archive/test_driver_queue_pagination.nim +++ b/tests/waku_archive/test_driver_queue_pagination.nim @@ -25,7 +25,7 @@ proc getTestQueueDriver(numMessages: int): QueueDriver = index: Index( receiverTime: Timestamp(i), senderTime: Timestamp(i), - digest: MessageDigest(data: data) + messageHash: MessageHash(data: data) ) ) discard testQueueDriver.add(msg) @@ -156,7 +156,7 @@ procSuite "Queue driver - pagination": pubsubTopic: DefaultPubsubTopic, senderTime: msg.timestamp, storeTime: msg.timestamp, - digest: computeDigest(msg, DefaultPubsubTopic) + messageHash: computeDigest(msg, DefaultPubsubTopic) ).toIndex() let @@ -337,7 +337,7 @@ procSuite "Queue driver - pagination": pubsubTopic: DefaultPubsubTopic, senderTime: msg.timestamp, storeTime: msg.timestamp, - digest: computeDigest(msg, DefaultPubsubTopic) + messageHash: computeDigest(msg, DefaultPubsubTopic) ).toIndex() let diff --git a/tests/waku_archive/test_driver_queue_query.nim b/tests/waku_archive/test_driver_queue_query.nim index 1d7294c2fc..89c40d605c 100644 --- a/tests/waku_archive/test_driver_queue_query.nim +++ b/tests/waku_archive/test_driver_queue_query.nim @@ -29,7 +29,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: 
computeDigest(message, pubsubTopic) + messageHash: computeDigest(message, pubsubTopic) ) diff --git a/tests/waku_archive/test_driver_sqlite.nim b/tests/waku_archive/test_driver_sqlite.nim index 0fae560e03..d60fdc5d41 100644 --- a/tests/waku_archive/test_driver_sqlite.nim +++ b/tests/waku_archive/test_driver_sqlite.nim @@ -60,7 +60,7 @@ suite "SQLite driver": check: storedMsg.len == 1 storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, msg, digest, storeTimestamp) = item + let (pubsubTopic, msg, messageHash, storeTimestamp) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic diff --git a/tests/waku_archive/test_driver_sqlite_query.nim b/tests/waku_archive/test_driver_sqlite_query.nim index 2ed20f0fac..c7e2bc3834 100644 --- a/tests/waku_archive/test_driver_sqlite_query.nim +++ b/tests/waku_archive/test_driver_sqlite_query.nim @@ -33,7 +33,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message, pubsubTopic) + messageHash: computeDigest(message, pubsubTopic) ) @@ -423,6 +423,50 @@ suite "SQLite driver - query by pubsub topic": ## Cleanup (await driver.close()).expect("driver to close") + + asyncTest "pubSubTopic messageHash match": + ## Given + const pubsubTopic1 = "test-pubsub-topic1" + const pubsubTopic2 = "test-pubsub-topic2" + # take 2 variables to hold the message hashes + var msgHash1: seq[byte] + var msgHash2: seq[byte] + + let driver = newTestSqliteDriver() + var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(1)) + putFutures.add(driver.put(pubsubTopic1, msg1, computeDigest(msg1, pubsubTopic1), msg1.timestamp)) + + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(2)) + putFutures.add(driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp)) + + 
discard await allFinished(putFutures) + + # get the messages from the database + let storedMsg = (await driver.getAllMessages()).tryGet() + + check: + # there needs to be two messages + storedMsg.len > 0 + storedMsg.len == 2 + + # get the individual messages and message hash values + @[storedMsg[0]].all do (item1: auto) -> bool: + let (gotPubsubTopic1, gotMsg1, messageHash1, timestamp1) = item1 + msgHash1 = messageHash1 + true + + @[storedMsg[1]].all do (item2: auto) -> bool: + let (gotPubsubTopic2, gotMsg2, messageHash2, timestamp2) = item2 + msgHash2 = messageHash2 + true + + # compare the message hashes, given the context, they should be different + msgHash1 != msgHash2 + + ## Cleanup + (await driver.close()).expect("driver to close") suite "SQLite driver - query by cursor": diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 1e6bab4614..834137a801 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -147,7 +147,7 @@ suite "Waku Archive - Retention policy": check: storedMsg.len == capacity storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, msg, digest, storeTimestamp) = item + let (pubsubTopic, msg, messageHash, storeTimestamp) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic diff --git a/tests/waku_archive/test_waku_archive.nim b/tests/waku_archive/test_waku_archive.nim index 681c93c061..ac671303d3 100644 --- a/tests/waku_archive/test_waku_archive.nim +++ b/tests/waku_archive/test_waku_archive.nim @@ -30,7 +30,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message, pubsubTopic) + messageHash: computeDigest(message, pubsubTopic) ) diff --git a/tests/waku_store/test_resume.nim b/tests/waku_store/test_resume.nim index ea918ce3c3..4b872a9a52 100644 --- 
a/tests/waku_store/test_resume.nim +++ b/tests/waku_store/test_resume.nim @@ -58,7 +58,7 @@ procSuite "Waku Store - resume store": ] for msg in msgList: - require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk() store @@ -76,7 +76,7 @@ procSuite "Waku Store - resume store": ] for msg in msgList2: - require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk() store @@ -272,7 +272,7 @@ suite "WakuNode - waku store": # Insert the same message in both node's store let receivedTime3 = now() + getNanosecondTime(10) - digest3 = computeDigest(msg3) + digest3 = computeDigest(msg3, DefaultPubsubTopic) require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk() require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk() diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index 9fbe234bd3..9bfd0c66eb 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -34,7 +34,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: waku_archive.computeDigest(message, pubsubTopic) + messageHash: waku_archive.computeDigest(message, pubsubTopic) ) procSuite "WakuNode - Store": @@ -57,8 +57,8 @@ procSuite "WakuNode - Store": let driver = newTestArchiveDriver() for msg in msgListA: - let msg_digest = waku_archive.computeDigest(msg, DefaultPubsubTopic) - require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk() + let msg_hash = waku_archive.computeDigest(msg, DefaultPubsubTopic) + require (waitFor 
driver.put(DefaultPubsubTopic, msg, msg_hash, msg.timestamp)).isOk() driver diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index ee64a873ed..4cc806c331 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -32,11 +32,11 @@ logScope: proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] = let - digest = waku_archive.computeDigest(message, pubsubTopic) + messageHash = waku_archive.computeDigest(message, pubsubTopic) receivedTime = if message.timestamp > 0: message.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - store.put(pubsubTopic, message, digest, receivedTime) + store.put(pubsubTopic, message, messageHash, receivedTime) # Creates a new WakuNode proc testWakuNode(): WakuNode = @@ -53,19 +53,19 @@ proc testWakuNode(): WakuNode = ################################################################################ procSuite "Waku v2 Rest API - Store": - asyncTest "MessageDigest <-> string conversions": - # Validate MessageDigest conversion from a WakuMessage obj + asyncTest "MessageHash <-> string conversions": + # Validate MessageHash conversion from a WakuMessage obj let wakuMsg = WakuMessage( contentTopic: "Test content topic", payload: @[byte('H'), byte('i'), byte('!')] ) - let messageDigest = waku_store.computeDigest(wakuMsg) - let restMsgDigest = some(messageDigest.toRestStringMessageDigest()) + let messageHash = waku_store.computeDigest(wakuMsg, DefaultPubsubTopic) + let restMsgDigest = some(messageHash.toRestStringMessageDigest()) let parsedMsgDigest = restMsgDigest.parseMsgDigest().value check: - messageDigest == parsedMsgDigest.get() + messageHash == parsedMsgDigest.get() # Random validation. 
Obtained the raw values manually let expected = some("ZjNhM2Q2NDkwMTE0MjMzNDg0MzJlMDdiZGI3NzIwYTc%3D") @@ -129,7 +129,7 @@ procSuite "Waku v2 Rest API - Store": "6", # end time "", # sender time "", # store time - "", # base64-encoded digest + "", # base64-encoded messageHash "", # empty implies default page size "true" # ascending ) @@ -198,7 +198,7 @@ procSuite "Waku v2 Rest API - Store": var reqPubsubTopic = DefaultPubsubTopic var reqSenderTime = Timestamp(0) var reqStoreTime = Timestamp(0) - var reqDigest = waku_store.MessageDigest() + var reqHash = waku_store.MessageHash() for i in 0..<2: let response = @@ -210,7 +210,7 @@ procSuite "Waku v2 Rest API - Store": "", # end time. Empty ignores the field. encodeUrl($reqSenderTime), # sender time encodeUrl($reqStoreTime), # store time - reqDigest.toRestStringMessageDigest(), # base64-encoded digest. Empty ignores the field. + reqHash.toRestStringMessageDigest(), # base64-encoded hash. Empty ignores the field. "7", # page size. Empty implies default page size. 
"true" # ascending ) @@ -224,7 +224,7 @@ procSuite "Waku v2 Rest API - Store": # populate the cursor for next page if response.data.cursor.isSome(): reqPubsubTopic = response.data.cursor.get().pubsubTopic - reqDigest = response.data.cursor.get().digest + reqHash = response.data.cursor.get().messageHash reqSenderTime = response.data.cursor.get().senderTime reqStoreTime = response.data.cursor.get().storeTime diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index fc512b1786..f117c1b73d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -769,7 +769,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery = ArchiveQuery( pubsubTopic: request.pubsubTopic, contentTopics: request.contentTopics, - cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)), + cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)), startTime: request.startTime, endTime: request.endTime, pageSize: request.pageSize.uint, @@ -793,7 +793,7 @@ proc toHistoryResult*(res: ArchiveResult): HistoryResult = let response = res.get() ok(HistoryResponse( messages: response.messages, - cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)), + cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)), )) proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} = diff --git a/waku/waku_api/rest/store/client.nim b/waku/waku_api/rest/store/client.nim index 
71babc8e12..e61b26854f 100644 --- a/waku/waku_api/rest/store/client.nim +++ b/waku/waku_api/rest/store/client.nim @@ -60,7 +60,7 @@ proc getStoreMessagesV1*( # Optional cursor fields senderTime: string = "", storeTime: string = "", - digest: string = "", # base64-encoded digest + messageHash: string = "", # base64-encoded hash pageSize: string = "", ascending: string = "" diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index de1ae34c13..66c053233a 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -80,7 +80,7 @@ proc parseTime(input: Option[string]): proc parseCursor(parsedPubsubTopic: Option[string], senderTime: Option[string], storeTime: Option[string], - digest: Option[string]): + messageHash: Option[string]): Result[Option[HistoryCursor], string] = # Parse sender time @@ -93,23 +93,23 @@ proc parseCursor(parsedPubsubTopic: Option[string], if not parsedStoreTime.isOk(): return err(parsedStoreTime.error) - # Parse message digest - let parsedMsgDigest = parseMsgDigest(digest) - if not parsedMsgDigest.isOk(): - return err(parsedMsgDigest.error) + # Parse message hash + let parsedMsgHash = parseMsgDigest(messageHash) + if not parsedMsgHash.isOk(): + return err(parsedMsgHash.error) # Parse cursor information if parsedPubsubTopic.isSome() and parsedSenderTime.value.isSome() and parsedStoreTime.value.isSome() and - parsedMsgDigest.value.isSome(): + parsedMsgHash.value.isSome(): return ok(some( HistoryCursor( pubsubTopic: parsedPubsubTopic.get(), senderTime: parsedSenderTime.value.get(), storeTime: parsedStoreTime.value.get(), - digest: parsedMsgDigest.value.get()) + messageHash: parsedMsgHash.value.get()) )) else: return ok(none(HistoryCursor)) @@ -119,7 +119,7 @@ proc createHistoryQuery(pubsubTopic: Option[string], contentTopics: Option[string], senderTime: Option[string], storeTime: Option[string], - digest: Option[string], + messageHash: Option[string], startTime: 
Option[string], endTime: Option[string], pageSize: Option[string], @@ -146,7 +146,7 @@ proc createHistoryQuery(pubsubTopic: Option[string], let parsedCursor = ? parseCursor(parsedPubsubTopic, senderTime, storeTime, - digest) + messageHash) # Parse page size field var parsedPagedSize = DefaultPageSize @@ -201,7 +201,7 @@ proc installStoreApiHandlers*( contentTopics: Option[string], senderTime: Option[string], storeTime: Option[string], - digest: Option[string], + messageHash: Option[string], startTime: Option[string], endTime: Option[string], pageSize: Option[string], @@ -236,7 +236,7 @@ proc installStoreApiHandlers*( contentTopics.toOpt(), senderTime.toOpt(), storeTime.toOpt(), - digest.toOpt(), + messageHash.toOpt(), startTime.toOpt(), endTime.toOpt(), pageSize.toOpt(), diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index 92da532a90..d8ac8c1396 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -24,7 +24,7 @@ type pubsubTopic*: PubsubTopic senderTime*: Timestamp storeTime*: Timestamp - digest*: MessageDigest + messageHash*: MessageHash StoreRequestRest* = object # inspired by https://github.com/waku-org/nwaku/blob/f95147f5b7edfd45f914586f2d41cd18fb0e0d18/waku/v2//waku_store/common.nim#L52 @@ -53,37 +53,37 @@ type #### Type conversion -# Converts a URL-encoded-base64 string into a 'MessageDigest' +# Converts a URL-encoded-base64 string into a 'MessageHash' proc parseMsgDigest*(input: Option[string]): - Result[Option[MessageDigest], string] = + Result[Option[MessageHash], string] = if not input.isSome() or input.get() == "": - return ok(none(MessageDigest)) + return ok(none(MessageHash)) let decodedUrl = decodeUrl(input.get()) let base64Decoded = base64.decode(Base64String(decodedUrl)) - var messageDigest = MessageDigest() + var messageHash = MessageHash() if not base64Decoded.isOk(): return err(base64Decoded.error) let base64DecodedArr = base64Decoded.get() # Next snippet inspired by 
"nwaku/waku/waku_archive/archive.nim" - # TODO: Improve coherence of MessageDigest type - messageDigest = block: + # TODO: Improve coherence of MessageHash type + messageHash = block: var data: array[32, byte] for i in 0.. 0: msg.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest + trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, messageHash=msgDigest let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime) if putRes.isErr(): @@ -163,21 +163,21 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { ## Build last message cursor ## The cursor is built from the last message INCLUDED in the response ## (i.e. the second last message in the rows list) - let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] + let (pubsubTopic, message, messageHash, storeTimestamp) = rows[^2] - # TODO: Improve coherence of MessageDigest type - let messageDigest = block: + # TODO: Improve coherence of MessageHash type + let msgHash = block: var data: array[32, byte] - for i in 0.." 
else: "<" statements.add("(storedAt, id) " & comp & " (?,?)") args.add($cursor.get().storeTime) - args.add(toHex(cursor.get().digest.data)) + args.add(toHex(cursor.get().messageHash.data)) if startTime.isSome(): statements.add("storedAt >= ?") diff --git a/waku/waku_archive/driver/queue_driver/index.nim b/waku/waku_archive/driver/queue_driver/index.nim index e04e0246de..f8409459ac 100644 --- a/waku/waku_archive/driver/queue_driver/index.nim +++ b/waku/waku_archive/driver/queue_driver/index.nim @@ -16,19 +16,19 @@ type Index* = object pubsubTopic*: string senderTime*: Timestamp # the time at which the message is generated receiverTime*: Timestamp - digest*: MessageDigest # calculated over payload and content topic + messageHash*: MessageHash # calculated over payload and content topic proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T = ## Takes a WakuMessage with received timestamp and returns its Index. let - digest = computeDigest(msg, pubsubTopic) + messageHash = computeDigest(msg, pubsubTopic) senderTime = msg.timestamp Index( pubsubTopic: pubsubTopic, senderTime: senderTime, receiverTime: receivedTime, - digest: digest + messageHash: messageHash ) @@ -37,7 +37,7 @@ proc tohistoryCursor*(index: Index): ArchiveCursor = pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, storeTime: index.receiverTime, - digest: index.digest + messageHash: index.messageHash ) proc toIndex*(index: ArchiveCursor): Index = @@ -45,14 +45,14 @@ proc toIndex*(index: ArchiveCursor): Index = pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, receiverTime: index.storeTime, - digest: index.digest + messageHash: index.messageHash ) proc `==`*(x, y: Index): bool = ## receiverTime plays no role in index equality (x.senderTime == y.senderTime) and - (x.digest == y.digest) and + (x.messageHash == y.messageHash) and (x.pubsubTopic == y.pubsubTopic) proc cmp*(x, y: Index): int = @@ -84,7 +84,7 @@ proc cmp*(x, y: Index): int = 
return timecmp # Continue only when timestamps are equal - let digestcmp = cmp(x.digest.data, y.digest.data) + let digestcmp = cmp(x.messageHash.data, y.messageHash.data) if digestcmp != 0: return digestcmp diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index cce7d895bf..e6473cafb2 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -138,7 +138,7 @@ proc getPage(driver: QueueDriver, numberOfItems += 1 - outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime)) + outSeq.add((key.pubsubTopic, data.msg, @(key.messageHash.data), key.receiverTime)) currentEntry = if forward: w.next() else: w.prev() @@ -227,10 +227,10 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi method put*(driver: QueueDriver, pubsubTopic: PubsubTopic, message: WakuMessage, - digest: MessageDigest, + messageHash: MessageHash, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = - let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) + let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, messageHash: messageHash) let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) return driver.add(message) diff --git a/waku/waku_archive/driver/sqlite_driver/cursor.nim b/waku/waku_archive/driver/sqlite_driver/cursor.nim index 9b4d00fd95..a5258f22b9 100644 --- a/waku/waku_archive/driver/sqlite_driver/cursor.nim +++ b/waku/waku_archive/driver/sqlite_driver/cursor.nim @@ -10,4 +10,4 @@ import type DbCursor* = (Timestamp, seq[byte], PubsubTopic) -proc toDbCursor*(c: ArchiveCursor): DbCursor = (c.storeTime, @(c.digest.data), c.pubsubTopic) +proc toDbCursor*(c: ArchiveCursor): DbCursor = (c.storeTime, @(c.messageHash.data), c.pubsubTopic) diff 
--git a/waku/waku_archive/driver/sqlite_driver/migrations.nim b/waku/waku_archive/driver/sqlite_driver/migrations.nim index 0aa925fda1..745a3f4424 100644 --- a/waku/waku_archive/driver/sqlite_driver/migrations.nim +++ b/waku/waku_archive/driver/sqlite_driver/migrations.nim @@ -14,7 +14,7 @@ logScope: topics = "waku archive migration" -const SchemaVersion* = 7 # increase this when there is an update in the database schema +const SchemaVersion* = 8 # increase this when there is an update in the database schema template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store" @@ -48,7 +48,7 @@ proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] = return ok(true) else: - info "Not considered schema version 7" + info "Not considered schema version 7" ok(false) proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 27ab4de618..5e3c893e17 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -70,9 +70,9 @@ proc createTableQuery(table: string): SqlQueryStr = " payload BLOB," & " version INTEGER NOT NULL," & " timestamp INTEGER NOT NULL," & - " id BLOB," & + " messageHash BLOB," & " storedAt INTEGER NOT NULL," & - " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & + " CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)" & ") WITHOUT ROWID;" proc createTable*(db: SqliteDatabase): DatabaseResult[void] = @@ -93,7 +93,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase): proc createHistoryQueryIndexQuery(table: string): SqlQueryStr = - "CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);" + "CREATE INDEX IF NOT EXISTS i_query ON " & table & " 
(contentTopic, pubsubTopic, storedAt, messageHash);" proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = let query = createHistoryQueryIndexQuery(DbTable) @@ -105,7 +105,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) proc insertMessageQuery(table: string): SqlQueryStr = - "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & + "INSERT INTO " & table & "(messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & " VALUES (?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = @@ -181,9 +181,9 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): ## Delete oldest messages not within limit proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr = - "DELETE FROM " & table & " WHERE (storedAt, id, pubsubTopic) NOT IN (" & - " SELECT storedAt, id, pubsubTopic FROM " & table & - " ORDER BY storedAt DESC, id DESC" & + "DELETE FROM " & table & " WHERE (storedAt, messageHash, pubsubTopic) NOT IN (" & + " SELECT storedAt, messageHash, pubsubTopic FROM " & table & + " ORDER BY storedAt DESC, messageHash DESC" & " LIMIT " & $limit & ");" @@ -197,7 +197,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" & + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, messageHash" & " FROM " & table & " ORDER BY storedAt ASC" @@ -211,10 +211,10 @@ proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, 
payloadCol=2, versionCol=4, senderTimestampCol=5) - digest = queryRowDigestCallback(s, digestCol=6) + messageHash = queryRowDigestCallback(s, digestCol=6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - rows.add((pubsubTopic, wakuMessage, digest, storedAt)) + rows.add((pubsubTopic, wakuMessage, messageHash, storedAt)) let query = selectAllMessagesQuery(DbTable) let res = db.query(query, queryRowCallback) @@ -246,7 +246,7 @@ proc whereClause(cursor: Option[DbCursor], none(string) else: let comp = if ascending: ">" else: "<" - some("(storedAt, id) " & comp & " (?, ?)") + some("(storedAt, messageHash) " & comp & " (?, ?)") let pubsubTopicClause = if pubsubTopic.isNone(): none(string) @@ -280,13 +280,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u var query: string - query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" + query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, messageHash" query &= " FROM " & table if where.isSome(): query &= " WHERE " & where.get() - query &= " ORDER BY storedAt " & order & ", id " & order + query &= " ORDER BY storedAt " & order & ", messageHash " & order query &= " LIMIT " & $limit & ";" query @@ -308,11 +308,11 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt, # Bind params var paramIndex = 1 - if cursor.isSome(): # cursor = storedAt, id, pubsubTopic - let (storedAt, id, _) = cursor.get() + if cursor.isSome(): # cursor = storedAt, messageHash, pubsubTopic + let (storedAt, messageHash, _) = cursor.get() checkErr bindParam(s, paramIndex, storedAt) paramIndex += 1 - checkErr bindParam(s, paramIndex, id) + checkErr bindParam(s, paramIndex, messageHash) paramIndex += 1 if pubsubTopic.isSome(): @@ -369,10 +369,10 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, 
versionCol=4, senderTimestampCol=5) - digest = queryRowDigestCallback(s, digestCol=6) + messageHash = queryRowDigestCallback(s, digestCol=6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - messages.add((pubsubTopic, message, digest, storedAt)) + messages.add((pubsubTopic, message, messageHash, storedAt)) let query = block: let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending) diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 53da379b1a..99418c472e 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -60,12 +60,12 @@ proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] = method put*(s: SqliteDriver, pubsubTopic: PubsubTopic, message: WakuMessage, - digest: MessageDigest, + messageHash: MessageHash, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = ## Inserts a message into the store let res = s.insertStmt.exec(( - @(digest.data), # id + @(messageHash.data), # messageHash receivedTime, # storedAt toBytes(message.contentTopic), # contentTopic message.payload, # payload diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 1c3ec5a41e..da9deec344 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -153,11 +153,11 @@ when defined(waku_exp_store_resume): proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = let - digest = waku_archive.computeDigest(message, pubsubTopic) + messageHash = waku_archive.computeDigest(message, pubsubTopic) receivedTime = if message.timestamp > 0: message.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - store.put(pubsubTopic, message, digest, receivedTime) + store.put(pubsubTopic, message, messageHash, receivedTime) proc resume*(w: WakuStoreClient, peerList = 
none(seq[RemotePeerInfo]), diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index edd86ea4b1..03058a6001 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -23,17 +23,19 @@ const type WakuStoreResult*[T] = Result[T, string] -## Waku message digest +## Waku message hash -type MessageDigest* = MDigest[256] +type MessageHash* = MDigest[256] -proc computeDigest*(msg: WakuMessage): MessageDigest = +proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageHash = var ctx: sha256 ctx.init() defer: ctx.clear() - ctx.update(msg.contentTopic.toBytes()) + ctx.update(pubSubTopic.toBytes()) ctx.update(msg.payload) + ctx.update(msg.contentTopic.toBytes()) + ctx.update(msg.meta) # Computes the hash return ctx.finish() @@ -46,7 +48,7 @@ type pubsubTopic*: PubsubTopic senderTime*: Timestamp storeTime*: Timestamp - digest*: MessageDigest + messageHash*: MessageHash HistoryQuery* = object pubsubTopic*: Option[PubsubTopic] diff --git a/waku/waku_store/rpc.nim b/waku/waku_store/rpc.nim index c0b105c7b8..d30e838626 100644 --- a/waku/waku_store/rpc.nim +++ b/waku/waku_store/rpc.nim @@ -18,25 +18,25 @@ type PagingIndexRPC* = object pubsubTopic*: PubsubTopic senderTime*: Timestamp # the time at which the message is generated receiverTime*: Timestamp - digest*: MessageDigest # calculated over payload and content topic + messageHash*: MessageHash # calculated over the pubsub topic, payload, content topic and meta proc `==`*(x, y: PagingIndexRPC): bool = ## receiverTime plays no role in index equality (x.senderTime == y.senderTime) and - (x.digest == y.digest) and + (x.messageHash == y.messageHash) and (x.pubsubTopic == y.pubsubTopic) proc compute*(T: type PagingIndexRPC, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T = ## Takes a WakuMessage with received timestamp and returns its Index.
let - digest = computeDigest(msg) + msgHash = computeDigest(msg, pubsubTopic) senderTime = msg.timestamp PagingIndexRPC( pubsubTopic: pubsubTopic, senderTime: senderTime, receiverTime: receivedTime, - digest: digest + messageHash: msgHash ) @@ -98,7 +98,7 @@ proc toRPC*(cursor: HistoryCursor): PagingIndexRPC {.gcsafe.}= pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, receiverTime: cursor.storeTime, - digest: cursor.digest + messageHash: cursor.messageHash ) proc toAPI*(rpc: PagingIndexRPC): HistoryCursor = @@ -106,7 +106,7 @@ proc toAPI*(rpc: PagingIndexRPC): HistoryCursor = pubsubTopic: rpc.pubsubTopic, senderTime: rpc.senderTime, storeTime: rpc.receiverTime, - digest: rpc.digest + messageHash: rpc.messageHash ) diff --git a/waku/waku_store/rpc_codec.nim b/waku/waku_store/rpc_codec.nim index 3223fb7ec7..57cb26bf28 100644 --- a/waku/waku_store/rpc_codec.nim +++ b/waku/waku_store/rpc_codec.nim @@ -23,7 +23,7 @@ proc encode*(index: PagingIndexRPC): ProtoBuffer = ## returns the resultant ProtoBuffer var pb = initProtoBuffer() - pb.write3(1, index.digest.data) + pb.write3(1, index.messageHash.data) pb.write3(2, zint64(index.receiverTime)) pb.write3(3, zint64(index.senderTime)) pb.write3(4, index.pubsubTopic) @@ -38,13 +38,13 @@ proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtobufResult[T] = var data: seq[byte] if not ?pb.getField(1, data): - return err(ProtobufError.missingRequiredField("digest")) + return err(ProtobufError.missingRequiredField("messageHash")) else: - var digest = MessageDigest() + var messageHash = MessageHash() for count, b in data: - digest.data[count] = b + messageHash.data[count] = b - rpc.digest = digest + rpc.messageHash = messageHash var receiverTime: zint64 if not ?pb.getField(2, receiverTime):