Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Jan 20, 2025
1 parent f2f0e64 commit c9ac7d9
Showing 1 changed file with 89 additions and 67 deletions.
156 changes: 89 additions & 67 deletions waku/waku_store_sync/codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common
const
HashLen = 32
VarIntLen = 9
AvgCapacity = 1000

proc encode*(value: WakuMessageAndTopic): ProtoBuffer =
var pb = initProtoBuffer()
Expand Down Expand Up @@ -44,7 +45,7 @@ proc deltaEncode*(value: RangesData): seq[byte] =
return @[0]

var
output = newSeqOfCap[byte](1000)
output = newSeqOfCap[byte](AvgCapacity)
buf = Leb128Buf[uint64]()
lastTimestamp: Timestamp
lastHash: Fingerprint
Expand Down Expand Up @@ -92,10 +93,8 @@ proc deltaEncode*(value: RangesData): seq[byte] =
of RangeType.Skip:
continue
of RangeType.Fingerprint:
let fingerprint = value.fingerprints[i]
output &= value.fingerprints[i]
i.inc()

output &= fingerprint
of RangeType.ItemSet:
let itemSet = value.itemSets[j]
j.inc()
Expand All @@ -112,33 +111,94 @@ proc deltaEncode*(value: RangesData): seq[byte] =

return output

proc getItemSet(idx: var int, buffer: seq[byte], itemSetLength: int): ItemSet =
var itemSet = ItemSet()
let slice = buffer[idx ..< buffer.len]
let count = deltaDecode(itemSet, slice, itemSetLength)
idx += count

proc getItemSetLength(idx: var int, buffer: seq[byte]): int =
# decode item set length
let min = min(idx + VarIntLen, buffer.len)
let slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len

return int(val)

proc getFingerprint(idx: var int, buffer: seq[byte]): Fingerprint =
# decode fingerprint
let slice = buffer[idx ..< idx + HashLen]
idx += HashLen
var fingerprint = EmptyFingerprint
for i, bytes in slice:
fingerprint[i] = bytes

return fingerprint

proc getRangeType(idx: var int, buffer: seq[byte]): RangeType =
let rangeType = RangeType(buffer[idx])
idx += 1

return rangeType

proc updateHash(idx: var int, buffer: seq[byte], hash: var WakuMessageHash) =
let sameBytes = int(buffer[idx])
idx += 1

let slice = buffer[idx ..< idx + sameBytes]
idx += sameBytes

for i, bytes in slice:
hash[i] = bytes

proc getTimeDiff(idx: var int, buffer: seq[byte]): Timestamp =
let min = min(idx + VarIntLen, buffer.len)
let slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len

return Timestamp(val)

proc getTimestamp(idx: var int, buffer: seq[byte]): Timestamp =
let slice = buffer[idx ..< idx + VarIntLen]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len

return Timestamp(val)

proc getHash(idx: var int, buffer: seq[byte]): WakuMessageHash =
let slice = buffer[idx ..< idx + HashLen]
idx += HashLen
var hash = EmptyWakuMessageHash
for i, bytes in slice:
hash[i] = bytes

return hash

proc getReconciled(idx: var int, buffer: seq[byte]): bool =
let recon = bool(buffer[idx])
idx += 1

return recon

proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int =
var
lastTime = Timestamp(0)
val = 0.uint64
len = 0.int8
idx = 0

while itemSet.elements.len < setLength:
var slice = buffer[idx ..< idx + VarIntLen]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len

let time = lastTime + Timestamp(val)
let timeDiff = getTimestamp(idx, buffer)
let time = lastTime + timeDiff
lastTime = time

slice = buffer[idx ..< idx + HashLen]
idx += HashLen
var hash = EmptyWakuMessageHash
for i, bytes in slice:
hash[i] = bytes
let hash = getHash(idx, buffer)

let id = SyncID(time: time, hash: hash)

itemSet.elements.add(id)

itemSet.reconciled = bool(buffer[idx])
idx += 1
itemSet.reconciled = getReconciled(idx, buffer)

return idx

Expand All @@ -149,77 +209,37 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): T =
var
payload = RangesData()
lastTime = Timestamp(0)
val = 0.uint64
len = 0.int8
idx = 0
slice = buffer[idx ..< idx + VarIntLen]

# first timestamp
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len
lastTime = Timestamp(val)
lastTime = getTimestamp(idx, buffer)

# implicit first hash is always 0
# implicit first range mode is alway skip

while idx < buffer.len - 1:
let lowerRangeBound = SyncID(time: lastTime, hash: EmptyWakuMessageHash)

# decode timestamp diff
let min = min(idx + VarIntLen, buffer.len)
slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len
let timeDiff = Timestamp(val)
let timeDiff = getTimeDiff(idx, buffer)

var hash = EmptyWakuMessageHash
if timeDiff == 0:
# decode number of same bytes
let sameBytes = int(buffer[idx])
idx += 1

# decode same bytes
slice = buffer[idx ..< idx + sameBytes]
idx += sameBytes
for i, bytes in slice:
hash[i] = bytes
updateHash(idx, buffer, hash)

let thisTime = lastTime + timeDiff
lastTime = thisTime

let upperRangeBound = SyncID(time: thisTime, hash: hash)

let bounds = lowerRangeBound .. upperRangeBound

# decode range type
let rangeType = RangeType(buffer[idx])
idx += 1

let rangeType = getRangeType(idx, buffer)
payload.ranges.add((bounds, rangeType))

if rangeType == RangeType.Fingerprint:
# decode fingerprint
slice = buffer[idx ..< idx + HashLen]
idx += HashLen
var fingerprint = EmptyFingerprint
for i, bytes in slice:
fingerprint[i] = bytes

let fingerprint = getFingerprint(idx, buffer)
payload.fingerprints.add(fingerprint)
elif rangeType == RangeType.ItemSet:
# decode item set length
let min = min(idx + VarIntLen, buffer.len)
slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len
let itemSetLength = int(val)

# decode item set
var itemSet = ItemSet()
slice = buffer[idx ..< buffer.len]
let count = deltaDecode(itemSet, slice, itemSetLength)
idx += count

let itemSetLength = getItemSetLength(idx, buffer)
let itemSet = getItemSet(idx, buffer, itemSetLength)
payload.itemSets.add(itemSet)

return payload
Expand All @@ -228,10 +248,12 @@ proc decode*(T: type WakuMessageAndTopic, buffer: seq[byte]): ProtobufResult[T]
let pb = initProtoBuffer(buffer)

var pubsub: string
discard ?pb.getField(1, pubsub)
if not ?pb.getField(1, pubsub):
return err(ProtobufError.missingRequiredField("pubsub"))

var proto: ProtoBuffer
discard ?pb.getField(2, proto)
if not ?pb.getField(2, proto):
return err(ProtobufError.missingRequiredField("msg"))

let message = ?WakuMessage.decode(proto.buffer)

Expand Down

0 comments on commit c9ac7d9

Please sign in to comment.