#!/usr/bin/python3
"""
Collection of file helper functions
"""
import logging
import operator
import os
import re
from typing import Any
log = logging.getLogger(__name__)
# flake8: noqa: C901
[docs]
def is_file_readable(path: str) -> bool:
    """Tell whether ``path`` points at a readable, non-empty regular file.

    The check passes only when the path exists, is a regular file, is
    readable by the current process and has a size greater than zero.

    Args:
        path (str): Path of the file to check.

    Returns:
        bool: True when every condition holds, otherwise False.
    """
    # Guard clauses, evaluated in the same order as the documented conditions.
    if not os.path.exists(path):
        return False
    if not os.path.isfile(path):
        return False
    if not os.access(path, os.R_OK):
        return False
    return os.path.getsize(path) > 0
[docs]
def is_file_writable(path: str) -> bool:
    """Tell whether ``path`` points at an existing, writable regular file.

    Args:
        path (str): Path of the file to check.

    Returns:
        bool: True when the path exists, is a regular file and is writable
        by the current process, otherwise False.
    """
    is_existing_file = os.path.exists(path) and os.path.isfile(path)
    return is_existing_file and os.access(path, os.W_OK)
[docs]
def is_directory_readable(path: str) -> bool:
    """Tell whether ``path`` points at an existing, readable directory.

    Args:
        path (str): Path of the directory to check.

    Returns:
        bool: True when the path exists, is a directory and is readable by
        the current process, otherwise False.
    """
    is_existing_dir = os.path.exists(path) and os.path.isdir(path)
    return is_existing_dir and os.access(path, os.R_OK)
[docs]
def is_directory_writable(path: str) -> bool:
    """Tell whether ``path`` points at an existing, writable directory.

    Args:
        path (str): Path of the directory to check.

    Returns:
        bool: True when the path exists, is a directory and is writable by
        the current process, otherwise False.
    """
    is_existing_dir = os.path.exists(path) and os.path.isdir(path)
    return is_existing_dir and os.access(path, os.W_OK)
def _special_sort(data: list, order_by: str, order: str) -> list[dict]:
    """Internal function to sort filesystem meta data by special keys.

    Entries are ordered by ``order_by`` first; ``type`` and ``path`` act as
    tie-breakers. Not every entry carries a ``size`` key, so ``size`` gets
    special treatment:

    - when sorting by ``size``, entries without a size are placed first
      (ordered by type and path), followed by the sized entries ordered by
      size, type and path;
    - for every other key, sized entries are pre-sorted by size so that,
      because the sort is stable, entries that tie on the full sort key keep
      a deterministic order.

    The input list is never modified; a new list is returned.

    Args:
        data (list): Metadata dicts to sort. Each entry must contain
            ``order_by`` (except ``size``, see above), ``type`` and ``path``.
        order_by (str): Key to perform primary sort by.
        order (str): Order of results. asc|ascending|desc|descending

    Returns:
        list[dict]: New list with the entries in the requested order.

    Raises:
        ValueError: If ``order`` or ``order_by`` is not supported.
        KeyError: If an entry lacks ``type``, ``path`` or a non-size
            ``order_by`` key.
    """
    # "symlink" and "mount_point" are deliberately absent: they are not valid
    # sort keys.
    sortable_keys = (
        "type",
        "path",
        "size",
        "name",
        "access_time",
        "create_time",
        "modify_time",
        "group",
        "owner",
        "permissions",
        "inode",
    )
    if order not in ("asc", "ascending", "desc", "descending"):
        raise ValueError(f"Invalid sort order {order}")
    if order_by not in sortable_keys:
        raise ValueError(f"Invalid sort key {order_by}")

    unsized = [entry for entry in data if "size" not in entry]
    sized = [entry for entry in data if "size" in entry]

    if order_by == "size":
        # Sorting the whole list by "size" would raise KeyError for entries
        # that have none, so the two groups are sorted separately.
        unsized.sort(key=operator.itemgetter("type", "path"))
        sized.sort(key=operator.itemgetter("size", "type", "path"))
        ordered = unsized + sized
    else:
        # Pre-sort by size; the stable sort below keeps that order for ties.
        sized.sort(key=operator.itemgetter("size"))
        sort_keys = [order_by]
        sort_keys.extend(key for key in ("type", "path") if key != order_by)
        ordered = sorted(unsized + sized, key=operator.itemgetter(*sort_keys))

    if order in ("asc", "ascending"):
        return ordered
    return list(reversed(ordered))
[docs]
def find_directory_content(  # pylint: disable=R0912,R0914
    path: str, depth: int = 0, **options: Any
) -> list[dict]:
    """Get directory content and allow os walk.

    Walks ``path`` top-down and returns one metadata dict per item found, as
    produced by ``get_metadata``. Items can be filtered by kind and by name
    pattern, and the walk can be limited to a number of levels.

    Args:
        path (str): Directory to list.
        depth (int, optional): Number of levels to walk. ``1`` returns only
            the direct children of ``path``. Defaults to 0/unlimited.

    Options:
        exclude_files (bool, optional): Exclude files from return. Defaults to False.
        exclude_directories (bool, optional): Exclude directories from return.
            Defaults to False.
        exclude_symlinks (bool, optional): Exclude symlinks from return. Defaults to False.
        exclude_patterns (list[str], optional): Regular expressions; any file or
            directory whose name matches one of them (``re.search``) is excluded,
            and excluded directories are not walked. Defaults to [].
        exclude_pattern (str, optional): Single pattern, handled like an extra
            entry in ``exclude_patterns``. Defaults to "".
        follow_symlinks (bool, optional): Follow symlinks. Defaults to False.
        order_by (str, optional): Order by key. Defaults to "path". Not "symlink"
            or "mount_point"
        order (str, optional): Order by direction. Defaults to "desc". "asc"|"desc"

    Returns:
        list[dict]: List of dict with metadata and item names. The keys are set
        by ``get_metadata`` (not defined in this function); expected are
            type (file, directory, symlink),
            name,
            path,
            size, (file only)
            access time,
            create time,
            modify time,
            group,
            owner,
            permissions,
            inode
            mount_point (directory only)
            symlink (symlink only)

    Raises:
        OSError: If ``path`` is not a readable directory.
        ValueError: If ``order_by`` or ``order`` is not supported.
        re.error: If an exclude pattern is not a valid regular expression.
    """
    # Handle options
    follow_symlinks = options.get("follow_symlinks", False)
    exclude_directories = options.get("exclude_directories", False)
    exclude_files = options.get("exclude_files", False)
    exclude_symlinks = options.get("exclude_symlinks", False)
    order_by = options.get("order_by", "path")
    order = options.get("order", "desc")

    # Accept both the plural option and the single-pattern form. A bare string
    # is one pattern, not a sequence of one-character patterns.
    raw_patterns = options.get("exclude_patterns") or []
    if isinstance(raw_patterns, str):
        raw_patterns = [raw_patterns]
    else:
        raw_patterns = list(raw_patterns)
    single_pattern = options.get("exclude_pattern")
    if single_pattern:
        raw_patterns.append(single_pattern)

    # Abort if base path is not readable
    if not is_directory_readable(path):
        raise OSError(f"Path {path} is not readable")

    path = os.path.normpath(path)
    exclude_regexes = [re.compile(pattern) for pattern in raw_patterns]
    results: list[dict] = []

    # Walk the directory tree and process items
    for root, dirs, files in os.walk(
        path,
        topdown=True,
        followlinks=follow_symlinks,
    ):
        # Level of ``root`` below ``path`` (0 for ``path`` itself). A relative
        # path is used because counting separators miscounts when ``path`` is
        # the filesystem root.
        relative_root = os.path.relpath(root, path)
        if relative_root == os.curdir:
            level = 0
        else:
            level = relative_root.count(os.path.sep) + 1

        if depth and level >= depth:
            # Beyond the requested depth: report nothing and do not descend.
            dirs.clear()
            continue

        sym_dirs = [d for d in dirs if os.path.islink(os.path.join(root, d))]
        sym_files = [f for f in files if os.path.islink(os.path.join(root, f))]

        if exclude_symlinks:
            if sym_dirs:
                log.debug("Excluding %s symlinked directories", len(sym_dirs))
                dirs[:] = [d for d in dirs if d not in sym_dirs]
            if sym_files:
                log.debug("Excluding %s symlinked files", len(sym_files))
                files[:] = [f for f in files if f not in sym_files]

        for regex in exclude_regexes:
            dir_count = len(dirs)
            file_count = len(files)
            # In-place assignment so os.walk skips the excluded directories.
            dirs[:] = [d for d in dirs if not regex.search(d)]
            files[:] = [f for f in files if not regex.search(f)]
            log.debug(
                "Excluding %s files and %s directories due to pattern matching: %s",
                file_count - len(files),
                dir_count - len(dirs),
                regex.pattern,
            )

        # Keep the symlink lists in sync with the exclusions above, so an
        # excluded symlink is not reported through the symlink-only branches.
        sym_dirs = [d for d in sym_dirs if d in dirs]
        sym_files = [f for f in sym_files if f in files]

        # With directories (or files) excluded, symlinks of that kind are
        # still reported unless symlinks are excluded as well.
        if not exclude_directories:
            selected_dirs = dirs
        elif not exclude_symlinks:
            selected_dirs = sym_dirs
        else:
            selected_dirs = []

        if not exclude_files:
            selected_files = files
        elif not exclude_symlinks:
            selected_files = sym_files
        else:
            selected_files = []

        results.extend(
            get_metadata(os.path.join(root, name))
            for name in (*selected_dirs, *selected_files)
        )

        if depth and level + 1 >= depth:
            # The next level is past the limit; skip scanning it.
            dirs.clear()

    return _special_sort(data=results, order_by=order_by, order=order)