Skip to content

GH-127807: pathlib ABCs: move private copying methods to dedicated class #127810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 127 additions & 101 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,132 @@ def concat_path(path, text):
return path.with_segments(str(path) + text)


class CopyWorker:
"""
Class that implements copying between path objects. An instance of this
class is available from the PathBase.copy property; it's made callable so
that PathBase.copy() can be treated as a method.

The target path's CopyWorker drives the process from its _create() method.
Files and directories are exchanged by calling methods on the source and
target paths, and metadata is exchanged by calling
source.copy._read_metadata() and target.copy._write_metadata().
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not isinstance(target, PathBase):
target = self._path.with_segments(target)

# Delegate to the target path's CopyWorker object.
return target.copy._create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)

_readable_metakeys = frozenset()

def _read_metadata(self, metakeys, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise NotImplementedError

_writable_metakeys = frozenset()

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise NotImplementedError

def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
metakeys = self._writable_metakeys & source.copy._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, metakeys)
elif source.is_dir():
self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
else:
self._create_file(source, metakeys)
return self._path

def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst.copy._create_symlink(src, metakeys)
elif src.is_dir():
dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
dst.copy._create_file(src, metakeys)
if metakeys:
metadata = source.copy._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
with source.open('rb') as source_f:
try:
with self._path.open('wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
metadata = source.copy._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_symlink(self, source, metakeys):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)

def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass

def _ensure_distinct_path(self, source):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if source == self._path:
err = OSError(EINVAL, "Source and target are the same path")
elif source in self._path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(source)
err.filename2 = str(self._path)
raise err


class PurePathBase:
"""Base class for pure path objects.

Expand Down Expand Up @@ -374,31 +500,6 @@ def is_symlink(self):
except (OSError, ValueError):
return False

def _ensure_different_file(self, other_path):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass

def _ensure_distinct_path(self, other_path):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if self == other_path:
err = OSError(EINVAL, "Source and target are the same path")
elif self in other_path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(self)
err.filename2 = str(other_path)
raise err

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
Expand Down Expand Up @@ -537,88 +638,13 @@ def symlink_to(self, target, target_is_directory=False):
"""
raise NotImplementedError

def _symlink_to_target_of(self, link):
"""
Make this path a symlink with the same target as the given link. This
is used by copy().
"""
self.symlink_to(link.readlink())

def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
Create a new directory at this given path.
"""
raise NotImplementedError

# Metadata keys supported by this path type.
_readable_metadata = _writable_metadata = frozenset()

def _read_metadata(self, keys=None, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise NotImplementedError

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise NotImplementedError

def _copy_metadata(self, target, *, follow_symlinks=True):
"""
Copies metadata (permissions, timestamps, etc) from this path to target.
"""
# Metadata types supported by both source and target.
keys = self._readable_metadata & target._writable_metadata
if keys:
metadata = self._read_metadata(keys, follow_symlinks=follow_symlinks)
target._write_metadata(metadata, follow_symlinks=follow_symlinks)

def _copy_file(self, target):
"""
Copy the contents of this file to the given target.
"""
self._ensure_different_file(target)
with self.open('rb') as source_f:
try:
with target.open('wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not target.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {target}') from e
else:
raise

def copy(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not isinstance(target, PathBase):
target = self.with_segments(target)
self._ensure_distinct_path(target)
stack = [(self, target)]
while stack:
src, dst = stack.pop()
if not follow_symlinks and src.is_symlink():
dst._symlink_to_target_of(src)
if preserve_metadata:
src._copy_metadata(dst, follow_symlinks=False)
elif src.is_dir():
children = src.iterdir()
dst.mkdir(exist_ok=dirs_exist_ok)
stack.extend((child, dst.joinpath(child.name))
for child in children)
if preserve_metadata:
src._copy_metadata(dst)
else:
src._copy_file(dst)
if preserve_metadata:
src._copy_metadata(dst)
return target
copy = property(CopyWorker, doc=CopyWorker.__call__.__doc__)

def copy_into(self, target_dir, *, follow_symlinks=True,
dirs_exist_ok=False, preserve_metadata=False):
Expand Down
Loading
Loading