Skip to content

Commit

Permalink
[NOD-541] fix: refactored event dumping strategy and error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
andrea-deri committed Oct 26, 2023
1 parent d4346f8 commit a0e5fdf
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import it.gov.pagopa.nodoverifykotodatastore.util.AppInfo;
import it.gov.pagopa.nodoverifykotodatastore.util.Constants;
import it.gov.pagopa.nodoverifykototablestorage.util.AppInfo;
import it.gov.pagopa.nodoverifykototablestorage.util.Constants;

import java.io.InputStream;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
Expand All @@ -44,34 +41,67 @@ public void processNodoVerifyKOEvent (
final ExecutionContext context) {

Logger logger = context.getLogger();
logger.log(Level.INFO, "Persisting {0} events...", events.size());
logger.log(Level.INFO, () -> String.format("Persisting [%d] events...", events.size()));

try {
if (events.size() == properties.length) {
Map<String, List<TableTransactionAction>> partitionedEvents = new HashMap<>();

for (int index = 0; index < properties.length; index++) {
final Map<String, Object> event = ObjectMapperUtils.readValue(events.get(index), Map.class);
String eventInStringForm = events.get(index);
final Map<String, Object> event = ObjectMapperUtils.readValue(eventInStringForm, Map.class);

final Map<String, Object> eventToBeStored = new HashMap<>();

// update event with the required parameters and other needed fields
properties[index].forEach((property, value) -> event.put(replaceDashWithUppercase(property), value));
String insertedDateValue = event.get(Constants.INSERTED_TIMESTAMP_EVENT_FIELD) != null ? ((String)event.get(Constants.INSERTED_TIMESTAMP_EVENT_FIELD)).substring(0, 10) : Constants.NA;
event.put(Constants.INSERTED_DATE_EVENT_FIELD, insertedDateValue);
event.put(Constants.PAYLOAD_EVENT_FIELD, getPayload(logger, event));
event.put(Constants.PARTITION_KEY_EVENT_FIELD, generatePartitionKey(event, insertedDateValue));
addToBatch(partitionedEvents, event);
properties[index].forEach((property, value) -> eventToBeStored.put(replaceDashWithUppercase(property), value));

String insertedTimestampValue = getEventField(event, Constants.INSERTED_TIMESTAMP_EVENT_FIELD, String.class, Constants.NA);
String insertedDateValue = Constants.NA.equals(insertedTimestampValue) ? Constants.NA : insertedTimestampValue.substring(0, 10);

// inserting the identification columns on event saved in Table Storage
eventToBeStored.put(Constants.PARTITION_KEY_TABLESTORAGE_EVENT_FIELD, insertedDateValue);
eventToBeStored.put(Constants.ROW_KEY_TABLESTORAGE_EVENT_FIELD, generateRowKey(event, insertedTimestampValue));

// inserting the additional columns on event saved in Table Storage
eventToBeStored.put(Constants.UNIQUE_ID_TABLESTORAGE_EVENT_FIELD, event.get(Constants.ID_EVENT_FIELD));
eventToBeStored.put(Constants.TIMESTAMP_TABLESTORAGE_EVENT_FIELD, insertedTimestampValue);
eventToBeStored.put(Constants.NOTICE_NUMBER_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.NOTICE_NUMBER_EVENT_FIELD, String.class, Constants.NA));
eventToBeStored.put(Constants.ID_PA_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_PA_EVENT_FIELD, String.class, Constants.NA));
eventToBeStored.put(Constants.ID_PSP_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_PSP_EVENT_FIELD, String.class, Constants.NA));
eventToBeStored.put(Constants.ID_STATION_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_STATION_EVENT_FIELD, String.class, Constants.NA));
eventToBeStored.put(Constants.ID_CHANNEL_TABLESTORAGE_EVENT_FIELD, getEventField(event, Constants.ID_CHANNEL_EVENT_FIELD, String.class, Constants.NA));
eventToBeStored.put(Constants.PAYLOAD_TABLESTORAGE_EVENT_FIELD, new String(Base64.getEncoder().encode(eventInStringForm.getBytes()))); // TODO to be refactored

addToBatch(partitionedEvents, eventToBeStored);
}

// save all events in the retrieved batch in the storage
persistEventBatch(logger, partitionedEvents);
}
} catch (NullPointerException e) {
logger.severe("NullPointerException exception on table storage nodo-verify-ko-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage());
logger.log(Level.SEVERE, () -> "[ALERT][VerifyKOToTS] AppException - NullPointerException exception on table storage nodo-verify-ko-events msg ingestion at " + LocalDateTime.now() + " : " + e.getMessage());
} catch (Throwable e) {
logger.severe("Generic exception on table storage nodo-verify-ko-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage());
logger.log(Level.SEVERE, () -> "[ALERT][VerifyKOToTS] AppException - Generic exception on table storage nodo-verify-ko-events msg ingestion at " + LocalDateTime.now() + " : " + e.getMessage());
}
}

private <T> T getEventField(Map<String, Object> event, String name, Class<T> clazz, T defaultValue) {
T field = null;
List<String> splitPath = List.of(name.split("\\."));
Map eventSubset = event;
Iterator<String> it = splitPath.listIterator();
while(it.hasNext()) {
Object retrievedEventField = eventSubset.get(it.next());
if (!it.hasNext()) {
field = clazz.cast(retrievedEventField);
} else {
eventSubset = (Map) retrievedEventField;
}
}
return field == null ? defaultValue : field;
}

private String replaceDashWithUppercase(String input) {
if (!input.contains("-")){
return input;
Expand All @@ -94,8 +124,8 @@ private static TableServiceClient getTableServiceClient(){
}

private void addToBatch(Map<String,List<TableTransactionAction>> partitionEvents, Map<String, Object> event) {
if (event.get(Constants.UNIQUE_ID_EVENT_FIELD) != null) {
TableEntity entity = new TableEntity((String) event.get(Constants.PARTITION_KEY_EVENT_FIELD), (String) event.get(Constants.UNIQUE_ID_EVENT_FIELD));
if (event.get(Constants.UNIQUE_ID_TABLESTORAGE_EVENT_FIELD) != null) {
TableEntity entity = new TableEntity((String) event.get(Constants.PARTITION_KEY_TABLESTORAGE_EVENT_FIELD), (String) event.get(Constants.UNIQUE_ID_TABLESTORAGE_EVENT_FIELD));
entity.setProperties(event);
if (!partitionEvents.containsKey(entity.getPartitionKey())){
partitionEvents.put(entity.getPartitionKey(), new ArrayList<>());
Expand All @@ -106,7 +136,7 @@ private void addToBatch(Map<String,List<TableTransactionAction>> partitionEvents

private byte[] getPayload(Logger logger, Map<String,Object> event) {
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
String payload = (String) event.get(Constants.PAYLOAD_EVENT_FIELD);
String payload = (String) event.get(Constants.PAYLOAD_TABLESTORAGE_EVENT_FIELD);
byte[] data = null;
if (payload != null) {
try {
Expand All @@ -115,19 +145,18 @@ private byte[] getPayload(Logger logger, Map<String,Object> event) {
deflaterOutputStream.write(data);
deflaterOutputStream.close();
} catch (Exception e) {
logger.severe(e.getMessage());
logger.log(Level.SEVERE, () -> "[ALERT][VerifyKOToTS] AppException - Error while generating payload for event: " + e.getMessage());
}
}
return data != null ? byteArrayStream.toByteArray() : null;
}

private String generatePartitionKey(Map<String, Object> event, String insertedDateValue) {
return new StringBuilder().append(insertedDateValue)
.append("-")
.append(event.get(Constants.ID_DOMINIO_EVENT_FIELD) != null ? event.get(Constants.ID_DOMINIO_EVENT_FIELD).toString() : Constants.NA)
.append("-")
.append(event.get(Constants.PSP_EVENT_FIELD) != null ? event.get(Constants.PSP_EVENT_FIELD).toString() : Constants.NA)
.toString();
private String generateRowKey(Map<String, Object> event, String insertedDateValue) {
return insertedDateValue.replace(":", "").replace(".", "").replace("T", "").replace("-", "") +
"-" +
getEventField(event, Constants.CREDITOR_ID_EVENT_FIELD, String.class, Constants.NA) +
"-" +
getEventField(event, Constants.PSP_ID_EVENT_FIELD, String.class, Constants.NA);
}

private void persistEventBatch(Logger logger, Map<String, List<TableTransactionAction>> partitionedEvents) {
Expand All @@ -136,7 +165,7 @@ private void persistEventBatch(Logger logger, Map<String, List<TableTransactionA
try {
tableClient.submitTransaction(values);
} catch (Exception e) {
logger.log(Level.SEVERE, "Could not save {0} events (partition [{1}]) on Azure Table Storage, error: [{2}]", new Object[]{values.size(), partition, e});
logger.log(Level.SEVERE, () -> "[ALERT][VerifyKOToTS] Persistence Exception - Could not save " + values.size() + " events (partition [" + partition + "]) on Azure Table Storage, error: " + e);
}
});
logger.info("Done processing events");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,33 @@

public class Constants {


private Constants() {}

public static final String POM_PROPERTIES_PATH = "/META-INF/maven/it.gov.pagopa/nodoverifykotodatastore/pom.properties";
public static final Pattern REPLACE_DASH_PATTERN = Pattern.compile("-([a-zA-Z])");
public static final String NA = "NA";
public static final String UNIQUE_ID_EVENT_FIELD = "uniqueId";


public static final String ID_EVENT_FIELD = "id";
public static final String ID_DOMINIO_EVENT_FIELD = "idDominio";
public static final String PSP_EVENT_FIELD = "psp";
public static final String INSERTED_TIMESTAMP_EVENT_FIELD = "insertedTimestamp";
public static final String INSERTED_DATE_EVENT_FIELD = "insertedDate";
public static final String PARTITION_KEY_EVENT_FIELD = "PartitionKey";
public static final String PAYLOAD_EVENT_FIELD = "payload";
public static final String INSERTED_TIMESTAMP_EVENT_FIELD = "faultBean.timestamp";
public static final String CREDITOR_ID_EVENT_FIELD = "creditor.idPA";
public static final String PSP_ID_EVENT_FIELD = "psp.idPsp";
public static final String NOTICE_NUMBER_EVENT_FIELD = "debtorPosition.noticeNumber";
public static final String ID_PA_EVENT_FIELD = "creditor.idPA";
public static final String ID_PSP_EVENT_FIELD = "psp.idPsp";
public static final String ID_STATION_EVENT_FIELD = "creditor.idStation";
public static final String ID_CHANNEL_EVENT_FIELD = "psp.idChannel";

public static final String PARTITION_KEY_TABLESTORAGE_EVENT_FIELD = "PartitionKey";
public static final String ROW_KEY_TABLESTORAGE_EVENT_FIELD = "RowKey";
public static final String UNIQUE_ID_TABLESTORAGE_EVENT_FIELD = "uniqueId";
public static final String NOTICE_NUMBER_TABLESTORAGE_EVENT_FIELD = "noticeNumber";
public static final String ID_PA_TABLESTORAGE_EVENT_FIELD = "idPA";
public static final String ID_PSP_TABLESTORAGE_EVENT_FIELD = "idPsp";
public static final String ID_STATION_TABLESTORAGE_EVENT_FIELD = "idStation";
public static final String ID_CHANNEL_TABLESTORAGE_EVENT_FIELD = "idChannel";
public static final String TIMESTAMP_TABLESTORAGE_EVENT_FIELD = "timestamp";
public static final String PAYLOAD_TABLESTORAGE_EVENT_FIELD = "payload";
public static final String TABLE_NAME = System.getenv("TABLE_STORAGE_TABLE_NAME");
}

0 comments on commit a0e5fdf

Please sign in to comment.