88import datetime
99import argparse
1010import json
11+ import jq
1112import logging
13+ import subprocess
1214from typing import Tuple
15+ import tempfile
1316
1417import asyncio
1518import aiofiles
1619import pytz
1720import yaml
18- from opentelemetry import trace # , metrics
21+ from opentelemetry import trace # , metrics
1922from opentelemetry .sdk .resources import Resource
2023from opentelemetry .sdk .trace import TracerProvider
2124from opentelemetry .sdk .trace .sampling import ALWAYS_ON
2225from opentelemetry .sdk .trace .export import BatchSpanProcessor
2326from opentelemetry .exporter .otlp .proto .grpc .trace_exporter import OTLPSpanExporter
27+
2428# from opentelemetry.sdk.metrics import MeterProvider
2529# from opentelemetry.sdk.metrics.export import (
2630# ConsoleMetricExporter,
@@ -59,18 +63,16 @@ def setup_opentelemetry(otlp_config: dict) -> None:
5963 opentelemetry .instrumentation .urllib .URLLibInstrumentor ().instrument ()
6064 opentelemetry .instrumentation .aiohttp_client .AioHttpClientInstrumentor ().instrument ()
6165
62- endpoint = otlp_config .get (' endpoint' )
66+ endpoint = otlp_config .get (" endpoint" )
6367 if endpoint :
6468 # Initialize OpenTelemetry for Tracing to OTLP endpoint
6569 trace .set_tracer_provider (
6670 TracerProvider (
67- resource = Resource .create ({
68- "service.name" : "FAIRagro middleware"
69- }),
71+ resource = Resource .create ({"service.name" : "FAIRagro middleware" }),
7072 active_span_processor = BatchSpanProcessor (
7173 OTLPSpanExporter (endpoint = endpoint )
7274 ),
73- sampler = ALWAYS_ON
75+ sampler = ALWAYS_ON ,
7476 )
7577 )
7678 else :
@@ -88,9 +90,10 @@ def setup_opentelemetry(otlp_config: dict) -> None:
8890
8991
9092async def scrape_repo_and_write_to_file (
91- folder_path : str ,
92- scraper_config : MetadataScraperConfig ,
93- default_http_config : HttpSessionConfig ) -> Tuple [str , datetime .datetime ]:
93+ folder_path : str ,
94+ scraper_config : MetadataScraperConfig ,
95+ default_http_config : HttpSessionConfig ,
96+ ) -> Tuple [str , datetime .datetime ]:
9497 """
9598 Scrapes research repository metadata and writes it to a file.
9699
@@ -114,15 +117,17 @@ async def scrape_repo_and_write_to_file(
114117 # simple synchronous gauge values.
115118 # count_sites = len(sites)
116119 # count_metadata = len(metadata)
117- path = os .path .join (folder_path , f'{ scraper_config .name } .json' )
118- async with aiofiles .open (path , 'w' , encoding = 'utf-8' ) as f :
119- await f .write (json .dumps (metadata , indent = 2 , ensure_ascii = False , sort_keys = True ))
120+ path = os .path .join (folder_path , f"{ scraper_config .name } .json" )
121+ async with aiofiles .open (path , "w" , encoding = "utf-8" ) as f :
122+ await f .write (
123+ json .dumps (metadata , indent = 2 , ensure_ascii = False , sort_keys = True )
124+ )
120125 return path , start_timestamp , report
121126
122- def commit_to_git ( sitemap_url : str ,
123- git_repo : GitRepo ,
124- path : str ,
125- starttime : datetime ) -> None :
127+
128+ def commit_to_git (
129+ sitemap_url : str , git_repo : GitRepo , path : str , starttime : datetime
130+ ) -> None :
126131 """
127132 Create a log message and commit file to git.
128133
@@ -141,53 +146,108 @@ def commit_to_git(sitemap_url: str,
141146 -------
142147 None
143148 """
144- formatted_time = starttime .strftime ('%Y-%m-%d %H:%M:%S.%f %Z%z' )
145- msg = (
146- f'harvested by FAIRargo middleware at { formatted_time } from { sitemap_url } '
147- )
149+ formatted_time = starttime .strftime ("%Y-%m-%d %H:%M:%S.%f %Z%z" )
150+ msg = f"harvested by FAIRargo middleware at { formatted_time } from { sitemap_url } "
148151 git_repo .add_and_commit ([path ], msg )
149152
153+
def setup_andconfig() -> Tuple[argparse.Namespace, dict]:
    """
    Parse the command line, load the configuration file and set up OpenTelemetry.

    The config file path is taken from ``--config``/``-c`` (default
    ``config.yml``) and parsed with ``yaml.safe_load``. The ``opentelemetry``
    section of the loaded configuration is handed to ``setup_opentelemetry``.

    Any exception raised during these steps is logged and the process is
    terminated with exit status 1.

    Returns
    -------
    Tuple[argparse.Namespace, dict]
        The parsed command line arguments and the loaded configuration.
    """

    try:
        parser = argparse.ArgumentParser(
            prog="fairagro-middleware",
            description="Extracts schema.org meta data from research data repositories.",
        )
        parser.add_argument(
            "--config",
            "-c",
            type=Path,
            default="config.yml",
            help="Config file for this tool.",
        )
        parser.add_argument(
            "--git",
            action=argparse.BooleanOptionalAction,
            default=True,
            help=(
                "Specify this flag to enabled or disable git interactions."
                " If disabled the outout files will nevrtheless be written to git.local_path "
                "as specified within the config file."
            ),
        )
        args = parser.parse_args()

        if not os.path.isfile(args.config):
            raise FileNotFoundError(f"Config file {args.config} does not exist.")

        # load config
        with open(args.config, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)

        setup_opentelemetry(config["opentelemetry"])

        return args, config
    # Top-level boundary: log whatever went wrong and stop the program.
    # pylint: disable-next=broad-except
    except Exception:
        logging.exception("An error occured during initialization")
        sys.exit(1)
190197
198+
def transform_publisso_to_publisso_schemaorg():
    """
    Transform the Publisso metadata to schema.org format.

    Runs ``jq -f ./scripts/publiso_conversor.jq ./output/publisso.json`` and
    pipes its output through ``jq -s .``. The result is written to a temporary
    file next to the input file, which then replaces the input file.

    The input file is only replaced when both jq processes exit successfully.
    On any failure (jq missing, jq script missing, jq error, I/O error) an
    error message is printed, the temporary file is removed and the input file
    is left untouched.

    Returns
    -------
    None
    """

    # Files
    input_file = Path("./output/publisso.json").resolve()
    jq_script = Path("./scripts/publiso_conversor.jq").resolve()

    if not input_file.exists():
        print(f"❌ Archivo de entrada no encontrado: {input_file}")
        return

    tmp_path = None
    try:
        # Write the result to a temporary file in the same directory so that
        # the final replacement stays on one filesystem.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=input_file.parent, suffix=".json"
        ) as tmp_file:
            tmp_path = Path(tmp_file.name)

            # Stream the first jq's output straight into the second one.
            with subprocess.Popen(
                ["jq", "-f", str(jq_script), str(input_file)],
                stdout=subprocess.PIPE,
            ) as convert, subprocess.Popen(
                ["jq", "-s", "."], stdin=convert.stdout, stdout=tmp_file
            ) as slurp:
                # Lets the first jq receive SIGPIPE if the second one fails.
                convert.stdout.close()
                slurp.communicate()
                convert.wait()

            # Popen does not raise on a non-zero exit status, so check it
            # explicitly before touching the original file.
            for proc in (convert, slurp):
                if proc.returncode != 0:
                    raise subprocess.CalledProcessError(proc.returncode, proc.args)

        # Replace the original file in a single step.
        tmp_path.replace(input_file)
    except (OSError, subprocess.CalledProcessError) as e:
        print(f"❌ Error al ejecutar jq: {e}")
        if tmp_path is not None and tmp_path.exists():
            tmp_path.unlink()
        return

    print(f"✅ Transformación completada, archivo actualizado: {input_file}")
242+
def extract_thunen_from_openagrar_metadata():
    """
    Extract Thünen metadata from OpenAgrar metadata.

    Currently this only opens ``openagrar_metadata.json`` from the current
    working directory and parses it as JSON; the parsed data is discarded.

    NOTE(review): the extraction step itself is not implemented yet.

    Returns
    -------
    None
    """
    with open("openagrar_metadata.json", encoding="utf-8") as metadata_file:
        # The parsed metadata is not used further yet.
        json.load(metadata_file)
249+
250+
191251async def main ():
192252 """
193253 The main async function of the basic middleware
@@ -199,30 +259,33 @@ async def main():
199259 try :
200260 # setup git repo if desired
201261 if args .git :
202- git_config = GitRepoConfig (** config [' git' ])
262+ git_config = GitRepoConfig (** config [" git" ])
203263 git_repo = GitRepo (git_config )
204264 local_path = git_repo .working_dir
205265 git_repo .pull ()
206266 else :
207267 git_repo = None
208- local_path = config [' git' ][ ' local_path' ]
268+ local_path = config [" git" ][ " local_path" ]
209269 os .makedirs (local_path , exist_ok = True )
210270
211- default_http_config = HttpSessionConfig (** config [' http_client' ])
271+ default_http_config = HttpSessionConfig (** config [" http_client" ])
212272 full_report = []
213273 # scrape sites
214- for sitemap in config [' sitemaps' ]:
274+ for sitemap in config [" sitemaps" ]:
215275 scraper_config = MetadataScraperConfig (** sitemap )
216276 path , starttime , repo_report = await scrape_repo_and_write_to_file (
217- local_path , scraper_config , default_http_config )
218- full_report += [{'repo_name' : sitemap ['name' ] , ** repo_report }]
277+ local_path , scraper_config , default_http_config
278+ )
279+ full_report += [{"repo_name" : sitemap ["name" ], ** repo_report }]
280+ if sitemap ["name" ] == "publisso" :
281+ transform_publisso_to_publisso_schemaorg ()
219282 if git_repo :
220283 commit_to_git (scraper_config .url , git_repo , path , starttime )
221284
222285 if git_repo :
223286 git_repo .push ()
224287
225- print (json .dumps (full_report , indent = 2 , ensure_ascii = False , sort_keys = True ))
288+ # print(json.dumps(full_report, indent=2, ensure_ascii=False, sort_keys=True))
226289 # pylint: disable-next=broad-except
227290 except Exception as e :
228291 otel_span = trace .get_current_span ()
@@ -232,5 +295,6 @@ async def main():
232295 logging .exception (msg )
233296 sys .exit (1 )
234297
235- if __name__ == '__main__' :
298+
299+ if __name__ == "__main__" :
236300 asyncio .run (main ())
0 commit comments