Skip to content

Commit 0544a18

Browse files
authored
Merge pull request #212 from fairagro/improve_tracing
improve tracing
2 parents bdccc54 + 785b6f3 commit 0544a18

5 files changed

Lines changed: 204 additions & 174 deletions

File tree

middleware/http_session/__init__.py

Lines changed: 63 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import aiofiles
1515
import chardet
1616

17+
from middleware.utils.tracer import traced
18+
1719

1820
class HttpSessionConfig(NamedTuple):
1921
"""
@@ -117,6 +119,7 @@ async def __aexit__(
117119
"""
118120
return await super().__aexit__(exc_type, exc_val, exc_tb)
119121

122+
@traced
120123
async def get_decoded_url(self, url: str) -> str:
121124
"""
122125
Fetches the content of the given URL and decodes it using the detected encoding.
@@ -132,70 +135,69 @@ async def get_decoded_url(self, url: str) -> str:
132135
The decoded content of the URL.
133136
"""
134137

135-
with trace.get_tracer(__name__).start_as_current_span(
136-
"HttpSession.get_decoded_url") as otel_span:
137-
otel_span.set_attribute(url_attributes.URL_FULL, url)
138-
139-
parsed_url = urlparse(url) # does not raise
140-
if parsed_url.scheme in ["http", "https"]:
141-
try:
142-
async with self.get(url) as response:
143-
# treat 5xx like a technical network error
144-
if 500 <= response.status < 600:
145-
otel_span.add_event(
146-
"server reponse 5xx, raising HttpSessionTechnicalError")
147-
raise HttpSessionTechnicalError(
148-
f"Server error {response.status} for {url}"
149-
)
150-
# treat 4xx as response error
151-
if 400 <= response.status < 500:
152-
otel_span.add_event(
153-
"server reponse 4xx, raising HttpSessionResponseError")
154-
raise HttpSessionResponseError(
155-
f"Server error {response.status} for {url}"
156-
)
157-
encoded_content = await response.read()
158-
except (ClientError, asyncio.TimeoutError) as e:
159-
otel_span.record_exception(e)
160-
otel_span.add_event(
161-
"caught network-related exception, raising HttpSessionTechnicalError")
162-
raise HttpSessionTechnicalError(
163-
f"Cannot fetch {url}: {e}") from e
164-
elif parsed_url.scheme == "file":
165-
try:
166-
# We need to deal with the following situation:
167-
# urlparse('file://test') => netloc = 'test', path = '', joined = 'test'
168-
# urlparse('file:///test') => netloc = '', path = '/test', joined = '\test'
169-
# urlparse('file://./test') => netloc = '.', path = '/test', joined = '\test'
170-
# In the last case the path is relative, so the result is wrong. Thus this code:
171-
base_path = PurePath(parsed_url.netloc)
172-
if base_path == PurePath('.'):
173-
path = base_path / parsed_url.path.lstrip("/").lstrip("\\")
174-
else:
175-
path = base_path / parsed_url.path
176-
async with aiofiles.open(path, 'rb') as f:
177-
encoded_content = await f.read()
178-
except Exception as e:
179-
otel_span.record_exception(e)
180-
otel_span.add_event(
181-
"caught exception when trying to read file, "
182-
"raising HttpSessionResponseError")
183-
raise HttpSessionResponseError(
184-
f"Cannot read file {url}: {e}") from e
185-
else:
186-
otel_span.add_event(
187-
"found unsupported URL protocol, raising HttpSessionArgumentError")
188-
raise HttpSessionArgumentError(
189-
f"Unsupported URL scheme: {parsed_url.scheme} in URL {url}")
138+
otel_span = trace.get_current_span()
139+
otel_span.set_attribute(url_attributes.URL_FULL, url)
190140

141+
parsed_url = urlparse(url) # does not raise
142+
if parsed_url.scheme in ["http", "https"]:
143+
try:
144+
async with self.get(url) as response:
145+
# treat 5xx like a technical network error
146+
if 500 <= response.status < 600:
147+
otel_span.add_event(
148+
"server reponse 5xx, raising HttpSessionTechnicalError")
149+
raise HttpSessionTechnicalError(
150+
f"Server error {response.status} for {url}"
151+
)
152+
# treat 4xx as response error
153+
if 400 <= response.status < 500:
154+
otel_span.add_event(
155+
"server reponse 4xx, raising HttpSessionResponseError")
156+
raise HttpSessionResponseError(
157+
f"Server error {response.status} for {url}"
158+
)
159+
encoded_content = await response.read()
160+
except (ClientError, asyncio.TimeoutError) as e:
161+
otel_span.record_exception(e)
162+
otel_span.add_event(
163+
"caught network-related exception, raising HttpSessionTechnicalError")
164+
raise HttpSessionTechnicalError(
165+
f"Cannot fetch {url}: {e}") from e
166+
elif parsed_url.scheme == "file":
191167
try:
192-
encoding = str(chardet.detect(encoded_content)['encoding']) or 'utf-8'
193-
content = encoded_content.decode(encoding)
168+
# We need to deal with the following situation:
169+
# urlparse('file://test') => netloc = 'test', path = '', joined = 'test'
170+
# urlparse('file:///test') => netloc = '', path = '/test', joined = '\test'
171+
# urlparse('file://./test') => netloc = '.', path = '/test', joined = '\test'
172+
# In the last case the path is relative, so the result is wrong. Thus this code:
173+
base_path = PurePath(parsed_url.netloc)
174+
if base_path == PurePath('.'):
175+
path = base_path / parsed_url.path.lstrip("/").lstrip("\\")
176+
else:
177+
path = base_path / parsed_url.path
178+
async with aiofiles.open(path, 'rb') as f:
179+
encoded_content = await f.read()
194180
except Exception as e:
195181
otel_span.record_exception(e)
196182
otel_span.add_event(
197-
"caught exception during decoding, raising HttpSessionDecodeError")
198-
raise HttpSessionDecodeError(
199-
f"cannot decode URL content from {url}: {e}") from e
183+
"caught exception when trying to read file, "
184+
"raising HttpSessionResponseError")
185+
raise HttpSessionResponseError(
186+
f"Cannot read file {url}: {e}") from e
187+
else:
188+
otel_span.add_event(
189+
"found unsupported URL protocol, raising HttpSessionArgumentError")
190+
raise HttpSessionArgumentError(
191+
f"Unsupported URL scheme: {parsed_url.scheme} in URL {url}")
192+
193+
try:
194+
encoding = str(chardet.detect(encoded_content)['encoding']) or 'utf-8'
195+
content = encoded_content.decode(encoding)
196+
except Exception as e:
197+
otel_span.record_exception(e)
198+
otel_span.add_event(
199+
"caught exception during decoding, raising HttpSessionDecodeError")
200+
raise HttpSessionDecodeError(
201+
f"cannot decode URL content from {url}: {e}") from e
200202

201-
return content
203+
return content

middleware/main.py

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@
3131
from middleware.git_repo import GitRepo, GitRepoConfig
3232
from middleware.http_session import HttpSessionConfig
3333
from middleware.metadata_scraper import MetadataScraperConfig, scrape_repo
34-
35-
# add the script directory to the python module path
36-
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
34+
from middleware.utils.tracer import traced
3735

3836
# Disable pylint warning that imports are not on top. But we need to adapt the import path before.
3937
# Is there another solution so packages next to the main script can be found?
@@ -372,37 +370,38 @@ async def process_sitemap(sitemap, local_path, default_http_config, git_repo):
372370
return repo_reports
373371

374372

373+
@traced
375374
async def main():
376375
"""
377376
The main async function of the basic middleware
378377
"""
379378

380379
args, config = setup_and_config()
381380

382-
with trace.get_tracer(__name__).start_as_current_span("main") as otel_span:
383-
try:
384-
git_repo, local_path = await setup_repo(args, config)
385-
default_http_config = HttpSessionConfig(**config["http_client"])
386-
387-
full_report = []
388-
for sitemap in config["sitemaps"]:
389-
repo_reports = await process_sitemap(
390-
sitemap, local_path, default_http_config, git_repo
391-
)
392-
full_report.extend(repo_reports)
393-
394-
if git_repo:
395-
git_repo.push()
396-
397-
print(json.dumps(full_report, indent=2, ensure_ascii=False, sort_keys=True))
398-
399-
# pylint: disable-next=broad-except
400-
except Exception as e:
401-
otel_span.record_exception(e)
402-
msg = "Error when scraping repositories"
403-
otel_span.add_event(msg)
404-
logging.exception(msg)
405-
sys.exit(1)
381+
try:
382+
git_repo, local_path = await setup_repo(args, config)
383+
default_http_config = HttpSessionConfig(**config["http_client"])
384+
385+
full_report = []
386+
for sitemap in config["sitemaps"]:
387+
repo_reports = await process_sitemap(
388+
sitemap, local_path, default_http_config, git_repo
389+
)
390+
full_report.extend(repo_reports)
391+
392+
if git_repo:
393+
git_repo.push()
394+
395+
print(json.dumps(full_report, indent=2, ensure_ascii=False, sort_keys=True))
396+
397+
# pylint: disable-next=broad-except
398+
except Exception as e:
399+
otel_span = trace.get_current_span()
400+
otel_span.record_exception(e)
401+
msg = "Error when scraping repositories"
402+
otel_span.add_event(msg)
403+
logging.exception(msg)
404+
sys.exit(1)
406405

407406

408407
if __name__ == "__main__":

middleware/metadata_scraper/__init__.py

Lines changed: 72 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
SitemapParseError, SitemapParser)
2222
from middleware.metadata_scraper.metadata_extractor.metadata_extractor import (
2323
MetadataExtractor, MetadataParseError)
24+
from middleware.utils.tracer import traced
2425

2526

2627
class MetadataScraperConfig(NamedTuple):
@@ -49,6 +50,7 @@ class MetadataScraperConfig(NamedTuple):
4950
}
5051

5152

53+
@traced
5254
async def _extract_metadata(
5355
url: str,
5456
session: HttpSession,
@@ -70,26 +72,26 @@ async def _extract_metadata(
7072
Optional[List[Dict]]
7173
A dictionary containing the extracted metadata.
7274
"""
73-
with trace.get_tracer(__name__).start_as_current_span(
74-
"MetadataScraper._extract_metadata") as otel_span:
75-
otel_span.set_attribute(url_attributes.URL_FULL, url)
76-
try:
77-
content = await session.get_decoded_url(url)
78-
metadata = extractor.get_metadata_or_log_error(content, url)
79-
return metadata
80-
except (HttpSessionResponseError, HttpSessionDecodeError) as e:
81-
# These exceptions are raised by get_decoded_url.
82-
# Treat them as errors that only relate to single datasets and
83-
# skip this dataset.
84-
# (Same approach as get_metadata_or_log_error performs internally
85-
# when it encounters parsing errors)
86-
otel_span.record_exception(e)
87-
msg = "caught recoverable exception, omitting metadataset"
88-
otel_span.add_event(msg)
89-
logging.exception(msg)
90-
return None
91-
92-
75+
otel_span = trace.get_current_span()
76+
otel_span.set_attribute(url_attributes.URL_FULL, url)
77+
try:
78+
content = await session.get_decoded_url(url)
79+
metadata = extractor.get_metadata_or_log_error(content, url)
80+
return metadata
81+
except (HttpSessionResponseError, HttpSessionDecodeError) as e:
82+
# These exceptions are raised by get_decoded_url.
83+
# Treat them as errors that only relate to single datasets and
84+
# skip this dataset.
85+
# (Same approach as get_metadata_or_log_error performs internally
86+
# when it encounters parsing errors)
87+
otel_span.record_exception(e)
88+
msg = "caught recoverable exception, omitting metadataset"
89+
otel_span.add_event(msg)
90+
logging.exception(msg)
91+
return None
92+
93+
94+
@traced
9395
async def _extract_many_metadata(
9496
urls: List[str],
9597
session: HttpSession,
@@ -114,29 +116,28 @@ async def _extract_many_metadata(
114116
include several metadata entries or none (especially in case the
115117
metadata extraction failed).
116118
"""
117-
with trace.get_tracer(__name__).start_as_current_span(
118-
"MetadataScraper.extract_metadata") as otel_span:
119-
extractors = [_extract_metadata(
120-
url, session, extractor) for url in urls]
121-
datasets = await asyncio.gather(*extractors, return_exceptions=True)
122-
for dataset in datasets:
123-
if isinstance(dataset, Exception):
124-
otel_span.record_exception(dataset)
125-
msg = "caught unrecoverable exception, omitting all metadata of RDI"
126-
otel_span.add_event(msg)
127-
logging.exception(msg)
128-
return None, SKIP_RDI_REPORT
129-
130-
filtered_datasets = (m for m in datasets if isinstance(m, list))
131-
result = list(itertools.chain.from_iterable(filtered_datasets))
132-
report = {
133-
'valid_entries': len(result),
134-
'failed_entries': len(datasets)-len(result),
135-
'skipped': False
136-
}
137-
return result, report
119+
otel_span = trace.get_current_span()
120+
extractors = [_extract_metadata(url, session, extractor) for url in urls]
121+
datasets = await asyncio.gather(*extractors, return_exceptions=True)
122+
for dataset in datasets:
123+
if isinstance(dataset, Exception):
124+
otel_span.record_exception(dataset)
125+
msg = "caught unrecoverable exception, omitting all metadata of RDI"
126+
otel_span.add_event(msg)
127+
logging.exception(msg)
128+
return None, SKIP_RDI_REPORT
129+
130+
filtered_datasets = (m for m in datasets if isinstance(m, list))
131+
result = list(itertools.chain.from_iterable(filtered_datasets))
132+
report = {
133+
'valid_entries': len(result),
134+
'failed_entries': len(datasets)-len(result),
135+
'skipped': False
136+
}
137+
return result, report
138138

139139

140+
@traced
140141
async def scrape_repo(
141142
config: MetadataScraperConfig,
142143
default_session_config: HttpSessionConfig) -> Tuple[Optional[List[Dict]], Dict]:
@@ -156,34 +157,33 @@ async def scrape_repo(
156157
The extracted metadata in terms of Python dictionaries.
157158
"""
158159

159-
with trace.get_tracer(__name__).start_as_current_span(
160-
"MetadataScraper.scrape_repo") as otel_span:
161-
otel_span.set_attribute(
162-
"FAIRagro.middleware.MetadataScraper.repository_name", config.name)
163-
otel_span.set_attribute(
164-
"FAIRagro.middleware.MetadataScraper.repository_sitemap_url", config.url)
165-
try:
166-
if config.http_client:
167-
http_session_config = HttpSessionConfig(**config.http_client)
168-
else:
169-
http_session_config = default_session_config
170-
async with HttpSession(http_session_config) as session:
171-
sitemap_content = await session.get_decoded_url(config.url)
172-
parser = SitemapParser.create_instance(
173-
config.sitemap, sitemap_content)
174-
if parser.has_metadata:
175-
return parser.metadata
176-
177-
urls = list(parser.datasets)
178-
if config.metadata:
179-
extractor = MetadataExtractor.create_instance(config.metadata)
180-
metadata, report = await _extract_many_metadata(urls, session, extractor)
181-
return metadata, report
182-
183-
except (HttpSessionFetchError, SitemapParseError, MetadataParseError) as e:
184-
otel_span.record_exception(e)
185-
msg = "Could not download or parse RDI sitemap, skipping RDI"
186-
otel_span.add_event(msg)
187-
logging.exception(msg)
188-
189-
return None, SKIP_RDI_REPORT
160+
otel_span = trace.get_current_span()
161+
otel_span.set_attribute(
162+
"FAIRagro.middleware.MetadataScraper.repository_name", config.name)
163+
otel_span.set_attribute(
164+
"FAIRagro.middleware.MetadataScraper.repository_sitemap_url", config.url)
165+
try:
166+
if config.http_client:
167+
http_session_config = HttpSessionConfig(**config.http_client)
168+
else:
169+
http_session_config = default_session_config
170+
async with HttpSession(http_session_config) as session:
171+
sitemap_content = await session.get_decoded_url(config.url)
172+
parser = SitemapParser.create_instance(
173+
config.sitemap, sitemap_content)
174+
if parser.has_metadata:
175+
return parser.metadata
176+
177+
urls = list(parser.datasets)
178+
if config.metadata:
179+
extractor = MetadataExtractor.create_instance(config.metadata)
180+
metadata, report = await _extract_many_metadata(urls, session, extractor)
181+
return metadata, report
182+
183+
except (HttpSessionFetchError, SitemapParseError, MetadataParseError) as e:
184+
otel_span.record_exception(e)
185+
msg = "Could not download or parse RDI sitemap, skipping RDI"
186+
otel_span.add_event(msg)
187+
logging.exception(msg)
188+
189+
return None, SKIP_RDI_REPORT

0 commit comments

Comments
 (0)