From 1fe6dac76040118420207ece102de4d513e46604 Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com> Date: Wed, 24 Jul 2024 07:44:44 -0700 Subject: [PATCH] read old new key and new new key --- .../storage/MessagesDynamoDb.java | 46 +++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 46f1c068b..deec76eaf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -26,6 +26,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Predicate; +import java.util.stream.Stream; + import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,7 +137,14 @@ public CompletableFuture mayHaveMessages(final UUID accountIdentifier, } public Publisher load(final UUID destinationAccountUuid, final Device device, final Integer limit) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device); + return Flux.concat( + Stream.of(convertPartitionKeyDeprecated(destinationAccountUuid, device), convertPartitionKey(destinationAccountUuid, device)) + .distinct() + .map(pk -> load(limit, pk)) + .toList()); + } + + public Publisher load(final Integer limit, final AttributeValue partitionKey) { QueryRequest.Builder queryRequestBuilder = QueryRequest.builder() .tableName(tableName) .consistentRead(true) @@ -165,7 +174,19 @@ public Publisher load(final UUID destinationAccountUuid, public CompletableFuture> deleteMessageByDestinationAndGuid( final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); + return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice), + convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice)) + .distinct() + .map(pk -> deleteMessageByDestinationAndGuid(pk, messageUuid)) + // this combines the futures by producing a future that returns an arbitrary nonempty + // result if there is one, which should be OK because only one of the keys + // should produce a nonempty result for any given message uuid + .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) + .get(); + } + + public CompletableFuture> deleteMessageByDestinationAndGuid( + final AttributeValue partitionKey, final UUID messageUuid) { final QueryRequest queryRequest = QueryRequest.builder() .tableName(tableName) .indexName(LOCAL_INDEX_MESSAGE_UUID_NAME) @@ -207,7 +228,18 @@ public CompletableFuture> deleteMessageByDestin public CompletableFuture> deleteMessage(final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); + return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice), + convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice)) + .distinct() + .map(pk -> deleteMessage(pk, destinationDevice, messageUuid, serverTimestamp)) + // this combines the futures by producing a future that returns an arbitrary nonempty + // result if there is one, which should be OK because only one of the keys + // should produce a nonempty result for any given message uuid + .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) + .get(); + } + + public CompletableFuture> deleteMessage(final AttributeValue partitionKey, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid); DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder() .tableName(tableName) @@ -247,6 +279,14 @@ private static AttributeValue convertPartitionKey(final UUID destinationAccountU return AttributeValues.fromByteBuffer(byteBuffer.flip()); } + private static AttributeValue convertPartitionKeyDeprecated(final UUID destinationAccountUuid, final Device destinationDevice) { + final ByteBuffer byteBuffer = ByteBuffer.allocate(24); + byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits()); + byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits()); + byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId()); + return AttributeValues.fromByteBuffer(byteBuffer.flip()); + } + private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { final ByteBuffer byteBuffer = ByteBuffer.allocate(24); byteBuffer.putLong(serverTimestamp);