Skip to content

Commit 8a53721

Browse files
akashverma0786Akash VermaTeddyCrgithub-actions[bot]
authored
Feature: burstiq profiler (#26983)
* Feature: burstiq profiler * Fix: Added burstiq profiler interface * Update generated TypeScript types * Fix: Resolved comments * Fix: pycheckstyle * Fixed failing test, and addressed comment * Added sampler test * Fixed sampler changes * Fix: resolved comments * Fix: Failing tests * Fix: Added caching to reduce profiler runtime, fixed md file * Fix: minor improvement --------- Co-authored-by: Akash Verma <akashverma@Akashs-MacBook-Pro-2.local> Co-authored-by: Teddy <teddy.crepineau@gmail.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 895bbf6 commit 8a53721

29 files changed

Lines changed: 1445 additions & 169 deletions

File tree

ingestion/src/metadata/ingestion/source/database/burstiq/client.py

Lines changed: 94 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@
2020
from metadata.generated.schema.entity.services.connections.database.burstIQConnection import (
2121
BurstIQConnection,
2222
)
23+
from metadata.ingestion.source.database.burstiq.models import (
24+
BurstIQDictionary,
25+
BurstIQEdge,
26+
SdzMetricsResponse,
27+
TokenResponse,
28+
TQLRecord,
29+
)
2330
from metadata.utils.logger import ingestion_logger
2431

2532
logger = ingestion_logger()
2633

27-
AUTH_TIMEOUT = (10, 30) # 10s connect, 30s read for authentication
28-
API_TIMEOUT = (
29-
10,
30-
120,
31-
) # 10s connect, 120s read for API calls (handles 600+ dictionaries)
34+
AUTH_TIMEOUT = (10, 30)
35+
API_TIMEOUT = (10, 120)
3236

3337
AUTH_SERVER_BASE = "https://auth.burstiq.com"
3438
API_BASE_URL = "https://api.burstiq.com"
@@ -41,18 +45,12 @@ class BurstIQClient:
4145
"""
4246

4347
def __init__(self, config: BurstIQConnection):
44-
"""
45-
Initialize BurstIQ Client
46-
47-
Args:
48-
config: BurstIQConnection configuration
49-
"""
5048
self.config = config
5149
self.api_base_url = getattr(config, "apiUrl", API_BASE_URL).rstrip("/")
5250

53-
# Token management
5451
self.access_token: Optional[str] = None
5552
self.token_expires_at: Optional[datetime] = None
53+
self._chain_metrics: Optional[Dict[str, int]] = None
5654

5755
def test_authenticate(self):
5856
"""
@@ -65,13 +63,11 @@ def test_authenticate(self):
6563
self._authenticate()
6664

6765
def _authenticate(self):
68-
"""Authenticate with BurstIQ and get access token"""
69-
# Get configuration values
66+
"""Authenticate with BurstIQ and store the access token."""
7067
realm_name = getattr(self.config, "realmName", None)
7168
username = getattr(self.config, "username", None)
7269
password = getattr(self.config, "password", None)
7370

74-
# Validate required fields
7571
if not realm_name:
7672
raise ValueError("realmName is required for authentication")
7773
if not username:
@@ -92,30 +88,27 @@ def _authenticate(self):
9288
"password": password.get_secret_value(),
9389
}
9490

95-
headers = {"Content-Type": "application/x-www-form-urlencoded"}
96-
9791
try:
9892
logger.info(f"Authenticating with BurstIQ at: {token_url}")
9993
response = requests.post(
100-
token_url, data=payload, headers=headers, timeout=AUTH_TIMEOUT
94+
token_url,
95+
data=payload,
96+
headers={"Content-Type": "application/x-www-form-urlencoded"},
97+
timeout=AUTH_TIMEOUT,
10198
)
10299
response.raise_for_status()
103100

104-
token_data = response.json()
105-
106-
self.access_token = token_data.get("access_token")
107-
108-
# Calculate token expiration
109-
expires_in = token_data.get("expires_in", 3600)
101+
token = TokenResponse.model_validate(response.json())
102+
self.access_token = token.access_token
110103
self.token_expires_at = datetime.now() + timedelta(
111-
seconds=expires_in - 60
112-
) # 60s buffer
104+
seconds=token.expires_in - 60
105+
)
113106

114107
customer_name = getattr(self.config, "biqCustomerName", None)
115108
sdz_name = getattr(self.config, "biqSdzName", None)
116109

117110
logger.info(
118-
f"Authentication successful. Token expires in {expires_in} seconds"
111+
f"Authentication successful. Token expires in {token.expires_in} seconds"
119112
)
120113
if customer_name and sdz_name:
121114
logger.info(f"Customer: {customer_name}, SDZ: {sdz_name}")
@@ -127,17 +120,14 @@ def _authenticate(self):
127120

128121
def _get_auth_header(self) -> Dict[str, str]:
129122
"""
130-
Get authentication headers with current access token.
131-
Authenticates on first call if not already authenticated.
123+
Get authentication headers, refreshing the token if necessary.
132124
133125
Returns:
134126
Dictionary of headers
135127
"""
136-
# Authenticate if not already done (lazy authentication)
137128
if not self.access_token:
138129
logger.info("No access token found, authenticating...")
139130
self._authenticate()
140-
# Check if token needs refresh
141131
elif self.token_expires_at and datetime.now() >= self.token_expires_at:
142132
logger.info("Access token expired, re-authenticating...")
143133
self._authenticate()
@@ -148,20 +138,20 @@ def _get_auth_header(self) -> Dict[str, str]:
148138
"Accept": "application/json",
149139
}
150140

151-
# Add BurstIQ-specific headers from config
152141
customer_name = getattr(self.config, "biqCustomerName", None)
153142
sdz_name = getattr(self.config, "biqSdzName", None)
143+
system_wallet_id = getattr(self.config, "biqSystemWalletId", None)
154144

155145
if customer_name:
156146
headers["biq_customer_name"] = customer_name
157147
if sdz_name:
158148
headers["biq_sdz_name"] = sdz_name
149+
if system_wallet_id:
150+
headers["biq_system_wallet_id"] = system_wallet_id
159151

160152
return headers
161153

162-
def _make_request(
163-
self, method: str, endpoint: str, **kwargs
164-
) -> Optional[Dict[str, Any]]:
154+
def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Any]:
165155
"""
166156
Make HTTP request to BurstIQ API
167157
@@ -178,11 +168,9 @@ def _make_request(
178168
url = f"{self.api_base_url}/{endpoint.lstrip('/')}"
179169
headers = self._get_auth_header()
180170

181-
# Merge with any additional headers provided
182171
if "headers" in kwargs:
183172
headers.update(kwargs.pop("headers"))
184173

185-
# Log request params for debugging
186174
params = kwargs.get("params", {})
187175
logger.debug(f"Making {method} request to {url} with params: {params}")
188176

@@ -199,14 +187,12 @@ def _make_request(
199187

200188
response.raise_for_status()
201189

202-
# Parse JSON response
203190
json_data = response.json()
204191

205-
# Log response size
206192
if isinstance(json_data, list):
207193
logger.debug(f"Received {len(json_data)} items in response")
208194
else:
209-
logger.debug(f"Received single item response")
195+
logger.debug("Received single item response")
210196

211197
return json_data
212198

@@ -229,15 +215,15 @@ def _make_request(
229215
logger.debug(traceback.format_exc())
230216
raise
231217

232-
def get_dictionaries(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
218+
def get_dictionaries(self, limit: Optional[int] = None) -> List[BurstIQDictionary]:
233219
"""
234220
Fetch all data dictionaries from BurstIQ
235221
236222
Args:
237223
limit: Optional limit on number of dictionaries to fetch
238224
239225
Returns:
240-
List of dictionary objects
226+
List of BurstIQDictionary model instances
241227
"""
242228
params = {}
243229
if limit:
@@ -249,22 +235,26 @@ def get_dictionaries(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
249235
if data is None:
250236
return []
251237

252-
dictionaries = data if isinstance(data, list) else [data]
238+
raw_items = data if isinstance(data, list) else [data]
239+
dictionaries = [BurstIQDictionary.model_validate(item) for item in raw_items]
253240
logger.info(f"Found {len(dictionaries)} dictionaries")
254241
return dictionaries
255242

256-
def get_dictionary_by_name(self, name: str) -> Optional[Dict[str, Any]]:
243+
def get_dictionary_by_name(self, name: str) -> Optional[BurstIQDictionary]:
257244
"""
258245
Get a specific dictionary by name
259246
260247
Args:
261248
name: Dictionary name
262249
263250
Returns:
264-
Dictionary object or None
251+
BurstIQDictionary instance or None
265252
"""
266253
logger.debug(f"Fetching dictionary: {name}")
267-
return self._make_request("GET", f"/api/metadata/dictionary/{name}")
254+
data = self._make_request("GET", f"/api/metadata/dictionary/{name}")
255+
if data is None:
256+
return None
257+
return BurstIQDictionary.model_validate(data)
268258

269259
def get_edges(
270260
self,
@@ -273,7 +263,7 @@ def get_edges(
273263
to_dictionary: Optional[str] = None,
274264
limit: Optional[int] = None,
275265
skip: Optional[int] = None,
276-
) -> List[Dict[str, Any]]:
266+
) -> List[BurstIQEdge]:
277267
"""
278268
Query edge definitions (lineage relationships) from BurstIQ
279269
@@ -285,7 +275,7 @@ def get_edges(
285275
skip: Optional number of edges to skip (pagination)
286276
287277
Returns:
288-
List of edge definition objects
278+
List of BurstIQEdge model instances
289279
"""
290280
params = {}
291281
if name:
@@ -307,10 +297,64 @@ def get_edges(
307297
if data is None:
308298
return []
309299

310-
edges = data if isinstance(data, list) else [data]
300+
raw_items = data if isinstance(data, list) else [data]
301+
edges = [BurstIQEdge.model_validate(item) for item in raw_items]
311302
logger.info(f"Found {len(edges)} edge definitions")
312303
return edges
313304

305+
def get_chain_metrics(self) -> Dict[str, int]:
306+
"""
307+
Fetch asset counts per chain from BurstIQ metrics endpoint.
308+
309+
Returns:
310+
Dict mapping chain name to asset (row) count
311+
"""
312+
if self._chain_metrics is not None:
313+
return self._chain_metrics
314+
logger.info("Fetching chain metrics from BurstIQ...")
315+
data = self._make_request("GET", "/api/metrics/sdz")
316+
if data is None:
317+
return {}
318+
metrics = SdzMetricsResponse.model_validate(data)
319+
self._chain_metrics = {
320+
name: chain.assets for name, chain in metrics.chainMetrics.items()
321+
}
322+
return self._chain_metrics
323+
324+
def get_records_by_tql(
325+
self, chain: str, limit: int, skip: int = 0
326+
) -> List[Dict[str, Any]]:
327+
"""
328+
Fetch data records from a chain using TQL (Temporal Query Language).
329+
330+
Args:
331+
chain: Chain (dictionary) name to query
332+
limit: Maximum number of records to fetch
333+
skip: Number of records to skip (for pagination)
334+
335+
Returns:
336+
List of flat record dicts (data envelope unwrapped)
337+
"""
338+
tql = f"FROM {chain} SKIP {skip} LIMIT {limit} SELECT data.*"
339+
logger.info(f"Fetching records for chain '{chain}' via TQL (limit={limit})")
340+
try:
341+
raw = self._make_request(
342+
"POST", "/api/graphchain/query", json={"query": tql}
343+
)
344+
except Exception as exc:
345+
logger.warning(f"TQL query failed for chain '{chain}': {exc}")
346+
return []
347+
348+
if not isinstance(raw, list):
349+
return []
350+
351+
records = [
352+
TQLRecord.model_validate(item).to_record()
353+
for item in raw
354+
if isinstance(item, dict)
355+
]
356+
logger.info(f"Fetched {len(records)} records for chain '{chain}'")
357+
return records
358+
314359
def close(self):
315-
"""Cleanup method - no session to close when using plain requests"""
316360
pass

ingestion/src/metadata/ingestion/source/database/burstiq/connection.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
"""
1212
Source connection handler for BurstIQ
1313
"""
14-
from typing import Optional
14+
import hashlib
15+
from typing import Dict, Optional
1516

1617
from metadata.generated.schema.entity.automations.workflow import (
1718
Workflow as AutomationWorkflow,
@@ -30,18 +31,24 @@
3031

3132
logger = ingestion_logger()
3233

34+
_CLIENT_CACHE: Dict[str, BurstIQClient] = {}
35+
3336

3437
def get_connection(connection: BurstIQConnection) -> BurstIQClient:
3538
"""
36-
Create BurstIQ client connection
37-
38-
Args:
39-
connection: BurstIQConnection configuration
39+
Create or return a cached BurstIQ client connection.
4040
41-
Returns:
42-
BurstIQClient instance
41+
Caching avoids re-authentication on every table during profiler ingestion,
42+
where SamplerInterface.__init__ calls get_ssl_connection once per table.
43+
Using id(connection) was unreliable because each table deserialization
44+
produces a new object with a different id. A SHA-256 digest of the
45+
serialised config is used as the key: collision-resistant but never
46+
stores plaintext credentials in the cache keys.
4347
"""
44-
return BurstIQClient(config=connection)
48+
key = hashlib.sha256(connection.model_dump_json().encode()).hexdigest()
49+
if key not in _CLIENT_CACHE:
50+
_CLIENT_CACHE[key] = BurstIQClient(config=connection)
51+
return _CLIENT_CACHE[key]
4552

4653

4754
def test_connection(

0 commit comments

Comments
 (0)