Skip to content

Commit 5025ed7

Browse files
authored
Add MongoPostgresWriteConsistencyTest (#280)
1 parent 9254122 commit 5025ed7

File tree

15 files changed

+2109
-2315
lines changed

15 files changed

+2109
-2315
lines changed
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package org.hypertrace.core.documentstore;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.fasterxml.jackson.databind.node.ObjectNode;
5+
import com.typesafe.config.ConfigFactory;
6+
import java.io.BufferedReader;
7+
import java.io.IOException;
8+
import java.io.InputStream;
9+
import java.io.InputStreamReader;
10+
import java.nio.charset.StandardCharsets;
11+
import java.sql.Connection;
12+
import java.sql.PreparedStatement;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import java.util.stream.Stream;
16+
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
17+
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
18+
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
19+
import org.hypertrace.core.documentstore.expression.operators.RelationalOperator;
20+
import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy;
21+
import org.hypertrace.core.documentstore.postgres.PostgresDatastore;
22+
import org.hypertrace.core.documentstore.query.Query;
23+
import org.junit.jupiter.api.extension.ExtensionContext;
24+
import org.junit.jupiter.params.provider.Arguments;
25+
import org.junit.jupiter.params.provider.ArgumentsProvider;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import org.testcontainers.containers.GenericContainer;
29+
import org.testcontainers.containers.wait.strategy.Wait;
30+
import org.testcontainers.utility.DockerImageName;
31+
32+
/** Base class for write tests */
33+
public abstract class BaseWriteTest {
34+
35+
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseWriteTest.class);
36+
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
37+
protected static final String DEFAULT_TENANT = "default";
38+
39+
// MongoDB container and datastore - shared by all subclasses
40+
protected static GenericContainer<?> mongoContainer;
41+
protected static Datastore mongoDatastore;
42+
43+
// PostgreSQL container and datastore - shared by all subclasses
44+
protected static GenericContainer<?> postgresContainer;
45+
protected static Datastore postgresDatastore;
46+
47+
// Maps for multi-store tests
48+
protected static Map<String, Datastore> datastoreMap = new HashMap<>();
49+
protected static Map<String, Collection> collectionMap = new HashMap<>();
50+
51+
protected Collection getCollection(String storeName) {
52+
return collectionMap.get(storeName);
53+
}
54+
55+
private static final String FLAT_COLLECTION_SCHEMA_PATH =
56+
"schema/flat_collection_test_schema.sql";
57+
58+
protected static String loadFlatCollectionSchema() {
59+
try (InputStream is =
60+
BaseWriteTest.class.getClassLoader().getResourceAsStream(FLAT_COLLECTION_SCHEMA_PATH);
61+
BufferedReader reader =
62+
new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
63+
StringBuilder sb = new StringBuilder();
64+
String line;
65+
while ((line = reader.readLine()) != null) {
66+
sb.append(line).append(" ");
67+
}
68+
return sb.toString().trim();
69+
} catch (Exception e) {
70+
throw new RuntimeException("Failed to load schema from " + FLAT_COLLECTION_SCHEMA_PATH, e);
71+
}
72+
}
73+
74+
protected static void initMongo() {
75+
mongoContainer =
76+
new GenericContainer<>(DockerImageName.parse("mongo:8.0.1"))
77+
.withExposedPorts(27017)
78+
.waitingFor(Wait.forListeningPort());
79+
mongoContainer.start();
80+
81+
Map<String, String> mongoConfig = new HashMap<>();
82+
mongoConfig.put("host", "localhost");
83+
mongoConfig.put("port", mongoContainer.getMappedPort(27017).toString());
84+
85+
mongoDatastore = DatastoreProvider.getDatastore("Mongo", ConfigFactory.parseMap(mongoConfig));
86+
LOGGER.info("Mongo datastore initialized");
87+
}
88+
89+
protected static void shutdownMongo() {
90+
if (mongoContainer != null) {
91+
mongoContainer.stop();
92+
}
93+
}
94+
95+
protected static void initPostgres() {
96+
postgresContainer =
97+
new GenericContainer<>(DockerImageName.parse("postgres:13.1"))
98+
.withEnv("POSTGRES_PASSWORD", "postgres")
99+
.withEnv("POSTGRES_USER", "postgres")
100+
.withExposedPorts(5432)
101+
.waitingFor(Wait.forListeningPort());
102+
postgresContainer.start();
103+
104+
String postgresConnectionUrl =
105+
String.format("jdbc:postgresql://localhost:%s/", postgresContainer.getMappedPort(5432));
106+
107+
Map<String, String> postgresConfig = new HashMap<>();
108+
postgresConfig.put("url", postgresConnectionUrl);
109+
postgresConfig.put("user", "postgres");
110+
postgresConfig.put("password", "postgres");
111+
112+
postgresDatastore =
113+
DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(postgresConfig));
114+
LOGGER.info("Postgres datastore initialized");
115+
}
116+
117+
protected static void shutdownPostgres() {
118+
if (postgresContainer != null) {
119+
postgresContainer.stop();
120+
}
121+
}
122+
123+
protected static void createFlatCollectionSchema(
124+
PostgresDatastore pgDatastore, String tableName) {
125+
String createTableSQL = String.format(loadFlatCollectionSchema(), tableName);
126+
127+
try (Connection connection = pgDatastore.getPostgresClient();
128+
PreparedStatement statement = connection.prepareStatement(createTableSQL)) {
129+
statement.execute();
130+
LOGGER.info("Created flat collection table: {}", tableName);
131+
} catch (Exception e) {
132+
LOGGER.error("Failed to create flat collection schema: {}", e.getMessage(), e);
133+
throw new RuntimeException("Failed to create flat collection schema", e);
134+
}
135+
}
136+
137+
protected static String generateDocId(String prefix) {
138+
return prefix + "-" + System.currentTimeMillis() + "-" + (int) (Math.random() * 10000);
139+
}
140+
141+
protected static String getKeyString(String docId) {
142+
return new SingleValueKey(DEFAULT_TENANT, docId).toString();
143+
}
144+
145+
protected Query buildQueryById(String docId) {
146+
return Query.builder()
147+
.setFilter(
148+
RelationalExpression.of(
149+
IdentifierExpression.of("id"),
150+
RelationalOperator.EQ,
151+
ConstantExpression.of(getKeyString(docId))))
152+
.build();
153+
}
154+
155+
protected Document createTestDocument(String docId) {
156+
Key key = new SingleValueKey(DEFAULT_TENANT, docId);
157+
String keyStr = key.toString();
158+
159+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
160+
objectNode.put("id", keyStr);
161+
objectNode.put("item", "TestItem");
162+
objectNode.put("price", 100);
163+
objectNode.put("quantity", 50);
164+
objectNode.put("in_stock", true);
165+
objectNode.put("big_number", 1000000000000L);
166+
objectNode.put("rating", 3.5);
167+
objectNode.put("weight", 50.0);
168+
objectNode.putArray("tags").add("tag1").add("tag2");
169+
objectNode.putArray("numbers").add(1).add(2).add(3);
170+
ObjectNode props = OBJECT_MAPPER.createObjectNode();
171+
props.put("brand", "TestBrand");
172+
props.put("size", "M");
173+
props.put("count", 10);
174+
props.putArray("colors").add("red").add("blue");
175+
objectNode.set("props", props);
176+
ObjectNode sales = OBJECT_MAPPER.createObjectNode();
177+
sales.put("total", 200);
178+
sales.put("count", 10);
179+
objectNode.set("sales", sales);
180+
181+
return new JSONDocument(objectNode);
182+
}
183+
184+
protected Key createKey(String docId) {
185+
return new SingleValueKey(DEFAULT_TENANT, docId);
186+
}
187+
188+
protected static void clearTable(String tableName) {
189+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
190+
String deleteSQL = String.format("DELETE FROM \"%s\"", tableName);
191+
try (Connection connection = pgDatastore.getPostgresClient();
192+
PreparedStatement statement = connection.prepareStatement(deleteSQL)) {
193+
statement.executeUpdate();
194+
} catch (Exception e) {
195+
LOGGER.error("Failed to clear table {}: {}", tableName, e.getMessage(), e);
196+
}
197+
}
198+
199+
protected void insertTestDocument(String docId, Collection collection) throws IOException {
200+
Key key = createKey(docId);
201+
Document document = createTestDocument(docId);
202+
collection.upsert(key, document);
203+
}
204+
205+
/** Provides all MissingColumnStrategy values for parameterized tests */
206+
protected static class MissingColumnStrategyProvider implements ArgumentsProvider {
207+
@Override
208+
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
209+
return Stream.of(
210+
Arguments.of(MissingColumnStrategy.SKIP),
211+
Arguments.of(MissingColumnStrategy.THROW),
212+
Arguments.of(MissingColumnStrategy.IGNORE_DOCUMENT));
213+
}
214+
}
215+
}

0 commit comments

Comments
 (0)