diff --git a/docs/source/en/guides/buckets.md b/docs/source/en/guides/buckets.md index 24c4870bfb..af2d89689e 100644 --- a/docs/source/en/guides/buckets.md +++ b/docs/source/en/guides/buckets.md @@ -348,6 +348,21 @@ Use [`batch_bucket_files`] to upload files to a bucket. You can upload from loca ... ) ``` +You can also copy xet files from another bucket or repository using the `copy` parameter. This is a server-side operation — no data is downloaded or re-uploaded: + +```python +# Copy files by xet hash (source_repo_type, source_repo_id, xet_hash, destination) +>>> batch_bucket_files( +... "username/my-bucket", +... copy=[ +... ("bucket", "username/source-bucket", "MODEL_XET_HASH", "models/model.safetensors"), +... ("model", "username/my-model", "CONFIG_XET_HASH", "models/config.safetensors"), +... ], +... ) +``` + +Xet hashes can be retrieved using `list_repo_tree`. + You can also delete files while uploading others. ```python @@ -360,7 +375,7 @@ You can also delete files while uploading others. ``` > [!WARNING] -> Calls to [`batch_bucket_files`] are non-transactional. If an error occurs during the process, some files may have been uploaded or deleted while others haven't. +> Calls to [`batch_bucket_files`] are non-transactional. If an error occurs during the process, some files may have been uploaded, copied, or deleted while others haven't. ### Upload a single file with the CLI @@ -470,6 +485,42 @@ Use `hf buckets sync` to download all files from a bucket to a local directory: See the [Sync directories](#sync-directories) section below for the full set of sync options. +## Copy files to Bucket + +Use [`copy_files`] to copy files already hosted on the Hub to a Bucket: + +```py +>>> from huggingface_hub import copy_files + +# Bucket to bucket (same or different bucket) +>>> copy_files( +... "hf://buckets/username/source-bucket/checkpoints/model.safetensors", +... "hf://buckets/username/destination-bucket/archive/model.safetensors", +... ) + +# Repo to bucket +>>> copy_files( +... 
"hf://datasets/username/my-dataset/processed/", +... "hf://buckets/username/my-bucket/datasets/processed/", +... ) +``` + +The same is available from the CLI: + +```bash +# Bucket to bucket +>>> hf buckets cp hf://buckets/username/source-bucket/logs/ hf://buckets/username/destination-bucket/logs/ + +# Repo to bucket +>>> hf buckets cp hf://username/my-model/config.json hf://buckets/username/my-bucket/models/config.json +``` + +Notes: + +- Bucket-to-repo copy is not yet supported. +- Files tracked with Xet (in buckets or repos) are copied server-side by hash — no data is downloaded or re-uploaded. +- Small text files not tracked with Xet on repo sources are downloaded and re-uploaded to the destination bucket. + ## Sync directories The `hf buckets sync` command (and its API equivalent [`sync_bucket`]) is the most powerful way to transfer files between a local directory and a bucket. It compares source and destination, and only transfers files that have changed. diff --git a/docs/source/en/guides/cli.md b/docs/source/en/guides/cli.md index 84d0dc65eb..1f21a5f65a 100644 --- a/docs/source/en/guides/cli.md +++ b/docs/source/en/guides/cli.md @@ -673,7 +673,7 @@ To filter by prefix, append the prefix to the bucket path: ### Copy single files -Use `hf buckets cp` to copy individual files to and from a bucket. Bucket paths use the `hf://buckets/` prefix. +Use `hf buckets cp` to copy individual files to and from a bucket, or to copy any file hosted on the Hub to a Bucket. 
To upload a file: @@ -703,6 +703,20 @@ You can also stream to stdout or from stdin using `-`: >>> echo "hello" | hf buckets cp - hf://buckets/username/my-bucket/hello.txt ``` +To copy from a repo or a bucket on the Hub: + +```bash +# Bucket to bucket +>>> hf buckets cp hf://buckets/username/source-bucket/logs/ hf://buckets/username/archive-bucket/logs/ + +# Repo to bucket +>>> hf buckets cp hf://datasets/username/my-dataset/data/train/ hf://buckets/username/my-bucket/datasets/train/ +``` + +Notes: + +- Bucket-to-repo copy is not supported. + ### Sync directories Use `hf buckets sync` to synchronize directories between your local machine and a bucket. It compares source and destination and transfers only changed files. diff --git a/docs/source/en/package_reference/cli.md b/docs/source/en/package_reference/cli.md index 25c90d7c7d..383e99c45f 100644 --- a/docs/source/en/package_reference/cli.md +++ b/docs/source/en/package_reference/cli.md @@ -208,7 +208,7 @@ $ hf buckets [OPTIONS] COMMAND [ARGS]... **Commands**: -* `cp`: Copy a single file to or from a bucket. +* `cp`: Copy files to or from buckets. * `create`: Create a new bucket. * `delete`: Delete a bucket. * `info`: Get info about a bucket. @@ -219,7 +219,7 @@ $ hf buckets [OPTIONS] COMMAND [ARGS]... ### `hf buckets cp` -Copy a single file to or from a bucket. +Copy files to or from buckets. **Usage**: @@ -229,8 +229,8 @@ $ hf buckets cp [OPTIONS] SRC [DST] **Arguments**: -* `SRC`: Source: local file, hf://buckets/... path, or - for stdin [required] -* `[DST]`: Destination: local path, hf://buckets/... 
path, or - for stdout +* `SRC`: Source: local file, HF handle (hf://...), or - for stdin [required] +* `[DST]`: Destination: local path, HF handle (hf://...), or - for stdout **Options**: @@ -247,6 +247,8 @@ Examples $ hf buckets cp my-config.json hf://buckets/user/my-bucket/logs/ $ hf buckets cp my-config.json hf://buckets/user/my-bucket/remote-config.json $ hf buckets cp - hf://buckets/user/my-bucket/config.json + $ hf buckets cp hf://buckets/user/my-bucket/logs/ hf://buckets/user/archive-bucket/logs/ + $ hf buckets cp hf://datasets/user/my-dataset/processed/ hf://buckets/user/my-bucket/dataset/processed/ Learn more Use `hf --help` for more information about a command. diff --git a/src/huggingface_hub/__init__.py b/src/huggingface_hub/__init__.py index 3cbf2812d7..e4cfa23d6a 100644 --- a/src/huggingface_hub/__init__.py +++ b/src/huggingface_hub/__init__.py @@ -203,6 +203,7 @@ "cancel_job", "change_discussion_status", "comment_discussion", + "copy_files", "create_branch", "create_bucket", "create_collection", @@ -903,6 +904,7 @@ "check_cli_update", "close_session", "comment_discussion", + "copy_files", "create_branch", "create_bucket", "create_collection", @@ -1327,6 +1329,7 @@ def __dir__(): cancel_job, # noqa: F401 change_discussion_status, # noqa: F401 comment_discussion, # noqa: F401 + copy_files, # noqa: F401 create_branch, # noqa: F401 create_bucket, # noqa: F401 create_collection, # noqa: F401 diff --git a/src/huggingface_hub/_buckets.py b/src/huggingface_hub/_buckets.py index 2be4546a8e..204c616f82 100644 --- a/src/huggingface_hub/_buckets.py +++ b/src/huggingface_hub/_buckets.py @@ -119,6 +119,21 @@ def __post_init__(self) -> None: ) +@dataclass +class _BucketCopyFile: + destination: str + xet_hash: str + source_repo_type: str # "model", "dataset", "space", "bucket" + source_repo_id: str + size: int | None = field(default=None) + mtime: int = field(init=False) + content_type: str | None = field(init=False) + + def __post_init__(self) -> None: + 
self.content_type = mimetypes.guess_type(self.destination)[0] + self.mtime = int(time.time() * 1000) + + @dataclass class _BucketDeleteFile: path: str diff --git a/src/huggingface_hub/cli/buckets.py b/src/huggingface_hub/cli/buckets.py index 5c411475c4..14310069b0 100644 --- a/src/huggingface_hub/cli/buckets.py +++ b/src/huggingface_hub/cli/buckets.py @@ -57,6 +57,10 @@ buckets_cli = typer_factory(help="Commands to interact with buckets.") +def _is_hf_handle(path: str) -> bool: + return path.startswith("hf://") + + def _parse_bucket_argument(argument: str) -> tuple[str, str]: """Parse a bucket argument accepting both 'namespace/name(/prefix)' and 'hf://buckets/namespace/name(/prefix)'. @@ -928,28 +932,44 @@ def sync( "hf buckets cp my-config.json hf://buckets/user/my-bucket/logs/", "hf buckets cp my-config.json hf://buckets/user/my-bucket/remote-config.json", "hf buckets cp - hf://buckets/user/my-bucket/config.json", + "hf buckets cp hf://buckets/user/my-bucket/logs/ hf://buckets/user/archive-bucket/logs/", + "hf buckets cp hf://datasets/user/my-dataset/processed/ hf://buckets/user/my-bucket/dataset/processed/", ], ) def cp( - src: Annotated[str, typer.Argument(help="Source: local file, hf://buckets/... path, or - for stdin")], + src: Annotated[str, typer.Argument(help="Source: local file, HF handle (hf://...), or - for stdin")], dst: Annotated[ - str | None, typer.Argument(help="Destination: local path, hf://buckets/... 
path, or - for stdout") + str | None, typer.Argument(help="Destination: local path, HF handle (hf://...), or - for stdout") ] = None, quiet: QuietOpt = False, token: TokenOpt = None, ) -> None: - """Copy a single file to or from a bucket.""" + """Copy files to or from buckets.""" api = get_hf_api(token=token) + src_is_hf = _is_hf_handle(src) + dst_is_hf = dst is not None and _is_hf_handle(dst) src_is_bucket = _is_bucket_path(src) dst_is_bucket = dst is not None and _is_bucket_path(dst) src_is_stdin = src == "-" dst_is_stdout = dst == "-" - # --- Validation --- - if src_is_bucket and dst_is_bucket: - raise typer.BadParameter("Remote-to-remote copy not supported.") + # Remote to remote copy + if src_is_hf and dst_is_hf: + if quiet: + disable_progress_bars() + try: + api.copy_files(src, dst) # type: ignore + finally: + if quiet: + enable_progress_bars() + + if not quiet: + print(f"Copied: {src} -> {dst}") + return + # Local to remote copy + # --- Validation --- if not src_is_bucket and not dst_is_bucket and not src_is_stdin: if dst is None: raise typer.BadParameter("Missing destination. 
Provide a bucket path as DST.") diff --git a/src/huggingface_hub/hf_api.py b/src/huggingface_hub/hf_api.py index e5013d0746..7496c656dd 100644 --- a/src/huggingface_hub/hf_api.py +++ b/src/huggingface_hub/hf_api.py @@ -29,15 +29,8 @@ from functools import wraps from itertools import islice from pathlib import Path -from typing import ( - TYPE_CHECKING, - Any, - BinaryIO, - Literal, - TypeVar, - overload, -) -from urllib.parse import quote +from typing import TYPE_CHECKING, Any, BinaryIO, Literal, TypeVar, overload +from urllib.parse import quote, unquote import httpcore import httpx @@ -59,6 +52,7 @@ BucketUrl, SyncPlan, _BucketAddFile, + _BucketCopyFile, _BucketDeleteFile, _split_bucket_id_and_prefix, sync_bucket_internal, @@ -244,6 +238,17 @@ _BUCKET_PATHS_INFO_BATCH_SIZE = 1000 _BUCKET_BATCH_ADD_CHUNK_SIZE = 100 _BUCKET_BATCH_DELETE_CHUNK_SIZE = 1000 + +# Regex used to match special revisions with "/" in them (see #1710) +SPECIAL_REFS_REVISION_REGEX = re.compile( + r""" + (^refs\/convert\/\w+) # `refs/convert/parquet` revisions + | + (^refs\/pr\/\d+) # PR revisions + """, + re.VERBOSE, +) + logger = logging.get_logger(__name__) @@ -400,6 +405,63 @@ def repo_type_and_id_from_hf_id(hf_id: str, hub_url: str | None = None) -> tuple return repo_type, namespace, repo_id +def _parse_hf_copy_handle(hf_handle: str) -> _BucketCopyHandle | _RepoCopyHandle: + # TODO: Harmonize hf:// parsing. See https://github.com/huggingface/huggingface_hub/issues/3971 + if not hf_handle.startswith("hf://"): + raise ValueError(f"Invalid HF handle: '{hf_handle}'. 
Expected a path starting with 'hf://'.") + + path = hf_handle.removeprefix("hf://") + if path.startswith("buckets/"): + bucket_id, bucket_path = _split_bucket_id_and_prefix(path.removeprefix("buckets/")) + return _BucketCopyHandle( + bucket_id=bucket_id, + path=bucket_path.strip("/"), + ) + + path = path.strip("/") + if path == "": + raise ValueError(f"Invalid HF handle: '{hf_handle}'.") + + parts = path.split("/") + repo_type: str = constants.REPO_TYPE_MODEL + if parts[0] in constants.REPO_TYPES_MAPPING: + repo_type = constants.REPO_TYPES_MAPPING[parts[0]] + parts = parts[1:] + + if len(parts) < 2: + raise ValueError( + f"Invalid repo HF handle: '{hf_handle}'. Expected format 'hf:////path' or with explicit repo type prefix." + ) + + namespace, repo_name_with_revision = parts[0], parts[1] + remaining_parts = parts[2:] + revision: str | None = None + if "@" in repo_name_with_revision: + repo_name, revision = repo_name_with_revision.split("@", 1) + else: + repo_name = repo_name_with_revision + + if revision is None: + revision = constants.DEFAULT_REVISION + else: + revision = unquote(revision) + if remaining_parts: + maybe_special_ref = f"{revision}/{remaining_parts[0]}" + match = SPECIAL_REFS_REVISION_REGEX.match(maybe_special_ref) + if match is not None: + revision = match.group() + suffix = maybe_special_ref.removeprefix(revision).lstrip("/") + remaining_parts = ([suffix] if suffix else []) + remaining_parts[1:] + + repo_path = "/".join(remaining_parts).strip("/") + return _RepoCopyHandle( + repo_type=repo_type, # type: ignore + repo_id=f"{namespace}/{repo_name}", + revision=revision, + path=repo_path, + ) + + @dataclass class LastCommitInfo(dict): oid: str @@ -662,6 +724,20 @@ def __repr__(self) -> str: return f"RepoUrl('{self}', endpoint='{self.endpoint}', repo_type='{self.repo_type}', repo_id='{self.repo_id}')" +@dataclass(frozen=True) +class _BucketCopyHandle: + bucket_id: str + path: str + + +@dataclass(frozen=True) +class _RepoCopyHandle: + repo_type: 
Literal["model", "dataset", "space"] + repo_id: str + revision: str + path: str + + @dataclass class RepoSibling: """ @@ -12406,18 +12482,226 @@ def get_bucket_paths_info( for path_info in response.json(): yield BucketFile(**path_info) + @validate_hf_hub_args + def copy_files(self, source: str, destination: str, *, token: str | bool | None = None) -> None: + """Copy files between locations on the Hub. + + Copy files from a bucket or repository (model, dataset, space) to a bucket. Both individual files and + entire folders are supported. + + Currently, only bucket destinations are supported. Copying to a repository is not supported. + + Args: + source (`str`): + Source location as an `hf://` handle. Can be a bucket path (e.g. `"hf://buckets/my-bucket/path/to/file"`) + or a repo path (e.g. `"hf://username/my-model/weights.bin"`, `"hf://datasets/username/my-dataset/data/"`). + destination (`str`): + Destination location as an `hf://` handle pointing to a bucket + (e.g. `"hf://buckets/my-bucket/target/path"`). + token (`bool` or `str`, *optional*): + A valid user access token (string). Defaults to the locally saved + token, which is the recommended method for authentication (see + https://huggingface.co/docs/huggingface_hub/quick-start#authentication). + To disable authentication, pass `False`. + + Raises: + [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError): + If the destination is not a bucket or if the source/destination handles are invalid. 
+ + Example: + ```python + >>> from huggingface_hub import copy_files + + # Copy a single file between buckets + >>> copy_files("hf://buckets/my-bucket/data.bin", "hf://buckets/other-bucket/data.bin") + + # Copy a folder from a bucket to another bucket + >>> copy_files("hf://buckets/my-bucket/models/", "hf://buckets/other-bucket/backup/") + + # Copy a file from a model repo to a bucket + >>> copy_files("hf://username/my-model/model.safetensors", "hf://buckets/my-bucket/") + + # Copy an entire dataset to a bucket + >>> copy_files("hf://datasets/username/my-dataset/", "hf://buckets/my-bucket/datasets/") + ``` + """ + source_handle = _parse_hf_copy_handle(source) + destination_handle = _parse_hf_copy_handle(destination) + + if isinstance(destination_handle, _RepoCopyHandle): + raise ValueError("Bucket-to-repo and repo-to-repo copy are not supported. Destination must be a bucket.") + + destination_bucket_id = destination_handle.bucket_id + destination_path = destination_handle.path + if destination_path == "": + destination_is_directory = True + else: + # Check if destination path is an existing file or a directory in the bucket + dest_path_info = list(self.get_bucket_paths_info(destination_bucket_id, [destination_path], token=token)) + if dest_path_info: + destination_is_directory = False + else: + destination_is_directory = ( + next( + iter( + self.list_bucket_tree( + destination_bucket_id, prefix=destination_path, recursive=False, token=token + ) + ), + None, + ) + is not None + ) + + all_adds: list[tuple[str, str]] = [] + all_copies: list[_BucketCopyFile] = [] + + def _resolve_target_path(src_file_path: str, src_root_path: str | None, is_single_file: bool) -> str: + basename = src_file_path.rsplit("/", 1)[-1] + if is_single_file: + if destination_path == "": + return basename + if destination_is_directory: + return f"{destination_path.rstrip('/')}/{basename}" + return destination_path + + if src_root_path is None: + rel_path = src_file_path + elif 
src_file_path.startswith(src_root_path + "/"): + rel_path = src_file_path[len(src_root_path) + 1 :] + elif src_file_path == src_root_path: + rel_path = src_file_path.rsplit("/", 1)[-1] + else: + raise ValueError(f"Unexpected source path while copying folder: '{src_file_path}'.") + + if rel_path == "": + raise ValueError("Cannot copy an empty relative path.") + if destination_path == "": + return rel_path + return f"{destination_path.rstrip('/')}/{rel_path}" + + def _build_copy_op( + target_path: str, xet_hash: str, size: int, source_repo_type: str, source_repo_id: str + ) -> _BucketCopyFile: + """Server-side copy by xet hash — no data transfer needed.""" + return _BucketCopyFile( + destination=target_path, + xet_hash=xet_hash, + source_repo_type=source_repo_type, + source_repo_id=source_repo_id, + size=size, + ) + + def _download_from_repo(file_path: str) -> str: + """Download a repo file to local cache, return the cache path.""" + return self.hf_hub_download( + repo_id=source_handle.repo_id, # type: ignore + repo_type=source_handle.repo_type, # type: ignore + filename=file_path, + revision=source_handle.revision, # type: ignore + token=token, + ) + + def _add_repo_file(file: RepoFile, target_path: str) -> None: + """Queue a repo file: copy-by-hash if xet-backed, otherwise download first.""" + if file.xet_hash is not None: + all_copies.append( + _build_copy_op( + target_path, + file.xet_hash, + file.size, + source_handle.repo_type, # type: ignore + source_handle.repo_id, # type: ignore + ) + ) + else: + # TODO: optimize this to download in parallel (low prio) + all_adds.append((_download_from_repo(file.path), target_path)) + + # === Source is a bucket: always hash-based copy (no download needed) === + if isinstance(source_handle, _BucketCopyHandle): + source_path = source_handle.path + source_path_info = list(self.get_bucket_paths_info(source_handle.bucket_id, [source_path], token=token)) + + if source_path_info: + # Source path matched a single file + source_file 
= source_path_info[0] + target_path = _resolve_target_path(source_file.path, None, is_single_file=True) + all_copies.append( + _build_copy_op( + target_path, source_file.xet_hash, source_file.size, "bucket", source_handle.bucket_id + ) + ) + else: + # Source path is a folder (or prefix) — list and copy all matching files + destination_is_directory = True + for item in self.list_bucket_tree( + source_handle.bucket_id, prefix=source_path or None, recursive=True, token=token + ): + if not isinstance(item, BucketFile): + continue + if source_path and not (item.path == source_path or item.path.startswith(source_path + "/")): + continue + target_path = _resolve_target_path(item.path, source_path or None, is_single_file=False) + all_copies.append( + _build_copy_op(target_path, item.xet_hash, item.size, "bucket", source_handle.bucket_id) + ) + + # === Source is a repo: copy-by-hash if xet-backed, download otherwise === + else: + source_path = source_handle.path + source_repo_path_info: list[RepoFile | RepoFolder] = [] + if source_path != "": + source_repo_path_info = self.get_paths_info( + repo_id=source_handle.repo_id, + paths=[source_path], + repo_type=source_handle.repo_type, + revision=source_handle.revision, + token=token, + ) + + if len(source_repo_path_info) == 1 and isinstance(source_repo_path_info[0], RepoFile): + # Source path matched a single file + target_path = _resolve_target_path(source_repo_path_info[0].path, None, is_single_file=True) + _add_repo_file(source_repo_path_info[0], target_path) + else: + # Source path is a folder — list and copy all files recursively + destination_is_directory = True + for repo_item in self.list_repo_tree( + repo_id=source_handle.repo_id, + path_in_repo=source_path, + recursive=True, + repo_type=source_handle.repo_type, + revision=source_handle.revision, + token=token, + ): + if not isinstance(repo_item, RepoFile): + continue + target_path = _resolve_target_path(repo_item.path, source_path or None, is_single_file=False) + 
_add_repo_file(repo_item, target_path) + + # Send copies first (no upload needed), then adds (may need upload) + if all_copies: + for copy_chunk in chunk_iterable(all_copies, chunk_size=_BUCKET_BATCH_ADD_CHUNK_SIZE): + self._batch_bucket_files(destination_bucket_id, copy=list(copy_chunk), token=token) + if all_adds: + for add_chunk in chunk_iterable(all_adds, chunk_size=_BUCKET_BATCH_ADD_CHUNK_SIZE): + self._batch_bucket_files(destination_bucket_id, add=list(add_chunk), token=token) + @validate_hf_hub_args def batch_bucket_files( self, bucket_id: str, *, add: list[tuple[str | Path | bytes, str]] | None = None, + copy: list[tuple[str, str, str, str]] | None = None, delete: list[str] | None = None, token: str | bool | None = None, ): - """Add and/or delete files in a bucket. + """Add, copy, and/or delete files in a bucket. - This is a non-transactional operation. If an error occurs in the process, some files may have been uploaded or deleted, + This is a non-transactional operation. If an error occurs in the process, some files may have been uploaded, + copied, or deleted while others haven't. Args: bucket_id (`str`): @@ -12425,6 +12709,15 @@ def batch_bucket_files( add (`list` of `tuple`, *optional*): Files to upload. Each element is a `(source, destination)` tuple where `source` is a path to a local file (`str` or `Path`) or raw `bytes` content, and `destination` is the path in the bucket. + copy (`list` of `tuple`, *optional*): + Files to copy by xet hash. Each element is a `(source_repo_type, source_repo_id, xet_hash, + destination)` tuple where: + - `source_repo_type` is the type of the source repository: `"model"`, `"dataset"`, `"space"`, or + `"bucket"`. + - `source_repo_id` is the ID of the source repository or bucket (e.g. `"username/my-model"`). + - `xet_hash` is the xet hash of the file to copy. + - `destination` is the destination path in the bucket. + This is a server-side operation — no data is downloaded or re-uploaded. 
delete (`list` of `str`, *optional*): Paths of files to delete from the bucket. token (`bool` or `str`, *optional*): @@ -12446,6 +12739,15 @@ def batch_bucket_files( ... ], ... ) + # Copy xet files from another bucket or repo (server-side, no data transfer) + >>> batch_bucket_files( + ... "username/my-bucket", + ... copy=[ + ... ("bucket", "username/source-bucket", "", "models/model.safetensors"), + ... ("model", "username/my-model", "", "models/config.safetensors"), + ... ], + ... ) + # Delete files >>> batch_bucket_files("username/my-bucket", delete=["old-model.bin"]) @@ -12458,14 +12760,15 @@ def batch_bucket_files( ``` """ add = add or [] + copy = copy or [] delete = delete or [] # Small batch: do everything in one call - if len(add) + len(delete) <= _BUCKET_BATCH_ADD_CHUNK_SIZE: - self._batch_bucket_files(bucket_id, add=add or None, delete=delete or None, token=token) + if len(add) + len(copy) + len(delete) <= _BUCKET_BATCH_ADD_CHUNK_SIZE: + self._batch_bucket_files(bucket_id, add=add, copy=copy, delete=delete, token=token) # type: ignore return - # Large batch: chunk adds first, then deletes + # Large batch: chunk copies first (no upload), then adds, then deletes from .utils._xet_progress_reporting import XetProgressReporter if add and not are_progress_bars_disabled(): @@ -12474,6 +12777,9 @@ def batch_bucket_files( progress = None try: + for copy_chunk in chunk_iterable(copy, chunk_size=_BUCKET_BATCH_ADD_CHUNK_SIZE): + self._batch_bucket_files(bucket_id, copy=list(copy_chunk), token=token) + for add_chunk in chunk_iterable(add, chunk_size=_BUCKET_BATCH_ADD_CHUNK_SIZE): self._batch_bucket_files(bucket_id, add=list(add_chunk), token=token, _progress=progress) @@ -12489,20 +12795,42 @@ def _batch_bucket_files( self, bucket_id: str, *, - add: list[tuple[str | Path | bytes, str]] | None = None, - delete: list[str] | None = None, + add: list[tuple[str | Path | bytes, str] | _BucketAddFile] | None = None, + copy: list[tuple[str, str, str, str] | _BucketCopyFile] | 
None = None, + delete: list[str | _BucketDeleteFile] | None = None, token: str | bool | None = None, _progress: XetProgressReporter | None = None, ): """Internal method: process a single batch of bucket file operations (upload to XET + call /batch).""" # Convert public API inputs to internal operation objects - operations: list[_BucketAddFile | _BucketDeleteFile] = [] + operations: list[_BucketAddFile | _BucketCopyFile | _BucketDeleteFile] = [] if add: - for source, destination in add: - operations.append(_BucketAddFile(source=source, destination=destination)) + for add_item in add: + if isinstance(add_item, _BucketAddFile): + operations.append(add_item) + else: + source, destination = add_item + operations.append(_BucketAddFile(source=source, destination=destination)) + if copy: + for copy_item in copy: + if isinstance(copy_item, _BucketCopyFile): + operations.append(copy_item) + else: + source_repo_type, source_repo_id, xet_hash, destination = copy_item + operations.append( + _BucketCopyFile( + destination=destination, + xet_hash=xet_hash, + source_repo_type=source_repo_type, + source_repo_id=source_repo_id, + ) + ) if delete: - for path in delete: - operations.append(_BucketDeleteFile(path=path)) + for delete_item in delete: + if isinstance(delete_item, _BucketDeleteFile): + operations.append(delete_item) + else: + operations.append(_BucketDeleteFile(path=delete_item)) if not operations: return @@ -12514,10 +12842,11 @@ def _batch_bucket_files( headers = self._build_hf_headers(token=token) add_operations = [op for op in operations if isinstance(op, _BucketAddFile)] + add_operations_to_upload = [op for op in add_operations if op.xet_hash is None] add_bytes_operations = [op for op in add_operations if isinstance(op.source, bytes)] add_path_operations = [op for op in add_operations if not isinstance(op.source, bytes)] - if len(add_operations) > 0: + if len(add_operations_to_upload) > 0: try: xet_connection_info = fetch_xet_connection_info_from_repo_info( 
token_type=XetTokenType.WRITE, @@ -12562,7 +12891,7 @@ def token_refresher() -> tuple[str, int]: try: # 2.a. Upload path files xet_upload_infos = upload_files( - [str(op.source) for op in add_path_operations], + [str(op.source) for op in add_path_operations if op.xet_hash is None], xet_endpoint, access_token_info, token_refresher, @@ -12570,7 +12899,9 @@ def token_refresher() -> tuple[str, int]: "bucket", skip_sha256=True, ) - for upload_info, op in zip(xet_upload_infos, add_path_operations): + for upload_info, op in zip( + xet_upload_infos, [op for op in add_path_operations if op.xet_hash is None] + ): op.xet_hash = upload_info.hash op.size = upload_info.filesize @@ -12579,7 +12910,7 @@ def token_refresher() -> tuple[str, int]: # 2.b. Upload bytes files xet_upload_infos = upload_bytes( - [op.source for op in add_bytes_operations], + [op.source for op in add_bytes_operations if op.xet_hash is None], xet_endpoint, access_token_info, token_refresher, @@ -12587,7 +12918,9 @@ def token_refresher() -> tuple[str, int]: "bucket", skip_sha256=True, ) - for upload_info, op in zip(xet_upload_infos, add_bytes_operations): + for upload_info, op in zip( + xet_upload_infos, [op for op in add_bytes_operations if op.xet_hash is None] + ): op.xet_hash = upload_info.hash op.size = upload_info.filesize @@ -12609,6 +12942,14 @@ def _payload_as_ndjson() -> Iterable[bytes]: } if op.content_type is not None: payload["contentType"] = op.content_type + elif isinstance(op, _BucketCopyFile): + payload = { + "type": "copyFile", + "path": op.destination, + "xetHash": op.xet_hash, + "sourceRepoType": op.source_repo_type, + "sourceRepoId": op.source_repo_id, + } else: payload = { "type": "deleteFile", @@ -13293,6 +13634,7 @@ def get_local_safetensors_metadata(path: str | Path) -> SafetensorsRepoMetadata: move_bucket = api.move_bucket list_bucket_tree = api.list_bucket_tree get_bucket_paths_info = api.get_bucket_paths_info +copy_files = api.copy_files batch_bucket_files = api.batch_bucket_files 
get_bucket_file_metadata = api.get_bucket_file_metadata download_bucket_files = api.download_bucket_files diff --git a/src/huggingface_hub/hf_file_system.py b/src/huggingface_hub/hf_file_system.py index 3ae8493006..55aa490ff2 100644 --- a/src/huggingface_hub/hf_file_system.py +++ b/src/huggingface_hub/hf_file_system.py @@ -1,5 +1,4 @@ import os -import re import tempfile import threading from collections import deque @@ -28,22 +27,11 @@ RevisionNotFoundError, ) from .file_download import hf_hub_url, http_get -from .hf_api import BucketFile, BucketFolder, HfApi, LastCommitInfo, RepoFile, RepoFolder +from .hf_api import SPECIAL_REFS_REVISION_REGEX, BucketFile, BucketFolder, HfApi, LastCommitInfo, RepoFile, RepoFolder from .utils import HFValidationError, hf_raise_for_status, http_backoff, http_stream_backoff from .utils.insecure_hashlib import md5 -# Regex used to match special revisions with "/" in them (see #1710) -SPECIAL_REFS_REVISION_REGEX = re.compile( - r""" - (^refs\/convert\/\w+) # `refs/convert/parquet` revisions - | - (^refs\/pr\/\d+) # PR revisions - """, - re.VERBOSE, -) - - @dataclass class HfFileSystemResolvedPath: """Top level Data structure containing information about a resolved Hugging Face file system path.""" diff --git a/tests/test_buckets.py b/tests/test_buckets.py index c4af820f59..fb79c2b8e2 100644 --- a/tests/test_buckets.py +++ b/tests/test_buckets.py @@ -95,6 +95,13 @@ def bucket_write(api: HfApi) -> str: return bucket.bucket_id +@pytest.fixture(scope="function") +def bucket_write_2(api: HfApi) -> str: + """Second bucket for read-write tests (rebuilt every test).""" + bucket = api.create_bucket(bucket_name()) + return bucket.bucket_id + + def test_create_bucket(api: HfApi): bucket_id = f"{USER}/{bucket_name()}" bucket_url = api.create_bucket(bucket_id) @@ -302,6 +309,146 @@ def test_download_bucket_files_raises_on_missing_when_requested(api: HfApi, buck assert "non_existent_file.txt" in str(exc_info.value) +@requires("hf_xet") +def 
test_copy_files_bucket_to_same_bucket_file(api: HfApi, bucket_write: str, tmp_path): + api.batch_bucket_files(bucket_write, add=[(b"bucket-content", "source.txt")]) + + api.copy_files( + f"hf://buckets/{bucket_write}/source.txt", + f"hf://buckets/{bucket_write}/copied.txt", + ) + + output_path = tmp_path / "copied.txt" + api.download_bucket_files(bucket_write, [("copied.txt", str(output_path))]) + assert output_path.read_bytes() == b"bucket-content" + + +@requires("hf_xet") +def test_copy_files_bucket_to_different_bucket_folder(api: HfApi, bucket_write: str, bucket_write_2: str, tmp_path): + api.batch_bucket_files(bucket_write, add=[(b"a", "logs/a.txt"), (b"b", "logs/sub/b.txt"), (b"c", "other/c.txt")]) + + api.copy_files( + f"hf://buckets/{bucket_write}/logs", + f"hf://buckets/{bucket_write_2}/backup/", + ) + + destination_files = {entry.path for entry in api.list_bucket_tree(bucket_write_2)} + assert "backup/a.txt" in destination_files + assert "backup/sub/b.txt" in destination_files + assert "backup/c.txt" not in destination_files + + # Check exact content + a_path = tmp_path / "a.txt" + b_path = tmp_path / "b.txt" + api.download_bucket_files(bucket_write_2, [("backup/a.txt", str(a_path)), ("backup/sub/b.txt", str(b_path))]) + assert a_path.read_bytes() == b"a" + assert b_path.read_bytes() == b"b" + + +@requires("hf_xet") +def test_copy_files_repo_to_bucket_with_revision(api: HfApi, bucket_write: str, tmp_path): + repo_id = api.create_repo(repo_id=repo_name(prefix="copy-files")).repo_id + branch = "copy-files-branch" + api.upload_file(repo_id=repo_id, path_in_repo="main.txt", path_or_fileobj=b"main") + api.create_branch(repo_id=repo_id, branch=branch) + api.upload_file(repo_id=repo_id, path_in_repo="nested/from-branch.txt", path_or_fileobj=b"branch", revision=branch) + + api.copy_files( + f"hf://{repo_id}@{branch}/nested/from-branch.txt", + f"hf://buckets/{bucket_write}/from-repo.txt", + ) + + output_path = tmp_path / "from-repo.txt" + 
api.download_bucket_files(bucket_write, [("from-repo.txt", str(output_path))]) + assert output_path.read_bytes() == b"branch" + + +@requires("hf_xet") +def test_copy_files_bucket_to_repo_raises(api: HfApi, bucket_write: str): + repo_id = api.create_repo(repo_id=repo_name(prefix="copy-files-dst")).repo_id + api.batch_bucket_files(bucket_write, add=[(b"x", "x.txt")]) + with pytest.raises(ValueError, match="Destination must be a bucket"): + api.copy_files(f"hf://buckets/{bucket_write}/x.txt", f"hf://{repo_id}/x.txt") + + +@requires("hf_xet") +def test_copy_files_folder_to_nonexistent_dest(api: HfApi, bucket_write: str, bucket_write_2: str): + """source=folder, dest doesn't exist => files copied under dest path.""" + api.batch_bucket_files(bucket_write, add=[(b"a", "folder/a.txt"), (b"b", "folder/sub/b.txt")]) + + api.copy_files( + f"hf://buckets/{bucket_write}/folder", + f"hf://buckets/{bucket_write_2}/target-folder", + ) + + destination_files = {entry.path for entry in api.list_bucket_tree(bucket_write_2)} + assert "target-folder/a.txt" in destination_files + assert "target-folder/sub/b.txt" in destination_files + + +@requires("hf_xet") +def test_copy_files_folder_to_existing_folder_dest(api: HfApi, bucket_write: str, bucket_write_2: str): + """source=folder, dest is an existing folder => files merged under dest path.""" + api.batch_bucket_files(bucket_write, add=[(b"a", "folder/a.txt"), (b"b", "folder/sub/b.txt")]) + api.batch_bucket_files(bucket_write_2, add=[(b"existing", "target-folder/existing.txt")]) + + api.copy_files( + f"hf://buckets/{bucket_write}/folder", + f"hf://buckets/{bucket_write_2}/target-folder", + ) + + destination_files = {entry.path for entry in api.list_bucket_tree(bucket_write_2)} + assert "target-folder/existing.txt" in destination_files + assert "target-folder/a.txt" in destination_files + assert "target-folder/sub/b.txt" in destination_files + + +@requires("hf_xet") +def test_copy_files_file_to_existing_file_dest(api: HfApi, bucket_write: 
str, bucket_write_2: str, tmp_path): + """source=file, dest is an existing file => must work (overwrite).""" + api.batch_bucket_files(bucket_write, add=[(b"new-content", "source.txt")]) + api.batch_bucket_files(bucket_write_2, add=[(b"old-content", "dest.txt")]) + + api.copy_files( + f"hf://buckets/{bucket_write}/source.txt", + f"hf://buckets/{bucket_write_2}/dest.txt", + ) + + output_path = tmp_path / "dest.txt" + api.download_bucket_files(bucket_write_2, [("dest.txt", str(output_path))]) + assert output_path.read_bytes() == b"new-content" + + +@requires("hf_xet") +def test_copy_files_file_to_nonexistent_dest(api: HfApi, bucket_write: str, bucket_write_2: str, tmp_path): + """source=file, dest doesn't exist => must work (creates file).""" + api.batch_bucket_files(bucket_write, add=[(b"content", "source.txt")]) + + api.copy_files( + f"hf://buckets/{bucket_write}/source.txt", + f"hf://buckets/{bucket_write_2}/new-file.txt", + ) + + output_path = tmp_path / "new-file.txt" + api.download_bucket_files(bucket_write_2, [("new-file.txt", str(output_path))]) + assert output_path.read_bytes() == b"content" + + +@requires("hf_xet") +def test_copy_files_file_to_folder_dest(api: HfApi, bucket_write: str, bucket_write_2: str, tmp_path): + """source=file, dest is a folder (trailing '/') => file added to folder.""" + api.batch_bucket_files(bucket_write, add=[(b"content", "source.txt")]) + + api.copy_files( + f"hf://buckets/{bucket_write}/source.txt", + f"hf://buckets/{bucket_write_2}/folder/", + ) + + output_path = tmp_path / "source.txt" + api.download_bucket_files(bucket_write_2, [("folder/source.txt", str(output_path))]) + assert output_path.read_bytes() == b"content" + + @pytest.mark.parametrize( "source, destination, expected_content_type", [ diff --git a/tests/test_buckets_cli.py b/tests/test_buckets_cli.py index e5f7ba83b1..da95411171 100644 --- a/tests/test_buckets_cli.py +++ b/tests/test_buckets_cli.py @@ -876,10 +876,52 @@ def 
test_cp_download_creates_parent_dirs(bucket_with_files: str, tmp_path: Path) # -- Validation error tests -- -def test_cp_error_remote_to_remote(): - """Both src and dst are bucket paths.""" - result = cli("hf buckets cp hf://buckets/user/a/file.txt hf://buckets/user/b/file.txt") +def test_cp_remote_bucket_to_bucket(api: HfApi): + source_bucket = api.create_bucket(bucket_name()).bucket_id + destination_bucket = api.create_bucket(bucket_name()).bucket_id + api.batch_bucket_files(source_bucket, add=[(b"aaa", "logs/a.txt"), (b"bbb", "logs/sub/b.txt"), (b"ccc", "c.txt")]) + + cli(f"hf buckets cp hf://buckets/{source_bucket}/logs hf://buckets/{destination_bucket}/backup/") + + files = _remote_files(api, destination_bucket) + assert "backup/a.txt" in files + assert "backup/sub/b.txt" in files + assert "backup/c.txt" not in files + + +def test_cp_remote_repo_to_bucket(api: HfApi): + repo_id = api.create_repo(repo_id=repo_name(prefix="cp-copy")).repo_id + branch = "cp-copy-branch" + destination_bucket = api.create_bucket(bucket_name()).bucket_id + + api.upload_file(repo_id=repo_id, path_in_repo="main.txt", path_or_fileobj=b"main") + api.create_branch(repo_id=repo_id, branch=branch) + api.upload_file(repo_id=repo_id, path_in_repo="nested/from-branch.txt", path_or_fileobj=b"branch", revision=branch) + + cli(f"hf buckets cp hf://{repo_id}@{branch}/nested/from-branch.txt hf://buckets/{destination_bucket}/copied.txt") + + assert "copied.txt" in _remote_files(api, destination_bucket) + + +def test_cp_error_bucket_to_repo(api: HfApi, bucket_write: str): + repo_id = api.create_repo(repo_id=repo_name(prefix="cp-copy-dst")).repo_id + try: + api.batch_bucket_files(bucket_write, add=[(b"data", "file.txt")]) + result = cli(f"hf buckets cp hf://buckets/{bucket_write}/file.txt hf://{repo_id}/file.txt") + assert result.exit_code != 0 + assert "destination must be a bucket" in result.output.lower() + finally: + api.delete_repo(repo_id=repo_id) + + +def 
test_cp_error_remote_folder_requires_destination_suffix(api: HfApi): + source_bucket = api.create_bucket(bucket_name()).bucket_id + destination_bucket = api.create_bucket(bucket_name()).bucket_id + api.batch_bucket_files(source_bucket, add=[(b"aaa", "folder/a.txt")]) + + result = cli(f"hf buckets cp hf://buckets/{source_bucket}/folder hf://buckets/{destination_bucket}/target") assert result.exit_code != 0 + assert "destination to end with '/'" in result.output def test_cp_error_both_local(tmp_path: Path):