Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/flytekit-ray/flytekitplugins/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

HeadNodeConfig
RayJobConfig
ResourcesConfig
WorkerNodeConfig
"""

from .task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
from .task import AutoscalerOptionsConfig, HeadNodeConfig, RayJobConfig, WorkerNodeConfig
75 changes: 75 additions & 0 deletions plugins/flytekit-ray/flytekitplugins/ray/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import typing

from flyteidl.core import literals_pb2 as _literals_pb2
from flyteidl.plugins import ray_pb2 as _ray_pb2

from flytekit.models import common as _common
from flytekit.models import task as _task_models
from flytekit.models.task import K8sPod


Expand Down Expand Up @@ -146,6 +148,71 @@ def from_flyte_idl(cls, proto):
)


class AutoscalerOptions(_common.FlyteIdlEntity):
    """
    Model wrapper around the ``flyteidl.plugins.ray_pb2.AutoscalerOptions`` message.

    Every constructor argument is optional. Values are stored exactly as given and
    exposed through read-only properties; ``to_flyte_idl`` / ``from_flyte_idl``
    convert to and from the protobuf representation.
    """

    class UpscalingMode:
        """Short aliases for the values of the ``AutoscalerOptions.UpscalingMode`` proto enum."""

        UNSPECIFIED = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_UNSPECIFIED
        DEFAULT = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_DEFAULT
        AGGRESSIVE = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_AGGRESSIVE
        CONSERVATIVE = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_CONSERVATIVE

    def __init__(
        self,
        upscaling_mode: typing.Optional["AutoscalerOptions.UpscalingMode"] = None,
        idle_timeout_seconds: typing.Optional[int] = None,
        image: typing.Optional[str] = None,
        env: typing.Optional[typing.Dict[str, str]] = None,
        resources: typing.Optional[_task_models.Resources] = None,
    ):
        """
        :param upscaling_mode: one of the ``UpscalingMode`` constants, forwarded to the proto
            field of the same name.
        :param idle_timeout_seconds: forwarded to the proto field of the same name.
        :param image: forwarded to the proto field of the same name.
        :param env: mapping serialized as a list of ``KeyValuePair`` messages.
        :param resources: resource model serialized through its own ``to_flyte_idl``.
        """
        self._upscaling_mode = upscaling_mode
        self._idle_timeout_seconds = idle_timeout_seconds
        self._image = image
        self._env = env
        self._resources = resources

    @property
    def upscaling_mode(self) -> typing.Optional["AutoscalerOptions.UpscalingMode"]:
        """The upscaling mode supplied at construction, or None."""
        return self._upscaling_mode

    @property
    def idle_timeout_seconds(self) -> typing.Optional[int]:
        """The idle timeout (seconds) supplied at construction, or None."""
        return self._idle_timeout_seconds

    @property
    def image(self) -> typing.Optional[str]:
        """The image string supplied at construction, or None."""
        return self._image

    @property
    def env(self) -> typing.Optional[typing.Dict[str, str]]:
        """The environment mapping supplied at construction, or None."""
        return self._env

    @property
    def resources(self) -> typing.Optional[_task_models.Resources]:
        """The resources model supplied at construction, or None."""
        return self._resources

    def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions:
        """
        :rtype: flyteidl.plugins._ray_pb2.AutoscalerOptions
        """
        # A missing or empty env mapping yields no pairs; the field is then passed as None.
        env_pairs = (
            [_literals_pb2.KeyValuePair(key=name, value=value) for name, value in self.env.items()]
            if self.env
            else []
        )
        resources_pb = self.resources.to_flyte_idl() if self.resources else None
        return _ray_pb2.AutoscalerOptions(
            upscaling_mode=self.upscaling_mode,
            idle_timeout_seconds=self.idle_timeout_seconds,
            image=self.image,
            env=env_pairs or None,
            resources=resources_pb,
        )

    @classmethod
    def from_flyte_idl(cls, proto):
        """
        :param flyteidl.plugins._ray_pb2.AutoscalerOptions proto:
        :rtype: AutoscalerOptions
        """
        # NOTE(review): scalar fields are copied straight from the message; whether an unset
        # field reads back as None or as the proto default depends on the message definition.
        env_map = {pair.key: pair.value for pair in proto.env} if proto.env else None

        resources_model = None
        if proto.HasField("resources"):
            resources_model = _task_models.Resources.from_flyte_idl(proto.resources)

        return cls(
            upscaling_mode=proto.upscaling_mode,
            idle_timeout_seconds=proto.idle_timeout_seconds,
            image=proto.image,
            env=env_map,
            resources=resources_model,
        )


class RayCluster(_common.FlyteIdlEntity):
"""
Define RayCluster spec that will be used by KubeRay to launch the cluster.
Expand All @@ -156,10 +223,12 @@ def __init__(
worker_group_spec: typing.List[WorkerGroupSpec],
head_group_spec: typing.Optional[HeadGroupSpec] = None,
enable_autoscaling: bool = False,
autoscaler_options: typing.Optional[AutoscalerOptions] = None,
):
self._head_group_spec = head_group_spec
self._worker_group_spec = worker_group_spec
self._enable_autoscaling = enable_autoscaling
self._autoscaler_options = autoscaler_options

@property
def head_group_spec(self) -> HeadGroupSpec:
Expand All @@ -185,6 +254,10 @@ def enable_autoscaling(self) -> bool:
"""
return self._enable_autoscaling

@property
def autoscaler_options(self) -> typing.Optional[AutoscalerOptions]:
    """
    The autoscaler options given to the constructor, or None when none were supplied.

    :rtype: typing.Optional[AutoscalerOptions]
    """
    return self._autoscaler_options

def to_flyte_idl(self) -> _ray_pb2.RayCluster:
"""
:rtype: flyteidl.plugins._ray_pb2.RayCluster
Expand All @@ -193,6 +266,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster:
head_group_spec=self.head_group_spec.to_flyte_idl() if self.head_group_spec else None,
worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec],
enable_autoscaling=self.enable_autoscaling,
autoscaler_options=self.autoscaler_options.to_flyte_idl() if self.autoscaler_options else None,
)

@classmethod
Expand All @@ -205,6 +279,7 @@ def from_flyte_idl(cls, proto):
head_group_spec=HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None,
worker_group_spec=[WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec],
enable_autoscaling=proto.enable_autoscaling,
autoscaler_options=AutoscalerOptions.from_flyte_idl(proto.autoscaler_options),
)


Expand Down
29 changes: 28 additions & 1 deletion plugins/flytekit-ray/flytekitplugins/ray/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import yaml
from flytekitplugins.ray.models import (
AutoscalerOptions,
HeadGroupSpec,
RayCluster,
RayJob,
Expand All @@ -18,7 +19,7 @@
from flytekit.configuration import SerializationSettings
from flytekit.core.context_manager import ExecutionParameters, FlyteContextManager
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.resources import pod_spec_from_resources
from flytekit.core.resources import convert_resources_to_resource_model, pod_spec_from_resources
from flytekit.extend import TaskPlugins
from flytekit.models.task import K8sPod

Expand Down Expand Up @@ -57,11 +58,23 @@ def __post_init__(self):
raise ValueError("Cannot specify both pod_template and requests/limits")


@dataclass
class AutoscalerOptionsConfig:
    """
    Autoscaler settings attached to a ``RayJobConfig`` through its ``autoscaler_options`` field.

    When the task is serialized these values are copied into an ``AutoscalerOptions`` model;
    ``requests`` and ``limits`` are merged into a single resources model via
    ``convert_resources_to_resource_model``.
    """

    # Re-exported so callers can write ``AutoscalerOptionsConfig.UpscalingMode.<NAME>``.
    # Left unannotated on purpose: it is a plain class attribute, not a dataclass field.
    UpscalingMode = AutoscalerOptions.UpscalingMode

    # One of the ``UpscalingMode`` constants.
    upscaling_mode: Optional["AutoscalerOptions.UpscalingMode"] = None
    # Forwarded unchanged to ``AutoscalerOptions.idle_timeout_seconds``.
    idle_timeout_seconds: Optional[int] = None
    # Forwarded unchanged to ``AutoscalerOptions.env``.
    env: Optional[Dict[str, str]] = None
    # Forwarded unchanged to ``AutoscalerOptions.image``.
    image: Optional[str] = None
    # Resource requests and limits, combined into ``AutoscalerOptions.resources``.
    requests: Optional[Resources] = None
    limits: Optional[Resources] = None


@dataclass
class RayJobConfig:
worker_node_config: typing.List[WorkerNodeConfig]
head_node_config: typing.Optional[HeadNodeConfig] = None
enable_autoscaling: bool = False
autoscaler_options: Optional[AutoscalerOptionsConfig] = None
runtime_env: typing.Optional[dict] = None
address: typing.Optional[str] = None
shutdown_after_job_finishes: bool = False
Expand Down Expand Up @@ -141,11 +154,25 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]
WorkerGroupSpec(c.group_name, c.replicas, c.min_replicas, c.max_replicas, c.ray_start_params, k8s_pod)
)

autoscalerOptions = None
if cfg.autoscaler_options is not None:
autoscalerOptions = AutoscalerOptions(
upscaling_mode=cfg.autoscaler_options.upscaling_mode,
idle_timeout_seconds=cfg.autoscaler_options.idle_timeout_seconds,
image=cfg.autoscaler_options.image,
env=cfg.autoscaler_options.env,
resources=convert_resources_to_resource_model(
requests=cfg.autoscaler_options.requests,
limits=cfg.autoscaler_options.limits,
),
)

ray_job = RayJob(
ray_cluster=RayCluster(
head_group_spec=head_group_spec,
worker_group_spec=worker_group_spec,
enable_autoscaling=(cfg.enable_autoscaling if cfg.enable_autoscaling else False),
autoscaler_options=autoscalerOptions,
),
runtime_env=runtime_env,
runtime_env_yaml=runtime_env_yaml,
Expand Down
23 changes: 21 additions & 2 deletions plugins/flytekit-ray/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
import ray
import yaml

from flytekit.core.resources import pod_spec_from_resources
from flytekit.core.resources import convert_resources_to_resource_model, pod_spec_from_resources
from flytekitplugins.ray import HeadNodeConfig
from flytekitplugins.ray.models import (
AutoscalerOptions,
HeadGroupSpec,
RayCluster,
RayJob,
WorkerGroupSpec,
)
from flytekitplugins.ray.task import RayJobConfig, WorkerNodeConfig
from flytekitplugins.ray.task import AutoscalerOptionsConfig, RayJobConfig, WorkerNodeConfig
from google.protobuf.json_format import MessageToDict

from flytekit import PythonFunctionTask, task, PodTemplate, Resources
Expand All @@ -39,6 +40,14 @@
head_node_config=HeadNodeConfig(requests=Resources(cpu="1", mem="1Gi"), limits=Resources(cpu="2", mem="2Gi")),
runtime_env={"pip": ["numpy"]},
enable_autoscaling=True,
autoscaler_options=AutoscalerOptionsConfig(
upscaling_mode=AutoscalerOptionsConfig.UpscalingMode.CONSERVATIVE,
idle_timeout_seconds=120,
image="rayproject/ray:2.9.0",
env={"lKeyA": "lValA"},
requests=Resources(cpu="1", mem="1Gi"),
limits=Resources(cpu="2", mem="2Gi"),
),
shutdown_after_job_finishes=True,
ttl_seconds_after_finished=20,
)
Expand Down Expand Up @@ -86,6 +95,16 @@ def t1(a: int) -> str:
],
head_group_spec=HeadGroupSpec(k8s_pod=K8sPod.from_pod_template(head_pod_template)),
enable_autoscaling=True,
autoscaler_options=AutoscalerOptions(
upscaling_mode=AutoscalerOptions.UpscalingMode.CONSERVATIVE,
idle_timeout_seconds=120,
image="rayproject/ray:2.9.0",
env={"lKeyA": "lValA"},
resources=convert_resources_to_resource_model(
requests=Resources(cpu="1", mem="1Gi"),
limits=Resources(cpu="2", mem="2Gi"),
),
),
),
runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(),
runtime_env_yaml=yaml.dump({"pip": ["numpy"]}),
Expand Down