Skip to content

Commit f3bc54c

Browse files
authored
Add Additional Telemetry to FlatPostgresCollection And PostgresLazilyLoadedSchemaRegistry (#282)
1 parent 5025ed7 commit f3bc54c

File tree

2 files changed

+80
-15
lines changed

2 files changed

+80
-15
lines changed

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,21 +131,14 @@ public class FlatPostgresCollection extends PostgresCollection {
131131
this.createdTsColumn = createdTs;
132132
this.lastUpdatedTsColumn = lastUpdatedTs;
133133

134-
if (this.createdTsColumn == null) {
134+
if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) {
135135
LOGGER.warn(
136136
"timestampFields config not set properly for collection '{}'. "
137-
+ "Row created timestamp will not be auto-managed. "
138-
+ "Configure via collectionConfigs.{}.timestampFields {{ created = \"<col>\", lastUpdated = \"<col>\" }}",
139-
collectionName,
140-
collectionName);
141-
}
142-
143-
if (this.lastUpdatedTsColumn == null) {
144-
LOGGER.warn(
145-
"timestampFields config not set properly for collection '{}'. "
146-
+ "Row lastUpdated timestamp will not be auto-managed. "
137+
+ "createdTsColumn: {}, lastUpdatedTsColumn: {}. "
147138
+ "Configure via collectionConfigs.{}.timestampFields {{ created = \"<col>\", lastUpdated = \"<col>\" }}",
148139
collectionName,
140+
this.createdTsColumn,
141+
this.lastUpdatedTsColumn,
149142
collectionName);
150143
}
151144
}
@@ -572,11 +565,20 @@ public Optional<Document> update(
572565

573566
String tableName = tableIdentifier.getTableName();
574567

568+
long startTime = System.currentTimeMillis();
569+
long resolveColumnsTime;
570+
long connectionAcquireTime;
571+
long executeUpdateTime;
572+
575573
// Acquire a transactional connection that can be managed manually
574+
long connStart = System.currentTimeMillis();
576575
try (Connection connection = client.getTransactionalConnection()) {
576+
connectionAcquireTime = System.currentTimeMillis() - connStart;
577577
try {
578578
// 1. Validate all columns exist and operators are supported.
579+
long resolveStart = System.currentTimeMillis();
579580
Map<String, String> resolvedColumns = resolvePathsToColumns(updates, tableName);
581+
resolveColumnsTime = System.currentTimeMillis() - resolveStart;
580582

581583
// 2. Get before-document if needed (only for BEFORE_UPDATE)
582584
Optional<Document> beforeDoc = Optional.empty();
@@ -589,10 +591,10 @@ public Optional<Document> update(
589591
}
590592
}
591593

592-
// 3. Build and execute UPDATE
594+
long execStart = System.currentTimeMillis();
593595
executeUpdate(connection, query, updates, tableName, resolvedColumns);
596+
executeUpdateTime = System.currentTimeMillis() - execStart;
594597

595-
// 4. Resolve return document based on options
596598
Document returnDoc = null;
597599
if (returnType == BEFORE_UPDATE) {
598600
returnDoc = beforeDoc.orElse(null);
@@ -601,6 +603,16 @@ public Optional<Document> update(
601603
}
602604

603605
connection.commit();
606+
607+
long totalTime = System.currentTimeMillis() - startTime;
608+
LOGGER.debug(
609+
"update timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}",
610+
totalTime,
611+
resolveColumnsTime,
612+
connectionAcquireTime,
613+
executeUpdateTime,
614+
tableName);
615+
604616
return Optional.ofNullable(returnDoc);
605617

606618
} catch (Exception e) {
@@ -628,19 +640,40 @@ public CloseableIterator<Document> bulkUpdate(
628640
String tableName = tableIdentifier.getTableName();
629641
CloseableIterator<Document> beforeIterator = null;
630642

643+
long startTime = System.currentTimeMillis();
644+
long resolveColumnsTime;
645+
long connectionAcquireTime;
646+
long executeUpdateTime;
647+
631648
try {
632649
ReturnDocumentType returnType = updateOptions.getReturnDocumentType();
633650

651+
long resolveStart = System.currentTimeMillis();
634652
Map<String, String> resolvedColumns = resolvePathsToColumns(updates, tableName);
653+
resolveColumnsTime = System.currentTimeMillis() - resolveStart;
635654

636655
if (returnType == BEFORE_UPDATE) {
637656
beforeIterator = find(query);
638657
}
639658

659+
long connStart = System.currentTimeMillis();
640660
try (Connection connection = client.getPooledConnection()) {
661+
connectionAcquireTime = System.currentTimeMillis() - connStart;
662+
663+
long execStart = System.currentTimeMillis();
641664
executeUpdate(connection, query, updates, tableName, resolvedColumns);
665+
executeUpdateTime = System.currentTimeMillis() - execStart;
642666
}
643667

668+
long totalTime = System.currentTimeMillis() - startTime;
669+
LOGGER.debug(
670+
"bulkUpdate timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}",
671+
totalTime,
672+
resolveColumnsTime,
673+
connectionAcquireTime,
674+
executeUpdateTime,
675+
tableName);
676+
644677
switch (returnType) {
645678
case AFTER_UPDATE:
646679
return find(query);

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class PostgresLazyilyLoadedSchemaRegistry implements SchemaRegistry<Postg
5555

5656
private static final Logger LOGGER =
5757
LoggerFactory.getLogger(PostgresLazyilyLoadedSchemaRegistry.class);
58+
private static final int SLOW_LOOKUP_THRESHOLD_MS = 100;
5859

5960
// The cache registry - Key: Table name, value: Map of column name to column metadata
6061
private final LoadingCache<String, Map<String, PostgresColumnMetadata>> cache;
@@ -84,9 +85,15 @@ public PostgresLazyilyLoadedSchemaRegistry(
8485
@Override
8586
public Map<String, PostgresColumnMetadata> load(String tableName) {
8687
LOGGER.info("Loading schema for table: {}", tableName);
88+
long startTime = System.currentTimeMillis();
8789
Map<String, PostgresColumnMetadata> updatedSchema = fetcher.fetch(tableName);
90+
long duration = System.currentTimeMillis() - startTime;
8891
lastRefreshTimes.put(tableName, Instant.now());
89-
LOGGER.info("Successfully loading schema for table: {}", tableName);
92+
LOGGER.info(
93+
"Successfully loaded schema for table: {}, columns: {}, durationMs: {}",
94+
tableName,
95+
updatedSchema.size(),
96+
duration);
9097
return updatedSchema;
9198
}
9299
});
@@ -102,7 +109,17 @@ public Map<String, PostgresColumnMetadata> load(String tableName) {
102109
@Override
103110
public Map<String, PostgresColumnMetadata> getSchema(String tableName) {
104111
try {
105-
return cache.get(tableName);
112+
long startTime = System.currentTimeMillis();
113+
Map<String, PostgresColumnMetadata> schema = cache.get(tableName);
114+
long duration = System.currentTimeMillis() - startTime;
115+
// Only log slow lookups (indicates a cache miss that triggered DB fetch)
116+
if (duration > SLOW_LOOKUP_THRESHOLD_MS) {
117+
LOGGER.debug(
118+
"getSchema slow lookup (likely cache miss) - table: {}, durationMs: {}",
119+
tableName,
120+
duration);
121+
}
122+
return schema;
106123
} catch (ExecutionException e) {
107124
LOGGER.error("Could not fetch the schema for table from the cache: {}", tableName, e);
108125
throw new RuntimeException("Failed to fetch schema for " + tableName, e.getCause());
@@ -148,10 +165,25 @@ public void invalidate(String tableName) {
148165
@Override
149166
public Optional<PostgresColumnMetadata> getColumnOrRefresh(String tableName, String colName) {
150167
Map<String, PostgresColumnMetadata> schema = getSchema(tableName);
168+
boolean refreshed = false;
151169

152170
if (!schema.containsKey(colName) && canRefresh(tableName)) {
171+
LOGGER.debug(
172+
"Column '{}' not found in cached schema for table '{}', triggering refresh",
173+
colName,
174+
tableName);
153175
invalidate(tableName);
154176
schema = getSchema(tableName);
177+
refreshed = true;
178+
}
179+
180+
if (refreshed) {
181+
LOGGER.debug(
182+
"getColumnOrRefresh - table: {}, column: {}, found: {}, refreshed: {}",
183+
tableName,
184+
colName,
185+
schema.containsKey(colName),
186+
refreshed);
155187
}
156188

157189
return Optional.ofNullable(schema.get(colName));

0 commit comments

Comments
 (0)