Skip to content

Commit 9d53586

Browse files
author
Olivier Dufour
committed
data import: bulk api v2 support
1 parent 082305a commit 9d53586

2 files changed

Lines changed: 202 additions & 72 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Version 1.37
44
- data export: fix filter operator behaviour with null
55
- data import : support hard delete [issue 129](https://github.com/dufoli/Salesforce-Inspector-Advanced/129)
6+
- data import : support Bulk API [issue267](https://github.com/dufoli/Salesforce-Inspector-Advanced/267)
67
- metadata: automatic download produce zip file and not csv and add metadataStatus json file [issue 262](https://github.com/dufoli/Salesforce-Inspector-Advanced/262)
78
- add a report bug button on popup [issue 264](https://github.com/dufoli/Salesforce-Inspector-Advanced/264)
89

addon/data-import.js

Lines changed: 201 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ class Model {
1414
this.allApis = [
1515
{value: "Enterprise", label: "Enterprise (default)"},
1616
{value: "Tooling", label: "Tooling"},
17-
{value: "Metadata", label: "Metadata"}
17+
{value: "Metadata", label: "Metadata"},
18+
{value: "Bulk", label: "Bulk"}
1819
];
1920
this.allActions = [
20-
{value: "create", label: "Insert", supportedApis: ["Enterprise", "Tooling"]},
21-
{value: "update", label: "Update", supportedApis: ["Enterprise", "Tooling"]},
22-
{value: "upsert", label: "Upsert", supportedApis: ["Enterprise", "Tooling"]},
23-
{value: "delete", label: "Delete", supportedApis: ["Enterprise", "Tooling"]},
24-
{value: "hardDelete", label: "Hard Delete", supportedApis: ["Enterprise", "Tooling"]},
21+
{value: "create", label: "Insert", supportedApis: ["Enterprise", "Tooling", "Bulk"]},
22+
{value: "update", label: "Update", supportedApis: ["Enterprise", "Tooling", "Bulk"]},
23+
{value: "upsert", label: "Upsert", supportedApis: ["Enterprise", "Tooling", "Bulk"]},
24+
{value: "delete", label: "Delete", supportedApis: ["Enterprise", "Tooling", "Bulk"]},
25+
{value: "hardDelete", label: "Hard Delete", supportedApis: ["Bulk"]},
2526
{value: "undelete", label: "Undelete", supportedApis: ["Enterprise", "Tooling"]},
2627
{value: "upsertMetadata", label: "Upsert Metadata", supportedApis: ["Metadata"]},
2728
{value: "deleteMetadata", label: "Delete Metadata", supportedApis: ["Metadata"]}
@@ -895,8 +896,21 @@ class Model {
895896
} else {
896897
importArgs.sObjects = [];
897898
}
898-
899+
let headerBulk = header.join(",");
900+
let chunk = headerBulk + "\n";
901+
let rowCount = 1;
899902
for (let row of data) {
903+
if (this.apiType == "Bulk") {
904+
rowCount++;
905+
if (chunk.length > 100000000 || rowCount > 10000) { //100 MB
906+
batchRows.push(chunk);
907+
chunk = headerBulk + "\n";
908+
rowCount = 1;
909+
}
910+
chunk += row.join(",") + "\n";
911+
row[statusColumnIndex] = "Processing";
912+
continue;
913+
}
900914
if (batchRows.length == batchSize) {
901915
break;
902916
}
@@ -1039,19 +1053,20 @@ class Model {
10391053
if (this.importType === "Case" || this.importType === "Lead" || this.importType === "Account") {
10401054
soapheaders.headers = {"AssignmentRuleHeader": {"useDefaultRule": this.assignmentRule}};
10411055
}
1042-
if (importAction == "hardDelete") {
1043-
let hardDeleteBody = {
1044-
"operation": "hardDelete",
1056+
if (this.apiType == "Bulk") {
1057+
//batchRows
1058+
let bulkJobCreationBody = {
1059+
"operation": this.importAction == "create" ? "insert" : this.importAction, // insert, delete, hardDelete, update, upsert
10451060
"object": this.importType,
1046-
"contentType": "CSV"
1061+
"contentType": "CSV",
1062+
"lineEnding": "LF", //LF, CRLF
1063+
"columnDelimiter": "COMMA", //BACKQUOTE, CARET, COMMA, PIPE, SEMICOLON, TAB
10471064
};
1048-
///services/data/vXX.X/jobs/ingest
1049-
//this.spinFor(sfConn.rest("/services/async/" + apiVersion + "/job", {method: "POST", "api": "bulk", body: hardDeleteBody}).then(jobResponse => {
1050-
this.spinFor(sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest", {method: "POST", withoutCache: true, body: hardDeleteBody}).then(jobResponse => {
1065+
this.spinFor(sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest", {method: "POST", withoutCache: true, body: bulkJobCreationBody}).then(async jobResponse => {
10511066
console.log(jobResponse);
10521067
if (jobResponse.exceptionCode) {
10531068
let errorText = jobResponse.exceptionMessage;
1054-
for (let row of batchRows) {
1069+
for (let row of data) {
10551070
row[statusColumnIndex] = "Failed";
10561071
row[resultIdColumnIndex] = "";
10571072
row[actionColumnIndex] = "";
@@ -1060,72 +1075,186 @@ class Model {
10601075
return;
10611076
}
10621077
if (!jobResponse.id) {
1063-
throw new Error("Hard delete failed");
1078+
throw new Error("bulk job creation failed");
1079+
}
1080+
for (let chunk of batchRows) {
1081+
try {
1082+
await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id + "/batches", {method: "PUT", withoutCache: true, bodyType: "csv", body: chunk});
1083+
} catch (error) {
1084+
console.error("Unexpected exception", error);
1085+
return;
1086+
}
1087+
}
1088+
1089+
let uploadCompleteResponse = await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id, {method: "PATCH", withoutCache: true, body: {"state": "UploadComplete"}});
1090+
console.log(uploadCompleteResponse);
1091+
let state = "UploadComplete";
1092+
let timeout = ms => new Promise(resolve => setTimeout(resolve, ms));
1093+
while (state != "JobComplete") {
1094+
//TODO avoid infinite loop here
1095+
await timeout(5000);
1096+
if (state == "Failed" || state == "Aborted" || state == null) {
1097+
throw new Error("Bulk operation failed");
1098+
}
1099+
try {
1100+
let jobStateResponse = await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id, {method: "GET", withoutCache: true});
1101+
console.log(jobStateResponse);
1102+
state = jobStateResponse?.state;
1103+
} catch (error) {
1104+
console.error("Unexpected exception", error);
1105+
return;
1106+
}
10641107
}
1108+
if (state == "JobComplete") {
1109+
this.activeBatches = 0;
1110+
let jobSuccessfulResultsResponse = await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id + "/successfulResults", {method: "GET", withoutCache: true, responseType: "text"});
1111+
console.log(jobSuccessfulResultsResponse);
1112+
//parse response to get successful results body is a csv with columns "sf__Id","sf__Created",Id
1113+
let successTable;
1114+
try {
1115+
successTable = csvParse(jobSuccessfulResultsResponse, ",");
1116+
} catch (e) {
1117+
console.error("Error parsing successful results CSV", e);
1118+
// Fallback to marking all as failed if we can't parse
1119+
for (let row of data) {
1120+
if (row[statusColumnIndex] == "Processing") {
1121+
row[statusColumnIndex] = "Failed";
1122+
row[errorColumnIndex] = "Error parsing successful results: " + e.message;
1123+
}
1124+
}
1125+
this.updateResult(this.importData.importTable);
1126+
return;
1127+
}
1128+
if (successTable.length < 2) {
1129+
// No successful results
1130+
successTable = [];
1131+
}
1132+
const successHeaders = successTable[0].map(h => h.trim());
1133+
const successRows = successTable.slice(1);
1134+
const successfulResults = successRows.map(row =>
1135+
successHeaders.reduce((acc, header, index) => {
1136+
acc[header] = row[index] || "";
1137+
return acc;
1138+
}, {})
1139+
);
1140+
console.log(successfulResults);
1141+
1142+
// Build ordered list of rows that were sent in this batch (for position-based matching)
1143+
// and a Map from input data rows keyed by their ID for O(1) lookups (for ID-based matching)
1144+
const rowsInBatch = [];
1145+
const dataRowMap = new Map();
1146+
for (let i = 0; i < data.length; i++) {
1147+
const row = data[i];
1148+
// Rows marked as Processing are the ones sent in this batch
1149+
if (row[statusColumnIndex] == "Processing") {
1150+
rowsInBatch.push({row, index: i});
1151+
// Also index by ID if available (for update/delete/upsert operations)
1152+
if (inputIdColumnIndex >= 0) {
1153+
const rowId = row[inputIdColumnIndex];
1154+
if (rowId) {
1155+
// Handle multiple rows with same ID by storing as array
1156+
if (!dataRowMap.has(rowId)) {
1157+
dataRowMap.set(rowId, []);
1158+
}
1159+
dataRowMap.get(rowId).push(row);
1160+
}
1161+
}
1162+
}
1163+
}
1164+
1165+
// Process successful results using the map for performance
1166+
// Bulk API preserves row order, so results correspond to rowsInBatch by position
1167+
for (let resultIndex = 0; resultIndex < successfulResults.length; resultIndex++) {
1168+
const result = successfulResults[resultIndex];
1169+
const resultId = result.Id || result.id || result.sf__Id || "";
1170+
let matchedRows = [];
10651171

1066-
let hardDeleteChunckBody = "Id\n" + importArgs.ID.join("\n");
1067-
//services/data/65.0/jobs/ingest/7505fEXAMPLE4C2AAM/batches
1068-
//this.spinFor(sfConn.rest("/services/async/" + apiVersion + "/job/" + jobResponse.id + "/batch", {method: "POST", "api": "bulk", body: hardDeleteChunckBody}).then(uploadResponse => {
1069-
this.spinFor(sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id + "/batches", {method: "PUT", withoutCache: true, bodyType: "csv", body: hardDeleteChunckBody}).then(uploadResponse => {
1070-
console.log(uploadResponse);
1071-
///services/data/v65.0/jobs/ingest/jobId/
1072-
this.spinFor(sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id, {method: "PATCH", withoutCache: true, body: {"state": "UploadComplete"}}).then(async uploadCompleteResponse => {
1073-
console.log(uploadCompleteResponse);
1074-
let state = "UploadComplete";
1075-
while (state != "JobComplete") {
1076-
if (state == "Failed" || state == "Aborted") {
1077-
throw new Error("Hard delete failed");
1172+
// Try to match by ID first (for update/delete/upsert operations)
1173+
if (inputIdColumnIndex >= 0 && resultId && dataRowMap.has(resultId)) {
1174+
matchedRows = dataRowMap.get(resultId).filter(r => r[statusColumnIndex] == "Processing");
1175+
}
1176+
1177+
// If ID matching didn't work, match by position (for create operations or when ID matching fails)
1178+
if (matchedRows.length == 0 && resultIndex < rowsInBatch.length) {
1179+
const batchRow = rowsInBatch[resultIndex];
1180+
if (batchRow && batchRow.row[statusColumnIndex] == "Processing") {
1181+
matchedRows = [batchRow.row];
10781182
}
1079-
let jobStateResponse = await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id, {method: "GET", withoutCache: true});
1080-
console.log(jobStateResponse);
1081-
state = jobStateResponse?.state;
10821183
}
1083-
if (state == "JobComplete") {
1084-
this.activeBatches = 0;
1085-
let jobSuccessfulResultsResponse = await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id + "/successfulResults", {method: "GET", withoutCache: true, responseType: "text"});
1086-
console.log(jobSuccessfulResultsResponse);
1087-
//parse response to get successful results body is a csv with columns "sf__Id","sf__Created",Id
1088-
const successRows = jobSuccessfulResultsResponse.replace(/"/g, "").split("\n");
1089-
const successHeaders = successRows[0].split(",");
1090-
const successfulResults = successRows.slice(1).map(row => {
1091-
const cells = row.split(",");
1092-
return successHeaders.reduce((acc, header, index) => {
1093-
acc[header] = cells[index];
1094-
return acc;
1095-
}, {});
1096-
});
1097-
console.log(successfulResults);
1098-
for (let row of batchRows) {
1099-
if (successfulResults.find(result => result.Id == row[inputIdColumnIndex])) {
1100-
row[statusColumnIndex] = "Succeeded";
1184+
1185+
// Update matched rows
1186+
for (let row of matchedRows) {
1187+
if (row[statusColumnIndex] == "Processing") {
1188+
row[statusColumnIndex] = "Succeeded";
1189+
if (resultIdColumnIndex >= 0) {
1190+
// Use sf__Id if available (for creates), otherwise use the matched ID
1191+
row[resultIdColumnIndex] = result.sf__Id || resultId || "";
11011192
}
11021193
}
1103-
this.updateResult(this.importData.importTable);
1104-
let jobFailedResultsResponse = await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id + "/failedResults", {method: "GET", withoutCache: true, responseType: "text"});
1105-
console.log(jobFailedResultsResponse);
1106-
//parse response to get successful results body is a csv with columns sf__Error, sf__Id, Id
1107-
const failedRows = jobFailedResultsResponse.replace(/"/g, "").split("\n");
1108-
const failedHeaders = failedRows[0].split(",");
1109-
const failedResults = failedRows.slice(1).map(row => {
1110-
const cells = row.split(",");
1111-
return failedHeaders.reduce((acc, header, index) => {
1112-
acc[header] = cells[index];
1113-
return acc;
1114-
}, {});
1115-
});
1116-
console.log(failedResults);
1117-
for (let row of batchRows) {
1118-
let failedResult = failedResults.find(result => result.Id == row[inputIdColumnIndex]);
1119-
if (failedResult) {
1120-
row[statusColumnIndex] = "Failed";
1121-
row[resultIdColumnIndex] = failedResult.Id;
1122-
row[errorColumnIndex] = failedResult.sf__Error;
1194+
}
1195+
}
1196+
1197+
this.updateResult(this.importData.importTable);
1198+
let jobFailedResultsResponse = await sfConn.rest("/services/data/v" + apiVersion + "/jobs/ingest/" + jobResponse.id + "/failedResults", {method: "GET", withoutCache: true, responseType: "text"});
1199+
console.log(jobFailedResultsResponse);
1200+
//parse response to get failed results body is a csv with columns sf__Error, sf__Id, Id
1201+
let failedTable;
1202+
try {
1203+
failedTable = csvParse(jobFailedResultsResponse, ",");
1204+
} catch (e) {
1205+
console.error("Error parsing failed results CSV", e);
1206+
// Continue processing - some rows may already be marked as succeeded
1207+
failedTable = [];
1208+
}
1209+
if (failedTable.length < 2) {
1210+
// No failed results
1211+
failedTable = [];
1212+
}
1213+
const failedHeaders = failedTable[0].map(h => h.trim());
1214+
const failedRows = failedTable.slice(1);
1215+
const failedResults = failedRows.map(row =>
1216+
failedHeaders.reduce((acc, header, index) => {
1217+
acc[header] = row[index] || "";
1218+
return acc;
1219+
}, {})
1220+
);
1221+
console.log(failedResults);
1222+
1223+
// Process failed results using the map for performance
1224+
// Bulk API preserves row order, so results correspond to rowsInBatch by position
1225+
for (let resultIndex = 0; resultIndex < failedResults.length; resultIndex++) {
1226+
const result = failedResults[resultIndex];
1227+
const resultId = result.Id || result.id || result.sf__Id || "";
1228+
let matchedRows = [];
1229+
1230+
// Try to match by ID first (for update/delete/upsert operations)
1231+
if (inputIdColumnIndex >= 0 && resultId && dataRowMap.has(resultId)) {
1232+
matchedRows = dataRowMap.get(resultId).filter(r => r[statusColumnIndex] == "Processing");
1233+
}
1234+
1235+
// If ID matching didn't work, match by position (for create operations or when ID matching fails)
1236+
if (matchedRows.length == 0 && resultIndex < rowsInBatch.length) {
1237+
const batchRow = rowsInBatch[resultIndex];
1238+
if (batchRow && batchRow.row[statusColumnIndex] == "Processing") {
1239+
matchedRows = [batchRow.row];
1240+
}
1241+
}
1242+
1243+
// Update matched rows
1244+
for (let row of matchedRows) {
1245+
if (row[statusColumnIndex] == "Processing") {
1246+
row[statusColumnIndex] = "Failed";
1247+
if (resultIdColumnIndex >= 0) {
1248+
row[resultIdColumnIndex] = result.sf__Id || resultId || "";
1249+
}
1250+
if (errorColumnIndex >= 0) {
1251+
row[errorColumnIndex] = result.sf__Error || result.sf__error || "";
11231252
}
11241253
}
1125-
this.updateResult(this.importData.importTable);
11261254
}
1127-
}));
1128-
}));
1255+
}
1256+
this.updateResult(this.importData.importTable);
1257+
}
11291258
}));
11301259
} else {
11311260
this.spinFor(sfConn.soap(wsdl, importAction, importArgs, soapheaders).then(res => {

0 commit comments

Comments
 (0)