diff --git a/src/huggingface_hub/constants.py b/src/huggingface_hub/constants.py index 616a7c7557..fb69f0cb03 100644 --- a/src/huggingface_hub/constants.py +++ b/src/huggingface_hub/constants.py @@ -240,6 +240,9 @@ def list_files(repo_id: str): HF_XET_HIGH_PERFORMANCE: bool = _is_true(os.environ.get("HF_XET_HIGH_PERFORMANCE")) +# Opt-in to bucket-based script transport for Jobs (experimental) +HF_JOBS_USE_BUCKET_TRANSPORT: bool = _is_true(os.environ.get("HF_JOBS_USE_BUCKET_TRANSPORT")) + # hf_transfer is not used anymore. Let's warn user is case they set the env variable if _is_true(os.environ.get("HF_HUB_ENABLE_HF_TRANSFER")) and not HF_XET_HIGH_PERFORMANCE: import warnings diff --git a/src/huggingface_hub/hf_api.py b/src/huggingface_hub/hf_api.py index dcc1663026..78e9460640 100644 --- a/src/huggingface_hub/hf_api.py +++ b/src/huggingface_hub/hf_api.py @@ -130,6 +130,7 @@ from .utils._auth import _get_token_from_environment, _get_token_from_file, _get_token_from_google_colab from .utils._deprecation import _deprecate_arguments, _deprecate_method from .utils._http import _httpx_follow_relative_redirects_with_backoff +from .utils._runtime import is_xet_available from .utils._typing import CallableT from .utils._verification import collect_local_files, resolve_local_root, verify_maps from .utils.endpoint_helpers import _is_emission_within_threshold @@ -11467,7 +11468,7 @@ def run_uv_job( secrets = secrets or {} # Build command - command, env, secrets = self._create_uv_command_env_and_secrets( + command, env, secrets, extra_volumes = self._create_uv_command_env_and_secrets( script=script, script_args=script_args, dependencies=dependencies, @@ -11476,7 +11477,10 @@ def run_uv_job( secrets=secrets, namespace=namespace, token=token, + volumes=volumes, ) + if extra_volumes: + volumes = (volumes or []) + extra_volumes # Create RunCommand args return self.run_job( image=image, @@ -11886,7 +11890,7 @@ def create_scheduled_uv_job( """ image = image or "ghcr.io/astral-sh/uv:python3.12-bookworm" # Build command - command, env, secrets = self._create_uv_command_env_and_secrets( + command, env, secrets, extra_volumes = self._create_uv_command_env_and_secrets( script=script, script_args=script_args, dependencies=dependencies, @@ -11895,7 +11899,10 @@ def create_scheduled_uv_job( secrets=secrets, namespace=namespace, token=token, + volumes=volumes, ) + if extra_volumes: + volumes = (volumes or []) + extra_volumes # Create RunCommand args return self.create_scheduled_job( image=image, @@ -11913,6 +11920,10 @@ def create_scheduled_uv_job( token=token, ) + # Bucket transport constants for Jobs + _HF_JOBS_ARTIFACTS_MOUNT_PATH = "/artifacts" + _HF_JOBS_ARTIFACTS_BUCKET_NAME = "jobs-artifacts" + def _create_uv_command_env_and_secrets( self, *, @@ -11924,7 +11935,8 @@ def _create_uv_command_env_and_secrets( secrets: dict[str, Any] | None, namespace: str | None, token: bool | str | None, - ) -> tuple[list[str], dict[str, Any], dict[str, Any]]: + volumes: list[Volume] | None = None, + ) -> tuple[list[str], dict[str, Any], dict[str, Any], list[Volume]]: env = env or {} secrets = secrets or {} @@ -11957,50 +11969,125 @@ def _create_uv_command_env_and_secrets( if len(local_files_to_include) == 0: # Direct URL execution or command - no upload needed command = ["uv", "run"] + uv_args + [script] + script_args - else: - # Find appropriate remote file names - remote_to_local_file_names: dict[str, str] = {} - for local_file_to_include in local_files_to_include: - local_file_path = Path(local_file_to_include) - # remove spaces for proper xargs parsing - remote_file_path = Path(local_file_path.name.replace(" ", "_")) - if remote_file_path.name in remote_to_local_file_names: - for i in itertools.count(): - remote_file_name = remote_file_path.with_stem(remote_file_path.stem + f"({i})").name - if remote_file_name not in remote_to_local_file_names: - remote_to_local_file_names[remote_file_name] = local_file_to_include - break - else: - remote_to_local_file_names[remote_file_path.name] = local_file_to_include - local_to_remote_file_names = { - local_file_to_include: remote_file_name - for remote_file_name, local_file_to_include in remote_to_local_file_names.items() - } + return command, env, secrets, [] + + # Find appropriate remote file names + remote_to_local_file_names: dict[str, str] = {} + for local_file_to_include in local_files_to_include: + local_file_path = Path(local_file_to_include) + # remove spaces for proper xargs parsing + remote_file_path = Path(local_file_path.name.replace(" ", "_")) + if remote_file_path.name in remote_to_local_file_names: + for i in itertools.count(): + remote_file_name = remote_file_path.with_stem(remote_file_path.stem + f"({i})").name + if remote_file_name not in remote_to_local_file_names: + remote_to_local_file_names[remote_file_name] = local_file_to_include + break + else: + remote_to_local_file_names[remote_file_path.name] = local_file_to_include + local_to_remote_file_names = { + local_file_to_include: remote_file_name + for remote_file_name, local_file_to_include in remote_to_local_file_names.items() + } - # Replace local paths with remote paths in command - if script in local_to_remote_file_names: - script = local_to_remote_file_names[script] - script_args = [ - local_to_remote_file_names[arg] if arg in local_to_remote_file_names else arg for arg in script_args - ] + # Try bucket transport if opted in + use_bucket = constants.HF_JOBS_USE_BUCKET_TRANSPORT + if use_bucket: + # Check if /artifacts mount path is already taken by user volumes + existing_mount_paths = {v.mount_path for v in (volumes or [])} + if self._HF_JOBS_ARTIFACTS_MOUNT_PATH in existing_mount_paths: + logger.info( + f"Mount path {self._HF_JOBS_ARTIFACTS_MOUNT_PATH} already in use, falling back to base64 transport." + ) + use_bucket = False + elif not is_xet_available(): + logger.info("hf_xet not available, falling back to base64 transport for Jobs.") + use_bucket = False - # Load content to pass as environment variable with format - # file1 base64content1 - # file2 base64content2 - # ... - env["LOCAL_FILES_ENCODED"] = "\n".join( - remote_file_name + " " + base64.b64encode(Path(local_file_to_include).read_bytes()).decode() - for remote_file_name, local_file_to_include in remote_to_local_file_names.items() - ) - # Shell-quote each arg to prevent metacharacters (e.g. '>') from being interpreted by bash - quoted_parts = ["'" + arg.replace("'", r"'\''") + "'" for arg in [*uv_args, script, *script_args]] - command = [ - "bash", - "-c", - """echo $LOCAL_FILES_ENCODED | xargs -n 2 bash -c 'echo "$1" | base64 -d > "$0"' && """ - + f"uv run {' '.join(quoted_parts)}", - ] - return command, env, secrets + if use_bucket: + try: + extra_volumes, scripts_prefix = self._upload_scripts_to_bucket( + namespace=namespace, + remote_to_local_file_names=remote_to_local_file_names, + token=token, + ) + # Rewrite script and script_args to reference the mounted path + mount_path = self._HF_JOBS_ARTIFACTS_MOUNT_PATH + if script in local_to_remote_file_names: + script = f"{mount_path}/{scripts_prefix}/{local_to_remote_file_names[script]}" + script_args = [ + f"{mount_path}/{scripts_prefix}/{local_to_remote_file_names[arg]}" + if arg in local_to_remote_file_names + else arg + for arg in script_args + ] + command = ["uv", "run"] + uv_args + [script] + script_args + return command, env, secrets, extra_volumes + except Exception: + logger.warning( + "Failed to upload scripts to bucket, falling back to base64 transport.", + exc_info=True, + ) + + # Base64 transport path (default) + # Replace local paths with remote paths in command + if script in local_to_remote_file_names: + script = local_to_remote_file_names[script] + script_args = [ + local_to_remote_file_names[arg] if arg in local_to_remote_file_names else arg for arg in script_args + ] + + # Load content to pass as environment variable with format + # file1 base64content1 + # file2 base64content2 + # ... + env["LOCAL_FILES_ENCODED"] = "\n".join( + remote_file_name + " " + base64.b64encode(Path(local_file_to_include).read_bytes()).decode() + for remote_file_name, local_file_to_include in remote_to_local_file_names.items() + ) + # Shell-quote each arg to prevent metacharacters (e.g. '>') from being interpreted by bash + quoted_parts = ["'" + arg.replace("'", r"'\''") + "'" for arg in [*uv_args, script, *script_args]] + command = [ + "bash", + "-c", + """echo $LOCAL_FILES_ENCODED | xargs -n 2 bash -c 'echo "$1" | base64 -d > "$0"' && """ + + f"uv run {' '.join(quoted_parts)}", + ] + return command, env, secrets, [] + + def _upload_scripts_to_bucket( + self, + *, + namespace: str, + remote_to_local_file_names: dict[str, str], + token: bool | str | None, + ) -> tuple[list[Volume], str]: + """Upload script files to a bucket and return volumes to mount plus the scripts prefix. + + Creates a bucket ``{namespace}/jobs-artifacts`` (if it doesn't exist) and uploads + each script to ``scripts/{uuid}/{remote_name}`` inside it. Returns a :class:`Volume` + that mounts the bucket at ``/artifacts`` so the job can access the scripts directly. + """ + import uuid + + bucket_id = f"{namespace}/{self._HF_JOBS_ARTIFACTS_BUCKET_NAME}" + subfolder_id = str(uuid.uuid4()) + scripts_prefix = f"scripts/{subfolder_id}" + + self.create_bucket(bucket_id=bucket_id, exist_ok=True, token=token) + + add_ops: list[tuple[str | Path | bytes, str]] = [ + (Path(local_path), f"{scripts_prefix}/{remote_name}") + for remote_name, local_path in remote_to_local_file_names.items() + ] + self.batch_bucket_files(bucket_id=bucket_id, add=add_ops, token=token) + + volume = Volume( + type="bucket", + source=bucket_id, + mount_path=self._HF_JOBS_ARTIFACTS_MOUNT_PATH, + ) + return [volume], scripts_prefix @validate_hf_hub_args def create_bucket( diff --git a/tests/test_cli.py b/tests/test_cli.py index 25b3582892..fb92993eec 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -11,6 +11,7 @@ import typer from typer.testing import CliRunner +from huggingface_hub import constants from huggingface_hub._dataset_viewer import DatasetParquetEntry from huggingface_hub._jobs_api import _create_job_spec from huggingface_hub._space_api import Volume @@ -2884,7 +2885,7 @@ def test_dependencies_with_version_specifiers_are_quoted(self, tmp_path: Path) - script_path.write_text("print('hello')") api = HfApi() - command, env, secrets = api._create_uv_command_env_and_secrets( + command, env, secrets, extra_volumes = api._create_uv_command_env_and_secrets( script=str(script_path), script_args=None, dependencies=["torch>=2.1", "numpy"], @@ -2903,6 +2904,242 @@ def test_dependencies_with_version_specifiers_are_quoted(self, tmp_path: Path) - assert "'numpy'" in bash_script # The script name must also be quoted assert "'train.py'" in bash_script + # No extra volumes added in default base64 path + assert extra_volumes == [] + + +class TestBucketTransport: + """Test bucket-based script transport for Jobs (experimental, opt-in via HF_JOBS_USE_BUCKET_TRANSPORT).""" + + def test_bucket_transport_uploads_and_returns_volume( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """When bucket transport is enabled, scripts are uploaded to a bucket and a Volume is returned.""" + from huggingface_hub.hf_api import HfApi + + monkeypatch.setattr(constants, "HF_JOBS_USE_BUCKET_TRANSPORT", True) + + script_path = tmp_path / "train.py" + script_path.write_text("print('hello')") + + api = HfApi() + with ( + patch.object(api, "create_bucket") as mock_create_bucket, + patch.object(api, "batch_bucket_files") as mock_batch, + patch("huggingface_hub.hf_api.is_xet_available", return_value=True), + ): + command, env, secrets, extra_volumes = api._create_uv_command_env_and_secrets( + script=str(script_path), + script_args=None, + dependencies=["torch>=2.1"], + python=None, + env=None, + secrets=None, + namespace="test-user", + token=None, + ) + + # Bucket was created with correct ID + mock_create_bucket.assert_called_once_with(bucket_id="test-user/jobs-artifacts", exist_ok=True, token=None) + + # Files were uploaded + mock_batch.assert_called_once() + call_kwargs = mock_batch.call_args + assert call_kwargs.kwargs["bucket_id"] == "test-user/jobs-artifacts" + add_ops = call_kwargs.kwargs["add"] + assert len(add_ops) == 1 + # Uploaded file path starts with scripts/ + assert add_ops[0][1].startswith("scripts/") + assert add_ops[0][1].endswith("/train.py") + + # Command is plain uv run (no bash -c wrapper) + assert command[0] == "uv" + assert command[1] == "run" + assert "--with" in command + assert "torch>=2.1" in command + # Script path references the mount + script_arg = [arg for arg in command if "train.py" in arg][0] + assert script_arg.startswith("/artifacts/scripts/") + + # No LOCAL_FILES_ENCODED in env + assert "LOCAL_FILES_ENCODED" not in env + + # Extra volume returned + assert len(extra_volumes) == 1 + vol = extra_volumes[0] + assert vol.type == "bucket" + assert vol.source == "test-user/jobs-artifacts" + assert vol.mount_path == "/artifacts" + + def test_bucket_transport_falls_back_on_failure( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture + ) -> None: + """When bucket upload fails, falls back to base64 transport.""" + from huggingface_hub.hf_api import HfApi + + monkeypatch.setattr(constants, "HF_JOBS_USE_BUCKET_TRANSPORT", True) + + script_path = tmp_path / "train.py" + script_path.write_text("print('hello')") + + api = HfApi() + with ( + patch.object(api, "create_bucket", side_effect=Exception("network error")), + patch("huggingface_hub.hf_api.is_xet_available", return_value=True), + ): + command, env, secrets, extra_volumes = api._create_uv_command_env_and_secrets( + script=str(script_path), + script_args=None, + dependencies=None, + python=None, + env=None, + secrets=None, + namespace="test-user", + token=None, + ) + + # Fell back to base64 + assert command[0] == "bash" + assert command[1] == "-c" + assert "LOCAL_FILES_ENCODED" in env + assert extra_volumes == [] + + # Warning was logged + assert any("falling back to base64" in r.message.lower() for r in caplog.records if r.levelname == "WARNING") + + def test_bucket_transport_skipped_when_xet_unavailable( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """When hf_xet is not available, falls back to base64 without attempting bucket upload.""" + from huggingface_hub.hf_api import HfApi + + monkeypatch.setattr(constants, "HF_JOBS_USE_BUCKET_TRANSPORT", True) + + script_path = tmp_path / "train.py" + script_path.write_text("print('hello')") + + api = HfApi() + with ( + patch.object(api, "create_bucket") as mock_create_bucket, + patch("huggingface_hub.hf_api.is_xet_available", return_value=False), + ): + command, env, secrets, extra_volumes = api._create_uv_command_env_and_secrets( + script=str(script_path), + script_args=None, + dependencies=None, + python=None, + env=None, + secrets=None, + namespace="test-user", + token=None, + ) + + # Never attempted bucket creation + mock_create_bucket.assert_not_called() + + # Fell back to base64 + assert command[0] == "bash" + assert "LOCAL_FILES_ENCODED" in env + assert extra_volumes == [] + + def test_bucket_transport_skipped_when_mount_path_taken( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """When /artifacts mount is already used by a user volume, falls back to base64.""" + from huggingface_hub.hf_api import HfApi + + monkeypatch.setattr(constants, "HF_JOBS_USE_BUCKET_TRANSPORT", True) + + script_path = tmp_path / "train.py" + script_path.write_text("print('hello')") + + existing_volume = Volume(type="bucket", source="user/other-bucket", mount_path="/artifacts") + + api = HfApi() + with ( + patch.object(api, "create_bucket") as mock_create_bucket, + patch("huggingface_hub.hf_api.is_xet_available", return_value=True), + ): + command, env, secrets, extra_volumes = api._create_uv_command_env_and_secrets( + script=str(script_path), + script_args=None, + dependencies=None, + python=None, + env=None, + secrets=None, + namespace="test-user", + token=None, + volumes=[existing_volume], + ) + + # Never attempted bucket creation + mock_create_bucket.assert_not_called() + + # Fell back to base64 + assert command[0] == "bash" + assert extra_volumes == [] + + def test_bucket_transport_not_used_by_default(self, tmp_path: Path) -> None: + """Without HF_JOBS_USE_BUCKET_TRANSPORT, base64 transport is used.""" + from huggingface_hub.hf_api import HfApi + + script_path = tmp_path / "train.py" + script_path.write_text("print('hello')") + + api = HfApi() + command, env, secrets, extra_volumes = api._create_uv_command_env_and_secrets( + script=str(script_path), + script_args=None, + dependencies=None, + python=None, + env=None, + secrets=None, + namespace="test-user", + token=None, + ) + + assert command[0] == "bash" + assert "LOCAL_FILES_ENCODED" in env + assert extra_volumes == [] + + def test_bucket_transport_with_multiple_files(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Multiple local files are all uploaded to the bucket under the same scripts prefix.""" + from huggingface_hub.hf_api import HfApi + + monkeypatch.setattr(constants, "HF_JOBS_USE_BUCKET_TRANSPORT", True) + + script_path = tmp_path / "train.py" + script_path.write_text("import config") + config_path = tmp_path / "config.yaml" + config_path.write_text("lr: 0.001") + + api = HfApi() + with ( + patch.object(api, "create_bucket"), + patch.object(api, "batch_bucket_files") as mock_batch, + patch("huggingface_hub.hf_api.is_xet_available", return_value=True), + ): + command, env, secrets, extra_volumes = api._create_uv_command_env_and_secrets( + script=str(script_path), + script_args=[str(config_path)], + dependencies=None, + python=None, + env=None, + secrets=None, + namespace="test-user", + token=None, + ) + + # Both files uploaded + add_ops = mock_batch.call_args.kwargs["add"] + assert len(add_ops) == 2 + uploaded_names = {op[1].split("/")[-1] for op in add_ops} + assert uploaded_names == {"train.py", "config.yaml"} + + # Both command args reference the mount + assert command[0] == "uv" + mounted_args = [arg for arg in command if arg.startswith("/artifacts/")] + assert len(mounted_args) == 2 class TestParseNamespaceFromJobId: