diff --git a/setup.py b/setup.py index d4edfff484..7202e9c7bd 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,10 @@ def get_version() -> str: extras["mcp"] = ["mcp>=1.8.0"] +extras["jobs"] = [ + "cloudpickle", +] + extras["testing"] = ( extras["oauth"] + [ diff --git a/src/huggingface_hub/__init__.py b/src/huggingface_hub/__init__.py index aab27232ba..92a0e8b347 100644 --- a/src/huggingface_hub/__init__.py +++ b/src/huggingface_hub/__init__.py @@ -177,6 +177,7 @@ "add_collection_item", "add_space_secret", "add_space_variable", + "as_job", "auth_check", "cancel_access_request", "cancel_job", @@ -820,6 +821,7 @@ "add_collection_item", "add_space_secret", "add_space_variable", + "as_job", "attach_huggingface_oauth", "auth_check", "auth_list", @@ -1203,6 +1205,7 @@ def __dir__(): add_collection_item, # noqa: F401 add_space_secret, # noqa: F401 add_space_variable, # noqa: F401 + as_job, # noqa: F401 auth_check, # noqa: F401 cancel_access_request, # noqa: F401 cancel_job, # noqa: F401 diff --git a/src/huggingface_hub/hf_api.py b/src/huggingface_hub/hf_api.py index 89e442bc1e..72e3a8cec7 100644 --- a/src/huggingface_hub/hf_api.py +++ b/src/huggingface_hub/hf_api.py @@ -19,6 +19,8 @@ import json import re import struct +import sys +import tempfile import time import warnings from collections import defaultdict @@ -37,6 +39,7 @@ Iterator, Literal, Optional, + ParamSpec, TypeVar, Union, overload, @@ -114,6 +117,7 @@ from .inference._providers import PROVIDER_T from .utils._verification import FolderVerification +P = ParamSpec("P") # Parameters R = TypeVar("R") # Return type CollectionItemType_T = Literal["model", "dataset", "space", "paper", "collection"] @@ -10417,6 +10421,141 @@ def run_uv_job( token=token, ) + @experimental + def as_job( + self, + *, + dependencies: Optional[list[str]] = None, + python: Optional[str] = None, + env: Optional[dict[str, Any]] = None, + secrets: Optional[dict[str, Any]] = None, + flavor: Optional[SpaceHardware] = None, + timeout: Optional[Union[int, float, str]] = None, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + ) -> Callable[[Callable[P, R]], Callable[P, R]]: + """ + Decorator to easily run functions as remote Jobs on Hugging Face. + + Args: + dependencies (`list[str]`, *optional*) + Dependencies to use to run the UV script. + + python (`str`, *optional*) + Use a specific Python version. Default is 3.12. + + env (`dict[str, Any]`, *optional*): + Defines the environment variables for the Job. + + secrets (`dict[str, Any]`, *optional*): + Defines the secret environment variables for the Job. + + flavor (`str`, *optional*): + Flavor for the hardware, as in Hugging Face Spaces. See [`SpaceHardware`] for possible values. + Defaults to `"cpu-basic"`. + + timeout (`Union[int, float, str]`, *optional*): + Max duration for the Job: int/float with s (seconds, default), m (minutes), h (hours) or d (days). + Example: `300` or `"5m"` for 5 minutes. + + namespace (`str`, *optional*): + The namespace where the Job will be created. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + """ + try: + import cloudpickle + except ImportError as e: + raise ImportError( + "cloudpickle is required to use the 'as_job' decorator. Please install it with `pip install huggingface_hub[jobs]` or `pip install cloudpickle`." + ) from e + + # NOTE: for cloudpickle to work we must have the exact same python version as the client + # We also need uv to launch the script. This is why docker image is currently not customizable. + # + # NOTE: here we select Python major.minor version, but cloudpickle might still fail if + # the user has a different micro version or different platform (e.g., Windows vs Linux). + # + # TODO: make this more robust (e.g. by sending a Dockerfile directly in the payload?) + image = f"ghcr.io/astral-sh/uv:python{sys.version_info.major}.{sys.version_info.minor}-bookworm" + dependencies = dependencies or [] + dependencies.append(f"cloudpickle=={cloudpickle.__version__}") + + def decorator(func: Callable[P, R]) -> Callable[P, R]: + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + script_bytes = cloudpickle.dumps(func) + script_b64 = base64.b64encode(script_bytes).decode("utf-8") + + args_bytes = cloudpickle.dumps((args, kwargs)) + args_b64 = base64.b64encode(args_bytes).decode("utf-8") + + script = f""" +import base64 +import cloudpickle + +script_b64 = "{script_b64}" +args_b64 = "{args_b64}" + +script_bytes = base64.b64decode(script_b64) +func = cloudpickle.loads(script_bytes) + +args_bytes = base64.b64decode(args_b64) +args, kwargs = cloudpickle.loads(args_bytes) + +result = func(*args, **kwargs) + +results_b64 = base64.b64encode(cloudpickle.dumps(result)).decode("utf-8") + +print("result: ", results_b64) +""" + + with tempfile.NamedTemporaryFile("w+", suffix=".py") as tmp_script_file: + tmp_script_file.write(script) + tmp_script_file.flush() + + uv_command, uv_env, uv_secrets = self._create_uv_command_env_and_secrets( + script=tmp_script_file.name, + script_args=[], + dependencies=dependencies, + python=python, + env=env, + secrets=secrets, + namespace=namespace, + token=token, + ) + + job = self.run_job( + image=image, + command=uv_command, + env=uv_env, + secrets=uv_secrets, + flavor=flavor, + timeout=timeout, + namespace=namespace, + token=token, + ) + + print(f"Job {job.id} is running remotely. Fetching logs:") + print(job.url) + print("-----") + for log in self.fetch_job_logs(job_id=job.id): + if log.startswith("result: "): + result_b64 = log[len("result: ") :] + result_bytes = base64.b64decode(result_b64) + result = cloudpickle.loads(result_bytes) + return result + else: + print(log) + + raise RuntimeError("Job completed but no result was found in the logs.") + + return wrapper + + return decorator + def create_scheduled_job( self, *, @@ -11024,6 +11163,7 @@ def _parse_revision_from_pr_url(pr_url: str) -> str: inspect_job = api.inspect_job cancel_job = api.cancel_job run_uv_job = api.run_uv_job +as_job = api.as_job create_scheduled_job = api.create_scheduled_job list_scheduled_jobs = api.list_scheduled_jobs inspect_scheduled_job = api.inspect_scheduled_job