Skip to content

Commit

Permalink
read old new key and new new key
Browse files Browse the repository at this point in the history
  • Loading branch information
jkt-signal authored Jul 24, 2024
1 parent f12a6ff commit 1fe6dac
Showing 1 changed file with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -135,7 +137,14 @@ public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier,
}

public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device);
return Flux.concat(
Stream.of(convertPartitionKeyDeprecated(destinationAccountUuid, device), convertPartitionKey(destinationAccountUuid, device))
.distinct()
.map(pk -> load(limit, pk))
.toList());
}

public Publisher<MessageProtos.Envelope> load(final Integer limit, final AttributeValue partitionKey) {
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
.tableName(tableName)
.consistentRead(true)
Expand Down Expand Up @@ -165,7 +174,19 @@ public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid,

public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice);
return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice),
convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice))
.distinct()
.map(pk -> deleteMessageByDestinationAndGuid(pk, messageUuid))
// this combines the futures by producing a future that returns an arbitrary nonempty
// result if there is one, which should be OK because only one of the keys
// should produce a nonempty result for any given message uuid
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
.get();
}

public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
final AttributeValue partitionKey, final UUID messageUuid) {
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
Expand Down Expand Up @@ -207,7 +228,18 @@ public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestin

public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice);
return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice),
convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice))
.distinct()
.map(pk -> deleteMessage(pk, destinationDevice, messageUuid, serverTimestamp))
// this combines the futures by producing a future that returns an arbitrary nonempty
// result if there is one, which should be OK because only one of the keys
// should produce a nonempty result for any given message uuid
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
.get();
}

public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final AttributeValue partitionKey, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid);
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
.tableName(tableName)
Expand Down Expand Up @@ -247,6 +279,14 @@ private static AttributeValue convertPartitionKey(final UUID destinationAccountU
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}

private static AttributeValue convertPartitionKeyDeprecated(final UUID destinationAccountUuid, final Device destinationDevice) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits());
byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits());
byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId());
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}

private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
byteBuffer.putLong(serverTimestamp);
Expand Down

0 comments on commit 1fe6dac

Please sign in to comment.