Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/huggingface_hub/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ def list_files(repo_id: str):

# When truthy, lets hf_xet use more aggressive concurrency/performance settings.
HF_XET_HIGH_PERFORMANCE: bool = _is_true(os.environ.get("HF_XET_HIGH_PERFORMANCE"))

# Opt-in to bucket-based script transport for Jobs (experimental).
# When enabled, local scripts for `run_uv_job` are uploaded to a per-namespace
# bucket and mounted into the job, instead of being base64-encoded into an
# environment variable. Falls back to base64 transport when unavailable.
HF_JOBS_USE_BUCKET_TRANSPORT: bool = _is_true(os.environ.get("HF_JOBS_USE_BUCKET_TRANSPORT"))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo we can already set it to True by default

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably makes sense indeed! I'll wait to see what @Wauplin thinks


# hf_transfer is not used anymore. Let's warn the user in case they set the env variable
if _is_true(os.environ.get("HF_HUB_ENABLE_HF_TRANSFER")) and not HF_XET_HIGH_PERFORMANCE:
import warnings
Expand Down
177 changes: 132 additions & 45 deletions src/huggingface_hub/hf_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
from .utils._auth import _get_token_from_environment, _get_token_from_file, _get_token_from_google_colab
from .utils._deprecation import _deprecate_arguments, _deprecate_method
from .utils._http import _httpx_follow_relative_redirects_with_backoff
from .utils._runtime import is_xet_available
from .utils._typing import CallableT
from .utils._verification import collect_local_files, resolve_local_root, verify_maps
from .utils.endpoint_helpers import _is_emission_within_threshold
Expand Down Expand Up @@ -11467,7 +11468,7 @@ def run_uv_job(
secrets = secrets or {}

# Build command
command, env, secrets = self._create_uv_command_env_and_secrets(
command, env, secrets, extra_volumes = self._create_uv_command_env_and_secrets(
script=script,
script_args=script_args,
dependencies=dependencies,
Expand All @@ -11476,7 +11477,10 @@ def run_uv_job(
secrets=secrets,
namespace=namespace,
token=token,
volumes=volumes,
)
if extra_volumes:
volumes = (volumes or []) + extra_volumes
# Create RunCommand args
return self.run_job(
image=image,
Expand Down Expand Up @@ -11886,7 +11890,7 @@ def create_scheduled_uv_job(
"""
image = image or "ghcr.io/astral-sh/uv:python3.12-bookworm"
# Build command
command, env, secrets = self._create_uv_command_env_and_secrets(
command, env, secrets, extra_volumes = self._create_uv_command_env_and_secrets(
script=script,
script_args=script_args,
dependencies=dependencies,
Expand All @@ -11895,7 +11899,10 @@ def create_scheduled_uv_job(
secrets=secrets,
namespace=namespace,
token=token,
volumes=volumes,
)
if extra_volumes:
volumes = (volumes or []) + extra_volumes
# Create RunCommand args
return self.create_scheduled_job(
image=image,
Expand All @@ -11913,6 +11920,10 @@ def create_scheduled_uv_job(
token=token,
)

# Bucket transport constants for Jobs
# Path inside the job container where the artifacts bucket is mounted.
_HF_JOBS_ARTIFACTS_MOUNT_PATH = "/artifacts"
# Name of the per-namespace bucket used to stage uploaded job scripts.
_HF_JOBS_ARTIFACTS_BUCKET_NAME = "jobs-artifacts"
Comment on lines +11923 to +11925
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good names !


def _create_uv_command_env_and_secrets(
self,
*,
Expand All @@ -11924,7 +11935,8 @@ def _create_uv_command_env_and_secrets(
secrets: dict[str, Any] | None,
namespace: str | None,
token: bool | str | None,
) -> tuple[list[str], dict[str, Any], dict[str, Any]]:
volumes: list[Volume] | None = None,
) -> tuple[list[str], dict[str, Any], dict[str, Any], list[Volume]]:
env = env or {}
secrets = secrets or {}

Expand Down Expand Up @@ -11957,50 +11969,125 @@ def _create_uv_command_env_and_secrets(
if len(local_files_to_include) == 0:
# Direct URL execution or command - no upload needed
command = ["uv", "run"] + uv_args + [script] + script_args
else:
# Find appropriate remote file names
remote_to_local_file_names: dict[str, str] = {}
for local_file_to_include in local_files_to_include:
local_file_path = Path(local_file_to_include)
# remove spaces for proper xargs parsing
remote_file_path = Path(local_file_path.name.replace(" ", "_"))
if remote_file_path.name in remote_to_local_file_names:
for i in itertools.count():
remote_file_name = remote_file_path.with_stem(remote_file_path.stem + f"({i})").name
if remote_file_name not in remote_to_local_file_names:
remote_to_local_file_names[remote_file_name] = local_file_to_include
break
else:
remote_to_local_file_names[remote_file_path.name] = local_file_to_include
local_to_remote_file_names = {
local_file_to_include: remote_file_name
for remote_file_name, local_file_to_include in remote_to_local_file_names.items()
}
return command, env, secrets, []

# Find appropriate remote file names
remote_to_local_file_names: dict[str, str] = {}
for local_file_to_include in local_files_to_include:
local_file_path = Path(local_file_to_include)
# remove spaces for proper xargs parsing
remote_file_path = Path(local_file_path.name.replace(" ", "_"))
if remote_file_path.name in remote_to_local_file_names:
for i in itertools.count():
remote_file_name = remote_file_path.with_stem(remote_file_path.stem + f"({i})").name
if remote_file_name not in remote_to_local_file_names:
remote_to_local_file_names[remote_file_name] = local_file_to_include
break
else:
remote_to_local_file_names[remote_file_path.name] = local_file_to_include
local_to_remote_file_names = {
local_file_to_include: remote_file_name
for remote_file_name, local_file_to_include in remote_to_local_file_names.items()
}

# Replace local paths with remote paths in command
if script in local_to_remote_file_names:
script = local_to_remote_file_names[script]
script_args = [
local_to_remote_file_names[arg] if arg in local_to_remote_file_names else arg for arg in script_args
]
# Try bucket transport if opted in
use_bucket = constants.HF_JOBS_USE_BUCKET_TRANSPORT
if use_bucket:
# Check if /artifacts mount path is already taken by user volumes
existing_mount_paths = {v.mount_path for v in (volumes or [])}
if self._HF_JOBS_ARTIFACTS_MOUNT_PATH in existing_mount_paths:
logger.info(
f"Mount path {self._HF_JOBS_ARTIFACTS_MOUNT_PATH} already in use, falling back to base64 transport."
)
use_bucket = False
elif not is_xet_available():
logger.info("hf_xet not available, falling back to base64 transport for Jobs.")
use_bucket = False

# Load content to pass as environment variable with format
# file1 base64content1
# file2 base64content2
# ...
env["LOCAL_FILES_ENCODED"] = "\n".join(
remote_file_name + " " + base64.b64encode(Path(local_file_to_include).read_bytes()).decode()
for remote_file_name, local_file_to_include in remote_to_local_file_names.items()
)
# Shell-quote each arg to prevent metacharacters (e.g. '>') from being interpreted by bash
quoted_parts = ["'" + arg.replace("'", r"'\''") + "'" for arg in [*uv_args, script, *script_args]]
command = [
"bash",
"-c",
"""echo $LOCAL_FILES_ENCODED | xargs -n 2 bash -c 'echo "$1" | base64 -d > "$0"' && """
+ f"uv run {' '.join(quoted_parts)}",
]
return command, env, secrets
if use_bucket:
try:
extra_volumes, scripts_prefix = self._upload_scripts_to_bucket(
namespace=namespace,
remote_to_local_file_names=remote_to_local_file_names,
token=token,
)
# Rewrite script and script_args to reference the mounted path
mount_path = self._HF_JOBS_ARTIFACTS_MOUNT_PATH
if script in local_to_remote_file_names:
script = f"{mount_path}/{scripts_prefix}/{local_to_remote_file_names[script]}"
script_args = [
f"{mount_path}/{scripts_prefix}/{local_to_remote_file_names[arg]}"
if arg in local_to_remote_file_names
else arg
for arg in script_args
]
command = ["uv", "run"] + uv_args + [script] + script_args
return command, env, secrets, extra_volumes
except Exception:
logger.warning(
"Failed to upload scripts to bucket, falling back to base64 transport.",
exc_info=True,
)

# Base64 transport path (default)
# Replace local paths with remote paths in command
if script in local_to_remote_file_names:
script = local_to_remote_file_names[script]
script_args = [
local_to_remote_file_names[arg] if arg in local_to_remote_file_names else arg for arg in script_args
]

# Load content to pass as environment variable with format
# file1 base64content1
# file2 base64content2
# ...
env["LOCAL_FILES_ENCODED"] = "\n".join(
remote_file_name + " " + base64.b64encode(Path(local_file_to_include).read_bytes()).decode()
for remote_file_name, local_file_to_include in remote_to_local_file_names.items()
)
# Shell-quote each arg to prevent metacharacters (e.g. '>') from being interpreted by bash
quoted_parts = ["'" + arg.replace("'", r"'\''") + "'" for arg in [*uv_args, script, *script_args]]
command = [
"bash",
"-c",
"""echo $LOCAL_FILES_ENCODED | xargs -n 2 bash -c 'echo "$1" | base64 -d > "$0"' && """
+ f"uv run {' '.join(quoted_parts)}",
]
return command, env, secrets, []

def _upload_scripts_to_bucket(
    self,
    *,
    namespace: str,
    remote_to_local_file_names: dict[str, str],
    token: bool | str | None,
) -> tuple[list[Volume], str]:
    """Upload script files to a bucket and return volumes to mount plus the scripts prefix.

    Creates a bucket ``{namespace}/jobs-artifacts`` (if it doesn't exist) and uploads
    each script to ``scripts/{uuid}/{remote_name}`` inside it. Returns a :class:`Volume`
    that mounts the bucket at ``/artifacts`` so the job can access the scripts directly.

    Args:
        namespace: Hub namespace (user or org) owning the artifacts bucket.
        remote_to_local_file_names: Mapping of sanitized remote file name -> local file path.
        token: Hugging Face token forwarded to the bucket API calls.

    Returns:
        A pair ``(volumes, scripts_prefix)`` where ``volumes`` is a single-element list
        containing the bucket :class:`Volume` to mount, and ``scripts_prefix`` is the
        unique path prefix (``scripts/{uuid}``) under which the files were uploaded.
    """
    import uuid

    bucket_id = f"{namespace}/{self._HF_JOBS_ARTIFACTS_BUCKET_NAME}"
    # Unique subfolder per call so concurrent jobs never overwrite each other's scripts.
    subfolder_id = str(uuid.uuid4())
    scripts_prefix = f"scripts/{subfolder_id}"

    # Idempotent: reuse the bucket if it already exists.
    self.create_bucket(bucket_id=bucket_id, exist_ok=True, token=token)

    add_ops: list[tuple[str | Path | bytes, str]] = [
        (Path(local_path), f"{scripts_prefix}/{remote_name}")
        for remote_name, local_path in remote_to_local_file_names.items()
    ]
    self.batch_bucket_files(bucket_id=bucket_id, add=add_ops, token=token)

    volume = Volume(
        type="bucket",
        source=bucket_id,
        mount_path=self._HF_JOBS_ARTIFACTS_MOUNT_PATH,
    )
    return [volume], scripts_prefix

@validate_hf_hub_args
def create_bucket(
Expand Down
Loading
Loading