Skip to content

Commit 4c85e8a

Browse files
committed
AP-664: LDC provider and corpus file fetcher DAG
The Linguistic Data Consortium (LDC) catalog provides downloads of linguistic corpora. Because LDC does not provide an API, the LDC hook creates a session that temporarily persists session cookies and fetches HTML pages, which are then parsed into structured data.
- Adds a new Airflow LDC provider under mokelumne/providers/ldc with provider.yaml and get_provider_info.py.
- Implements LDCHook for authenticated LDC catalog access, including session creation, refresh handling, corpora page retrieval, and download response streaming.
- Adds LDC helper methods to `mokelumne.util.ldc`.
- Implements a DAG to fetch files from LDC corpora.
1 parent 05d54fb commit 4c85e8a

14 files changed

Lines changed: 1039 additions & 1 deletion

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ airflow.cfg
1515
*.egg-info
1616
.coverage
1717
build/
18+
public/
1819
uv.lock
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
"""
2+
Fetches files for a given corpus from the Linguistic Data Consortium catalog.
3+
"""
4+
5+
# pyright: reportTypedDictNotRequiredAccess=false
6+
7+
from __future__ import annotations
8+
import hashlib
9+
import json
10+
import logging
11+
import re
12+
13+
from mmap import mmap, ACCESS_READ
14+
15+
from airflow.sdk import Param, chain, dag, get_current_context, task
16+
from bs4 import BeautifulSoup
17+
18+
from mokelumne.providers.ldc.hooks.ldc import LDCHook
19+
from mokelumne.util.ldc import filter_corpora, scrape_corpus_metadata
20+
from mokelumne.util.storage import run_dir
21+
22+
logger = logging.getLogger(__name__)
23+
24+
@dag(
    description="Fetches files for a given corpus from the Linguistic Data Consortium catalog",
    schedule=None,
    catchup=False,
    params={
        "ldc_corpus": Param(
            default="",
            title="LDC Catalog ID",
            description="The catalog ID for the desired LDC corpus",
            type="string",
        ),
        "filename_regex": Param(
            default="",
            title="Filename regular expression",
            description="""Regular expression to match file metadata in the
LDC catalog. Note that this is not necessarily the same as the downloaded
filename reported by LDC's webserver.""",
            type=["string", "null"],
            format="regex",
        ),
    },
    tags=["ldsp"],
)
def fetch_ldc_corpus_files():
    """
    Fetch a corpus from the `Linguistic Data Consortium catalog`_. LDC
    does not provide an API, so we have to screenscrape into an authorized
    session to fetch the list of available datasets.

    The DAG runs four steps: fetch the downloads page, parse it into a list
    of dicts, filter that list down to the requested corpus (short-circuiting
    the run when nothing matches), and download each remaining file.

    This is effectively a reimplementation of `ldcdl`_ by Jonathan May and
    Alex Hedges.

    .. _Linguistic Data Consortium catalog: https://catalog.ldc.upenn.edu/
    .. _ldcdl: https://github.com/jonmay/ldcdl
    """

    hook = LDCHook()

    @task
    def get_available_ldc_corpora() -> str:
        """
        Fetch the page listing the corpora available for download from the LDC
        catalog. This is an HTML page cached locally for further parsing.

        :returns: Path to the HTML file fetched from LDC.
        :rtype: str
        """
        ctx = get_current_context()
        corpora_html_path = run_dir(ctx["run_id"]) / "corpora.html"

        # The response is streamed; the ``with`` block releases the HTTP
        # connection even if writing the file fails part-way through.
        with hook.get_corpora_response() as response, open(corpora_html_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        return str(corpora_html_path)

    @task
    def parse_corpora_metadata(corpora_file: str) -> list[dict[str, str]]:
        """
        Parse the HTML of the catalog to create a structured representation for
        further use. There is no API for LDC, so we are forced to screenscrape.

        The parsed result is also written to ``corpora.json`` in the run
        directory.

        :param corpora_file: Location of the fetched downloads page.
        :returns: Parsed LDC metadata as a list of dicts.
        :rtype: list[dict[str, str]]
        """
        ctx = get_current_context()
        corpora_json = run_dir(ctx["run_id"]) / "corpora.json"

        # Hand BeautifulSoup the raw bytes so it determines the page encoding
        # itself instead of decoding with the worker's locale default.
        with open(corpora_file, "rb") as page:
            data = BeautifulSoup(page.read(), "html.parser")

        rows = data.select("#user-corpora-download-table > tbody > tr")
        corpora = [scrape_corpus_metadata(row) for row in rows]

        with open(corpora_json, "w", encoding="utf-8") as corpora_out:
            json.dump(corpora, corpora_out)

        return corpora

    @task.short_circuit
    def corpus_is_available(corpora_list: list[dict[str, str]]) -> list[dict[str, str]]:
        """
        Check to see if the requested corpus is listed in the set of corpora
        available for download. Used to shortcircuit the ``fetch_ldc_corpus_files()``
        DAG if the dataset is not available.

        Since the LDC catalog provides multiple duplicate downloads based on
        invoice date, this also filters the downloads available to
        those with the latest invoice date.
        (NOTE(review): the invoice-date filtering happens inside
        ``filter_corpora``, which is not visible here -- confirm.)

        :param corpora_list: Parsed LDC metadata list.
        :returns: A list of LDC downloads for fetching or an empty list.
        :rtype: list[dict[str, str]]
        """
        params = get_current_context()["params"]

        return filter_corpora(
            corpora=corpora_list,
            corpus_id=params.get("ldc_corpus", ""),
            filename_regex=params.get("filename_regex"),
        )

    @task
    def download_corpus_from_ldc(filedict: dict[str, str]) -> str:
        """
        Download a corpus from the LDC catalog using the authenticated hook and
        compare the MD5 checksum reported by LDC with that of the downloaded
        file. A mismatch is logged as a warning; it does not fail the task.

        The destination filename comes from the ``Content-Disposition``
        header when present, otherwise from the catalog metadata. Either way
        only the final path component is used, so a crafted name cannot
        escape the run directory.

        :param filedict: A dict representing the file metadata; the keys
            ``download_link``, ``filename`` and ``checksum`` are read.
        :returns: The location of the downloaded dataset file.
        :rtype: str
        :raises ValueError: If no usable filename can be determined.
        """

        def _basename(name: str) -> str:
            # Keep only the last path component, whichever separator is used.
            return name.replace("\\", "/").rsplit("/", 1)[-1]

        unusable = ("", ".", "..")
        ctx = get_current_context()
        dest_dir = run_dir(ctx["run_id"])

        with hook.get_corpus_file(filedict["download_link"]) as resp:
            match = re.match(
                r'^attachment; filename="(.*)"$',
                resp.headers.get("Content-Disposition", ""),
            )
            filename = _basename(match.group(1)) if match else ""

            if filename in unusable:
                logger.warning(
                    "No usable Content-Disposition filename; falling back to catalog filename"
                )
                filename = _basename(filedict["filename"])
            if filename in unusable:
                raise ValueError("Unable to determine a safe filename for the LDC download")

            dest = dest_dir / filename

            # Hash while streaming: this avoids a second pass over a possibly
            # very large file and also copes with an empty download.
            digest = hashlib.md5(usedforsecurity=False)
            with open(dest, "wb") as out:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        out.write(chunk)
                        digest.update(chunk)

        dl_checksum = digest.hexdigest()
        # hexdigest() is lowercase; normalise the catalog value before comparing.
        expected = (filedict["checksum"] or "").strip().lower()

        if dl_checksum != expected:
            logger.warning(
                "Downloaded file's checksum %s does not match LDC checksum %s",
                dl_checksum,
                filedict["checksum"],
            )
        return str(dest)

    corpora_file = get_available_ldc_corpora()
    available_corpora = parse_corpora_metadata(corpora_file)
    files_to_download = corpus_is_available(available_corpora)
    # One mapped download task instance per file returned by the filter step.
    chain(
        files_to_download,
        download_corpus_from_ldc.expand(filedict=files_to_download),
    )
182+
183+
184+
fetch_ldc_corpus_files() # pyright: ignore[reportUnusedExpression]

mokelumne/providers/ldc/__init__.py

Whitespace-only changes.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
import yaml
5+
6+
_RUNTIME_FIELDS = {
7+
"package-name",
8+
"name",
9+
"description",
10+
"hook-class-names",
11+
"connection-types",
12+
"hooks",
13+
"operators",
14+
"sensors",
15+
"transfers",
16+
"triggers",
17+
"bundles",
18+
"integrations",
19+
"filesystems",
20+
"asset-uris",
21+
"dialects",
22+
"extra-links",
23+
"auth-backends",
24+
"auth-managers",
25+
"notifications",
26+
"executors",
27+
"config",
28+
}
29+
30+
31+
def get_provider_info() -> dict:
    """Return the runtime-relevant provider metadata from ``provider.yaml``.

    The YAML file next to this module is parsed with ``yaml.safe_load`` and
    only the top-level keys listed in ``_RUNTIME_FIELDS`` are kept.

    :returns: Filtered provider metadata; empty if the YAML file is empty.
    :rtype: dict
    """
    provider_yaml = Path(__file__).parent / "provider.yaml"
    # safe_load() returns None for an empty document; treat that as "no keys".
    raw = yaml.safe_load(provider_yaml.read_text(encoding="utf-8")) or {}
    return {key: value for key, value in raw.items() if key in _RUNTIME_FIELDS}

mokelumne/providers/ldc/hooks/__init__.py

Whitespace-only changes.
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
"""Provides a hook for authenticating and fetching files from LDC."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from functools import cached_property
7+
from urllib.parse import urljoin
8+
9+
import requests
10+
from airflow.sdk.exceptions import AirflowException
11+
from airflow.sdk import BaseHook
12+
13+
from mokelumne.util.ldc import get_csrf_token
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class LDCHook(BaseHook):
    """Interact with the LDC catalog using an authenticated requests session.

    The session is created lazily on first use and cached on the hook, so a
    single login is shared by all requests made through this instance. A
    ``401`` response drops the cached session and retries once after a fresh
    login.

    :param conn_id: Airflow connection ID holding the catalog URL (host),
        login and password.
    :param timeout: Timeout passed to every ``requests`` call, either a
        single number or a ``(connect, read)`` tuple; ``None`` disables it.
    """

    conn_type = "ldc"
    conn_name_attr = "conn_id"
    default_conn_name = "ldc_default"
    hook_name = "LDC"

    def __init__(
        self,
        conn_id: str = "ldc_default",
        timeout: float | tuple[float, float] | None = (10.0, 60.0),
    ) -> None:
        super().__init__()
        self.conn_id = conn_id
        self.timeout = timeout

    def get_conn(self) -> requests.Session:
        """Return the cached authenticated requests session for the LDC catalog."""
        # Go through the cached property so refresh_session() actually has an
        # effect and we do not log in again for every request.
        return self.conn

    @cached_property
    def conn(self) -> requests.Session:
        """Return a cached authenticated requests session."""
        return self._get_session()

    def _base_url(self) -> str:
        """Return the configured catalog URL, or raise if it is not set."""
        connection = self.get_connection(self.conn_id)
        if not connection.host:
            raise AirflowException("LDC connection host is not configured")
        return connection.host

    def _get_session(self) -> requests.Session:
        """Log in to the LDC catalog and return the new session.

        :raises AirflowException: If the host is not configured, the login
            page cannot be fetched, no CSRF token is found, or the login
            request returns an unexpected status code.
        """
        connection = self.get_connection(self.conn_id)
        if not connection.host:
            raise AirflowException("LDC connection host is not configured")

        login_url = urljoin(connection.host, "login")
        session = requests.Session()

        try:
            response = session.get(login_url, timeout=self.timeout)
            if response.status_code != 200:
                raise AirflowException(
                    f"LDC login page request failed: {response.status_code}"
                )

            form_data = get_csrf_token(response.text)
            if not form_data:
                raise AirflowException(
                    "Unable to extract CSRF token from LDC login page"
                )

            form_data["spree_user[login]"] = connection.login or ""
            form_data["spree_user[password]"] = connection.password or ""
            form_data["utf8"] = "✓"

            login_response = session.post(
                login_url, data=form_data, timeout=self.timeout
            )
            # NOTE(review): redirects are followed, so a rejected login that
            # re-renders the form with status 200 would pass this check --
            # confirm how LDC signals bad credentials.
            if login_response.status_code not in (200, 302):
                raise AirflowException(
                    f"LDC authentication failed: {login_response.status_code}"
                )
        except Exception:
            # Do not leak the connection pool of a half-initialised session.
            session.close()
            raise

        return session

    def refresh_session(self) -> None:
        """Clear the cached session so it is recreated on next access."""
        stale = self.__dict__.pop("conn", None)
        if stale is not None:
            stale.close()

    def test_connection(self) -> tuple[bool, str]:
        """Attempt a fresh login and report whether it succeeded.

        :returns: ``(True, message)`` on success, ``(False, error)`` otherwise.
        """
        try:
            # Bypass the cache so the stored credentials are really exercised,
            # and close the throwaway session straight away.
            self._get_session().close()
            return True, "Connection successful"
        except Exception as exc:
            return False, str(exc)

    def _get_with_refresh(self, url: str, what: str) -> requests.Response:
        """Issue a streamed GET, retrying once with a new session on 401.

        :param url: Absolute URL to fetch.
        :param what: Short label for the request, used in the log message.
        """
        response = self.get_conn().get(url, stream=True, timeout=self.timeout)
        if response.status_code == 401:
            logger.warning("%s received 401, refreshing session", what)
            response.close()
            self.refresh_session()
            response = self.get_conn().get(url, stream=True, timeout=self.timeout)
        return response

    def get_corpora_response(self) -> requests.Response:
        """Fetch the LDC corpora downloads page response.

        The response is streamed; the caller is responsible for consuming
        and closing it.

        :raises AirflowException: If the final status code is not 200.
        """
        datasets_url = urljoin(self._base_url(), "organization/downloads")

        response = self._get_with_refresh(datasets_url, "LDC corpora fetch")
        if response.status_code != 200:
            response.close()
            raise AirflowException(
                f"Failed to fetch LDC corpora page: {response.status_code}"
            )

        return response

    def get_corpus_file(self, download_link: str) -> requests.Response:
        """Fetch a corpus download response for the given download link.

        The response is streamed; the caller is responsible for consuming
        and closing it.

        :param download_link: Link to the file, resolved against the
            configured catalog URL.
        :raises AirflowException: If the link is empty or the request fails.
        """
        if not download_link:
            raise AirflowException("Download link is missing")

        dl_uri = urljoin(self._base_url(), download_link)
        response = self._get_with_refresh(dl_uri, "LDC download request")

        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            response.close()
            raise AirflowException(f"LDC download failed: {exc}") from exc

        return response
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Provider metadata loaded by get_provider_info().
package-name: mokelumne
name: LDC
description: LDC Airflow provider.

connection-types:
  - connection-type: ldc
    hook-class-name: mokelumne.providers.ldc.hooks.ldc.LDCHook
    hook-name: Linguistic Data Consortium
    ui-field-behaviour:
      # Only host, login and password are relevant for an LDC connection.
      hidden-fields: [port, schema, extra]
      relabeling:
        host: LDC Catalog URL
        login: LDC Login Username
        password: LDC Password
      placeholders:
        host: 'https://catalog.ldc.upenn.edu'
        login: 'your-ldc-username'
        password: 'your-ldc-password'

0 commit comments

Comments
 (0)