Skip to content

Commit 10c59d5

Browse files
msrathore-db and claude authored
[PECOBLR-1709] Add SEA metadata operation logging header (#1191)
## Description
<!-- Provide a brief summary of the changes made and the issue they aim to address.-->
Add X-Databricks-Metadata-Operation-Type HTTP header to SEA ExecuteStatement requests for metadata operations. This enables CP-side logs (ELK, usage logs, SHR metrics) to distinguish metadata operations (getTables, getColumns, etc.) from regular SQL queries.

## Changes:
- Added metadataOperationType parameter to executeStatement() in IDatabricksClient
- SEA client adds the header when metadataOperationType is non-null
- Metadata client passes appropriate operation types: GetCatalogs, GetSchemas, GetTables, GetColumns, GetFunctions, GetPrimaryKeys, GetCrossReference
- Thrift client signature updated (header ignored in Thrift mode)

## Testing
<!-- Describe how the changes have been tested-->
- DatabricksSdkClientTest - 23 tests passing
- DatabricksMetadataSdkClientTest - 44 tests passing
- DatabricksThriftServiceClientTest - 41 tests passing
- DatabricksStatementTest - 49 tests passing
- DatabricksPreparedStatementTest - 64 tests passing
- DatabricksConnectionTest - 39 tests passing
- Manual verification that header appears in SEA requests for metadata operations

## Additional Notes to the Reviewer
<!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. -->
NO_CHANGELOG=true

---------

Co-authored-by: Claude (databricks-claude-opus-4-5) <noreply@anthropic.com>
1 parent c04e909 commit 10c59d5

14 files changed

Lines changed: 594 additions & 224 deletions

src/main/java/com/databricks/jdbc/api/impl/DatabricksSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,8 @@ public String getCurrentCatalog() throws DatabricksSQLException {
339339
new HashMap<>(),
340340
StatementType.METADATA,
341341
this,
342-
null);
342+
null,
343+
null /* metadataOperationType */);
343344

344345
if (resultSet.next()) {
345346
String currentCatalog = resultSet.getString(1);

src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,8 @@ DatabricksResultSet getResultFromClient(
887887
params,
888888
statementType,
889889
connection.getSession(),
890-
this);
890+
this,
891+
null /* metadataOperationType */);
891892
}
892893

893894
void checkIfClosed() throws DatabricksSQLException {

src/main/java/com/databricks/jdbc/common/MetadataOperationType.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.databricks.jdbc.common;
2+
3+
/**
4+
* Enum representing metadata operation types for SEA metadata logging. These values are sent as
5+
* HTTP headers to track which metadata operation is being performed.
6+
*/
7+
public enum MetadataOperationType {
8+
GET_CATALOGS("GetCatalogs"),
9+
GET_SCHEMAS("GetSchemas"),
10+
GET_TABLES("GetTables"),
11+
GET_COLUMNS("GetColumns"),
12+
GET_FUNCTIONS("GetFunctions"),
13+
GET_PRIMARY_KEYS("GetPrimaryKeys"),
14+
GET_CROSS_REFERENCE("GetCrossReference");
15+
16+
private final String headerValue;
17+
18+
MetadataOperationType(String headerValue) {
19+
this.headerValue = headerValue;
20+
}
21+
22+
/** Returns the header value to be sent in the HTTP request. */
23+
public String getHeaderValue() {
24+
return headerValue;
25+
}
26+
}

src/main/java/com/databricks/jdbc/dbclient/IDatabricksClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.databricks.jdbc.api.internal.IDatabricksSession;
66
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
77
import com.databricks.jdbc.common.IDatabricksComputeResource;
8+
import com.databricks.jdbc.common.MetadataOperationType;
89
import com.databricks.jdbc.common.StatementType;
910
import com.databricks.jdbc.dbclient.impl.common.StatementId;
1011
import com.databricks.jdbc.exception.DatabricksSQLException;
@@ -53,6 +54,10 @@ ImmutableSessionInfo createSession(
5354
* @param statementType type of statement (metadata, update or generic SQL)
5455
* @param session underlying session
5556
* @param parentStatement statement instance if called from a statement
57+
* @param metadataOperationType optional metadata operation type for CP-side logging (e.g.,
58+
* GET_TABLES, GET_COLUMNS). Pass null for non-metadata operations. When provided, adds
59+
* X-Databricks-Metadata-Operation-Type header to help distinguish metadata operations from
60+
* regular SQL queries in logs.
5661
* @return response for statement execution
5762
*/
5863
@DatabricksMetricsTimed
@@ -62,7 +67,8 @@ DatabricksResultSet executeStatement(
6267
Map<Integer, ImmutableSqlParameter> parameters,
6368
StatementType statementType,
6469
IDatabricksSession session,
65-
IDatabricksStatementInternal parentStatement)
70+
IDatabricksStatementInternal parentStatement,
71+
MetadataOperationType metadataOperationType)
6672
throws SQLException;
6773

6874
/**

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksMetadataSdkClient.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.databricks.jdbc.api.impl.DatabricksResultSet;
77
import com.databricks.jdbc.api.internal.IDatabricksSession;
8+
import com.databricks.jdbc.common.MetadataOperationType;
89
import com.databricks.jdbc.common.StatementType;
910
import com.databricks.jdbc.common.util.JdbcThreadUtils;
1011
import com.databricks.jdbc.common.util.WildcardUtil;
@@ -59,13 +60,15 @@ public DatabricksResultSet listCatalogs(IDatabricksSession session) throws SQLEx
5960
}
6061
String SQL = String.format("SELECT '%s' AS catalog", currentCatalog);
6162
LOGGER.debug("SQL command to fetch catalogs: {}", SQL);
62-
return metadataResultSetBuilder.getCatalogsResult(getResultSet(SQL, session));
63+
return metadataResultSetBuilder.getCatalogsResult(
64+
getResultSet(SQL, session, MetadataOperationType.GET_CATALOGS));
6365
}
6466

6567
CommandBuilder commandBuilder = new CommandBuilder(session);
6668
String SQL = commandBuilder.getSQLString(CommandName.LIST_CATALOGS);
6769
LOGGER.debug("SQL command to fetch catalogs: {}", SQL);
68-
return metadataResultSetBuilder.getCatalogsResult(getResultSet(SQL, session));
70+
return metadataResultSetBuilder.getCatalogsResult(
71+
getResultSet(SQL, session, MetadataOperationType.GET_CATALOGS));
6972
}
7073

7174
@Override
@@ -94,7 +97,8 @@ public DatabricksResultSet listSchemas(
9497
String SQL = commandBuilder.getSQLString(CommandName.LIST_SCHEMAS);
9598
LOGGER.debug("SQL command to fetch schemas: {}", SQL);
9699
try {
97-
return metadataResultSetBuilder.getSchemasResult(getResultSet(SQL, session), catalog);
100+
return metadataResultSetBuilder.getSchemasResult(
101+
getResultSet(SQL, session, MetadataOperationType.GET_SCHEMAS), catalog);
98102
} catch (SQLException e) {
99103
if (WildcardUtil.isNullOrWildcard(catalog)
100104
&& PARSE_SYNTAX_ERROR_SQL_STATE.equals(e.getSQLState())) {
@@ -139,7 +143,7 @@ public DatabricksResultSet listTables(
139143
LOGGER.debug(String.format("SQL command to fetch tables: {%s}", SQL));
140144
try {
141145
return metadataResultSetBuilder.getTablesResult(
142-
getResultSet(SQL, session), validatedTableTypes);
146+
getResultSet(SQL, session, MetadataOperationType.GET_TABLES), validatedTableTypes);
143147
} catch (SQLException e) {
144148
if (PARSE_SYNTAX_ERROR_SQL_STATE.equals(e.getSQLState())
145149
&& (catalog == null || catalog.equals("*") || catalog.equals("%"))) {
@@ -190,7 +194,8 @@ public DatabricksResultSet listColumns(
190194
.setColumnPattern(columnNamePattern);
191195
String SQL = commandBuilder.getSQLString(CommandName.LIST_COLUMNS);
192196
LOGGER.debug("SQL command to fetch columns: {}", SQL);
193-
return metadataResultSetBuilder.getColumnsResult(getResultSet(SQL, session));
197+
return metadataResultSetBuilder.getColumnsResult(
198+
getResultSet(SQL, session, MetadataOperationType.GET_COLUMNS));
194199
}
195200

196201
@Override
@@ -229,7 +234,8 @@ public DatabricksResultSet listFunctions(
229234
.setFunctionPattern(functionNamePattern);
230235
String SQL = commandBuilder.getSQLString(CommandName.LIST_FUNCTIONS);
231236
LOGGER.debug("SQL command to fetch functions: {}", SQL);
232-
return metadataResultSetBuilder.getFunctionsResult(getResultSet(SQL, session), catalog);
237+
return metadataResultSetBuilder.getFunctionsResult(
238+
getResultSet(SQL, session, MetadataOperationType.GET_FUNCTIONS), catalog);
233239
}
234240

235241
@Override
@@ -261,7 +267,8 @@ public DatabricksResultSet listPrimaryKeys(
261267
new CommandBuilder(catalog, session).setSchema(schema).setTable(table);
262268
String SQL = commandBuilder.getSQLString(CommandName.LIST_PRIMARY_KEYS);
263269
LOGGER.debug("SQL command to fetch primary keys: {}", SQL);
264-
return metadataResultSetBuilder.getPrimaryKeysResult(getResultSet(SQL, session));
270+
return metadataResultSetBuilder.getPrimaryKeysResult(
271+
getResultSet(SQL, session, MetadataOperationType.GET_PRIMARY_KEYS));
265272
}
266273

267274
@Override
@@ -295,7 +302,8 @@ public DatabricksResultSet listImportedKeys(
295302
new CommandBuilder(catalog, session).setSchema(schema).setTable(table);
296303
String SQL = commandBuilder.getSQLString(CommandName.LIST_FOREIGN_KEYS);
297304
try {
298-
return metadataResultSetBuilder.getImportedKeysResult(getResultSet(SQL, session));
305+
return metadataResultSetBuilder.getImportedKeysResult(
306+
getResultSet(SQL, session, MetadataOperationType.GET_CROSS_REFERENCE));
299307
} catch (SQLException e) {
300308
if (PARSE_SYNTAX_ERROR_SQL_STATE.equals(e.getSQLState())) {
301309
// This is a workaround for the issue where the SQL command fails with "syntax error at or
@@ -350,7 +358,10 @@ public DatabricksResultSet listCrossReferences(
350358
String SQL = commandBuilder.getSQLString(CommandName.LIST_FOREIGN_KEYS);
351359
try {
352360
return metadataResultSetBuilder.getCrossReferenceKeysResult(
353-
getResultSet(SQL, session), parentCatalog, parentSchema, parentTable);
361+
getResultSet(SQL, session, MetadataOperationType.GET_CROSS_REFERENCE),
362+
parentCatalog,
363+
parentSchema,
364+
parentTable);
354365
} catch (SQLException e) {
355366
if (PARSE_SYNTAX_ERROR_SQL_STATE.equals(e.getSQLState())) {
356367
// This is a workaround for the issue where the SQL command fails with "syntax error at or
@@ -392,15 +403,17 @@ private String autoFillCatalog(String catalog, String currentCatalog) {
392403
return catalog;
393404
}
394405

395-
private DatabricksResultSet getResultSet(String SQL, IDatabricksSession session)
406+
private DatabricksResultSet getResultSet(
407+
String SQL, IDatabricksSession session, MetadataOperationType metadataOperationType)
396408
throws SQLException {
397409
return sdkClient.executeStatement(
398410
SQL,
399411
session.getComputeResource(),
400412
new HashMap<>(),
401413
StatementType.METADATA,
402414
session,
403-
null /* parentStatement */);
415+
null /* parentStatement */,
416+
metadataOperationType);
404417
}
405418

406419
private DatabricksResultSet fetchSchemasAcrossCatalogs(

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public class DatabricksSdkClient implements IDatabricksClient {
6060
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DatabricksSdkClient.class);
6161
private static final String SYNC_TIMEOUT_VALUE = "10s";
6262
private static final String ASYNC_TIMEOUT_VALUE = "0s";
63+
private static final String HEADER_METADATA_OPERATION_TYPE =
64+
"X-Databricks-Metadata-Operation-Type";
65+
6366
private final IDatabricksConnectionContext connectionContext;
6467
private final ClientConfigurator clientConfigurator;
6568
private volatile WorkspaceClient workspaceClient;
@@ -181,16 +184,18 @@ public DatabricksResultSet executeStatement(
181184
Map<Integer, ImmutableSqlParameter> parameters,
182185
StatementType statementType,
183186
IDatabricksSession session,
184-
IDatabricksStatementInternal parentStatement)
187+
IDatabricksStatementInternal parentStatement,
188+
MetadataOperationType metadataOperationType)
185189
throws SQLException {
186190
LOGGER.debug(
187-
"public DatabricksResultSet executeStatement(String sql = {}, compute resource = {}, Map<Integer, ImmutableSqlParameter> parameters = {}, StatementType statementType = {}, IDatabricksSession session = {}, parentStatement = {})",
191+
"public DatabricksResultSet executeStatement(String sql = {}, compute resource = {}, Map<Integer, ImmutableSqlParameter> parameters = {}, StatementType statementType = {}, IDatabricksSession session = {}, parentStatement = {}, metadataOperationType = {})",
188192
sql,
189193
computeResource.toString(),
190194
parameters,
191195
statementType,
192196
session,
193-
parentStatement);
197+
parentStatement,
198+
metadataOperationType);
194199
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
195200
long pollCount = 0;
196201
long executionStartTime = Instant.now().toEpochMilli();
@@ -207,7 +212,12 @@ public DatabricksResultSet executeStatement(
207212
ExecuteStatementResponse response;
208213
try {
209214
Request req = new Request(Request.POST, STATEMENT_PATH, apiClient.serialize(request));
210-
req.withHeaders(getHeaders("executeStatement", statementType, false));
215+
Map<String, String> additionalHeaders = new HashMap<>();
216+
if (metadataOperationType != null) {
217+
additionalHeaders.put(
218+
HEADER_METADATA_OPERATION_TYPE, metadataOperationType.getHeaderValue());
219+
}
220+
req.withHeaders(getHeaders("executeStatement", statementType, false, additionalHeaders));
211221
response = apiClient.execute(req, ExecuteStatementResponse.class);
212222
} catch (IOException e) {
213223
String errorMessage = "Error while processing the execute statement request";
@@ -549,11 +559,19 @@ private boolean useCloudFetchForResult(StatementType statementType) {
549559
}
550560

551561
private Map<String, String> getHeaders(String method) {
552-
return getHeaders(method, null, false);
562+
return getHeaders(method, null, false, null);
553563
}
554564

555565
private Map<String, String> getHeaders(
556566
String method, StatementType statementType, boolean isAsync) {
567+
return getHeaders(method, statementType, isAsync, null);
568+
}
569+
570+
private Map<String, String> getHeaders(
571+
String method,
572+
StatementType statementType,
573+
boolean isAsync,
574+
Map<String, String> additionalHeaders) {
557575
Map<String, String> headers = new HashMap<>(JSON_HTTP_HEADERS);
558576
if (connectionContext.isRequestTracingEnabled()) {
559577
String traceHeader = TracingUtil.getTraceHeader();
@@ -568,6 +586,12 @@ private Map<String, String> getHeaders(
568586
"Adding x-databricks-sea-can-run-fully-sync header for synchronous metadata request");
569587
}
570588

589+
// Add any additional headers passed by caller (e.g., metadata operation type)
590+
if (additionalHeaders != null && !additionalHeaders.isEmpty()) {
591+
headers.putAll(additionalHeaders);
592+
LOGGER.debug("Adding additional headers: {}", additionalHeaders);
593+
}
594+
571595
// Overriding with URL defined headers
572596
headers.putAll(this.connectionContext.getCustomHeaders());
573597
return headers;

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.databricks.jdbc.api.internal.IDatabricksSession;
1414
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
1515
import com.databricks.jdbc.common.IDatabricksComputeResource;
16+
import com.databricks.jdbc.common.MetadataOperationType;
1617
import com.databricks.jdbc.common.StatementType;
1718
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
1819
import com.databricks.jdbc.common.util.DriverUtil;
@@ -149,9 +150,11 @@ public DatabricksResultSet executeStatement(
149150
Map<Integer, ImmutableSqlParameter> parameters,
150151
StatementType statementType,
151152
IDatabricksSession session,
152-
IDatabricksStatementInternal parentStatement)
153+
IDatabricksStatementInternal parentStatement,
154+
MetadataOperationType metadataOperationType)
153155
throws SQLException {
154-
156+
// Note: metadataOperationType is ignored in Thrift mode as metadata operations use native
157+
// Thrift RPCs (GetTables, GetColumns, etc.) which are already logged correctly.
155158
LOGGER.debug(
156159
String.format(
157160
"public DatabricksResultSet executeStatement(String sql = {%s}, Compute cluster = {%s}, Map<Integer, ImmutableSqlParameter> parameters = {%s}, StatementType statementType = {%s}, IDatabricksSession session)",
@@ -574,6 +577,7 @@ public DatabricksResultSet listFunctions(
574577
Collections.emptyMap(),
575578
StatementType.METADATA,
576579
session,
580+
null,
577581
null)) {
578582
return metadataResultSetBuilder.getFunctionsResult(rs, catalog);
579583
}

src/test/java/com/databricks/jdbc/api/impl/DatabricksConnectionTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public void testGetAndSetSchemaAndCatalog() throws SQLException {
114114
eq(new HashMap<>()),
115115
eq(StatementType.SQL),
116116
any(),
117+
any(),
117118
any()))
118119
.thenReturn(resultSet);
119120
when(databricksClient.executeStatement(
@@ -122,6 +123,7 @@ public void testGetAndSetSchemaAndCatalog() throws SQLException {
122123
eq(new HashMap<>()),
123124
eq(StatementType.SQL),
124125
any(),
126+
any(),
125127
any()))
126128
.thenReturn(resultSet);
127129
assertEquals(connection.getCatalog(), CATALOG);
@@ -147,6 +149,7 @@ public void testSetCatalogAndSchemaWithHyphenatedIdentifiers() throws SQLExcepti
147149
eq(new HashMap<>()),
148150
eq(StatementType.SQL),
149151
any(),
152+
any(),
150153
any()))
151154
.thenReturn(resultSet);
152155
connection.setCatalog(catalogWithHyphen);
@@ -159,6 +162,7 @@ public void testSetCatalogAndSchemaWithHyphenatedIdentifiers() throws SQLExcepti
159162
eq(new HashMap<>()),
160163
eq(StatementType.SQL),
161164
any(),
165+
any(),
162166
any()))
163167
.thenReturn(resultSet);
164168
connection.setSchema(schemaWithHyphen);
@@ -171,6 +175,7 @@ public void testSetCatalogAndSchemaWithHyphenatedIdentifiers() throws SQLExcepti
171175
eq(new HashMap<>()),
172176
eq(StatementType.SQL),
173177
any(),
178+
any(),
174179
any());
175180
verify(databricksClient)
176181
.executeStatement(
@@ -179,6 +184,7 @@ public void testSetCatalogAndSchemaWithHyphenatedIdentifiers() throws SQLExcepti
179184
eq(new HashMap<>()),
180185
eq(StatementType.SQL),
181186
any(),
187+
any(),
182188
any());
183189
}
184190

@@ -199,6 +205,7 @@ public void testGetSchemaAndCatalog_schemaAndCatalogNotSetViaURL() throws SQLExc
199205
eq(new HashMap<>()),
200206
eq(StatementType.QUERY),
201207
any(),
208+
any(),
202209
any()))
203210
.thenReturn(resultSet);
204211
assertEquals(connection.getCatalog(), DEFAULT_CATALOG);
@@ -219,6 +226,7 @@ public void testGetAndSetSchemaAndCatalog_invalidSchemaAndCatalog_throwsExceptio
219226
eq(new HashMap<>()),
220227
eq(StatementType.SQL),
221228
any(),
229+
any(),
222230
any()))
223231
.thenThrow(
224232
new DatabricksSQLException(
@@ -230,6 +238,7 @@ public void testGetAndSetSchemaAndCatalog_invalidSchemaAndCatalog_throwsExceptio
230238
eq(new HashMap<>()),
231239
eq(StatementType.SQL),
232240
any(),
241+
any(),
233242
any()))
234243
.thenThrow(
235244
new DatabricksSQLException(

0 commit comments

Comments
 (0)