Skip to content

Commit

Permalink
Merge pull request #7 from pagopa/PAGOPA-1446
Browse files Browse the repository at this point in the history
feat: Data-layer improvements and responses aggregation [PAGOPA-1446]
  • Loading branch information
cap-ang authored Jan 23, 2024
2 parents d733ae1 + 4965bba commit ecf9b30
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 93 deletions.
23 changes: 17 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,23 @@ There is an example of a Http Trigger function.
### Test
`curl http://localhost:8999/example`

## Run locally with Maven

`mvn clean package`

`mvn azure-functions:run`

## Run locally with Maven and Azurite

Use the [Azurite](https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio%2Cblob-storage) emulator for local Azure Storage development

```
docker run -p 10000:10000 -p 10001:10001 -p 10002:10002 \
mcr.microsoft.com/azure-storage/azurite
```
#### Maven
```
mvn clean package
mvn azure-functions:run
```

```
mvn -f pom.xml clean package -Dmaven.test.skip=true && mvn -e azure-functions:run
```
### Test
`curl http://localhost:7071/example`

Expand Down
4 changes: 2 additions & 2 deletions helm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: v2
name: pagopa-gpd-upload-function
description: Microservice description
type: application
version: 0.39.0
appVersion: 0.0.6-1-PAGOPA-1449
version: 0.40.0
appVersion: 0.0.6-2-PAGOPA-1446
dependencies:
- name: microservice-chart
version: 2.4.0
Expand Down
2 changes: 1 addition & 1 deletion helm/values-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.6-1-PAGOPA-1449"
tag: "0.0.6-2-PAGOPA-1446"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down
2 changes: 1 addition & 1 deletion helm/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ microservice-chart:
fullnameOverride: ""
image:
repository: pagopapcommonacr.azurecr.io/pagopa<project-name> # TODO
tag: "0.0.6-1-PAGOPA-1449"
tag: "0.0.6-2-PAGOPA-1446"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand Down
74 changes: 59 additions & 15 deletions helm/values-uat.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
microservice-chart:
namespace: "<namespace>" # TODO
namespace: "gps"
nameOverride: ""
fullnameOverride: ""
image:
repository: pagopaucommonacr.azurecr.io/pagopa<project-name> # TODO
tag: "0.0.6-1-PAGOPA-1449"
repository: ghcr.io/pagopa/pagopa-gpd-upload-function
tag: "0.0.6-2-PAGOPA-1446"
pullPolicy: Always
# https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
livenessProbe:
Expand All @@ -23,14 +23,25 @@ microservice-chart:
periodSeconds: 10
deployment:
create: true
service:
serviceMonitor:
create: true
endpoints:
- interval: 10s #jmx-exporter
targetPort: 12345
path: /metrics
ports:
- 80 #http
- 12345 #jmx-exporter
service:
type: ClusterIP
port: 80
ports:
- 80 #http
- 12345 #jmx-exporter
ingress:
create: true
host: "weuuat.<namespace>.internal.uat.platform.pagopa.it" # TODO
path: /pagopa-<project-name>-service/(.*) # TODO
host: "weuuat.gps.internal.uat.platform.pagopa.it"
path: /pagopa-gpd-upload-function/(.*)
servicePort: 80
serviceAccount:
create: false
annotations: {}
Expand All @@ -44,14 +55,14 @@ microservice-chart:
resources:
requests:
memory: "512Mi"
cpu: "0.25"
cpu: "0.5"
limits:
memory: "512Mi"
cpu: "0.25"
cpu: "0.5"
autoscaling:
enable: true
minReplica: 3
maxReplica: 10
minReplica: 1
maxReplica: 1
pollingInterval: 10 # seconds
cooldownPeriod: 50 # seconds
triggers:
Expand All @@ -60,16 +71,49 @@ microservice-chart:
# Required
type: Utilization # Allowed types are 'Utilization' or 'AverageValue'
value: "75"
- type: memory
metadata:
# Required
type: Utilization # Allowed types are 'Utilization' or 'AverageValue'
value: "70"
fileConfig: {}
envConfig:
WEBSITE_SITE_NAME: "pagopa<project-name>" # required to show cloud role name in application insights # TODO
WEBSITE_SITE_NAME: "pagopagpduploadfunction" # required to show cloud role name in application insights
FUNCTIONS_WORKER_RUNTIME: "java"
GPD_HOST: "https://api.uat.platform.pagopa.it/gpd/api/v2"
RETRY_MAX_ATTEMPTS: "1"
RETRY_MAX_DELAY: "10000"
BLOCK_SIZE: "1"
COSMOS_URI: "https://pagopa-u-weu-gps-cosmos-account.documents.azure.com:443/"
GPD_DB_NAME: "gpd_db"
GPD_CONTAINER_NAME: "gpd_upload_status"
envFieldRef:
APP_NAME: "metadata.labels['app.kubernetes.io/instance']"
APP_VERSION: "metadata.labels['app.kubernetes.io/version']"
envSecret:
APPLICATIONINSIGHTS_CONNECTION_STRING: 'ai-d-connection-string' # TODO set in kv
APPLICATIONINSIGHTS_CONNECTION_STRING: 'ai-u-connection-string'
AzureWebJobsStorage: 'gpd-upload-sa-connection-string'
GPD_SA_CONNECTION_STRING: 'gpd-upload-sa-connection-string'
GPD_SUBSCRIPTION_KEY: 'gpd-core-key-for-gpd-upload'
COSMOS_KEY: 'gpd-upload-db-key'
keyvault:
name: "pagopa-u-<namespace>-kv" # TODO
name: "pagopa-u-gps-kv"
tenantId: "7788edaf-0346-4068-9d79-c868aed15b3d"
nodeSelector: {}
tolerations: []
affinity: {}
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: node_type
operator: In
values:
- "user"
canaryDelivery:
create: false
deployment:
create: false
image:
tag: 0.5.6
create: false
4 changes: 2 additions & 2 deletions host.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
"logging": {
"fileLoggingMode": "always",
"logLevel": {
"default": "Information",
"default": "Error",
"Host.Results": "Error",
"Function": "Information",
"Host.Aggregator": "Trace"
"Host.Aggregator": "Error"
},
"applicationInsights": {
"samplingSettings": {
Expand Down
14 changes: 12 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>it.gov.pagopa.gpd.upload</groupId>
<artifactId>gpd-upload-function</artifactId>
<version>0.0.6-1-PAGOPA-1449</version>
<version>0.0.6-2-PAGOPA-1446</version>
<packaging>jar</packaging>

<name>GPD-Upload-Function</name>
Expand All @@ -18,6 +18,8 @@
<functionAppName>com.microsoft.azure-20220215182005862</functionAppName>
<resteasy.version>3.15.3.Final</resteasy.version>
<lombok.version>1.18.24</lombok.version>
<azure.cosmos.version>4.54.0</azure.cosmos.version>
<azure.storage.blob.version>12.25.1</azure.storage.blob.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -136,11 +138,19 @@
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
<!-- storage -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.53.1</version>
<version>${azure.cosmos.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.azure/azure-storage-blob -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>${azure.storage.blob.version}</version>
</dependency>

<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
Expand Down
62 changes: 33 additions & 29 deletions src/main/java/it/gov/pagopa/gpd/upload/UploadFunction.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package it.gov.pagopa.gpd.upload;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpStatus;
import com.microsoft.azure.functions.OutputBinding;

import com.microsoft.azure.functions.annotation.*;
import it.gov.pagopa.gpd.upload.entity.FailedIUPD;
import it.gov.pagopa.gpd.upload.entity.ResponseEntry;
import it.gov.pagopa.gpd.upload.exception.AppException;
import it.gov.pagopa.gpd.upload.model.ResponseGPD;
import it.gov.pagopa.gpd.upload.entity.Status;
import it.gov.pagopa.gpd.upload.model.pd.PaymentPositionModel;
import it.gov.pagopa.gpd.upload.model.pd.PaymentPositionsModel;
import it.gov.pagopa.gpd.upload.client.GpdClient;
import it.gov.pagopa.gpd.upload.client.GPDClient;
import it.gov.pagopa.gpd.upload.repository.StatusRepository;
import it.gov.pagopa.gpd.upload.service.StatusService;
import jakarta.validation.ConstraintViolation;
Expand All @@ -38,50 +41,52 @@ public class UploadFunction {
* This function will be invoked when a new or updated blob is detected at the
* specified path. The blob contents are provided as input to this function.
*/
@FunctionName("blobprocessor")
@FunctionName("uploadBlobProcessor")
public void run(
@BlobTrigger(name = "file",
dataType = "binary",
path = "gpd-upload/input/{fiscalCode}/{name}",
path = "broker/{organizationFiscalCode}/input/{filename}",
connection = "GPD_SA_CONNECTION_STRING") byte[] content,
@BindingName("fiscalCode") String fiscalCode,
@BindingName("name") String filename,
@BindingName("organizationFiscalCode") String organizationFiscalCode,
@BindingName("filename") String filename,
@BlobOutput(
name = "target",
path = "gpd-upload/output/{fiscalCode}/result_{name}",
path = "broker/{organizationFiscalCode}/output/report_{filename}",
connection = "GPD_SA_CONNECTION_STRING")
OutputBinding<String> outputBlob,
final ExecutionContext context
) {
Logger logger = context.getLogger();
logger.log(Level.INFO, () -> "Blob Trigger function executed at: " + LocalDateTime.now() + " for blob"
+ ", filename " + filename
+ ", fiscal code: " + fiscalCode
+ ", fiscal code: " + organizationFiscalCode
+ ", size : " + content.length + " bytes");

String converted = new String(content, StandardCharsets.UTF_8);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
objectMapper.registerModule(new JavaTimeModule());
String key = filename.substring(0, filename.indexOf("."));

try {
// deserialize payment positions from JSON to Object
PaymentPositionsModel pps = objectMapper.readValue(converted, PaymentPositionsModel.class);
Status status = StatusService.getInstance(logger).createStatus(fiscalCode, key, pps);
Status status = StatusService.getInstance(logger).createStatus(organizationFiscalCode, key, pps);
logger.log(Level.INFO, () -> "Payment positions size: " + pps.getPaymentPositions().size());
// function logic: validation and block upload to GPD-Core
validate(logger, pps, status);
createPaymentPositionBlocks(logger, context.getInvocationId(), fiscalCode, key, pps, status);
createPaymentPositionBlocks(logger, context.getInvocationId(), organizationFiscalCode, key, pps, status);
// write status in output container
outputBlob.setValue(objectMapper.writeValueAsString(status));
} catch (Exception e) {
logger.log(Level.INFO, () -> "Processing function exception: " + e.getMessage());
logger.log(Level.INFO, () -> "Processing function exception: " + e.getMessage() + ", caused by: " + e.getCause());
}
}


public void createPaymentPositionBlocks(Logger logger, String invocationId, String fc, String key, PaymentPositionsModel pps, Status status) throws Exception {
long t1 = System.currentTimeMillis();
StatusService statusService = StatusService.getInstance(logger);

int blockSize = Integer.parseInt(BLOCK_SIZE);
int index = 0;
Expand All @@ -91,10 +96,9 @@ public void createPaymentPositionBlocks(Logger logger, String invocationId, Stri
logger.log(Level.INFO,
"Process block for payment positions from index " + index + ", block size: " + blockSize + ", total size: " + totalPosition);
block = new PaymentPositionsModel(pps.getPaymentPositions().subList(index, index+blockSize));
ResponseGPD response = GpdClient.getInstance().createDebtPositions(fc, block, logger, invocationId);
ResponseGPD response = GPDClient.getInstance().createDebtPositions(fc, block, logger, invocationId);
List<String> IUPDs = block.getPaymentPositions().stream().map(item -> item.getIupd()).collect(Collectors.toList());
StatusService.getInstance(logger).updateStatus(IUPDs, status, response, blockSize);
StatusRepository.getInstance(logger).upsertStatus(key, status);
statusService.updateStatus(status, IUPDs, response);
index += blockSize;
}
// process last block if remaining position size is greater than zero
Expand All @@ -103,10 +107,9 @@ public void createPaymentPositionBlocks(Logger logger, String invocationId, Stri
logger.log(Level.INFO,
"Process last block for payment positions from index " + index + ", remaining position: " + remainingPosition + ", total size: " + totalPosition);
block = new PaymentPositionsModel(pps.getPaymentPositions().subList(index, index+remainingPosition));
ResponseGPD response = GpdClient.getInstance().createDebtPositions(fc, block, logger, invocationId);
ResponseGPD response = GPDClient.getInstance().createDebtPositions(fc, block, logger, invocationId);
List<String> IUPDs = block.getPaymentPositions().stream().map(pp -> pp.getIupd()).collect(Collectors.toList());
StatusService.getInstance(logger).updateStatus(IUPDs, status, response, remainingPosition);
StatusRepository.getInstance(logger).upsertStatus(key, status);
statusService.updateStatus(status, IUPDs, response);
}
if(status.upload.getCurrent() == status.upload.getTotal()) {
status.upload.setEnd(LocalDateTime.now());
Expand All @@ -117,28 +120,28 @@ public void createPaymentPositionBlocks(Logger logger, String invocationId, Stri
logger.log(Level.INFO, "Elapsed upload blocks time: " + uploadDuration);
}

private void validate(Logger logger, PaymentPositionsModel paymentPositionsModel, Status status) {
private void validate(Logger logger, PaymentPositionsModel paymentPositionsModel, Status status) throws AppException {
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
Validator validator = factory.getValidator();
Set<ConstraintViolation<PaymentPositionModel>> violations;
ArrayList<FailedIUPD> failedIUPDs = status.upload.getFailedIUPDs();
int invalidPosition = 0;

List<String> skippedIUPDs;
Iterator<PaymentPositionModel> iterator = paymentPositionsModel.getPaymentPositions().iterator();
while (iterator.hasNext()) {
PaymentPositionModel pp = iterator.next();
violations = validator.validate(pp);

if (!violations.isEmpty()) {
skippedIUPDs = new ArrayList<>();
skippedIUPDs.add(pp.getIupd());
ConstraintViolation<PaymentPositionModel> violation = violations.stream().findFirst().orElse(null);
String details = (violation != null ? violation.getMessage() : "");
FailedIUPD failedIUPD = FailedIUPD.builder()
.errorCode(BAD_REQUEST)
.details("BAD REQUEST " + details)
.skippedIUPDs(skippedIUPDs).build();
failedIUPDs.add(failedIUPD);

ResponseEntry responseEntry = ResponseEntry.builder()
.statusCode(HttpStatus.BAD_REQUEST.value())
.statusMessage(details)
.requestIDs(List.of(pp.getIupd()))
.build();
status.upload.addResponse(responseEntry);
invalidPosition++;
iterator.remove();

for(ConstraintViolation<PaymentPositionModel> v : violations) {
Expand All @@ -149,7 +152,8 @@ private void validate(Logger logger, PaymentPositionsModel paymentPositionsModel
}
}

status.upload.setCurrent(status.upload.getCurrent() + failedIUPDs.size());
status.upload.setFailedIUPDs(failedIUPDs);
status.upload.setCurrent(status.upload.getCurrent() + invalidPosition);
logger.log(Level.INFO, status.toString());
StatusRepository.getInstance(logger).upsertStatus(status.id, status);
}
}
Loading

0 comments on commit ecf9b30

Please sign in to comment.