diff --git a/docs/source/en/package_reference/cli.md b/docs/source/en/package_reference/cli.md index 3cf39e8104..7c6c28edbb 100644 --- a/docs/source/en/package_reference/cli.md +++ b/docs/source/en/package_reference/cli.md @@ -211,6 +211,7 @@ $ hf buckets [OPTIONS] COMMAND [ARGS]... * `cp`: Copy a single file to or from a bucket. * `create`: Create a new bucket. * `delete`: Delete a bucket. +* `import`: Import files from an S3 bucket into a... * `info`: Get info about a bucket. * `list`: List buckets or files in a bucket. [alias: ls] * `move`: Move (rename) a bucket to a new name or... @@ -322,6 +323,49 @@ Learn more Read the documentation at https://huggingface.co/docs/huggingface_hub/en/guides/cli +### `hf buckets import` + +Import files from an S3 bucket into a Hugging Face bucket. + +Data is streamed from S3 through the local machine and re-uploaded to HF. +Requires the `s3fs` package (`pip install s3fs`). AWS credentials are resolved +by the standard boto/botocore chain (env vars, ~/.aws/credentials, instance profiles, etc.). + +**Usage**: + +```console +$ hf buckets import [OPTIONS] SOURCE DEST +``` + +**Arguments**: + +* `SOURCE`: S3 source URI (e.g. s3://my-bucket or s3://my-bucket/prefix/). [required] +* `DEST`: HF bucket destination (e.g. hf://buckets/namespace/bucket-name or hf://buckets/namespace/bucket-name/prefix). [required] + +**Options**: + +* `--dry-run`: List files that would be imported without actually transferring. +* `--include TEXT`: Include only files matching pattern (can specify multiple). +* `--exclude TEXT`: Exclude files matching pattern (can specify multiple). +* `-w, --workers INTEGER`: Number of parallel S3 download threads. [default: 4] +* `--batch-size INTEGER`: Number of files per upload batch. [default: 50] +* `-v, --verbose`: Show per-file transfer details. +* `-q, --quiet`: Print only IDs (one per line). +* `--token TEXT`: A User Access Token generated from https://huggingface.co/settings/tokens. 
+* `--help`: Show this message and exit. + +Examples + $ hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket + $ hf buckets import s3://my-data-bucket/prefix/ hf://buckets/user/my-bucket/dest-prefix + $ hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket --dry-run + $ hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket --include "*.parquet" + $ hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket --exclude "*.tmp" --workers 8 + +Learn more + Use `hf --help` for more information about a command. + Read the documentation at https://huggingface.co/docs/huggingface_hub/en/guides/cli + + ### `hf buckets info` Get info about a bucket. diff --git a/setup.py b/setup.py index 35a321c336..bbe2622540 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,8 @@ def get_version() -> str: extras["mcp"] = ["mcp>=1.8.0"] +extras["s3"] = ["s3fs"] + extras["testing"] = ( extras["oauth"] + [ diff --git a/src/huggingface_hub/__init__.py b/src/huggingface_hub/__init__.py index 70f83e8009..a56b9d966b 100644 --- a/src/huggingface_hub/__init__.py +++ b/src/huggingface_hub/__init__.py @@ -58,8 +58,10 @@ "BucketFolder", "BucketInfo", "BucketUrl", + "ImportStats", "SyncOperation", "SyncPlan", + "import_from_s3", ], "_commit_scheduler": [ "CommitScheduler", @@ -741,6 +743,7 @@ "ImageToVideoOutput", "ImageToVideoParameters", "ImageToVideoTargetSize", + "ImportStats", "InferenceClient", "InferenceEndpoint", "InferenceEndpointError", @@ -974,6 +977,7 @@ "hf_hub_url", "hf_raise_for_status", "hffs", + "import_from_s3", "inspect_job", "inspect_scheduled_job", "interpreter_login", @@ -1184,8 +1188,10 @@ def __dir__(): BucketFolder, # noqa: F401 BucketInfo, # noqa: F401 BucketUrl, # noqa: F401 + ImportStats, # noqa: F401 SyncOperation, # noqa: F401 SyncPlan, # noqa: F401 + import_from_s3, # noqa: F401 ) from ._commit_scheduler import CommitScheduler # noqa: F401 from ._eval_results import ( diff --git a/src/huggingface_hub/_buckets.py 
b/src/huggingface_hub/_buckets.py index f291890d06..97e68c3289 100644 --- a/src/huggingface_hub/_buckets.py +++ b/src/huggingface_hub/_buckets.py @@ -1175,3 +1175,302 @@ def sync_bucket_internal( print("Sync completed.") return sync_plan + + +# ============================================================================= +# S3 import +# ============================================================================= + + +@dataclass +class ImportStats: + """Statistics for an S3-to-bucket import operation.""" + + files_transferred: int = 0 + files_skipped: int = 0 + files_failed: int = 0 + bytes_transferred: int = 0 + elapsed_seconds: float = 0.0 + + @property + def throughput_mb_s(self) -> float: + if self.elapsed_seconds <= 0: + return 0.0 + return (self.bytes_transferred / (1024 * 1024)) / self.elapsed_seconds + + def summary_str(self) -> str: + parts = [ + f"Transferred {self.files_transferred} file(s)", + f"({_format_import_size(self.bytes_transferred)})", + ] + if self.files_skipped > 0: + parts.append(f"skipped {self.files_skipped}") + if self.files_failed > 0: + parts.append(f"failed {self.files_failed}") + parts.append(f"in {self.elapsed_seconds:.1f}s") + if self.throughput_mb_s > 0: + parts.append(f"({self.throughput_mb_s:.1f} MB/s)") + return ", ".join(parts) + + +def _format_import_size(size_bytes: Union[int, float]) -> str: + size: float = float(size_bytes) + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size < 1000: + if unit == "B": + return f"{int(size)} {unit}" + return f"{size:.1f} {unit}" + size /= 1000 + return f"{size:.1f} PB" + + +def _list_s3_files(s3_fs: Any, s3_path: str, prefix: str = "") -> Iterator[tuple[str, int]]: + """List all files under an S3 path. 
+ + Yields: + tuple: (relative_path, size_bytes) for each file + """ + try: + entries = s3_fs.ls(s3_path, detail=True) + except FileNotFoundError: + raise ValueError(f"S3 path not found: s3://{s3_path}") + + for entry in entries: + if entry["type"] == "directory": + dir_rel = entry["name"] + yield from _list_s3_files(s3_fs, dir_rel, prefix) + else: + full_key = entry["name"] + if prefix: + rel_path = full_key[len(prefix) :].lstrip("/") + else: + rel_path = full_key.split("/", 1)[1] if "/" in full_key else full_key + if rel_path: + yield rel_path, entry.get("size", 0) + + +def _normalize_s3_path(s3_url: str) -> str: + """Strip the s3:// prefix and return the raw bucket/key path.""" + if s3_url.startswith("s3://"): + return s3_url[len("s3://") :] + return s3_url + + +def import_from_s3( + s3_source: str, + bucket_dest: str, + *, + api: "HfApi", + include: Optional[list[str]] = None, + exclude: Optional[list[str]] = None, + dry_run: bool = False, + verbose: bool = False, + quiet: bool = False, + workers: int = 4, + batch_size: int = 50, + token: Union[bool, str, None] = None, +) -> ImportStats: + """Import files from an S3 bucket into a Hugging Face bucket. + + Data is streamed from S3 through the local machine and uploaded to HF. The ``s3fs`` + package is required (``pip install s3fs``). AWS credentials are resolved by the + standard boto chain (env vars, ``~/.aws/credentials``, instance profile, etc.). + + Args: + s3_source (`str`): + S3 URI, e.g. ``s3://my-bucket/prefix/``. + bucket_dest (`str`): + HF bucket path, e.g. ``hf://buckets/namespace/bucket-name`` or + ``hf://buckets/namespace/bucket-name/dest-prefix``. + api ([`HfApi`]): + The HfApi instance to use. + include (`list[str]`, *optional*): + Only transfer files matching these fnmatch patterns. + exclude (`list[str]`, *optional*): + Skip files matching these fnmatch patterns. + dry_run (`bool`): + List files that would be transferred without actually transferring. 
+ verbose (`bool`): + Print per-file progress. + quiet (`bool`): + Suppress all output. + workers (`int`): + Number of parallel download threads for S3 files. + batch_size (`int`): + Number of files per ``batch_bucket_files`` call. + token: + HF token override. + + Returns: + [`ImportStats`]: Transfer statistics. + """ + if not s3_source.startswith("s3://"): + raise ValueError(f"Source must be an S3 URI (s3://...): {s3_source}") + + if not _is_bucket_path(bucket_dest): + raise ValueError(f"Destination must be a bucket path (hf://buckets/...): {bucket_dest}") + + try: + import s3fs + except ImportError: + raise ImportError( + "The `s3fs` package is required for S3 imports. Install it with:\n" + " pip install s3fs\n" + "or:\n" + " pip install huggingface_hub[s3]" + ) + + if token is not None: + from .hf_api import HfApi + + api = HfApi(token=token) + + bucket_id, dest_prefix = _parse_bucket_path(bucket_dest) + dest_prefix = dest_prefix.rstrip("/") + + s3_raw = _normalize_s3_path(s3_source) + s3_bucket_name = s3_raw.split("/")[0] + s3_prefix = s3_raw[len(s3_bucket_name) :].strip("/") + full_s3_prefix = f"{s3_bucket_name}/{s3_prefix}" if s3_prefix else s3_bucket_name + + s3 = s3fs.S3FileSystem() + + status = StatusLine(enabled=not quiet and not dry_run) + filter_matcher = ( + FilterMatcher(include_patterns=include, exclude_patterns=exclude) if (include or exclude) else None + ) + + if status: + status.update("Listing S3 files...") + + all_files: list[tuple[str, int]] = [] + for rel_path, size in _list_s3_files(s3, full_s3_prefix, prefix=full_s3_prefix): + if filter_matcher and not filter_matcher.matches(rel_path): + continue + all_files.append((rel_path, size)) + if status: + status.update(f"Listing S3 files ({len(all_files)} found)") + + if status: + status.done(f"Found {len(all_files)} files in S3") + + if not all_files: + if not quiet: + print("No files found in S3 source.") + return ImportStats() + + total_size = sum(s for _, s in all_files) + + if dry_run: + for 
rel_path, size in all_files: + remote_dest = f"{dest_prefix}/{rel_path}" if dest_prefix else rel_path + print( + f" {_format_import_size(size):>10} s3://{full_s3_prefix}/{rel_path} -> {BUCKET_PREFIX}{bucket_id}/{remote_dest}" + ) + print(f"\n(dry run) Would transfer {len(all_files)} file(s), {_format_import_size(total_size)} total.") + return ImportStats(files_transferred=0, files_skipped=len(all_files), bytes_transferred=0) + + if not quiet: + print( + f"Importing {len(all_files)} file(s) ({_format_import_size(total_size)}) from S3 to {BUCKET_PREFIX}{bucket_id}" + ) + + stats = ImportStats() + start_time = time.monotonic() + + import tempfile + from concurrent.futures import ThreadPoolExecutor, as_completed + + def _download_one(rel_path: str, size: int, tmp_dir: str) -> Optional[tuple[str, str, int]]: + s3_key = f"{full_s3_prefix}/{rel_path}" + local_tmp = os.path.join(tmp_dir, rel_path.replace("/", os.sep)) + os.makedirs(os.path.dirname(local_tmp), exist_ok=True) + try: + s3.get(s3_key, local_tmp) + return rel_path, local_tmp, size + except Exception as e: + logger.warning(f"Failed to download s3://{s3_key}: {e}") + return None + + batch_index = 0 + transferred_in_batch = 0 + + with tempfile.TemporaryDirectory(prefix="hf-s3-import-") as tmp_dir: + for batch_start in range(0, len(all_files), batch_size): + batch = all_files[batch_start : batch_start + batch_size] + batch_index += 1 + + if status: + status.update(f"Batch {batch_index}: downloading {len(batch)} files from S3") + + downloaded: list[tuple[str, str, int]] = [] + + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = { + executor.submit(_download_one, rel_path, size, tmp_dir): (rel_path, size) + for rel_path, size in batch + } + for future in as_completed(futures): + result = future.result() + if result is not None: + downloaded.append(result) + else: + stats.files_failed += 1 + + if not downloaded: + continue + + add_files: list[tuple[Union[str, Path, bytes], str]] = [] + for 
rel_path, local_tmp, size in downloaded: + remote_dest = f"{dest_prefix}/{rel_path}" if dest_prefix else rel_path + add_files.append((local_tmp, remote_dest)) + + if status: + status.update(f"Batch {batch_index}: uploading {len(add_files)} files to HF bucket") + + try: + if quiet: + disable_progress_bars() + try: + api.batch_bucket_files(bucket_id, add=add_files) + finally: + if quiet: + enable_progress_bars() + + for rel_path, local_tmp, size in downloaded: + stats.files_transferred += 1 + stats.bytes_transferred += size + transferred_in_batch += 1 + if verbose: + remote_dest = f"{dest_prefix}/{rel_path}" if dest_prefix else rel_path + print( + f" {rel_path} -> {BUCKET_PREFIX}{bucket_id}/{remote_dest} ({_format_import_size(size)})" + ) + except Exception as e: + logger.error(f"Failed to upload batch {batch_index}: {e}") + stats.files_failed += len(downloaded) + + for _, local_tmp, _ in downloaded: + try: + os.remove(local_tmp) + except OSError: + pass + + if status: + elapsed = time.monotonic() - start_time + throughput = (stats.bytes_transferred / (1024 * 1024)) / elapsed if elapsed > 0 else 0 + status.update( + f"Progress: {stats.files_transferred}/{len(all_files)} files, " + f"{_format_import_size(stats.bytes_transferred)}, " + f"{throughput:.1f} MB/s" + ) + + stats.elapsed_seconds = time.monotonic() - start_time + + if status: + status.done(stats.summary_str()) + + if not quiet: + print(f"Import completed: {stats.summary_str()}") + + return stats diff --git a/src/huggingface_hub/cli/buckets.py b/src/huggingface_hub/cli/buckets.py index 881d52b19c..840b14fd2b 100644 --- a/src/huggingface_hub/cli/buckets.py +++ b/src/huggingface_hub/cli/buckets.py @@ -31,6 +31,7 @@ _is_bucket_path, _parse_bucket_path, _split_bucket_id_and_prefix, + import_from_s3, ) from huggingface_hub.utils import ( SoftTemporaryDirectory, @@ -1059,3 +1060,97 @@ def cp( if not quiet: print(f"Uploaded: {src} -> {BUCKET_PREFIX}{bucket_id}/{remote_path}") + + +# 
# =============================================================================
# Import command (S3 -> HF bucket)
# =============================================================================


@buckets_cli.command(
    name="import",
    examples=[
        "hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket",
        "hf buckets import s3://my-data-bucket/prefix/ hf://buckets/user/my-bucket/dest-prefix",
        "hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket --dry-run",
        'hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket --include "*.parquet"',
        'hf buckets import s3://my-data-bucket hf://buckets/user/my-bucket --exclude "*.tmp" --workers 8',
    ],
)
def import_cmd(
    source: Annotated[
        str,
        typer.Argument(
            help="S3 source URI (e.g. s3://my-bucket or s3://my-bucket/prefix/).",
        ),
    ],
    dest: Annotated[
        str,
        typer.Argument(
            help="HF bucket destination (e.g. hf://buckets/namespace/bucket-name or hf://buckets/namespace/bucket-name/prefix).",
        ),
    ],
    dry_run: Annotated[
        bool,
        typer.Option(
            "--dry-run",
            help="List files that would be imported without actually transferring.",
        ),
    ] = False,
    include: Annotated[
        Optional[list[str]],
        typer.Option(
            help="Include only files matching pattern (can specify multiple).",
        ),
    ] = None,
    exclude: Annotated[
        Optional[list[str]],
        typer.Option(
            help="Exclude files matching pattern (can specify multiple).",
        ),
    ] = None,
    workers: Annotated[
        int,
        typer.Option(
            "--workers",
            "-w",
            help="Number of parallel S3 download threads.",
        ),
    ] = 4,
    batch_size: Annotated[
        int,
        typer.Option(
            "--batch-size",
            help="Number of files per upload batch.",
        ),
    ] = 50,
    verbose: Annotated[
        bool,
        typer.Option(
            "--verbose",
            "-v",
            help="Show per-file transfer details.",
        ),
    ] = False,
    quiet: QuietOpt = False,
    token: TokenOpt = None,
) -> None:
    """Import files from an S3 bucket into a Hugging Face bucket.

    Data is streamed from S3 through the local machine and re-uploaded to HF.
    Requires the `s3fs` package (`pip install s3fs`). AWS credentials are resolved
    by the standard boto/botocore chain (env vars, ~/.aws/credentials, instance profiles, etc.).
    """
    # Thin CLI wrapper: resolve the authenticated client, then delegate all
    # transfer logic to the programmatic entry point `import_from_s3`.
    import_from_s3(
        s3_source=source,
        bucket_dest=dest,
        api=get_hf_api(token=token),
        include=include,
        exclude=exclude,
        dry_run=dry_run,
        verbose=verbose,
        quiet=quiet,
        workers=workers,
        batch_size=batch_size,
    )
+"""Tests for S3-to-HF bucket import functionality.""" + +import os +from unittest.mock import MagicMock, patch + +import pytest + +from huggingface_hub._buckets import ( + ImportStats, + _format_import_size, + _normalize_s3_path, + import_from_s3, +) + + +class TestImportStats: + def test_throughput_calculation(self): + stats = ImportStats(bytes_transferred=10 * 1024 * 1024, elapsed_seconds=2.0) + assert abs(stats.throughput_mb_s - 5.0) < 0.01 + + def test_throughput_zero_elapsed(self): + stats = ImportStats(bytes_transferred=1000, elapsed_seconds=0.0) + assert stats.throughput_mb_s == 0.0 + + def test_summary_str(self): + stats = ImportStats( + files_transferred=10, + files_skipped=2, + files_failed=1, + bytes_transferred=1024 * 1024, + elapsed_seconds=5.0, + ) + s = stats.summary_str() + assert "10 file(s)" in s + assert "skipped 2" in s + assert "failed 1" in s + assert "5.0s" in s + + +class TestFormatImportSize: + def test_bytes(self): + assert _format_import_size(500) == "500 B" + + def test_kb(self): + assert _format_import_size(1500) == "1.5 KB" + + def test_mb(self): + assert _format_import_size(1_500_000) == "1.5 MB" + + def test_gb(self): + assert _format_import_size(1_500_000_000) == "1.5 GB" + + def test_zero(self): + assert _format_import_size(0) == "0 B" + + +class TestNormalizeS3Path: + def test_with_prefix(self): + assert _normalize_s3_path("s3://my-bucket/prefix") == "my-bucket/prefix" + + def test_without_prefix(self): + assert _normalize_s3_path("my-bucket/prefix") == "my-bucket/prefix" + + def test_just_bucket(self): + assert _normalize_s3_path("s3://my-bucket") == "my-bucket" + + +class TestImportFromS3: + def test_invalid_source_raises(self): + api = MagicMock() + with pytest.raises(ValueError, match="S3 URI"): + import_from_s3( + s3_source="/local/path", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + ) + + def test_invalid_dest_raises(self): + api = MagicMock() + with pytest.raises(ValueError, match="bucket path"): + 
import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="/local/path", + api=api, + ) + + def test_missing_s3fs_raises(self): + api = MagicMock() + import builtins + + real_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if name == "s3fs": + raise ImportError("No module named 's3fs'") + return real_import(name, *args, **kwargs) + + with patch.object(builtins, "__import__", side_effect=mock_import): + with pytest.raises(ImportError, match="s3fs"): + import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + ) + + def test_dry_run_no_upload(self, capsys): + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [ + {"name": "my-bucket/file1.txt", "type": "file", "size": 100}, + {"name": "my-bucket/file2.txt", "type": "file", "size": 200}, + ] + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": mock_s3fs_module}): + stats = import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + dry_run=True, + ) + + assert stats.files_transferred == 0 + assert stats.files_skipped == 2 + api.batch_bucket_files.assert_not_called() + + output = capsys.readouterr().out + assert "file1.txt" in output + assert "file2.txt" in output + assert "dry run" in output + + def test_import_with_include_filter(self, capsys): + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [ + {"name": "my-bucket/data.parquet", "type": "file", "size": 1000}, + {"name": "my-bucket/readme.md", "type": "file", "size": 50}, + {"name": "my-bucket/other.parquet", "type": "file", "size": 2000}, + ] + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": mock_s3fs_module}): + stats = import_from_s3( + 
s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + include=["*.parquet"], + dry_run=True, + ) + + assert stats.files_skipped == 2 + output = capsys.readouterr().out + assert "data.parquet" in output + assert "other.parquet" in output + assert "readme.md" not in output + + def test_import_with_exclude_filter(self, capsys): + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [ + {"name": "my-bucket/data.parquet", "type": "file", "size": 1000}, + {"name": "my-bucket/temp.tmp", "type": "file", "size": 50}, + ] + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": mock_s3fs_module}): + stats = import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + exclude=["*.tmp"], + dry_run=True, + ) + + assert stats.files_skipped == 1 + output = capsys.readouterr().out + assert "data.parquet" in output + assert "temp.tmp" not in output + + def test_import_empty_source(self, capsys): + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [] + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": mock_s3fs_module}): + stats = import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + ) + + assert stats.files_transferred == 0 + output = capsys.readouterr().out + assert "No files found" in output + + def test_import_with_dest_prefix(self, capsys): + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [ + {"name": "my-bucket/file1.txt", "type": "file", "size": 100}, + ] + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": 
mock_s3fs_module}): + import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket/imported-data", + api=api, + dry_run=True, + ) + + output = capsys.readouterr().out + assert "imported-data/file1.txt" in output + + def test_actual_transfer(self): + """Test the full transfer path with mocked S3 and HfApi.""" + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [ + {"name": "my-bucket/small.txt", "type": "file", "size": 5}, + ] + + def mock_get(s3_key, local_path): + os.makedirs(os.path.dirname(local_path), exist_ok=True) + with open(local_path, "w") as f: + f.write("hello") + + mock_s3.get = mock_get + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": mock_s3fs_module}): + stats = import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + quiet=True, + ) + + assert stats.files_transferred == 1 + assert stats.bytes_transferred == 5 + assert stats.files_failed == 0 + api.batch_bucket_files.assert_called_once() + call_kwargs = api.batch_bucket_files.call_args + assert call_kwargs[0][0] == "user/my-bucket" + add_list = call_kwargs[1]["add"] + assert len(add_list) == 1 + assert add_list[0][1] == "small.txt" + + def test_transfer_with_s3_prefix(self): + """Test that S3 prefix paths are handled correctly.""" + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [ + {"name": "my-bucket/data/train/part-0.parquet", "type": "file", "size": 100}, + ] + + def mock_get(s3_key, local_path): + os.makedirs(os.path.dirname(local_path), exist_ok=True) + with open(local_path, "w") as f: + f.write("x" * 100) + + mock_s3.get = mock_get + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": mock_s3fs_module}): + stats = 
import_from_s3( + s3_source="s3://my-bucket/data/", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + quiet=True, + ) + + assert stats.files_transferred == 1 + call_kwargs = api.batch_bucket_files.call_args + add_list = call_kwargs[1]["add"] + assert add_list[0][1] == "train/part-0.parquet" + + def test_download_failure_counts(self): + """Test that S3 download failures are tracked properly.""" + api = MagicMock() + + mock_s3 = MagicMock() + mock_s3.ls.return_value = [ + {"name": "my-bucket/good.txt", "type": "file", "size": 5}, + {"name": "my-bucket/bad.txt", "type": "file", "size": 5}, + ] + + def mock_get(s3_key, local_path): + if "bad.txt" in s3_key: + raise Exception("S3 download error") + os.makedirs(os.path.dirname(local_path), exist_ok=True) + with open(local_path, "w") as f: + f.write("hello") + + mock_s3.get = mock_get + + mock_s3fs_class = MagicMock(return_value=mock_s3) + mock_s3fs_module = MagicMock() + mock_s3fs_module.S3FileSystem = mock_s3fs_class + + with patch.dict("sys.modules", {"s3fs": mock_s3fs_module}): + stats = import_from_s3( + s3_source="s3://my-bucket", + bucket_dest="hf://buckets/user/my-bucket", + api=api, + quiet=True, + ) + + assert stats.files_transferred == 1 + assert stats.files_failed == 1