Skip to content

Commit

Permalink
(Java) Rename mongo non transactional api + implement part 1 of trans…
Browse files Browse the repository at this point in the history
…actional event store
  • Loading branch information
galeaspablo committed Dec 8, 2024
1 parent 1e66ac6 commit 8392907
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package cloud.ambar.common.commandhandler;

abstract public class Command {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cloud.ambar.common.commandhandler;

import cloud.ambar.common.eventstore.EventStore;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.stream.Collectors;

@RequiredArgsConstructor
public class CommandController {
private final EventStore eventStore;
private static final Logger log = LogManager.getLogger(CommandController.class);

public void processCommand(final Command command, final CommandHandler commandHandler) {
try {
eventStore.beginTransaction();
commandHandler.handleCommand(command);
eventStore.commitTransaction();
} catch (Exception e) {
log.error("Failed to process reaction command.");
log.error(e);
log.error(e.getMessage());

String stackTraceString = Arrays.stream(e.getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
log.error(stackTraceString);

if (eventStore.isTransactionActive()) {
eventStore.abortTransaction();
eventStore.closeSession();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class CommandHandler {
abstract public class CommandHandler {
final protected EventStore eventStore;
protected abstract void handleCommand(Command command);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@
import cloud.ambar.common.serializedevent.Serializer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;

import java.util.List;

@Service
@RequestScope
@RequiredArgsConstructor
public class EventStore {
private final EventRepository eventRepository;
private final Deserializer deserializer;
private final Serializer serializer;

public boolean doesEventAlreadyExist(String eventId) {
return eventRepository.findByEventId(eventId).isPresent();
}
private boolean transactionActive = false;

public AggregateAndEventIdsInLastEvent findAggregate(String aggregateId) {
if (!transactionActive) {
throw new RuntimeException("Transaction must be active to read aggregate from event store!");
}
final List<SerializedEvent> serializedEvents = eventRepository.findAllByAggregateId(aggregateId);
final List<Event> events = serializedEvents.stream()
.map(deserializer::deserialize)
Expand Down Expand Up @@ -66,7 +68,50 @@ public AggregateAndEventIdsInLastEvent findAggregate(String aggregateId) {
}

public void saveEvent(Event event) {
// todo transactional
if (!transactionActive) {
throw new RuntimeException("Transaction must be active to write into event store!");
}
eventRepository.save(serializer.serialize(event));
}

public boolean doesEventAlreadyExist(String eventId) {
if (!transactionActive) {
throw new RuntimeException("Transaction must be active to read event from event store!");
}
return eventRepository.findByEventId(eventId).isPresent();
}

public void beginTransaction() {
if (transactionActive) {
throw new RuntimeException("Transaction already active.");
}
// todo transaction begin
transactionActive = true;
}

public void commitTransaction() {
if (!transactionActive) {
throw new RuntimeException("No transaction to commit.");
}

// todo transaction commit
transactionActive = false;
}

public void abortTransaction() {
if (!transactionActive) {
throw new RuntimeException("No transaction to abort");
}

// todo abort transaction
transactionActive = false;
}

public void closeSession() {
// todo close session
}

public boolean isTransactionActive() {
return transactionActive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public MongoTransactionalAPI mongoTransactionalAPI() {
}

@Bean
public MongoInitializerApi mongoInitializerApi() {
public MongoNonTransactionalApi mongoNonTransactionalApi() {
MongoClient mongoClient = MongoClients.create(mongodbUri);
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName);

// Disable _class field in mongo documents
MappingMongoConverter converter = (MappingMongoConverter) mongoTemplate.getConverter();
converter.setTypeMapper(new DefaultMongoTypeMapper(null));

return new MongoInitializerApi(mongoTemplate);
return new MongoNonTransactionalApi(mongoTemplate);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@
public class MongoInitializer {
private static final Logger log = LogManager.getLogger(MongoInitializer.class);

private final MongoInitializerApi mongoInitializerApi;
private final MongoNonTransactionalApi mongoNonTransactionalApi;

public MongoInitializer(MongoInitializerApi mongoInitializerApi) {
this.mongoInitializerApi = mongoInitializerApi;
public MongoInitializer(MongoNonTransactionalApi mongoNonTransactionalApi) {
this.mongoNonTransactionalApi = mongoNonTransactionalApi;
}

@Bean
ApplicationRunner initMongo() {
return args -> {
log.info("Creating collections");
mongoInitializerApi.operate().createCollection("CreditCard_Enrollment_Enrollment");
mongoInitializerApi.operate().createCollection("CreditCard_Enrollment_ProductName");
mongoInitializerApi.operate().createCollection("CreditCard_Enrollment_ProductActiveStatus");
mongoNonTransactionalApi.operate().createCollection("CreditCard_Enrollment_Enrollment");
mongoNonTransactionalApi.operate().createCollection("CreditCard_Enrollment_ProductName");
mongoNonTransactionalApi.operate().createCollection("CreditCard_Enrollment_ProductActiveStatus");
log.info("Created collections");

log.info("Creating indexes");
mongoInitializerApi.operate().indexOps("CreditCard_Enrollment_Enrollment")
mongoNonTransactionalApi.operate().indexOps("CreditCard_Enrollment_Enrollment")
.ensureIndex(new Index().on("userId", org.springframework.data.domain.Sort.Direction.ASC));
log.info("Created indexes");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.springframework.data.mongodb.core.MongoTemplate;

@RequiredArgsConstructor
public class MongoInitializerApi {
public class MongoNonTransactionalApi {
private final MongoTemplate mongoTemplate;

public MongoTemplate operate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ protected String processHttpRequest(
log.error(stackTraceString);
if (mongoTransactionalAPI.isTransactionActive()) {
mongoTransactionalAPI.abortTransaction();
mongoTransactionalAPI.closeSession();
}
return AmbarResponseFactory.retryResponse(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import cloud.ambar.common.ambar.AmbarHttpRequest;
import cloud.ambar.common.ambar.AmbarResponseFactory;
import cloud.ambar.common.eventstore.EventStore;
import cloud.ambar.common.serializedevent.Deserializer;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
Expand All @@ -12,14 +13,18 @@

@RequiredArgsConstructor
public abstract class ReactionController {
private final EventStore eventStore;
private final Deserializer deserializer;
private static final Logger log = LogManager.getLogger(ReactionController.class);

public String processHttpRequest(final AmbarHttpRequest ambarHttpRequest, final ReactionHandler reactionHandler) {
try {
log.info("Reaction received http request: " + ambarHttpRequest);

eventStore.beginTransaction();
reactionHandler.react(deserializer.deserialize(ambarHttpRequest.getSerializedEvent()));
eventStore.commitTransaction();

return AmbarResponseFactory.successResponse();
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().startsWith("Unknown event type")) {
Expand All @@ -35,6 +40,10 @@ public String processHttpRequest(final AmbarHttpRequest ambarHttpRequest, final
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
log.error(stackTraceString);
if (eventStore.isTransactionActive()) {
eventStore.abortTransaction();
eventStore.closeSession();
}
return AmbarResponseFactory.retryResponse(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
package cloud.ambar.common.sessionauth;

import cloud.ambar.common.projection.MongoTransactionalAPI;
import cloud.ambar.common.projection.MongoNonTransactionalApi;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;

import java.time.Instant;
import java.util.Optional;

@Service
@RequiredArgsConstructor
@RequestScope
public class SessionRepository {
private static final Logger log = LogManager.getLogger(SessionRepository.class);
private final SessionConfig sessionConfig;
private final MongoTransactionalAPI mongoTransactionalAPI;
private final MongoNonTransactionalApi mongoNonTransactionalApi;
public Optional<String> authenticatedUserIdFromSessionToken(String sessionToken) {
Session session = mongoTransactionalAPI.operate().findOne(
Session session = mongoNonTransactionalApi.operate().findOne(
Query.query(
Criteria.where("sessionToken").is(sessionToken)
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package cloud.ambar.creditcard.enrollment.commandhandler;

import cloud.ambar.common.commandhandler.Command;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;

@Builder
@Getter
public class RequestEnrollmentCommand {
public class RequestEnrollmentCommand extends Command {
@NonNull private String userId;
@NonNull private String productId;
@NonNull private Integer annualIncomeInCents;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package cloud.ambar.creditcard.enrollment.commandhandler;

import cloud.ambar.common.commandhandler.Command;
import cloud.ambar.common.commandhandler.CommandHandler;
import cloud.ambar.common.eventstore.EventStore;
import cloud.ambar.creditcard.enrollment.event.EnrollmentRequested;
import cloud.ambar.creditcard.enrollment.exception.InactiveProductException;
import cloud.ambar.creditcard.enrollment.projection.isproductactive.IsProductActive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;

Expand All @@ -16,18 +15,23 @@

@Service
@RequestScope
public class EnrollmentCommandHandler extends CommandHandler {
private static final Logger log = LogManager.getLogger(EnrollmentCommandHandler.class);

public class RequestEnrollmentCommandHandler extends CommandHandler {
private final IsProductActive isProductActive;

public EnrollmentCommandHandler(EventStore eventStore, IsProductActive isProductActive) {
public RequestEnrollmentCommandHandler(EventStore eventStore, IsProductActive isProductActive) {
super(eventStore);
this.isProductActive = isProductActive;
}

public void handle(final RequestEnrollmentCommand command) {
log.info("Handling enrollment request for user: {}, product: {}", command.getUserId(), command.getProductId());
protected void handleCommand(Command command) {
if (command instanceof RequestEnrollmentCommand) {
handleRequestEnrollment((RequestEnrollmentCommand) command);
} else {
throw new IllegalArgumentException("Unsupported command type: " + command.getClass().getName());
}
}

private void handleRequestEnrollment(final RequestEnrollmentCommand command) {
if (!isProductActive.isProductActive(command.getProductId())) {
throw new InactiveProductException();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package cloud.ambar.creditcard.enrollment.controller;

import cloud.ambar.common.commandhandler.CommandController;
import cloud.ambar.common.eventstore.EventStore;
import cloud.ambar.common.sessionauth.SessionService;
import cloud.ambar.creditcard.enrollment.commandhandler.EnrollmentCommandHandler;
import cloud.ambar.creditcard.enrollment.commandhandler.RequestEnrollmentCommandHandler;
import cloud.ambar.creditcard.enrollment.commandhandler.RequestEnrollmentCommand;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.annotation.RequestScope;

@RestController
@RequestScope
@RequiredArgsConstructor
@RequestMapping("/api/v1/credit_card/enrollment")
public class EnrollmentCommandController {
public class EnrollmentCommandController extends CommandController {
private final SessionService sessionService;

private final EnrollmentCommandHandler enrollmentService;
private final RequestEnrollmentCommandHandler requestEnrollmentCommandHandler;

public EnrollmentCommandController(
EventStore eventStore,
SessionService sessionService,
RequestEnrollmentCommandHandler requestEnrollmentCommandHandler
) {
super(eventStore);
this.sessionService = sessionService;
this.requestEnrollmentCommandHandler = requestEnrollmentCommandHandler;
}

@PostMapping("/request-enrollment")
@ResponseStatus(HttpStatus.OK)
Expand All @@ -30,6 +40,6 @@ public void requestEnrollment(
.annualIncomeInCents(request.getAnnualIncomeInCents())
.build();

enrollmentService.handle(command);
processCommand(command, requestEnrollmentCommandHandler);
}
}

0 comments on commit 8392907

Please sign in to comment.