From a0e5fdffc9b408d1c0050795b9b0d013ecf01a60 Mon Sep 17 00:00:00 2001 From: Andrea De Rinaldis Date: Thu, 26 Oct 2023 11:49:21 +0200 Subject: [PATCH] [NOD-541] fix: refactored event dumping strategy and error logging --- .../nodoverifykototablestorage/Info.java | 4 +- .../NodoVerifyKOEventToTableStorage.java | 81 +++++++++++++------ .../util/Constants.java | 29 +++++-- 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/src/main/java/it/gov/pagopa/nodoverifykototablestorage/Info.java b/src/main/java/it/gov/pagopa/nodoverifykototablestorage/Info.java index c1a9569..42f6216 100644 --- a/src/main/java/it/gov/pagopa/nodoverifykototablestorage/Info.java +++ b/src/main/java/it/gov/pagopa/nodoverifykototablestorage/Info.java @@ -4,8 +4,8 @@ import com.microsoft.azure.functions.annotation.AuthorizationLevel; import com.microsoft.azure.functions.annotation.FunctionName; import com.microsoft.azure.functions.annotation.HttpTrigger; -import it.gov.pagopa.nodoverifykotodatastore.util.AppInfo; -import it.gov.pagopa.nodoverifykotodatastore.util.Constants; +import it.gov.pagopa.nodoverifykototablestorage.util.AppInfo; +import it.gov.pagopa.nodoverifykototablestorage.util.Constants; import java.io.InputStream; import java.util.Optional; diff --git a/src/main/java/it/gov/pagopa/nodoverifykototablestorage/NodoVerifyKOEventToTableStorage.java b/src/main/java/it/gov/pagopa/nodoverifykototablestorage/NodoVerifyKOEventToTableStorage.java index b2af715..d267bf1 100644 --- a/src/main/java/it/gov/pagopa/nodoverifykototablestorage/NodoVerifyKOEventToTableStorage.java +++ b/src/main/java/it/gov/pagopa/nodoverifykototablestorage/NodoVerifyKOEventToTableStorage.java @@ -14,10 +14,7 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; @@ -44,34 +41,67 @@ public void processNodoVerifyKOEvent ( final ExecutionContext context) { Logger logger = context.getLogger(); - logger.log(Level.INFO, "Persisting {0} events...", events.size()); + logger.log(Level.INFO, () -> String.format("Persisting [%d] events...", events.size())); try { if (events.size() == properties.length) { Map> partitionedEvents = new HashMap<>(); for (int index = 0; index < properties.length; index++) { - final Map event = ObjectMapperUtils.readValue(events.get(index), Map.class); + String eventInStringForm = events.get(index); + final Map event = ObjectMapperUtils.readValue(eventInStringForm, Map.class); + + final Map eventToBeStored = new HashMap<>(); // update event with the required parameters and other needed fields - properties[index].forEach((property, value) -> event.put(replaceDashWithUppercase(property), value)); - String insertedDateValue = event.get(Constants.INSERTED_TIMESTAMP_EVENT_FIELD) != null ? ((String)event.get(Constants.INSERTED_TIMESTAMP_EVENT_FIELD)).substring(0, 10) : Constants.NA; - event.put(Constants.INSERTED_DATE_EVENT_FIELD, insertedDateValue); - event.put(Constants.PAYLOAD_EVENT_FIELD, getPayload(logger, event)); - event.put(Constants.PARTITION_KEY_EVENT_FIELD, generatePartitionKey(event, insertedDateValue)); - addToBatch(partitionedEvents, event); + properties[index].forEach((property, value) -> eventToBeStored.put(replaceDashWithUppercase(property), value)); + + String insertedTimestampValue = getEventField(event, Constants.INSERTED_TIMESTAMP_EVENT_FIELD, String.class, Constants.NA); + String insertedDateValue = Constants.NA.equals(insertedTimestampValue) ? Constants.NA : insertedTimestampValue.substring(0, 10); + + // inserting the identification columns on event saved in Table Storage + eventToBeStored.put(Constants.PARTITION_KEY_TABLESTORAGE_EVENT_FIELD, insertedDateValue); + eventToBeStored.put(Constants.ROW_KEY_TABLESTORAGE_EVENT_FIELD, generateRowKey(event, insertedTimestampValue)); + + // inserting the additional columns on event saved in Table Storage + eventToBeStored.put(Constants.UNIQUE_ID_TABLESTORAGE_EVENT_FIELD, event.get(Constants.ID_EVENT_FIELD)); + eventToBeStored.put(Constants.TIMESTAMP_TABLESTORAGE_EVENT_FIELD, insertedTimestampValue); + eventToBeStored.put(Constants.NOTICE_NUMBER_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.NOTICE_NUMBER_EVENT_FIELD, String.class, Constants.NA)); + eventToBeStored.put(Constants.ID_PA_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_PA_EVENT_FIELD, String.class, Constants.NA)); + eventToBeStored.put(Constants.ID_PSP_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_PSP_EVENT_FIELD, String.class, Constants.NA)); + eventToBeStored.put(Constants.ID_STATION_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_STATION_EVENT_FIELD, String.class, Constants.NA)); + eventToBeStored.put(Constants.ID_CHANNEL_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_CHANNEL_EVENT_FIELD, String.class, Constants.NA)); + eventToBeStored.put(Constants.PAYLOAD_TABLESTORAGE_EVENT_FIELD, new String(Base64.getEncoder().encode(eventInStringForm.getBytes()))); // TODO to be refactored + + addToBatch(partitionedEvents, eventToBeStored); } // save all events in the retrieved batch in the storage persistEventBatch(logger, partitionedEvents); } } catch (NullPointerException e) { - logger.severe("NullPointerException exception on table storage nodo-verify-ko-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage()); + logger.log(Level.SEVERE, () -> "[ALERT][VerifyKOToTS] AppException - NullPointerException exception on table storage nodo-verify-ko-events msg ingestion at " + LocalDateTime.now() + " : " + e.getMessage()); } catch (Throwable e) { - logger.severe("Generic exception on table storage nodo-verify-ko-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage()); + logger.log(Level.SEVERE, () -> "[ALERT][VerifyKOToTS] AppException - Generic exception on table storage nodo-verify-ko-events msg ingestion at " + LocalDateTime.now() + " : " + e.getMessage()); } } + private T getEventField(Map event, String name, Class clazz, T defaultValue) { + T field = null; + List splitPath = List.of(name.split("\\.")); + Map eventSubset = event; + Iterator it = splitPath.listIterator(); + while(it.hasNext()) { + Object retrievedEventField = eventSubset.get(it.next()); + if (!it.hasNext()) { + field = clazz.cast(retrievedEventField); + } else { + eventSubset = (Map) retrievedEventField; + } + } + return field == null ? defaultValue : field; + } + private String replaceDashWithUppercase(String input) { if (!input.contains("-")){ return input; @@ -94,8 +124,8 @@ private static TableServiceClient getTableServiceClient(){ } private void addToBatch(Map> partitionEvents, Map event) { - if (event.get(Constants.UNIQUE_ID_EVENT_FIELD) != null) { - TableEntity entity = new TableEntity((String) event.get(Constants.PARTITION_KEY_EVENT_FIELD), (String) event.get(Constants.UNIQUE_ID_EVENT_FIELD)); + if (event.get(Constants.UNIQUE_ID_TABLESTORAGE_EVENT_FIELD) != null) { + TableEntity entity = new TableEntity((String) event.get(Constants.PARTITION_KEY_TABLESTORAGE_EVENT_FIELD), (String) event.get(Constants.UNIQUE_ID_TABLESTORAGE_EVENT_FIELD)); entity.setProperties(event); if (!partitionEvents.containsKey(entity.getPartitionKey())){ partitionEvents.put(entity.getPartitionKey(), new ArrayList<>()); @@ -106,7 +136,7 @@ private void addToBatch(Map> partitionEvents private byte[] getPayload(Logger logger, Map event) { ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream(); - String payload = (String) event.get(Constants.PAYLOAD_EVENT_FIELD); + String payload = (String) event.get(Constants.PAYLOAD_TABLESTORAGE_EVENT_FIELD); byte[] data = null; if (payload != null) { try { @@ -115,19 +145,18 @@ private byte[] getPayload(Logger logger, Map event) { deflaterOutputStream.write(data); deflaterOutputStream.close(); } catch (Exception e) { - logger.severe(e.getMessage()); + logger.log(Level.SEVERE, () -> "[ALERT][VerifyKOToTS] AppException - Error while generating payload for event: " + e.getMessage()); } } return data != null ? byteArrayStream.toByteArray() : null; } - private String generatePartitionKey(Map event, String insertedDateValue) { - return new StringBuilder().append(insertedDateValue) - .append("-") - .append(event.get(Constants.ID_DOMINIO_EVENT_FIELD) != null ? event.get(Constants.ID_DOMINIO_EVENT_FIELD).toString() : Constants.NA) - .append("-") - .append(event.get(Constants.PSP_EVENT_FIELD) != null ? event.get(Constants.PSP_EVENT_FIELD).toString() : Constants.NA) - .toString(); + private String generateRowKey(Map event, String insertedDateValue) { + return insertedDateValue.replace(":", "").replace(".", "").replace("T", "").replace("-", "") + + "-" + + getEventField(event, Constants.CREDITOR_ID_EVENT_FIELD, String.class, Constants.NA) + + "-" + + getEventField(event, Constants.PSP_ID_EVENT_FIELD, String.class, Constants.NA); } private void persistEventBatch(Logger logger, Map> partitionedEvents) { @@ -136,7 +165,7 @@ private void persistEventBatch(Logger logger, Map "[ALERT][VerifyKOToTS] Persistence Exception - Could not save " + values.size() + " events (partition [" + partition + "]) on Azure Table Storage, error: " + e); } }); logger.info("Done processing events"); diff --git a/src/main/java/it/gov/pagopa/nodoverifykototablestorage/util/Constants.java b/src/main/java/it/gov/pagopa/nodoverifykototablestorage/util/Constants.java index fb2a3ed..f58738a 100644 --- a/src/main/java/it/gov/pagopa/nodoverifykototablestorage/util/Constants.java +++ b/src/main/java/it/gov/pagopa/nodoverifykototablestorage/util/Constants.java @@ -4,18 +4,33 @@ public class Constants { + private Constants() {} public static final String POM_PROPERTIES_PATH = "/META-INF/maven/it.gov.pagopa/nodoverifykotodatastore/pom.properties"; public static final Pattern REPLACE_DASH_PATTERN = Pattern.compile("-([a-zA-Z])"); public static final String NA = "NA"; - public static final String UNIQUE_ID_EVENT_FIELD = "uniqueId"; + + public static final String ID_EVENT_FIELD = "id"; - public static final String ID_DOMINIO_EVENT_FIELD = "idDominio"; - public static final String PSP_EVENT_FIELD = "psp"; - public static final String INSERTED_TIMESTAMP_EVENT_FIELD = "insertedTimestamp"; - public static final String INSERTED_DATE_EVENT_FIELD = "insertedDate"; - public static final String PARTITION_KEY_EVENT_FIELD = "PartitionKey"; - public static final String PAYLOAD_EVENT_FIELD = "payload"; + public static final String INSERTED_TIMESTAMP_EVENT_FIELD = "faultBean.timestamp"; + public static final String CREDITOR_ID_EVENT_FIELD = "creditor.idPA"; + public static final String PSP_ID_EVENT_FIELD = "psp.idPsp"; + public static final String NOTICE_NUMBER_EVENT_FIELD = "debtorPosition.noticeNumber"; + public static final String ID_PA_EVENT_FIELD = "creditor.idPA"; + public static final String ID_PSP_EVENT_FIELD = "psp.idPsp"; + public static final String ID_STATION_EVENT_FIELD = "creditor.idStation"; + public static final String ID_CHANNEL_EVENT_FIELD = "psp.idChannel"; + + public static final String PARTITION_KEY_TABLESTORAGE_EVENT_FIELD = "PartitionKey"; + public static final String ROW_KEY_TABLESTORAGE_EVENT_FIELD = "RowKey"; + public static final String UNIQUE_ID_TABLESTORAGE_EVENT_FIELD = "uniqueId"; + public static final String NOTICE_NUMBER_TABLESTORAGE_EVENT_FIELD = "noticeNumber"; + public static final String ID_PA_TABLESTORAGE_EVENT_FIELD = "idPA"; + public static final String ID_PSP_TABLESTORAGE_EVENT_FIELD = "idPsp"; + public static final String ID_STATION_TABLESTORAGE_EVENT_FIELD = "idStation"; + public static final String ID_CHANNEL_TABLESTORAGE_EVENT_FIELD = "idChannel"; + public static final String TIMESTAMP_TABLESTORAGE_EVENT_FIELD = "timestamp"; + public static final String PAYLOAD_TABLESTORAGE_EVENT_FIELD = "payload"; public static final String TABLE_NAME = System.getenv("TABLE_STORAGE_TABLE_NAME"); }