Skip to content

Commit

Permalink
add check for potential memory use increase in bulk operations or mem…
Browse files Browse the repository at this point in the history
…ory leak

add check for potential memory use increase in bulk operations or memory leak
  • Loading branch information
ashitsalesforce committed Dec 29, 2024
1 parent ea9409f commit efbfb5c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.salesforce.dataloader.exception.ExtractException;
import com.salesforce.dataloader.exception.ExtractExceptionOnServer;
import com.salesforce.dataloader.exception.ParameterLoadException;
import com.salesforce.dataloader.util.AppUtil;
import com.salesforce.dataloader.util.DLLogManager;
import com.salesforce.dataloader.util.LoadRateCalculator;
import com.sforce.async.AsyncApiException;
Expand All @@ -57,7 +58,7 @@
import com.sforce.async.JobStateEnum;
import com.sforce.async.OperationEnum;

class BulkApiVisitorUtil {
public class BulkApiVisitorUtil {

private static final Logger logger = DLLogManager.getLogger(BulkApiVisitorUtil.class);

Expand Down Expand Up @@ -226,6 +227,7 @@ public Map<String, InputStream> getAttachments() {
return this.attachments;
}

public static final String MEMORY_USE_TAG_CREATE_BULK_UPLOAD_= "createBulkUploadBatch";
BatchInfo createBatch(InputStream batchContent) throws AsyncApiException {
BatchInfo batch = null;
if (isBulkV2QueryJob()) {
Expand All @@ -243,7 +245,8 @@ BatchInfo createBatch(InputStream batchContent) throws AsyncApiException {
}
logger.info(Messages.getMessage(getClass(), "logBatchLoaded", batch.getId()));
}

AppUtil.captureUsedHeap(MEMORY_USE_TAG_CREATE_BULK_UPLOAD_);

// Done creating a batch. Clear attachments map in preparation for the next batch
this.attachments.clear();
this.attachmentNum = 0;
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/salesforce/dataloader/util/AppUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,23 @@ public static void setConnectorConfigProxySettings(AppConfig appConfig, Connecto
}

}

private static HashMap<String, Long> usedHeapCheckMap = new HashMap<String, Long>();
private static boolean usedHeapCaptureEnabled = false;
public static void captureUsedHeap(String tag) {
if (!usedHeapCaptureEnabled) {
return;
}
Runtime runtime = Runtime.getRuntime();
System.gc();
usedHeapCheckMap.put(tag, runtime.totalMemory() - runtime.freeMemory());
}

public static void enableUsedHeapCapture(boolean enable) {
usedHeapCaptureEnabled = enable;
}

public static Long getUsedHeap(String tag) {
return usedHeapCheckMap.get(tag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.*;

import org.apache.logging.log4j.Logger;

import com.salesforce.dataloader.util.AppUtil;
import com.salesforce.dataloader.util.DLLogManager;

import org.junit.*;
Expand All @@ -41,6 +43,7 @@
import com.salesforce.dataloader.TestSetting;
import com.salesforce.dataloader.TestVariant;
import com.salesforce.dataloader.action.OperationInfo;
import com.salesforce.dataloader.action.visitor.bulk.BulkApiVisitorUtil;
import com.salesforce.dataloader.config.AppConfig;
import com.salesforce.dataloader.config.LastRunProperties;
import com.salesforce.dataloader.controller.Controller;
Expand Down Expand Up @@ -74,7 +77,7 @@ public static Collection<Object[]> getTestParameters() {
TestVariant.forSettings(TestSetting.BULK_API_ENABLED, TestSetting.BULK_API_CACHE_DAO_UPLOAD_ENABLED),
TestVariant.forSettings(TestSetting.BULK_API_ENABLED, TestSetting.BULK_API_ZIP_CONTENT_ENABLED),
TestVariant.forSettings(TestSetting.BULK_API_ENABLED, TestSetting.BULK_API_SERIAL_MODE_ENABLED),
TestVariant.forSettings(TestSetting.BULK_API_ENABLED, TestSetting.BULK_V2_API_ENABLED)
TestVariant.forSettings(TestSetting.BULK_API_ENABLED, TestSetting.BULK_V2_API_ENABLED, TestSetting.BULK_API_CACHE_DAO_UPLOAD_ENABLED)
);
}

Expand Down Expand Up @@ -132,7 +135,32 @@ public void testUpsertAccountDb() throws Exception {
final Map<String, String> argMap = getTestConfig();
argMap.remove(AppConfig.PROP_PASSWORD);
// insert
long usedMemoryBefore = 0;
Runtime runtime = Runtime.getRuntime();
if (isSettingEnabled(argMap, AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)
&& isSettingEnabled(argMap, AppConfig.PROP_BULK_API_ENABLED)
|| isSettingEnabled(argMap, AppConfig.PROP_BULKV2_API_ENABLED)) {
AppUtil.enableUsedHeapCapture(true);
System.gc();
usedMemoryBefore = runtime.totalMemory() - runtime.freeMemory();
}
testUpsertAccountsDb(argMap, NUM_ROWS, true, false);

if (isSettingEnabled(argMap, AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)
&& isSettingEnabled(argMap, AppConfig.PROP_BULK_API_ENABLED)
|| isSettingEnabled(argMap, AppConfig.PROP_BULKV2_API_ENABLED)) {
System.gc();
long usedMemoryAfter = runtime.totalMemory() - runtime.freeMemory();
long usedMemoryDuring = AppUtil.getUsedHeap(BulkApiVisitorUtil.MEMORY_USE_TAG_CREATE_BULK_UPLOAD_);
long memoryIncreaseAfterUse = usedMemoryAfter - usedMemoryBefore;
long memoryIncreaseDuringUse = usedMemoryDuring - usedMemoryBefore;
assertTrue("Potential memory leak", memoryIncreaseAfterUse < 2000000);
if (isSettingEnabled(argMap, AppConfig.PROP_BULKV2_API_ENABLED)) {
// empirically the memory used during the operation is < 22Mb. If the memory use goes up by
assertTrue("Potential memory use increase", memoryIncreaseDuringUse < 25000000);
}
AppUtil.enableUsedHeapCapture(false);
}
// update
testUpsertAccountsDb(argMap, NUM_ROWS, false, false);
}
Expand Down

0 comments on commit efbfb5c

Please sign in to comment.