Skip to content

Commit 533b871

Browse files
authored
Add IT for field mapper type for pull-based ingestion (opensearch-project#20754)
* Add IT for field mapping message mapper Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com> * Add check for version to be always number Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com> * Update IT Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com> --------- Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
1 parent 24a2d68 commit 533b871

3 files changed

Lines changed: 278 additions & 1 deletion

File tree

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,4 +1091,251 @@ public void testDynamicConfigUpdateOnNoMessages() throws Exception {
10911091
.getPollingIngestStats();
10921092
assertEquals(1L, stats.getMessageProcessorStats().totalProcessedCount());
10931093
}
1094+
1095+
public void testKafkaIngestionWithFieldMappingMapper() throws Exception {
1096+
// Produce raw JSON messages (no _id/_source/_op_type envelope)
1097+
produceData("{\"user_id\": \"abc\", \"name\": \"alice\", \"age\": 30, \"timestamp\": 100, \"is_deleted\": \"false\"}");
1098+
produceData("{\"user_id\": \"def\", \"name\": \"bob\", \"age\": 25, \"timestamp\": 200, \"is_deleted\": \"false\"}");
1099+
1100+
internalCluster().startNode();
1101+
createIndex(
1102+
indexName,
1103+
Settings.builder()
1104+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1105+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1106+
.put("ingestion_source.type", "kafka")
1107+
.put("ingestion_source.pointer.init.reset", "earliest")
1108+
.put("ingestion_source.param.topic", topicName)
1109+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
1110+
.put("index.replication.type", "SEGMENT")
1111+
.put("ingestion_source.mapper_type", "field_mapping")
1112+
.put("ingestion_source.mapper_settings.id_field", "user_id")
1113+
.put("ingestion_source.mapper_settings.version_field", "timestamp")
1114+
.put("ingestion_source.mapper_settings.op_type_field", "is_deleted")
1115+
.put("ingestion_source.mapper_settings.op_type_field.delete_value", "true")
1116+
.build(),
1117+
mapping
1118+
);
1119+
1120+
waitForState(() -> {
1121+
refresh(indexName);
1122+
SearchResponse response = client().prepareSearch(indexName).get();
1123+
if (response.getHits().getTotalHits().value() != 2L) return false;
1124+
1125+
// Verify document IDs are extracted from user_id field
1126+
Map<String, Map<String, Object>> docs = new HashMap<>();
1127+
response.getHits().forEach(hit -> docs.put(hit.getId(), hit.getSourceAsMap()));
1128+
if (!docs.containsKey("abc") || !docs.containsKey("def")) return false;
1129+
1130+
// Verify _source does not contain extracted metadata fields
1131+
Map<String, Object> aliceDoc = docs.get("abc");
1132+
return "alice".equals(aliceDoc.get("name"))
1133+
&& Integer.valueOf(30).equals(aliceDoc.get("age"))
1134+
&& !aliceDoc.containsKey("user_id")
1135+
&& !aliceDoc.containsKey("timestamp")
1136+
&& !aliceDoc.containsKey("is_deleted");
1137+
});
1138+
}
1139+
1140+
public void testKafkaIngestionWithFieldMappingMapper_DeleteOperation() throws Exception {
1141+
// Produce an index message followed by a delete message for the same document
1142+
produceData("{\"user_id\": \"abc\", \"name\": \"alice\", \"age\": 30, \"timestamp\": 100, \"is_deleted\": \"false\"}");
1143+
produceData("{\"user_id\": \"abc\", \"name\": \"alice\", \"age\": 30, \"timestamp\": 200, \"is_deleted\": \"true\"}");
1144+
1145+
internalCluster().startNode();
1146+
createIndex(
1147+
indexName,
1148+
Settings.builder()
1149+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1150+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1151+
.put("ingestion_source.type", "kafka")
1152+
.put("ingestion_source.pointer.init.reset", "earliest")
1153+
.put("ingestion_source.param.topic", topicName)
1154+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
1155+
.put("index.replication.type", "SEGMENT")
1156+
.put("ingestion_source.mapper_type", "field_mapping")
1157+
.put("ingestion_source.mapper_settings.id_field", "user_id")
1158+
.put("ingestion_source.mapper_settings.version_field", "timestamp")
1159+
.put("ingestion_source.mapper_settings.op_type_field", "is_deleted")
1160+
.put("ingestion_source.mapper_settings.op_type_field.delete_value", "true")
1161+
.build(),
1162+
mapping
1163+
);
1164+
1165+
// Both messages processed, but the delete should remove the document
1166+
waitForState(() -> {
1167+
refresh(indexName);
1168+
SearchResponse response = client().prepareSearch(indexName).get();
1169+
if (response.getHits().getTotalHits().value() != 0L) return false;
1170+
1171+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
1172+
.getPollingIngestStats();
1173+
return stats != null && stats.getMessageProcessorStats().totalProcessedCount() == 2L;
1174+
});
1175+
}
1176+
1177+
public void testKafkaIngestionWithFieldMappingMapper_VersionConflict() throws Exception {
1178+
// Produce messages out of order: newer version first, then older version
1179+
produceData("{\"user_id\": \"abc\", \"name\": \"alice_v2\", \"age\": 31, \"timestamp\": 200, \"is_deleted\": \"false\"}");
1180+
produceData("{\"user_id\": \"abc\", \"name\": \"alice_v1\", \"age\": 30, \"timestamp\": 100, \"is_deleted\": \"false\"}");
1181+
1182+
internalCluster().startNode();
1183+
createIndex(
1184+
indexName,
1185+
Settings.builder()
1186+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1187+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1188+
.put("ingestion_source.type", "kafka")
1189+
.put("ingestion_source.pointer.init.reset", "earliest")
1190+
.put("ingestion_source.param.topic", topicName)
1191+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
1192+
.put("index.replication.type", "SEGMENT")
1193+
.put("ingestion_source.mapper_type", "field_mapping")
1194+
.put("ingestion_source.mapper_settings.id_field", "user_id")
1195+
.put("ingestion_source.mapper_settings.version_field", "timestamp")
1196+
.put("ingestion_source.mapper_settings.op_type_field", "is_deleted")
1197+
.put("ingestion_source.mapper_settings.op_type_field.delete_value", "true")
1198+
.build(),
1199+
mapping
1200+
);
1201+
1202+
// Both messages processed, but the older version should be rejected
1203+
waitForState(() -> {
1204+
refresh(indexName);
1205+
SearchResponse response = client().prepareSearch(indexName).get();
1206+
if (response.getHits().getTotalHits().value() != 1L) return false;
1207+
1208+
// The newer version (v2) should win
1209+
Map<String, Object> source = response.getHits().getAt(0).getSourceAsMap();
1210+
if (!"alice_v2".equals(source.get("name"))) return false;
1211+
1212+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
1213+
.getPollingIngestStats();
1214+
return stats != null
1215+
&& stats.getMessageProcessorStats().totalProcessedCount() == 2L
1216+
&& stats.getMessageProcessorStats().totalVersionConflictsCount() == 1L;
1217+
});
1218+
}
1219+
1220+
public void testKafkaIngestionWithFieldMappingMapper_VariousConfigurations() throws Exception {
1221+
// Produce messages covering create_value, only-id, and custom delete_value scenarios
1222+
// These share the same Kafka topic but use different indices with different mapper configs
1223+
produceData("{\"user_id\": \"abc\", \"name\": \"alice\", \"age\": 30, \"timestamp\": 100, \"action\": \"INSERT\"}");
1224+
produceData("{\"user_id\": \"def\", \"name\": \"bob\", \"age\": 25, \"timestamp\": 200, \"action\": \"UPDATE\"}");
1225+
1226+
internalCluster().startNode();
1227+
1228+
// --- Scenario 1: create_value support ---
1229+
String createIndex = "test_create_op";
1230+
createIndex(
1231+
createIndex,
1232+
Settings.builder()
1233+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1234+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1235+
.put("ingestion_source.type", "kafka")
1236+
.put("ingestion_source.pointer.init.reset", "earliest")
1237+
.put("ingestion_source.param.topic", topicName)
1238+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
1239+
.put("index.replication.type", "SEGMENT")
1240+
.put("ingestion_source.mapper_type", "field_mapping")
1241+
.put("ingestion_source.mapper_settings.id_field", "user_id")
1242+
.put("ingestion_source.mapper_settings.version_field", "timestamp")
1243+
.put("ingestion_source.mapper_settings.op_type_field", "action")
1244+
.put("ingestion_source.mapper_settings.op_type_field.create_value", "INSERT")
1245+
.build(),
1246+
mapping
1247+
);
1248+
1249+
waitForState(() -> {
1250+
refresh(createIndex);
1251+
SearchResponse response = client().prepareSearch(createIndex).get();
1252+
if (response.getHits().getTotalHits().value() != 2L) return false;
1253+
1254+
Map<String, Map<String, Object>> docs = new HashMap<>();
1255+
response.getHits().forEach(hit -> docs.put(hit.getId(), hit.getSourceAsMap()));
1256+
1257+
// Both documents indexed — INSERT as create, UPDATE as index (default)
1258+
// action field should be removed from _source
1259+
return docs.containsKey("abc")
1260+
&& docs.containsKey("def")
1261+
&& !docs.get("abc").containsKey("action")
1262+
&& !docs.get("def").containsKey("action");
1263+
});
1264+
1265+
// --- Scenario 2: only id_field configured (minimal config) ---
1266+
String idOnlyIndex = "test_id_only";
1267+
createIndex(
1268+
idOnlyIndex,
1269+
Settings.builder()
1270+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1271+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1272+
.put("ingestion_source.type", "kafka")
1273+
.put("ingestion_source.pointer.init.reset", "earliest")
1274+
.put("ingestion_source.param.topic", topicName)
1275+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
1276+
.put("index.replication.type", "SEGMENT")
1277+
.put("ingestion_source.mapper_type", "field_mapping")
1278+
.put("ingestion_source.mapper_settings.id_field", "user_id")
1279+
.build(),
1280+
mapping
1281+
);
1282+
1283+
waitForState(() -> {
1284+
refresh(idOnlyIndex);
1285+
SearchResponse response = client().prepareSearch(idOnlyIndex).get();
1286+
if (response.getHits().getTotalHits().value() != 2L) return false;
1287+
1288+
Map<String, Map<String, Object>> docs = new HashMap<>();
1289+
response.getHits().forEach(hit -> docs.put(hit.getId(), hit.getSourceAsMap()));
1290+
1291+
// IDs extracted from user_id, user_id removed from source, content preserved
1292+
return docs.containsKey("abc")
1293+
&& docs.containsKey("def")
1294+
&& !docs.get("abc").containsKey("user_id")
1295+
&& "alice".equals(docs.get("abc").get("name"));
1296+
});
1297+
1298+
// --- Scenario 3: custom delete_value (Y/N convention) ---
1299+
// Need new messages with the Y/N pattern on the same topic
1300+
produceData("{\"user_id\": \"ghi\", \"name\": \"charlie\", \"age\": 35, \"timestamp\": 400, \"expired\": \"N\"}");
1301+
produceData("{\"user_id\": \"jkl\", \"name\": \"diana\", \"age\": 28, \"timestamp\": 500, \"expired\": \"N\"}");
1302+
produceData("{\"user_id\": \"ghi\", \"name\": \"charlie\", \"age\": 35, \"timestamp\": 600, \"expired\": \"Y\"}");
1303+
1304+
String deleteValueIndex = "test_custom_delete";
1305+
createIndex(
1306+
deleteValueIndex,
1307+
Settings.builder()
1308+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1309+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1310+
.put("ingestion_source.type", "kafka")
1311+
.put("ingestion_source.pointer.init.reset", "earliest")
1312+
.put("ingestion_source.param.topic", topicName)
1313+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
1314+
.put("index.replication.type", "SEGMENT")
1315+
.put("ingestion_source.mapper_type", "field_mapping")
1316+
.put("ingestion_source.mapper_settings.id_field", "user_id")
1317+
.put("ingestion_source.mapper_settings.version_field", "timestamp")
1318+
.put("ingestion_source.mapper_settings.op_type_field", "expired")
1319+
.put("ingestion_source.mapper_settings.op_type_field.delete_value", "Y")
1320+
.build(),
1321+
mapping
1322+
);
1323+
1324+
// ghi indexed then deleted by Y, jkl remains. Earlier messages (abc, def) also indexed.
1325+
// Total messages on topic: 5. For this index: abc, def have no "expired" field → default index,
1326+
// ghi N → index, jkl N → index, ghi Y → delete. Result: abc + def + jkl = at least jkl present, ghi deleted.
1327+
waitForState(() -> {
1328+
refresh(deleteValueIndex);
1329+
SearchResponse response = client().prepareSearch(deleteValueIndex).get();
1330+
1331+
Map<String, Map<String, Object>> docs = new HashMap<>();
1332+
response.getHits().forEach(hit -> docs.put(hit.getId(), hit.getSourceAsMap()));
1333+
1334+
// ghi should be deleted, jkl should remain
1335+
return docs.containsKey("jkl")
1336+
&& !docs.containsKey("ghi")
1337+
&& "diana".equals(docs.get("jkl").get("name"))
1338+
&& !docs.get("jkl").containsKey("expired");
1339+
});
1340+
}
10941341
}

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,15 @@ public ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message m
151151
}
152152
Object versionValue = rawPayload.remove(versionField);
153153
validateScalar(versionField, versionValue);
154-
payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
154+
String versionStr = String.valueOf(versionValue).trim();
155+
try {
156+
Long.parseLong(versionStr);
157+
} catch (NumberFormatException e) {
158+
throw new IllegalArgumentException(
159+
"version_field [" + versionField + "] must be a numeric value, but found: " + versionStr
160+
);
161+
}
162+
payloadMap.put(VersionFieldMapper.NAME, versionStr);
155163
}
156164

157165
// Extract _op_type

server/src/test/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapperTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,28 @@ public void testVersionFieldMissing_ThrowsException() {
127127
assertTrue(e.getMessage().contains("configured version_field [timestamp] is missing from the message"));
128128
}
129129

130+
public void testVersionFieldNonNumeric_ThrowsException() {
131+
Map<String, Object> settings = Map.of(FieldMappingIngestionMessageMapper.VERSION_FIELD, "version");
132+
FieldMappingIngestionMessageMapper mapper = new FieldMappingIngestionMessageMapper(settings);
133+
134+
IllegalArgumentException e = expectThrows(
135+
IllegalArgumentException.class,
136+
() -> mapMessage(mapper, "{\"version\": \"abc\", \"name\": \"alice\"}")
137+
);
138+
assertTrue(e.getMessage().contains("version_field [version] must be a numeric value"));
139+
}
140+
141+
public void testVersionFieldBoolean_ThrowsException() {
142+
Map<String, Object> settings = Map.of(FieldMappingIngestionMessageMapper.VERSION_FIELD, "version");
143+
FieldMappingIngestionMessageMapper mapper = new FieldMappingIngestionMessageMapper(settings);
144+
145+
IllegalArgumentException e = expectThrows(
146+
IllegalArgumentException.class,
147+
() -> mapMessage(mapper, "{\"version\": true, \"name\": \"alice\"}")
148+
);
149+
assertTrue(e.getMessage().contains("version_field [version] must be a numeric value"));
150+
}
151+
130152
// --- Op type field tests (delete_value) ---
131153

132154
public void testDeleteValueConfigured_MatchesDelete() {

0 commit comments

Comments
 (0)