Skip to content

Commit

Permalink
Introduce a job scheduler and experiment for sending notifications to…
Browse files Browse the repository at this point in the history
… idle devices
  • Loading branch information
jon-signal authored Jul 29, 2024
1 parent 4ebad2c commit 045ec96
Show file tree
Hide file tree
Showing 13 changed files with 830 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.DiscardPushNotificationExperimentSamplesCommand;
import org.whispersystems.textsecuregcm.workers.FinishPushNotificationExperimentCommand;
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesWithoutMessagesExperimentFactory;
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
Expand All @@ -260,6 +265,7 @@
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand;
import org.whispersystems.textsecuregcm.workers.StartPushNotificationExperimentCommand;
import org.whispersystems.textsecuregcm.workers.UnlinkDeviceCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
Expand Down Expand Up @@ -313,6 +319,24 @@ public void initialize(final Bootstrap<WhisperServerConfiguration> bootstrap) {
bootstrap.addCommand(new RemoveExpiredBackupsCommand(Clock.systemUTC()));
bootstrap.addCommand(new BackupMetricsCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredLinkedDevicesCommand());
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
"Processes scheduled jobs to send notifications to idle devices",
new IdleDeviceNotificationSchedulerFactory()));

bootstrap.addCommand(
new StartPushNotificationExperimentCommand<>("start-notify-idle-devices-without-messages-experiment",
"Start an experiment to send push notifications to idle devices with empty message queues",
new NotifyIdleDevicesWithoutMessagesExperimentFactory()));

bootstrap.addCommand(
new FinishPushNotificationExperimentCommand<>("finish-notify-idle-devices-without-messages-experiment",
"Finish an experiment to send push notifications to idle devices with empty message queues",
new NotifyIdleDevicesWithoutMessagesExperimentFactory()));

bootstrap.addCommand(
new DiscardPushNotificationExperimentSamplesCommand("discard-notify-idle-devices-without-messages-samples",
"Discard samples from the \"notify idle devices without messages\" experiment",
new NotifyIdleDevicesWithoutMessagesExperimentFactory()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.whispersystems.textsecuregcm.experiment;

import javax.annotation.Nullable;

public record DeviceLastSeenState(boolean deviceExists,
long createdAtMillis,
boolean hasPushToken,
long lastSeenMillis,
@Nullable PushTokenType pushTokenType) {

public static DeviceLastSeenState MISSING_DEVICE_STATE = new DeviceLastSeenState(false, 0, false, 0, null);

public enum PushTokenType {
APNS,
FCM
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package org.whispersystems.textsecuregcm.experiment;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.push.IdleDeviceNotificationScheduler;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import reactor.core.publisher.Flux;
import javax.annotation.Nullable;
import java.util.EnumMap;
import java.util.Map;
import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;

public class NotifyIdleDevicesWithoutMessagesPushNotificationExperiment implements PushNotificationExperiment<DeviceLastSeenState> {

private final MessagesManager messagesManager;
private final IdleDeviceNotificationScheduler idleDeviceNotificationScheduler;

private static final LocalTime PREFERRED_NOTIFICATION_TIME = LocalTime.of(14, 0);

private static final Logger log = LoggerFactory.getLogger(NotifyIdleDevicesWithoutMessagesPushNotificationExperiment.class);

@VisibleForTesting
enum Population {
APNS_CONTROL,
APNS_EXPERIMENT,
FCM_CONTROL,
FCM_EXPERIMENT
}

@VisibleForTesting
enum Outcome {
DELETED,
UNINSTALLED,
REACTIVATED,
UNCHANGED
}

public NotifyIdleDevicesWithoutMessagesPushNotificationExperiment(final MessagesManager messagesManager,
final IdleDeviceNotificationScheduler idleDeviceNotificationScheduler) {

this.messagesManager = messagesManager;
this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler;
}

@Override
public String getExperimentName() {
return "notify-idle-devices-without-messages";
}

@Override
public CompletableFuture<Boolean> isDeviceEligible(final Account account, final Device device) {

if (!hasPushToken(device)) {
return CompletableFuture.completedFuture(false);
}

if (!idleDeviceNotificationScheduler.isIdle(device)) {
return CompletableFuture.completedFuture(false);
}

return messagesManager.mayHaveMessages(account.getIdentifier(IdentityType.ACI), device)
.thenApply(mayHaveMessages -> !mayHaveMessages);
}

@VisibleForTesting
static boolean hasPushToken(final Device device) {
// Exclude VOIP tokens since they have their own, distinct delivery mechanism
return !StringUtils.isAllBlank(device.getApnId(), device.getGcmId()) && StringUtils.isBlank(device.getVoipApnId());
}

@Override
public DeviceLastSeenState getState(@Nullable final Account account, @Nullable final Device device) {
if (account != null && device != null) {
final DeviceLastSeenState.PushTokenType pushTokenType = StringUtils.isNotBlank(device.getApnId())
? DeviceLastSeenState.PushTokenType.APNS
: DeviceLastSeenState.PushTokenType.FCM;

return new DeviceLastSeenState(true, device.getCreated(), hasPushToken(device), device.getLastSeen(), pushTokenType);
} else {
return DeviceLastSeenState.MISSING_DEVICE_STATE;
}
}

@Override
public CompletableFuture<Void> applyExperimentTreatment(final Account account, final Device device) {
return idleDeviceNotificationScheduler.scheduleNotification(account, device.getId(), PREFERRED_NOTIFICATION_TIME);
}

@Override
public void analyzeResults(final Flux<PushNotificationExperimentSample<DeviceLastSeenState>> samples) {
final Map<Population, Map<Outcome, Integer>> contingencyTable = new EnumMap<>(Population.class);

for (final Population population : Population.values()) {
final Map<Outcome, Integer> countsByOutcome = new EnumMap<>(Outcome.class);

for (final Outcome outcome : Outcome.values()) {
countsByOutcome.put(outcome, 0);
}

contingencyTable.put(population, countsByOutcome);
}

samples.doOnNext(sample -> contingencyTable.get(getPopulation(sample)).merge(getOutcome(sample), 1, Integer::sum))
.then()
.block();

final StringBuilder reportBuilder = new StringBuilder("population,deleted,uninstalled,reactivated,unchanged\n");

for (final Population population : Population.values()) {
final Map<Outcome, Integer> countsByOutcome = contingencyTable.get(population);

reportBuilder.append(population.name());
reportBuilder.append(",");
reportBuilder.append(countsByOutcome.getOrDefault(Outcome.DELETED, 0));
reportBuilder.append(",");
reportBuilder.append(countsByOutcome.getOrDefault(Outcome.UNINSTALLED, 0));
reportBuilder.append(",");
reportBuilder.append(countsByOutcome.getOrDefault(Outcome.REACTIVATED, 0));
reportBuilder.append(",");
reportBuilder.append(countsByOutcome.getOrDefault(Outcome.UNCHANGED, 0));
reportBuilder.append("\n");
}

log.info(reportBuilder.toString());
}

@VisibleForTesting
static Population getPopulation(final PushNotificationExperimentSample<DeviceLastSeenState> sample) {
assert sample.initialState() != null && sample.initialState().pushTokenType() != null;

return switch (sample.initialState().pushTokenType()) {
case APNS -> sample.inExperimentGroup() ? Population.APNS_EXPERIMENT : Population.APNS_CONTROL;
case FCM -> sample.inExperimentGroup() ? Population.FCM_EXPERIMENT : Population.FCM_CONTROL;
};
}

@VisibleForTesting
static Outcome getOutcome(final PushNotificationExperimentSample<DeviceLastSeenState> sample) {
final Outcome outcome;

if (!sample.finalState().deviceExists() || sample.initialState().createdAtMillis() != sample.finalState().createdAtMillis()) {
outcome = Outcome.DELETED;
} else if (!sample.finalState().hasPushToken()) {
outcome = Outcome.UNINSTALLED;
} else if (sample.initialState().lastSeenMillis() != sample.finalState().lastSeenMillis()) {
outcome = Outcome.REACTIVATED;
} else {
outcome = Outcome.UNCHANGED;
}

return outcome;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import reactor.core.publisher.Flux;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -65,4 +66,12 @@ default CompletableFuture<Void> applyControlTreatment(Account account, Device de
* @return a future that completes when the experimental treatment has been applied for the given device
*/
CompletableFuture<Void> applyExperimentTreatment(Account account, Device device);

/**
* Consumes a stream of finished samples and emits an analysis of the results via an implementation-specific channel
* (e.g. a log message). Implementations must block until all samples have been consumed and analyzed.
*
* @param samples a stream of finished samples from this experiment
*/
void analyzeResults(Flux<PushNotificationExperimentSample<T>> samples);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.whispersystems.textsecuregcm.push;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.scheduler.JobScheduler;
import org.whispersystems.textsecuregcm.scheduler.SchedulingUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class IdleDeviceNotificationScheduler extends JobScheduler {

private final AccountsManager accountsManager;
private final PushNotificationManager pushNotificationManager;
private final Clock clock;

@VisibleForTesting
static final Duration MIN_IDLE_DURATION = Duration.ofDays(14);

@VisibleForTesting
record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) {}

public IdleDeviceNotificationScheduler(final AccountsManager accountsManager,
final PushNotificationManager pushNotificationManager,
final DynamoDbAsyncClient dynamoDbAsyncClient,
final String tableName,
final Duration jobExpiration,
final Clock clock) {

super(dynamoDbAsyncClient, tableName, jobExpiration, clock);

this.accountsManager = accountsManager;
this.pushNotificationManager = pushNotificationManager;
this.clock = clock;
}

@Override
public String getSchedulerName() {
return "IdleDeviceNotification";
}

@Override
protected CompletableFuture<String> processJob(@Nullable final byte[] jobData) {
final AccountAndDeviceIdentifier accountAndDeviceIdentifier;

try {
accountAndDeviceIdentifier = SystemMapper.jsonMapper().readValue(jobData, AccountAndDeviceIdentifier.class);
} catch (final IOException e) {
return CompletableFuture.failedFuture(e);
}

return accountsManager.getByAccountIdentifierAsync(accountAndDeviceIdentifier.accountIdentifier())
.thenCompose(maybeAccount -> maybeAccount.map(account ->
account.getDevice(accountAndDeviceIdentifier.deviceId()).map(device -> {
if (!isIdle(device)) {
return CompletableFuture.completedFuture("deviceSeenRecently");
}

try {
return pushNotificationManager
.sendNewMessageNotification(account, accountAndDeviceIdentifier.deviceId(), true)
.thenApply(ignored -> "sent");
} catch (final NotPushRegisteredException e) {
return CompletableFuture.completedFuture("deviceTokenDeleted");
}
})
.orElse(CompletableFuture.completedFuture("deviceDeleted")))
.orElse(CompletableFuture.completedFuture("accountDeleted")));
}

public boolean isIdle(final Device device) {
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());

return idleDuration.compareTo(MIN_IDLE_DURATION) >= 0;
}

public CompletableFuture<Void> scheduleNotification(final Account account, final byte deviceId, final LocalTime preferredDeliveryTime) {
final Instant runAt = SchedulingUtil.getNextRecommendedNotificationTime(account, preferredDeliveryTime, clock);

try {
return scheduleJob(runAt, SystemMapper.jsonMapper().writeValueAsBytes(
new AccountAndDeviceIdentifier(account.getIdentifier(IdentityType.ACI), deviceId)));
} catch (final JsonProcessingException e) {
// This should never happen when serializing an `AccountAndDeviceIdentifier`
throw new AssertionError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ record CommandDependencies(
FaultTolerantRedisCluster pushSchedulerCluster,
ClientResources.Builder redisClusterClientResourcesBuilder,
BackupManager backupManager,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
DynamoDbAsyncClient dynamoDbAsyncClient) {

static CommandDependencies build(
final String name,
Expand Down Expand Up @@ -271,7 +272,8 @@ static CommandDependencies build(
pushSchedulerCluster,
redisClientResourcesBuilder,
backupManager,
dynamicConfigurationManager
dynamicConfigurationManager,
dynamoDbAsyncClient
);
}

Expand Down
Loading

0 comments on commit 045ec96

Please sign in to comment.