# CKANActionAPI.py -- harvest site-level metadata from a list of CKAN portals
# via the CKAN Action API (status_show, group_list, organization_list,
# package_list) and write one CSV row per portal.
import csv
import json
import requests
import time
from pathlib import Path
from urllib.parse import urljoin

INPUT_CSV_FILE = "10Url.csv"
OUTPUT_CSV_FILE = "output3.csv"


class CKANMetadataExtractor:
    """Queries each CKAN instance's Action API and collects site metadata."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': 'CKAN-Metadata-Extractor/1.0'})

    def normalize_url(self, url: str) -> str:
        """Strip whitespace, default the scheme to https://, drop trailing slashes."""
        url = url.strip()
        if not url:
            return url
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        return url.rstrip('/')

    def make_api_call(self, base_url: str, endpoint: str):
        """GET <base_url>/api/3/action/<endpoint>.

        Returns the decoded JSON payload when the portal reports success,
        otherwise None.  Transport errors and non-JSON bodies are treated as
        "no data" because many harvested portals are dead or broken.
        """
        api_url = urljoin(base_url + '/', f'api/3/action/{endpoint}')
        try:
            response = self.session.get(api_url, timeout=30)
            response.raise_for_status()
            data = response.json()
            if data.get('success', False):
                return data
        except (requests.RequestException, ValueError) as exc:
            # Narrowed from a bare `except:` -- only the expected failure
            # modes (network errors, bad JSON) are swallowed; anything else
            # (e.g. KeyboardInterrupt) now propagates.
            print(f"  {endpoint} failed for {base_url}: {exc}")
        return None

    def process_ckan_instance(self, url: str):
        """Query one portal and return a dict of metadata columns (all strings)."""
        print(f"Processing: {url}")

        normalized_url = self.normalize_url(url)
        if not normalized_url:
            return self.get_empty_result()

        result = self.get_empty_result()

        status_data = self.make_api_call(normalized_url, 'status_show')
        if status_data and status_data.get('result'):
            api_result = status_data['result']
            result['ckan_version'] = str(api_result.get('ckan_version', ''))
            result['description'] = str(api_result.get('site_description', ''))
            result['api_title'] = str(api_result.get('site_title', ''))
            contact_email = api_result.get('error_emails_to')
            result['contact_email'] = str(contact_email) if contact_email else ''
            result['primary_language'] = str(api_result.get('locale_default', ''))
            extensions = api_result.get('extensions', [])
            if isinstance(extensions, list):
                result['extensions'] = ', '.join(extensions)
            else:
                result['extensions'] = str(extensions) if extensions else ''

        time.sleep(1)  # be polite between API calls

        group_data = self.make_api_call(normalized_url, 'group_list')
        if group_data and isinstance(group_data.get('result'), list):
            result['num_groups'] = str(len(group_data['result']))

        time.sleep(1)

        org_data = self.make_api_call(normalized_url, 'organization_list')
        if org_data and isinstance(org_data.get('result'), list):
            result['num_organizations'] = str(len(org_data['result']))

        time.sleep(1)

        package_data = self.make_api_call(normalized_url, 'package_list')
        if package_data and isinstance(package_data.get('result'), list):
            result['num_datasets'] = str(len(package_data['result']))

        return result

    def get_empty_result(self):
        """Template row: every metadata column present, counts default to '0'."""
        return {
            'ckan_version': '',
            'description': '',
            'api_title': '',
            'contact_email': '',
            'primary_language': '',
            'extensions': '',
            'num_groups': '0',
            'num_organizations': '0',
            'num_datasets': '0'
        }

    def process_csv(self, input_file: str, output_file: str):
        """Read portal URLs from input_file (column 'URL'), write metadata CSV."""
        with open(input_file, 'r', encoding='utf-8', newline='') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        processed_rows = []
        for row in rows:
            url = row.get('URL', '').strip()
            if not url:
                continue  # skip blank rows instead of emitting empty output

            metadata = self.process_ckan_instance(url)
            combined_row = {'URL': url, **metadata}
            processed_rows.append(combined_row)
            time.sleep(1)

        fieldnames = ['URL', 'ckan_version', 'description', 'api_title', 'contact_email',
                      'primary_language', 'extensions', 'num_groups', 'num_organizations', 'num_datasets']

        with open(output_file, 'w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(processed_rows)


def main():
    extractor = CKANMetadataExtractor()
    extractor.process_csv(INPUT_CSV_FILE, OUTPUT_CSV_FILE)


if __name__ == '__main__':
    main()


# === Description.py ===
import requests
from bs4 import BeautifulSoup
import re
import csv
import time
import logging
from urllib.parse import urljoin, urlparse
from typing import Optional, Tuple
from googletrans import Translator, LANGUAGES
from langdetect import detect_langs, LangDetectException
import signal
from contextlib import contextmanager

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class TimeoutException(Exception):
    """Raised when the per-site processing budget is exhausted."""
    pass


@contextmanager
def timeout(seconds):
    """Abort the wrapped block with TimeoutException after `seconds`.

    Implemented with SIGALRM, which only exists on Unix.  On platforms
    without it (e.g. Windows) the budget is simply not enforced instead of
    crashing with AttributeError as the original code would.
    """
    if not hasattr(signal, 'SIGALRM'):
        # No alarm signal on this platform; run without a hard deadline.
        yield
        return

    def timeout_handler(signum, frame):
        raise TimeoutException()

    # Set the signal handler and a timeout alarm
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # always disarm, even if the block raised


class CKANAboutExtractor:
    """Scrapes CKAN portals' About pages for a meaningful site description."""

    def __init__(self, page_timeout: int = 10, total_timeout: int = 30):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.translator = Translator()
        self.page_timeout = page_timeout  # Timeout for individual page requests
        self.total_timeout = total_timeout  # Total timeout for processing one site

    def is_default_description(self, text: str) -> bool:
        """Check if the description is a default CKAN description."""
        if not text:
            return True

        text_lower = text.lower().strip()

        # Check if it starts with "CKAN is the" - very common default
        if text_lower.startswith("ckan is the"):
            logger.info("Description starts with 'CKAN is the' - identified as default")
            return True

        # Check against known default patterns
        default_patterns = [
            r"^ckan is the world's leading open[- ]?source",
            r"^ckan is a powerful data management system",
            r"^welcome to ckan",
            r"^this is a ckan instance",
            r"^ckan is an open[- ]?source data portal",
            r"^ckan is a tool for making open data websites",
            r"^comprehensive knowledge archive network",
            r"^ckan is a registry of open knowledge",
            r"^ckan, the world's leading open source data portal platform",
            r"^ckan is the open source data management system",
            r"^ckan is the leading open source data portal",
            r"^ckan is a data catalogue software",
            r"^ckan is free and open source software"
        ]

        for pattern in default_patterns:
            if re.search(pattern, text_lower):
                logger.info(f"Description matches default pattern: {pattern}")
                return True

        # Check if it's too short to be meaningful
        if len(text.strip()) < 50:
            return True

        return False

    def detect_and_translate(self, text: str) -> Tuple[str, str, bool]:
        """Detect language and translate to English when confidently non-English.

        Returns (translated_text, original_language, was_translated).
        """
        try:
            # Detect language
            detected_langs = detect_langs(text)
            if detected_langs:
                lang_code = detected_langs[0].lang
                confidence = detected_langs[0].prob

                # Only translate if we're confident about the language and it's not English
                if confidence > 0.7 and lang_code != 'en':
                    try:
                        # BUG FIX: the original obtained the language *name*
                        # by translating the two-letter code from English to
                        # English, which just echoes the code.  Use the
                        # googletrans code->name table instead.
                        lang_name = LANGUAGES.get(lang_code, lang_code).title()

                        # Translate to English
                        translation = self.translator.translate(text, src=lang_code, dest='en')
                        translated_text = translation.text

                        logger.info(f"Translated from {lang_name} to English")
                        return translated_text, lang_name, True

                    except Exception as e:
                        logger.warning(f"Translation error: {str(e)}")
                        return text, "Unknown", False

            # Text is already in English or couldn't detect language
            return text, "English", False

        except Exception as e:
            logger.warning(f"Language detection error: {str(e)}")
            return text, "Unknown", False

    def format_description(self, original_text: str, translated_text: str, original_language: str) -> str:
        """Format the description with translation information."""
        if original_language == "English" or original_language == "Unknown":
            return original_text

        formatted = f"{translated_text}\n\n"
        formatted += f"*Translated from {original_language}*\n"
        formatted += "---\n"
        formatted += "**Original Text:**\n"
        formatted += original_text

        return formatted

    def normalize_url(self, url: str) -> str:
        """Normalize URL with proper protocol handling."""
        url = url.strip().rstrip('/')

        # If no protocol, leave as-is; try_url_with_protocols probes both.
        if not url.startswith(('http://', 'https://')):
            return url

        return url

    def try_url_with_protocols(self, url: str) -> Optional[requests.Response]:
        """Try accessing URL; if it lacks a scheme, probe HTTPS first, then HTTP.

        NOTE(review): verify=False deliberately ignores TLS errors because many
        harvested portals have broken certificates -- do not reuse for
        sensitive traffic.
        """
        candidates = [url]
        if not url.startswith(('http://', 'https://')):
            candidates = ['https://' + url, 'http://' + url]

        for candidate in candidates:
            try:
                response = self.session.get(candidate, timeout=self.page_timeout, verify=False)
                if response.status_code == 200:
                    return response
            except requests.RequestException:
                # Narrowed from bare except: only transport errors are
                # expected here; fall through to the next scheme.
                continue

        return None

    def get_detailed_description(self, base_url: str) -> Optional[str]:
        """Extract detailed description from the About page with timeout.

        Returns the (possibly translated and formatted) description, or None
        when nothing meaningful / only default CKAN boilerplate was found.
        """
        try:
            with timeout(self.total_timeout):
                # Normalize the base URL
                base_url = self.normalize_url(base_url)

                # Try different possible About page URLs
                about_paths = [
                    "/about",
                    "/about/about",
                    "/about-us",
                    "/pages/about",
                    "/en/about",
                    "/about.html",
                    "/about/",
                    "/info/about"
                ]

                detailed_description = ""

                for path in about_paths:
                    try:
                        # Construct full URL; scheme-less URLs are resolved
                        # inside try_url_with_protocols.
                        url = base_url + path

                        logger.info(f"Trying about page: {url}")

                        response = self.try_url_with_protocols(url)

                        if response:
                            soup = BeautifulSoup(response.text, 'html.parser')

                            # Strategy 1: Look for main content area
                            main_content = None
                            content_selectors = [
                                '.main-content', '#main-content', '.content', '#content',
                                'main', 'article', '.page-content', '#page-content',
                                '.about-content', '#about-content', '.primary', '#primary',
                                '.col-md-9', '.span9', '[role="main"]'
                            ]

                            for selector in content_selectors:
                                main_content = soup.select_one(selector)
                                if main_content:
                                    logger.debug(f"Found main content with selector: {selector}")
                                    break

                            # If we found a main content area, join its paragraphs
                            if main_content:
                                paragraphs = main_content.find_all('p')
                                if paragraphs:
                                    detailed_description = " ".join(
                                        [p.get_text().strip() for p in paragraphs])
                                    logger.info(f"Found description from main content: {len(detailed_description)} chars")
                                    break

                            # Strategy 2: fall back to all meaningful body paragraphs
                            if not detailed_description:
                                body_paragraphs = soup.find_all('p')
                                meaningful_paragraphs = [p.get_text().strip() for p in body_paragraphs
                                                         if len(p.get_text().strip()) > 50]

                                if meaningful_paragraphs:
                                    detailed_description = " ".join(meaningful_paragraphs)
                                    logger.info(f"Found description from body paragraphs: {len(detailed_description)} chars")
                                    break

                    except TimeoutException:
                        logger.warning(f"Timeout while processing {url}")
                        raise
                    except Exception as e:
                        logger.debug(f"Error accessing {path}: {str(e)}")
                        continue

                # Clean up the text
                if detailed_description:
                    # Collapse whitespace and newlines, then trim
                    detailed_description = re.sub(r'\s+', ' ', detailed_description)
                    detailed_description = re.sub(r'\n+', ' ', detailed_description)
                    detailed_description = detailed_description.strip()

                    # Check if it's meaningful length
                    if len(detailed_description) > 100:
                        # Detect language and translate if needed
                        translated_text, original_language, was_translated = \
                            self.detect_and_translate(detailed_description)

                        # IMPORTANT: Check if the TRANSLATED text is a default description
                        if self.is_default_description(translated_text):
                            logger.info("Translated description is default CKAN text, returning None")
                            return None

                        # Format the description
                        if was_translated:
                            return self.format_description(
                                detailed_description, translated_text, original_language
                            )

                        # Still need to check if English text is default
                        if self.is_default_description(detailed_description):
                            logger.info("Description is default CKAN text, returning None")
                            return None
                        return detailed_description
                    else:
                        logger.warning(f"Description too short for {base_url}, ignoring")

                logger.warning(f"No detailed description found for {base_url}")
                return None

        except TimeoutException:
            logger.error(f"Total timeout exceeded for {base_url}")
            return None
        except Exception as e:
            logger.error(f"Error getting detailed description for {base_url}: {str(e)}")
            return None

    def process_csv(self, input_file: str, output_file: str, url_column: str = 'url',
                    description_column: str = 'description'):
        """Process a CSV file with CKAN URLs and extract descriptions."""
        results = []

        try:
            # Read input CSV
            with open(input_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                # Guard: fieldnames is None for an empty input file.
                fieldnames = list(reader.fieldnames or [])

                # Add description column if not present
                if description_column not in fieldnames:
                    fieldnames.append(description_column)

                # Process each row
                for i, row in enumerate(reader, 1):
                    url = row.get(url_column, '').strip()

                    if url:
                        logger.info(f"\nProcessing {i}: {url}")

                        # Extract description
                        description = self.get_detailed_description(url)
                        row[description_column] = description if description else ''

                        # Add small delay to be respectful to servers
                        time.sleep(1)
                    else:
                        row[description_column] = ''

                    results.append(row)

            # Write output CSV
            with open(output_file, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(results)

            logger.info(f"\nProcessing complete. Results saved to {output_file}")

        except Exception as e:
            logger.error(f"Error processing CSV: {str(e)}")
            raise

    def extract_single_url(self, url: str) -> Optional[str]:
        """Extract description from a single URL."""
        return self.get_detailed_description(url)


def main():
    """Main function to run the extractor."""
    # Configuration
    INPUT_FILE = 'ckan_instances.csv'  # Change this to your input file
    OUTPUT_FILE = 'ckan_instances_with_descriptions.csv'  # Change this to your desired output file
    URL_COLUMN = 'url'  # Change this if your URL column has a different name
    DESCRIPTION_COLUMN = 'description'  # Name for the description column
    PAGE_TIMEOUT = 10  # Timeout for individual page requests (seconds)
    TOTAL_TIMEOUT = 30  # Total timeout for processing one site (seconds)

    print(f"Starting CKAN About Page Description Extractor...")
    print(f"Input file: {INPUT_FILE}")
    print(f"Output file: {OUTPUT_FILE}")
    print(f"URL column: {URL_COLUMN}")
    print(f"Description column: {DESCRIPTION_COLUMN}")
    print(f"Page timeout: {PAGE_TIMEOUT}s")
    print(f"Total timeout per site: {TOTAL_TIMEOUT}s")

    # Check if input file exists
    import os
    if not os.path.exists(INPUT_FILE):
        print(f"ERROR: Input file '{INPUT_FILE}' not found!")
        print("Please make sure the CSV file exists in the current directory.")
        return

    try:
        # Create extractor instance with timeout settings
        extractor = CKANAboutExtractor(page_timeout=PAGE_TIMEOUT, total_timeout=TOTAL_TIMEOUT)

        # Process the CSV file
        extractor.process_csv(INPUT_FILE, OUTPUT_FILE, URL_COLUMN, DESCRIPTION_COLUMN)

        print(f"\nProcessing completed successfully!")
        print(f"Results saved to: {OUTPUT_FILE}")

    except Exception as e:
        print(f"\nERROR during processing: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()


# === Name-Process.py ===
import csv
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse
from googletrans import Translator
import time
from typing import Optional, Tuple
import logging
from slugify import slugify
import unicodedata
from langdetect import detect_langs, LangDetectException
import string

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class CKANInstanceNameExtractor:
    """Derives a human-readable title and a URL-friendly slug for CKAN portals."""

    def __init__(self):
        self.translator = Translator()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

        # English default patterns to check AFTER translation
        self.english_default_patterns = [
            r'^ckan$',
            r'^welcome\s*(to\s*)?(the\s*)?ckan$',
            r'^ckan\s*[-–]\s*welcome$',
            r'^welcome\s*[-–]\s*ckan$',
            r'^home\s*[-–]\s*ckan$',
            r'^ckan\s*[-–]\s*home$',
            r'^(welcome|home|start|enter|portal|website|site|data|platform|system)$',
            r'^ckan\s*(portal|site|website|data|platform|instance|system)$',
            r'^(portal|site|website|data|platform|instance|system)\s*ckan$',
            r'^(welcome|home)\s*(page|site|portal)?$',
            r'^(data|open\s*data)\s*(portal|platform)?$',
            r'^default\s*(site|portal|title)?$',
            r'^untitled(\s*site)?$',
            r'^no\s*title$',
            r'^example(\s*site)?$',
            r'^test(\s*site)?$',
            r'^demo(\s*site)?$'
        ]
text: str) -> bool: + """Robustly detect if text is non-English""" + if not text or not text.strip(): + return False + + try: + # Method 1: Check for non-ASCII characters (quick check) + non_ascii_chars = sum(1 for char in text if ord(char) > 127) + total_chars = len(text.strip()) + + # If more than 10% non-ASCII, likely non-English + if total_chars > 0 and (non_ascii_chars / total_chars) > 0.1: + return True + + # Method 2: Check for Latin extended characters (covers European languages) + latin_extended_pattern = r'[àáäâèéëêìíïîòóöôùúüûñçßøåæœÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇØÅÆŒ]' + if re.search(latin_extended_pattern, text): + return True + + # Method 3: Use langdetect for more accurate detection + try: + # Get language probabilities + langs = detect_langs(text) + if langs: + # Check if English is the most probable language + top_lang = langs[0] + if top_lang.lang != 'en' and top_lang.prob > 0.7: + return True + # If English probability is low, consider it non-English + if top_lang.lang == 'en' and top_lang.prob < 0.8: + # Check if there are other language candidates + if len(langs) > 1: + return True + except LangDetectException: + pass + + # Method 4: Check for common non-English words/patterns + non_english_indicators = [ + # Spanish + r'\b(el|la|los|las|de|del|para|por|con|sin|sobre|bajo|entre)\b', + # French + r'\b(le|la|les|de|du|des|pour|avec|sans|sur|sous|dans|entre)\b', + # German + r'\b(der|die|das|den|dem|des|für|mit|ohne|auf|unter|zwischen)\b', + # Italian + r'\b(il|lo|la|gli|le|di|del|della|per|con|senza|su|sotto|tra)\b', + # Portuguese + r'\b(o|a|os|as|do|da|dos|das|para|com|sem|sobre|sob|entre)\b', + # Dutch + r'\b(de|het|een|van|voor|met|zonder|op|onder|tussen)\b' + ] + + text_lower = text.lower() + for pattern in non_english_indicators: + if re.search(pattern, text_lower): + return True + + except Exception as e: + logger.debug(f"Language detection error: {str(e)}") + + return False + + def is_default_value(self, value: str) -> bool: + """Check if a value 
is a default CKAN value (should be called AFTER translation)""" + if not value: + return True + + cleaned_value = value.lower().strip() + + # Remove extra spaces and normalize + cleaned_value = ' '.join(cleaned_value.split()) + + # Check against English default patterns + for pattern in self.english_default_patterns: + if re.match(pattern, cleaned_value, re.IGNORECASE): + return True + + # Check if it contains CKAN with default context words + if 'ckan' in cleaned_value: + # If it's just variations of CKAN with common words, it's likely default + ckan_with_defaults = [ + 'welcome', 'home', 'portal', 'site', 'website', 'platform', + 'data', 'open', 'system', 'instance', 'catalog', 'repository' + ] + + # Remove CKAN and see what's left + without_ckan = cleaned_value.replace('ckan', '').strip(' -–—') + + # If what's left is just a default word, it's a default title + if without_ckan in ckan_with_defaults: + return True + + # Check if too short to be meaningful (less than 4 chars) + if len(cleaned_value) < 4 and cleaned_value not in ['nyc', 'la', 'sf', 'uk', 'usa', 'eu']: + return True + + # Check if it's just punctuation and/or CKAN + if re.match(r'^[^a-zA-Z0-9]*ckan[^a-zA-Z0-9]*$', cleaned_value, re.IGNORECASE): + return True + + return False + + def translate_if_needed(self, text: str, locale: Optional[str] = None) -> Tuple[str, str, bool]: + """Translate text to English if needed. 
Returns (translated_text, original_text, was_translated)""" + try: + # If we have a locale from the API, use it + if locale and locale != 'en': + try: + # Map common locale codes to language codes if needed + lang_code = locale.split('_')[0].split('-')[0].lower() + + # Only translate if not English + if lang_code != 'en': + translation = self.translator.translate(text, src=lang_code, dest='en') + translated_text = translation.text + + logger.info(f"Translated '{text}' from locale '{locale}' to '{translated_text}'") + return translated_text, text, True + + except Exception as e: + logger.warning(f"Translation error using locale '{locale}' for '{text}': {str(e)}") + # Fall back to auto-detection + + # If no locale or translation failed, use auto-detection + if self.is_non_english(text): + try: + # Translate to English + translation = self.translator.translate(text, dest='en') + translated_text = translation.text + + logger.info(f"Translated '{text}' to '{translated_text}' (auto-detected)") + return translated_text, text, True + + except Exception as e: + logger.warning(f"Translation error for '{text}': {str(e)}") + return text, text, False + + # Text is already in English + return text, text, False + + except Exception as e: + logger.warning(f"Language detection error for '{text}': {str(e)}") + return text, text, False + + def extract_from_html(self, url: str) -> Optional[str]: + """Extract title from HTML page""" + try: + response = self.session.get(url, timeout=10, verify=False) + response.raise_for_status() + + soup = BeautifulSoup(response.text, 'html.parser') + + # Try to find title tag + title_tag = soup.find('title') + if title_tag and title_tag.text: + title = title_tag.text.strip() + logger.info(f"Found title from HTML for {url}: {title}") + return title + + # Try meta tags as fallback + meta_tags = [ + {'property': 'og:title'}, + {'name': 'og:title'}, + {'property': 'og:site_name'}, + {'name': 'og:site_name'}, + {'name': 'title'}, + {'property': 
'twitter:title'} + ] + + for meta_attrs in meta_tags: + meta = soup.find('meta', attrs=meta_attrs) + if meta and meta.get('content'): + content = meta.get('content').strip() + logger.info(f"Found title from meta tag for {url}: {content}") + return content + + # Try h1 tag as last resort + h1 = soup.find('h1') + if h1 and h1.text: + h1_text = h1.text.strip() + logger.info(f"Found title from h1 tag for {url}: {h1_text}") + return h1_text + + except Exception as e: + logger.warning(f"Error extracting HTML title from {url}: {str(e)}") + + return None + + def extract_from_api(self, url: str) -> Tuple[Optional[str], Optional[str]]: + """Extract title and locale from CKAN API. Returns (title, locale)""" + try: + # Try different API endpoints + api_endpoints = [ + '/api/3/action/status_show', + '/api/action/status_show', + '/api/2/util/status', + '/api/util/status' + ] + + for endpoint in api_endpoints: + api_url = urljoin(url.rstrip('/') + '/', endpoint.lstrip('/')) + + try: + response = self.session.get(api_url, timeout=10, verify=False) + if response.status_code == 200: + data = response.json() + + # Try to find site_title and locale in different locations + site_title = None + locale = None + + if isinstance(data, dict): + # Standard CKAN API response + if 'result' in data and isinstance(data['result'], dict): + site_title = data['result'].get('site_title') + locale = data['result'].get('locale_default') + # Direct response + elif 'site_title' in data: + site_title = data['site_title'] + locale = data.get('locale_default') + + if site_title: + logger.info(f"Found title from API for {url}: {site_title}, locale: {locale}") + return str(site_title).strip(), locale + + except Exception as e: + logger.debug(f"API endpoint {api_url} failed: {str(e)}") + continue + + except Exception as e: + logger.warning(f"Error extracting API data from {url}: {str(e)}") + + return None, None + + def extract_instance_name(self, url: str) -> str: + """Extract instance name from URL""" + try: 
+ # Clean URL + url = url.strip() + original_url = url # Keep the original URL + if not url.startswith(('http://', 'https://')): + url = 'https://' + url + + logger.info(f"Processing URL: {url}") + + # Try API first (usually more reliable) - now also gets locale + title, locale = self.extract_from_api(url) + + # If API fails, try HTML + if not title: + title = self.extract_from_html(url) + locale = None # No locale from HTML + + # If we found a title + if title: + # Translate if needed (now with locale information) + translated_title, original_title, was_translated = self.translate_if_needed(title, locale) + + # Check if the translated title is a default value + if not self.is_default_value(translated_title): + # If it was translated, return formatted version + if was_translated: + return f"{translated_title} ({original_title})" + else: + return translated_title + else: + logger.info(f"Title '{translated_title}' identified as default value") + + # If all else fails (both methods returned default values), return the exact URL without https:// + clean_url = url + if clean_url.startswith('https://'): + clean_url = clean_url[8:] + elif clean_url.startswith('http://'): + clean_url = clean_url[7:] + + # Remove trailing slashes + clean_url = clean_url.rstrip('/') + + logger.info(f"Using exact URL as title for {url}: {clean_url}") + return clean_url + + except Exception as e: + logger.error(f"Error processing {url}: {str(e)}") + # Return URL without protocol as fallback + fallback = url + if fallback.startswith('https://'): + fallback = fallback[8:] + elif fallback.startswith('http://'): + fallback = fallback[7:] + return fallback.rstrip('/') + + def create_url_friendly_name(self, title: str) -> str: + """Convert title to URL-friendly format""" + # Use slugify for robust conversion + return slugify(title, lowercase=True) + + def process_csv(self, input_file: str, output_file: str, url_column: str = 'url'): + """Process CSV file with CKAN URLs""" + results = [] + + try: + # 
Read input CSV + with open(input_file, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + fieldnames = reader.fieldnames.copy() + + # Add new columns if not present + if 'title' not in fieldnames: + fieldnames.append('title') + if 'name' not in fieldnames: + fieldnames.append('name') + + # Process each row + for i, row in enumerate(reader, 1): + url = row.get(url_column, '').strip() + + if url: + logger.info(f"Processing {i}: {url}") + + # Extract title + title = self.extract_instance_name(url) + row['title'] = title + + # Create URL-friendly name + row['name'] = self.create_url_friendly_name(title) + + # Add small delay to avoid overwhelming servers + time.sleep(0.5) + else: + row['title'] = '' + row['name'] = '' + + results.append(row) + + # Write output CSV + with open(output_file, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(results) + + logger.info(f"Processing complete. Results saved to {output_file}") + + except Exception as e: + logger.error(f"Error processing CSV: {str(e)}") + raise + + +def main(): + """Main function to run the extractor""" + # Configuration + INPUT_FILE = 'ckan_instances.csv' # Change this to your input file + OUTPUT_FILE = 'ckan_instances_with_names.csv' # Change this to your desired output file + URL_COLUMN = 'url' # Change this if your URL column has a different name + + print(f"Starting CKAN Instance Name Extractor...") + print(f"Input file: {INPUT_FILE}") + print(f"Output file: {OUTPUT_FILE}") + print(f"URL column: {URL_COLUMN}") + + # Check if input file exists + import os + if not os.path.exists(INPUT_FILE): + print(f"ERROR: Input file '{INPUT_FILE}' not found!") + print("Please make sure the CSV file exists in the current directory.") + return + + try: + # Create extractor instance + extractor = CKANInstanceNameExtractor() + + # Process the CSV file + extractor.process_csv(INPUT_FILE, OUTPUT_FILE, URL_COLUMN) + + 
print(f"\nProcessing completed successfully!") + print(f"Results saved to: {OUTPUT_FILE}") + + except Exception as e: + print(f"\nERROR during processing: {str(e)}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() \ No newline at end of file