Skip to content

Commit ff408c8

Browse files
committed
Refactor LDC integration, add LDCHook provider, and include DAG structure test
- Added a new Airflow LDC provider under mokelumne/providers/ldc with provider.yaml and get_provider_info.py. - Implemented LDCHook for authenticated LDC catalog access, including session creation, refresh handling, corpora page retrieval, and download response streaming. - Refactored mokelumne/dags/fetch_ldc_corpus.py to use LDCHook for HTTP operations while keeping file writing in the DAG. - Moved corpus filtering logic into mokelumne.util.ldc.filter_corpora and updated the DAG to consume parsed metadata lists. - Added/updated unit tests for LDCHook, util LDC helpers, and DAG structure, including test_connection coverage. - Updated imports for Airflow 3 compatibility and added public/ to .gitignore.
1 parent 3222ac4 commit ff408c8

12 files changed

Lines changed: 518 additions & 106 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ airflow.cfg
1515
*.egg-info
1616
.coverage
1717
build/
18+
public/
1819
uv.lock

mokelumne/dags/fetch_ldc_corpus.py

Lines changed: 43 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,13 @@
1313
import logging
1414
import re
1515

16-
from http.cookiejar import MozillaCookieJar
1716
from mmap import mmap, ACCESS_READ
1817

19-
import requests
20-
21-
from airflow.sdk import Connection, Param, chain, dag, get_current_context, task
18+
from airflow.sdk import Param, chain, dag, get_current_context, task
2219
from bs4 import BeautifulSoup
2320

24-
from mokelumne.util.ldc import get_csrf_token, scrape_corpus_metadata, get_latest_invoice_date
21+
from mokelumne.providers.ldc.hooks.ldc import LDCHook
22+
from mokelumne.util.ldc import filter_corpora, scrape_corpus_metadata, get_latest_invoice_date
2523
from mokelumne.util.storage import run_dir
2624

2725
logger = logging.getLogger(__name__)
@@ -62,78 +60,38 @@ def fetch_ldc_corpus():
6260
.. _ldcdl: https://github.com/jonmay/ldcdl
6361
"""
6462

65-
@task
66-
def authenticate_session() -> str:
67-
"""
68-
Authenticate a browser session and persist the session's cookies to a
69-
cookiejar file.
70-
71-
The LDC catalog checks for the presence of the `_xiexie` cookie,
72-
which is a session cookie and thus not automatically included in
73-
serialized cookie jars. If not present, the LDC Catalog automatically
74-
redirects to the login page.
75-
76-
:returns: Location of the cookiejar file.
77-
:rtype: str
78-
"""
79-
ctx = get_current_context()
80-
cj = run_dir(ctx["run_id"]) / "cookies.txt"
81-
conn = Connection.get("ldc")
82-
login_url = f"{conn.host}/login"
83-
session = requests.Session()
84-
login_page = session.get(login_url)
85-
form_data = get_csrf_token(login_page.text)
86-
form_data["spree_user[login]"] = conn.login
87-
form_data["spree_user[password]"] = conn.password
88-
form_data["utf8"] = "✓"
89-
90-
login_request = requests.Request("POST", url=login_url, data=form_data)
91-
prepped = session.prepare_request(login_request)
92-
_ = session.send(prepped)
93-
cookies = MozillaCookieJar(filename=cj)
94-
for c in session.cookies:
95-
cookies.set_cookie(c)
96-
cookies.save(ignore_discard=True)
97-
return str(cj)
98-
63+
hook = LDCHook()
9964

10065
@task
101-
def get_available_ldc_corpora(cookiejar) -> str:
66+
def get_available_ldc_corpora() -> str:
10267
"""
10368
Fetch the page listing the corpora available for download from the LDC
10469
catalog. This is an HTML page cached locally for further parsing.
10570
106-
:param cookiejar: The path to a cookiejar file for LDC
10771
:returns: Path to the HTML file fetched from LDC.
10872
:rtype: str
10973
"""
110-
cookies = MozillaCookieJar(filename=cookiejar)
111-
cookies.load(ignore_discard=True)
11274
ctx = get_current_context()
113-
conn = Connection.get("ldc")
114-
output = run_dir(ctx["run_id"]) / "corpora.html"
115-
datasets_url = f"{conn.host}/organization/downloads"
116-
117-
with (requests.Session() as session, open(output, "wb") as outfile):
118-
session.cookies = cookies # pyright: ignore[reportAttributeAccessIssue]
119-
request = requests.Request('GET', datasets_url)
120-
prepped = session.prepare_request(request)
121-
resp = session.send(prepped, stream=True)
122-
for chunk in resp.iter_content(chunk_size=(8*1024)):
123-
outfile.write(chunk)
75+
dest_dir = run_dir(ctx["run_id"])
76+
corpora_html_path = dest_dir / "corpora.html"
77+
78+
response = hook.get_corpora_response()
79+
with open(corpora_html_path, "wb") as f:
80+
for chunk in response.iter_content(chunk_size=8192):
81+
f.write(chunk)
12482

125-
return str(output)
83+
return str(corpora_html_path)
12684

12785

12886
@task
129-
def parse_corpora_metadata(corpora_file) -> str:
87+
def parse_corpora_metadata(corpora_file) -> list[dict[str, str]]:
13088
"""
13189
Parse the HTML of the catalog to create a structured representation for
13290
further use. There is no API for LDC, so we are forced to screenscrape.
13391
13492
:param corpora_file: Location of the fetched downloads page.
135-
:returns: Path to JSON-serialized LDC metadata.
136-
:rtype:
93+
:returns: Parsed LDC metadata as a list of dicts.
94+
:rtype: list[dict[str, str]]
13795
"""
13896
ctx = get_current_context()
13997
corpora_json = run_dir(ctx["run_id"]) / "corpora.json"
@@ -148,80 +106,59 @@ def parse_corpora_metadata(corpora_file) -> str:
148106
with open(corpora_json, "w") as corpora_out:
149107
corpora_out.write(json.dumps(corpora))
150108

151-
return str(corpora_json)
109+
return corpora
152110

153111

154112
@task.short_circuit
155-
def corpus_is_available(corpora_json) -> list[dict[str, str]]:
113+
def corpus_is_available(corpora_list) -> list[dict[str, str]]:
156114
"""
157115
Check to see if the requested corpus is listed in the set of corpora
158-
avilable for download. Used to shortcircuit the ``fetch_ldc_corpus()``
116+
available for download. Used to shortcircuit the ``fetch_ldc_corpus()``
159117
task if the dataset is not available.
160118
161119
Since the LDC catalog provides multiple duplicate downloads based on
162-
invoice date, this also filters the downloads availalble to
120+
invoice date, this also filters the downloads available to
163121
those with the latest invoice date.
164122
165-
:param corpora_json: Path to JSON-serialized LDC catalog metadata.
123+
:param corpora_list: Parsed LDC metadata list.
166124
:returns: A list of LDC downloads for fetching or an empty list.
167125
:rtype: list[dict[str, str]]
168126
"""
169127
ctx = get_current_context()
170-
id_ = ctx["params"].get("ldc_corpus")
171-
with open(corpora_json) as corpora_fp:
172-
corpora = json.load(corpora_fp)
173-
174-
latest = get_latest_invoice_date(corpora=corpora, corpus_id=id_) # pyright: ignore[reportArgumentType]
175-
if latest:
176-
logger.debug("Latest invoice date for %s is %s" % (id_, latest))
177-
fnregex = re.compile(ctx["params"].get("filename_regex", ".*"))
178-
return [c for c in corpora if (
179-
c["catalog_id"] == id_
180-
and c["invoice_date"] == latest
181-
and re.search(fnregex, c.get("file"))
182-
)]
183-
return []
128+
id_ = ctx["params"].get("ldc_corpus", "")
129+
filename_regex = ctx["params"].get("filename_regex")
130+
131+
return filter_corpora(corpora=corpora_list, corpus_id=id_, filename_regex=filename_regex)
184132

185133

186134
@task
187-
def download_corpus_from_ldc(filedict, cookiejar) -> str:
135+
def download_corpus_from_ldc(filedict) -> str:
188136
"""
189-
Download a corpus from the LDC catalog and verify that the MD5
190-
checksum reported by LDC matches that of the downloaded file.
137+
Download a corpus from the LDC catalog using the authenticated hook and
138+
verify that the MD5 checksum reported by LDC matches the downloaded file.
191139
192140
:param filedict: A dict representing the file metadata
193-
:param cookiejar: Location of the cookiejar file.
194-
:param available_corpora: The metadata for the available corpora.
195141
:returns: The location of the downloaded dataset file.
196142
:rtype: str
197143
"""
198-
cookies = MozillaCookieJar(cookiejar)
199-
cookies.load(ignore_discard=True)
200-
conn = Connection.get("ldc")
201144
ctx = get_current_context()
202-
corpus = ctx["params"].get("ldc_corpus")
203-
204-
# corpus_metadata = available_corpora.get(corpus)
205-
dl_uri = f"{conn.host}/{filedict.get("download_link")}"
206-
logger.info("Fetching corpus %s: %s" % (corpus, filedict))
207-
session = requests.session()
208-
session.cookies = cookies # pyright: ignore[reportAttributeAccessIssue]
209-
request = requests.Request('GET', dl_uri)
210-
prepped = session.prepare_request(request)
211-
resp = session.send(prepped, stream=True)
145+
dest_dir = run_dir(ctx["run_id"])
146+
resp = hook.get_corpus_file(filedict["download_link"])
147+
212148
match = re.match(
213-
r'^attachment; filename="(.*)"$', resp.headers['Content-Disposition']
149+
r'^attachment; filename="(.*)"$', resp.headers.get("Content-Disposition", "")
214150
)
215151

216152
if match:
217-
dest = run_dir(ctx["run_id"]) / match.group(1)
153+
dest = dest_dir / match.group(1)
218154
else:
219155
logger.warning("No Content-Disposition header; falling back to catalog filename")
220-
dest = run_dir(ctx["run_id"]) / filedict["filename"]
156+
dest = dest_dir / filedict["filename"]
221157

222158
with open(dest, "wb") as out:
223159
for chunk in resp.iter_content(chunk_size=(8*1024)):
224-
out.write(chunk)
160+
if chunk:
161+
out.write(chunk)
225162

226163
with (
227164
open(dest, "rb") as f,
@@ -230,18 +167,20 @@ def download_corpus_from_ldc(filedict, cookiejar) -> str:
230167
dl_checksum = hashlib.md5(f).hexdigest()
231168

232169
if dl_checksum != filedict["checksum"]:
233-
logger.warning("Downloaded file's checksum %s does not match LDC checksum %s" % (dl_checksum, filedict["checksum"]))
234-
170+
logger.warning(
171+
"Downloaded file's checksum %s does not match LDC checksum %s" % (
172+
dl_checksum, filedict["checksum"]
173+
)
174+
)
235175
return str(dest)
236176

237177

238-
cookiejar = authenticate_session()
239-
corpora_file = get_available_ldc_corpora(cookiejar)
178+
corpora_file = get_available_ldc_corpora()
240179
available_corpora = parse_corpora_metadata(corpora_file)
241180
files_to_download = corpus_is_available(available_corpora)
242181
chain(
243182
files_to_download,
244-
download_corpus_from_ldc.partial(cookiejar=cookiejar).expand(filedict=files_to_download)
183+
download_corpus_from_ldc.expand(filedict=files_to_download)
245184
)
246185

247186

mokelumne/providers/ldc/__init__.py

Whitespace-only changes.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
import yaml
5+
6+
_RUNTIME_FIELDS = {
7+
"package-name",
8+
"name",
9+
"description",
10+
"hook-class-names",
11+
"connection-types",
12+
"hooks",
13+
"operators",
14+
"sensors",
15+
"transfers",
16+
"triggers",
17+
"bundles",
18+
"integrations",
19+
"filesystems",
20+
"asset-uris",
21+
"dialects",
22+
"extra-links",
23+
"auth-backends",
24+
"auth-managers",
25+
"notifications",
26+
"executors",
27+
"config",
28+
}
29+
30+
31+
def get_provider_info() -> dict:
32+
data = (Path(__file__).parent / "provider.yaml").read_text()
33+
raw = yaml.safe_load(data)
34+
return {k: v for k, v in raw.items() if k in _RUNTIME_FIELDS}

mokelumne/providers/ldc/hooks/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)