Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions onetl/_util/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
import io
import os
from datetime import datetime
from pathlib import Path, PurePath
from pathlib import Path
from typing import TypeVar

from onetl.base.path_protocol import PathProtocol
from onetl.base.pure_path_protocol import PurePathProtocol
from onetl.exception import NotAFileError
from onetl.impl import path_repr

Expand Down Expand Up @@ -50,7 +53,10 @@ def is_file_readable(path: str | os.PathLike) -> Path:
return path


def generate_temp_path(root: PurePath) -> PurePath:
T = TypeVar("T", PurePathProtocol, PathProtocol)


def generate_temp_path(root: T) -> T:
"""
Returns prefix which will be used for creating temp directory

Expand Down
62 changes: 34 additions & 28 deletions onetl/base/path_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,51 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from typing_extensions import Protocol, runtime_checkable
from pathlib import Path
from typing import TYPE_CHECKING

from typing_extensions import Protocol, TypeAlias, runtime_checkable

from onetl.base.path_stat_protocol import PathStatProtocol
from onetl.base.pure_path_protocol import PurePathProtocol

if TYPE_CHECKING:
PathProtocol: TypeAlias = Path
PathWithStatsProtocol: TypeAlias = PathProtocol
else:

@runtime_checkable
class PathProtocol(PurePathProtocol, Protocol):
"""
Generic protocol for `pathlib.Path` like objects.

Includes only minimal set of methods which allow to determine path type (file, directory) and existence
"""

def is_dir(self) -> bool:
"""
Checks if this path is a directory
"""

def is_file(self) -> bool:
"""
Checks if this path is a file
@runtime_checkable
class PathProtocol(PurePathProtocol, Protocol):
"""
Generic protocol for `pathlib.Path` like objects.

def exists(self) -> bool:
"""
Checks if this path exists
Includes only minimal set of methods which allow to determine path type (file, directory) and existence
"""

def is_dir(self) -> bool:
"""
Checks if this path is a directory
"""

@runtime_checkable
class PathWithStatsProtocol(PathProtocol, Protocol):
"""
Protocol for `pathlib.Path`-like file objects.
def is_file(self) -> bool:
"""
Checks if this path is a file
"""

Includes only minimal set of methods which allow to determine if file exists, or get stats, e.g. size
"""
def exists(self) -> bool:
"""
Checks if this path exists
"""

def stat(self) -> PathStatProtocol:
@runtime_checkable
class PathWithStatsProtocol(PathProtocol, Protocol):
"""
Returns stats object with file information
Protocol for `pathlib.Path`-like file objects.

Includes only minimal set of methods which allow to determine if file exists, or get stats, e.g. size
"""

def stat(self) -> PathStatProtocol:
"""
Returns stats object with file information
"""
179 changes: 91 additions & 88 deletions onetl/base/pure_path_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,94 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from typing import Sequence, TypeVar

from typing_extensions import Protocol, runtime_checkable

# Covariant type variable used as the return type of every member that yields another path
T = TypeVar("T", bound="PurePathProtocol", covariant=True)  # noqa: PLC0105


@runtime_checkable
class PurePathProtocol(Protocol[T]):
    """
    Generic protocol for `pathlib.PurePath` like objects.

    Includes only minimal set of methods which allow to get path items, like parent, name, etc.

    The protocol is decorated with ``runtime_checkable``, so it can be used with ``isinstance``.
    Such a check only verifies that the members listed below are present, not their signatures.
    """

    def __fspath__(self) -> str:
        """
        Get string representation of path
        """

    def __eq__(self, other: object) -> bool:
        """
        Check if two paths are equal
        """

    def __hash__(self) -> int:
        """
        Get hash value for path
        """

    def __truediv__(self, key) -> T:
        """
        Add items to path using ``path / key``
        """

    def __rtruediv__(self, key) -> T:
        """
        Add items to path using ``key / path`` (reflected form of the ``/`` operator)
        """

    @property
    def name(self) -> str:
        """
        Get path name
        """

    @property
    def parent(self) -> T:
        """
        Get parent path
        """

    @property
    def parents(self) -> Sequence[T]:
        """
        Get parent paths
        """

    @property
    def parts(self) -> Sequence[str]:
        """
        Get path parts
        """

    def is_absolute(self) -> bool:
        """
        Checks if this path is absolute
        """

    def match(self, path_pattern) -> bool:
        """
        Checks if path matches a glob pattern
        """

    def relative_to(self, *other) -> T:
        """
        Return the relative path to another path
        """

    def as_posix(self) -> str:
        """
        Get POSIX representation of path
        """

    def joinpath(self, *args) -> T:
        """
        Add items to path
        """
from pathlib import PurePath
from typing import TYPE_CHECKING, Sequence, TypeVar

from typing_extensions import Protocol, TypeAlias, runtime_checkable

if TYPE_CHECKING:
PurePathProtocol: TypeAlias = PurePath
else:
T = TypeVar("T", bound="PurePathProtocol", covariant=True) # noqa: PLC0105

@runtime_checkable
class PurePathProtocol(Protocol[T]):
"""
Generic protocol for `pathlib.PurePath` like objects.

Includes only minimal set of methods which allow to get path items, like parent, name, etc
"""

def __fspath__(self) -> str:
"""
Get string representation of path
"""

def __eq__(self, other) -> bool:
"""
Check if two paths are equal
"""

def __hash__(self) -> int:
"""
Get hash value for path
"""

def __truediv__(self, key) -> T:
"""
Add items to path
"""

def __rtruediv__(self, key) -> T:
"""
Add items to path
"""

@property
def name(self) -> str:
"""
Get path name
"""

@property
def parent(self) -> T:
"""
Get parent path
"""

@property
def parents(self) -> Sequence[T]:
"""
Get parent paths
"""

@property
def parts(self) -> Sequence[str]:
"""
Get path parts
"""

def is_absolute(self) -> bool:
"""
Checks if this path is absolute
"""

def match(self, path_pattern) -> bool:
"""
Checks if path matches a glob pattern
"""

def relative_to(self, *other) -> T:
"""
Return the relative path to another path
"""

def as_posix(self) -> str:
"""
Get POSIX representation of path
"""

def joinpath(self, *args) -> T:
"""
Add items to path
"""
19 changes: 10 additions & 9 deletions onetl/connection/file_connection/file_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def resolve_file(self, path: os.PathLike | str) -> RemoteFile:
return RemoteFile(path=remote_path, stats=stat)

@slot
def read_text(self, path: os.PathLike | str, encoding: str = "utf-8", **kwargs) -> str:
def read_text(self, path: os.PathLike | str, encoding: str = "utf-8", **kwargs) -> str: # type: ignore[override]
log.debug(
"|%s| Reading string with encoding %r and options %r from '%s'",
self.__class__.__name__,
Expand All @@ -211,14 +211,14 @@ def read_text(self, path: os.PathLike | str, encoding: str = "utf-8", **kwargs)
return self._read_text(remote_path, encoding=encoding, **kwargs)

@slot
def read_bytes(self, path: os.PathLike | str, **kwargs) -> bytes:
def read_bytes(self, path: os.PathLike | str, **kwargs) -> bytes: # type: ignore[override]
log.debug("|%s| Reading bytes with options %r from '%s'", self.__class__.__name__, kwargs, path)

remote_path = self.resolve_file(path)
return self._read_bytes(remote_path, **kwargs)

@slot
def write_text(self, path: os.PathLike | str, content: str, encoding: str = "utf-8", **kwargs) -> RemoteFile:
def write_text(self, path: os.PathLike | str, content: str, encoding: str = "utf-8", **kwargs) -> RemoteFile: # type: ignore[override]
if not isinstance(content, str):
msg = f"content must be str, not '{content.__class__.__name__}'"
raise TypeError(msg)
Expand Down Expand Up @@ -248,7 +248,7 @@ def write_text(self, path: os.PathLike | str, content: str, encoding: str = "utf
return self.resolve_file(remote_path)

@slot
def write_bytes(self, path: os.PathLike | str, content: bytes, **kwargs) -> RemoteFile:
def write_bytes(self, path: os.PathLike | str, content: bytes, **kwargs) -> RemoteFile: # type: ignore[override]
if not isinstance(content, bytes):
msg = f"content must be bytes, not '{content.__class__.__name__}'"
raise TypeError(msg)
Expand Down Expand Up @@ -420,7 +420,7 @@ def rename_file(
return self.resolve_file(target_file)

@slot
def list_dir(
def list_dir( # type: ignore[override]
self,
path: os.PathLike | str,
filters: Iterable[BaseFileFilter] | None = None,
Expand Down Expand Up @@ -453,7 +453,7 @@ def list_dir(
return result

@slot
def walk(
def walk( # type: ignore[override]
self,
root: os.PathLike | str,
*,
Expand Down Expand Up @@ -515,7 +515,8 @@ def _walk( # noqa: C901
return

log.debug("|%s| Walking through directory '%s'", self.__class__.__name__, root)
dirs, files = [], []
dirs: list[RemoteDirectory] = []
files: list[RemoteFile] = []

for entry in self._scan_entries(root):
if limits_reached(limits):
Expand All @@ -539,8 +540,8 @@ def _walk( # noqa: C901
files.append(file)

if topdown and not limits_reached(limits):
for name in dirs:
yield from self._walk(root=root / name, topdown=topdown, filters=filters, limits=limits)
for directory in dirs:
yield from self._walk(root=directory, topdown=topdown, filters=filters, limits=limits)

log.debug(
"|%s| Directory '%s' contains %d nested directories and %d files",
Expand Down
8 changes: 4 additions & 4 deletions onetl/connection/file_connection/ftps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pydantic import Field # type: ignore[no-redef, assignment]

try:
from onetl.connection.file_connection.ftp import FTP
from onetl.connection.file_connection.ftp import FTP, FTPExtra
except (ImportError, NameError) as e:
raise ImportError(
textwrap.dedent(
Expand Down Expand Up @@ -46,12 +46,12 @@ def ntransfercmd(self, cmd, rest=None):
return conn, size


class FTPSExtra(FTP.Extra):
__doc__ = FTP.Extra.__doc__.replace("FTP", "FTPS")
class FTPSExtra(FTPExtra):
__doc__ = FTPExtra.__doc__.replace("FTP", "FTPS") # type: ignore[union-attr]


class FTPS(FTP):
__doc__ = FTP.__doc__.replace("FTP", "FTPS")
__doc__ = FTP.__doc__.replace("FTP", "FTPS") # type: ignore[union-attr]

extra: FTPSExtra = Field(default_factory=FTPSExtra)

Expand Down
Loading
Loading