glasswall.utils

  1import ctypes as ct
  2import functools
  3import io
  4import math
  5import os
  6import pathlib
  7import stat
  8import tempfile
  9import warnings
 10from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 11
 12from lxml import etree
 13
 14import glasswall
 15from glasswall.config.logging import log
 16
 17
 18def as_bytes(file_: Union[bytes, bytearray, io.BytesIO]):
 19    """ Returns file_ as bytes.
 20
 21    Args:
 22        file_ (Union[bytes, bytearray, io.BytesIO]): The file
 23
 24    Returns:
 25        bytes
 26
 27    Raises:
 28        TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO
 29    """
 30    if isinstance(file_, bytes):
 31        return file_
 32    elif isinstance(file_, bytearray):
 33        return bytes(file_)
 34    elif isinstance(file_, io.BytesIO):
 35        file_.seek(0)
 36        return file_.read()
 37    else:
 38        raise TypeError(file_)
 39
 40
 41def as_io_BytesIO(file_: Union[bytes, bytearray]):
 42    """ Returns file_ as io.BytesIO object.
 43
 44    Args:
 45        file_ (Union[bytes, bytearray]): The bytes or bytearray of the file
 46
 47    Returns:
 48        io.BytesIO object
 49
 50    Raises:
 51        TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO
 52    """
 53    if isinstance(file_, bytes):
 54        return io.BytesIO(file_)
 55    elif isinstance(file_, bytearray):
 56        return io.BytesIO(bytes(file_))
 57    elif isinstance(file_, io.BytesIO):
 58        return file_
 59    else:
 60        raise TypeError(file_)
 61
 62
 63def as_snake_case(string):
 64    return ''.join(
 65        [
 66            '_' + char.lower()
 67            if char.isupper() else char
 68            for char in string
 69        ]
 70    ).lstrip('_')
 71
 72
 73def as_title(string):
 74    return ''.join(
 75        word.title()
 76        for word in string.replace(".", "_").split("_")
 77    )
 78
 79
 80def buffer_to_bytes(buffer: ct.c_void_p, buffer_length: ct.c_size_t):
 81    """ Convert ctypes buffer and buffer_length to bytes.
 82
 83    Args:
 84        buffer (ct.c_void_p()): The file buffer.
 85        buffer_length (ct.c_size_t()): The file buffer length.
 86
 87    Returns:
 88        bytes (bytes): The file as bytes.
 89    """
 90
 91    file_buffer = (ct.c_byte * buffer_length.value)()
 92    ct.memmove(file_buffer, buffer.value, buffer_length.value)
 93
 94    return bytes(file_buffer)
 95
 96
 97class CwdHandler:
 98    """ Changes the current working directory to new_cwd on __enter__, and back to previous cwd on __exit__.
 99
100    Args:
101        new_cwd (str): The new current working directory to temporarily change to.
102    """
103
104    def __init__(self, new_cwd: str):
105        self.new_cwd = new_cwd if os.path.isdir(new_cwd) else os.path.dirname(new_cwd)
106        self.old_cwd = os.getcwd()
107
108    def __enter__(self):
109        os.chdir(self.new_cwd)
110
111    def __exit__(self, type, value, traceback):
112        os.chdir(self.old_cwd)
113
114
def delete_directory(directory: str, keep_folder: bool = False):
    """ Delete a directory and its contents.

    Does nothing when directory is not an existing directory.

    Args:
        directory (str): The directory path.
        keep_folder (bool, optional): Default False. If False, the directory itself is removed after its contents. Any other value keeps the directory and only deletes its contents.
    """
    if not os.path.isdir(directory):
        return

    # Delete all files in directory
    # NOTE(review): with followlinks=False, iterate_directory_entries yields neither files nor
    # directories for symlink entries, so symlinks appear to be left in place and os.rmdir below
    # would then fail on a non-empty directory. Confirm whether symlinks should be unlinked here.
    for file_ in list_file_paths(directory, followlinks=False):
        os.remove(file_)

    # Delete all empty subdirectories
    delete_empty_subdirectories(directory)

    # Delete the directory
    if keep_folder is False:
        os.rmdir(directory)
133
134
def delete_empty_subdirectories(directory: str):
    """ Removes every empty subdirectory below a given directory, deepest first.

    Subdirectories that are not empty, or that cannot be removed, are left in place silently.

    Args:
        directory (str): The directory to delete subdirectories from.

    Returns:
        None
    """
    # Bottom-up walk: children are attempted before their parents, so a parent that only
    # contained empty directories is itself empty by the time it is attempted.
    for parent, child_names, _ in os.walk(directory, topdown=False):
        for child_name in child_names:
            target = os.path.join(parent, child_name)

            try:
                os.rmdir(target)
                continue
            except PermissionError:
                # directory might be read-only, retry below after chmod
                pass
            except OSError:
                # not empty, don't delete
                continue

            try:
                os.chmod(target, stat.S_IWRITE)
            except Exception:
                log.warning(f"PermissionError while attempting to delete {target}. Attempted chmod but failed.")

            try:
                os.rmdir(target)
            except OSError:
                # cannot be deleted
                pass
164
165
def flatten_list(list_: Iterable):
    """ Flattens one level of nesting into a new list. [[1, 2], ["3"], (4, 5,), [6]] --> [1, 2, "3", 4, 5, 6] """
    flattened = []
    for sublist in list_:
        flattened.extend(sublist)
    return flattened
173
174
def get_file_type(file_path: str):
    """ Returns the extension of a file path without the dot. "data/files/splat.zip" -> "zip" """
    _, extension = os.path.splitext(file_path)
    return extension.replace(".", "")
178
179
def get_libraries(directory: str, library_names: Optional[List[str]] = None, ignore_errors: bool = False):
    """ Calls get_library on the given directory for each requested library name.

    Args:
        directory (str): The directory to search from.
        library_names (List[str], optional): List of libraries to return, if None iterates all libraries found in glasswall.libraries.os_info
        ignore_errors (bool, optional): Default False, prevents get_library raising FileNotFoundError when True.

    Returns:
        libraries (dict[str, str]): A dictionary of library names and their absolute file paths.
    """
    if not library_names:
        # Fall back to every library known for the current operating system.
        library_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM].keys()

    found = {}
    for name in library_names:
        try:
            path = get_library(name, directory)
        except FileNotFoundError:
            if ignore_errors is True:
                continue
            raise
        found[name] = path

    return found
205
206
def get_library(library: str, directory: str):
    """ Returns a path to the specified library found under directory or any of its subdirectories. If multiple libraries exist, a warning is logged and the file with the latest change time (os.path.getctime) is returned.

    Args:
        library (str): The library to search for, ie: "rebuild", "word_search"
        directory (str): The directory to search from.

    Returns:
        library_file_path (str): The absolute file path to the library.

    Raises:
        NotADirectoryError: directory is not an existing directory.
        KeyError: Unsupported OS or library name was not found in glasswall.libraries.os_info.
        FileNotFoundError: Library was not found.
    """
    if not os.path.isdir(directory):
        raise NotADirectoryError(directory)

    library = as_snake_case(library)
    library_file_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM][library]["file_name"]

    # A single file name may be configured as a plain string; normalise to a list of aliases.
    if isinstance(library_file_names, str):
        library_file_names = [library_file_names]

    search_root = pathlib.Path(directory)
    matches = []
    for alias in library_file_names:
        matches.extend(search_root.rglob(alias))

    if not matches:
        # exhausted, not found
        raise FileNotFoundError(f'Could not find any files: "{library_file_names}" under directory: "{directory}"')

    latest_library = str(max(matches, key=os.path.getctime).resolve())

    if len(matches) > 1:
        # warn that multiple libraries found, list library paths if there are <= 5
        if len(matches) <= 5:
            listing = "\n".join(str(item) for item in matches)
            log.warning(f"Found {len(matches)} {library} libraries, but expected only one:\n{listing}\nLatest library: {latest_library}")
        else:
            log.warning(f"Found {len(matches)} {library} libraries, but expected only one.\nLatest library: {latest_library}")

    # Return library with latest change time
    return latest_library
250
251
def iterate_directory_entries(directory: str, file_type: str = 'all', absolute: bool = True, recursive: bool = True, followlinks: bool = True, start_directory: Optional[str] = None):
    """ Generate entries (files, directories, or both) in a given directory using os.scandir().

    When recursing, the contents of a subdirectory are yielded before the subdirectory itself.

    Args:
        directory (str): The path to the directory whose entries are to be listed.
        file_type (str, optional): Type of entries to return.
            - 'all': Return both files and directories (default).
            - 'files': Return only files.
            - 'directories': Return only directories.
        absolute (bool, optional): Whether to return absolute paths (default) or relative paths.
        recursive (bool, optional): Whether to recurse into subdirectories (default is True).
        followlinks (bool, optional): Whether to follow symbolic links and yield entries from the target directory (default is True).
        start_directory (str, optional): The starting directory used to calculate relative paths (default is None, meaning directory itself).

    Yields:
        str: The path of each matching entry: absolute when absolute is True, otherwise relative to start_directory.

    Raises:
        ValueError: If an invalid 'file_type' value is provided.
        NotADirectoryError: If the directory does not exist.

    Example:
        directory = '/path/to/your/directory'

        # Iterate through all entries (files and directories) in the directory
        for entry in iterate_directory_entries(directory):
            print(entry)

        # Iterate through only file entries in the directory
        for file in iterate_directory_entries(directory, file_type='files'):
            print("File:", file)

        # Iterate through only directory entries in the directory
        for directory in iterate_directory_entries(directory, file_type='directories'):
            print("Directory:", directory)
    """
    if not os.path.isdir(directory):
        raise NotADirectoryError(directory)

    allowed_types = ['all', 'files', 'directories']

    # Check if the provided file_type is valid
    if file_type not in allowed_types:
        raise ValueError(f"Invalid file_type '{file_type}'. Allowed values are {', '.join(allowed_types)}.")

    # Convert the directory to an absolute path
    directory = os.path.abspath(directory)

    # Set the start_directory to the provided directory if not specified
    start_directory = start_directory or directory

    # The with-block closes the scandir handle even if the consumer abandons the generator early.
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=followlinks):
                if recursive:
                    yield from iterate_directory_entries(
                        entry.path,
                        file_type=file_type,
                        absolute=absolute,
                        recursive=recursive,
                        followlinks=followlinks,
                        start_directory=start_directory,
                    )
                wanted = file_type != "files"
            elif entry.is_file(follow_symlinks=followlinks):
                wanted = file_type != "directories"
            else:
                # Neither a directory nor a regular file under the chosen symlink policy
                # (for example a symlink when followlinks is False): never yielded.
                wanted = False

            if wanted:
                if absolute:
                    yield entry.path
                else:
                    yield os.path.relpath(entry.path, start=start_directory)
326
327
def list_file_paths(directory: str, file_type: str = 'files', absolute: bool = True, recursive: bool = True, followlinks: bool = True) -> list:
    """ Return a sorted, de-duplicated list of the paths generated by iterate_directory_entries.

    Args:
        directory (str): The path to the directory whose file paths are to be listed.
        file_type (str, optional): Type of entries to return.
            - 'all': Return both files and directories.
            - 'files': Return only files (default).
            - 'directories': Return only directories.
        absolute (bool, optional): Whether to return absolute paths (default is True).
        recursive (bool, optional): Whether to recurse into subdirectories (default is True).
        followlinks (bool, optional): Whether to follow symbolic links and list file paths from the target directory (default is True).

    Returns:
        list: A list of file paths found in the specified directory and its subdirectories.

    Example:
        directory = '/path/to/your/directory'
        file_paths = list_file_paths(directory)
        print(file_paths)
    """
    entries = iterate_directory_entries(directory, file_type, absolute, recursive, followlinks)

    # A set drops duplicate paths before sorting.
    unique_paths = set(entries)

    return sorted(unique_paths)
351
352
def list_subdirectory_paths(directory: str, recursive: bool = False, absolute: bool = True):
    """ Returns a list of paths to subdirectories in a directory.

    Args:
        directory (str): The directory to list subdirectories from.
        recursive (bool, optional): Default False. Include subdirectories of subdirectories.
        absolute (bool, optional): Default True. Return paths as absolute paths. If False, returns paths relative to directory.

    Returns:
        subdirectories (list): A list of subdirectory paths. Direct children come first, followed by the descendants of each child; every subdirectory appears once.
    """
    with os.scandir(directory) as entries:
        children = [entry.path for entry in entries if entry.is_dir()]

    subdirectories = list(children)

    if recursive:
        # Iterate over the direct children only. Extending the list being iterated would
        # re-visit descendants the recursive call already returned and duplicate them.
        for child in children:
            subdirectories.extend(list_subdirectory_paths(child, recursive=True))

    if absolute:
        return [os.path.abspath(path) for path in subdirectories]

    return [os.path.relpath(path, directory) for path in subdirectories]
376
377
def load_dependencies(dependencies: list, ignore_errors: bool = False):
    """ Calls ctypes.cdll.LoadLibrary on each file path in `dependencies` that exists as a file.

    Args:
        dependencies (list): A list of absolute file paths of library dependencies.
        ignore_errors (bool, optional): Default False, avoid raising exceptions from ct.cdll.LoadLibrary if ignore_errors is True.

    Returns:
        missing_dependencies (list): A list of missing dependencies, or an empty list.
    """
    # First pass: record every path that is not an existing file.
    missing_dependencies = []
    for path in dependencies:
        if not os.path.isfile(path):
            missing_dependencies.append(path)

    # Second pass: try to load dependencies that exist
    for path in dependencies:
        if path in missing_dependencies:
            continue
        try:
            ct.cdll.LoadLibrary(path)
        except Exception:
            if not ignore_errors:
                raise

    return missing_dependencies
402
403
def round_up(number: float, decimals=0) -> float:
    """ Rounds a number up (towards positive infinity) to a specified number of decimal places.

    The rounding is done on the decimal representation of the number (its repr), so values that
    already have at most `decimals` decimal places are returned unchanged; binary floating point
    noise such as 0.07 * 100 == 7.000000000000001 no longer pushes them up a step.

    Args:
        number (float): The number to be rounded.
        decimals (int, optional): The number of decimal places to round to. Defaults to 0.

    Returns:
        float: The rounded number.

    Raises:
        OverflowError: If number is infinite.
        ValueError: If number is NaN.

    Examples:
        >>> round_up(105, 0)
        105.0
        >>> round_up(0.015, 2)
        0.02
        >>> round_up(0.025, 2)
        0.03
        >>> round_up(0.00001, 2)
        0.01
        >>> round_up(0.07, 2)
        0.07
    """
    from decimal import ROUND_CEILING, Decimal

    if not math.isfinite(number):
        # math.ceil raises OverflowError for infinities and ValueError for NaN.
        return math.ceil(number)

    # Shift the decimal point right, take the ceiling, then shift back. All exact in Decimal.
    shifted = Decimal(repr(float(number))).scaleb(decimals)
    ceiled = shifted.to_integral_value(rounding=ROUND_CEILING)

    # "+ 0.0" normalises a negative zero result to 0.0.
    return float(ceiled.scaleb(-decimals)) + 0.0
426
427
class TempDirectoryPath:
    """ Creates a uniquely named temporary directory under glasswall._TEMPDIR and gives its path on __enter__, deletes the directory if it exists on __exit__.

    The directory is created when the object is constructed, so it already exists on __enter__.

    Args:
        delete (bool, optional): Default True. Delete the temporary directory on __exit__
    """

    def __init__(self, delete: bool = True):
        # Validate args
        if not isinstance(delete, bool):
            raise TypeError(delete)

        self.delete = delete

        # tempfile.mkdtemp requires the parent directory to exist.
        os.makedirs(glasswall._TEMPDIR, exist_ok=True)

        # mkdtemp picks a unique name and creates the directory in one step, so two callers can
        # never end up sharing a directory, and no private tempfile helper is needed.
        created = tempfile.mkdtemp(dir=glasswall._TEMPDIR)

        # Normalize
        self.temp_directory = str(pathlib.Path(created).resolve())

    def __enter__(self):
        return self.temp_directory

    def __exit__(self, type, value, traceback):
        if self.delete:
            # Delete temp directory and all of its contents
            if os.path.isdir(self.temp_directory):
                delete_directory(self.temp_directory)
460
461
class TempFilePath:
    """ Gives a path to a uniquely named temporary file that does not currently exist on __enter__, deletes the file if it exists on __exit__.

    The file itself is never created by this class; only its path is reserved.

    Args:
        directory (Union[str, None], optional): The directory to create a temporary file in. Defaults to tempfile.gettempdir().
        delete (bool, optional): Default True. Delete the temporary file on __exit__
    """

    def __init__(self, directory: Union[str, None] = None, delete: bool = True):
        # Validate args
        if not isinstance(directory, (str, type(None))):
            raise TypeError(directory)
        if isinstance(directory, str) and not os.path.isdir(directory):
            raise NotADirectoryError(directory)
        if not isinstance(delete, bool):
            raise TypeError(delete)

        self.temp_file: str = None
        self.directory = directory or tempfile.gettempdir()
        self.delete = delete

        # Keep drawing candidate names until one is free. lexists also rejects names taken by a
        # directory or a (possibly dangling) symlink, not just by a regular file.
        # NOTE(review): tempfile._get_candidate_names is a private helper of the tempfile module.
        while self.temp_file is None or os.path.lexists(self.temp_file):
            self.temp_file = os.path.join(self.directory, next(tempfile._get_candidate_names()))

        # Normalize
        self.temp_file = str(pathlib.Path(self.temp_file).resolve())

        # Create temp directory if it does not exist
        os.makedirs(os.path.dirname(self.temp_file), exist_ok=True)

    def __enter__(self):
        return self.temp_file

    def __exit__(self, type, value, traceback):
        if self.delete:
            if os.path.isfile(self.temp_file):
                os.remove(self.temp_file)
499
500
501# NOTE typehint as string due to no "from __future__ import annotations" support on python 3.6 on ubuntu-16.04 / centos7
def validate_xml(xml: Union[str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"]):
    """ Attempts to parse the xml provided, returning the xml as string. Raises ValueError if the xml cannot be parsed.

    A str argument is treated as a file path when it names an existing file, otherwise as xml text.

    Args:
        xml (Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy]): The xml string, or file path, bytes, or ContentManagementPolicy instance to parse.

    Returns:
        xml_string (str): A string representation of the xml, re-indented with 4 spaces and with an xml declaration header.

    Raises:
        ValueError: if the xml cannot be parsed. Chained to the underlying lxml syntax error.
        TypeError: if the type of arg "xml" is invalid
    """
    try:
        # Get tree from file/str
        if isinstance(xml, str):
            try:
                is_file = os.path.isfile(os.path.abspath(xml))
            except Exception:
                # Strings that cannot be interpreted as a path are treated as xml text.
                is_file = False

            if is_file:
                tree = etree.parse(xml)
            else:
                xml = xml.encode("utf-8")
                tree = etree.fromstring(xml)

        # Get tree from bytes, bytearray, io.BytesIO
        elif isinstance(xml, (bytes, bytearray, io.BytesIO)):
            # Convert bytes, bytearray to io.BytesIO
            if isinstance(xml, (bytes, bytearray)):
                xml = as_io_BytesIO(xml)
            tree = etree.parse(xml)

        # Get tree from ContentManagementPolicy instance
        elif isinstance(xml, glasswall.content_management.policies.policy.Policy):
            xml = xml.text.encode("utf-8")
            tree = etree.fromstring(xml)

        else:
            raise TypeError(xml)

    except etree.XMLSyntaxError as syntax_error:
        # Keep the parser's message reachable through __cause__.
        raise ValueError(xml) from syntax_error

    # convert tree to string and include xml declaration header utf8
    etree.indent(tree, space=" " * 4)
    xml_string = etree.tostring(tree, encoding="utf-8", xml_declaration=True, pretty_print=True).decode()

    return xml_string
552
553
def xml_as_dict(xml):
    """ Converts a simple single-level xml into a dictionary.

    Args:
        xml (Union[str, bytes, bytearray, io.BytesIO]): The xml string, or file path, or bytes to parse.

    Returns:
        dict_ (dict): A dictionary of element tag : text, ordered by tag
    """
    # Normalise the input to an xml string, then re-parse it to get the root element
    validated = validate_xml(xml)
    root = etree.fromstring(validated.encode())

    # Map each direct child of the root to its text
    tag_to_text = {}
    for element in root:
        tag_to_text[element.tag] = element.text

    # Sort for ease of viewing logs
    return dict(sorted(tag_to_text.items()))
578
579
def deprecated_alias(**aliases: str) -> Callable:
    """ Decorator that lets a function keep accepting deprecated keyword argument names.

    Each keyword maps a deprecated argument name to its replacement. On every call the keyword
    arguments are passed through rename_kwargs before the decorated function is invoked.

    Use as follows:

    @deprecated_alias(old_arg='new_arg')
    def myfunc(new_arg):
        ...

    https://stackoverflow.com/a/49802489
    """

    def deco(f: Callable):
        def wrapper(*args, **kwargs):
            rename_kwargs(f.__name__, kwargs, aliases)
            return f(*args, **kwargs)

        # Copy the name, docstring and other metadata of f onto the wrapper.
        return functools.wraps(f)(wrapper)

    return deco
601
602
def rename_kwargs(func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str]):
    """ Helper function for deprecating function arguments.

    Moves every deprecated key found in kwargs to its new name, in place, emitting a
    DeprecationWarning for each. Raises TypeError when both the old and new name were supplied.

    https://stackoverflow.com/a/49802489
    """
    for alias, new in aliases.items():
        if alias not in kwargs:
            continue

        if new in kwargs:
            raise TypeError(
                f"{func_name} received both {alias} and {new} as arguments!"
                f" {alias} is deprecated, use {new} instead."
            )

        deprecation_message = (
            f"`{alias}` is deprecated as an argument to `{func_name}`; use"
            f" `{new}` instead."
        )
        warnings.warn(message=deprecation_message, category=DeprecationWarning, stacklevel=3)

        kwargs[new] = kwargs.pop(alias)
624
625
def deprecated_function(replacement_function):
    """ Decorator marking a function as deprecated in favour of replacement_function.

    Calling the decorated function emits a DeprecationWarning and then calls
    replacement_function with the same arguments; the decorated function's own body is not run.
    """

    def decorator(f: Callable):
        def wrapper(*args, **kwargs):
            notice = f"Call to deprecated method: '{f.__name__}'. Use '{replacement_function.__name__}' instead."
            warnings.warn(message=notice, category=DeprecationWarning, stacklevel=3)
            return replacement_function(*args, **kwargs)

        # Copy the name, docstring and other metadata of f onto the wrapper.
        return functools.wraps(f)(wrapper)

    return decorator
def as_bytes(file_: Union[bytes, bytearray, _io.BytesIO]):
def as_bytes(file_: Union[bytes, bytearray, io.BytesIO]):
    """ Returns the content of file_ as a bytes object.

    Args:
        file_ (Union[bytes, bytearray, io.BytesIO]): The file content.

    Returns:
        bytes: file_ itself when it is already bytes, a bytes copy of a bytearray, or the
            full content of an io.BytesIO read from its start.

    Raises:
        TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO
    """
    if isinstance(file_, io.BytesIO):
        # Rewind first so the whole stream is returned whatever the current position is.
        file_.seek(0)
        return file_.read()

    if isinstance(file_, bytearray):
        return bytes(file_)

    if isinstance(file_, bytes):
        return file_

    raise TypeError(file_)

Returns file_ as bytes.

Args: file_ (Union[bytes, bytearray, io.BytesIO]): The file

Returns: bytes

Raises: TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO

def as_io_BytesIO(file_: Union[bytes, bytearray]):
def as_io_BytesIO(file_: Union[bytes, bytearray, io.BytesIO]) -> io.BytesIO:
    """ Returns file_ as io.BytesIO object.

    Args:
        file_ (Union[bytes, bytearray, io.BytesIO]): The bytes, bytearray, or io.BytesIO object of the file

    Returns:
        io.BytesIO object. A new stream is created for bytes and bytearray; an io.BytesIO
        argument is returned unchanged (same object, current position untouched).

    Raises:
        TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO
    """
    if isinstance(file_, io.BytesIO):
        # Already the requested type: hand back the very same object.
        return file_

    if isinstance(file_, bytes):
        return io.BytesIO(file_)

    if isinstance(file_, bytearray):
        return io.BytesIO(bytes(file_))

    raise TypeError(file_)

Returns file_ as io.BytesIO object.

Args: file_ (Union[bytes, bytearray, io.BytesIO]): The bytes, bytearray, or io.BytesIO object of the file

Returns: io.BytesIO object

Raises: TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO

def as_snake_case(string):
def as_snake_case(string):
    """ Converts a CamelCase string to snake_case. "WordSearch" -> "word_search"

    Every uppercase character is replaced by an underscore followed by its lowercase form, then
    all leading underscores are stripped from the result.
    """
    pieces = []
    for char in string:
        if char.isupper():
            pieces.append('_' + char.lower())
        else:
            pieces.append(char)

    return ''.join(pieces).lstrip('_')
def as_title(string):
def as_title(string):
    """ Joins the "." / "_" separated words of string, each passed through str.title(). "word_search" -> "WordSearch" """
    words = string.replace(".", "_").split("_")
    titled_words = [word.title() for word in words]
    return ''.join(titled_words)
def buffer_to_bytes(buffer: ctypes.c_void_p, buffer_length: ctypes.c_ulong):
def buffer_to_bytes(buffer: ct.c_void_p, buffer_length: ct.c_size_t):
    """ Copy the memory referenced by a ctypes buffer into a Python bytes object.

    Args:
        buffer (ct.c_void_p()): The file buffer.
        buffer_length (ct.c_size_t()): The file buffer length.

    Returns:
        bytes (bytes): The file as bytes.
    """
    size = buffer_length.value

    # Allocate a ctypes array of the requested size and copy the raw memory into it.
    destination = (ct.c_byte * size)()
    ct.memmove(destination, buffer.value, size)

    return bytes(destination)

Convert ctypes buffer and buffer_length to bytes.

Args: buffer (ct.c_void_p()): The file buffer. buffer_length (ct.c_size_t()): The file buffer length.

Returns: bytes (bytes): The file as bytes.

class CwdHandler:
class CwdHandler:
    """ Changes the current working directory to new_cwd on __enter__, and back to previous cwd on __exit__.

    The previous cwd is the one that was current when the handler was constructed.

    Args:
        new_cwd (str): The new current working directory to temporarily change to. If this is not
            an existing directory it is treated as a file path and its parent directory is used.
    """

    def __init__(self, new_cwd: str):
        if os.path.isdir(new_cwd):
            self.new_cwd = new_cwd
        else:
            # A bare file name such as "file.txt" has an empty dirname, and os.chdir("") raises.
            # Fall back to the current directory in that case.
            self.new_cwd = os.path.dirname(new_cwd) or os.curdir
        self.old_cwd = os.getcwd()

    def __enter__(self):
        os.chdir(self.new_cwd)

    def __exit__(self, type, value, traceback):
        # Always restore, whether or not the with-body raised.
        os.chdir(self.old_cwd)

Changes the current working directory to new_cwd on __enter__, and back to previous cwd on __exit__.

Args: new_cwd (str): The new current working directory to temporarily change to.

CwdHandler(new_cwd: str)
107    def __init__(self, new_cwd: str):
108        self.new_cwd = new_cwd if os.path.isdir(new_cwd) else os.path.dirname(new_cwd)
109        self.old_cwd = os.getcwd()
new_cwd
old_cwd
def delete_directory(directory: str, keep_folder: bool = False):
def delete_directory(directory: str, keep_folder: bool = False):
    """ Delete a directory and its contents.

    Does nothing when directory is not an existing directory.

    Args:
        directory (str): The directory path.
        keep_folder (bool, optional): Default False. If False, the directory itself is removed after its contents. Any other value keeps the directory and only deletes its contents.
    """
    if not os.path.isdir(directory):
        return

    # Delete all files in directory
    # NOTE(review): with followlinks=False, iterate_directory_entries yields neither files nor
    # directories for symlink entries, so symlinks appear to be left in place and os.rmdir below
    # would then fail on a non-empty directory. Confirm whether symlinks should be unlinked here.
    for file_ in list_file_paths(directory, followlinks=False):
        os.remove(file_)

    # Delete all empty subdirectories
    delete_empty_subdirectories(directory)

    # Delete the directory
    if keep_folder is False:
        os.rmdir(directory)

Delete a directory and its contents.

Args: directory (str): The directory path. keep_folder (bool, optional): Default False. If True, only delete the contents and keep the directory itself.

def delete_empty_subdirectories(directory: str):
def delete_empty_subdirectories(directory: str):
    """ Removes every empty subdirectory below a given directory, deepest first.

    Subdirectories that are not empty, or that cannot be removed, are left in place silently.

    Args:
        directory (str): The directory to delete subdirectories from.

    Returns:
        None
    """
    # Bottom-up walk: children are attempted before their parents, so a parent that only
    # contained empty directories is itself empty by the time it is attempted.
    for parent, child_names, _ in os.walk(directory, topdown=False):
        for child_name in child_names:
            target = os.path.join(parent, child_name)

            try:
                os.rmdir(target)
                continue
            except PermissionError:
                # directory might be read-only, retry below after chmod
                pass
            except OSError:
                # not empty, don't delete
                continue

            try:
                os.chmod(target, stat.S_IWRITE)
            except Exception:
                log.warning(f"PermissionError while attempting to delete {target}. Attempted chmod but failed.")

            try:
                os.rmdir(target)
            except OSError:
                # cannot be deleted
                pass

Deletes all empty subdirectories of a given directory.

Args: directory (str): The directory to delete subdirectories from.

Returns: None

def flatten_list(list_: Iterable):
def flatten_list(list_: Iterable):
    """ Flattens one level of nesting into a new list. [[1, 2], ["3"], (4, 5,), [6]] --> [1, 2, "3", 4, 5, 6] """
    flattened = []
    for sublist in list_:
        flattened.extend(sublist)
    return flattened

Returns a flattened list. [[1, 2], ["3"], (4, 5,), [6]] --> [1, 2, "3", 4, 5, 6]

def get_file_type(file_path: str):
def get_file_type(file_path: str):
    """ Returns the extension of a file path without the dot. "data/files/splat.zip" -> "zip" """
    _, extension = os.path.splitext(file_path)
    return extension.replace(".", "")

Returns the filetype of a file. "data/files/splat.zip" -> "zip"

def get_libraries( directory: str, library_names: Optional[List[str]] = None, ignore_errors: bool = False):
def get_libraries(directory: str, library_names: Optional[List[str]] = None, ignore_errors: bool = False):
    """ Calls get_library on the given directory for each requested library name.

    Args:
        directory (str): The directory to search from.
        library_names (List[str], optional): List of libraries to return, if None iterates all libraries found in glasswall.libraries.os_info
        ignore_errors (bool, optional): Default False, prevents get_library raising FileNotFoundError when True.

    Returns:
        libraries (dict[str, str]): A dictionary of library names and their absolute file paths.
    """
    if not library_names:
        # Fall back to every library known for the current operating system.
        library_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM].keys()

    found = {}
    for name in library_names:
        try:
            path = get_library(name, directory)
        except FileNotFoundError:
            if ignore_errors is True:
                continue
            raise
        found[name] = path

    return found

Recursively calls get_library on each library from glasswall.libraries.os_info on the given directory.

Args: directory (str): The directory to search from. library_names (List[str], optional): List of libraries to return, if None iterates all libraries found in glasswall.libraries.os_info ignore_errors (bool, optional): Default False, prevents get_library raising FileNotFoundError when True.

Returns: libraries (dict[str, str]): A dictionary of library names and their absolute file paths.

def get_library(library: str, directory: str):
def get_library(library: str, directory: str):
    """ Searches directory and all of its subdirectories for the file name(s) registered for a library and returns the resolved path of the match with the greatest os.path.getctime value.

    Args:
        library (str): The library to search for, ie: "rebuild", "word_search". Converted with as_snake_case before lookup.
        directory (str): The directory to search from.

    Returns:
        library_file_path (str): The resolved file path to the library.

    Raises:
        NotADirectoryError: directory is not an existing directory.
        KeyError: The operating system or library name was not found in glasswall.libraries.os_info.
        FileNotFoundError: No file matching the library's file name(s) was found.
    """
    if not os.path.isdir(directory):
        raise NotADirectoryError(directory)

    library = as_snake_case(library)
    library_file_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM][library]["file_name"]

    # A single file name is treated as a one-item list of aliases
    if isinstance(library_file_names, str):
        library_file_names = [library_file_names]

    # Collect every match of every alias, searching recursively
    matches = [
        match
        for alias in library_file_names
        for match in pathlib.Path(directory).rglob(alias)
    ]

    if not matches:
        raise FileNotFoundError(f'Could not find any files: "{library_file_names}" under directory: "{directory}"')

    # Pick the match with the latest change time
    latest_library = str(max(matches, key=os.path.getctime).resolve())

    if len(matches) > 1:
        # Only list the individual paths when there are few enough to be readable
        if len(matches) <= 5:
            log.warning(f"Found {len(matches)} {library} libraries, but expected only one:\n{chr(10).join(str(item) for item in matches)}\nLatest library: {latest_library}")
        else:
            log.warning(f"Found {len(matches)} {library} libraries, but expected only one.\nLatest library: {latest_library}")

    return latest_library

Returns a path to the specified library found in the given directory or any of its subdirectories. If multiple matching files exist, returns the file with the latest change time (as reported by os.path.getctime).

Args: library (str): The library to search for, ie: "rebuild", "word_search" directory (str): The directory to search from.

Returns: library_file_path (str): The absolute file path to the library.

Raises: NotADirectoryError: The given directory is not an existing directory. KeyError: Unsupported OS or library name was not found in glasswall.libraries.os_info. FileNotFoundError: Library was not found.

def iterate_directory_entries( directory: str, file_type: str = 'all', absolute: bool = True, recursive: bool = True, followlinks: bool = True, start_directory: str = None):
def iterate_directory_entries(directory: str, file_type: str = 'all', absolute: bool = True, recursive: bool = True, followlinks: bool = True, start_directory: Optional[str] = None):
    """ Generate entries (files, directories, or both) in a given directory using os.scandir().

    When recursing, the contents of a subdirectory are yielded before the subdirectory itself.
    The os.scandir() iterator is used as a context manager so the directory handle is released
    even if the caller stops consuming the generator early.

    Args:
        directory (str): The path to the directory whose entries are to be listed.
        file_type (str, optional): Type of entries to return.
            - 'all': Return both files and directories (default).
            - 'files': Return only files.
            - 'directories': Return only directories.
        absolute (bool, optional): Whether to return absolute paths (default) or paths relative to start_directory.
        recursive (bool, optional): Whether to recurse into subdirectories (default is True).
        followlinks (bool, optional): Whether to follow symbolic links when classifying entries as files or directories (default is True). When False, symbolic links are skipped.
        start_directory (str, optional): The directory that relative paths are calculated from. Defaults to the absolute path of `directory`.

    Yields:
        str: The path of each file or directory found in the specified directory.

    Raises:
        ValueError: If an invalid 'file_type' value is provided.
        NotADirectoryError: If `directory` is not an existing directory.

    Example:
        directory = '/path/to/your/directory'

        # Iterate through all entries (files and directories) in the directory
        for entry in iterate_directory_entries(directory):
            print(entry)

        # Iterate through only file entries in the directory
        for file in iterate_directory_entries(directory, file_type='files'):
            print("File:", file)

        # Iterate through only directory entries in the directory
        for directory in iterate_directory_entries(directory, file_type='directories'):
            print("Directory:", directory)
    """
    if not os.path.isdir(directory):
        raise NotADirectoryError(directory)

    allowed_types = ['all', 'files', 'directories']

    # Check if the provided file_type is valid
    if file_type not in allowed_types:
        raise ValueError(f"Invalid file_type '{file_type}'. Allowed values are {', '.join(allowed_types)}.")

    # Convert the directory to an absolute path
    directory = os.path.abspath(directory)

    # Set the start_directory to the provided directory if not specified
    start_directory = start_directory or directory

    # NOTE(review): no symlink-cycle detection is done here; confirm callers do not rely on
    # followlinks=True for trees that contain links back to an ancestor directory.
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=followlinks):
                # Yield the subdirectory's contents first, then the subdirectory itself
                if recursive:
                    yield from iterate_directory_entries(entry.path, file_type, absolute, recursive, followlinks, start_directory)
                wanted = file_type != "files"
            elif entry.is_file(follow_symlinks=followlinks):
                wanted = file_type != "directories"
            else:
                # Neither a file nor a directory under the chosen symlink policy
                wanted = False

            if wanted:
                if absolute:
                    yield entry.path
                else:
                    yield os.path.relpath(entry.path, start=start_directory)

Generate entries (files, directories, or both) in a given directory using os.scandir().

Args: directory (str): The path to the directory whose entries are to be listed. file_type (str, optional): Type of entries to return. - 'all': Return both files and directories (default). - 'files': Return only files. - 'directories': Return only directories. absolute (bool, optional): Whether to return absolute paths (default) or relative paths. recursive (bool, optional): Whether to recurse into subdirectories (default is True). followlinks (bool, optional): Whether to follow symbolic links and yield entries from the target directory (default is True). start_directory (str, optional): The starting directory used to calculate relative paths (default is None).

Yields: str: The full path of each file or directory found in the specified directory.

Raises: ValueError: If an invalid 'file_type' value is provided. NotADirectoryError: If the directory does not exist.

Example: directory = '/path/to/your/directory'

# Iterate through all entries (files and directories) in the directory
for entry in iterate_directory_entries(directory):
    print(entry)

# Iterate through only file entries in the directory
for file in iterate_directory_entries(directory, file_type='files'):
    print("File:", file)

# Iterate through only directory entries in the directory
for directory in iterate_directory_entries(directory, file_type='directories'):
    print("Directory:", directory)
def list_file_paths( directory: str, file_type: str = 'files', absolute: bool = True, recursive: bool = True, followlinks: bool = True) -> list:
def list_file_paths(directory: str, file_type: str = 'files', absolute: bool = True, recursive: bool = True, followlinks: bool = True) -> list:
    """ Returns a sorted, de-duplicated list of the paths produced by iterate_directory_entries.

    Args:
        directory (str): The path to the directory whose entries are to be listed.
        file_type (str, optional): Type of entries to return.
            - 'all': Return both files and directories.
            - 'files': Return only files (default).
            - 'directories': Return only directories.
        absolute (bool, optional): Whether to return absolute paths (default is True).
        recursive (bool, optional): Whether to recurse into subdirectories (default is True).
        followlinks (bool, optional): Whether to follow symbolic links (default is True).

    Returns:
        list: The unique paths found, in sorted order.

    Example:
        directory = '/path/to/your/directory'
        file_paths = list_file_paths(directory)
        print(file_paths)
    """
    entries = iterate_directory_entries(directory, file_type, absolute, recursive, followlinks)

    # The same path can be produced more than once (e.g. via symlinks), so collapse duplicates before sorting
    unique_paths = set(entries)
    return sorted(unique_paths)

List all file paths in a given directory and its subdirectories.

Args: directory (str): The path to the directory whose file paths are to be listed. file_type (str, optional): Type of entries to return. - 'all': Return both files and directories. - 'files': Return only files (default). - 'directories': Return only directories. absolute (bool, optional): Whether to return absolute paths (default is True). recursive (bool, optional): Whether to recurse into subdirectories (default is True). followlinks (bool, optional): Whether to follow symbolic links and list file paths from the target directory (default is True).

Returns: list: A list of file paths found in the specified directory and its subdirectories.

Example: directory = '/path/to/your/directory' file_paths = list_file_paths(directory) print(file_paths)

def list_subdirectory_paths(directory: str, recursive: bool = False, absolute: bool = True):
def list_subdirectory_paths(directory: str, recursive: bool = False, absolute: bool = True):
    """ Returns a list of paths to subdirectories in a directory.

    Direct subdirectories are listed first, followed by the descendants of each direct
    subdirectory. Each subdirectory appears once.

    Args:
        directory (str): The directory to list subdirectories from.
        recursive (bool, optional): Default False. Include subdirectories of subdirectories.
        absolute (bool, optional): Default True. Return paths as absolute paths. If False, returns paths relative to `directory`.

    Returns:
        subdirectories (list): A list of subdirectory paths.
    """
    with os.scandir(directory) as entries:
        children = [entry.path for entry in entries if entry.is_dir()]

    subdirectories = list(children)

    if recursive:
        # Iterate over the fixed list of direct children only. Each recursive call already
        # returns every descendant, so looping over the growing result list would visit
        # (and append) deeper directories more than once.
        for child in children:
            subdirectories.extend(list_subdirectory_paths(child, recursive=True))

    if absolute:
        subdirectories = [os.path.abspath(path) for path in subdirectories]
    else:
        subdirectories = [os.path.relpath(path, directory) for path in subdirectories]

    return subdirectories

Returns a list of paths to subdirectories in a directory.

Args: directory (str): The directory to list subdirectories from. recursive (bool, optional): Default False. Include subdirectories of subdirectories. absolute (bool, optional): Default True. Return paths as absolute paths. If False, returns relative paths.

Returns: subdirectories (list): A list of subdirectory paths.

def load_dependencies(dependencies: list, ignore_errors: bool = False):
def load_dependencies(dependencies: list, ignore_errors: bool = False):
    """ Loads every dependency that exists on disk with ctypes.cdll.LoadLibrary and reports the ones that do not exist.

    Args:
        dependencies (list): A list of absolute file paths of library dependencies.
        ignore_errors (bool, optional): Default False. When truthy, exceptions raised by ct.cdll.LoadLibrary are suppressed.

    Returns:
        missing_dependencies (list): The paths from `dependencies` that are not existing files, or an empty list.
    """
    # First pass: split the paths into those present on disk and those missing
    present = []
    missing_dependencies = []
    for path in dependencies:
        if os.path.isfile(path):
            present.append(path)
        else:
            missing_dependencies.append(path)

    # Second pass: load only the paths that exist
    for path in present:
        try:
            ct.cdll.LoadLibrary(path)
        except Exception:
            if not ignore_errors:
                raise

    return missing_dependencies

Calls ctypes.cdll.LoadLibrary on each file path in dependencies.

Args: dependencies (list): A list of absolute file paths of library dependencies. ignore_errors (bool, optional): Default False, avoid raising exceptions from ct.cdll.LoadLibrary if ignore_errors is True.

Returns: missing_dependencies (list): A list of missing dependencies, or an empty list.

def round_up(number: float, decimals=0) -> float:
def round_up(number: float, decimals=0) -> float:
    """ Rounds a number up (towards positive infinity) to a specified number of decimal places.

    The rounding is done in decimal arithmetic on the number's shortest string representation,
    so values that are already rounded are returned unchanged, e.g. round_up(0.07, 2) is 0.07.
    Multiplying the binary float by a power of ten first would turn 0.07 into 7.000000000000001
    and round it up to 0.08.

    Args:
        number (float): The number to be rounded.
        decimals (int, optional): The number of decimal places to round to. Defaults to 0.

    Returns:
        float: The rounded number.

    Examples:
        >>> round_up(105, 0)
        105.0
        >>> round_up(0.015, 2)
        0.02
        >>> round_up(0.025, 2)
        0.03
        >>> round_up(0.00001, 2)
        0.01
        >>> round_up(0.07, 2)
        0.07
    """
    from decimal import ROUND_CEILING, Context, Decimal, DecimalException

    rounded = None
    try:
        exact = Decimal(str(number))
        if exact.is_finite():
            # 10 ** -decimals as an exact decimal, e.g. Decimal("0.01") for decimals=2
            step = Decimal(1).scaleb(-decimals)
            # Generous precision so very large floats can still be quantized
            rounded = exact.quantize(step, context=Context(prec=1000, rounding=ROUND_CEILING))
    except (DecimalException, TypeError, ValueError):
        rounded = None

    if rounded is None:
        # Inputs the decimal path cannot represent (inf, nan, non-integer `decimals`, ...)
        # keep the plain float behaviour, including the exceptions it raises.
        multiplier = 10 ** decimals
        return math.ceil(number * multiplier) / multiplier

    # Adding 0.0 normalises a negative zero result to 0.0
    return float(rounded) + 0.0

Rounds a number up to a specified number of decimal places.

Args: number (float): The number to be rounded. decimals (int, optional): The number of decimal places to round to. Defaults to 0.

Returns: float: The rounded number.

Examples:

>>> round_up(105, 0)
105.0
>>> round_up(0.015, 2)
0.02
>>> round_up(0.025, 2)
0.03
>>> round_up(0.00001, 2)
0.01

class TempDirectoryPath:
class TempDirectoryPath:
    """ Context manager that creates a uniquely named temporary directory under glasswall._TEMPDIR and gives its path on __enter__. On __exit__ the directory is deleted if it still exists, unless delete is False.

    Args:
        delete (bool, optional): Default True. Delete the temporary directory on __exit__
    """

    def __init__(self, delete: bool = True):
        if not isinstance(delete, bool):
            raise TypeError(delete)

        self.temp_directory = None
        self.delete = delete

        # Keep drawing candidate names until one is not already a directory
        while True:
            candidate = os.path.join(glasswall._TEMPDIR, next(tempfile._get_candidate_names()))
            if not os.path.isdir(candidate):
                break

        # Store the resolved path, then create the directory
        self.temp_directory = str(pathlib.Path(candidate).resolve())
        os.makedirs(self.temp_directory, exist_ok=True)

    def __enter__(self):
        return self.temp_directory

    def __exit__(self, type, value, traceback):
        # Remove the directory and its contents only when requested and still present
        if self.delete and os.path.isdir(self.temp_directory):
            delete_directory(self.temp_directory)

Gives a path to a uniquely named temporary directory that does not currently exist on __enter__, deletes the directory if it exists on __exit__.

Args: delete (bool, optional): Default True. Delete the temporary directory on __exit__

TempDirectoryPath(delete: bool = True)
438    def __init__(self, delete: bool = True):
439        # Validate args
440        if not isinstance(delete, bool):
441            raise TypeError(delete)
442
443        self.temp_directory = None
444        self.delete = delete
445
446        while self.temp_directory is None or os.path.isdir(self.temp_directory):
447            self.temp_directory = os.path.join(glasswall._TEMPDIR, next(tempfile._get_candidate_names()))
448
449        # Normalize
450        self.temp_directory = str(pathlib.Path(self.temp_directory).resolve())
451
452        # Create temp directory
453        os.makedirs(self.temp_directory, exist_ok=True)
temp_directory
delete
class TempFilePath:
class TempFilePath:
    """ Context manager that gives a path to a uniquely named temporary file that does not currently exist on __enter__. The file itself is not created. On __exit__ the file is deleted if it exists, unless delete is False.

    Args:
        directory (Union[str, None], optional): The directory the temporary file path is placed in. Defaults to tempfile.gettempdir().
        delete (bool, optional): Default True. Delete the temporary file on __exit__
    """

    def __init__(self, directory: Union[str, None] = None, delete: bool = True):
        # Argument validation
        if directory is not None and not isinstance(directory, str):
            raise TypeError(directory)
        if isinstance(directory, str) and not os.path.isdir(directory):
            raise NotADirectoryError(directory)
        if not isinstance(delete, bool):
            raise TypeError(delete)

        self.temp_file: str = None
        self.directory = directory or tempfile.gettempdir()
        self.delete = delete

        # Keep drawing candidate names until one is not already a file
        while True:
            candidate = os.path.join(self.directory, next(tempfile._get_candidate_names()))
            if not os.path.isfile(candidate):
                break

        # Store the resolved path and make sure its parent directory exists
        self.temp_file = str(pathlib.Path(candidate).resolve())
        os.makedirs(os.path.dirname(self.temp_file), exist_ok=True)

    def __enter__(self):
        return self.temp_file

    def __exit__(self, type, value, traceback):
        # Remove the file only when requested and present
        if self.delete and os.path.isfile(self.temp_file):
            os.remove(self.temp_file)

Gives a path to a uniquely named temporary file that does not currently exist on __enter__, deletes the file if it exists on __exit__.

Args: directory (Union[str, None], optional): The directory to create a temporary file in. delete (bool, optional): Default True. Delete the temporary file on __exit__

TempFilePath(directory: Optional[str] = None, delete: bool = True)
473    def __init__(self, directory: Union[str, None] = None, delete: bool = True):
474        # Validate args
475        if not isinstance(directory, (str, type(None))):
476            raise TypeError(directory)
477        if isinstance(directory, str) and not os.path.isdir(directory):
478            raise NotADirectoryError(directory)
479        if not isinstance(delete, bool):
480            raise TypeError(delete)
481
482        self.temp_file: str = None
483        self.directory = directory or tempfile.gettempdir()
484        self.delete = delete
485
486        while self.temp_file is None or os.path.isfile(self.temp_file):
487            self.temp_file = os.path.join(self.directory, next(tempfile._get_candidate_names()))
488
489        # Normalize
490        self.temp_file = str(pathlib.Path(self.temp_file).resolve())
491
492        # Create temp directory if it does not exist
493        os.makedirs(os.path.dirname(self.temp_file), exist_ok=True)
temp_file: str
directory
delete
def validate_xml( xml: Union[str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy]):
def validate_xml(xml: Union[str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"]):
    """ Parses the xml provided and returns it re-serialised as an indented string with a utf-8 xml declaration.

    Args:
        xml (Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy]): The xml string, or file path, bytes, or ContentManagementPolicy instance to parse. A str that is the path of an existing file is parsed as a file, any other str is parsed as xml text.

    Returns:
        xml_string (str): A string representation of the xml.

    Raises:
        ValueError: if the xml cannot be parsed.
        TypeError: if the type of arg "xml" is invalid
    """
    try:
        if isinstance(xml, str):
            # A str is either a path to an xml file or the xml text itself
            try:
                path_exists = os.path.isfile(os.path.abspath(xml))
            except Exception:
                path_exists = False

            if not path_exists:
                xml = xml.encode("utf-8")
                tree = etree.fromstring(xml)
            else:
                tree = etree.parse(xml)

        elif isinstance(xml, (bytes, bytearray)):
            # Raw bytes are wrapped in a file-like object before parsing
            xml = as_io_BytesIO(xml)
            tree = etree.parse(xml)

        elif isinstance(xml, io.BytesIO):
            tree = etree.parse(xml)

        elif isinstance(xml, glasswall.content_management.policies.policy.Policy):
            # Policy instances expose their xml through the text attribute
            xml = xml.text.encode("utf-8")
            tree = etree.fromstring(xml)

        else:
            raise TypeError(xml)

    except etree.XMLSyntaxError:
        raise ValueError(xml)

    # Indent with four spaces and serialise with an xml declaration header
    etree.indent(tree, space=" " * 4)
    serialised = etree.tostring(tree, encoding="utf-8", xml_declaration=True, pretty_print=True)

    return serialised.decode()

Attempts to parse the xml provided, returning the xml as string. Raises ValueError if the xml cannot be parsed.

Args: xml (Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy]): The xml string, or file path, bytes, or ContentManagementPolicy instance to parse.

Returns: xml_string (str): A string representation of the xml.

Raises: ValueError: if the xml cannot be parsed. TypeError: if the type of arg "xml" is invalid

def xml_as_dict(xml):
def xml_as_dict(xml):
    """ Converts a simple single-level xml into a dictionary of the root's direct children.

    Args:
        xml (Union[str, bytes, bytearray, io.BytesIO]): The xml string, or file path, or bytes to parse.

    Returns:
        dict_ (dict): A dictionary of element tag : text, ordered by tag. When a tag occurs more than once, the text of the last occurrence is kept.
    """
    # Validate and normalise the input, then parse the normalised string
    root = etree.fromstring(validate_xml(xml).encode())

    tag_to_text = {}
    for child in root:
        tag_to_text[child.tag] = child.text

    # Sort for ease of viewing logs
    return dict(sorted(tag_to_text.items()))

Converts a simple single-level xml into a dictionary.

Args: xml (Union[str, bytes, bytearray, io.BytesIO]): The xml string, or file path, or bytes to parse.

Returns: dict_ (dict): A dictionary of element tag : text

def deprecated_alias(**aliases: str) -> Callable:
def deprecated_alias(**aliases: str) -> Callable:
    """ Decorator factory that lets a function keep accepting old keyword argument names.

    Each keyword given here maps a deprecated argument name to its replacement. On every
    call of the decorated function the keyword arguments are passed through rename_kwargs
    before the function itself is invoked.

    Use as follows:

    @deprecated_alias(old_arg='new_arg')
    def myfunc(new_arg):
        ...

    https://stackoverflow.com/a/49802489
    """

    def decorate(func: Callable):
        @functools.wraps(func)
        def call_with_renamed_kwargs(*args, **kwargs):
            # kwargs is updated in place by rename_kwargs
            rename_kwargs(func.__name__, kwargs, aliases)
            return func(*args, **kwargs)

        return call_with_renamed_kwargs

    return decorate

Decorator for deprecated function and method arguments.

Use as follows:

@deprecated_alias(old_arg='new_arg') def myfunc(new_arg): ...

https://stackoverflow.com/a/49802489

def rename_kwargs(func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str]):
def rename_kwargs(func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str]):
    """ Helper function for deprecating function arguments.

    For each deprecated name present in `kwargs`, emits a DeprecationWarning and moves the
    value to the new name. `kwargs` is modified in place.

    Args:
        func_name (str): The function name used in warning and error messages.
        kwargs (Dict[str, Any]): The keyword arguments to update.
        aliases (Dict[str, str]): Mapping of deprecated argument name to new argument name.

    Raises:
        TypeError: If both the deprecated name and the new name are present in `kwargs`.

    https://stackoverflow.com/a/49802489
    """
    for alias, new in aliases.items():
        if alias not in kwargs:
            continue

        if new in kwargs:
            raise TypeError(
                f"{func_name} received both {alias} and {new} as arguments!"
                f" {alias} is deprecated, use {new} instead."
            )

        notice = (
            f"`{alias}` is deprecated as an argument to `{func_name}`; use"
            f" `{new}` instead."
        )
        # stacklevel=3 is kept as-is: this helper is called from the decorator's wrapper
        warnings.warn(message=notice, category=DeprecationWarning, stacklevel=3)

        kwargs[new] = kwargs.pop(alias)

Helper function for deprecating function arguments.

https://stackoverflow.com/a/49802489

def deprecated_function(replacement_function):
def deprecated_function(replacement_function):
    """ Decorator factory that marks a function as deprecated in favour of replacement_function.

    Calling the decorated function emits a DeprecationWarning and then calls
    replacement_function with the same arguments. The decorated function's own body is
    never executed.

    Args:
        replacement_function (Callable): The function that is called instead.
    """

    def decorate(func: Callable):
        @functools.wraps(func)
        def forward_to_replacement(*args, **kwargs):
            notice = f"Call to deprecated method: '{func.__name__}'. Use '{replacement_function.__name__}' instead."
            warnings.warn(message=notice, category=DeprecationWarning, stacklevel=3)
            return replacement_function(*args, **kwargs)

        return forward_to_replacement

    return decorate