From 045ec9689dc7a316bc7012e45cd165e69d329b21 Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Mon, 29 Jul 2024 11:16:53 -0400 Subject: [PATCH] Introduce a job scheduler and experiment for sending notifications to idle devices --- .../textsecuregcm/WhisperServerService.java | 24 ++ .../experiment/DeviceLastSeenState.java | 17 ++ ...outMessagesPushNotificationExperiment.java | 159 ++++++++++ .../PushNotificationExperiment.java | 9 + .../push/IdleDeviceNotificationScheduler.java | 99 +++++++ .../workers/CommandDependencies.java | 6 +- ...nishPushNotificationExperimentCommand.java | 89 +++--- ...dleDeviceNotificationSchedulerFactory.java | 21 ++ ...vicesWithoutMessagesExperimentFactory.java | 28 ++ ...essagesPushNotificationExperimentTest.java | 280 ++++++++++++++++++ .../IdleDeviceNotificationSchedulerTest.java | 131 ++++++++ ...PushNotificationExperimentCommandTest.java | 10 + ...PushNotificationExperimentCommandTest.java | 1 + 13 files changed, 830 insertions(+), 44 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/experiment/DeviceLastSeenState.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationScheduler.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/IdleDeviceNotificationSchedulerFactory.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesWithoutMessagesExperimentFactory.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperimentTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationSchedulerTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 468b042cc..ad57324a6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -251,7 +251,12 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; +import org.whispersystems.textsecuregcm.workers.DiscardPushNotificationExperimentSamplesCommand; +import org.whispersystems.textsecuregcm.workers.FinishPushNotificationExperimentCommand; +import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory; import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; +import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesWithoutMessagesExperimentFactory; +import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand; import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand; import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand; import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand; @@ -260,6 +265,7 @@ import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask; import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand; +import org.whispersystems.textsecuregcm.workers.StartPushNotificationExperimentCommand; import org.whispersystems.textsecuregcm.workers.UnlinkDeviceCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.websocket.WebSocketResourceProviderFactory; @@ -313,6 +319,24 @@ public void initialize(final Bootstrap bootstrap) { bootstrap.addCommand(new RemoveExpiredBackupsCommand(Clock.systemUTC())); bootstrap.addCommand(new BackupMetricsCommand(Clock.systemUTC())); bootstrap.addCommand(new RemoveExpiredLinkedDevicesCommand()); + bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs", + "Processes scheduled jobs to send notifications to idle devices", + new IdleDeviceNotificationSchedulerFactory())); + + bootstrap.addCommand( + new StartPushNotificationExperimentCommand<>("start-notify-idle-devices-without-messages-experiment", + "Start an experiment to send push notifications to idle devices with empty message queues", + new NotifyIdleDevicesWithoutMessagesExperimentFactory())); + + bootstrap.addCommand( + new FinishPushNotificationExperimentCommand<>("finish-notify-idle-devices-without-messages-experiment", + "Finish an experiment to send push notifications to idle devices with empty message queues", + new NotifyIdleDevicesWithoutMessagesExperimentFactory())); + + bootstrap.addCommand( + new DiscardPushNotificationExperimentSamplesCommand("discard-notify-idle-devices-without-messages-samples", + "Discard samples from the \"notify idle devices without messages\" experiment", + new NotifyIdleDevicesWithoutMessagesExperimentFactory())); } @Override diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/experiment/DeviceLastSeenState.java b/service/src/main/java/org/whispersystems/textsecuregcm/experiment/DeviceLastSeenState.java new file mode 100644 index 000000000..f57d16f60 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/experiment/DeviceLastSeenState.java @@ -0,0 +1,17 @@ +package org.whispersystems.textsecuregcm.experiment; + +import javax.annotation.Nullable; + +public record DeviceLastSeenState(boolean deviceExists, + long createdAtMillis, + boolean hasPushToken, + long lastSeenMillis, + @Nullable PushTokenType pushTokenType) { + + public static DeviceLastSeenState MISSING_DEVICE_STATE = new DeviceLastSeenState(false, 0, false, 0, null); + + public enum PushTokenType { + APNS, + FCM + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.java b/service/src/main/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.java new file mode 100644 index 000000000..aeb129dd4 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.java @@ -0,0 +1,159 @@ +package org.whispersystems.textsecuregcm.experiment; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.identity.IdentityType; +import org.whispersystems.textsecuregcm.push.IdleDeviceNotificationScheduler; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; +import reactor.core.publisher.Flux; +import javax.annotation.Nullable; +import java.util.EnumMap; +import java.util.Map; +import java.time.LocalTime; +import java.util.concurrent.CompletableFuture; + +public class NotifyIdleDevicesWithoutMessagesPushNotificationExperiment implements PushNotificationExperiment { + + private final MessagesManager messagesManager; + private final IdleDeviceNotificationScheduler idleDeviceNotificationScheduler; + + private static final LocalTime PREFERRED_NOTIFICATION_TIME = LocalTime.of(14, 0); + + private static final Logger log = LoggerFactory.getLogger(NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.class); + + @VisibleForTesting + enum Population { + APNS_CONTROL, + APNS_EXPERIMENT, + FCM_CONTROL, + FCM_EXPERIMENT + } + + @VisibleForTesting + enum Outcome { + DELETED, + UNINSTALLED, + REACTIVATED, + UNCHANGED + } + + public NotifyIdleDevicesWithoutMessagesPushNotificationExperiment(final MessagesManager messagesManager, + final IdleDeviceNotificationScheduler idleDeviceNotificationScheduler) { + + this.messagesManager = messagesManager; + this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler; + } + + @Override + public String getExperimentName() { + return "notify-idle-devices-without-messages"; + } + + @Override + public CompletableFuture isDeviceEligible(final Account account, final Device device) { + + if (!hasPushToken(device)) { + return CompletableFuture.completedFuture(false); + } + + if (!idleDeviceNotificationScheduler.isIdle(device)) { + return CompletableFuture.completedFuture(false); + } + + return messagesManager.mayHaveMessages(account.getIdentifier(IdentityType.ACI), device) + .thenApply(mayHaveMessages -> !mayHaveMessages); + } + + @VisibleForTesting + static boolean hasPushToken(final Device device) { + // Exclude VOIP tokens since they have their own, distinct delivery mechanism + return !StringUtils.isAllBlank(device.getApnId(), device.getGcmId()) && StringUtils.isBlank(device.getVoipApnId()); + } + + @Override + public DeviceLastSeenState getState(@Nullable final Account account, @Nullable final Device device) { + if (account != null && device != null) { + final DeviceLastSeenState.PushTokenType pushTokenType = StringUtils.isNotBlank(device.getApnId()) + ? DeviceLastSeenState.PushTokenType.APNS + : DeviceLastSeenState.PushTokenType.FCM; + + return new DeviceLastSeenState(true, device.getCreated(), hasPushToken(device), device.getLastSeen(), pushTokenType); + } else { + return DeviceLastSeenState.MISSING_DEVICE_STATE; + } + } + + @Override + public CompletableFuture applyExperimentTreatment(final Account account, final Device device) { + return idleDeviceNotificationScheduler.scheduleNotification(account, device.getId(), PREFERRED_NOTIFICATION_TIME); + } + + @Override + public void analyzeResults(final Flux> samples) { + final Map> contingencyTable = new EnumMap<>(Population.class); + + for (final Population population : Population.values()) { + final Map countsByOutcome = new EnumMap<>(Outcome.class); + + for (final Outcome outcome : Outcome.values()) { + countsByOutcome.put(outcome, 0); + } + + contingencyTable.put(population, countsByOutcome); + } + + samples.doOnNext(sample -> contingencyTable.get(getPopulation(sample)).merge(getOutcome(sample), 1, Integer::sum)) + .then() + .block(); + + final StringBuilder reportBuilder = new StringBuilder("population,deleted,uninstalled,reactivated,unchanged\n"); + + for (final Population population : Population.values()) { + final Map countsByOutcome = contingencyTable.get(population); + + reportBuilder.append(population.name()); + reportBuilder.append(","); + reportBuilder.append(countsByOutcome.getOrDefault(Outcome.DELETED, 0)); + reportBuilder.append(","); + reportBuilder.append(countsByOutcome.getOrDefault(Outcome.UNINSTALLED, 0)); + reportBuilder.append(","); + reportBuilder.append(countsByOutcome.getOrDefault(Outcome.REACTIVATED, 0)); + reportBuilder.append(","); + reportBuilder.append(countsByOutcome.getOrDefault(Outcome.UNCHANGED, 0)); + reportBuilder.append("\n"); + } + + log.info(reportBuilder.toString()); + } + + @VisibleForTesting + static Population getPopulation(final PushNotificationExperimentSample sample) { + assert sample.initialState() != null && sample.initialState().pushTokenType() != null; + + return switch (sample.initialState().pushTokenType()) { + case APNS -> sample.inExperimentGroup() ? Population.APNS_EXPERIMENT : Population.APNS_CONTROL; + case FCM -> sample.inExperimentGroup() ? Population.FCM_EXPERIMENT : Population.FCM_CONTROL; + }; + } + + @VisibleForTesting + static Outcome getOutcome(final PushNotificationExperimentSample sample) { + final Outcome outcome; + + if (!sample.finalState().deviceExists() || sample.initialState().createdAtMillis() != sample.finalState().createdAtMillis()) { + outcome = Outcome.DELETED; + } else if (!sample.finalState().hasPushToken()) { + outcome = Outcome.UNINSTALLED; + } else if (sample.initialState().lastSeenMillis() != sample.finalState().lastSeenMillis()) { + outcome = Outcome.REACTIVATED; + } else { + outcome = Outcome.UNCHANGED; + } + + return outcome; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/experiment/PushNotificationExperiment.java b/service/src/main/java/org/whispersystems/textsecuregcm/experiment/PushNotificationExperiment.java index 92ec44f3c..2661b8b62 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/experiment/PushNotificationExperiment.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/experiment/PushNotificationExperiment.java @@ -2,6 +2,7 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; +import reactor.core.publisher.Flux; import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; @@ -65,4 +66,12 @@ default CompletableFuture applyControlTreatment(Account account, Device de * @return a future that completes when the experimental treatment has been applied for the given device */ CompletableFuture applyExperimentTreatment(Account account, Device device); + + /** + * Consumes a stream of finished samples and emits an analysis of the results via an implementation-specific channel + * (e.g. a log message). Implementations must block until all samples have been consumed and analyzed. + * + * @param samples a stream of finished samples from this experiment + */ + void analyzeResults(Flux> samples); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationScheduler.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationScheduler.java new file mode 100644 index 000000000..06a7f0667 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationScheduler.java @@ -0,0 +1,99 @@ +package org.whispersystems.textsecuregcm.push; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; +import org.whispersystems.textsecuregcm.identity.IdentityType; +import org.whispersystems.textsecuregcm.scheduler.JobScheduler; +import org.whispersystems.textsecuregcm.scheduler.SchedulingUtil; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.util.SystemMapper; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import javax.annotation.Nullable; +import java.io.IOException; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalTime; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public class IdleDeviceNotificationScheduler extends JobScheduler { + + private final AccountsManager accountsManager; + private final PushNotificationManager pushNotificationManager; + private final Clock clock; + + @VisibleForTesting + static final Duration MIN_IDLE_DURATION = Duration.ofDays(14); + + @VisibleForTesting + record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) {} + + public IdleDeviceNotificationScheduler(final AccountsManager accountsManager, + final PushNotificationManager pushNotificationManager, + final DynamoDbAsyncClient dynamoDbAsyncClient, + final String tableName, + final Duration jobExpiration, + final Clock clock) { + + super(dynamoDbAsyncClient, tableName, jobExpiration, clock); + + this.accountsManager = accountsManager; + this.pushNotificationManager = pushNotificationManager; + this.clock = clock; + } + + @Override + public String getSchedulerName() { + return "IdleDeviceNotification"; + } + + @Override + protected CompletableFuture processJob(@Nullable final byte[] jobData) { + final AccountAndDeviceIdentifier accountAndDeviceIdentifier; + + try { + accountAndDeviceIdentifier = SystemMapper.jsonMapper().readValue(jobData, AccountAndDeviceIdentifier.class); + } catch (final IOException e) { + return CompletableFuture.failedFuture(e); + } + + return accountsManager.getByAccountIdentifierAsync(accountAndDeviceIdentifier.accountIdentifier()) + .thenCompose(maybeAccount -> maybeAccount.map(account -> + account.getDevice(accountAndDeviceIdentifier.deviceId()).map(device -> { + if (!isIdle(device)) { + return CompletableFuture.completedFuture("deviceSeenRecently"); + } + + try { + return pushNotificationManager + .sendNewMessageNotification(account, accountAndDeviceIdentifier.deviceId(), true) + .thenApply(ignored -> "sent"); + } catch (final NotPushRegisteredException e) { + return CompletableFuture.completedFuture("deviceTokenDeleted"); + } + }) + .orElse(CompletableFuture.completedFuture("deviceDeleted"))) + .orElse(CompletableFuture.completedFuture("accountDeleted"))); + } + + public boolean isIdle(final Device device) { + final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant()); + + return idleDuration.compareTo(MIN_IDLE_DURATION) >= 0; + } + + public CompletableFuture scheduleNotification(final Account account, final byte deviceId, final LocalTime preferredDeliveryTime) { + final Instant runAt = SchedulingUtil.getNextRecommendedNotificationTime(account, preferredDeliveryTime, clock); + + try { + return scheduleJob(runAt, SystemMapper.jsonMapper().writeValueAsBytes( + new AccountAndDeviceIdentifier(account.getIdentifier(IdentityType.ACI), deviceId))); + } catch (final JsonProcessingException e) { + // This should never happen when serializing an `AccountAndDeviceIdentifier` + throw new AssertionError(e); + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index cd7e4589b..aeb165594 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -81,7 +81,8 @@ record CommandDependencies( FaultTolerantRedisCluster pushSchedulerCluster, ClientResources.Builder redisClusterClientResourcesBuilder, BackupManager backupManager, - DynamicConfigurationManager dynamicConfigurationManager) { + DynamicConfigurationManager dynamicConfigurationManager, + DynamoDbAsyncClient dynamoDbAsyncClient) { static CommandDependencies build( final String name, @@ -271,7 +272,8 @@ static CommandDependencies build( pushSchedulerCluster, redisClientResourcesBuilder, backupManager, - dynamicConfigurationManager + dynamicConfigurationManager, + dynamoDbAsyncClient ); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommand.java index 38f7bd345..572b7569c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommand.java @@ -10,10 +10,12 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.experiment.PushNotificationExperiment; +import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSample; import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSamples; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuples; import reactor.util.retry.Retry; @@ -71,47 +73,50 @@ protected void run(final Environment environment, final AccountsManager accountsManager = commandDependencies.accountsManager(); final PushNotificationExperimentSamples pushNotificationExperimentSamples = commandDependencies.pushNotificationExperimentSamples(); - pushNotificationExperimentSamples.getDevicesPendingFinalState(experiment.getExperimentName()) - .flatMap(accountIdentifierAndDeviceId -> - Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(accountIdentifierAndDeviceId.getT1())) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) - .map(maybeAccount -> Tuples.of(accountIdentifierAndDeviceId.getT1(), accountIdentifierAndDeviceId.getT2(), maybeAccount)), maxConcurrency) - .map(accountIdentifierAndDeviceIdAndMaybeAccount -> { - final UUID accountIdentifier = accountIdentifierAndDeviceIdAndMaybeAccount.getT1(); - final byte deviceId = accountIdentifierAndDeviceIdAndMaybeAccount.getT2(); - - @Nullable final Account account = accountIdentifierAndDeviceIdAndMaybeAccount.getT3() - .orElse(null); - - @Nullable final Device device = accountIdentifierAndDeviceIdAndMaybeAccount.getT3() - .flatMap(a -> a.getDevice(deviceId)) - .orElse(null); - - return Tuples.of(accountIdentifier, deviceId, experiment.getState(account, device)); - }) - .flatMap(accountIdentifierAndDeviceIdAndFinalState -> { - final UUID accountIdentifier = accountIdentifierAndDeviceIdAndFinalState.getT1(); - final byte deviceId = accountIdentifierAndDeviceIdAndFinalState.getT2(); - final T finalState = accountIdentifierAndDeviceIdAndFinalState.getT3(); - - return Mono.fromFuture(() -> { - try { - return pushNotificationExperimentSamples.recordFinalState(accountIdentifier, deviceId, - experiment.getExperimentName(), finalState); - } catch (final JsonProcessingException e) { - throw new RuntimeException(e); - } - }) - .onErrorResume(ConditionalCheckFailedException.class, throwable -> Mono.empty()) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) - .onErrorResume(throwable -> { - log.warn("Failed to record final state for {}:{} in experiment {}", - accountIdentifier, deviceId, experiment.getExperimentName(), throwable); - - return Mono.empty(); - }); - }, maxConcurrency) - .then() - .block(); + final Flux> finishedSamples = + pushNotificationExperimentSamples.getDevicesPendingFinalState(experiment.getExperimentName()) + .flatMap(accountIdentifierAndDeviceId -> + Mono.fromFuture(() -> accountsManager.getByAccountIdentifierAsync(accountIdentifierAndDeviceId.getT1())) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) + .map(maybeAccount -> Tuples.of(accountIdentifierAndDeviceId.getT1(), + accountIdentifierAndDeviceId.getT2(), maybeAccount)), maxConcurrency) + .map(accountIdentifierAndDeviceIdAndMaybeAccount -> { + final UUID accountIdentifier = accountIdentifierAndDeviceIdAndMaybeAccount.getT1(); + final byte deviceId = accountIdentifierAndDeviceIdAndMaybeAccount.getT2(); + + @Nullable final Account account = accountIdentifierAndDeviceIdAndMaybeAccount.getT3() + .orElse(null); + + @Nullable final Device device = accountIdentifierAndDeviceIdAndMaybeAccount.getT3() + .flatMap(a -> a.getDevice(deviceId)) + .orElse(null); + + return Tuples.of(accountIdentifier, deviceId, experiment.getState(account, device)); + }) + .flatMap(accountIdentifierAndDeviceIdAndFinalState -> { + final UUID accountIdentifier = accountIdentifierAndDeviceIdAndFinalState.getT1(); + final byte deviceId = accountIdentifierAndDeviceIdAndFinalState.getT2(); + final T finalState = accountIdentifierAndDeviceIdAndFinalState.getT3(); + + return Mono.fromFuture(() -> { + try { + return pushNotificationExperimentSamples.recordFinalState(accountIdentifier, deviceId, + experiment.getExperimentName(), finalState); + } catch (final JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .onErrorResume(ConditionalCheckFailedException.class, throwable -> Mono.empty()) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) + .onErrorResume(throwable -> { + log.warn("Failed to record final state for {}:{} in experiment {}", + accountIdentifier, deviceId, experiment.getExperimentName(), throwable); + + return Mono.empty(); + }); + }, maxConcurrency) + .flatMap(Mono::justOrEmpty); + + experiment.analyzeResults(finishedSamples); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/IdleDeviceNotificationSchedulerFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/IdleDeviceNotificationSchedulerFactory.java new file mode 100644 index 000000000..bc1a2c917 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/IdleDeviceNotificationSchedulerFactory.java @@ -0,0 +1,21 @@ +package org.whispersystems.textsecuregcm.workers; + +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.push.IdleDeviceNotificationScheduler; +import org.whispersystems.textsecuregcm.scheduler.JobScheduler; +import java.time.Clock; + +public class IdleDeviceNotificationSchedulerFactory implements JobSchedulerFactory { + + @Override + public JobScheduler buildJobScheduler(final CommandDependencies commandDependencies, + final WhisperServerConfiguration configuration) { + + return new IdleDeviceNotificationScheduler(commandDependencies.accountsManager(), + commandDependencies.pushNotificationManager(), + commandDependencies.dynamoDbAsyncClient(), + configuration.getDynamoDbTables().getScheduledJobs().getTableName(), + configuration.getDynamoDbTables().getScheduledJobs().getExpiration(), + Clock.systemUTC()); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesWithoutMessagesExperimentFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesWithoutMessagesExperimentFactory.java new file mode 100644 index 000000000..64bdfd2d7 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesWithoutMessagesExperimentFactory.java @@ -0,0 +1,28 @@ +package org.whispersystems.textsecuregcm.workers; + +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.configuration.DynamoDbTables; +import org.whispersystems.textsecuregcm.experiment.DeviceLastSeenState; +import org.whispersystems.textsecuregcm.experiment.NotifyIdleDevicesWithoutMessagesPushNotificationExperiment; +import org.whispersystems.textsecuregcm.experiment.PushNotificationExperiment; +import org.whispersystems.textsecuregcm.push.IdleDeviceNotificationScheduler; +import java.time.Clock; + +public class NotifyIdleDevicesWithoutMessagesExperimentFactory implements PushNotificationExperimentFactory { + + @Override + public PushNotificationExperiment buildExperiment(final CommandDependencies commandDependencies, + final WhisperServerConfiguration configuration) { + + final DynamoDbTables.TableWithExpiration tableConfiguration = configuration.getDynamoDbTables().getScheduledJobs(); + + return new NotifyIdleDevicesWithoutMessagesPushNotificationExperiment(commandDependencies.messagesManager(), + new IdleDeviceNotificationScheduler( + commandDependencies.accountsManager(), + commandDependencies.pushNotificationManager(), + commandDependencies.dynamoDbAsyncClient(), + tableConfiguration.getTableName(), + tableConfiguration.getExpiration(), + Clock.systemUTC())); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperimentTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperimentTest.java new file mode 100644 index 000000000..9b37bf154 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/experiment/NotifyIdleDevicesWithoutMessagesPushNotificationExperimentTest.java @@ -0,0 +1,280 @@ +package org.whispersystems.textsecuregcm.experiment; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyByte; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.i18n.phonenumbers.PhoneNumberUtil; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.whispersystems.textsecuregcm.identity.IdentityType; +import org.whispersystems.textsecuregcm.push.IdleDeviceNotificationScheduler; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; + +class NotifyIdleDevicesWithoutMessagesPushNotificationExperimentTest { + + private MessagesManager messagesManager; + private IdleDeviceNotificationScheduler idleDeviceNotificationScheduler; + + private NotifyIdleDevicesWithoutMessagesPushNotificationExperiment experiment; + + private static final Instant CURRENT_TIME = Instant.now(); + + @BeforeEach + void setUp() { + messagesManager = mock(MessagesManager.class); + + idleDeviceNotificationScheduler = mock(IdleDeviceNotificationScheduler.class); + when(idleDeviceNotificationScheduler.scheduleNotification(any(), anyByte(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + experiment = new NotifyIdleDevicesWithoutMessagesPushNotificationExperiment(messagesManager, + idleDeviceNotificationScheduler); + } + + @ParameterizedTest + @MethodSource + void isDeviceEligible(final Account account, + final Device device, + final boolean isDeviceIdle, + final boolean mayHaveMessages, + final boolean expectEligible) { + + when(messagesManager.mayHaveMessages(account.getIdentifier(IdentityType.ACI), device)) + .thenReturn(CompletableFuture.completedFuture(mayHaveMessages)); + + when(idleDeviceNotificationScheduler.isIdle(device)).thenReturn(isDeviceIdle); + + assertEquals(expectEligible, experiment.isDeviceEligible(account, device).join()); + } + + private static List isDeviceEligible() { + final List arguments = new ArrayList<>(); + + final Account account = mock(Account.class); + when(account.getIdentifier(IdentityType.ACI)).thenReturn(UUID.randomUUID()); + when(account.getNumber()).thenReturn(PhoneNumberUtil.getInstance().format( + PhoneNumberUtil.getInstance().getExampleNumber("US"), PhoneNumberUtil.PhoneNumberFormat.E164)); + + { + // Idle device with push token and messages + final Device device = mock(Device.class); + when(device.getApnId()).thenReturn("apns-token"); + + arguments.add(Arguments.of(account, device, true, true, false)); + } + + { + // Idle device missing push token, but with messages + arguments.add(Arguments.of(account, mock(Device.class), true, true, false)); + } + + { + // Idle device missing push token and messages + arguments.add(Arguments.of(account, mock(Device.class), true, false, false)); + } + + { + // Idle device with push token, but no messages + final Device device = mock(Device.class); + when(device.getApnId()).thenReturn("apns-token"); + + arguments.add(Arguments.of(account, device, true, false, true)); + } + + { + // Active device with push token and messages + final Device device = mock(Device.class); + when(device.getApnId()).thenReturn("apns-token"); + + arguments.add(Arguments.of(account, device, false, true, false)); + } + + { + // Active device missing push token, but with messages + arguments.add(Arguments.of(account, mock(Device.class), false, true, false)); + } + + { + // Active device missing push token and messages + arguments.add(Arguments.of(account, mock(Device.class), false, false, false)); + } + + { + // Active device with push token, but no messages + final Device device = mock(Device.class); + when(device.getApnId()).thenReturn("apns-token"); + + arguments.add(Arguments.of(account, device, false, false, false)); + } + + return arguments; + } + + @ParameterizedTest + @MethodSource + void hasPushToken(final Device device, final boolean expectHasPushToken) { + assertEquals(expectHasPushToken, NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.hasPushToken(device)); + } + + private static List hasPushToken() { + final List arguments = new ArrayList<>(); + + { + // No token at all + final Device device = mock(Device.class); + + arguments.add(Arguments.of(device, false)); + } + + { + // FCM token + final Device device = mock(Device.class); + when(device.getGcmId()).thenReturn("fcm-token"); + + arguments.add(Arguments.of(device, true)); + } + + { + // APNs token + final Device device = mock(Device.class); + when(device.getApnId()).thenReturn("apns-token"); + + arguments.add(Arguments.of(device, true)); + } + + { + // APNs VOIP token + final Device device = mock(Device.class); + when(device.getApnId()).thenReturn("apns-token"); + when(device.getVoipApnId()).thenReturn("apns-voip-token"); + + arguments.add(Arguments.of(device, false)); + } + + return arguments; + } + + @Test + void getState() { + assertEquals(DeviceLastSeenState.MISSING_DEVICE_STATE, experiment.getState(null, null)); + assertEquals(DeviceLastSeenState.MISSING_DEVICE_STATE, experiment.getState(mock(Account.class), null)); + + final long createdAtMillis = CURRENT_TIME.minus(Duration.ofDays(14)).toEpochMilli(); + + { + final Device apnsDevice = mock(Device.class); + when(apnsDevice.getApnId()).thenReturn("apns-token"); + when(apnsDevice.getCreated()).thenReturn(createdAtMillis); + when(apnsDevice.getLastSeen()).thenReturn(CURRENT_TIME.toEpochMilli()); + + assertEquals( + new DeviceLastSeenState(true, createdAtMillis, true, CURRENT_TIME.toEpochMilli(), DeviceLastSeenState.PushTokenType.APNS), + experiment.getState(mock(Account.class), apnsDevice)); + } + + { + final Device fcmDevice = mock(Device.class); + when(fcmDevice.getGcmId()).thenReturn("fcm-token"); + when(fcmDevice.getCreated()).thenReturn(createdAtMillis); + when(fcmDevice.getLastSeen()).thenReturn(CURRENT_TIME.toEpochMilli()); + + assertEquals( + new DeviceLastSeenState(true, createdAtMillis, true, CURRENT_TIME.toEpochMilli(), DeviceLastSeenState.PushTokenType.FCM), + experiment.getState(mock(Account.class), fcmDevice)); + } + } + + @ParameterizedTest + @MethodSource + void getPopulation(final boolean inExperimentGroup, + final DeviceLastSeenState.PushTokenType tokenType, + final NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Population expectedPopulation) { + + final DeviceLastSeenState state = new DeviceLastSeenState(true, 0, true, 0, tokenType); + final PushNotificationExperimentSample sample = + new PushNotificationExperimentSample<>(inExperimentGroup, state, state); + + assertEquals(expectedPopulation, NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.getPopulation(sample)); + } + + private static List getPopulation() { + return List.of( + Arguments.of(true, DeviceLastSeenState.PushTokenType.APNS, + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Population.APNS_EXPERIMENT), + + Arguments.of(false, DeviceLastSeenState.PushTokenType.APNS, + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Population.APNS_CONTROL), + + Arguments.of(true, DeviceLastSeenState.PushTokenType.FCM, + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Population.FCM_EXPERIMENT), + + Arguments.of(false, DeviceLastSeenState.PushTokenType.FCM, + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Population.FCM_CONTROL) + ); + } + + @ParameterizedTest + @MethodSource + void getOutcome(final DeviceLastSeenState initialState, + final DeviceLastSeenState finalState, + final NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Outcome expectedOutcome) { + + final PushNotificationExperimentSample sample = + new PushNotificationExperimentSample<>(true, initialState, finalState); + + assertEquals(expectedOutcome, NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.getOutcome(sample)); + } + + private static List getOutcome() { + return List.of( + // Device no longer exists + Arguments.of( + new DeviceLastSeenState(true, 0, true, 0, DeviceLastSeenState.PushTokenType.APNS), + DeviceLastSeenState.MISSING_DEVICE_STATE, + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Outcome.DELETED + ), + + // Device re-registered (i.e. "created" timestamp changed) + Arguments.of( + new DeviceLastSeenState(true, 0, true, 0, DeviceLastSeenState.PushTokenType.APNS), + new DeviceLastSeenState(true, 1, true, 1, DeviceLastSeenState.PushTokenType.APNS), + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Outcome.DELETED + ), + + // Device has lost push tokens + Arguments.of( + new DeviceLastSeenState(true, 0, true, 0, DeviceLastSeenState.PushTokenType.APNS), + new DeviceLastSeenState(true, 0, false, 0, DeviceLastSeenState.PushTokenType.APNS), + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Outcome.UNINSTALLED + ), + + // Device reactivated + Arguments.of( + new DeviceLastSeenState(true, 0, true, 0, DeviceLastSeenState.PushTokenType.APNS), + new DeviceLastSeenState(true, 0, true, 1, DeviceLastSeenState.PushTokenType.APNS), + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Outcome.REACTIVATED + ), + + // No change + Arguments.of( + new DeviceLastSeenState(true, 0, true, 0, DeviceLastSeenState.PushTokenType.APNS), + new DeviceLastSeenState(true, 0, true, 0, DeviceLastSeenState.PushTokenType.APNS), + NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.Outcome.UNCHANGED + ) + ); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationSchedulerTest.java new file mode 100644 index 000000000..bbf8558c4 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/IdleDeviceNotificationSchedulerTest.java @@ -0,0 +1,131 @@ +package org.whispersystems.textsecuregcm.push; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyByte; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.util.SystemMapper; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; + +class IdleDeviceNotificationSchedulerTest { + + private AccountsManager accountsManager; + private PushNotificationManager pushNotificationManager; + + private IdleDeviceNotificationScheduler idleDeviceNotificationScheduler; + + private static final Instant CURRENT_TIME = Instant.now(); + + @BeforeEach + void setUp() { + accountsManager = mock(AccountsManager.class); + pushNotificationManager = mock(PushNotificationManager.class); + + idleDeviceNotificationScheduler = new IdleDeviceNotificationScheduler( + accountsManager, + pushNotificationManager, + mock(DynamoDbAsyncClient.class), + "test-idle-device-notifications", + Duration.ofDays(7), + Clock.fixed(CURRENT_TIME, ZoneId.systemDefault())); + } + + @ParameterizedTest + @MethodSource + void processJob(final boolean accountPresent, + final boolean devicePresent, + final boolean tokenPresent, + final Instant deviceLastSeen, + final String expectedOutcome) throws JsonProcessingException, NotPushRegisteredException { + + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = Device.PRIMARY_ID; + + final Device device = mock(Device.class); + when(device.getLastSeen()).thenReturn(deviceLastSeen.toEpochMilli()); + + final Account account = mock(Account.class); + when(account.getDevice(deviceId)).thenReturn(devicePresent ? Optional.of(device) : Optional.empty()); + + when(accountsManager.getByAccountIdentifierAsync(accountIdentifier)) + .thenReturn(CompletableFuture.completedFuture(accountPresent ? Optional.of(account) : Optional.empty())); + + if (tokenPresent) { + when(pushNotificationManager.sendNewMessageNotification(any(), anyByte(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture( + Optional.of(new SendPushNotificationResult(true, Optional.empty(), false, Optional.empty())))); + } else { + when(pushNotificationManager.sendNewMessageNotification(any(), anyByte(), anyBoolean())) + .thenThrow(NotPushRegisteredException.class); + } + + final byte[] jobData = SystemMapper.jsonMapper().writeValueAsBytes( + new IdleDeviceNotificationScheduler.AccountAndDeviceIdentifier(accountIdentifier, deviceId)); + + assertEquals(expectedOutcome, idleDeviceNotificationScheduler.processJob(jobData).join()); + } + + private static List processJob() { + final Instant idleDeviceLastSeenTimestamp = CURRENT_TIME + .minus(IdleDeviceNotificationScheduler.MIN_IDLE_DURATION) + .minus(Duration.ofDays(1)); + + return List.of( + // Account present, device present, device has tokens, device is idle + Arguments.of(true, true, true, idleDeviceLastSeenTimestamp, "sent"), + + // Account present, device present, device has tokens, but device is active + Arguments.of(true, true, true, CURRENT_TIME, "deviceSeenRecently"), + + // Account present, device present, device is idle, but missing tokens + Arguments.of(true, true, false, idleDeviceLastSeenTimestamp, "deviceTokenDeleted"), + + // Account present, but device missing + Arguments.of(true, false, true, idleDeviceLastSeenTimestamp, "deviceDeleted"), + + // Account missing + Arguments.of(false, true, true, idleDeviceLastSeenTimestamp, "accountDeleted") + ); + } + + @Test + void isIdle() { + { + final Device idleDevice = mock(Device.class); + when(idleDevice.getLastSeen()) + .thenReturn(CURRENT_TIME.minus(IdleDeviceNotificationScheduler.MIN_IDLE_DURATION).minus(Duration.ofDays(1)) + .toEpochMilli()); + + assertTrue(idleDeviceNotificationScheduler.isIdle(idleDevice)); + } + + { + final Device activeDevice = mock(Device.class); + when(activeDevice.getLastSeen()).thenReturn(CURRENT_TIME.toEpochMilli()); + + assertFalse(idleDeviceNotificationScheduler.isIdle(activeDevice)); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java index 87ad5ee2b..795b49547 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java @@ -4,6 +4,7 @@ import net.sourceforge.argparse4j.inf.Namespace; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.experiment.PushNotificationExperiment; import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSample; import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSamples; @@ -23,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyByte; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -69,6 +71,7 @@ void setUp() throws JsonProcessingException { null, null, null, + null, null); //noinspection unchecked @@ -76,6 +79,13 @@ void setUp() throws JsonProcessingException { when(experiment.getExperimentName()).thenReturn(EXPERIMENT_NAME); when(experiment.getState(any(), any())).thenReturn("test"); + doAnswer(invocation -> { + final Flux> samples = invocation.getArgument(0); + samples.then().block(); + + return null; + }).when(experiment).analyzeResults(any()); + finishPushNotificationExperimentCommand = new TestFinishPushNotificationExperimentCommand(experiment); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java index 308a817a7..4cbacb909 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java @@ -63,6 +63,7 @@ public TestStartPushNotificationExperimentCommand( null, null, null, + null, null); }