Skip to content

Commit

Permalink
Merge pull request telefonicaid#2060 from telefonicaid/task/batch_ret…
Browse files Browse the repository at this point in the history
…ries_one_by_one

try batch retries one by one
  • Loading branch information
fgalan authored Jul 21, 2021
2 parents 500d3b4 + 089a8e8 commit f52fefa
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-[cygnus-ngsi][NGSISink] Perform batch retries one by one (#2059)
-[cygnus-ngsi][NGSISink] Try retries when an event batch is in CygnusBadConfiguration, CygnusBadContextData, CygnusRuntimeError (#2059)
-[cygnus-commons][PostgreSQL, PostGIS] Upgrade posgresql driver from 9.4-1206-jdbc41 to 42.2.22
-[cygnus-commons][MySQLBackend] Upgrade mysql-connector-java from 5.1.47 to 8.0.22 due to security vulnerability
-[cygnus-ngsi][NGSIPostgis, NGSIPostgreSQL] fix create error_log table (#2061)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,38 +391,38 @@ private Status processRollbackedBatches() {
StringBuffer transactionIds = new StringBuffer();
batch.startIterator();
while (batch.hasNext()) {
NGSIBatch batchToPersist = new NGSIBatch();
String destination = batch.getNextDestination();
ArrayList<NGSIEvent> events = batch.getNextEvents();
for (NGSIEvent event : events) {
NGSIBatch batchToPersist = new NGSIBatch();
batchToPersist.addEvent(destination, event);
transactionIds.append(event.getHeaders().get(CommonConstants.HEADER_CORRELATOR_ID)).append(", ");
}
try {
persistBatch(batchToPersist);
updateServiceMetrics(batchToPersist, false);
if (persistencePolicyMaxRecords > -1) {
try {
capRecords(batchToPersist, persistencePolicyMaxRecords);
} catch (CygnusCappingError e) {
LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace()));
} // try
} // if
numPersistedEvents += batchToPersist.getNumEvents();
LOGGER.info("Finishing internal transaction (" + transactionIds + ")");
} catch (CygnusBadConfiguration | CygnusBadContextData | CygnusRuntimeError e) {
updateServiceMetrics(batchToPersist, true);
LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace()));
} catch (Exception e) {
updateServiceMetrics(batchToPersist, true);
LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace()));
for (NGSIEvent event : batchToPersist.getNextEvents()) {
rollbackBatch.addEvent(destination, event);
//}
// force to persist a batch with just one element
try {
persistBatch(batchToPersist);
updateServiceMetrics(batchToPersist, false);
if (persistencePolicyMaxRecords > -1) {
try {
capRecords(batchToPersist, persistencePolicyMaxRecords);
} catch (CygnusCappingError e) {
LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace()));
} // try
} // if
numPersistedEvents += batchToPersist.getNumEvents();
LOGGER.info("Finishing internal transaction (" + transactionIds + ")");
} catch (CygnusBadConfiguration | CygnusBadContextData | CygnusRuntimeError e) {
updateServiceMetrics(batchToPersist, true); // do not try again, is just one event
LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace()));
} catch (Exception e) {
updateServiceMetrics(batchToPersist, true);
LOGGER.error(e.getMessage() + "Stack trace: " + Arrays.toString(e.getStackTrace()));
rollbackBatch.addEvent(destination, event); // there is just one event
} finally {
batch.setNextPersisted(true);
}
} finally {
batch.setNextPersisted(true);
}
}
} // for (NGSIEvent event : events)
} // while (batch.hasNext())
if (rollbackBatch.getNumEvents() > 0) {
Accumulator rollbackAccumulator = new Accumulator();
rollbackAccumulator.initialize(rollbackedAccumulation.getAccStartDate());
Expand Down Expand Up @@ -607,6 +607,12 @@ private Status processNewBatches() {
} catch (CygnusBadConfiguration | CygnusBadContextData | CygnusRuntimeError e) {
updateServiceMetrics(batchToPersist, true);
LOGGER.error(e.getMessage() + " Sink: " + this.getClass().getName() + " Destination: " + destination + " Stack trace: " + Arrays.toString(e.getStackTrace()));
if (batchToPersist.getNumEvents() > 1) {
// Maybe there are other events int batch that could finally get inserted
for (NGSIEvent event : batchToPersist.getNextEvents()) {
rollbackBatch.addEvent(destination, event);
}
}
} catch (Exception e) {
updateServiceMetrics(batchToPersist, true);
LOGGER.error(e.getMessage() + " Sink: " + this.getClass().getName() + " Destination: " + destination + " Stack trace: " + Arrays.toString(e.getStackTrace()));
Expand All @@ -616,7 +622,7 @@ private Status processNewBatches() {
} finally {
batch.setNextPersisted(true);
}
}
} // while (batch.hasNext())
if (rollbackBatch.getNumEvents() > 0) {
Accumulator rollbackAccumulator = new Accumulator();
rollbackAccumulator.initialize(accumulator.getAccStartDate());
Expand Down

0 comments on commit f52fefa

Please sign in to comment.