3 changes: 1 addition & 2 deletions README.md
@@ -85,14 +85,13 @@ Important environment variables for our build/environment:
| `AIRFLOW_IMAGE_NAME` | Sets an alternate base image for Airflow, e.g. for `slim` images | `AIRFLOW_IMAGE_NAME="apache/airflow:slim-latest"` |
| `AIRFLOW__CORE__FERNET_KEY` | [Fernet](https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html) encryption key used to encrypt Airflow secrets | `AIRFLOW__CORE__FERNET_KEY="somebase64value="` |
| `AIRFLOW__API_AUTH__JWT_SECRET` | Secret key used to sign JWT tokens for Airflow's API authentication. The default value used in development and testing should be replaced in production. | `AIRFLOW__API_AUTH__JWT_SECRET="some32bytesecret"` |
| `AIRFLOW_CONN_TIND_DEFAULT` | Airflow connection JSON string for TIND access.<br>Note: all of the Connection params listed in the example are required! | `AIRFLOW_CONN_TIND_DEFAULT='{"conn_type": "http","password": "your-tind-key-here","host": "https://digicoll.lib.berkeley.edu/api/v1","schema": "https"}'` |
Depending on the result of the above, ensure this is kept up to date with what we end up with.

@anarchivist Apr 15, 2026

I'm wondering if we want this in here at all, or would rather have a section of the README on connections.

Good point. We should also probably separate out Variables, if/when we get around to that refactor.

| `OIDC_CLIENT_SECRET` | Client secret for OIDC authentication. Used by the Airflow webserver to authenticate OIDC token requests. In development, also used by `keycloak-config-cli` to configure the client secret. This should match Keycloak configuration in development and testing, and CalNet in production. | `OIDC_CLIENT_SECRET="some32charactersecret"` |
| `OIDC_NAME` | Name appended to the OIDC login button | `OIDC_NAME="keycloak"` |
| `OIDC_CLIENT_ID` | Client ID specified in the OIDC provider. | `OIDC_CLIENT_ID="mokelumne"` |
| `OIDC_WELL_KNOWN` | URL for the OIDC provider's well-known configuration. Used by the Airflow webserver to fetch the OIDC provider's public key for validating OIDC tokens in development and testing. Dev should point at Keycloak's well-known endpoint; prod points at the CAS OIDC well-known endpoint. | `OIDC_WELL_KNOWN="http://keycloak:8180/realms/berkeley-local/.well-known/openid-configuration"` |
| `OIDC_ADMIN_GROUP` | Name of the OIDC group whose members should be mapped to the "Admin" role in Airflow. Used by keycloak-config-cli to configure group membership for the 'testadmin' user and by the Airflow webserver to map OIDC groups to Airflow roles in development and testing. For simplicity, this should match what we use in prod. | `OIDC_ADMIN_GROUP="cn=edu:berkeley:org:libr:mokelumne:admins,ou=campus groups,dc=berkeley,dc=edu"` |
| `OIDC_USER_GROUP` | Similar to `OIDC_ADMIN_GROUP`; this group is for users in both the admin and user roles. | `OIDC_USER_GROUP="cn=edu:berkeley:org:libr:mokelumne:users,ou=campus groups,dc=berkeley,dc=edu"` |
| `TIND_API_KEY` | API key for TIND access | `TIND_API_KEY="..."` |
| `TIND_API_URL` | URL for TIND access | `TIND_API_URL="https://digicoll.lib.berkeley.edu/api/v1"` |
| `TIND_IIIF_MANIFEST_URL_PATTERN` | URL pattern for TIND IIIF manifests | `TIND_IIIF_MANIFEST_URL_PATTERN="https://digicoll.lib.berkeley.edu/record/{tind_id}/export/iiif_manifest"` |
| `MOKELUMNE_TIND_DOWNLOAD_DIR` | Path for downloaded image cache | `MOKELUMNE_TIND_DOWNLOAD_DIR="/some/path/to/download/to"` |
| `LANGFUSE_HOST` | Host for Langfuse | `LANGFUSE_HOST="https://us.cloud.langfuse.com"` |
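The `AIRFLOW_CONN_*` row above relies on Airflow's convention of reading connections from JSON-valued environment variables. A minimal sketch of what that deserialization sees, using plain `json` so it runs without Airflow installed (the required-key check mirrors the note in the table):

```python
import json
import os

# Airflow reads AIRFLOW_CONN_<CONN_ID> env vars; a JSON value is deserialized
# into a Connection. This sketch uses plain json to show the shape.
os.environ["AIRFLOW_CONN_TIND_DEFAULT"] = (
    '{"conn_type": "http",'
    '"password": "your-tind-key-here",'
    '"host": "https://digicoll.lib.berkeley.edu/api/v1",'
    '"schema": "https"}'
)

conn = json.loads(os.environ["AIRFLOW_CONN_TIND_DEFAULT"])

# Each of these keys is required by the TIND helper, per the note in the table.
for key in ("conn_type", "password", "host", "schema"):
    assert key in conn, f"missing required connection param: {key}"

print(conn["host"])  # → https://digicoll.lib.berkeley.edu/api/v1
```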
4 changes: 1 addition & 3 deletions example.env
@@ -4,6 +4,7 @@
AIRFLOW__API_AUTH__JWT_SECRET=
# @see https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#generating-fernet-key
AIRFLOW__CORE__FERNET_KEY=
AIRFLOW_CONN_TIND_DEFAULT='{"conn_type": "http","password": "your-tind-key-here","host": "https://digicoll.lib.berkeley.edu/api/v1","schema": "https"}'
jason-raitz marked this conversation as resolved.
AIRFLOW_UID=49003

# Set KeyCloak's logging level. "DEBUG" can be useful when
@@ -19,9 +20,6 @@ OIDC_NAME="keycloak"
OIDC_USER_GROUP="cn=edu:berkeley:org:libr:mokelumne:users,ou=campus groups,dc=berkeley,dc=edu"
OIDC_WELL_KNOWN="http://keycloak:8180/realms/berkeley-local/.well-known/openid-configuration"

# TBD
TIND_API_KEY=
TIND_API_URL=
TIND_IIIF_MANIFEST_URL_PATTERN=https://digicoll.lib.berkeley.edu/record/{tind_id}/export/iiif_manifest

LANGFUSE_HOST=https://us.cloud.langfuse.com
36 changes: 22 additions & 14 deletions mokelumne/dags/fetch_images.py
@@ -53,34 +53,41 @@ def read_csv_to_process() -> dict[str, List[str] | str]:
record_ids: list[str] = []
records: dict[str, list[str]] = {}

with csv_path.open('r', encoding='utf-8') as csv_file:
with csv_path.open("r", encoding="utf-8") as csv_file:
reader = csv.reader(csv_file)
for row in reader:
record_ids.append(row[0])
records[row[0]] = row
return {"record_ids": record_ids[1:], "records": records,
"original_run_id": original_run_id}
return {
"record_ids": record_ids[1:],
"records": records,
"original_run_id": original_run_id,
}

@task
def load_record_ids(processed: dict[str, List[str] | str]) -> List[str]:
"""Load the record IDs from the to_process job."""
return processed["record_ids"]

@task
def load_records(processed: dict[str, List[str] | str | dict[str, list[str]]])\
-> dict[str, list[str]]:
def load_records(
processed: dict[str, List[str] | str | dict[str, list[str]]],
) -> dict[str, list[str]]:
"""Load the records from the to_process job."""
return processed["records"]

@task
def fetch_image_to_record_directory(orig_run_id: str, tind_id: str) -> RunStatus:
"""Fetch an image from TIND to the target record's storage directory."""
try:
client = FetchTind(orig_run_id)
client = FetchTind.from_connection(orig_run_id, conn="tind_default")
filemd = client.get_first_file_metadata(tind_id)
if not filemd.get("mime") in SUPPORTED_IMAGE_TYPES:
return RunStatus(tind_id=tind_id, path="",
status=f"skipped: Unsupported file type {filemd.get('mime')}")
return RunStatus(
tind_id=tind_id,
path="",
status=f"skipped: Unsupported file type {filemd.get('mime')}",
)

target_width = width = filemd.get("width", 0)
target_height = height = filemd.get("height", 0)
@@ -130,17 +137,18 @@ def fetch_image_to_record_directory(orig_run_id: str, tind_id: str) -> RunStatus
return RunStatus(tind_id=tind_id, status="fetched", path=path)

@task(outlets=[fetched_csv])
def write_status_to_fetched_csv(orig_run_id: str, records: dict[str, list[str]],
statuses: List[RunStatus]) -> None:
def write_status_to_fetched_csv(
orig_run_id: str, records: dict[str, list[str]], statuses: List[RunStatus]
) -> None:
"""Write the status of processed records to a CSV file."""
context = get_current_context()

fetched_path = run_dir(orig_run_id) / 'fetched.csv'
with fetched_path.open('w', encoding='utf-8') as csv_file:
fetched_path = run_dir(orig_run_id) / "fetched.csv"
with fetched_path.open("w", encoding="utf-8") as csv_file:
writer = csv.writer(csv_file)
writer.writerow((*records['Record ID'], 'Image Path'))
writer.writerow((*records["Record ID"], "Image Path"))

status_col = records['Record ID'].index('Status')
status_col = records["Record ID"].index("Status")

for status in statuses:
record = [*records[status[0]], *status[2:]]
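The `write_status_to_fetched_csv` task builds each output row by splatting the original record columns and patching in the status. A simplified, runnable sketch of that row-building pattern (the `RunStatus` field order here is an assumption based on how the DAG indexes it positionally):

```python
import csv
import io
from typing import NamedTuple


class RunStatus(NamedTuple):
    # Assumed field order: the DAG indexes statuses positionally
    # (status[0] for the record id, status[2:] for the path).
    tind_id: str
    status: str
    path: str


# Header row is keyed under "Record ID", as in read_csv_to_process.
records = {
    "Record ID": ["Record ID", "Title", "Status"],
    "123": ["123", "Some Title", ""],
}
statuses = [RunStatus(tind_id="123", status="fetched", path="/data/123.tiff")]

buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow((*records["Record ID"], "Image Path"))

status_col = records["Record ID"].index("Status")
for s in statuses:
    row = [*records[s.tind_id], *s[2:]]  # splat record columns, append path
    row[status_col] = s.status           # patch the Status column in place
    writer.writerow(row)

print(buf.getvalue())
```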
14 changes: 10 additions & 4 deletions mokelumne/dags/fetch_tind_records.py
@@ -86,12 +86,16 @@ def write_query_results_to_xml() -> int:
context = get_current_context()
run_id = context["run_id"]
tind_query = context["params"]["tind_query"]
fetch_tind = FetchTind(run_id)
fetch_tind = FetchTind.from_connection(run_id, conn="tind_default")

try:
records_written = fetch_tind.write_query_results_to_xml(tind_query, "tind_bulk.xml")
records_written = fetch_tind.write_query_results_to_xml(
tind_query, "tind_bulk.xml"
)
except Exception as ex:
raise AirflowFailException(f"Failed to write query results to XML: {ex}") from ex
raise AirflowFailException(
f"Failed to write query results to XML: {ex}"
) from ex

if records_written == 0:
raise AirflowSkipException(f"No records found for query: {tind_query}")
@@ -103,7 +107,9 @@

return records_written

validate_params() >> write_query_results_to_xml() # pyright: ignore[reportUnusedExpression]
(
validate_params() >> write_query_results_to_xml()
) # pyright: ignore[reportUnusedExpression]
jason-raitz marked this conversation as resolved.


fetch_tind_records()
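The `>>` chaining at the bottom of the DAG works because Airflow task objects implement `__rshift__`. A toy model of that operator, not Airflow's actual implementation:

```python
class Task:
    """Toy model of an Airflow operator's dependency chaining; not Airflow code."""

    def __init__(self, name: str):
        self.name = name
        self.downstream: list["Task"] = []

    def __rshift__(self, other: "Task") -> "Task":
        self.downstream.append(other)
        return other  # returning `other` lets chains like a >> b >> c work


validate = Task("validate_params")
write = Task("write_query_results_to_xml")
validate >> write  # same shape as the DAG's dependency line

print([t.name for t in validate.downstream])  # → ['write_query_results_to_xml']
```

Because the expression's value is unused, pyright flags it, hence the `reportUnusedExpression` suppression in the diff above.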
27 changes: 23 additions & 4 deletions mokelumne/util/fetch_tind.py
@@ -2,7 +2,9 @@

import logging
from os import environ as ENV
from typing import Any
from typing import Any, Optional

from airflow.sdk import Connection

import requests
from piffle.image import IIIFImageClient
@@ -18,9 +20,24 @@

class FetchTind:
"""Helper methods for fetching items from TIND using TINDClient."""
def __init__(self, _run_id: str):

def __init__(self, _run_id: str, tind_client: Optional[TINDClient] = None):
self.run_id = _run_id
self.client = TINDClient(default_storage_dir=str(run_dir(_run_id)))
self.client = tind_client or TINDClient(
default_storage_dir=str(run_dir(_run_id))
)

@classmethod
def from_connection(cls, _run_id: str, conn: Connection | str) -> "FetchTind":
"""Create a FetchTind instance from an Airflow Connection."""
if isinstance(conn, str):
conn = Connection.get(conn)
client = TINDClient(
api_url=conn.host,
api_key=conn.password,
default_storage_dir=str(run_dir(_run_id)),
)
return cls(_run_id, tind_client=client)

def get_ids(self, tind_query: str) -> list[str]:
"""Return the TIND IDs that match a given query."""
@@ -75,5 +92,7 @@ def download_image_from_record_sized(self, tind_id: str, width: int, height: int

def write_query_results_to_xml(self, tind_query: str, file_name: str = "") -> int:
"""Download the XML results of a search query from TIND."""
records_written = self.client.write_search_results_to_file(tind_query, file_name)
records_written = self.client.write_search_results_to_file(
tind_query, file_name
)
return int(records_written)
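The new `from_connection` factory separates connection resolution from client construction, which is what makes the `tind_client` parameter injectable in tests. A self-contained sketch of the pattern, with stub classes standing in for `airflow.sdk.Connection` and `TINDClient` (both stubs are assumptions, not the real APIs):

```python
from typing import Optional

# Stub registry standing in for Airflow's metadata DB / env-var lookup.
_CONNECTIONS: dict = {}


class Connection:
    """Stub for airflow.sdk.Connection (an assumption, not the real API)."""

    def __init__(self, host: str, password: str):
        self.host = host
        self.password = password

    @classmethod
    def get(cls, conn_id: str) -> "Connection":
        return _CONNECTIONS[conn_id]


class TINDClient:
    """Stub for the real TINDClient; only records its constructor args."""

    def __init__(self, api_url: str = "", api_key: str = ""):
        self.api_url = api_url
        self.api_key = api_key


class FetchTind:
    def __init__(self, _run_id: str, tind_client: Optional[TINDClient] = None):
        self.run_id = _run_id
        self.client = tind_client or TINDClient()

    @classmethod
    def from_connection(cls, _run_id: str, conn) -> "FetchTind":
        # Accept either a conn_id string or an already-resolved Connection.
        if isinstance(conn, str):
            conn = Connection.get(conn)
        client = TINDClient(api_url=conn.host, api_key=conn.password)
        return cls(_run_id, tind_client=client)


_CONNECTIONS["tind_default"] = Connection(
    host="https://digicoll.lib.berkeley.edu/api/v1", password="secret"
)
f = FetchTind.from_connection("manual__2026-01-01", "tind_default")
print(f.client.api_url)  # → https://digicoll.lib.berkeley.edu/api/v1
```

Passing a pre-built client (`FetchTind("run", tind_client=TINDClient(...))`) bypasses the registry entirely, which is the testing seam the refactor introduces.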