diff --git a/config.yml b/config.yml index 94fba89..4a7e94b 100644 --- a/config.yml +++ b/config.yml @@ -23,9 +23,9 @@ sitemaps: - name: publisso url: "https://frl.publisso.de/find?q=contentType:researchData&from=0&until=200&format=json" sitemap: publisso - - name: thunen_atlas - url: "https://atlas.thuenen.de/api/v2/resources?page_size=200&format=json" - sitemap: thunen_atlas + # - name: thunen_atlas + # url: "https://atlas.thuenen.de/api/v2/resources?page_size=200&format=json" + # sitemap: thunen_atlas # Configures the HTTP client uses to download sitemaps and datasets http_client: diff --git a/middleware/main.py b/middleware/main.py index 927eaee..4693c6c 100644 --- a/middleware/main.py +++ b/middleware/main.py @@ -8,19 +8,23 @@ import datetime import argparse import json +import re import logging +import subprocess from typing import Tuple +import tempfile import asyncio import aiofiles import pytz import yaml -from opentelemetry import trace #, metrics +from opentelemetry import trace # , metrics from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.sampling import ALWAYS_ON from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + # from opentelemetry.sdk.metrics import MeterProvider # from opentelemetry.sdk.metrics.export import ( # ConsoleMetricExporter, @@ -59,18 +63,16 @@ def setup_opentelemetry(otlp_config: dict) -> None: opentelemetry.instrumentation.urllib.URLLibInstrumentor().instrument() opentelemetry.instrumentation.aiohttp_client.AioHttpClientInstrumentor().instrument() - endpoint = otlp_config.get('endpoint') + endpoint = otlp_config.get("endpoint") if endpoint: # Initialize OpenTelemetry for Tracing to OTLP endpoint trace.set_tracer_provider( TracerProvider( - resource=Resource.create({ - "service.name": "FAIRagro middleware" - }), + resource=Resource.create({"service.name": "FAIRagro middleware"}), active_span_processor=BatchSpanProcessor( OTLPSpanExporter(endpoint=endpoint) ), - sampler=ALWAYS_ON + sampler=ALWAYS_ON, ) ) else: @@ -88,9 +90,10 @@ def setup_opentelemetry(otlp_config: dict) -> None: async def scrape_repo_and_write_to_file( - folder_path: str, - scraper_config: MetadataScraperConfig, - default_http_config: HttpSessionConfig) -> Tuple[str, datetime.datetime]: + folder_path: str, + scraper_config: MetadataScraperConfig, + default_http_config: HttpSessionConfig, +) -> Tuple[str, datetime.datetime]: """ Scrapes research repository metadata and writes it to a file. @@ -114,15 +117,17 @@ async def scrape_repo_and_write_to_file( # simple synchronous gauge values. # count_sites = len(sites) # count_metadata = len(metadata) - path = os.path.join(folder_path, f'{scraper_config.name}.json') - async with aiofiles.open(path, 'w', encoding='utf-8') as f: - await f.write(json.dumps(metadata, indent=2, ensure_ascii=False, sort_keys=True)) + path = os.path.join(folder_path, f"{scraper_config.name}.json") + async with aiofiles.open(path, "w", encoding="utf-8") as f: + await f.write( + json.dumps(metadata, indent=2, ensure_ascii=False, sort_keys=True) + ) return path, start_timestamp, report -def commit_to_git(sitemap_url: str, - git_repo: GitRepo, - path: str, - starttime: datetime) -> None: + +def commit_to_git( + sitemap_url: str, git_repo: GitRepo, path: str, starttime: datetime +) -> None: """ Create a log message and commit file to git. @@ -141,12 +146,11 @@ def commit_to_git(sitemap_url: str, ------- None """ - formatted_time = starttime.strftime('%Y-%m-%d %H:%M:%S.%f %Z%z') - msg = ( - f'harvested by FAIRargo middleware at {formatted_time} from {sitemap_url}' - ) + formatted_time = starttime.strftime("%Y-%m-%d %H:%M:%S.%f %Z%z") + msg = f"harvested by FAIRargo middleware at {formatted_time} from {sitemap_url}" git_repo.add_and_commit([path], msg) + def setup_andconfig() -> dict: """ This function will perform setup work and reads the configuration file. @@ -154,33 +158,36 @@ def setup_andconfig() -> dict: try: parser = argparse.ArgumentParser( - prog = 'fairagro-middleware', - description= 'Extracts schema.org meta data from research data repositories.', + prog="fairagro-middleware", + description="Extracts schema.org meta data from research data repositories.", ) - parser.add_argument('--config', '-c', - type=Path, - default='config.yml', - help='Config file for this tool.') parser.add_argument( - '--git', + "--config", + "-c", + type=Path, + default="config.yml", + help="Config file for this tool.", + ) + parser.add_argument( + "--git", action=argparse.BooleanOptionalAction, default=True, help=( - 'Specify this flag to enabled or disable git interactions.' - 'If disabled the outout files will nevrtheless be written to git.local_path ' - 'as specified within the config file.' - ) + "Specify this flag to enabled or disable git interactions." + "If disabled the outout files will nevrtheless be written to git.local_path " + "as specified within the config file." + ), ) args = parser.parse_args() if not os.path.isfile(args.config): - raise FileNotFoundError(f'Config file {args.config} does not exist.') + raise FileNotFoundError(f"Config file {args.config} does not exist.") # load config - with open(args.config, 'r', encoding='utf-8') as f: + with open(args.config, "r", encoding="utf-8") as f: config = yaml.safe_load(f) - setup_opentelemetry(config['opentelemetry']) + setup_opentelemetry(config["opentelemetry"]) return args, config # pylint: disable-next=broad-except @@ -188,6 +195,94 @@ def setup_andconfig() -> dict: logging.exception("An error occured during initialization") sys.exit(1) + +def transform_publisso_to_publisso_schemaorg(): + """ + Transform the Publisso metadata to schema.org format. + """ + + # Archivos + input_file = Path("./output/publisso.json").resolve() + jq_script = Path("./scripts/publiso_conversor.jq").resolve() + + if not input_file.exists(): + print(f"❌ Archivo de entrada no encontrado: {input_file}") + return + + # Crear directorio de salida si no existe + input_file.parent.mkdir(parents=True, exist_ok=True) + + # Usar archivo temporal para el resultado + with tempfile.NamedTemporaryFile( + mode="w", delete=False, dir=input_file.parent, suffix=".json" + ) as tmp_file: + tmp_path = Path(tmp_file.name) + + try: + # Ejecutar jq en memoria + p1 = subprocess.Popen( + ["jq", "-f", str(jq_script), str(input_file)], stdout=subprocess.PIPE + ) + p2 = subprocess.Popen( + ["jq", "-s", "."], stdin=p1.stdout, stdout=open(tmp_path, "w") + ) + p1.stdout.close() # Permite que p1 reciba SIGPIPE si p2 falla + p2.communicate() # Espera a que termine + + # Reemplazar archivo original + os.remove(input_file) # Eliminar input original + tmp_path.rename(input_file) # Renombrar temp como input original + + print(f"✅ Transformación completada, archivo actualizado: {input_file}") + + except subprocess.CalledProcessError as e: + print(f"❌ Error al ejecutar jq: {e}") + if tmp_path.exists(): + tmp_path.unlink() + + +def extract_thunen_from_openagrar_metadata(): + """ + Extract Thünen metadata from OpenAgrar metadata. + """ + # Configuration + input_file = Path("./output/openagrar.json").resolve() + output_file = Path("./output/thunen.json").resolve() + + if not input_file.exists(): + print(f"❌ Archivo de entrada no encontrado: {input_file}") + return + # Regex pattern to match publisher synonyms + publisher_pattern = re.compile( + r"Thünen[- ]?Institut|Thuenen Institute|Thünen-Atlas", re.IGNORECASE + ) + + # Load JSON + with open(input_file, "r", encoding="utf-8") as f: + data = json.load(f) + print(f"Found {len(data)} datasets in {input_file}") + # Separate datasets + filtered = [] + remaining = [] + for dataset in data: + publisher_name = dataset.get("publisher", {}).get("name", "") + if publisher_pattern.search(publisher_name): + filtered.append(dataset) + else: + remaining.append(dataset) + + # Write filtered datasets to new file + with open(output_file, "w", encoding="utf-8") as f: + json.dump(filtered, f, ensure_ascii=False, indent=2) + + # Update original file with remaining datasets + with open(input_file, "w", encoding="utf-8") as f: + json.dump(remaining, f, ensure_ascii=False, indent=2) + + print(f"Extracted {len(filtered)} datasets to {output_file}") + print(f"{len(remaining)} datasets remain in {input_file}") + + async def main(): """ The main async function of the basic middleware @@ -199,30 +294,35 @@ async def main(): try: # setup git repo if desired if args.git: - git_config = GitRepoConfig(**config['git']) + git_config = GitRepoConfig(**config["git"]) git_repo = GitRepo(git_config) local_path = git_repo.working_dir git_repo.pull() else: git_repo = None - local_path = config['git']['local_path'] + local_path = config["git"]["local_path"] os.makedirs(local_path, exist_ok=True) - default_http_config = HttpSessionConfig(**config['http_client']) + default_http_config = HttpSessionConfig(**config["http_client"]) full_report = [] # scrape sites - for sitemap in config['sitemaps']: + for sitemap in config["sitemaps"]: scraper_config = MetadataScraperConfig(**sitemap) path, starttime, repo_report = await scrape_repo_and_write_to_file( - local_path, scraper_config, default_http_config) - full_report += [{'repo_name': sitemap['name'] , **repo_report}] + local_path, scraper_config, default_http_config + ) + full_report += [{"repo_name": sitemap["name"], **repo_report}] + if sitemap["name"] == "publisso": + transform_publisso_to_publisso_schemaorg() + if sitemap["name"] == "openagrar": + extract_thunen_from_openagrar_metadata() if git_repo: commit_to_git(scraper_config.url, git_repo, path, starttime) if git_repo: git_repo.push() - print(json.dumps(full_report, indent=2, ensure_ascii=False, sort_keys=True)) + # print(json.dumps(full_report, indent=2, ensure_ascii=False, sort_keys=True)) # pylint: disable-next=broad-except except Exception as e: otel_span = trace.get_current_span() @@ -232,5 +332,6 @@ async def main(): logging.exception(msg) sys.exit(1) -if __name__ == '__main__': + +if __name__ == "__main__": asyncio.run(main()) diff --git a/middleware/scripts/publiso_conversor.jq b/middleware/scripts/publiso_conversor.jq new file mode 100644 index 0000000..b6c7c7a --- /dev/null +++ b/middleware/scripts/publiso_conversor.jq @@ -0,0 +1,75 @@ +..[] | { + "@context": { + "@language": "en", + "@vocab":"https://schema.org/"}, + "@type": "Dataset", + "@id": ("https://doi.org/" + .doi), + "name": (if has("title") then (.title[]) else null end), + "alternativeHeadline": (if has("alternative") then .alternative[0] else null end), + "description": (if has("description") then (.description[]) else null end), + "identifier": (if has("doi") then (.doi | {"@type":"PropertyValue", "value":., "propertyID": "https://registry.identifiers.org/registry/doi", "url": ("https://doi.org/" + .)}) else null end), + "creator": + [ + + (if has("creator") then + (.creator[] + | { + "@type": "Person", + "familyName": (.prefLabel | split(", ")[0]), + "givenName": (.prefLabel | split(", ")[1: ] | join(" ")), + "identifier": ."@id" + } + ) + else null end) + ], + "contributor": + [ + (if has("contributor") then + (.contributor[] + | { + "@type": "Person", + "familyName": (.prefLabel | split(", ")[0]), + "givenName": (.prefLabel | split(", ")[1: ] | join(" ")), + "identifier": ."@id" + } + ) + else null end) +], + "sourceOrganization": + [ + (if has("institution") then (.institution[] | {"@type":"Organization", "@id": ."@id", "name": .prefLabel}) else null end) + ], + "datePublished": (if has("issued") then (.issued) else null end), + "copyrightYear": (if has("yearOfCopyright") then (.yearOfCopyright[0]) else null end), + "keywords": + [ + (if has("ddc") then (.ddc[] | {"@type":"DefinedTerm", "name": .prefLabel, "identifier": ."@id", "inDefinedTermSet": "https://www.oclc.org/en/dewey.html"}) else null end), + (if has("subject") then (.subject[] | {"@type":"DefinedTerm", "name": .prefLabel, "identifier": ."@id"}) else null end) + ], + "measurementTechnique": + [ + (if has("dataOrigin") then (.dataOrigin[] | {"@type":"DefinedTerm", "name": .prefLabel, "identifier": ."@id"}) else null end) + ], + "funder": + [ + (if has("fundingId") then (.fundingId[] | {"@type":"Organization", "@id": ."@id", "name": .prefLabel}) else null end) + ], + "funding": + [ + (if has("joinedFunding") then (.joinedFunding[] | {"@type":"Grant", "name": .fundingProgramJoined, "funder": .fundingJoined | {"@type":"Organization", "@id": ."@id", "name": .prefLabel}}) else (if has("fundingProgram") then (.fundingProgram[] | {"@type":"Grant", "name": .}) else null end) end) + ], + "distribution": + [ + (if has("hasPart") then (.hasPart[] | {"@type":"DataDownload", "@id": ("https://repository.publisso.de/resource/" + ."@id"), "name": .prefLabel}) else null end) + ], + "inLanguage": + [ + (if has("language") then (.language[] | {"@type":"Language", "@id": ."@id", "name": .prefLabel}) else null end) + ], + "license": (if has("license") then (.license[]."@id") else null end), + "spatial": + [ + (if has("recordingCoordinates") then (.recordingCoordinates[] | {"@type":"Place", "url": ."@id"}) else null end), + (if has("recordingLocation") then (.recordingLocation[] | {"@type":"Place", "name": .prefLabel, "url": ."@id"}) else null end) + ] + } | del(..|nulls) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index de1d0cb..7051d57 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,6 +20,7 @@ html_text==0.7.0 idna==3.10 importlib_metadata==8.6.1 isodate==0.7.2 +jq==1.10.0 jstyleson==0.0.2 lxml==5.4.0 lxml_html_clean==0.4.2 @@ -55,3 +56,5 @@ webencodings==0.5.1 wrapt==1.17.2 yarl==1.20.0 zipp==3.21.0 + +