271 changes: 271 additions & 0 deletions mokelumne/dags/fetch_ldc_corpus.py
@@ -0,0 +1,271 @@
# pyright: reportTypedDictNotRequiredAccess=false

from __future__ import annotations
import hashlib
import json
import logging
import re

from http.cookiejar import MozillaCookieJar
from mmap import mmap, ACCESS_READ

import requests

from airflow.sdk import Connection, Param, chain, dag, get_current_context, task
from bs4 import BeautifulSoup
from bs4.element import Tag
from bs4._typing import _IncomingMarkup

from mokelumne.util.storage import run_dir

logger = logging.getLogger(__name__)

@dag(
    schedule=None,
    catchup=False,
    params={
        "ldc_corpus": Param("", description="LDC Catalog ID", type="string")
    },
    tags=["ldsp"],
)
def fetch_ldc_corpus():
"""
Fetch a corpus from the `Linguistic Data Consortium catalog`_. LDC
does not provide an API, so we have to screenscrape into an authorized
session to fetch the list of available datasets.

This is effectively a reimplementation of `ldcdl`_ by Jonathan May and
Alex Hedges.

TODO: Integrate a destination directory for the downloaded files.

.. _Linguistic Data Consortium catalog: https://catalog.ldc.upenn.edu/
.. _ldcdl: https://github.com/jonmay/ldcdl
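
    A hypothetical setup and trigger, assuming an ``ldc`` connection whose
    host, login, and password hold LDC catalog credentials (the connection
    type and catalog ID below are illustrative, not prescribed by this
    DAG)::

        airflow connections add ldc --conn-type http \
            --conn-host https://catalog.ldc.upenn.edu \
            --conn-login user@example.org --conn-password '...'
        airflow dags trigger fetch_ldc_corpus --conf '{"ldc_corpus": "LDC93S1"}'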
"""

    def get_csrf_token(
        markup: _IncomingMarkup, param_name: str = "authenticity_token"
    ) -> dict[str, str]:
        """
        Given HTML, extract the CSRF authenticity token and its parameter
        name.

        :param markup: the incoming HTML markup to parse.
        :type markup: bs4._typing._IncomingMarkup
        :param param_name: the parameter name to look up and return
        :type param_name: str
        :returns: The parameter name and its value.
        :rtype: dict[str, str]
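
        A minimal illustration with made-up markup::

            >>> get_csrf_token('<input name="authenticity_token" value="abc123">')
            {'authenticity_token': 'abc123'}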
"""
soup = BeautifulSoup(markup=markup, features="html.parser")
tag = soup.find(name="input", attrs={"name": param_name})
if tag and tag.get("value"):
return {param_name: tag["value"]} # pyright: ignore[reportReturnType]
return {}

    @task
    def authenticate_session() -> str:
        """
        Authenticate a browser session and persist the session's cookies to a
        cookiejar file.

        :returns: Location of the cookiejar file.
        :rtype: str
        """
        ctx = get_current_context()
        cj = run_dir(ctx["run_id"]) / "cookies.txt"
        conn = Connection.get("ldc")
        login_url = f"{conn.host}/login"
        session = requests.Session()
        login_page = session.get(login_url)
        form_data = get_csrf_token(login_page.text)
        form_data["spree_user[login]"] = conn.login
        form_data["spree_user[password]"] = conn.password
        form_data["utf8"] = "✓"

        login_request = requests.Request("POST", url=login_url, data=form_data)
        prepped = session.prepare_request(login_request)
        _ = session.send(prepped)
        cookies = MozillaCookieJar(filename=cj)
        for c in session.cookies:
            cookies.set_cookie(c)
        # The LDC catalog checks for the presence of the `_xiexie` cookie,
        # which is a session cookie. If it is not present, the catalog
        # automatically redirects to login, so we must persist session
        # cookies (hence ignore_discard=True).
        cookies.save(ignore_discard=True)
        return str(cj)


    @task
    def get_available_ldc_corpora(cookiejar: str) -> str:
        """
        Fetch the page listing the corpora available for download from the LDC
        catalog. The result is an HTML page cached locally for further parsing.

        :param cookiejar: The path to a cookiejar file for LDC
        :type cookiejar: str
        :returns: Path to the HTML file fetched from LDC.
        :rtype: str
        """
        cookies = MozillaCookieJar(filename=cookiejar)
        cookies.load(ignore_discard=True)
        ctx = get_current_context()
        conn = Connection.get("ldc")
        output = run_dir(ctx["run_id"]) / "corpora.html"
        datasets_url = f"{conn.host}/organization/downloads"

        with (requests.Session() as session, open(output, "wb") as outfile):
            session.cookies = cookies  # pyright: ignore[reportAttributeAccessIssue]
            request = requests.Request("GET", datasets_url)
            prepped = session.prepare_request(request)
            resp = session.send(prepped, stream=True)
            for chunk in resp.iter_content(chunk_size=8 * 1024):
                outfile.write(chunk)

        return str(output)


    def scrape_corpus_metadata(tag: Tag) -> dict[str, str]:
        """
        Parse the HTML of a given table row to return structured metadata.

        :param tag: a ``<tr>`` tag preparsed by BeautifulSoup.
        :type tag: bs4.element.Tag
        :returns: Structured metadata for a corpus.
        :rtype: dict[str, str]
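
        Illustrative row shape (simplified; the catalog ID and values are
        placeholders, and real markup may differ)::

            <tr>
              <td>LDC1234X99</td><td>Example Corpus</td><td>2024-01-01</td>
              <td><a href="/download/...">Download</a></td>
              <td>example_corpus.tgz
                  File Size: 1.2 GB
                  MD5 Checksum: d41d8cd9...</td>
            </tr>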
"""
cells = tag.find_all("td")
if len(cells) > 0:
catalog_id = cells[0].get_text(strip=True)
corpus_name = cells[1].get_text(strip=True)
# we don't want to get the invoice_date, because that leads to
# duplicate entries that are harder to filter.
download_link = cells[3].a["href"] # pyright: ignore[reportOptionalSubscript]
# the technical metadata is not broken up into distinct cells, so
# we have to parse it more. the "file" metadata is also not
# the true filename as returned by LDC.
techmd = cells[4].get_text(strip=True, separator="\n").splitlines()
file, filesize, checksum = [
re.sub(r"^\s*(File Size|MD5 Checksum): ", "", t) for t in techmd
]
return {
"catalog_id": catalog_id,
"corpus_name": corpus_name,
"download_link": download_link, # pyright: ignore[reportReturnType]
"file": file,
"filesize": filesize,
"checksum": checksum
}
return {}

    @task
    def parse_corpora_metadata(corpora_file: str) -> dict[str, dict[str, str]]:
        """
        Parse the HTML of the catalog to create a structured representation for
        further use. There is no API for LDC, so we are forced to screen-scrape.

        :param corpora_file: Location of the fetched downloads page.
        :type corpora_file: str
        :returns: Metadata for the available corpora, keyed by the LDC
            Catalog ID of each corpus.
        :rtype: dict[str, dict[str, str]]
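
        Illustrative output shape (catalog ID and values are placeholders)::

            {"LDC1234X99": {"catalog_id": "LDC1234X99",
                            "corpus_name": "Example Corpus",
                            "download_link": "/download/...",
                            "file": "example_corpus.tgz",
                            "filesize": "1.2 GB",
                            "checksum": "d41d8cd9..."}}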
"""
ctx = get_current_context()
corpora_json = run_dir(ctx["run_id"]) / "corpora.json"

with open(corpora_file) as page:
corpora_html = page.read()

data = BeautifulSoup(corpora_html, "html.parser")
rows = data.select("#user-corpora-download-table > tbody > tr")
corpora = [scrape_corpus_metadata(row) for row in rows]

"""
for c in corpora_rows:
data = c.find_all('td')
cid = data[0].get_text(strip=True)
if cid not in corpora:
corpora[cid] = {}
corpora[cid]["name"] = data[1].get_text(strip=True)
corpora[cid]["invoice_date"] = data[2].get_text(strip=True)
corpora[cid]["download_link"] = data[3].a["href"]

techmd = data[4].get_text(strip=True, separator="\n").splitlines()
corpora[cid]["filename"], corpora[cid]["filesize"], corpora[cid]["checksum"] = [
re.sub(r"^\s*(File Size|MD5 Checksum): ", "", t) for t in techmd
]
"""

        with open(corpora_json, "w") as corpora_out:
            corpora_out.write(json.dumps(corpora))

        return corpora


    @task.short_circuit
    def corpus_is_available(corpora: dict[str, dict[str, str]]) -> bool:
        """
        Check whether the requested corpus is listed in the set of corpora
        available for download. Used to short-circuit the downstream
        download task if the dataset is not available.

        :param corpora: Metadata for the available corpora.
        :returns: Whether the corpus is available for download.
        :rtype: bool
        """
        ctx = get_current_context()
        ldc_corpus = ctx["params"].get("ldc_corpus")
        return ldc_corpus in corpora
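
    # `@task.short_circuit` skips all downstream tasks when the callable
    # returns a falsy value, so `download_corpus_from_ldc` only runs when
    # the requested catalog ID appears in the scraped table.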


    @task
    def download_corpus_from_ldc(
        cookiejar: str, available_corpora: dict[str, dict[str, str]]
    ) -> str:
        """
        Download a corpus from the LDC catalog and verify that the MD5
        checksum reported by LDC matches that of the downloaded file.

        :param cookiejar: Location of the cookiejar file.
        :param available_corpora: The metadata for the available corpora.
        :returns: The location of the downloaded dataset file.
        :rtype: str
        """
        cookies = MozillaCookieJar(cookiejar)
        cookies.load(ignore_discard=True)
        conn = Connection.get("ldc")
        ctx = get_current_context()
        corpus = ctx["params"].get("ldc_corpus")

        corpus_metadata = available_corpora[corpus]
        dl_uri = f"{conn.host}/{corpus_metadata['download_link']}"
        logger.info("Fetching corpus %s: %s", corpus, corpus_metadata)

        dest = run_dir(ctx["run_id"]) / corpus_metadata["file"]

        with (requests.Session() as session, open(dest, "wb") as out):
            session.cookies = cookies  # pyright: ignore[reportAttributeAccessIssue]
            request = requests.Request("GET", dl_uri)
            prepped = session.prepare_request(request)
            resp = session.send(prepped, stream=True)
            for chunk in resp.iter_content(chunk_size=8 * 1024):
                out.write(chunk)

        with (
            open(dest, "rb") as f,
            mmap(f.fileno(), 0, access=ACCESS_READ) as mm,
        ):
            dl_checksum = hashlib.md5(mm).hexdigest()
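
        # Note: for very large corpora, a streaming digest avoids mapping the
        # whole file into memory; a sketch (hashlib.file_digest requires
        # Python >= 3.11):
        #
        #     with open(dest, "rb") as f:
        #         dl_checksum = hashlib.file_digest(f, "md5").hexdigest()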

        if dl_checksum != corpus_metadata["checksum"]:
            logger.warning(
                "Downloaded file's checksum %s does not match LDC checksum %s",
                dl_checksum,
                corpus_metadata["checksum"],
            )

        return str(dest)


    cookiejar = authenticate_session()
    corpora_file = get_available_ldc_corpora(cookiejar)
    available_corpora = parse_corpora_metadata(corpora_file)
    chain(
        corpus_is_available(available_corpora),
        download_corpus_from_ldc(cookiejar, available_corpora),
    )

fetch_ldc_corpus()  # pyright: ignore[reportUnusedExpression]
1 change: 1 addition & 0 deletions pyproject.toml
@@ -11,6 +11,7 @@ dependencies = [
"flask-appbuilder",
"langchain[aws]",
"langfuse",
"mechanize",
"piffle", # IIIF API used in batch image description DAG
"pymarc",
"python-tind-client",
12 changes: 12 additions & 0 deletions requirements.txt
@@ -797,6 +797,10 @@ h11==0.16.0 \
# via
# httpcore
# uvicorn
html5lib==1.1 \
--hash=sha256:0d78f8fde1c230e99fe37986a60526d7049ed4bf8a9fadbad5f00e22e58e041d \
--hash=sha256:b2e5b40261e20f354d198eae92afc10d750afb487ed5e50f9c4eaf07c184146f
# via mechanize
httpcore==1.0.9 \
--hash=sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55 \
--hash=sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8
@@ -1206,6 +1210,10 @@ mdurl==0.1.2 \
--hash=sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8 \
--hash=sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba
# via markdown-it-py
mechanize==0.4.10 \
--hash=sha256:1dea947f9be7ea0ab610f7bbc4a4e36b45d6bfdfceea29ad3d389a88a1957ddf \
--hash=sha256:246e21aa30a74ca608c2a06a922454e699fcb37edc9b79fcbba0c67712c2ec79
# via mokelumne (pyproject.toml)
methodtools==0.4.7 \
--hash=sha256:5e188c780b236adc12e75b5f078c5afb419ef99eb648569fc6d7071f053a1f11 \
--hash=sha256:e213439dd64cfe60213f7015da6efe5dd4003fd89376db3baa09fe13ec2bb0ba
@@ -2750,6 +2758,10 @@ watchfiles==1.1.1 \
--hash=sha256:f8979280bdafff686ba5e4d8f97840f929a87ed9cdf133cbbd42f7766774d2aa \
--hash=sha256:f9a2ae5c91cecc9edd47e041a930490c31c3afb1f5e6d71de3dc671bfaca02bf
# via uvicorn
webencodings==0.5.1 \
--hash=sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78 \
--hash=sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923
# via html5lib
websockets==16.0 \
--hash=sha256:0298d07ee155e2e9fda5be8a9042200dd2e3bb0b8a38482156576f863a9d457c \
--hash=sha256:04cdd5d2d1dacbad0a7bf36ccbcd3ccd5a30ee188f2560b7a62a30d14107b31a \