glasswall.utils
1import ctypes as ct 2import functools 3import io 4import math 5import os 6import pathlib 7import stat 8import tempfile 9import warnings 10from typing import Any, Callable, Dict, Iterable, List, Optional, Union 11 12from lxml import etree 13 14import glasswall 15from glasswall.config.logging import log 16 17 18def as_bytes(file_: Union[bytes, bytearray, io.BytesIO]): 19 """ Returns file_ as bytes. 20 21 Args: 22 file_ (Union[bytes, bytearray, io.BytesIO]): The file 23 24 Returns: 25 bytes 26 27 Raises: 28 TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO 29 """ 30 if isinstance(file_, bytes): 31 return file_ 32 elif isinstance(file_, bytearray): 33 return bytes(file_) 34 elif isinstance(file_, io.BytesIO): 35 file_.seek(0) 36 return file_.read() 37 else: 38 raise TypeError(file_) 39 40 41def as_io_BytesIO(file_: Union[bytes, bytearray]): 42 """ Returns file_ as io.BytesIO object. 43 44 Args: 45 file_ (Union[bytes, bytearray]): The bytes or bytearray of the file 46 47 Returns: 48 io.BytesIO object 49 50 Raises: 51 TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO 52 """ 53 if isinstance(file_, bytes): 54 return io.BytesIO(file_) 55 elif isinstance(file_, bytearray): 56 return io.BytesIO(bytes(file_)) 57 elif isinstance(file_, io.BytesIO): 58 return file_ 59 else: 60 raise TypeError(file_) 61 62 63def as_snake_case(string): 64 return ''.join( 65 [ 66 '_' + char.lower() 67 if char.isupper() else char 68 for char in string 69 ] 70 ).lstrip('_') 71 72 73def as_title(string): 74 return ''.join( 75 word.title() 76 for word in string.replace(".", "_").split("_") 77 ) 78 79 80def buffer_to_bytes(buffer: ct.c_void_p, buffer_length: ct.c_size_t): 81 """ Convert ctypes buffer and buffer_length to bytes. 82 83 Args: 84 buffer (ct.c_void_p()): The file buffer. 85 buffer_length (ct.c_size_t()): The file buffer length. 86 87 Returns: 88 bytes (bytes): The file as bytes. 89 """ 90 91 file_buffer = (ct.c_byte * buffer_length.value)() 92 ct.memmove(file_buffer, buffer.value, buffer_length.value) 93 94 return bytes(file_buffer) 95 96 97class CwdHandler: 98 """ Changes the current working directory to new_cwd on __enter__, and back to previous cwd on __exit__. 99 100 Args: 101 new_cwd (str): The new current working directory to temporarily change to. 102 """ 103 104 def __init__(self, new_cwd: str): 105 self.new_cwd = new_cwd if os.path.isdir(new_cwd) else os.path.dirname(new_cwd) 106 self.old_cwd = os.getcwd() 107 108 def __enter__(self): 109 os.chdir(self.new_cwd) 110 111 def __exit__(self, type, value, traceback): 112 os.chdir(self.old_cwd) 113 114 115def delete_directory(directory: str, keep_folder: bool = False): 116 """ Delete a directory and its contents. 117 118 Args: 119 directory (str): The directory path. 120 keep_folder (bool, optional): Default False. If False, only delete contents. 121 """ 122 if os.path.isdir(directory): 123 # Delete all files in directory 124 for file_ in list_file_paths(directory, followlinks=False): 125 os.remove(file_) 126 127 # Delete all empty subdirectories 128 delete_empty_subdirectories(directory) 129 130 # Delete the directory 131 if keep_folder is False: 132 os.rmdir(directory) 133 134 135def delete_empty_subdirectories(directory: str): 136 """ Deletes all empty subdirectories of a given directory. 137 138 Args: 139 directory (str): The directory to delete subdirectories from. 140 141 Returns: 142 None 143 """ 144 145 for root, dirs, _ in os.walk(directory, topdown=False): 146 for dir_ in dirs: 147 absolute_path = os.path.join(root, dir_) 148 try: 149 os.rmdir(absolute_path) 150 except PermissionError: 151 # directory might be read-only 152 try: 153 os.chmod(absolute_path, stat.S_IWRITE) 154 except Exception: 155 log.warning(f"PermissionError while attempting to delete {absolute_path}. Attempted chmod but failed.") 156 try: 157 os.rmdir(absolute_path) 158 except OSError: 159 # cannot be deleted 160 pass 161 except OSError: 162 # not empty, don't delete 163 pass 164 165 166def flatten_list(list_: Iterable): 167 """ Returns a flattened list. [[1, 2], ["3"], (4, 5,), [6]] --> [1, 2, "3", 4, 5, 6] """ 168 return [ 169 item 170 for sublist in list_ 171 for item in sublist 172 ] 173 174 175def get_file_type(file_path: str): 176 """ Returns the filetype of a file. "data/files/splat.zip" -> "zip" """ 177 return os.path.splitext(file_path)[-1].replace(".", "") 178 179 180def get_libraries(directory: str, library_names: Optional[List[str]] = None, ignore_errors: bool = False): 181 """ Recursively calls get_library on each library from glasswall.libraries.os_info on the given directory. 182 183 Args: 184 directory (str): The directory to search from. 185 library_names (List[str], optional): List of libraries to return, if None iterates all libraries found in glasswall.libraries.os_info 186 ignore_errors (bool, optional): Default False, prevents get_library raising FileNotFoundError when True. 187 188 Returns: 189 libraries (dict[str, str]): A dictionary of library names and their absolute file paths. 190 """ 191 libraries = {} 192 193 if not library_names: 194 library_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM].keys() 195 196 for library_name in library_names: 197 try: 198 libraries[library_name] = get_library(library_name, directory) 199 except FileNotFoundError: 200 if ignore_errors is True: 201 continue 202 raise 203 204 return libraries 205 206 207def get_library(library: str, directory: str): 208 """ Returns a path to the specified library found from the current directory or any subdirectory. If multiple libraries exist, returns the file with the latest modified time. 209 210 Args: 211 library (str): The library to search for, ie: "rebuild", "word_search" 212 directory (str): The directory to search from. 213 214 Returns: 215 library_file_path (str): The absolute file path to the library. 216 217 Raises: 218 KeyError: Unsupported OS or library name was not found in glasswall.libraries.os_info. 219 FileNotFoundError: Library was not found. 220 """ 221 if not os.path.isdir(directory): 222 raise NotADirectoryError(directory) 223 224 library = as_snake_case(library) 225 library_file_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM][library]["file_name"] 226 227 if isinstance(library_file_names, str): 228 library_file_names = [library_file_names] 229 230 matches = [] 231 for alias in library_file_names: 232 p = pathlib.Path(directory) 233 alias_matches = list(p.rglob(alias)) 234 matches.extend(alias_matches) 235 236 if matches: 237 latest_library = str(max(matches, key=os.path.getctime).resolve()) 238 if len(matches) > 1: 239 # warn that multiple libraries found, list library paths if there are <= 5 240 if len(matches) <= 5: 241 log.warning(f"Found {len(matches)} {library} libraries, but expected only one:\n{chr(10).join(str(item) for item in matches)}\nLatest library: {latest_library}") 242 else: 243 log.warning(f"Found {len(matches)} {library} libraries, but expected only one.\nLatest library: {latest_library}") 244 245 # Return library with latest change time 246 return latest_library 247 248 # exhausted, not found 249 raise FileNotFoundError(f'Could not find any files: "{library_file_names}" under directory: "{directory}"') 250 251 252def iterate_directory_entries(directory: str, file_type: str = 'all', absolute: bool = True, recursive: bool = True, followlinks: bool = True, start_directory: str = None): 253 """ Generate entries (files, directories, or both) in a given directory using os.scandir(). 254 255 Args: 256 directory (str): The path to the directory whose entries are to be listed. 257 file_type (str, optional): Type of entries to return. 258 - 'all': Return both files and directories (default). 259 - 'files': Return only files. 260 - 'directories': Return only directories. 261 absolute (bool, optional): Whether to return absolute paths (default) or relative paths. 262 recursive (bool, optional): Whether to recurse into subdirectories (default is True). 263 followlinks (bool, optional): Whether to follow symbolic links and yield entries from the target directory (default is True). 264 start_directory (str, optional): The starting directory used to calculate relative paths (default is None). 265 266 Yields: 267 str: The full path of each file or directory found in the specified directory. 268 269 Raises: 270 ValueError: If an invalid 'file_type' value is provided. 271 NotADirectoryError: If the directory does not exist. 272 273 Example: 274 directory = '/path/to/your/directory' 275 276 # Iterate through all entries (files and directories) in the directory 277 for entry in iterate_directory_entries(directory): 278 print(entry) 279 280 # Iterate through only file entries in the directory 281 for file in iterate_directory_entries(directory, file_type='files'): 282 print("File:", file) 283 284 # Iterate through only directory entries in the directory 285 for directory in iterate_directory_entries(directory, file_type='directories'): 286 print("Directory:", directory) 287 """ 288 if not os.path.isdir(directory): 289 raise NotADirectoryError(directory) 290 291 allowed_types = ['all', 'files', 'directories'] 292 293 # Check if the provided file_type is valid 294 if file_type not in allowed_types: 295 raise ValueError(f"Invalid file_type '{file_type}'. Allowed values are {', '.join(allowed_types)}.") 296 297 # Convert the directory to an absolute path 298 directory = os.path.abspath(directory) 299 300 # Set the start_directory to the provided directory if not specified 301 start_directory = start_directory or directory 302 303 # Get the directory entries using os.scandir() 304 for entry in os.scandir(directory): 305 # If the entry is a directory 306 if entry.is_dir(follow_symlinks=followlinks): 307 # If recursive is True, traverse the subdirectory 308 if recursive: 309 yield from iterate_directory_entries(entry.path, file_type, absolute, recursive, followlinks, start_directory) 310 311 # If the file_type is not "files", yield the directory entry 312 if file_type != "files": 313 if absolute: 314 yield entry.path 315 else: 316 yield os.path.relpath(entry.path, start=start_directory) 317 318 # If the entry is a file 319 elif entry.is_file(follow_symlinks=followlinks): 320 # If the file_type is not "directories", yield the file entry 321 if file_type != "directories": 322 if absolute: 323 yield entry.path 324 else: 325 yield os.path.relpath(entry.path, start=start_directory) 326 327 328def list_file_paths(directory: str, file_type: str = 'files', absolute: bool = True, recursive: bool = True, followlinks: bool = True) -> list: 329 """ List all file paths in a given directory and its subdirectories. 330 331 Args: 332 directory (str): The path to the directory whose file paths are to be listed. 333 file_type (str, optional): Type of entries to return. 334 - 'all': Return both files and directories. 335 - 'files': Return only files (default). 336 - 'directories': Return only directories. 337 absolute (bool, optional): Whether to return absolute paths (default is True). 338 recursive (bool, optional): Whether to recurse into subdirectories (default is True). 339 followlinks (bool, optional): Whether to follow symbolic links and list file paths from the target directory (default is True). 340 341 Returns: 342 list: A list of file paths found in the specified directory and its subdirectories. 343 344 Example: 345 directory = '/path/to/your/directory' 346 file_paths = list_file_paths(directory) 347 print(file_paths) 348 """ 349 # Remove duplicate file paths (symlinks of same files or other symlinks), and sort 350 return sorted(set(iterate_directory_entries(directory, file_type, absolute, recursive, followlinks))) 351 352 353def list_subdirectory_paths(directory: str, recursive: bool = False, absolute: bool = True): 354 """ Returns a list of paths to subdirectories in a directory. 355 356 Args: 357 directory (str): The directory to list subdirectories from. 358 recursive (bool, optional): Default False. Include subdirectories of subdirectories. 359 absolute (bool, optional): Default True. Return paths as absolute paths. If False, returns relative paths. 360 361 Returns: 362 subdirectories (list): A list of subdirectory paths. 363 """ 364 subdirectories = [f.path for f in os.scandir(directory) if f.is_dir()] 365 366 if recursive: 367 for subdirectory in subdirectories: 368 subdirectories.extend(list_subdirectory_paths(subdirectory, recursive=True)) 369 370 if absolute: 371 subdirectories = [os.path.abspath(path) for path in subdirectories] 372 else: 373 subdirectories = [os.path.relpath(path, directory) for path in subdirectories] 374 375 return subdirectories 376 377 378def load_dependencies(dependencies: list, ignore_errors: bool = False): 379 """ Calls ctypes.cdll.LoadLibrary on each file path in `dependencies`. 380 381 Args: 382 dependencies (list): A list of absolute file paths of library dependencies. 383 ignore_errors (bool, optional): Default False, avoid raising exceptions from ct.cdll.LoadLibrary if ignore_errors is True. 384 385 Returns: 386 missing_dependencies (list): A list of missing dependencies, or an empty list. 387 """ 388 missing_dependencies = [dependency for dependency in dependencies if not os.path.isfile(dependency)] 389 390 for dependency in dependencies: 391 # Try to load dependencies that exist 392 if dependency not in missing_dependencies: 393 try: 394 ct.cdll.LoadLibrary(dependency) 395 except Exception: 396 if ignore_errors: 397 pass 398 else: 399 raise 400 401 return missing_dependencies 402 403 404def round_up(number: float, decimals=0) -> float: 405 """ Rounds a number up to a specified number of decimal places. 406 407 Args: 408 number (float): The number to be rounded. 409 decimals (int, optional): The number of decimal places to round to. Defaults to 0. 410 411 Returns: 412 float: The rounded number. 413 414 Examples: 415 >>> round_up(105, 0) 416 105.0 417 >>> round_up(0.015, 2) 418 0.02 419 >>> round_up(0.025, 2) 420 0.03 421 >>> round_up(0.00001, 2) 422 0.01 423 """ 424 multiplier = 10 ** decimals 425 return math.ceil(number * multiplier) / multiplier 426 427 428class TempDirectoryPath: 429 """ Gives a path to a uniquely named temporary directory that does not currently exist on __enter__, deletes the directory if it exists on __exit__. 430 431 Args: 432 delete (bool, optional): Default True. Delete the temporary directory on __exit__ 433 """ 434 435 def __init__(self, delete: bool = True): 436 # Validate args 437 if not isinstance(delete, bool): 438 raise TypeError(delete) 439 440 self.temp_directory = None 441 self.delete = delete 442 443 while self.temp_directory is None or os.path.isdir(self.temp_directory): 444 self.temp_directory = os.path.join(glasswall._TEMPDIR, next(tempfile._get_candidate_names())) 445 446 # Normalize 447 self.temp_directory = str(pathlib.Path(self.temp_directory).resolve()) 448 449 # Create temp directory 450 os.makedirs(self.temp_directory, exist_ok=True) 451 452 def __enter__(self): 453 return self.temp_directory 454 455 def __exit__(self, type, value, traceback): 456 if self.delete: 457 # Delete temp directory and all of its contents 458 if os.path.isdir(self.temp_directory): 459 delete_directory(self.temp_directory) 460 461 462class TempFilePath: 463 """ Gives a path to a uniquely named temporary file that does not currently exist on __enter__, deletes the file if it exists on __exit__. 464 465 Args: 466 directory (Union[str, None], optional): The directory to create a temporary file in. 467 delete (bool, optional): Default True. Delete the temporary file on on __exit__ 468 """ 469 470 def __init__(self, directory: Union[str, None] = None, delete: bool = True): 471 # Validate args 472 if not isinstance(directory, (str, type(None))): 473 raise TypeError(directory) 474 if isinstance(directory, str) and not os.path.isdir(directory): 475 raise NotADirectoryError(directory) 476 if not isinstance(delete, bool): 477 raise TypeError(delete) 478 479 self.temp_file: str = None 480 self.directory = directory or tempfile.gettempdir() 481 self.delete = delete 482 483 while self.temp_file is None or os.path.isfile(self.temp_file): 484 self.temp_file = os.path.join(self.directory, next(tempfile._get_candidate_names())) 485 486 # Normalize 487 self.temp_file = str(pathlib.Path(self.temp_file).resolve()) 488 489 # Create temp directory if it does not exist 490 os.makedirs(os.path.dirname(self.temp_file), exist_ok=True) 491 492 def __enter__(self): 493 return self.temp_file 494 495 def __exit__(self, type, value, traceback): 496 if self.delete: 497 if os.path.isfile(self.temp_file): 498 os.remove(self.temp_file) 499 500 501# NOTE typehint as string due to no "from __future__ import annotations" support on python 3.6 on ubuntu-16.04 / centos7 502def validate_xml(xml: Union[str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"]): 503 """ Attempts to parse the xml provided, returning the xml as string. Raises ValueError if the xml cannot be parsed. 504 505 Args: 506 xml (Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy]): The xml string, or file path, bytes, or ContentManagementPolicy instance to parse. 507 508 Returns: 509 xml_string (str): A string representation of the xml. 510 511 Raises: 512 ValueError: if the xml cannot be parsed. 513 TypeError: if the type of arg "xml" is invalid 514 """ 515 try: 516 # Get tree from file/str 517 if isinstance(xml, str): 518 try: 519 is_file = os.path.isfile(os.path.abspath(xml)) 520 except Exception: 521 is_file = False 522 523 if is_file: 524 tree = etree.parse(xml) 525 else: 526 xml = xml.encode("utf-8") 527 tree = etree.fromstring(xml) 528 529 # Get tree from bytes, bytearray, io.BytesIO 530 elif isinstance(xml, (bytes, bytearray, io.BytesIO)): 531 # Convert bytes, bytearray to io.BytesIO 532 if isinstance(xml, (bytes, bytearray)): 533 xml = as_io_BytesIO(xml) 534 tree = etree.parse(xml) 535 536 # Get tree from ContentManagementPolicy instance 537 elif isinstance(xml, glasswall.content_management.policies.policy.Policy): 538 xml = xml.text.encode("utf-8") 539 tree = etree.fromstring(xml) 540 541 else: 542 raise TypeError(xml) 543 544 except etree.XMLSyntaxError: 545 raise ValueError(xml) 546 547 # # convert tree to string and include xml declaration header utf8 548 etree.indent(tree, space=" " * 4) 549 xml_string = etree.tostring(tree, encoding="utf-8", xml_declaration=True, pretty_print=True).decode() 550 551 return xml_string 552 553 554def xml_as_dict(xml): 555 """ Converts a simple single-level xml into a dictionary. 556 557 Args: 558 xml (Union[str, bytes, bytearray, io.BytesIO]): The xml string, or file path, or bytes to parse. 559 560 Returns: 561 dict_ (dict): A dictionary of element tag : text 562 """ 563 # Convert xml to string 564 xml_string = validate_xml(xml) 565 566 # Get root 567 root = etree.fromstring(xml_string.encode()) 568 569 dict_ = { 570 element.tag: element.text 571 for element in root 572 } 573 574 # Sort for ease of viewing logs 575 dict_ = {k: v for k, v in sorted(dict_.items())} 576 577 return dict_ 578 579 580def deprecated_alias(**aliases: str) -> Callable: 581 """ Decorator for deprecated function and method arguments. 582 583 Use as follows: 584 585 @deprecated_alias(old_arg='new_arg') 586 def myfunc(new_arg): 587 ... 588 589 https://stackoverflow.com/a/49802489 590 """ 591 592 def deco(f: Callable): 593 @functools.wraps(f) 594 def wrapper(*args, **kwargs): 595 rename_kwargs(f.__name__, kwargs, aliases) 596 return f(*args, **kwargs) 597 598 return wrapper 599 600 return deco 601 602 603def rename_kwargs(func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str]): 604 """ Helper function for deprecating function arguments. 605 606 https://stackoverflow.com/a/49802489 607 """ 608 for alias, new in aliases.items(): 609 if alias in kwargs: 610 if new in kwargs: 611 raise TypeError( 612 f"{func_name} received both {alias} and {new} as arguments!" 613 f" {alias} is deprecated, use {new} instead." 614 ) 615 warnings.warn( 616 message=( 617 f"`{alias}` is deprecated as an argument to `{func_name}`; use" 618 f" `{new}` instead." 619 ), 620 category=DeprecationWarning, 621 stacklevel=3, 622 ) 623 kwargs[new] = kwargs.pop(alias) 624 625 626def deprecated_function(replacement_function): 627 def decorator(f: Callable): 628 @functools.wraps(f) 629 def wrapper(*args, **kwargs): 630 warnings.warn( 631 message=f"Call to deprecated method: '{f.__name__}'. Use '{replacement_function.__name__}' instead.", 632 category=DeprecationWarning, 633 stacklevel=3 634 ) 635 return replacement_function(*args, **kwargs) 636 637 return wrapper 638 639 return decorator
21def as_bytes(file_: Union[bytes, bytearray, io.BytesIO]): 22 """ Returns file_ as bytes. 23 24 Args: 25 file_ (Union[bytes, bytearray, io.BytesIO]): The file 26 27 Returns: 28 bytes 29 30 Raises: 31 TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO 32 """ 33 if isinstance(file_, bytes): 34 return file_ 35 elif isinstance(file_, bytearray): 36 return bytes(file_) 37 elif isinstance(file_, io.BytesIO): 38 file_.seek(0) 39 return file_.read() 40 else: 41 raise TypeError(file_)
Returns file_ as bytes.
Args: file_ (Union[bytes, bytearray, io.BytesIO]): The file
Returns: bytes
Raises: TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO
44def as_io_BytesIO(file_: Union[bytes, bytearray]): 45 """ Returns file_ as io.BytesIO object. 46 47 Args: 48 file_ (Union[bytes, bytearray]): The bytes or bytearray of the file 49 50 Returns: 51 io.BytesIO object 52 53 Raises: 54 TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO 55 """ 56 if isinstance(file_, bytes): 57 return io.BytesIO(file_) 58 elif isinstance(file_, bytearray): 59 return io.BytesIO(bytes(file_)) 60 elif isinstance(file_, io.BytesIO): 61 return file_ 62 else: 63 raise TypeError(file_)
Returns file_ as io.BytesIO object.
Args: file_ (Union[bytes, bytearray]): The bytes or bytearray of the file
Returns: io.BytesIO object
Raises: TypeError: If file_ is not an instance of: bytes, bytearray, io.BytesIO
83def buffer_to_bytes(buffer: ct.c_void_p, buffer_length: ct.c_size_t): 84 """ Convert ctypes buffer and buffer_length to bytes. 85 86 Args: 87 buffer (ct.c_void_p()): The file buffer. 88 buffer_length (ct.c_size_t()): The file buffer length. 89 90 Returns: 91 bytes (bytes): The file as bytes. 92 """ 93 94 file_buffer = (ct.c_byte * buffer_length.value)() 95 ct.memmove(file_buffer, buffer.value, buffer_length.value) 96 97 return bytes(file_buffer)
Convert ctypes buffer and buffer_length to bytes.
Args: buffer (ct.c_void_p()): The file buffer. buffer_length (ct.c_size_t()): The file buffer length.
Returns: bytes (bytes): The file as bytes.
100class CwdHandler: 101 """ Changes the current working directory to new_cwd on __enter__, and back to previous cwd on __exit__. 102 103 Args: 104 new_cwd (str): The new current working directory to temporarily change to. 105 """ 106 107 def __init__(self, new_cwd: str): 108 self.new_cwd = new_cwd if os.path.isdir(new_cwd) else os.path.dirname(new_cwd) 109 self.old_cwd = os.getcwd() 110 111 def __enter__(self): 112 os.chdir(self.new_cwd) 113 114 def __exit__(self, type, value, traceback): 115 os.chdir(self.old_cwd)
Changes the current working directory to new_cwd on __enter__, and back to previous cwd on __exit__.
Args: new_cwd (str): The new current working directory to temporarily change to.
118def delete_directory(directory: str, keep_folder: bool = False): 119 """ Delete a directory and its contents. 120 121 Args: 122 directory (str): The directory path. 123 keep_folder (bool, optional): Default False. If False, only delete contents. 124 """ 125 if os.path.isdir(directory): 126 # Delete all files in directory 127 for file_ in list_file_paths(directory, followlinks=False): 128 os.remove(file_) 129 130 # Delete all empty subdirectories 131 delete_empty_subdirectories(directory) 132 133 # Delete the directory 134 if keep_folder is False: 135 os.rmdir(directory)
Delete a directory and its contents.
Args: directory (str): The directory path. keep_folder (bool, optional): Default False. If False, only delete contents.
138def delete_empty_subdirectories(directory: str): 139 """ Deletes all empty subdirectories of a given directory. 140 141 Args: 142 directory (str): The directory to delete subdirectories from. 143 144 Returns: 145 None 146 """ 147 148 for root, dirs, _ in os.walk(directory, topdown=False): 149 for dir_ in dirs: 150 absolute_path = os.path.join(root, dir_) 151 try: 152 os.rmdir(absolute_path) 153 except PermissionError: 154 # directory might be read-only 155 try: 156 os.chmod(absolute_path, stat.S_IWRITE) 157 except Exception: 158 log.warning(f"PermissionError while attempting to delete {absolute_path}. Attempted chmod but failed.") 159 try: 160 os.rmdir(absolute_path) 161 except OSError: 162 # cannot be deleted 163 pass 164 except OSError: 165 # not empty, don't delete 166 pass
Deletes all empty subdirectories of a given directory.
Args: directory (str): The directory to delete subdirectories from.
Returns: None
169def flatten_list(list_: Iterable): 170 """ Returns a flattened list. [[1, 2], ["3"], (4, 5,), [6]] --> [1, 2, "3", 4, 5, 6] """ 171 return [ 172 item 173 for sublist in list_ 174 for item in sublist 175 ]
Returns a flattened list. [[1, 2], ["3"], (4, 5,), [6]] --> [1, 2, "3", 4, 5, 6]
178def get_file_type(file_path: str): 179 """ Returns the filetype of a file. "data/files/splat.zip" -> "zip" """ 180 return os.path.splitext(file_path)[-1].replace(".", "")
Returns the filetype of a file. "data/files/splat.zip" -> "zip"
183def get_libraries(directory: str, library_names: Optional[List[str]] = None, ignore_errors: bool = False): 184 """ Recursively calls get_library on each library from glasswall.libraries.os_info on the given directory. 185 186 Args: 187 directory (str): The directory to search from. 188 library_names (List[str], optional): List of libraries to return, if None iterates all libraries found in glasswall.libraries.os_info 189 ignore_errors (bool, optional): Default False, prevents get_library raising FileNotFoundError when True. 190 191 Returns: 192 libraries (dict[str, str]): A dictionary of library names and their absolute file paths. 193 """ 194 libraries = {} 195 196 if not library_names: 197 library_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM].keys() 198 199 for library_name in library_names: 200 try: 201 libraries[library_name] = get_library(library_name, directory) 202 except FileNotFoundError: 203 if ignore_errors is True: 204 continue 205 raise 206 207 return libraries
Recursively calls get_library on each library from glasswall.libraries.os_info on the given directory.
Args: directory (str): The directory to search from. library_names (List[str], optional): List of libraries to return, if None iterates all libraries found in glasswall.libraries.os_info ignore_errors (bool, optional): Default False, prevents get_library raising FileNotFoundError when True.
Returns: libraries (dict[str, str]): A dictionary of library names and their absolute file paths.
210def get_library(library: str, directory: str): 211 """ Returns a path to the specified library found from the current directory or any subdirectory. If multiple libraries exist, returns the file with the latest modified time. 212 213 Args: 214 library (str): The library to search for, ie: "rebuild", "word_search" 215 directory (str): The directory to search from. 216 217 Returns: 218 library_file_path (str): The absolute file path to the library. 219 220 Raises: 221 KeyError: Unsupported OS or library name was not found in glasswall.libraries.os_info. 222 FileNotFoundError: Library was not found. 223 """ 224 if not os.path.isdir(directory): 225 raise NotADirectoryError(directory) 226 227 library = as_snake_case(library) 228 library_file_names = glasswall.libraries.os_info[glasswall._OPERATING_SYSTEM][library]["file_name"] 229 230 if isinstance(library_file_names, str): 231 library_file_names = [library_file_names] 232 233 matches = [] 234 for alias in library_file_names: 235 p = pathlib.Path(directory) 236 alias_matches = list(p.rglob(alias)) 237 matches.extend(alias_matches) 238 239 if matches: 240 latest_library = str(max(matches, key=os.path.getctime).resolve()) 241 if len(matches) > 1: 242 # warn that multiple libraries found, list library paths if there are <= 5 243 if len(matches) <= 5: 244 log.warning(f"Found {len(matches)} {library} libraries, but expected only one:\n{chr(10).join(str(item) for item in matches)}\nLatest library: {latest_library}") 245 else: 246 log.warning(f"Found {len(matches)} {library} libraries, but expected only one.\nLatest library: {latest_library}") 247 248 # Return library with latest change time 249 return latest_library 250 251 # exhausted, not found 252 raise FileNotFoundError(f'Could not find any files: "{library_file_names}" under directory: "{directory}"')
Returns a path to the specified library found from the current directory or any subdirectory. If multiple libraries exist, returns the file with the latest modified time.
Args: library (str): The library to search for, ie: "rebuild", "word_search" directory (str): The directory to search from.
Returns: library_file_path (str): The absolute file path to the library.
Raises: KeyError: Unsupported OS or library name was not found in glasswall.libraries.os_info. FileNotFoundError: Library was not found.
255def iterate_directory_entries(directory: str, file_type: str = 'all', absolute: bool = True, recursive: bool = True, followlinks: bool = True, start_directory: str = None): 256 """ Generate entries (files, directories, or both) in a given directory using os.scandir(). 257 258 Args: 259 directory (str): The path to the directory whose entries are to be listed. 260 file_type (str, optional): Type of entries to return. 261 - 'all': Return both files and directories (default). 262 - 'files': Return only files. 263 - 'directories': Return only directories. 264 absolute (bool, optional): Whether to return absolute paths (default) or relative paths. 265 recursive (bool, optional): Whether to recurse into subdirectories (default is True). 266 followlinks (bool, optional): Whether to follow symbolic links and yield entries from the target directory (default is True). 267 start_directory (str, optional): The starting directory used to calculate relative paths (default is None). 268 269 Yields: 270 str: The full path of each file or directory found in the specified directory. 271 272 Raises: 273 ValueError: If an invalid 'file_type' value is provided. 274 NotADirectoryError: If the directory does not exist. 275 276 Example: 277 directory = '/path/to/your/directory' 278 279 # Iterate through all entries (files and directories) in the directory 280 for entry in iterate_directory_entries(directory): 281 print(entry) 282 283 # Iterate through only file entries in the directory 284 for file in iterate_directory_entries(directory, file_type='files'): 285 print("File:", file) 286 287 # Iterate through only directory entries in the directory 288 for directory in iterate_directory_entries(directory, file_type='directories'): 289 print("Directory:", directory) 290 """ 291 if not os.path.isdir(directory): 292 raise NotADirectoryError(directory) 293 294 allowed_types = ['all', 'files', 'directories'] 295 296 # Check if the provided file_type is valid 297 if file_type not in allowed_types: 298 raise ValueError(f"Invalid file_type '{file_type}'. Allowed values are {', '.join(allowed_types)}.") 299 300 # Convert the directory to an absolute path 301 directory = os.path.abspath(directory) 302 303 # Set the start_directory to the provided directory if not specified 304 start_directory = start_directory or directory 305 306 # Get the directory entries using os.scandir() 307 for entry in os.scandir(directory): 308 # If the entry is a directory 309 if entry.is_dir(follow_symlinks=followlinks): 310 # If recursive is True, traverse the subdirectory 311 if recursive: 312 yield from iterate_directory_entries(entry.path, file_type, absolute, recursive, followlinks, start_directory) 313 314 # If the file_type is not "files", yield the directory entry 315 if file_type != "files": 316 if absolute: 317 yield entry.path 318 else: 319 yield os.path.relpath(entry.path, start=start_directory) 320 321 # If the entry is a file 322 elif entry.is_file(follow_symlinks=followlinks): 323 # If the file_type is not "directories", yield the file entry 324 if file_type != "directories": 325 if absolute: 326 yield entry.path 327 else: 328 yield os.path.relpath(entry.path, start=start_directory)
Generate entries (files, directories, or both) in a given directory using os.scandir().
Args: directory (str): The path to the directory whose entries are to be listed. file_type (str, optional): Type of entries to return. - 'all': Return both files and directories (default). - 'files': Return only files. - 'directories': Return only directories. absolute (bool, optional): Whether to return absolute paths (default) or relative paths. recursive (bool, optional): Whether to recurse into subdirectories (default is True). followlinks (bool, optional): Whether to follow symbolic links and yield entries from the target directory (default is True). start_directory (str, optional): The starting directory used to calculate relative paths (default is None).
Yields: str: The full path of each file or directory found in the specified directory.
Raises: ValueError: If an invalid 'file_type' value is provided. NotADirectoryError: If the directory does not exist.
Example: directory = '/path/to/your/directory'
# Iterate through all entries (files and directories) in the directory
for entry in iterate_directory_entries(directory):
print(entry)
# Iterate through only file entries in the directory
for file in iterate_directory_entries(directory, file_type='files'):
print("File:", file)
# Iterate through only directory entries in the directory
for directory in iterate_directory_entries(directory, file_type='directories'):
print("Directory:", directory)
331def list_file_paths(directory: str, file_type: str = 'files', absolute: bool = True, recursive: bool = True, followlinks: bool = True) -> list: 332 """ List all file paths in a given directory and its subdirectories. 333 334 Args: 335 directory (str): The path to the directory whose file paths are to be listed. 336 file_type (str, optional): Type of entries to return. 337 - 'all': Return both files and directories. 338 - 'files': Return only files (default). 339 - 'directories': Return only directories. 340 absolute (bool, optional): Whether to return absolute paths (default is True). 341 recursive (bool, optional): Whether to recurse into subdirectories (default is True). 342 followlinks (bool, optional): Whether to follow symbolic links and list file paths from the target directory (default is True). 343 344 Returns: 345 list: A list of file paths found in the specified directory and its subdirectories. 346 347 Example: 348 directory = '/path/to/your/directory' 349 file_paths = list_file_paths(directory) 350 print(file_paths) 351 """ 352 # Remove duplicate file paths (symlinks of same files or other symlinks), and sort 353 return sorted(set(iterate_directory_entries(directory, file_type, absolute, recursive, followlinks)))
List all file paths in a given directory and its subdirectories.
Args: directory (str): The path to the directory whose file paths are to be listed. file_type (str, optional): Type of entries to return. - 'all': Return both files and directories. - 'files': Return only files (default). - 'directories': Return only directories. absolute (bool, optional): Whether to return absolute paths (default is True). recursive (bool, optional): Whether to recurse into subdirectories (default is True). followlinks (bool, optional): Whether to follow symbolic links and list file paths from the target directory (default is True).
Returns: list: A list of file paths found in the specified directory and its subdirectories.
Example: directory = '/path/to/your/directory' file_paths = list_file_paths(directory) print(file_paths)
356def list_subdirectory_paths(directory: str, recursive: bool = False, absolute: bool = True): 357 """ Returns a list of paths to subdirectories in a directory. 358 359 Args: 360 directory (str): The directory to list subdirectories from. 361 recursive (bool, optional): Default False. Include subdirectories of subdirectories. 362 absolute (bool, optional): Default True. Return paths as absolute paths. If False, returns relative paths. 363 364 Returns: 365 subdirectories (list): A list of subdirectory paths. 366 """ 367 subdirectories = [f.path for f in os.scandir(directory) if f.is_dir()] 368 369 if recursive: 370 for subdirectory in subdirectories: 371 subdirectories.extend(list_subdirectory_paths(subdirectory, recursive=True)) 372 373 if absolute: 374 subdirectories = [os.path.abspath(path) for path in subdirectories] 375 else: 376 subdirectories = [os.path.relpath(path, directory) for path in subdirectories] 377 378 return subdirectories
Returns a list of paths to subdirectories in a directory.
Args: directory (str): The directory to list subdirectories from. recursive (bool, optional): Default False. Include subdirectories of subdirectories. absolute (bool, optional): Default True. Return paths as absolute paths. If False, returns relative paths.
Returns: subdirectories (list): A list of subdirectory paths.
381def load_dependencies(dependencies: list, ignore_errors: bool = False): 382 """ Calls ctypes.cdll.LoadLibrary on each file path in `dependencies`. 383 384 Args: 385 dependencies (list): A list of absolute file paths of library dependencies. 386 ignore_errors (bool, optional): Default False, avoid raising exceptions from ct.cdll.LoadLibrary if ignore_errors is True. 387 388 Returns: 389 missing_dependencies (list): A list of missing dependencies, or an empty list. 390 """ 391 missing_dependencies = [dependency for dependency in dependencies if not os.path.isfile(dependency)] 392 393 for dependency in dependencies: 394 # Try to load dependencies that exist 395 if dependency not in missing_dependencies: 396 try: 397 ct.cdll.LoadLibrary(dependency) 398 except Exception: 399 if ignore_errors: 400 pass 401 else: 402 raise 403 404 return missing_dependencies
Calls ctypes.cdll.LoadLibrary on each file path in dependencies
.
Args: dependencies (list): A list of absolute file paths of library dependencies. ignore_errors (bool, optional): Default False, avoid raising exceptions from ct.cdll.LoadLibrary if ignore_errors is True.
Returns: missing_dependencies (list): A list of missing dependencies, or an empty list.
407def round_up(number: float, decimals=0) -> float: 408 """ Rounds a number up to a specified number of decimal places. 409 410 Args: 411 number (float): The number to be rounded. 412 decimals (int, optional): The number of decimal places to round to. Defaults to 0. 413 414 Returns: 415 float: The rounded number. 416 417 Examples: 418 >>> round_up(105, 0) 419 105.0 420 >>> round_up(0.015, 2) 421 0.02 422 >>> round_up(0.025, 2) 423 0.03 424 >>> round_up(0.00001, 2) 425 0.01 426 """ 427 multiplier = 10 ** decimals 428 return math.ceil(number * multiplier) / multiplier
Rounds a number up to a specified number of decimal places.
Args: number (float): The number to be rounded. decimals (int, optional): The number of decimal places to round to. Defaults to 0.
Returns: float: The rounded number.
Examples:
round_up(105, 0) 105.0 round_up(0.015, 2) 0.02 round_up(0.025, 2) 0.03 round_up(0.00001, 2) 0.01
431class TempDirectoryPath: 432 """ Gives a path to a uniquely named temporary directory that does not currently exist on __enter__, deletes the directory if it exists on __exit__. 433 434 Args: 435 delete (bool, optional): Default True. Delete the temporary directory on __exit__ 436 """ 437 438 def __init__(self, delete: bool = True): 439 # Validate args 440 if not isinstance(delete, bool): 441 raise TypeError(delete) 442 443 self.temp_directory = None 444 self.delete = delete 445 446 while self.temp_directory is None or os.path.isdir(self.temp_directory): 447 self.temp_directory = os.path.join(glasswall._TEMPDIR, next(tempfile._get_candidate_names())) 448 449 # Normalize 450 self.temp_directory = str(pathlib.Path(self.temp_directory).resolve()) 451 452 # Create temp directory 453 os.makedirs(self.temp_directory, exist_ok=True) 454 455 def __enter__(self): 456 return self.temp_directory 457 458 def __exit__(self, type, value, traceback): 459 if self.delete: 460 # Delete temp directory and all of its contents 461 if os.path.isdir(self.temp_directory): 462 delete_directory(self.temp_directory)
Gives a path to a uniquely named temporary directory that does not currently exist on __enter__, deletes the directory if it exists on __exit__.
Args: delete (bool, optional): Default True. Delete the temporary directory on __exit__
438 def __init__(self, delete: bool = True): 439 # Validate args 440 if not isinstance(delete, bool): 441 raise TypeError(delete) 442 443 self.temp_directory = None 444 self.delete = delete 445 446 while self.temp_directory is None or os.path.isdir(self.temp_directory): 447 self.temp_directory = os.path.join(glasswall._TEMPDIR, next(tempfile._get_candidate_names())) 448 449 # Normalize 450 self.temp_directory = str(pathlib.Path(self.temp_directory).resolve()) 451 452 # Create temp directory 453 os.makedirs(self.temp_directory, exist_ok=True)
465class TempFilePath: 466 """ Gives a path to a uniquely named temporary file that does not currently exist on __enter__, deletes the file if it exists on __exit__. 467 468 Args: 469 directory (Union[str, None], optional): The directory to create a temporary file in. 470 delete (bool, optional): Default True. Delete the temporary file on on __exit__ 471 """ 472 473 def __init__(self, directory: Union[str, None] = None, delete: bool = True): 474 # Validate args 475 if not isinstance(directory, (str, type(None))): 476 raise TypeError(directory) 477 if isinstance(directory, str) and not os.path.isdir(directory): 478 raise NotADirectoryError(directory) 479 if not isinstance(delete, bool): 480 raise TypeError(delete) 481 482 self.temp_file: str = None 483 self.directory = directory or tempfile.gettempdir() 484 self.delete = delete 485 486 while self.temp_file is None or os.path.isfile(self.temp_file): 487 self.temp_file = os.path.join(self.directory, next(tempfile._get_candidate_names())) 488 489 # Normalize 490 self.temp_file = str(pathlib.Path(self.temp_file).resolve()) 491 492 # Create temp directory if it does not exist 493 os.makedirs(os.path.dirname(self.temp_file), exist_ok=True) 494 495 def __enter__(self): 496 return self.temp_file 497 498 def __exit__(self, type, value, traceback): 499 if self.delete: 500 if os.path.isfile(self.temp_file): 501 os.remove(self.temp_file)
Gives a path to a uniquely named temporary file that does not currently exist on __enter__, deletes the file if it exists on __exit__.
Args: directory (Union[str, None], optional): The directory to create a temporary file in. delete (bool, optional): Default True. Delete the temporary file on on __exit__
473 def __init__(self, directory: Union[str, None] = None, delete: bool = True): 474 # Validate args 475 if not isinstance(directory, (str, type(None))): 476 raise TypeError(directory) 477 if isinstance(directory, str) and not os.path.isdir(directory): 478 raise NotADirectoryError(directory) 479 if not isinstance(delete, bool): 480 raise TypeError(delete) 481 482 self.temp_file: str = None 483 self.directory = directory or tempfile.gettempdir() 484 self.delete = delete 485 486 while self.temp_file is None or os.path.isfile(self.temp_file): 487 self.temp_file = os.path.join(self.directory, next(tempfile._get_candidate_names())) 488 489 # Normalize 490 self.temp_file = str(pathlib.Path(self.temp_file).resolve()) 491 492 # Create temp directory if it does not exist 493 os.makedirs(os.path.dirname(self.temp_file), exist_ok=True)
505def validate_xml(xml: Union[str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"]): 506 """ Attempts to parse the xml provided, returning the xml as string. Raises ValueError if the xml cannot be parsed. 507 508 Args: 509 xml (Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy]): The xml string, or file path, bytes, or ContentManagementPolicy instance to parse. 510 511 Returns: 512 xml_string (str): A string representation of the xml. 513 514 Raises: 515 ValueError: if the xml cannot be parsed. 516 TypeError: if the type of arg "xml" is invalid 517 """ 518 try: 519 # Get tree from file/str 520 if isinstance(xml, str): 521 try: 522 is_file = os.path.isfile(os.path.abspath(xml)) 523 except Exception: 524 is_file = False 525 526 if is_file: 527 tree = etree.parse(xml) 528 else: 529 xml = xml.encode("utf-8") 530 tree = etree.fromstring(xml) 531 532 # Get tree from bytes, bytearray, io.BytesIO 533 elif isinstance(xml, (bytes, bytearray, io.BytesIO)): 534 # Convert bytes, bytearray to io.BytesIO 535 if isinstance(xml, (bytes, bytearray)): 536 xml = as_io_BytesIO(xml) 537 tree = etree.parse(xml) 538 539 # Get tree from ContentManagementPolicy instance 540 elif isinstance(xml, glasswall.content_management.policies.policy.Policy): 541 xml = xml.text.encode("utf-8") 542 tree = etree.fromstring(xml) 543 544 else: 545 raise TypeError(xml) 546 547 except etree.XMLSyntaxError: 548 raise ValueError(xml) 549 550 # # convert tree to string and include xml declaration header utf8 551 etree.indent(tree, space=" " * 4) 552 xml_string = etree.tostring(tree, encoding="utf-8", xml_declaration=True, pretty_print=True).decode() 553 554 return xml_string
Attempts to parse the xml provided, returning the xml as string. Raises ValueError if the xml cannot be parsed.
Args: xml (Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy]): The xml string, or file path, bytes, or ContentManagementPolicy instance to parse.
Returns: xml_string (str): A string representation of the xml.
Raises: ValueError: if the xml cannot be parsed. TypeError: if the type of arg "xml" is invalid
557def xml_as_dict(xml): 558 """ Converts a simple single-level xml into a dictionary. 559 560 Args: 561 xml (Union[str, bytes, bytearray, io.BytesIO]): The xml string, or file path, or bytes to parse. 562 563 Returns: 564 dict_ (dict): A dictionary of element tag : text 565 """ 566 # Convert xml to string 567 xml_string = validate_xml(xml) 568 569 # Get root 570 root = etree.fromstring(xml_string.encode()) 571 572 dict_ = { 573 element.tag: element.text 574 for element in root 575 } 576 577 # Sort for ease of viewing logs 578 dict_ = {k: v for k, v in sorted(dict_.items())} 579 580 return dict_
Converts a simple single-level xml into a dictionary.
Args: xml (Union[str, bytes, bytearray, io.BytesIO]): The xml string, or file path, or bytes to parse.
Returns: dict_ (dict): A dictionary of element tag : text
583def deprecated_alias(**aliases: str) -> Callable: 584 """ Decorator for deprecated function and method arguments. 585 586 Use as follows: 587 588 @deprecated_alias(old_arg='new_arg') 589 def myfunc(new_arg): 590 ... 591 592 https://stackoverflow.com/a/49802489 593 """ 594 595 def deco(f: Callable): 596 @functools.wraps(f) 597 def wrapper(*args, **kwargs): 598 rename_kwargs(f.__name__, kwargs, aliases) 599 return f(*args, **kwargs) 600 601 return wrapper 602 603 return deco
Decorator for deprecated function and method arguments.
Use as follows:
@deprecated_alias(old_arg='new_arg') def myfunc(new_arg): ...
606def rename_kwargs(func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str]): 607 """ Helper function for deprecating function arguments. 608 609 https://stackoverflow.com/a/49802489 610 """ 611 for alias, new in aliases.items(): 612 if alias in kwargs: 613 if new in kwargs: 614 raise TypeError( 615 f"{func_name} received both {alias} and {new} as arguments!" 616 f" {alias} is deprecated, use {new} instead." 617 ) 618 warnings.warn( 619 message=( 620 f"`{alias}` is deprecated as an argument to `{func_name}`; use" 621 f" `{new}` instead." 622 ), 623 category=DeprecationWarning, 624 stacklevel=3, 625 ) 626 kwargs[new] = kwargs.pop(alias)
Helper function for deprecating function arguments.
629def deprecated_function(replacement_function): 630 def decorator(f: Callable): 631 @functools.wraps(f) 632 def wrapper(*args, **kwargs): 633 warnings.warn( 634 message=f"Call to deprecated method: '{f.__name__}'. Use '{replacement_function.__name__}' instead.", 635 category=DeprecationWarning, 636 stacklevel=3 637 ) 638 return replacement_function(*args, **kwargs) 639 640 return wrapper 641 642 return decorator