Skip to content

Commit

Permalink
Use a singleton for DynamoDbMasterMonitor (#737)
Browse files Browse the repository at this point in the history
We encountered an issue where the MasterMonitor's `shutdown` method was
called somewhere.  This led to clients relying on updates to the
MasterMonitor stopping receiving updates but they had no mechanism of
knowing that the monitor they were subscribed to was no longer updating
because the observable was not completed and getting the latest version
would continue to return the last known value (even though that value
was not updated).

We were deciding between either using a Singleton or tracking down
`shutdown` calls to `MasterMonitor` and ensuring that usages could
resubscribe if necessary.  The latter seemed like it would be prone to
breakage in the future (e.g. particularly with the master observable).
Additionally, a Singleton pattern seemed fitting given that almost every
reasonable use case of Mantis relies on a sustained connection to the
Mantis Master.  So there's no reason to expect that shutting down the
executor pool for updating the master is necessary.  Additionally, using
a Singleton avoids any unbounded resource leakage (e.g. it is still safe
to create N MasterMonitors).
  • Loading branch information
timmartin-stripe authored Dec 23, 2024
1 parent 419c15f commit a18a200
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,180 +15,33 @@
*/
package io.mantisrx.extensions.dynamodb;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.LockItem;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.json.DefaultObjectMapper;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;


@Slf4j
public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor {

private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);


private final ThreadFactory monitorThreadFactory = r -> {
Thread thread = new Thread(r);
thread.setName("dynamodb-monitor-" + System.currentTimeMillis());
thread.setDaemon(true); // allow JVM to shutdown if monitor is still running
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error("thread: {} failed with {}", t.getName(), e.getMessage(), e) );
return thread;
};
private final ScheduledExecutorService leaderMonitor =
Executors.newScheduledThreadPool(1, monitorThreadFactory);

// Assuming your lock client's options are in a variable named options
private final AmazonDynamoDBLockClient lockClient;

private final String partitionKey;

private final Duration pollInterval;

private final Duration gracefulShutdown;

private final BehaviorSubject<MasterDescription> masterSubject;

private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance();

private final Metrics metrics;

private final Counter noLockPresentCounter;
private final Counter lockDecodeFailedCounter;
private final Counter nullNextLeaderCounter;
private final DynamoDBMasterMonitorSingleton singleton;

/**
* Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector}
*/
public DynamoDBMasterMonitor() {
this(DynamoDBClientSingleton.getLockClient(),
DynamoDBClientSingleton.getPartitionKey(),
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()),
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration()));
}

public DynamoDBMasterMonitor(
AmazonDynamoDBLockClient lockClient,
String partitionKey,
Duration pollInterval,
Duration gracefulShutdown) {
masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL);
this.lockClient = lockClient;
this.partitionKey = partitionKey;
this.pollInterval = pollInterval;
this.gracefulShutdown = gracefulShutdown;

Metrics m = new Metrics.Builder()
.id("DynamoDBMasterMonitor")
.addCounter("no_lock_present")
.addCounter("lock_decode_failed")
.addCounter("null_next_leader")
.build();
this.metrics = MetricsRegistry.getInstance().registerAndGet(m);

this.noLockPresentCounter = metrics.getCounter("no_lock_present");
this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed");
this.nullNextLeaderCounter = metrics.getCounter("null_next_leader");
this.singleton = DynamoDBMasterMonitorSingleton.getInstance();
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
leaderMonitor.scheduleAtFixedRate(
this::getCurrentLeader, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
}
public void start() {}

@Override
public void shutdown() {
logger.info("close the lock client");
try {
lockClient.close();
} catch (IOException e) {
logger.error("error closing the dynamodb lock client", e);
}

try {
final boolean isTerminated =
leaderMonitor.awaitTermination(gracefulShutdown.toMillis(), TimeUnit.MILLISECONDS);
if (!isTerminated) {
leaderMonitor.shutdownNow();
}
} catch (InterruptedException e) {
logger.error("error timeout waiting on leader monitor to terminate executor", e);
}
logger.info("leader monitor shutdown");
}

@SuppressWarnings("FutureReturnValueIgnored")
private void getCurrentLeader() {
logger.info("attempting leader lookup");
final Optional<LockItem> optionalLock = lockClient.getLock(partitionKey, Optional.empty());
final MasterDescription nextDescription;
if (optionalLock.isPresent()) {
final LockItem lock = optionalLock.get();
nextDescription = lock.getData().map(this::bytesToMaster).orElse(null);
} else {
nextDescription = null;
logger.warn("no leader found");
this.noLockPresentCounter.increment();
}

if (nextDescription != null) {
updateLeader(nextDescription);
} else {
this.nullNextLeaderCounter.increment();
}
}

private void updateLeader(@Nullable MasterDescription nextDescription) {
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
final MasterDescription next = (nextDescription == null) ? MasterDescription.MASTER_NULL : nextDescription;
if (!prev.equals(next)) {
logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname());
masterSubject.onNext(next);
}
}

private MasterDescription bytesToMaster(ByteBuffer data) {
// It is possible that the underlying buffer is read more than once,
// so if the offset of the buffer is at the end, rewind, so we can read it.
if (!data.hasRemaining()) {
data.rewind();
}
final byte[] bytes = new byte[data.remaining()];
data.get(bytes);
try {
return jsonMapper.readValue(bytes, MasterDescription.class);
} catch (IOException e) {
logger.error("unable to parse master description bytes: {}", data, e);
this.lockDecodeFailedCounter.increment();
}
return MasterDescription.MASTER_NULL;
}
public void shutdown() {}

@Override
public Observable<MasterDescription> getMasterObservable() {
return masterSubject;
return singleton.getMasterSubject();
}

/**
Expand All @@ -198,8 +51,7 @@ public Observable<MasterDescription> getMasterObservable() {
* @return Latest description of the master
*/
@Override
@Nullable
public MasterDescription getLatestMaster() {
return Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
return singleton.getMasterSubject().getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright 2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.extensions.dynamodb;


import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.LockItem;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.server.core.json.DefaultObjectMapper;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.subjects.BehaviorSubject;

class DynamoDBMasterMonitorSingleton {
private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);


private final ThreadFactory monitorThreadFactory = r -> {
Thread thread = new Thread(r);
thread.setName("dynamodb-monitor-" + System.currentTimeMillis());
thread.setDaemon(true); // allow JVM to shut down if monitor is still running
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error("thread: {} failed with {}", t.getName(), e.getMessage(), e) );
return thread;
};
private final ScheduledExecutorService leaderMonitor =
Executors.newScheduledThreadPool(1, monitorThreadFactory);

// Assuming your lock client's options are in a variable named options
private final AmazonDynamoDBLockClient lockClient;

private final String partitionKey;

private final Duration gracefulShutdown;

private final BehaviorSubject<MasterDescription> masterSubject;

private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance();

private final Duration pollInterval;

private final Counter noLockPresentCounter;
private final Counter lockDecodeFailedCounter;
private final Counter nullNextLeaderCounter;
private final Counter leaderChangedCounter;
private final Counter refreshedLeaderCounter;

private static volatile DynamoDBMasterMonitorSingleton instance = null;

public static synchronized DynamoDBMasterMonitorSingleton getInstance() {
if (instance == null) {
instance = new DynamoDBMasterMonitorSingleton();
Runtime.getRuntime()
.addShutdownHook(new Thread(instance::shutdown, "dynamodb-monitor-shutdown-" + instance.hashCode()));
instance.start();
}
return instance;
}

/**
* Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector}
*/
DynamoDBMasterMonitorSingleton() {
this(DynamoDBClientSingleton.getLockClient(),
DynamoDBClientSingleton.getPartitionKey(),
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()),
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration()));
}

DynamoDBMasterMonitorSingleton(
AmazonDynamoDBLockClient lockClient,
String partitionKey,
Duration pollInterval,
Duration gracefulShutdown) {
masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL);
this.lockClient = lockClient;
this.partitionKey = partitionKey;
this.pollInterval = pollInterval;
this.gracefulShutdown = gracefulShutdown;

Metrics m = new Metrics.Builder()
.id("DynamoDBMasterMonitor")
.addCounter("no_lock_present")
.addCounter("lock_decode_failed")
.addCounter("null_next_leader")
.addCounter("refreshed_leader")
.addCounter("leader_changed")
.build();
Metrics metrics = MetricsRegistry.getInstance().registerAndGet(m);

this.noLockPresentCounter = metrics.getCounter("no_lock_present");
this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed");
this.nullNextLeaderCounter = metrics.getCounter("null_next_leader");
this.refreshedLeaderCounter = metrics.getCounter("refreshed_leader");
this.leaderChangedCounter = metrics.getCounter("leader_changed");
}

public void start() {
logger.info("starting leader monitor");
leaderMonitor.scheduleAtFixedRate(
this::getCurrentLeader, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
}

void shutdown() {
logger.info("close the lock client");
try {
lockClient.close();
} catch (IOException e) {
logger.error("error closing the dynamodb lock client", e);
}

try {
final boolean isTerminated =
leaderMonitor.awaitTermination(gracefulShutdown.toMillis(), TimeUnit.MILLISECONDS);
if (!isTerminated) {
leaderMonitor.shutdownNow();
}
} catch (InterruptedException e) {
logger.error("error timeout waiting on leader monitor to terminate executor", e);
}
masterSubject.onCompleted();
logger.info("leader monitor shutdown");
}

@SuppressWarnings("FutureReturnValueIgnored")
private void getCurrentLeader() {
logger.info("attempting leader lookup");
final Optional<LockItem> optionalLock = lockClient.getLock(partitionKey, Optional.empty());
final MasterDescription nextDescription;
if (optionalLock.isPresent()) {
final LockItem lock = optionalLock.get();
nextDescription = lock.getData().map(this::bytesToMaster).orElse(null);
} else {
nextDescription = null;
logger.warn("no leader found");
this.noLockPresentCounter.increment();
}

if (nextDescription != null) {
updateLeader(nextDescription);
} else {
this.nullNextLeaderCounter.increment();
}
}

private void updateLeader(MasterDescription nextDescription) {
this.refreshedLeaderCounter.increment();
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL);
if (!prev.equals(nextDescription)) {
this.leaderChangedCounter.increment();
logger.info("leader changer information previous {} and next {}", prev.getHostname(), nextDescription.getHostname());
masterSubject.onNext(nextDescription);
}
}

private MasterDescription bytesToMaster(ByteBuffer data) {
// It is possible that the underlying buffer is read more than once,
// so if the offset of the buffer is at the end, rewind, so we can read it.
if (!data.hasRemaining()) {
data.rewind();
}
final byte[] bytes = new byte[data.remaining()];
data.get(bytes);
try {
return jsonMapper.readValue(bytes, MasterDescription.class);
} catch (IOException e) {
logger.error("unable to parse master description bytes: {}", data, e);
this.lockDecodeFailedCounter.increment();
}
return MasterDescription.MASTER_NULL;
}

BehaviorSubject<MasterDescription> getMasterSubject() {
return masterSubject;
}
}
Loading

0 comments on commit a18a200

Please sign in to comment.