diff --git a/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewFieldInsertBack.java b/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewFieldInsertBack.java index a1480f9..2e09a83 100644 --- a/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewFieldInsertBack.java +++ b/opensearch-kafka-connector/kafka-connect-transforms/src/main/java/org/openg2p/reporting/kafka/connect/transforms/DynamicNewFieldInsertBack.java @@ -69,6 +69,7 @@ public class ESQueryConfig extends Config{ String esUsername; String esPassword; boolean esUpsert; + int esMaxRetries; int esRetryOnConflict; ESQueryConfig( @@ -82,6 +83,7 @@ public class ESQueryConfig extends Config{ String esUsername, String esPassword, boolean esUpsert, + int esMaxRetries, int esRetryOnConflict ) { super(type,idExpr,conditionExpr,valueExpr); @@ -92,6 +94,7 @@ public class ESQueryConfig extends Config{ this.esUsername = esUsername; this.esPassword = esPassword; this.esUpsert = esUpsert; + this.esMaxRetries = esMaxRetries; this.esRetryOnConflict = esRetryOnConflict; HttpClientBuilder hClientBuilder = HttpClients.custom(); @@ -153,13 +156,24 @@ void insertBack(Object input){ requestJson.set("doc_as_upsert", BooleanNode.TRUE); } hPost.setEntity(new StringEntity(requestJson.toString())); - try(CloseableHttpResponse hResponse = hClient.execute(hPost)){ - HttpEntity hResponseEntity = hResponse.getEntity(); - if(hResponse.getCode() < 200 || hResponse.getCode() >= 400){ - logger.error("Error occured while ES query. " + EntityUtils.toString(hResponseEntity)); + boolean isSuccess = false; + for(int i=1; i <= esMaxRetries; i++){ + try(CloseableHttpResponse hResponse = hClient.execute(hPost)){ + HttpEntity hResponseEntity = hResponse.getEntity(); + if(hResponse.getCode() < 200 || hResponse.getCode() >= 400){ + logger.error("Error occured while ES query. " + EntityUtils.toString(hResponseEntity) + ". Retrying..."); + } else { + isSuccess = true; + } + } catch(Exception e) { + logger.error("ES connection issues. Retrying...", e); } - } catch(Exception e) { - logger.error("ES connection issues.", e); + if(isSuccess){ + break; + } + } + if (!isSuccess){ + logger.error("Max retries exceeded."); } } } @@ -185,8 +199,9 @@ void close(){ public static final String ES_SECURITY_ENABLED_CONFIG = "es.security.enabled"; public static final String ES_USERNAME_CONFIG = "es.username"; public static final String ES_PASSWORD_CONFIG = "es.password"; - public static final String ES_RETRY_ON_CONFLICT_CONFIG = "es.retry.on.conflict"; - public static final String ES_UPSERT = "es.upsert"; + public static final String ES_UPSERT_CONFIG = "es.upsert"; + public static final String ES_RETRY_ON_CONFLICT_CONFIG = "es.max.retries"; + public static final String ES_MAX_RETRIES_CONFIG = "es.retry.on.conflict"; private Config config; private boolean stopRecordAfterInsert; @@ -203,8 +218,9 @@ void close(){ .define(ES_SECURITY_ENABLED_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Is Elasticsearch security enabled?") .define(ES_USERNAME_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Username") .define(ES_PASSWORD_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Password") - .define(ES_UPSERT, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, "Elasticsearch Update API doc_as_upsert parameter.") - .define(ES_RETRY_ON_CONFLICT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API retry_on_conflict parameter."); + .define(ES_UPSERT_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, "Elasticsearch Update API doc_as_upsert param.") + .define(ES_MAX_RETRIES_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API no of retries before fail.") + .define(ES_RETRY_ON_CONFLICT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API retry_on_conflict param."); @Override @@ -241,8 +257,9 @@ public void configure(Map configs) { String esSecurity = absconf.getString(ES_SECURITY_ENABLED_CONFIG); String esUsername = absconf.getString(ES_USERNAME_CONFIG); String esPassword = absconf.getString(ES_PASSWORD_CONFIG); + boolean esUpsert = absconf.getBoolean(ES_UPSERT_CONFIG); + int esMaxRetries = absconf.getInt(ES_MAX_RETRIES_CONFIG); int esRetryOnConflict = absconf.getInt(ES_RETRY_ON_CONFLICT_CONFIG); - boolean esUpsert = absconf.getBoolean(ES_UPSERT); if(esUrl.isEmpty() || esIndex.isEmpty()){ throw new ConfigException("One of required transform Elasticsearch config fields not set. Required Elasticsearch fields in tranform: " + ES_URL_CONFIG + " ," + ES_INDEX_CONFIG); @@ -260,6 +277,7 @@ public void configure(Map configs) { esUsername, esPassword, esUpsert, + esMaxRetries, esRetryOnConflict ); }