diff --git a/mokelumne/dags/fetch_ldc_corpus.py b/mokelumne/dags/fetch_ldc_corpus.py
new file mode 100644
index 0000000..2003525
--- /dev/null
+++ b/mokelumne/dags/fetch_ldc_corpus.py
@@ -0,0 +1,271 @@
+# pyright: reportTypedDictNotRequiredAccess=false
+
+from __future__ import annotations
+import hashlib
+import json
+import logging
+import re
+
+from http.cookiejar import MozillaCookieJar
+from mmap import mmap, ACCESS_READ
+
+import requests
+
+from airflow.sdk import Connection, Param, chain, dag, get_current_context, task
+from bs4 import BeautifulSoup
+from bs4.element import Tag
+from bs4._typing import _IncomingMarkup
+
+from mokelumne.util.storage import run_dir
+
+logger = logging.getLogger(__name__)
+
+@dag(
+    schedule=None,
+    catchup=False,
+    params={
+        "ldc_corpus": Param("", description="LDC Catalog ID", type="string")
+    },
+    tags=["ldsp"],
+)
+def fetch_ldc_corpus():
+    """
+    Fetch a corpus from the `Linguistic Data Consortium catalog`_. LDC
+    does not provide an API, so we have to screen-scrape an authenticated
+    session to fetch the list of available datasets.
+
+    This is effectively a reimplementation of `ldcdl`_ by Jonathan May and
+    Alex Hedges.
+
+    TODO: Integrate a destination directory for the downloaded files.
+
+    .. _Linguistic Data Consortium catalog: https://catalog.ldc.upenn.edu/
+    .. _ldcdl: https://github.com/jonmay/ldcdl
+    """
+
+    def get_csrf_token(
+        markup: _IncomingMarkup, param_name: str = "authenticity_token"
+    ) -> dict[str, str]:
+        """
+        Given HTML, extract the CSRF authenticity token and its parameter
+        name.
+
+        :param markup: the incoming HTML markup to parse.
+        :type markup: bs4._typing._IncomingMarkup
+        :param param_name: the parameter name to look up and return.
+        :type param_name: str
+        :returns: The parameter name and its value, or an empty dict if no
+            token is found.
+        :rtype: dict[str, str]
+        """
+        soup = BeautifulSoup(markup=markup, features="html.parser")
+        tag = soup.find(name="input", attrs={"name": param_name})
+        if tag and tag.get("value"):
+            return {param_name: tag["value"]}  # pyright: ignore[reportReturnType]
+        return {}
+
+    @task
+    def authenticate_session() -> str:
+        """
+        Authenticate a browser session and persist the session's cookies to a
+        cookiejar file.
+
+        :returns: Location of the cookiejar file.
+        :rtype: str
+        """
+        ctx = get_current_context()
+        cj = run_dir(ctx["run_id"]) / "cookies.txt"
+        conn = Connection.get("ldc")
+        login_url = f"{conn.host}/login"
+        session = requests.Session()
+        login_page = session.get(login_url)
+        form_data = get_csrf_token(login_page.text)
+        form_data["spree_user[login]"] = conn.login
+        form_data["spree_user[password]"] = conn.password
+        form_data["utf8"] = "✓"
+
+        login_request = requests.Request("POST", url=login_url, data=form_data)
+        prepped = session.prepare_request(login_request)
+        _ = session.send(prepped)
+        cookies = MozillaCookieJar(filename=cj)
+        for c in session.cookies:
+            cookies.set_cookie(c)
+        # The LDC catalog checks for the presence of the `_xiexie` cookie,
+        # which is a session cookie. If it is not present, the catalog
+        # automatically redirects to login, so session cookies must be
+        # saved too (hence ignore_discard).
+        cookies.save(ignore_discard=True)
+        return str(cj)
+
+    @task
+    def get_available_ldc_corpora(cookiejar) -> str:
+        """
+        Fetch the page listing the corpora available for download from the LDC
+        catalog. This is an HTML page cached locally for further parsing.
+
+        :param cookiejar: The path to a cookiejar file for LDC.
+        :returns: Path to the HTML file fetched from LDC.
+        :rtype: str
+        """
+        cookies = MozillaCookieJar(filename=cookiejar)
+        cookies.load(ignore_discard=True)
+        ctx = get_current_context()
+        conn = Connection.get("ldc")
+        output = run_dir(ctx["run_id"]) / "corpora.html"
+        datasets_url = f"{conn.host}/organization/downloads"
+
+        with requests.Session() as session, open(output, "wb") as outfile:
+            session.cookies = cookies  # pyright: ignore[reportAttributeAccessIssue]
+            request = requests.Request("GET", datasets_url)
+            prepped = session.prepare_request(request)
+            resp = session.send(prepped, stream=True)
+            for chunk in resp.iter_content(chunk_size=8 * 1024):
+                outfile.write(chunk)
+
+        return str(output)
+
+    def scrape_corpus_metadata(tag: Tag) -> dict[str, str]:
+        """
+        Parse the HTML of a given table row to return structured metadata.
+
+        :param tag: a ``<tr>`` tag preparsed by BeautifulSoup.
+        :type tag: bs4.element.Tag
+        :returns: Structured metadata for a corpus.
+        :rtype: dict[str, str]
+        """
+        cells = tag.find_all("td")
+        if len(cells) > 0:
+            catalog_id = cells[0].get_text(strip=True)
+            corpus_name = cells[1].get_text(strip=True)
+            # We skip the invoice date in cells[2], because it leads to
+            # duplicate entries that are harder to filter.
+            download_link = cells[3].a["href"]  # pyright: ignore[reportOptionalSubscript]
+            # The technical metadata is not broken up into distinct cells, so
+            # we have to parse it further. The "file" metadata is also not
+            # the true filename as returned by LDC.
+            techmd = cells[4].get_text(strip=True, separator="\n").splitlines()
+            file, filesize, checksum = [
+                re.sub(r"^\s*(File Size|MD5 Checksum): ", "", t) for t in techmd
+            ]
+            return {
+                "catalog_id": catalog_id,
+                "corpus_name": corpus_name,
+                "download_link": download_link,  # pyright: ignore[reportReturnType]
+                "file": file,
+                "filesize": filesize,
+                "checksum": checksum,
+            }
+        return {}
+
+    @task
+    def parse_corpora_metadata(corpora_file) -> dict[str, dict[str, str]]:
+        """
+        Parse the HTML of the catalog to create a structured representation for
+        further use. There is no API for LDC, so we are forced to screen-scrape.
+
+        :param corpora_file: Location of the fetched downloads page.
+        :returns: Metadata for the available corpora, keyed by the LDC
+            Catalog ID of each corpus.
+        :rtype: dict[str, dict[str, str]]
+        """
+        ctx = get_current_context()
+        corpora_json = run_dir(ctx["run_id"]) / "corpora.json"
+
+        with open(corpora_file) as page:
+            corpora_html = page.read()
+
+        data = BeautifulSoup(corpora_html, "html.parser")
+        rows = data.select("#user-corpora-download-table > tbody > tr")
+        corpora = {
+            md["catalog_id"]: md
+            for md in (scrape_corpus_metadata(row) for row in rows)
+            if md
+        }
+
+        with open(corpora_json, "w") as corpora_out:
+            corpora_out.write(json.dumps(corpora))
+
+        return corpora
+
+    @task.short_circuit
+    def corpus_is_available(corpora) -> bool:
+        """
+        Check whether the requested corpus is listed in the set of corpora
+        available for download.
+        Used to short-circuit the downstream ``download_corpus_from_ldc()``
+        task if the dataset is not available.
+
+        :param corpora: Metadata for the available corpora.
+        :returns: Whether the corpus is available for download.
+        :rtype: bool
+        """
+        ctx = get_current_context()
+        ldc_corpus = ctx["params"].get("ldc_corpus")
+        return ldc_corpus in corpora
+
+    @task
+    def download_corpus_from_ldc(cookiejar, available_corpora):
+        """
+        Download a corpus from the LDC catalog and verify that the MD5
+        checksum reported by LDC matches that of the downloaded file,
+        logging a warning on a mismatch.
+
+        :param cookiejar: Location of the cookiejar file.
+        :param available_corpora: The metadata for the available corpora.
+        :returns: The location of the downloaded dataset file.
+        :rtype: str
+        """
+        cookies = MozillaCookieJar(cookiejar)
+        cookies.load(ignore_discard=True)
+        conn = Connection.get("ldc")
+        ctx = get_current_context()
+        corpus = ctx["params"].get("ldc_corpus")
+
+        corpus_metadata = available_corpora[corpus]
+        dl_uri = f"{conn.host}/{corpus_metadata['download_link']}"
+        logger.info("Fetching corpus %s: %s", corpus, corpus_metadata)
+
+        dest = run_dir(ctx["run_id"]) / corpus_metadata["file"]
+
+        with requests.Session() as session, open(dest, "wb") as out:
+            session.cookies = cookies  # pyright: ignore[reportAttributeAccessIssue]
+            request = requests.Request("GET", dl_uri)
+            prepped = session.prepare_request(request)
+            resp = session.send(prepped, stream=True)
+            for chunk in resp.iter_content(chunk_size=8 * 1024):
+                out.write(chunk)
+
+        # Memory-map the downloaded file so it can be checksummed without
+        # reading it into memory all at once.
+        with (
+            open(dest, "rb") as f,
+            mmap(f.fileno(), 0, access=ACCESS_READ) as mm,
+        ):
+            dl_checksum = hashlib.md5(mm).hexdigest()
+
+        if dl_checksum != corpus_metadata["checksum"]:
+            logger.warning(
+                "Downloaded file's checksum %s does not match LDC checksum %s",
+                dl_checksum,
+                corpus_metadata["checksum"],
+            )
+
+        return str(dest)
+
+    cookiejar = authenticate_session()
+    corpora_file = get_available_ldc_corpora(cookiejar)
+    available_corpora = parse_corpora_metadata(corpora_file)
+    chain(
+        corpus_is_available(available_corpora),
+        download_corpus_from_ldc(cookiejar, available_corpora),
+    )
+
+fetch_ldc_corpus()  # pyright: ignore[reportUnusedExpression]
diff --git a/pyproject.toml b/pyproject.toml
index fc9217d..94e5846 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,6 +11,7 @@ dependencies = [
     "flask-appbuilder",
     "langchain[aws]",
     "langfuse",
+    "mechanize",
     "piffle", # IIIF API used in batch image description DAG
     "pymarc",
     "python-tind-client",
diff --git a/requirements.txt b/requirements.txt
index 9995863..a163781 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -797,6 +797,10 @@ h11==0.16.0 \
     # via
     #   httpcore
     #   uvicorn
+html5lib==1.1 \
+    --hash=sha256:0d78f8fde1c230e99fe37986a60526d7049ed4bf8a9fadbad5f00e22e58e041d \
+    --hash=sha256:b2e5b40261e20f354d198eae92afc10d750afb487ed5e50f9c4eaf07c184146f
+    # via mechanize
 httpcore==1.0.9 \
     --hash=sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55 \
     --hash=sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8
@@ -1206,6 +1210,10 @@ mdurl==0.1.2 \
     --hash=sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8 \
     --hash=sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba
     # via markdown-it-py
+mechanize==0.4.10 \
+    --hash=sha256:1dea947f9be7ea0ab610f7bbc4a4e36b45d6bfdfceea29ad3d389a88a1957ddf \
+    --hash=sha256:246e21aa30a74ca608c2a06a922454e699fcb37edc9b79fcbba0c67712c2ec79
+    # via mokelumne (pyproject.toml)
 methodtools==0.4.7 \
     --hash=sha256:5e188c780b236adc12e75b5f078c5afb419ef99eb648569fc6d7071f053a1f11 \
     --hash=sha256:e213439dd64cfe60213f7015da6efe5dd4003fd89376db3baa09fe13ec2bb0ba
@@ -2750,6 +2758,10 @@ watchfiles==1.1.1 \
     --hash=sha256:f8979280bdafff686ba5e4d8f97840f929a87ed9cdf133cbbd42f7766774d2aa \
     --hash=sha256:f9a2ae5c91cecc9edd47e041a930490c31c3afb1f5e6d71de3dc671bfaca02bf
     # via uvicorn
+webencodings==0.5.1 \
+    --hash=sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78 \
+    --hash=sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923
+    # via html5lib
 websockets==16.0 \
     --hash=sha256:0298d07ee155e2e9fda5be8a9042200dd2e3bb0b8a38482156576f863a9d457c \
     --hash=sha256:04cdd5d2d1dacbad0a7bf36ccbcd3ccd5a30ee188f2560b7a62a30d14107b31a \