Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ sitemaps:
- name: publisso
url: "https://frl.publisso.de/find?q=contentType:researchData&from=0&until=200&format=json"
sitemap: publisso
- name: thunen_atlas
url: "https://atlas.thuenen.de/api/v2/resources?page_size=200&format=json"
sitemap: thunen_atlas
# - name: thunen_atlas
# url: "https://atlas.thuenen.de/api/v2/resources?page_size=200&format=json"
# sitemap: thunen_atlas

# Configures the HTTP client uses to download sitemaps and datasets
http_client:
Expand Down
186 changes: 144 additions & 42 deletions middleware/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@
import datetime
import argparse
import json
import re
import logging
import subprocess
from typing import Tuple
import tempfile

import asyncio
import aiofiles
import pytz
import yaml
from opentelemetry import trace #, metrics
from opentelemetry import trace # , metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# from opentelemetry.sdk.metrics import MeterProvider
# from opentelemetry.sdk.metrics.export import (
# ConsoleMetricExporter,
Expand Down Expand Up @@ -59,18 +63,16 @@ def setup_opentelemetry(otlp_config: dict) -> None:
opentelemetry.instrumentation.urllib.URLLibInstrumentor().instrument()
opentelemetry.instrumentation.aiohttp_client.AioHttpClientInstrumentor().instrument()

endpoint = otlp_config.get('endpoint')
endpoint = otlp_config.get("endpoint")
if endpoint:
# Initialize OpenTelemetry for Tracing to OTLP endpoint
trace.set_tracer_provider(
TracerProvider(
resource=Resource.create({
"service.name": "FAIRagro middleware"
}),
resource=Resource.create({"service.name": "FAIRagro middleware"}),
active_span_processor=BatchSpanProcessor(
OTLPSpanExporter(endpoint=endpoint)
),
sampler=ALWAYS_ON
sampler=ALWAYS_ON,
)
)
else:
Expand All @@ -88,9 +90,10 @@ def setup_opentelemetry(otlp_config: dict) -> None:


async def scrape_repo_and_write_to_file(
folder_path: str,
scraper_config: MetadataScraperConfig,
default_http_config: HttpSessionConfig) -> Tuple[str, datetime.datetime]:
folder_path: str,
scraper_config: MetadataScraperConfig,
default_http_config: HttpSessionConfig,
) -> Tuple[str, datetime.datetime]:
"""
Scrapes research repository metadata and writes it to a file.

Expand All @@ -114,15 +117,17 @@ async def scrape_repo_and_write_to_file(
# simple synchronous gauge values.
# count_sites = len(sites)
# count_metadata = len(metadata)
path = os.path.join(folder_path, f'{scraper_config.name}.json')
async with aiofiles.open(path, 'w', encoding='utf-8') as f:
await f.write(json.dumps(metadata, indent=2, ensure_ascii=False, sort_keys=True))
path = os.path.join(folder_path, f"{scraper_config.name}.json")
async with aiofiles.open(path, "w", encoding="utf-8") as f:
await f.write(
json.dumps(metadata, indent=2, ensure_ascii=False, sort_keys=True)
)
return path, start_timestamp, report

def commit_to_git(sitemap_url: str,
git_repo: GitRepo,
path: str,
starttime: datetime) -> None:

def commit_to_git(
sitemap_url: str, git_repo: GitRepo, path: str, starttime: datetime
) -> None:
"""
Create a log message and commit file to git.

Expand All @@ -141,53 +146,144 @@ def commit_to_git(sitemap_url: str,
-------
None
"""
formatted_time = starttime.strftime('%Y-%m-%d %H:%M:%S.%f %Z%z')
msg = (
f'harvested by FAIRargo middleware at {formatted_time} from {sitemap_url}'
)
formatted_time = starttime.strftime("%Y-%m-%d %H:%M:%S.%f %Z%z")
msg = f"harvested by FAIRargo middleware at {formatted_time} from {sitemap_url}"
git_repo.add_and_commit([path], msg)


def setup_andconfig() -> dict:
"""
This function will perform setup work and reads the configuration file.
"""

try:
parser = argparse.ArgumentParser(
prog = 'fairagro-middleware',
description= 'Extracts schema.org meta data from research data repositories.',
prog="fairagro-middleware",
description="Extracts schema.org meta data from research data repositories.",
)
parser.add_argument('--config', '-c',
type=Path,
default='config.yml',
help='Config file for this tool.')
parser.add_argument(
'--git',
"--config",
"-c",
type=Path,
default="config.yml",
help="Config file for this tool.",
)
parser.add_argument(
"--git",
action=argparse.BooleanOptionalAction,
default=True,
help=(
'Specify this flag to enabled or disable git interactions.'
'If disabled the outout files will nevrtheless be written to git.local_path '
'as specified within the config file.'
)
"Specify this flag to enabled or disable git interactions."
"If disabled the outout files will nevrtheless be written to git.local_path "
"as specified within the config file."
),
)
args = parser.parse_args()

if not os.path.isfile(args.config):
raise FileNotFoundError(f'Config file {args.config} does not exist.')
raise FileNotFoundError(f"Config file {args.config} does not exist.")

# load config
with open(args.config, 'r', encoding='utf-8') as f:
with open(args.config, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)

setup_opentelemetry(config['opentelemetry'])
setup_opentelemetry(config["opentelemetry"])

return args, config
# pylint: disable-next=broad-except
except Exception:
logging.exception("An error occured during initialization")
sys.exit(1)


def transform_publisso_to_publisso_schemaorg():
"""
Transform the Publisso metadata to schema.org format.
"""

# Archivos
input_file = Path("./output/publisso.json").resolve()
jq_script = Path("./scripts/publiso_conversor.jq").resolve()

if not input_file.exists():
print(f"❌ Archivo de entrada no encontrado: {input_file}")
return

# Crear directorio de salida si no existe
input_file.parent.mkdir(parents=True, exist_ok=True)

# Usar archivo temporal para el resultado
with tempfile.NamedTemporaryFile(
mode="w", delete=False, dir=input_file.parent, suffix=".json"
) as tmp_file:
tmp_path = Path(tmp_file.name)

try:
# Ejecutar jq en memoria
p1 = subprocess.Popen(
["jq", "-f", str(jq_script), str(input_file)], stdout=subprocess.PIPE
)
with open(tmp_path, "w", encoding="utf-8") as outfile:
p2 = subprocess.Popen(
["jq", "-s", "."], stdin=p1.stdout, stdout=outfile
)
p1.stdout.close() # Permite que p1 reciba SIGPIPE si p2 falla
p2.communicate() # Espera a que termine

# Reemplazar archivo original
os.remove(input_file) # Eliminar input original
tmp_path.rename(input_file) # Renombrar temp como input original

print(f"✅ Transformación completada, archivo actualizado: {input_file}")

except subprocess.CalledProcessError as e:
print(f"❌ Error al ejecutar jq: {e}")
if tmp_path.exists():
tmp_path.unlink()


def extract_thunen_from_openagrar_metadata():
"""
Extract Thünen metadata from OpenAgrar metadata.
"""
# Configuration
input_file = Path("./output/openagrar.json").resolve()
output_file = Path("./output/thunen.json").resolve()

if not input_file.exists():
print(f"❌ Archivo de entrada no encontrado: {input_file}")
return
# Regex pattern to match publisher synonyms
publisher_pattern = re.compile(
r"Thünen[- ]?Institut|Thuenen Institute|Thünen-Atlas", re.IGNORECASE
)

# Load JSON
with open(input_file, "r", encoding="utf-8") as f:
data = json.load(f)
print(f"Found {len(data)} datasets in {input_file}")
# Separate datasets
filtered = []
remaining = []
for dataset in data:
publisher_name = dataset.get("publisher", {}).get("name", "")
if publisher_pattern.search(publisher_name):
filtered.append(dataset)
else:
remaining.append(dataset)

# Write filtered datasets to new file
with open(output_file, "w", encoding="utf-8") as f:
json.dump(filtered, f, ensure_ascii=False, indent=2)

# Update original file with remaining datasets
with open(input_file, "w", encoding="utf-8") as f:
json.dump(remaining, f, ensure_ascii=False, indent=2)

print(f"Extracted {len(filtered)} datasets to {output_file}")
print(f"{len(remaining)} datasets remain in {input_file}")


async def main():
"""
The main async function of the basic middleware
Expand All @@ -199,30 +295,35 @@ async def main():
try:
# setup git repo if desired
if args.git:
git_config = GitRepoConfig(**config['git'])
git_config = GitRepoConfig(**config["git"])
git_repo = GitRepo(git_config)
local_path = git_repo.working_dir
git_repo.pull()
else:
git_repo = None
local_path = config['git']['local_path']
local_path = config["git"]["local_path"]
os.makedirs(local_path, exist_ok=True)

default_http_config = HttpSessionConfig(**config['http_client'])
default_http_config = HttpSessionConfig(**config["http_client"])
full_report = []
# scrape sites
for sitemap in config['sitemaps']:
for sitemap in config["sitemaps"]:
scraper_config = MetadataScraperConfig(**sitemap)
path, starttime, repo_report = await scrape_repo_and_write_to_file(
local_path, scraper_config, default_http_config)
full_report += [{'repo_name': sitemap['name'] , **repo_report}]
local_path, scraper_config, default_http_config
)
full_report += [{"repo_name": sitemap["name"], **repo_report}]
if sitemap["name"] == "publisso":
transform_publisso_to_publisso_schemaorg()
if sitemap["name"] == "openagrar":
extract_thunen_from_openagrar_metadata()
if git_repo:
commit_to_git(scraper_config.url, git_repo, path, starttime)

if git_repo:
git_repo.push()

print(json.dumps(full_report, indent=2, ensure_ascii=False, sort_keys=True))
# print(json.dumps(full_report, indent=2, ensure_ascii=False, sort_keys=True))
# pylint: disable-next=broad-except
except Exception as e:
otel_span = trace.get_current_span()
Expand All @@ -232,5 +333,6 @@ async def main():
logging.exception(msg)
sys.exit(1)

if __name__ == '__main__':

if __name__ == "__main__":
asyncio.run(main())
75 changes: 75 additions & 0 deletions middleware/scripts/publiso_conversor.jq
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
..[] | {
"@context": {
"@language": "en",
"@vocab":"https://schema.org/"},
"@type": "Dataset",
"@id": ("https://doi.org/" + .doi),
"name": (if has("title") then (.title[]) else null end),
"alternativeHeadline": (if has("alternative") then .alternative[0] else null end),
"description": (if has("description") then (.description[]) else null end),
"identifier": (if has("doi") then (.doi | {"@type":"PropertyValue", "value":., "propertyID": "https://registry.identifiers.org/registry/doi", "url": ("https://doi.org/" + .)}) else null end),
"creator":
[

(if has("creator") then
(.creator[]
| {
"@type": "Person",
"familyName": (.prefLabel | split(", ")[0]),
"givenName": (.prefLabel | split(", ")[1: ] | join(" ")),
"identifier": ."@id"
}
)
else null end)
],
"contributor":
[
(if has("contributor") then
(.contributor[]
| {
"@type": "Person",
"familyName": (.prefLabel | split(", ")[0]),
"givenName": (.prefLabel | split(", ")[1: ] | join(" ")),
"identifier": ."@id"
}
)
else null end)
],
"sourceOrganization":
[
(if has("institution") then (.institution[] | {"@type":"Organization", "@id": ."@id", "name": .prefLabel}) else null end)
],
"datePublished": (if has("issued") then (.issued) else null end),
"copyrightYear": (if has("yearOfCopyright") then (.yearOfCopyright[0]) else null end),
"keywords":
[
(if has("ddc") then (.ddc[] | {"@type":"DefinedTerm", "name": .prefLabel, "identifier": ."@id", "inDefinedTermSet": "https://www.oclc.org/en/dewey.html"}) else null end),
(if has("subject") then (.subject[] | {"@type":"DefinedTerm", "name": .prefLabel, "identifier": ."@id"}) else null end)
],
"measurementTechnique":
[
(if has("dataOrigin") then (.dataOrigin[] | {"@type":"DefinedTerm", "name": .prefLabel, "identifier": ."@id"}) else null end)
],
"funder":
[
(if has("fundingId") then (.fundingId[] | {"@type":"Organization", "@id": ."@id", "name": .prefLabel}) else null end)
],
"funding":
[
(if has("joinedFunding") then (.joinedFunding[] | {"@type":"Grant", "name": .fundingProgramJoined, "funder": .fundingJoined | {"@type":"Organization", "@id": ."@id", "name": .prefLabel}}) else (if has("fundingProgram") then (.fundingProgram[] | {"@type":"Grant", "name": .}) else null end) end)
],
"distribution":
[
(if has("hasPart") then (.hasPart[] | {"@type":"DataDownload", "@id": ("https://repository.publisso.de/resource/" + ."@id"), "name": .prefLabel}) else null end)
],
"inLanguage":
[
(if has("language") then (.language[] | {"@type":"Language", "@id": ."@id", "name": .prefLabel}) else null end)
],
"license": (if has("license") then (.license[]."@id") else null end),
"spatial":
[
(if has("recordingCoordinates") then (.recordingCoordinates[] | {"@type":"Place", "url": ."@id"}) else null end),
(if has("recordingLocation") then (.recordingLocation[] | {"@type":"Place", "name": .prefLabel, "url": ."@id"}) else null end)
]
} | del(..|nulls)
Loading
Loading