diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index 9f1e3b456..894f99520 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1,3 +1,5 @@ +-[cygnus-ngsi][NGSISink] Perform batch retries one by one (#2059) +-[cygnus-ngsi][NGSISink] Try retries when an event batch is in CygnusBadConfiguration, CygnusBadContextData, CygnusRuntimeError (#2059) -[cygnus-commons][PostgreSQL, PostGIS] Upgrade posgresql driver from 9.4-1206-jdbc41 to 42.2.22 -[cygnus-commons][MySQLBackend] Upgrade mysql-connector-java from 5.1.47 to 8.0.22 due to security vulnerability -[cygnus-ngsi][NGSIPostgis, NGSIPostgreSQL] fix create error_log table (#2061) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java index 3ea32dcf7..196a54585 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java @@ -391,38 +391,38 @@ private Status processRollbackedBatches() { StringBuffer transactionIds = new StringBuffer(); batch.startIterator(); while (batch.hasNext()) { - NGSIBatch batchToPersist = new NGSIBatch(); String destination = batch.getNextDestination(); ArrayList events = batch.getNextEvents(); for (NGSIEvent event : events) { + NGSIBatch batchToPersist = new NGSIBatch(); batchToPersist.addEvent(destination, event); transactionIds.append(event.getHeaders().get(CommonConstants.HEADER_CORRELATOR_ID)).append(", "); - } - try { - persistBatch(batchToPersist); - updateServiceMetrics(batchToPersist, false); - if (persistencePolicyMaxRecords > -1) { - try { - capRecords(batchToPersist, persistencePolicyMaxRecords); - } catch (CygnusCappingError e) { - LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace())); - } // try - } // if - numPersistedEvents += batchToPersist.getNumEvents(); - LOGGER.info("Finishing internal transaction (" + transactionIds + ")"); - } catch (CygnusBadConfiguration | CygnusBadContextData | CygnusRuntimeError e) { - updateServiceMetrics(batchToPersist, true); - LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace())); - } catch (Exception e) { - updateServiceMetrics(batchToPersist, true); - LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace())); - for (NGSIEvent event : batchToPersist.getNextEvents()) { - rollbackBatch.addEvent(destination, event); + //} + // force to persist a batch with just one element + try { + persistBatch(batchToPersist); + updateServiceMetrics(batchToPersist, false); + if (persistencePolicyMaxRecords > -1) { + try { + capRecords(batchToPersist, persistencePolicyMaxRecords); + } catch (CygnusCappingError e) { + LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace())); + } // try + } // if + numPersistedEvents += batchToPersist.getNumEvents(); + LOGGER.info("Finishing internal transaction (" + transactionIds + ")"); + } catch (CygnusBadConfiguration | CygnusBadContextData | CygnusRuntimeError e) { + updateServiceMetrics(batchToPersist, true); // do not try again, is just one event + LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace())); + } catch (Exception e) { + updateServiceMetrics(batchToPersist, true); + LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace())); + rollbackBatch.addEvent(destination, event); // there is just one event + } finally { + batch.setNextPersisted(true); } - } finally { - batch.setNextPersisted(true); - } - } + } // for (NGSIEvent event : events) + } // while (batch.hasNext()) if (rollbackBatch.getNumEvents() > 0) { Accumulator rollbackAccumulator = new Accumulator(); rollbackAccumulator.initialize(rollbackedAccumulation.getAccStartDate()); @@ -607,6 +607,12 @@ private Status processNewBatches() { } catch (CygnusBadConfiguration | CygnusBadContextData | CygnusRuntimeError e) { updateServiceMetrics(batchToPersist, true); LOGGER.error(e.getMessage() + " Sink: " + this.getClass().getName() + " Destination: " + destination + " Stack trace: " + Arrays.toString(e.getStackTrace())); + if (batchToPersist.getNumEvents() > 1) { + // Maybe there are other events int batch that could finally get inserted + for (NGSIEvent event : batchToPersist.getNextEvents()) { + rollbackBatch.addEvent(destination, event); + } + } } catch (Exception e) { updateServiceMetrics(batchToPersist, true); LOGGER.error(e.getMessage() + " Sink: " + this.getClass().getName() + " Destination: " + destination + " Stack trace: " + Arrays.toString(e.getStackTrace())); @@ -616,7 +622,7 @@ private Status processNewBatches() { } finally { batch.setNextPersisted(true); } - } + } // while (batch.hasNext()) if (rollbackBatch.getNumEvents() > 0) { Accumulator rollbackAccumulator = new Accumulator(); rollbackAccumulator.initialize(accumulator.getAccStartDate());