Skip to content

Commit

Permalink
Resolve transactional pooling in java backend
Browse files Browse the repository at this point in the history
galeaspablo committed Dec 9, 2024
1 parent 3257a15 commit a95545c
Showing 9 changed files with 71 additions and 108 deletions.
Original file line number Diff line number Diff line change
@@ -31,6 +31,9 @@ public void processCommand(
commandHandler.handleCommand(command);
postgresTransactionalEventStore.commitTransaction();
mongoTransactionalProjectionOperator.commitTransaction();

postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
} catch (Exception e) {
log.error("Failed to process reaction command.");
log.error(e);
@@ -41,25 +44,8 @@ public void processCommand(
.collect(Collectors.joining("\n"));
log.error(stackTraceString);

try {
if (postgresTransactionalEventStore.isTransactionActive()) {
postgresTransactionalEventStore.abortTransaction();
}
} catch (Exception postgresException) {
log.error("Failed to abort postgres transaction.");
log.error(postgresException);
log.error(postgresException.getMessage());
}

try {
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
mongoTransactionalProjectionOperator.abortTransaction();
}
} catch (Exception mongoException) {
log.error("Failed to abort mongo transaction.");
log.error(mongoException);
log.error(mongoException.getMessage());
}
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

throw new RuntimeException("Failed to process command with exception: " + e);
}
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
import cloud.ambar.common.event.CreationEvent;
import cloud.ambar.common.event.Event;
import cloud.ambar.common.event.TransformationEvent;
import cloud.ambar.common.queryhandler.QueryController;
import cloud.ambar.common.serializedevent.Deserializer;
import cloud.ambar.common.serializedevent.SerializedEvent;
import cloud.ambar.common.serializedevent.Serializer;
@@ -18,7 +17,7 @@
import java.util.Optional;

@RequiredArgsConstructor
public class PostgresTransactionalEventStore implements AutoCloseable {
public class PostgresTransactionalEventStore {
private static final Logger log = LogManager.getLogger(PostgresTransactionalEventStore.class);

private final Connection connection;
@@ -108,32 +107,21 @@ public void commitTransaction() {
}
}

public void abortTransaction() {
if (!isTransactionActive) {
throw new RuntimeException("Transaction must be active to abort!");
}
public void abortDanglingTransactionsAndReturnConnectionToPool() {
log.info("PostgresTransactionalEventStore: Aborting dangling transactions and returning connection to pool.");
try {
connection.rollback();
isTransactionActive = false;
} catch (SQLException e) {
throw new RuntimeException("Failed to abort transaction", e);
log.error("Failed to abort transaction", e);
}
}

public boolean isTransactionActive() {
return isTransactionActive;
}

// IMPLEMENTATION OF AutoCloseable INTERFACE - cleanly close dangling transactions
// when the transaction event store gets garbage collected.
// I.e., it will return the event store's connection back to the connection pool.
// Note: There is need to close the connection, because that would mess with the library's connection pool.
// The transactional event store is meant to be used in @RequestScope, so the connection will be cleaned up
// by the library when the transactional event store and its connection are garbage collected.
public void close() {
if (isTransactionActive) {
abortTransaction();
try {
connection.close();
} catch (SQLException e) {
log.error("Failed to close connection", e);
}
log.info("Aborted dangling transactions and returning connection to pool");
}

private List<SerializedEvent> findAllSerializedEventsByAggregateId(String aggregateId) {
Original file line number Diff line number Diff line change
@@ -6,10 +6,16 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.ClientSession;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.data.mongodb.core.MongoTemplate;

import java.sql.SQLException;

@RequiredArgsConstructor
public class MongoTransactionalProjectionOperator implements AutoCloseable {
public class MongoTransactionalProjectionOperator {
private static final Logger log = LogManager.getLogger(MongoTransactionalProjectionOperator.class);

private final MongoTemplate mongoTemplate;

private final ClientSession session;
@@ -41,26 +47,11 @@ public void commitTransaction() {
session.commitTransaction();
}

public boolean isTransactionActive() {
return session.hasActiveTransaction();
}

public void abortTransaction() {
if (!session.hasActiveTransaction()) {
throw new RuntimeException("Transaction must be active to abort transaction for MongoDB!");
}
session.abortTransaction();
}

// IMPLEMENTATION OF AutoCloseable INTERFACE - cleanly close dangling transactions
// when the transactional projection operator gets garbage collected.
// I.e., it will return the projection operator's session back to the connection pool.
// Note: There is no need to close the session, because that would mess with the library's session pool.
// The transactional projection operator is meant to be used in @RequestScope, so the session will be cleaned up
// by the library when the transactional projection operator and its session are garbage collected.
public void close() {
public void abortDanglingTransactionsAndReturnSessionToPool() {
log.info("MongoTransactionalProjectionOperator: Aborting dangling transactions and returning connection to pool.");
if (session.hasActiveTransaction()) {
session.abortTransaction();
}
session.close();
}
}
Original file line number Diff line number Diff line change
@@ -55,11 +55,16 @@ protected String processProjectionHttpRequest(
projectionHandler.project(event);

mongoTransactionalProjectionOperator.commitTransaction();
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

return AmbarResponseFactory.successResponse();
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().startsWith("Unknown event type")) {
log.warn("Unknown event type. Skipping projection.");
log.warn(e);

mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

return AmbarResponseFactory.successResponse();
}

@@ -70,9 +75,9 @@ protected String processProjectionHttpRequest(
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
log.error(stackTraceString);
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
mongoTransactionalProjectionOperator.abortTransaction();
}

mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

return AmbarResponseFactory.retryResponse(e);
}
}
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ public Object processQuery(final Query query, final QueryHandler queryHandler) {
mongoTransactionalProjectionOperator.startTransaction();
Object object = queryHandler.handleQuery(query);
mongoTransactionalProjectionOperator.commitTransaction();
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

return object;
} catch (Exception e) {
@@ -32,15 +33,7 @@ public Object processQuery(final Query query, final QueryHandler queryHandler) {
.collect(Collectors.joining("\n"));
log.error(stackTraceString);

try {
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
mongoTransactionalProjectionOperator.abortTransaction();
}
} catch (Exception mongoException) {
log.error("Failed to abort mongo transaction.");
log.error(mongoException);
log.error(mongoException.getMessage());
}
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

throw new RuntimeException("Failed to process query with exception: " + e);
}
Original file line number Diff line number Diff line change
@@ -33,11 +33,16 @@ public String processReactionHttpRequest(final AmbarHttpRequest ambarHttpRequest
postgresTransactionalEventStore.commitTransaction();
mongoTransactionalProjectionOperator.commitTransaction();

postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

return AmbarResponseFactory.successResponse();
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().startsWith("Unknown event type")) {
log.warn("Unknown event type. Skipping reaction.");
log.warn(e);
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
return AmbarResponseFactory.successResponse();
}

@@ -49,25 +54,8 @@ public String processReactionHttpRequest(final AmbarHttpRequest ambarHttpRequest
.collect(Collectors.joining("\n"));
log.error(stackTraceString);

try {
if (postgresTransactionalEventStore.isTransactionActive()) {
postgresTransactionalEventStore.abortTransaction();
}
} catch (Exception postgresException) {
log.error("Failed to abort postgres transaction.");
log.error(postgresException);
log.error(postgresException.getMessage());
}

try {
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
mongoTransactionalProjectionOperator.abortTransaction();
}
} catch (Exception mongoException) {
log.error("Failed to abort mongo transaction.");
log.error(mongoException);
log.error(mongoException.getMessage());
}
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();

return AmbarResponseFactory.retryResponse(e);
}
Original file line number Diff line number Diff line change
@@ -6,10 +6,13 @@
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -25,6 +28,8 @@ public class MongoConfig {
@Value("${app.mongodb.database}")
private String mongoDatabaseName;

private static final Logger log = LogManager.getLogger(MongoConfig.class);

@Bean("MongoClientForTransactionalSupport")
public MongoClient mongoClientForTransactionalSupport() {
ConnectionString connectionString = new ConnectionString(mongodbUri);
@@ -47,23 +52,23 @@ public MongoClient mongoClientForNonTransactionalOperations() {
ConnectionString connectionString = new ConnectionString(mongodbUri);
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(connectionString)
.applyToConnectionPoolSettings(builder ->
builder.maxSize(20)
.minSize(5)
.maxWaitTime(2000, TimeUnit.MILLISECONDS)
.maxConnectionLifeTime(30, TimeUnit.MINUTES)
.maxConnectionIdleTime(10, TimeUnit.MINUTES)
)
.build();

return MongoClients.create(settings);
}

// It's extremely important to lazily initialize this bean. Why?
// Because the session must be closed each time, so anyone who asks for this bean must either close
// it explicitly or rely on something else closing it explicitly (such as the controller).
// Why must it be closed? Because we might run out of slots in the pool.
// If we didn't initalize it lazily, requests that don't need this bean would still create a session.
@Bean
@Lazy
@RequestScope
public MongoTransactionalProjectionOperator mongoTransactionalAPI(
public MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator(
@Qualifier("MongoClientForNonTransactionalOperations") MongoClient mongoClient
) {
log.info("MongoClientForNonTransactionalOperations: Creating new session.");
ClientSession session = mongoClient.startSession();
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName).withSession(session);

@@ -75,7 +80,7 @@ public MongoTransactionalProjectionOperator mongoTransactionalAPI(
}

@Bean
public MongoInitializerApi mongoInitalizerApi(
public MongoInitializerApi mongoInitializerApi(
@Qualifier("MongoClientForNonTransactionalOperations") MongoClient mongoClient
) {
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName);
Original file line number Diff line number Diff line change
@@ -3,10 +3,13 @@
import cloud.ambar.common.eventstore.PostgresTransactionalEventStore;
import cloud.ambar.common.serializedevent.Deserializer;
import cloud.ambar.common.serializedevent.Serializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.context.annotation.RequestScope;

import javax.sql.DataSource;
@@ -35,6 +38,8 @@ public class PostgresConfig {
@Value("${app.postgresql.eventStoreCreateReplicationPublication}")
private String postgresReplicationPublicationName;

private static final Logger log = LogManager.getLogger(PostgresConfig.class);

@Bean
@Qualifier("DataSourceForTransactionalSupport")
public DataSource dataSourceForTransactionalSupport() {
@@ -44,8 +49,8 @@ public DataSource dataSourceForTransactionalSupport() {

config.setMaximumPoolSize(10);
config.setMinimumIdle(5);
config.setIdleTimeout(300000); // 5 minutes
config.setConnectionTimeout(20000); // 20 seconds
config.setIdleTimeout(300000);
config.setConnectionTimeout(20000);

return new HikariDataSource(config);
}
@@ -56,22 +61,24 @@ public DataSource dataSourceNonTransactionalOperations() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(postgresUri);

config.setMaximumPoolSize(10);
config.setMinimumIdle(5);
config.setIdleTimeout(300000);
config.setConnectionTimeout(20000);

return new HikariDataSource(config);
}

// It's extremely important to lazily initialize this bean. Why?
// Because the connection must be closed each time, so anyone who asks for this bean must either close
// it explicitly or rely on something else closing it explicitly (such as the controller).
// Why must it be closed? Because we might run out of slots in the pool.
// If we didn't initalize it lazily, requests that don't need this bean would still create a connection.
@Bean
@Lazy
@RequestScope
public PostgresTransactionalEventStore postgresTransactionalAPI(
public PostgresTransactionalEventStore postgresTransactionalEventStore(
@Qualifier("DataSourceForTransactionalSupport") DataSource dataSource,
Serializer serializer,
Deserializer deserializer
) {
try {
log.info("PostgresTransactionalEventStore: Creating new connection.");
Connection connection = dataSource.getConnection();
return new PostgresTransactionalEventStore(connection, serializer, deserializer, postgresTable);
} catch (SQLException e) {
2 changes: 1 addition & 1 deletion local-development/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -191,7 +191,7 @@ services:
ipv4_address: 172.30.0.108

ambar-emulator:
image: ambarltd/emulator:v1.5
image: ambarltd/emulator:v1.6
container_name: ambar-emulator
restart: always
healthcheck:

0 comments on commit a95545c

Please sign in to comment.