diff --git a/README.md b/README.md
index 8d8841b..072db45 100644
--- a/README.md
+++ b/README.md
@@ -85,14 +85,13 @@ Important environment variables for our build/environment:
 | `AIRFLOW_IMAGE_NAME` | Sets an alternate base image for Airflow, e.g. for `slim` images | `AIRFLOW_IMAGE_NAME="apache/airflow:slim-latest"` |
 | `AIRFLOW__CORE__FERNET_KEY` | [Fernet](https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html) encryption key used to encrypt Airflow secrets | `AIRFLOW__CORE__FERNET_KEY="somebase64value="` |
 | `AIRFLOW__API_AUTH__JWT_SECRET` | Secret key used to sign JWT tokens for Airflow's API authentication. The default value used in development and testing should be replaced in production. | `AIRFLOW__API_AUTH__JWT_SECRET="some32bytesecret"` |
+| `AIRFLOW_CONN_TIND_DEFAULT` | Airflow connection json string for TIND access. Note: the Connection params listed in the example are all needed! | `AIRFLOW_CONN_TIND_DEFAULT='{"conn_type": "http","password": "your-tind-key-here","host": "https://digicoll.lib.berkeley.edu/api/v1","schema": "https"}'` |
 | `OIDC_CLIENT_SECRET` | Client secret for OIDC authentication. Used by the Airflow webserver to authenticate OIDC token requests. In development, also used by `keycloak-config-cli` to configure the client secret. This should match Keycloak configuration in development and testing, and CalNet in production. | `OIDC_CLIENT_SECRET="some32charactersecret"` |
 | `OIDC_NAME` | Name appended to the OIDC login button | `OIDC_NAME="keycloak"` |
 | `OIDC_CLIENT_ID` | Client ID specified in the OIDC provider. | `OIDC_CLIENT_ID="mokelumne"` |
 | `OIDC_WELL_KNOWN` | URL for the OIDC provider's well-known configuration. Used by the Airflow webserver to fetch the OIDC provider's public key for validating OIDC tokens in development and testing. Dev should be configured to point at keycloak's well known and prod points to CAS OIDC well known | `OIDC_WELL_KNOWN="http://keycloak:8180/realms/berkeley-local/.well-known/openid-configuration"` |
 | `OIDC_ADMIN_GROUP` | Name of the OIDC group whose members should be mapped to the "Admin" role in Airflow. Used by keycloak-config-cli to configure group membership for the 'testadmin' user and by the Airflow webserver to map OIDC groups to Airflow roles in development and testing. For simplicity this should match the what we use for prod | `OIDC_ADMIN_GROUP="cn=edu:berkeley:org:libr:mokelumne:admins,ou=campus groups,dc=berkeley,dc=edu"` |
 | `OIDC_USER_GROUP` | Similar to admin group. This group is for users in both admin and user roles.| `OIDC_USER_GROUP="cn=edu:berkeley:org:libr:mokelumne:users,ou=campus groups,dc=berkeley,dc=edu"` |
-| `TIND_API_KEY` | API key for TIND access | `TIND_API_KEY="..."` |
-| `TIND_API_URL` | URL for TIND access | `TIND_API_URL="https://digicoll.lib.berkeley.edu/api/v1"` |
 | `TIND_IIIF_MANIFEST_URL_PATTERN` | URL pattern for TIND IIIF manifests | `TIND_IIIF_MANIFEST_URL_PATTERN="https://digicoll.lib.berkeley.edu/record/{tind_id}/export/iiif_manifest"` |
 | `MOKELUMNE_TIND_DOWNLOAD_DIR` | Path for downloaded image cache | `MOKELUMNE_TIND_DOWNLOAD_DIR="/some/path/to/download/to"` |
 |`LANGFUSE_HOST`|Host for Langfuse|`LANGFUSE_HOST="https://us.cloud.langfuse.com"`|
diff --git a/example.env b/example.env
index 10af104..c9fea1e 100644
--- a/example.env
+++ b/example.env
@@ -4,6 +4,7 @@ AIRFLOW__API_AUTH__JWT_SECRET=
 
 # @see https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#generating-fernet-key
 AIRFLOW__CORE__FERNET_KEY=
+AIRFLOW_CONN_TIND_DEFAULT='{"conn_type": "http","password": "your-tind-key-here","host": "https://digicoll.lib.berkeley.edu/api/v1","schema": "https"}'
 AIRFLOW_UID=49003
 
 # Set KeyCloak's logging level. "DEBUG" can be useful when
@@ -19,9 +20,6 @@ OIDC_NAME="keycloak"
 OIDC_USER_GROUP="cn=edu:berkeley:org:libr:mokelumne:users,ou=campus groups,dc=berkeley,dc=edu"
 OIDC_WELL_KNOWN="http://keycloak:8180/realms/berkeley-local/.well-known/openid-configuration"
 
-# TBD
-TIND_API_KEY=
-TIND_API_URL=
 TIND_IIIF_MANIFEST_URL_PATTERN=https://digicoll.lib.berkeley.edu/record/{tind_id}/export/iiif_manifest
 
 LANGFUSE_HOST=https://us.cloud.langfuse.com
diff --git a/mokelumne/dags/fetch_images.py b/mokelumne/dags/fetch_images.py
index e871d40..3d95687 100644
--- a/mokelumne/dags/fetch_images.py
+++ b/mokelumne/dags/fetch_images.py
@@ -53,13 +53,16 @@ def read_csv_to_process() -> dict[str, List[str] | str]:
         record_ids: list[str] = []
         records: dict[str, list[str]] = {}
-        with csv_path.open('r', encoding='utf-8') as csv_file:
+        with csv_path.open("r", encoding="utf-8") as csv_file:
             reader = csv.reader(csv_file)
             for row in reader:
                 record_ids.append(row[0])
                 records[row[0]] = row
 
-        return {"record_ids": record_ids[1:], "records": records,
-                "original_run_id": original_run_id}
+        return {
+            "record_ids": record_ids[1:],
+            "records": records,
+            "original_run_id": original_run_id,
+        }
 
     @task
     def load_record_ids(processed: dict[str, List[str] | str]) -> List[str]:
@@ -67,8 +70,9 @@ def load_records(processed: dict[str, List[str] | str | dict[str, list[str]]])\
         return processed["record_ids"]
 
     @task
-    def load_records(processed: dict[str, List[str] | str | dict[str, list[str]]])\
-            -> dict[str, list[str]]:
+    def load_records(
+        processed: dict[str, List[str] | str | dict[str, list[str]]],
+    ) -> dict[str, list[str]]:
         """Load the records from the to_process job."""
         return processed["records"]
 
@@ -76,11 +80,14 @@
     def fetch_image_to_record_directory(orig_run_id: str, tind_id: str) -> RunStatus:
         """Fetch an image from TIND to the target record's storage directory."""
         try:
-            client = FetchTind(orig_run_id)
+            client = FetchTind.from_connection(orig_run_id, conn="tind_default")
             filemd = client.get_first_file_metadata(tind_id)
             if not filemd.get("mime") in SUPPORTED_IMAGE_TYPES:
-                return RunStatus(tind_id=tind_id, path="",
-                    status=f"skipped: Unsupported file type {filemd.get('mime')}")
+                return RunStatus(
+                    tind_id=tind_id,
+                    path="",
+                    status=f"skipped: Unsupported file type {filemd.get('mime')}",
+                )
 
             target_width = width = filemd.get("width", 0)
             target_height = height = filemd.get("height", 0)
@@ -130,17 +137,18 @@ def fetch_image_to_record_directory(orig_run_id: str, tind_id: str) -> RunStatus
         return RunStatus(tind_id=tind_id, status="fetched", path=path)
 
     @task(outlets=[fetched_csv])
-    def write_status_to_fetched_csv(orig_run_id: str, records: dict[str, list[str]],
-                                    statuses: List[RunStatus]) -> None:
+    def write_status_to_fetched_csv(
+        orig_run_id: str, records: dict[str, list[str]], statuses: List[RunStatus]
+    ) -> None:
         """Write the status of processed records to a CSV file."""
         context = get_current_context()
 
-        fetched_path = run_dir(orig_run_id) / 'fetched.csv'
-        with fetched_path.open('w', encoding='utf-8') as csv_file:
+        fetched_path = run_dir(orig_run_id) / "fetched.csv"
+        with fetched_path.open("w", encoding="utf-8") as csv_file:
             writer = csv.writer(csv_file)
-            writer.writerow((*records['Record ID'], 'Image Path'))
+            writer.writerow((*records["Record ID"], "Image Path"))
 
-            status_col = records['Record ID'].index('Status')
+            status_col = records["Record ID"].index("Status")
 
             for status in statuses:
                 record = [*records[status[0]], *status[2:]]
diff --git a/mokelumne/dags/fetch_tind_records.py b/mokelumne/dags/fetch_tind_records.py
index b8c0016..9c300ea 100644
--- a/mokelumne/dags/fetch_tind_records.py
+++ b/mokelumne/dags/fetch_tind_records.py
@@ -86,12 +86,16 @@ def write_query_results_to_xml() -> int:
         context = get_current_context()
         run_id = context["run_id"]
         tind_query = context["params"]["tind_query"]
-        fetch_tind = FetchTind(run_id)
+        fetch_tind = FetchTind.from_connection(run_id, conn="tind_default")
 
         try:
-            records_written = fetch_tind.write_query_results_to_xml(tind_query, "tind_bulk.xml")
+            records_written = fetch_tind.write_query_results_to_xml(
+                tind_query, "tind_bulk.xml"
+            )
         except Exception as ex:
-            raise AirflowFailException(f"Failed to write query results to XML: {ex}") from ex
+            raise AirflowFailException(
+                f"Failed to write query results to XML: {ex}"
+            ) from ex
 
         if records_written == 0:
             raise AirflowSkipException(f"No records found for query: {tind_query}")
@@ -103,7 +107,9 @@
 
         return records_written
 
-    validate_params() >> write_query_results_to_xml()  # pyright: ignore[reportUnusedExpression]
+    (
+        validate_params() >> write_query_results_to_xml()
+    )  # pyright: ignore[reportUnusedExpression]
 
 
 fetch_tind_records()
diff --git a/mokelumne/util/fetch_tind.py b/mokelumne/util/fetch_tind.py
index 37339c1..87f15b4 100644
--- a/mokelumne/util/fetch_tind.py
+++ b/mokelumne/util/fetch_tind.py
@@ -2,7 +2,9 @@
 
 import logging
 from os import environ as ENV
-from typing import Any
+from typing import Any, Optional
+
+from airflow.sdk import Connection
 
 import requests
 from piffle.image import IIIFImageClient
@@ -18,9 +20,24 @@
 class FetchTind:
     """Helper methods for fetching items from TIND using TINDClient."""
+
-    def __init__(self, _run_id: str):
+    def __init__(self, _run_id: str, tind_client: Optional[TINDClient] = None):
         self.run_id = _run_id
-        self.client = TINDClient(default_storage_dir=str(run_dir(_run_id)))
+        self.client = tind_client or TINDClient(
+            default_storage_dir=str(run_dir(_run_id))
+        )
+
+    @classmethod
+    def from_connection(cls, _run_id: str, conn: Connection | str) -> "FetchTind":
+        """Create a FetchTind instance from an Airflow Connection."""
+        if isinstance(conn, str):
+            conn = Connection.get(conn)
+        client = TINDClient(
+            api_url=conn.host,
+            api_key=conn.password,
+            default_storage_dir=str(run_dir(_run_id)),
+        )
+        return cls(_run_id, tind_client=client)
 
     def get_ids(self, tind_query: str) -> list[str]:
         """Return the TIND IDs that match a given query."""
 
@@ -75,5 +92,7 @@ def download_image_from_record_sized(self, tind_id: str, width: int, height: int
 
     def write_query_results_to_xml(self, tind_query: str, file_name: str = "") -> int:
         """Download the XML results of a search query from TIND."""
-        records_written = self.client.write_search_results_to_file(tind_query, file_name)
+        records_written = self.client.write_search_results_to_file(
+            tind_query, file_name
+        )
         return int(records_written)