glasswall.libraries.archive_manager.archive_manager
1import ctypes as ct 2import functools 3import io 4import os 5from typing import Optional, Union 6 7import glasswall 8from glasswall import determine_file_type as dft 9from glasswall import utils 10from glasswall.config.logging import log 11from glasswall.libraries.archive_manager import errors, successes 12from glasswall.libraries.library import Library 13 14 15class ArchiveManager(Library): 16 """ A high level Python wrapper for Glasswall Archive Manager. """ 17 18 def __init__(self, library_path): 19 super().__init__(library_path) 20 self.library = self.load_library(os.path.abspath(library_path)) 21 22 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") 23 24 def version(self): 25 """ Returns the Glasswall library version. 26 27 Returns: 28 version (str): The Glasswall library version. 29 """ 30 # API function declaration 31 self.library.GwArchiveVersion.restype = ct.c_char_p 32 33 # API call 34 version = self.library.GwArchiveVersion() 35 36 # Convert to Python string 37 version = ct.string_at(version).decode() 38 39 return version 40 41 def release(self): 42 """ Releases any resources held by the Glasswall Archive Manager library. """ 43 self.library.GwArchiveDone() 44 45 @property 46 @functools.lru_cache() 47 def supported_archives(self): 48 """ Returns a list of supported archive file formats. """ 49 50 # API function declaration 51 self.library.GwSupportedFiletypes.restype = ct.c_char_p 52 53 # API call 54 result = self.library.GwSupportedFiletypes() # b'7z,bz2,gz,rar,tar,xz,zip,' 55 56 # Convert to Python string 57 result = ct.string_at(result).decode() # 7z,bz2,gz,rar,tar,xz,zip, 58 59 # Convert comma separated str to list, remove empty trailing element, sort 60 result = sorted(filter(None, result.split(","))) 61 62 return result 63 64 @functools.lru_cache() 65 def is_supported_archive(self, archive_type: str): 66 """ Returns True if the archive type (e.g. `7z`) is supported. """ 67 68 # API function declaration 69 self.library.GwIsSupportedArchiveType.argtypes = [ 70 ct.c_char_p 71 ] 72 self.library.GwIsSupportedArchiveType.restype = ct.c_bool 73 74 ct_archive_type = ct.c_char_p(archive_type.encode()) # const char* type 75 76 result = self.library.GwIsSupportedArchiveType(ct_archive_type) 77 78 return result 79 80 def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True): 81 """ Returns a list of file paths of supported archives in a directory and all of its subdirectories. """ 82 return [ 83 file_path 84 for file_path in glasswall.utils.list_file_paths( 85 directory=directory, 86 recursive=recursive, 87 absolute=absolute, 88 followlinks=followlinks, 89 ) 90 if self.is_supported_archive(self.determine_file_type(file_path, as_string=True, raise_unsupported=False)) 91 ] 92 93 def determine_file_type(self, input_file: str, as_string: bool = False, raise_unsupported: bool = True): 94 """ Returns an int representing the file type of an archive. 95 96 Args: 97 input_file (str) The input file path. 98 as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False. 99 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 100 101 Returns: 102 file_type (Union[int, str]): The file format. 103 """ 104 if not os.path.isfile(input_file): 105 raise FileNotFoundError(input_file) 106 107 # API function declaration 108 self.library.GwDetermineArchiveTypeFromFile.argtypes = [ 109 ct.c_char_p 110 ] 111 112 # Variable initialisation 113 ct_input_file = ct.c_char_p(input_file.encode()) # const char * inputFilePath) 114 115 with utils.CwdHandler(new_cwd=self.library_path): 116 # API call 117 file_type = self.library.GwDetermineArchiveTypeFromFile( 118 ct_input_file 119 ) 120 121 file_type_as_string = dft.file_type_int_to_str(file_type) 122 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 123 124 if not dft.is_success(file_type): 125 if raise_unsupported: 126 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 127 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 128 else: 129 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 130 else: 131 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 132 133 if as_string: 134 return file_type_as_string 135 136 return file_type 137 138 def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 139 """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. 140 141 Args: 142 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 143 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 144 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 145 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 146 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 147 148 Returns: 149 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 150 """ 151 # Validate arg types 152 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 153 raise TypeError(input_file) 154 if not isinstance(output_file, (type(None), str)): 155 raise TypeError(output_file) 156 if not isinstance(output_report, (type(None), str)): 157 raise TypeError(output_report) 158 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 159 raise TypeError(content_management_policy) 160 161 # Convert string path arguments to absolute paths 162 if isinstance(input_file, str): 163 input_file = os.path.abspath(input_file) 164 if isinstance(output_file, str): 165 output_file = os.path.abspath(output_file) 166 if isinstance(output_report, str): 167 output_report = os.path.abspath(output_report) 168 169 # Convert inputs to bytes 170 if isinstance(input_file, str): 171 if not os.path.isfile(input_file): 172 raise FileNotFoundError(input_file) 173 with open(input_file, "rb") as f: 174 input_file_bytes = f.read() 175 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 176 input_file_bytes = utils.as_bytes(input_file) 177 178 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 179 with open(content_management_policy, "rb") as f: 180 content_management_policy = f.read() 181 elif isinstance(content_management_policy, type(None)): 182 # Load default 183 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 184 content_management_policy = utils.validate_xml(content_management_policy) 185 186 # API function declaration 187 self.library.GwFileAnalysisArchive.argtypes = [ 188 ct.c_void_p, # void *inputBuffer 189 ct.c_size_t, # size_t inputBufferLength 190 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 191 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 192 ct.POINTER(ct.c_void_p), # void **outputAnalysisReportBuffer 193 ct.POINTER(ct.c_size_t), # size_t *outputAnalysisReportBufferLength 194 ct.c_char_p # const char *xmlConfigString 195 ] 196 197 # Variable initialisation 198 gw_return_object = glasswall.GwReturnObj() 199 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 200 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 201 gw_return_object.output_buffer = ct.c_void_p() 202 gw_return_object.output_buffer_length = ct.c_size_t() 203 gw_return_object.output_report_buffer = ct.c_void_p() 204 gw_return_object.output_report_buffer_length = ct.c_size_t() 205 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 206 207 with utils.CwdHandler(new_cwd=self.library_path): 208 # API call 209 gw_return_object.status = self.library.GwFileAnalysisArchive( 210 gw_return_object.input_buffer, 211 gw_return_object.input_buffer_length, 212 ct.byref(gw_return_object.output_buffer), 213 ct.byref(gw_return_object.output_buffer_length), 214 ct.byref(gw_return_object.output_report_buffer), 215 ct.byref(gw_return_object.output_report_buffer_length), 216 gw_return_object.content_management_policy 217 ) 218 219 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 220 gw_return_object.output_file = utils.buffer_to_bytes( 221 gw_return_object.output_buffer, 222 gw_return_object.output_buffer_length 223 ) 224 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 225 gw_return_object.output_report = utils.buffer_to_bytes( 226 gw_return_object.output_report_buffer, 227 gw_return_object.output_report_buffer_length 228 ) 229 230 # Write output file 231 if hasattr(gw_return_object, "output_file"): 232 if isinstance(output_file, str): 233 os.makedirs(os.path.dirname(output_file), exist_ok=True) 234 with open(output_file, "wb") as f: 235 f.write(gw_return_object.output_file) 236 237 # Write output report 238 if hasattr(gw_return_object, "output_report"): 239 if isinstance(output_report, str): 240 os.makedirs(os.path.dirname(output_report), exist_ok=True) 241 with open(output_report, "wb") as f: 242 f.write(gw_return_object.output_report) 243 244 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 245 if gw_return_object.status not in successes.success_codes: 246 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 247 if raise_unsupported: 248 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 249 else: 250 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 251 252 self.release() 253 254 return gw_return_object 255 256 def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 257 """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory. 258 259 Args: 260 input_directory (str): The input directory containing archives to analyse. 261 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written. 262 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 263 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 264 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 265 266 Returns: 267 analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 268 """ 269 analysed_archives_dict = {} 270 # Call analyse_archive on each file in input_directory 271 for input_file in utils.list_file_paths(input_directory): 272 relative_path = os.path.relpath(input_file, input_directory) 273 # Construct paths for output file and output report 274 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 275 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 276 277 result = self.analyse_archive( 278 input_file=input_file, 279 output_file=output_file, 280 output_report=output_report, 281 content_management_policy=content_management_policy, 282 raise_unsupported=raise_unsupported, 283 ) 284 285 analysed_archives_dict[relative_path] = result 286 287 return analysed_archives_dict 288 289 def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 290 """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. 291 292 Args: 293 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 294 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 295 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 296 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 297 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 298 299 Returns: 300 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 301 """ 302 # Validate arg types 303 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 304 raise TypeError(input_file) 305 if not isinstance(output_file, (type(None), str)): 306 raise TypeError(output_file) 307 if not isinstance(output_report, (type(None), str)): 308 raise TypeError(output_report) 309 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 310 raise TypeError(content_management_policy) 311 312 # Convert string path arguments to absolute paths 313 if isinstance(input_file, str): 314 input_file = os.path.abspath(input_file) 315 if isinstance(output_file, str): 316 output_file = os.path.abspath(output_file) 317 if isinstance(output_report, str): 318 output_report = os.path.abspath(output_report) 319 320 # Convert inputs to bytes 321 if isinstance(input_file, str): 322 if not os.path.isfile(input_file): 323 raise FileNotFoundError(input_file) 324 with open(input_file, "rb") as f: 325 input_file_bytes = f.read() 326 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 327 input_file_bytes = utils.as_bytes(input_file) 328 329 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 330 with open(content_management_policy, "rb") as f: 331 content_management_policy = f.read() 332 elif isinstance(content_management_policy, type(None)): 333 # Load default 334 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 335 content_management_policy = utils.validate_xml(content_management_policy) 336 337 # API function declaration 338 self.library.GwFileProtectAndReportArchive.argtypes = [ 339 ct.c_void_p, # void *inputBuffer 340 ct.c_size_t, # size_t inputBufferLength 341 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 342 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 343 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 344 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 345 ct.c_char_p # const char *xmlConfigString 346 ] 347 # Variable initialisation 348 gw_return_object = glasswall.GwReturnObj() 349 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 350 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 351 gw_return_object.output_buffer = ct.c_void_p() 352 gw_return_object.output_buffer_length = ct.c_size_t() 353 gw_return_object.output_report_buffer = ct.c_void_p() 354 gw_return_object.output_report_buffer_length = ct.c_size_t() 355 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 356 357 with utils.CwdHandler(new_cwd=self.library_path): 358 # API call 359 gw_return_object.status = self.library.GwFileProtectAndReportArchive( 360 ct.byref(gw_return_object.input_buffer), 361 gw_return_object.input_buffer_length, 362 ct.byref(gw_return_object.output_buffer), 363 ct.byref(gw_return_object.output_buffer_length), 364 ct.byref(gw_return_object.output_report_buffer), 365 ct.byref(gw_return_object.output_report_buffer_length), 366 gw_return_object.content_management_policy 367 ) 368 369 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 370 gw_return_object.output_file = utils.buffer_to_bytes( 371 gw_return_object.output_buffer, 372 gw_return_object.output_buffer_length 373 ) 374 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 375 gw_return_object.output_report = utils.buffer_to_bytes( 376 gw_return_object.output_report_buffer, 377 gw_return_object.output_report_buffer_length 378 ) 379 380 # Write output file 381 if hasattr(gw_return_object, "output_file"): 382 if isinstance(output_file, str): 383 os.makedirs(os.path.dirname(output_file), exist_ok=True) 384 with open(output_file, "wb") as f: 385 f.write(gw_return_object.output_file) 386 387 # Write output report 388 if hasattr(gw_return_object, "output_report"): 389 if isinstance(output_report, str): 390 os.makedirs(os.path.dirname(output_report), exist_ok=True) 391 with open(output_report, "wb") as f: 392 f.write(gw_return_object.output_report) 393 394 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 395 if gw_return_object.status not in successes.success_codes: 396 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 397 if raise_unsupported: 398 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 399 else: 400 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 401 402 self.release() 403 404 return gw_return_object 405 406 def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 407 """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory. 408 409 Args: 410 input_directory (str): The input directory containing archives to protect. 411 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 412 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 413 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 414 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 415 416 Returns: 417 protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 418 """ 419 protected_archives_dict = {} 420 # Call protect_archive on each file in input_directory to output_directory 421 for input_file in utils.list_file_paths(input_directory): 422 relative_path = os.path.relpath(input_file, input_directory) 423 # Construct paths for output file and output report 424 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 425 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 426 427 result = self.protect_archive( 428 input_file=input_file, 429 output_file=output_file, 430 output_report=output_report, 431 content_management_policy=content_management_policy, 432 raise_unsupported=raise_unsupported, 433 ) 434 435 protected_archives_dict[relative_path] = result 436 437 return protected_archives_dict 438 439 def file_to_file_unpack(self, input_file: str, output_directory: str, raise_unsupported: bool = True): 440 # Validate arg types 441 if not isinstance(input_file, str): 442 raise TypeError(input_file) 443 elif not os.path.isfile(input_file): 444 raise FileNotFoundError(input_file) 445 if not isinstance(output_directory, str): 446 raise TypeError(output_directory) 447 448 # API function declaration 449 self.library.GwFileToFileUnpack.argtypes = [ 450 ct.c_char_p, 451 ct.c_char_p, 452 ] 453 454 # Variable initialisation 455 gw_return_object = glasswall.GwReturnObj() 456 gw_return_object.ct_input_file = ct.c_char_p(input_file.encode()) # const char* inputFilePath 457 gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath 458 459 with utils.CwdHandler(new_cwd=self.library_path): 460 # API call 461 gw_return_object.status = self.library.GwFileToFileUnpack( 462 gw_return_object.ct_input_file, 463 gw_return_object.ct_output_directory, 464 ) 465 466 if gw_return_object.status not in successes.success_codes: 467 log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") 468 if raise_unsupported: 469 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 470 else: 471 log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") 472 473 self.release() 474 475 return gw_return_object 476 477 def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True): 478 # Validate arg types 479 if not isinstance(input_directory, str): 480 raise TypeError(input_directory) 481 elif not os.path.isdir(input_directory): 482 raise NotADirectoryError(input_directory) 483 if not isinstance(output_directory, str): 484 raise TypeError(output_directory) 485 if not file_type: 486 file_type = utils.get_file_type(input_directory) 487 488 # Ensure output_directory exists 489 os.makedirs(output_directory, exist_ok=True) 490 491 # API function declaration 492 self.library.GwFileToFilePack.argtypes = [ 493 ct.c_char_p, 494 ct.c_char_p, 495 ct.c_char_p, 496 ct.c_int, 497 ] 498 499 # Variable initialisation 500 gw_return_object = glasswall.GwReturnObj() 501 gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode()) # const char* inputDirPath 502 gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath 503 gw_return_object.ct_file_type = ct.c_char_p(file_type.encode()) # const char *fileType 504 gw_return_object.ct_add_extension = ct.c_int(int(add_extension)) # int addExtension 505 506 with utils.CwdHandler(new_cwd=self.library_path): 507 # API call 508 gw_return_object.status = self.library.GwFileToFilePack( 509 gw_return_object.ct_input_directory, 510 gw_return_object.ct_output_directory, 511 gw_return_object.ct_file_type, 512 gw_return_object.ct_add_extension, 513 ) 514 515 if gw_return_object.status not in successes.success_codes: 516 log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") 517 if raise_unsupported: 518 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 519 else: 520 log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") 521 522 self.release() 523 524 return gw_return_object 525 526 def unpack(self, input_file: str, output_directory: str, recursive: bool = True, include_file_type: bool = False, raise_unsupported: bool = True, delete_origin: bool = False): 527 """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip". 528 529 Args: 530 input_file (str): The archive file path 531 output_directory (str): The output directory where the archive will be unpacked to a new directory. 532 recursive (bool, optional): Default True. Recursively unpack all nested archives. 533 include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. 534 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 535 delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 536 """ 537 # Convert to absolute paths 538 input_file = os.path.abspath(input_file) 539 output_directory = os.path.abspath(output_directory) 540 541 if include_file_type: 542 archive_name = os.path.basename(input_file) 543 else: 544 archive_name = os.path.splitext(os.path.basename(input_file))[0] 545 archive_output_directory = os.path.join(output_directory, archive_name) 546 547 # Unpack 548 log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}") 549 result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, raise_unsupported=raise_unsupported) 550 if result: 551 status = result.status 552 else: 553 status = None 554 555 if status not in successes.success_codes: 556 log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}") 557 if raise_unsupported: 558 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 559 else: 560 log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}") 561 562 if delete_origin: 563 os.remove(input_file) 564 565 if recursive: 566 # Unpack sub archives 567 for subarchive in self.list_archive_paths(archive_output_directory): 568 self.unpack( 569 input_file=subarchive, 570 output_directory=archive_output_directory, 571 recursive=recursive, 572 raise_unsupported=raise_unsupported, 573 delete_origin=True 574 ) 575 576 return status 577 578 def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False): 579 """ Unpack a directory of archives, maintaining directory structure. 580 581 Args: 582 input_directory (str): The input directory containing archives to unpack. 583 output_directory (str): The output directory where archives will be unpacked to a new directory. 584 recursive (bool, optional): Default True. Recursively unpack all nested archives. 585 include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. 586 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 587 delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 588 """ 589 # Convert to absolute paths 590 input_directory = os.path.abspath(input_directory) 591 output_directory = os.path.abspath(output_directory) 592 593 for archive_input_file in self.list_archive_paths(input_directory): 594 relative_path = os.path.relpath(archive_input_file, input_directory) 595 archive_output_file = os.path.dirname(os.path.join(output_directory, relative_path)) 596 self.unpack( 597 input_file=archive_input_file, 598 output_directory=archive_output_file, 599 recursive=recursive, 600 include_file_type=include_file_type, 601 raise_unsupported=raise_unsupported, 602 delete_origin=delete_origin 603 ) 604 605 def pack_directory(self, input_directory: str, output_directory: str, file_type: str, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True, delete_origin: Optional[bool] = False): 606 """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip". 607 608 Args: 609 input_directory (str): The input directory containing files to archive. 610 output_directory (str): The output directory to store the created archive. 611 file_type (str): The archive file type. 612 add_extension (bool, optional): Default: True. Archive file type extension to result file. 613 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 614 delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory. 615 """ 616 # Convert to absolute paths 617 input_directory = os.path.abspath(input_directory) 618 output_directory = os.path.abspath(output_directory) 619 620 # Pack 621 log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}") 622 status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, add_extension=add_extension, raise_unsupported=raise_unsupported).status 623 624 if status not in successes.success_codes: 625 log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") 626 if raise_unsupported: 627 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 628 else: 629 log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") 630 631 if delete_origin: 632 utils.delete_directory(input_directory) 633 634 return status 635 636 def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 637 """ Exports an archive using the Glasswall engine. 638 639 Args: 640 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 641 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 642 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 643 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 644 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 645 646 Returns: 647 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 648 """ 649 # Validate arg types 650 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 651 raise TypeError(input_file) 652 if not isinstance(output_file, (type(None), str)): 653 raise TypeError(output_file) 654 if not isinstance(output_report, (type(None), str)): 655 raise TypeError(output_report) 656 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 657 raise TypeError(content_management_policy) 658 659 # Convert string path arguments to absolute paths 660 if isinstance(input_file, str): 661 input_file = os.path.abspath(input_file) 662 if isinstance(output_file, str): 663 output_file = os.path.abspath(output_file) 664 if isinstance(output_report, str): 665 output_report = os.path.abspath(output_report) 666 667 # Convert inputs to bytes 668 if isinstance(input_file, str): 669 if not os.path.isfile(input_file): 670 raise FileNotFoundError(input_file) 671 with open(input_file, "rb") as f: 672 input_file_bytes = f.read() 673 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 674 input_file_bytes = utils.as_bytes(input_file) 675 676 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 677 with open(content_management_policy, "rb") as f: 678 content_management_policy = f.read() 679 elif isinstance(content_management_policy, type(None)): 680 # Load default 681 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 682 content_management_policy = utils.validate_xml(content_management_policy) 683 684 # API function declaration 685 self.library.GwFileExportArchive.argtypes = [ 686 ct.c_void_p, # void *inputBuffer 687 ct.c_size_t, # size_t inputBufferLength 688 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 689 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 690 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 691 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 692 ct.c_char_p # const char *xmlConfigString 693 ] 694 695 # Variable initialisation 696 gw_return_object = glasswall.GwReturnObj() 697 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 698 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 699 gw_return_object.output_buffer = ct.c_void_p() 700 gw_return_object.output_buffer_length = ct.c_size_t() 701 gw_return_object.output_report_buffer = ct.c_void_p() 702 gw_return_object.output_report_buffer_length = ct.c_size_t() 703 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 704 705 with utils.CwdHandler(new_cwd=self.library_path): 706 # API call 707 gw_return_object.status = self.library.GwFileExportArchive( 708 gw_return_object.input_buffer, 709 gw_return_object.input_buffer_length, 710 ct.byref(gw_return_object.output_buffer), 711 ct.byref(gw_return_object.output_buffer_length), 712 ct.byref(gw_return_object.output_report_buffer), 713 ct.byref(gw_return_object.output_report_buffer_length), 714 gw_return_object.content_management_policy 715 ) 716 717 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 718 gw_return_object.output_file = utils.buffer_to_bytes( 719 gw_return_object.output_buffer, 720 gw_return_object.output_buffer_length 721 ) 722 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 723 gw_return_object.output_report = utils.buffer_to_bytes( 724 gw_return_object.output_report_buffer, 725 gw_return_object.output_report_buffer_length 726 ) 727 728 # Write output file 729 if hasattr(gw_return_object, "output_file"): 730 if isinstance(output_file, str): 731 os.makedirs(os.path.dirname(output_file), exist_ok=True) 732 with open(output_file, "wb") as f: 733 f.write(gw_return_object.output_file) 734 735 # Write output report 736 if hasattr(gw_return_object, "output_report"): 737 if isinstance(output_report, str): 738 os.makedirs(os.path.dirname(output_report), exist_ok=True) 739 with open(output_report, "wb") as f: 740 f.write(gw_return_object.output_report) 741 742 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 743 if gw_return_object.status not in successes.success_codes: 744 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 745 if raise_unsupported: 746 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 747 else: 748 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 749 750 self.release() 751 752 return gw_return_object 753 754 def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 755 """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory. 756 757 Args: 758 input_directory (str): The input directory containing archives to export. 759 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 760 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 761 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 762 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 763 764 Returns: 765 exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 766 """ 767 exported_archives_dict = {} 768 # Call export_archive on each file in input_directory to output_directory 769 for input_file in utils.list_file_paths(input_directory): 770 relative_path = os.path.relpath(input_file, input_directory) 771 # Construct paths for output file and output report 772 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 773 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 774 775 result = self.export_archive( 776 input_file=input_file, 777 output_file=output_file, 778 output_report=output_report, 779 content_management_policy=content_management_policy, 780 raise_unsupported=raise_unsupported, 781 ) 782 783 exported_archives_dict[relative_path] = result 784 785 return exported_archives_dict 786 787 def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: Optional[bool] = True): 788 """ Imports an archive using the Glasswall engine. 789 790 Args: 791 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 792 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 793 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 794 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 795 include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. 796 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 797 798 Returns: 799 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 800 """ 801 # Validate arg types 802 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 803 raise TypeError(input_file) 804 if not isinstance(output_file, (type(None), str)): 805 raise TypeError(output_file) 806 if not isinstance(output_report, (type(None), str)): 807 raise TypeError(output_report) 808 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 809 raise TypeError(content_management_policy) 810 811 # Convert string path arguments to absolute paths 812 if isinstance(input_file, str): 813 input_file = os.path.abspath(input_file) 814 # Convert string path arguments to absolute paths 815 if isinstance(output_file, str): 816 output_file = os.path.abspath(output_file) 817 if isinstance(output_report, str): 818 output_report = os.path.abspath(output_report) 819 820 # Convert inputs to bytes 821 if isinstance(input_file, str): 822 if not os.path.isfile(input_file): 823 raise FileNotFoundError(input_file) 824 with open(input_file, "rb") as f: 825 input_file_bytes = f.read() 826 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 827 input_file_bytes = utils.as_bytes(input_file) 828 829 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 830 with open(content_management_policy, "rb") as f: 831 content_management_policy = f.read() 832 elif isinstance(content_management_policy, type(None)): 833 # Load default 834 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 835 content_management_policy = utils.validate_xml(content_management_policy) 836 837 # API function declaration 838 self.library.GwFileImportArchive.argtypes = [ 839 ct.c_void_p, # void *inputBuffer 840 ct.c_size_t, # size_t inputBufferLength 841 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 842 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 843 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 844 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 845 ct.c_char_p, # const char *xmlConfigString 846 ct.c_int # int includeAnalysisReports 847 ] 848 849 # Variable initialisation 850 gw_return_object = glasswall.GwReturnObj() 851 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 852 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 853 gw_return_object.output_buffer = ct.c_void_p() 854 gw_return_object.output_buffer_length = ct.c_size_t() 855 gw_return_object.output_report_buffer = ct.c_void_p() 856 gw_return_object.output_report_buffer_length = ct.c_size_t() 857 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 858 gw_return_object.include_analysis_report = ct.c_int(int(include_analysis_report)) 859 860 with utils.CwdHandler(new_cwd=self.library_path): 861 # API call 862 gw_return_object.status = self.library.GwFileImportArchive( 863 gw_return_object.input_buffer, 864 gw_return_object.input_buffer_length, 865 ct.byref(gw_return_object.output_buffer), 866 ct.byref(gw_return_object.output_buffer_length), 867 ct.byref(gw_return_object.output_report_buffer), 868 ct.byref(gw_return_object.output_report_buffer_length), 869 gw_return_object.content_management_policy, 870 gw_return_object.include_analysis_report 871 ) 872 873 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 874 gw_return_object.output_file = utils.buffer_to_bytes( 875 gw_return_object.output_buffer, 876 gw_return_object.output_buffer_length 877 ) 878 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 879 gw_return_object.output_report = utils.buffer_to_bytes( 880 gw_return_object.output_report_buffer, 881 gw_return_object.output_report_buffer_length 882 ) 883 884 # Write output file 885 if hasattr(gw_return_object, "output_file"): 886 if isinstance(output_file, str): 887 os.makedirs(os.path.dirname(output_file), exist_ok=True) 888 with open(output_file, "wb") as f: 889 f.write(gw_return_object.output_file) 890 891 # Write output report 892 if hasattr(gw_return_object, "output_report"): 893 if isinstance(output_report, str): 894 os.makedirs(os.path.dirname(output_report), exist_ok=True) 895 with open(output_report, "wb") as f: 896 f.write(gw_return_object.output_report) 897 898 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 899 if gw_return_object.status not in successes.success_codes: 900 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 901 if raise_unsupported: 902 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 903 else: 904 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 905 906 self.release() 907 908 return gw_return_object 909 910 def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: bool = True): 911 """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory. 912 913 Args: 914 input_directory (str): The input directory containing archives to import. 915 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 916 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 917 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 918 include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. 919 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 920 921 Returns: 922 imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 923 """ 924 imported_archives_dict = {} 925 # Call import_archive on each file in input_directory to output_directory 926 for input_file in utils.list_file_paths(input_directory): 927 relative_path = os.path.relpath(input_file, input_directory) 928 # Construct paths for output file and output report 929 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 930 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 931 932 result = self.import_archive( 933 input_file=input_file, 934 output_file=output_file, 935 output_report=output_report, 936 content_management_policy=content_management_policy, 937 include_analysis_report=include_analysis_report, 938 raise_unsupported=raise_unsupported, 939 ) 940 941 imported_archives_dict[relative_path] = result 942 943 return imported_archives_dict
18class ArchiveManager(Library): 19 """ A high level Python wrapper for Glasswall Archive Manager. """ 20 21 def __init__(self, library_path): 22 super().__init__(library_path) 23 self.library = self.load_library(os.path.abspath(library_path)) 24 25 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") 26 27 def version(self): 28 """ Returns the Glasswall library version. 29 30 Returns: 31 version (str): The Glasswall library version. 32 """ 33 # API function declaration 34 self.library.GwArchiveVersion.restype = ct.c_char_p 35 36 # API call 37 version = self.library.GwArchiveVersion() 38 39 # Convert to Python string 40 version = ct.string_at(version).decode() 41 42 return version 43 44 def release(self): 45 """ Releases any resources held by the Glasswall Archive Manager library. """ 46 self.library.GwArchiveDone() 47 48 @property 49 @functools.lru_cache() 50 def supported_archives(self): 51 """ Returns a list of supported archive file formats. """ 52 53 # API function declaration 54 self.library.GwSupportedFiletypes.restype = ct.c_char_p 55 56 # API call 57 result = self.library.GwSupportedFiletypes() # b'7z,bz2,gz,rar,tar,xz,zip,' 58 59 # Convert to Python string 60 result = ct.string_at(result).decode() # 7z,bz2,gz,rar,tar,xz,zip, 61 62 # Convert comma separated str to list, remove empty trailing element, sort 63 result = sorted(filter(None, result.split(","))) 64 65 return result 66 67 @functools.lru_cache() 68 def is_supported_archive(self, archive_type: str): 69 """ Returns True if the archive type (e.g. `7z`) is supported. """ 70 71 # API function declaration 72 self.library.GwIsSupportedArchiveType.argtypes = [ 73 ct.c_char_p 74 ] 75 self.library.GwIsSupportedArchiveType.restype = ct.c_bool 76 77 ct_archive_type = ct.c_char_p(archive_type.encode()) # const char* type 78 79 result = self.library.GwIsSupportedArchiveType(ct_archive_type) 80 81 return result 82 83 def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True): 84 """ Returns a list of file paths of supported archives in a directory and all of its subdirectories. """ 85 return [ 86 file_path 87 for file_path in glasswall.utils.list_file_paths( 88 directory=directory, 89 recursive=recursive, 90 absolute=absolute, 91 followlinks=followlinks, 92 ) 93 if self.is_supported_archive(self.determine_file_type(file_path, as_string=True, raise_unsupported=False)) 94 ] 95 96 def determine_file_type(self, input_file: str, as_string: bool = False, raise_unsupported: bool = True): 97 """ Returns an int representing the file type of an archive. 98 99 Args: 100 input_file (str) The input file path. 101 as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False. 102 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 103 104 Returns: 105 file_type (Union[int, str]): The file format. 106 """ 107 if not os.path.isfile(input_file): 108 raise FileNotFoundError(input_file) 109 110 # API function declaration 111 self.library.GwDetermineArchiveTypeFromFile.argtypes = [ 112 ct.c_char_p 113 ] 114 115 # Variable initialisation 116 ct_input_file = ct.c_char_p(input_file.encode()) # const char * inputFilePath) 117 118 with utils.CwdHandler(new_cwd=self.library_path): 119 # API call 120 file_type = self.library.GwDetermineArchiveTypeFromFile( 121 ct_input_file 122 ) 123 124 file_type_as_string = dft.file_type_int_to_str(file_type) 125 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 126 127 if not dft.is_success(file_type): 128 if raise_unsupported: 129 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 130 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 131 else: 132 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 133 else: 134 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 135 136 if as_string: 137 return file_type_as_string 138 139 return file_type 140 141 def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 142 """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. 143 144 Args: 145 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 146 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 147 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 148 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 149 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 150 151 Returns: 152 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 153 """ 154 # Validate arg types 155 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 156 raise TypeError(input_file) 157 if not isinstance(output_file, (type(None), str)): 158 raise TypeError(output_file) 159 if not isinstance(output_report, (type(None), str)): 160 raise TypeError(output_report) 161 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 162 raise TypeError(content_management_policy) 163 164 # Convert string path arguments to absolute paths 165 if isinstance(input_file, str): 166 input_file = os.path.abspath(input_file) 167 if isinstance(output_file, str): 168 output_file = os.path.abspath(output_file) 169 if isinstance(output_report, str): 170 output_report = os.path.abspath(output_report) 171 172 # Convert inputs to bytes 173 if isinstance(input_file, str): 174 if not os.path.isfile(input_file): 175 raise FileNotFoundError(input_file) 176 with open(input_file, "rb") as f: 177 input_file_bytes = f.read() 178 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 179 input_file_bytes = utils.as_bytes(input_file) 180 181 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 182 with open(content_management_policy, "rb") as f: 183 content_management_policy = f.read() 184 elif isinstance(content_management_policy, type(None)): 185 # Load default 186 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 187 content_management_policy = utils.validate_xml(content_management_policy) 188 189 # API function declaration 190 self.library.GwFileAnalysisArchive.argtypes = [ 191 ct.c_void_p, # void *inputBuffer 192 ct.c_size_t, # size_t inputBufferLength 193 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 194 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 195 ct.POINTER(ct.c_void_p), # void **outputAnalysisReportBuffer 196 ct.POINTER(ct.c_size_t), # size_t *outputAnalysisReportBufferLength 197 ct.c_char_p # const char *xmlConfigString 198 ] 199 200 # Variable initialisation 201 gw_return_object = glasswall.GwReturnObj() 202 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 203 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 204 gw_return_object.output_buffer = ct.c_void_p() 205 gw_return_object.output_buffer_length = ct.c_size_t() 206 gw_return_object.output_report_buffer = ct.c_void_p() 207 gw_return_object.output_report_buffer_length = ct.c_size_t() 208 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 209 210 with utils.CwdHandler(new_cwd=self.library_path): 211 # API call 212 gw_return_object.status = self.library.GwFileAnalysisArchive( 213 gw_return_object.input_buffer, 214 gw_return_object.input_buffer_length, 215 ct.byref(gw_return_object.output_buffer), 216 ct.byref(gw_return_object.output_buffer_length), 217 ct.byref(gw_return_object.output_report_buffer), 218 ct.byref(gw_return_object.output_report_buffer_length), 219 gw_return_object.content_management_policy 220 ) 221 222 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 223 gw_return_object.output_file = utils.buffer_to_bytes( 224 gw_return_object.output_buffer, 225 gw_return_object.output_buffer_length 226 ) 227 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 228 gw_return_object.output_report = utils.buffer_to_bytes( 229 gw_return_object.output_report_buffer, 230 gw_return_object.output_report_buffer_length 231 ) 232 233 # Write output file 234 if hasattr(gw_return_object, "output_file"): 235 if isinstance(output_file, str): 236 os.makedirs(os.path.dirname(output_file), exist_ok=True) 237 with open(output_file, "wb") as f: 238 f.write(gw_return_object.output_file) 239 240 # Write output report 241 if hasattr(gw_return_object, "output_report"): 242 if isinstance(output_report, str): 243 os.makedirs(os.path.dirname(output_report), exist_ok=True) 244 with open(output_report, "wb") as f: 245 f.write(gw_return_object.output_report) 246 247 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 248 if gw_return_object.status not in successes.success_codes: 249 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 250 if raise_unsupported: 251 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 252 else: 253 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 254 255 self.release() 256 257 return gw_return_object 258 259 def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 260 """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory. 261 262 Args: 263 input_directory (str): The input directory containing archives to analyse. 264 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written. 265 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 266 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 267 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 268 269 Returns: 270 analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 271 """ 272 analysed_archives_dict = {} 273 # Call analyse_archive on each file in input_directory 274 for input_file in utils.list_file_paths(input_directory): 275 relative_path = os.path.relpath(input_file, input_directory) 276 # Construct paths for output file and output report 277 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 278 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 279 280 result = self.analyse_archive( 281 input_file=input_file, 282 output_file=output_file, 283 output_report=output_report, 284 content_management_policy=content_management_policy, 285 raise_unsupported=raise_unsupported, 286 ) 287 288 analysed_archives_dict[relative_path] = result 289 290 return analysed_archives_dict 291 292 def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 293 """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. 294 295 Args: 296 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 297 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 298 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 299 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 300 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 301 302 Returns: 303 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 304 """ 305 # Validate arg types 306 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 307 raise TypeError(input_file) 308 if not isinstance(output_file, (type(None), str)): 309 raise TypeError(output_file) 310 if not isinstance(output_report, (type(None), str)): 311 raise TypeError(output_report) 312 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 313 raise TypeError(content_management_policy) 314 315 # Convert string path arguments to absolute paths 316 if isinstance(input_file, str): 317 input_file = os.path.abspath(input_file) 318 if isinstance(output_file, str): 319 output_file = os.path.abspath(output_file) 320 if isinstance(output_report, str): 321 output_report = os.path.abspath(output_report) 322 323 # Convert inputs to bytes 324 if isinstance(input_file, str): 325 if not os.path.isfile(input_file): 326 raise FileNotFoundError(input_file) 327 with open(input_file, "rb") as f: 328 input_file_bytes = f.read() 329 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 330 input_file_bytes = utils.as_bytes(input_file) 331 332 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 333 with open(content_management_policy, "rb") as f: 334 content_management_policy = f.read() 335 elif isinstance(content_management_policy, type(None)): 336 # Load default 337 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 338 content_management_policy = utils.validate_xml(content_management_policy) 339 340 # API function declaration 341 self.library.GwFileProtectAndReportArchive.argtypes = [ 342 ct.c_void_p, # void *inputBuffer 343 ct.c_size_t, # size_t inputBufferLength 344 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 345 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 346 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 347 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 348 ct.c_char_p # const char *xmlConfigString 349 ] 350 # Variable initialisation 351 gw_return_object = glasswall.GwReturnObj() 352 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 353 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 354 gw_return_object.output_buffer = ct.c_void_p() 355 gw_return_object.output_buffer_length = ct.c_size_t() 356 gw_return_object.output_report_buffer = ct.c_void_p() 357 gw_return_object.output_report_buffer_length = ct.c_size_t() 358 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 359 360 with utils.CwdHandler(new_cwd=self.library_path): 361 # API call 362 gw_return_object.status = self.library.GwFileProtectAndReportArchive( 363 ct.byref(gw_return_object.input_buffer), 364 gw_return_object.input_buffer_length, 365 ct.byref(gw_return_object.output_buffer), 366 ct.byref(gw_return_object.output_buffer_length), 367 ct.byref(gw_return_object.output_report_buffer), 368 ct.byref(gw_return_object.output_report_buffer_length), 369 gw_return_object.content_management_policy 370 ) 371 372 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 373 gw_return_object.output_file = utils.buffer_to_bytes( 374 gw_return_object.output_buffer, 375 gw_return_object.output_buffer_length 376 ) 377 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 378 gw_return_object.output_report = utils.buffer_to_bytes( 379 gw_return_object.output_report_buffer, 380 gw_return_object.output_report_buffer_length 381 ) 382 383 # Write output file 384 if hasattr(gw_return_object, "output_file"): 385 if isinstance(output_file, str): 386 os.makedirs(os.path.dirname(output_file), exist_ok=True) 387 with open(output_file, "wb") as f: 388 f.write(gw_return_object.output_file) 389 390 # Write output report 391 if hasattr(gw_return_object, "output_report"): 392 if isinstance(output_report, str): 393 os.makedirs(os.path.dirname(output_report), exist_ok=True) 394 with open(output_report, "wb") as f: 395 f.write(gw_return_object.output_report) 396 397 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 398 if gw_return_object.status not in successes.success_codes: 399 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 400 if raise_unsupported: 401 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 402 else: 403 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 404 405 self.release() 406 407 return gw_return_object 408 409 def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 410 """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory. 411 412 Args: 413 input_directory (str): The input directory containing archives to protect. 414 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 415 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 416 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 417 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 418 419 Returns: 420 protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 421 """ 422 protected_archives_dict = {} 423 # Call protect_archive on each file in input_directory to output_directory 424 for input_file in utils.list_file_paths(input_directory): 425 relative_path = os.path.relpath(input_file, input_directory) 426 # Construct paths for output file and output report 427 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 428 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 429 430 result = self.protect_archive( 431 input_file=input_file, 432 output_file=output_file, 433 output_report=output_report, 434 content_management_policy=content_management_policy, 435 raise_unsupported=raise_unsupported, 436 ) 437 438 protected_archives_dict[relative_path] = result 439 440 return protected_archives_dict 441 442 def file_to_file_unpack(self, input_file: str, output_directory: str, raise_unsupported: bool = True): 443 # Validate arg types 444 if not isinstance(input_file, str): 445 raise TypeError(input_file) 446 elif not os.path.isfile(input_file): 447 raise FileNotFoundError(input_file) 448 if not isinstance(output_directory, str): 449 raise TypeError(output_directory) 450 451 # API function declaration 452 self.library.GwFileToFileUnpack.argtypes = [ 453 ct.c_char_p, 454 ct.c_char_p, 455 ] 456 457 # Variable initialisation 458 gw_return_object = glasswall.GwReturnObj() 459 gw_return_object.ct_input_file = ct.c_char_p(input_file.encode()) # const char* inputFilePath 460 gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath 461 462 with utils.CwdHandler(new_cwd=self.library_path): 463 # API call 464 gw_return_object.status = self.library.GwFileToFileUnpack( 465 gw_return_object.ct_input_file, 466 gw_return_object.ct_output_directory, 467 ) 468 469 if gw_return_object.status not in successes.success_codes: 470 log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") 471 if raise_unsupported: 472 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 473 else: 474 log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") 475 476 self.release() 477 478 return gw_return_object 479 480 def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True): 481 # Validate arg types 482 if not isinstance(input_directory, str): 483 raise TypeError(input_directory) 484 elif not os.path.isdir(input_directory): 485 raise NotADirectoryError(input_directory) 486 if not isinstance(output_directory, str): 487 raise TypeError(output_directory) 488 if not file_type: 489 file_type = utils.get_file_type(input_directory) 490 491 # Ensure output_directory exists 492 os.makedirs(output_directory, exist_ok=True) 493 494 # API function declaration 495 self.library.GwFileToFilePack.argtypes = [ 496 ct.c_char_p, 497 ct.c_char_p, 498 ct.c_char_p, 499 ct.c_int, 500 ] 501 502 # Variable initialisation 503 gw_return_object = glasswall.GwReturnObj() 504 gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode()) # const char* inputDirPath 505 gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath 506 gw_return_object.ct_file_type = ct.c_char_p(file_type.encode()) # const char *fileType 507 gw_return_object.ct_add_extension = ct.c_int(int(add_extension)) # int addExtension 508 509 with utils.CwdHandler(new_cwd=self.library_path): 510 # API call 511 gw_return_object.status = self.library.GwFileToFilePack( 512 gw_return_object.ct_input_directory, 513 gw_return_object.ct_output_directory, 514 gw_return_object.ct_file_type, 515 gw_return_object.ct_add_extension, 516 ) 517 518 if gw_return_object.status not in successes.success_codes: 519 log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") 520 if raise_unsupported: 521 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 522 else: 523 log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") 524 525 self.release() 526 527 return gw_return_object 528 529 def unpack(self, input_file: str, output_directory: str, recursive: bool = True, include_file_type: bool = False, raise_unsupported: bool = True, delete_origin: bool = False): 530 """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip". 531 532 Args: 533 input_file (str): The archive file path 534 output_directory (str): The output directory where the archive will be unpacked to a new directory. 535 recursive (bool, optional): Default True. Recursively unpack all nested archives. 536 include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. 537 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 538 delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 539 """ 540 # Convert to absolute paths 541 input_file = os.path.abspath(input_file) 542 output_directory = os.path.abspath(output_directory) 543 544 if include_file_type: 545 archive_name = os.path.basename(input_file) 546 else: 547 archive_name = os.path.splitext(os.path.basename(input_file))[0] 548 archive_output_directory = os.path.join(output_directory, archive_name) 549 550 # Unpack 551 log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}") 552 result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, raise_unsupported=raise_unsupported) 553 if result: 554 status = result.status 555 else: 556 status = None 557 558 if status not in successes.success_codes: 559 log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}") 560 if raise_unsupported: 561 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 562 else: 563 log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}") 564 565 if delete_origin: 566 os.remove(input_file) 567 568 if recursive: 569 # Unpack sub archives 570 for subarchive in self.list_archive_paths(archive_output_directory): 571 self.unpack( 572 input_file=subarchive, 573 output_directory=archive_output_directory, 574 recursive=recursive, 575 raise_unsupported=raise_unsupported, 576 delete_origin=True 577 ) 578 579 return status 580 581 def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False): 582 """ Unpack a directory of archives, maintaining directory structure. 583 584 Args: 585 input_directory (str): The input directory containing archives to unpack. 586 output_directory (str): The output directory where archives will be unpacked to a new directory. 587 recursive (bool, optional): Default True. Recursively unpack all nested archives. 588 include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. 589 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 590 delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 591 """ 592 # Convert to absolute paths 593 input_directory = os.path.abspath(input_directory) 594 output_directory = os.path.abspath(output_directory) 595 596 for archive_input_file in self.list_archive_paths(input_directory): 597 relative_path = os.path.relpath(archive_input_file, input_directory) 598 archive_output_file = os.path.dirname(os.path.join(output_directory, relative_path)) 599 self.unpack( 600 input_file=archive_input_file, 601 output_directory=archive_output_file, 602 recursive=recursive, 603 include_file_type=include_file_type, 604 raise_unsupported=raise_unsupported, 605 delete_origin=delete_origin 606 ) 607 608 def pack_directory(self, input_directory: str, output_directory: str, file_type: str, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True, delete_origin: Optional[bool] = False): 609 """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip". 610 611 Args: 612 input_directory (str): The input directory containing files to archive. 613 output_directory (str): The output directory to store the created archive. 614 file_type (str): The archive file type. 615 add_extension (bool, optional): Default: True. Archive file type extension to result file. 616 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 617 delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory. 618 """ 619 # Convert to absolute paths 620 input_directory = os.path.abspath(input_directory) 621 output_directory = os.path.abspath(output_directory) 622 623 # Pack 624 log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}") 625 status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, add_extension=add_extension, raise_unsupported=raise_unsupported).status 626 627 if status not in successes.success_codes: 628 log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") 629 if raise_unsupported: 630 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 631 else: 632 log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") 633 634 if delete_origin: 635 utils.delete_directory(input_directory) 636 637 return status 638 639 def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 640 """ Exports an archive using the Glasswall engine. 641 642 Args: 643 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 644 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 645 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 646 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 647 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 648 649 Returns: 650 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 651 """ 652 # Validate arg types 653 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 654 raise TypeError(input_file) 655 if not isinstance(output_file, (type(None), str)): 656 raise TypeError(output_file) 657 if not isinstance(output_report, (type(None), str)): 658 raise TypeError(output_report) 659 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 660 raise TypeError(content_management_policy) 661 662 # Convert string path arguments to absolute paths 663 if isinstance(input_file, str): 664 input_file = os.path.abspath(input_file) 665 if isinstance(output_file, str): 666 output_file = os.path.abspath(output_file) 667 if isinstance(output_report, str): 668 output_report = os.path.abspath(output_report) 669 670 # Convert inputs to bytes 671 if isinstance(input_file, str): 672 if not os.path.isfile(input_file): 673 raise FileNotFoundError(input_file) 674 with open(input_file, "rb") as f: 675 input_file_bytes = f.read() 676 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 677 input_file_bytes = utils.as_bytes(input_file) 678 679 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 680 with open(content_management_policy, "rb") as f: 681 content_management_policy = f.read() 682 elif isinstance(content_management_policy, type(None)): 683 # Load default 684 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 685 content_management_policy = utils.validate_xml(content_management_policy) 686 687 # API function declaration 688 self.library.GwFileExportArchive.argtypes = [ 689 ct.c_void_p, # void *inputBuffer 690 ct.c_size_t, # size_t inputBufferLength 691 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 692 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 693 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 694 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 695 ct.c_char_p # const char *xmlConfigString 696 ] 697 698 # Variable initialisation 699 gw_return_object = glasswall.GwReturnObj() 700 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 701 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 702 gw_return_object.output_buffer = ct.c_void_p() 703 gw_return_object.output_buffer_length = ct.c_size_t() 704 gw_return_object.output_report_buffer = ct.c_void_p() 705 gw_return_object.output_report_buffer_length = ct.c_size_t() 706 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 707 708 with utils.CwdHandler(new_cwd=self.library_path): 709 # API call 710 gw_return_object.status = self.library.GwFileExportArchive( 711 gw_return_object.input_buffer, 712 gw_return_object.input_buffer_length, 713 ct.byref(gw_return_object.output_buffer), 714 ct.byref(gw_return_object.output_buffer_length), 715 ct.byref(gw_return_object.output_report_buffer), 716 ct.byref(gw_return_object.output_report_buffer_length), 717 gw_return_object.content_management_policy 718 ) 719 720 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 721 gw_return_object.output_file = utils.buffer_to_bytes( 722 gw_return_object.output_buffer, 723 gw_return_object.output_buffer_length 724 ) 725 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 726 gw_return_object.output_report = utils.buffer_to_bytes( 727 gw_return_object.output_report_buffer, 728 gw_return_object.output_report_buffer_length 729 ) 730 731 # Write output file 732 if hasattr(gw_return_object, "output_file"): 733 if isinstance(output_file, str): 734 os.makedirs(os.path.dirname(output_file), exist_ok=True) 735 with open(output_file, "wb") as f: 736 f.write(gw_return_object.output_file) 737 738 # Write output report 739 if hasattr(gw_return_object, "output_report"): 740 if isinstance(output_report, str): 741 os.makedirs(os.path.dirname(output_report), exist_ok=True) 742 with open(output_report, "wb") as f: 743 f.write(gw_return_object.output_report) 744 745 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 746 if gw_return_object.status not in successes.success_codes: 747 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 748 if raise_unsupported: 749 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 750 else: 751 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 752 753 self.release() 754 755 return gw_return_object 756 757 def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 758 """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory. 759 760 Args: 761 input_directory (str): The input directory containing archives to export. 762 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 763 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 764 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 765 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 766 767 Returns: 768 exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 769 """ 770 exported_archives_dict = {} 771 # Call export_archive on each file in input_directory to output_directory 772 for input_file in utils.list_file_paths(input_directory): 773 relative_path = os.path.relpath(input_file, input_directory) 774 # Construct paths for output file and output report 775 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 776 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 777 778 result = self.export_archive( 779 input_file=input_file, 780 output_file=output_file, 781 output_report=output_report, 782 content_management_policy=content_management_policy, 783 raise_unsupported=raise_unsupported, 784 ) 785 786 exported_archives_dict[relative_path] = result 787 788 return exported_archives_dict 789 790 def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: Optional[bool] = True): 791 """ Imports an archive using the Glasswall engine. 792 793 Args: 794 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 795 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 796 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 797 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 798 include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. 799 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 800 801 Returns: 802 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 803 """ 804 # Validate arg types 805 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 806 raise TypeError(input_file) 807 if not isinstance(output_file, (type(None), str)): 808 raise TypeError(output_file) 809 if not isinstance(output_report, (type(None), str)): 810 raise TypeError(output_report) 811 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 812 raise TypeError(content_management_policy) 813 814 # Convert string path arguments to absolute paths 815 if isinstance(input_file, str): 816 input_file = os.path.abspath(input_file) 817 # Convert string path arguments to absolute paths 818 if isinstance(output_file, str): 819 output_file = os.path.abspath(output_file) 820 if isinstance(output_report, str): 821 output_report = os.path.abspath(output_report) 822 823 # Convert inputs to bytes 824 if isinstance(input_file, str): 825 if not os.path.isfile(input_file): 826 raise FileNotFoundError(input_file) 827 with open(input_file, "rb") as f: 828 input_file_bytes = f.read() 829 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 830 input_file_bytes = utils.as_bytes(input_file) 831 832 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 833 with open(content_management_policy, "rb") as f: 834 content_management_policy = f.read() 835 elif isinstance(content_management_policy, type(None)): 836 # Load default 837 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 838 content_management_policy = utils.validate_xml(content_management_policy) 839 840 # API function declaration 841 self.library.GwFileImportArchive.argtypes = [ 842 ct.c_void_p, # void *inputBuffer 843 ct.c_size_t, # size_t inputBufferLength 844 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 845 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 846 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 847 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 848 ct.c_char_p, # const char *xmlConfigString 849 ct.c_int # int includeAnalysisReports 850 ] 851 852 # Variable initialisation 853 gw_return_object = glasswall.GwReturnObj() 854 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 855 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 856 gw_return_object.output_buffer = ct.c_void_p() 857 gw_return_object.output_buffer_length = ct.c_size_t() 858 gw_return_object.output_report_buffer = ct.c_void_p() 859 gw_return_object.output_report_buffer_length = ct.c_size_t() 860 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 861 gw_return_object.include_analysis_report = ct.c_int(int(include_analysis_report)) 862 863 with utils.CwdHandler(new_cwd=self.library_path): 864 # API call 865 gw_return_object.status = self.library.GwFileImportArchive( 866 gw_return_object.input_buffer, 867 gw_return_object.input_buffer_length, 868 ct.byref(gw_return_object.output_buffer), 869 ct.byref(gw_return_object.output_buffer_length), 870 ct.byref(gw_return_object.output_report_buffer), 871 ct.byref(gw_return_object.output_report_buffer_length), 872 gw_return_object.content_management_policy, 873 gw_return_object.include_analysis_report 874 ) 875 876 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 877 gw_return_object.output_file = utils.buffer_to_bytes( 878 gw_return_object.output_buffer, 879 gw_return_object.output_buffer_length 880 ) 881 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 882 gw_return_object.output_report = utils.buffer_to_bytes( 883 gw_return_object.output_report_buffer, 884 gw_return_object.output_report_buffer_length 885 ) 886 887 # Write output file 888 if hasattr(gw_return_object, "output_file"): 889 if isinstance(output_file, str): 890 os.makedirs(os.path.dirname(output_file), exist_ok=True) 891 with open(output_file, "wb") as f: 892 f.write(gw_return_object.output_file) 893 894 # Write output report 895 if hasattr(gw_return_object, "output_report"): 896 if isinstance(output_report, str): 897 os.makedirs(os.path.dirname(output_report), exist_ok=True) 898 with open(output_report, "wb") as f: 899 f.write(gw_return_object.output_report) 900 901 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 902 if gw_return_object.status not in successes.success_codes: 903 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 904 if raise_unsupported: 905 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 906 else: 907 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 908 909 self.release() 910 911 return gw_return_object 912 913 def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: bool = True): 914 """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory. 915 916 Args: 917 input_directory (str): The input directory containing archives to import. 918 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 919 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 920 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 921 include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. 922 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 923 924 Returns: 925 imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 926 """ 927 imported_archives_dict = {} 928 # Call import_archive on each file in input_directory to output_directory 929 for input_file in utils.list_file_paths(input_directory): 930 relative_path = os.path.relpath(input_file, input_directory) 931 # Construct paths for output file and output report 932 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 933 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 934 935 result = self.import_archive( 936 input_file=input_file, 937 output_file=output_file, 938 output_report=output_report, 939 content_management_policy=content_management_policy, 940 include_analysis_report=include_analysis_report, 941 raise_unsupported=raise_unsupported, 942 ) 943 944 imported_archives_dict[relative_path] = result 945 946 return imported_archives_dict
A high level Python wrapper for Glasswall Archive Manager.
27 def version(self): 28 """ Returns the Glasswall library version. 29 30 Returns: 31 version (str): The Glasswall library version. 32 """ 33 # API function declaration 34 self.library.GwArchiveVersion.restype = ct.c_char_p 35 36 # API call 37 version = self.library.GwArchiveVersion() 38 39 # Convert to Python string 40 version = ct.string_at(version).decode() 41 42 return version
Returns the Glasswall library version.
Returns: version (str): The Glasswall library version.
44 def release(self): 45 """ Releases any resources held by the Glasswall Archive Manager library. """ 46 self.library.GwArchiveDone()
Releases any resources held by the Glasswall Archive Manager library.
48 @property 49 @functools.lru_cache() 50 def supported_archives(self): 51 """ Returns a list of supported archive file formats. """ 52 53 # API function declaration 54 self.library.GwSupportedFiletypes.restype = ct.c_char_p 55 56 # API call 57 result = self.library.GwSupportedFiletypes() # b'7z,bz2,gz,rar,tar,xz,zip,' 58 59 # Convert to Python string 60 result = ct.string_at(result).decode() # 7z,bz2,gz,rar,tar,xz,zip, 61 62 # Convert comma separated str to list, remove empty trailing element, sort 63 result = sorted(filter(None, result.split(","))) 64 65 return result
Returns a list of supported archive file formats.
67 @functools.lru_cache() 68 def is_supported_archive(self, archive_type: str): 69 """ Returns True if the archive type (e.g. `7z`) is supported. """ 70 71 # API function declaration 72 self.library.GwIsSupportedArchiveType.argtypes = [ 73 ct.c_char_p 74 ] 75 self.library.GwIsSupportedArchiveType.restype = ct.c_bool 76 77 ct_archive_type = ct.c_char_p(archive_type.encode()) # const char* type 78 79 result = self.library.GwIsSupportedArchiveType(ct_archive_type) 80 81 return result
Returns True if the archive type (e.g. 7z
) is supported.
83 def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True): 84 """ Returns a list of file paths of supported archives in a directory and all of its subdirectories. """ 85 return [ 86 file_path 87 for file_path in glasswall.utils.list_file_paths( 88 directory=directory, 89 recursive=recursive, 90 absolute=absolute, 91 followlinks=followlinks, 92 ) 93 if self.is_supported_archive(self.determine_file_type(file_path, as_string=True, raise_unsupported=False)) 94 ]
Returns a list of file paths of supported archives in a directory and all of its subdirectories.
96 def determine_file_type(self, input_file: str, as_string: bool = False, raise_unsupported: bool = True): 97 """ Returns an int representing the file type of an archive. 98 99 Args: 100 input_file (str) The input file path. 101 as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False. 102 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 103 104 Returns: 105 file_type (Union[int, str]): The file format. 106 """ 107 if not os.path.isfile(input_file): 108 raise FileNotFoundError(input_file) 109 110 # API function declaration 111 self.library.GwDetermineArchiveTypeFromFile.argtypes = [ 112 ct.c_char_p 113 ] 114 115 # Variable initialisation 116 ct_input_file = ct.c_char_p(input_file.encode()) # const char * inputFilePath) 117 118 with utils.CwdHandler(new_cwd=self.library_path): 119 # API call 120 file_type = self.library.GwDetermineArchiveTypeFromFile( 121 ct_input_file 122 ) 123 124 file_type_as_string = dft.file_type_int_to_str(file_type) 125 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 126 127 if not dft.is_success(file_type): 128 if raise_unsupported: 129 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 130 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 131 else: 132 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 133 else: 134 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 135 136 if as_string: 137 return file_type_as_string 138 139 return file_type
Returns an int representing the file type of an archive.
Args: input_file (str) The input file path. as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_type (Union[int, str]): The file format.
141 def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 142 """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. 143 144 Args: 145 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 146 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 147 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 148 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 149 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 150 151 Returns: 152 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 153 """ 154 # Validate arg types 155 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 156 raise TypeError(input_file) 157 if not isinstance(output_file, (type(None), str)): 158 raise TypeError(output_file) 159 if not isinstance(output_report, (type(None), str)): 160 raise TypeError(output_report) 161 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 162 raise TypeError(content_management_policy) 163 164 # Convert string path arguments to absolute paths 165 if isinstance(input_file, str): 166 input_file = os.path.abspath(input_file) 167 if isinstance(output_file, str): 168 output_file = os.path.abspath(output_file) 169 if isinstance(output_report, str): 170 output_report = os.path.abspath(output_report) 171 172 # Convert inputs to bytes 173 if isinstance(input_file, str): 174 if not os.path.isfile(input_file): 175 raise FileNotFoundError(input_file) 176 with open(input_file, "rb") as f: 177 input_file_bytes = f.read() 178 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 179 input_file_bytes = utils.as_bytes(input_file) 180 181 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 182 with open(content_management_policy, "rb") as f: 183 content_management_policy = f.read() 184 elif isinstance(content_management_policy, type(None)): 185 # Load default 186 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 187 content_management_policy = utils.validate_xml(content_management_policy) 188 189 # API function declaration 190 self.library.GwFileAnalysisArchive.argtypes = [ 191 ct.c_void_p, # void *inputBuffer 192 ct.c_size_t, # size_t inputBufferLength 193 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 194 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 195 ct.POINTER(ct.c_void_p), # void **outputAnalysisReportBuffer 196 ct.POINTER(ct.c_size_t), # size_t *outputAnalysisReportBufferLength 197 ct.c_char_p # const char *xmlConfigString 198 ] 199 200 # Variable initialisation 201 gw_return_object = glasswall.GwReturnObj() 202 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 203 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 204 gw_return_object.output_buffer = ct.c_void_p() 205 gw_return_object.output_buffer_length = ct.c_size_t() 206 gw_return_object.output_report_buffer = ct.c_void_p() 207 gw_return_object.output_report_buffer_length = ct.c_size_t() 208 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 209 210 with utils.CwdHandler(new_cwd=self.library_path): 211 # API call 212 gw_return_object.status = self.library.GwFileAnalysisArchive( 213 gw_return_object.input_buffer, 214 gw_return_object.input_buffer_length, 215 ct.byref(gw_return_object.output_buffer), 216 ct.byref(gw_return_object.output_buffer_length), 217 ct.byref(gw_return_object.output_report_buffer), 218 ct.byref(gw_return_object.output_report_buffer_length), 219 gw_return_object.content_management_policy 220 ) 221 222 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 223 gw_return_object.output_file = utils.buffer_to_bytes( 224 gw_return_object.output_buffer, 225 gw_return_object.output_buffer_length 226 ) 227 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 228 gw_return_object.output_report = utils.buffer_to_bytes( 229 gw_return_object.output_report_buffer, 230 gw_return_object.output_report_buffer_length 231 ) 232 233 # Write output file 234 if hasattr(gw_return_object, "output_file"): 235 if isinstance(output_file, str): 236 os.makedirs(os.path.dirname(output_file), exist_ok=True) 237 with open(output_file, "wb") as f: 238 f.write(gw_return_object.output_file) 239 240 # Write output report 241 if hasattr(gw_return_object, "output_report"): 242 if isinstance(output_report, str): 243 os.makedirs(os.path.dirname(output_report), exist_ok=True) 244 with open(output_report, "wb") as f: 245 f.write(gw_return_object.output_report) 246 247 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 248 if gw_return_object.status not in successes.success_codes: 249 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 250 if raise_unsupported: 251 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 252 else: 253 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 254 255 self.release() 256 257 return gw_return_object
Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
259 def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 260 """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory. 261 262 Args: 263 input_directory (str): The input directory containing archives to analyse. 264 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written. 265 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 266 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 267 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 268 269 Returns: 270 analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 271 """ 272 analysed_archives_dict = {} 273 # Call analyse_archive on each file in input_directory 274 for input_file in utils.list_file_paths(input_directory): 275 relative_path = os.path.relpath(input_file, input_directory) 276 # Construct paths for output file and output report 277 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 278 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 279 280 result = self.analyse_archive( 281 input_file=input_file, 282 output_file=output_file, 283 output_report=output_report, 284 content_management_policy=content_management_policy, 285 raise_unsupported=raise_unsupported, 286 ) 287 288 analysed_archives_dict[relative_path] = result 289 290 return analysed_archives_dict
Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing archives to analyse. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
292 def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 293 """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. 294 295 Args: 296 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 297 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 298 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 299 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 300 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 301 302 Returns: 303 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 304 """ 305 # Validate arg types 306 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 307 raise TypeError(input_file) 308 if not isinstance(output_file, (type(None), str)): 309 raise TypeError(output_file) 310 if not isinstance(output_report, (type(None), str)): 311 raise TypeError(output_report) 312 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 313 raise TypeError(content_management_policy) 314 315 # Convert string path arguments to absolute paths 316 if isinstance(input_file, str): 317 input_file = os.path.abspath(input_file) 318 if isinstance(output_file, str): 319 output_file = os.path.abspath(output_file) 320 if isinstance(output_report, str): 321 output_report = os.path.abspath(output_report) 322 323 # Convert inputs to bytes 324 if isinstance(input_file, str): 325 if not os.path.isfile(input_file): 326 raise FileNotFoundError(input_file) 327 with open(input_file, "rb") as f: 328 input_file_bytes = f.read() 329 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 330 input_file_bytes = utils.as_bytes(input_file) 331 332 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 333 with open(content_management_policy, "rb") as f: 334 content_management_policy = f.read() 335 elif isinstance(content_management_policy, type(None)): 336 # Load default 337 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 338 content_management_policy = utils.validate_xml(content_management_policy) 339 340 # API function declaration 341 self.library.GwFileProtectAndReportArchive.argtypes = [ 342 ct.c_void_p, # void *inputBuffer 343 ct.c_size_t, # size_t inputBufferLength 344 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 345 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 346 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 347 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 348 ct.c_char_p # const char *xmlConfigString 349 ] 350 # Variable initialisation 351 gw_return_object = glasswall.GwReturnObj() 352 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 353 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 354 gw_return_object.output_buffer = ct.c_void_p() 355 gw_return_object.output_buffer_length = ct.c_size_t() 356 gw_return_object.output_report_buffer = ct.c_void_p() 357 gw_return_object.output_report_buffer_length = ct.c_size_t() 358 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 359 360 with utils.CwdHandler(new_cwd=self.library_path): 361 # API call 362 gw_return_object.status = self.library.GwFileProtectAndReportArchive( 363 ct.byref(gw_return_object.input_buffer), 364 gw_return_object.input_buffer_length, 365 ct.byref(gw_return_object.output_buffer), 366 ct.byref(gw_return_object.output_buffer_length), 367 ct.byref(gw_return_object.output_report_buffer), 368 ct.byref(gw_return_object.output_report_buffer_length), 369 gw_return_object.content_management_policy 370 ) 371 372 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 373 gw_return_object.output_file = utils.buffer_to_bytes( 374 gw_return_object.output_buffer, 375 gw_return_object.output_buffer_length 376 ) 377 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 378 gw_return_object.output_report = utils.buffer_to_bytes( 379 gw_return_object.output_report_buffer, 380 gw_return_object.output_report_buffer_length 381 ) 382 383 # Write output file 384 if hasattr(gw_return_object, "output_file"): 385 if isinstance(output_file, str): 386 os.makedirs(os.path.dirname(output_file), exist_ok=True) 387 with open(output_file, "wb") as f: 388 f.write(gw_return_object.output_file) 389 390 # Write output report 391 if hasattr(gw_return_object, "output_report"): 392 if isinstance(output_report, str): 393 os.makedirs(os.path.dirname(output_report), exist_ok=True) 394 with open(output_report, "wb") as f: 395 f.write(gw_return_object.output_report) 396 397 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 398 if gw_return_object.status not in successes.success_codes: 399 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 400 if raise_unsupported: 401 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 402 else: 403 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 404 405 self.release() 406 407 return gw_return_object
Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
409 def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 410 """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory. 411 412 Args: 413 input_directory (str): The input directory containing archives to protect. 414 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 415 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 416 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 417 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 418 419 Returns: 420 protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 421 """ 422 protected_archives_dict = {} 423 # Call protect_archive on each file in input_directory to output_directory 424 for input_file in utils.list_file_paths(input_directory): 425 relative_path = os.path.relpath(input_file, input_directory) 426 # Construct paths for output file and output report 427 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 428 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 429 430 result = self.protect_archive( 431 input_file=input_file, 432 output_file=output_file, 433 output_report=output_report, 434 content_management_policy=content_management_policy, 435 raise_unsupported=raise_unsupported, 436 ) 437 438 protected_archives_dict[relative_path] = result 439 440 return protected_archives_dict
Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing archives to protect. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
442 def file_to_file_unpack(self, input_file: str, output_directory: str, raise_unsupported: bool = True): 443 # Validate arg types 444 if not isinstance(input_file, str): 445 raise TypeError(input_file) 446 elif not os.path.isfile(input_file): 447 raise FileNotFoundError(input_file) 448 if not isinstance(output_directory, str): 449 raise TypeError(output_directory) 450 451 # API function declaration 452 self.library.GwFileToFileUnpack.argtypes = [ 453 ct.c_char_p, 454 ct.c_char_p, 455 ] 456 457 # Variable initialisation 458 gw_return_object = glasswall.GwReturnObj() 459 gw_return_object.ct_input_file = ct.c_char_p(input_file.encode()) # const char* inputFilePath 460 gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath 461 462 with utils.CwdHandler(new_cwd=self.library_path): 463 # API call 464 gw_return_object.status = self.library.GwFileToFileUnpack( 465 gw_return_object.ct_input_file, 466 gw_return_object.ct_output_directory, 467 ) 468 469 if gw_return_object.status not in successes.success_codes: 470 log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") 471 if raise_unsupported: 472 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 473 else: 474 log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") 475 476 self.release() 477 478 return gw_return_object
480 def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True): 481 # Validate arg types 482 if not isinstance(input_directory, str): 483 raise TypeError(input_directory) 484 elif not os.path.isdir(input_directory): 485 raise NotADirectoryError(input_directory) 486 if not isinstance(output_directory, str): 487 raise TypeError(output_directory) 488 if not file_type: 489 file_type = utils.get_file_type(input_directory) 490 491 # Ensure output_directory exists 492 os.makedirs(output_directory, exist_ok=True) 493 494 # API function declaration 495 self.library.GwFileToFilePack.argtypes = [ 496 ct.c_char_p, 497 ct.c_char_p, 498 ct.c_char_p, 499 ct.c_int, 500 ] 501 502 # Variable initialisation 503 gw_return_object = glasswall.GwReturnObj() 504 gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode()) # const char* inputDirPath 505 gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath 506 gw_return_object.ct_file_type = ct.c_char_p(file_type.encode()) # const char *fileType 507 gw_return_object.ct_add_extension = ct.c_int(int(add_extension)) # int addExtension 508 509 with utils.CwdHandler(new_cwd=self.library_path): 510 # API call 511 gw_return_object.status = self.library.GwFileToFilePack( 512 gw_return_object.ct_input_directory, 513 gw_return_object.ct_output_directory, 514 gw_return_object.ct_file_type, 515 gw_return_object.ct_add_extension, 516 ) 517 518 if gw_return_object.status not in successes.success_codes: 519 log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") 520 if raise_unsupported: 521 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 522 else: 523 log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") 524 525 self.release() 526 527 return gw_return_object
529 def unpack(self, input_file: str, output_directory: str, recursive: bool = True, include_file_type: bool = False, raise_unsupported: bool = True, delete_origin: bool = False): 530 """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip". 531 532 Args: 533 input_file (str): The archive file path 534 output_directory (str): The output directory where the archive will be unpacked to a new directory. 535 recursive (bool, optional): Default True. Recursively unpack all nested archives. 536 include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. 537 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 538 delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 539 """ 540 # Convert to absolute paths 541 input_file = os.path.abspath(input_file) 542 output_directory = os.path.abspath(output_directory) 543 544 if include_file_type: 545 archive_name = os.path.basename(input_file) 546 else: 547 archive_name = os.path.splitext(os.path.basename(input_file))[0] 548 archive_output_directory = os.path.join(output_directory, archive_name) 549 550 # Unpack 551 log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}") 552 result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, raise_unsupported=raise_unsupported) 553 if result: 554 status = result.status 555 else: 556 status = None 557 558 if status not in successes.success_codes: 559 log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}") 560 if raise_unsupported: 561 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 562 else: 563 log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}") 564 565 if delete_origin: 566 os.remove(input_file) 567 568 if recursive: 569 # Unpack sub archives 570 for subarchive in self.list_archive_paths(archive_output_directory): 571 self.unpack( 572 input_file=subarchive, 573 output_directory=archive_output_directory, 574 recursive=recursive, 575 raise_unsupported=raise_unsupported, 576 delete_origin=True 577 ) 578 579 return status
Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
Args: input_file (str): The archive file path output_directory (str): The output directory where the archive will be unpacked to a new directory. recursive (bool, optional): Default True. Recursively unpack all nested archives. include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
581 def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False): 582 """ Unpack a directory of archives, maintaining directory structure. 583 584 Args: 585 input_directory (str): The input directory containing archives to unpack. 586 output_directory (str): The output directory where archives will be unpacked to a new directory. 587 recursive (bool, optional): Default True. Recursively unpack all nested archives. 588 include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. 589 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 590 delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 591 """ 592 # Convert to absolute paths 593 input_directory = os.path.abspath(input_directory) 594 output_directory = os.path.abspath(output_directory) 595 596 for archive_input_file in self.list_archive_paths(input_directory): 597 relative_path = os.path.relpath(archive_input_file, input_directory) 598 archive_output_file = os.path.dirname(os.path.join(output_directory, relative_path)) 599 self.unpack( 600 input_file=archive_input_file, 601 output_directory=archive_output_file, 602 recursive=recursive, 603 include_file_type=include_file_type, 604 raise_unsupported=raise_unsupported, 605 delete_origin=delete_origin 606 )
Unpack a directory of archives, maintaining directory structure.
Args: input_directory (str): The input directory containing archives to unpack. output_directory (str): The output directory where archives will be unpacked to a new directory. recursive (bool, optional): Default True. Recursively unpack all nested archives. include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
608 def pack_directory(self, input_directory: str, output_directory: str, file_type: str, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True, delete_origin: Optional[bool] = False): 609 """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip". 610 611 Args: 612 input_directory (str): The input directory containing files to archive. 613 output_directory (str): The output directory to store the created archive. 614 file_type (str): The archive file type. 615 add_extension (bool, optional): Default: True. Archive file type extension to result file. 616 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 617 delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory. 618 """ 619 # Convert to absolute paths 620 input_directory = os.path.abspath(input_directory) 621 output_directory = os.path.abspath(output_directory) 622 623 # Pack 624 log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}") 625 status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, add_extension=add_extension, raise_unsupported=raise_unsupported).status 626 627 if status not in successes.success_codes: 628 log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") 629 if raise_unsupported: 630 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 631 else: 632 log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") 633 634 if delete_origin: 635 utils.delete_directory(input_directory) 636 637 return status
Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
Args: input_directory (str): The input directory containing files to archive. output_directory (str): The output directory to store the created archive. file_type (str): The archive file type. add_extension (bool, optional): Default: True. Archive file type extension to result file. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory.
639 def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 640 """ Exports an archive using the Glasswall engine. 641 642 Args: 643 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 644 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 645 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 646 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 647 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 648 649 Returns: 650 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 651 """ 652 # Validate arg types 653 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 654 raise TypeError(input_file) 655 if not isinstance(output_file, (type(None), str)): 656 raise TypeError(output_file) 657 if not isinstance(output_report, (type(None), str)): 658 raise TypeError(output_report) 659 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 660 raise TypeError(content_management_policy) 661 662 # Convert string path arguments to absolute paths 663 if isinstance(input_file, str): 664 input_file = os.path.abspath(input_file) 665 if isinstance(output_file, str): 666 output_file = os.path.abspath(output_file) 667 if isinstance(output_report, str): 668 output_report = os.path.abspath(output_report) 669 670 # Convert inputs to bytes 671 if isinstance(input_file, str): 672 if not os.path.isfile(input_file): 673 raise FileNotFoundError(input_file) 674 with open(input_file, "rb") as f: 675 input_file_bytes = f.read() 676 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 677 input_file_bytes = utils.as_bytes(input_file) 678 679 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 680 with open(content_management_policy, "rb") as f: 681 content_management_policy = f.read() 682 elif isinstance(content_management_policy, type(None)): 683 # Load default 684 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 685 content_management_policy = utils.validate_xml(content_management_policy) 686 687 # API function declaration 688 self.library.GwFileExportArchive.argtypes = [ 689 ct.c_void_p, # void *inputBuffer 690 ct.c_size_t, # size_t inputBufferLength 691 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 692 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 693 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 694 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 695 ct.c_char_p # const char *xmlConfigString 696 ] 697 698 # Variable initialisation 699 gw_return_object = glasswall.GwReturnObj() 700 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 701 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 702 gw_return_object.output_buffer = ct.c_void_p() 703 gw_return_object.output_buffer_length = ct.c_size_t() 704 gw_return_object.output_report_buffer = ct.c_void_p() 705 gw_return_object.output_report_buffer_length = ct.c_size_t() 706 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 707 708 with utils.CwdHandler(new_cwd=self.library_path): 709 # API call 710 gw_return_object.status = self.library.GwFileExportArchive( 711 gw_return_object.input_buffer, 712 gw_return_object.input_buffer_length, 713 ct.byref(gw_return_object.output_buffer), 714 ct.byref(gw_return_object.output_buffer_length), 715 ct.byref(gw_return_object.output_report_buffer), 716 ct.byref(gw_return_object.output_report_buffer_length), 717 gw_return_object.content_management_policy 718 ) 719 720 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 721 gw_return_object.output_file = utils.buffer_to_bytes( 722 gw_return_object.output_buffer, 723 gw_return_object.output_buffer_length 724 ) 725 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 726 gw_return_object.output_report = utils.buffer_to_bytes( 727 gw_return_object.output_report_buffer, 728 gw_return_object.output_report_buffer_length 729 ) 730 731 # Write output file 732 if hasattr(gw_return_object, "output_file"): 733 if isinstance(output_file, str): 734 os.makedirs(os.path.dirname(output_file), exist_ok=True) 735 with open(output_file, "wb") as f: 736 f.write(gw_return_object.output_file) 737 738 # Write output report 739 if hasattr(gw_return_object, "output_report"): 740 if isinstance(output_report, str): 741 os.makedirs(os.path.dirname(output_report), exist_ok=True) 742 with open(output_report, "wb") as f: 743 f.write(gw_return_object.output_report) 744 745 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 746 if gw_return_object.status not in successes.success_codes: 747 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 748 if raise_unsupported: 749 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 750 else: 751 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 752 753 self.release() 754 755 return gw_return_object
Exports an archive using the Glasswall engine.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
757 def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): 758 """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory. 759 760 Args: 761 input_directory (str): The input directory containing archives to export. 762 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 763 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 764 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 765 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 766 767 Returns: 768 exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 769 """ 770 exported_archives_dict = {} 771 # Call export_archive on each file in input_directory to output_directory 772 for input_file in utils.list_file_paths(input_directory): 773 relative_path = os.path.relpath(input_file, input_directory) 774 # Construct paths for output file and output report 775 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 776 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 777 778 result = self.export_archive( 779 input_file=input_file, 780 output_file=output_file, 781 output_report=output_report, 782 content_management_policy=content_management_policy, 783 raise_unsupported=raise_unsupported, 784 ) 785 786 exported_archives_dict[relative_path] = result 787 788 return exported_archives_dict
Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing archives to export. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
790 def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: Optional[bool] = True): 791 """ Imports an archive using the Glasswall engine. 792 793 Args: 794 input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. 795 output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. 796 output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 797 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 798 include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. 799 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 800 801 Returns: 802 gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes) 803 """ 804 # Validate arg types 805 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 806 raise TypeError(input_file) 807 if not isinstance(output_file, (type(None), str)): 808 raise TypeError(output_file) 809 if not isinstance(output_report, (type(None), str)): 810 raise TypeError(output_report) 811 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 812 raise TypeError(content_management_policy) 813 814 # Convert string path arguments to absolute paths 815 if isinstance(input_file, str): 816 input_file = os.path.abspath(input_file) 817 # Convert string path arguments to absolute paths 818 if isinstance(output_file, str): 819 output_file = os.path.abspath(output_file) 820 if isinstance(output_report, str): 821 output_report = os.path.abspath(output_report) 822 823 # Convert inputs to bytes 824 if isinstance(input_file, str): 825 if not os.path.isfile(input_file): 826 raise FileNotFoundError(input_file) 827 with open(input_file, "rb") as f: 828 input_file_bytes = f.read() 829 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 830 input_file_bytes = utils.as_bytes(input_file) 831 832 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 833 with open(content_management_policy, "rb") as f: 834 content_management_policy = f.read() 835 elif isinstance(content_management_policy, type(None)): 836 # Load default 837 content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") 838 content_management_policy = utils.validate_xml(content_management_policy) 839 840 # API function declaration 841 self.library.GwFileImportArchive.argtypes = [ 842 ct.c_void_p, # void *inputBuffer 843 ct.c_size_t, # size_t inputBufferLength 844 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 845 ct.POINTER(ct.c_size_t), # size_t *outputFileBufferLength 846 ct.POINTER(ct.c_void_p), # void **outputReportBuffer 847 ct.POINTER(ct.c_size_t), # size_t *outputReportBufferLength 848 ct.c_char_p, # const char *xmlConfigString 849 ct.c_int # int includeAnalysisReports 850 ] 851 852 # Variable initialisation 853 gw_return_object = glasswall.GwReturnObj() 854 gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes) 855 gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes)) 856 gw_return_object.output_buffer = ct.c_void_p() 857 gw_return_object.output_buffer_length = ct.c_size_t() 858 gw_return_object.output_report_buffer = ct.c_void_p() 859 gw_return_object.output_report_buffer_length = ct.c_size_t() 860 gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode()) 861 gw_return_object.include_analysis_report = ct.c_int(int(include_analysis_report)) 862 863 with utils.CwdHandler(new_cwd=self.library_path): 864 # API call 865 gw_return_object.status = self.library.GwFileImportArchive( 866 gw_return_object.input_buffer, 867 gw_return_object.input_buffer_length, 868 ct.byref(gw_return_object.output_buffer), 869 ct.byref(gw_return_object.output_buffer_length), 870 ct.byref(gw_return_object.output_report_buffer), 871 ct.byref(gw_return_object.output_report_buffer_length), 872 gw_return_object.content_management_policy, 873 gw_return_object.include_analysis_report 874 ) 875 876 if gw_return_object.output_buffer and gw_return_object.output_buffer_length: 877 gw_return_object.output_file = utils.buffer_to_bytes( 878 gw_return_object.output_buffer, 879 gw_return_object.output_buffer_length 880 ) 881 if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length: 882 gw_return_object.output_report = utils.buffer_to_bytes( 883 gw_return_object.output_report_buffer, 884 gw_return_object.output_report_buffer_length 885 ) 886 887 # Write output file 888 if hasattr(gw_return_object, "output_file"): 889 if isinstance(output_file, str): 890 os.makedirs(os.path.dirname(output_file), exist_ok=True) 891 with open(output_file, "wb") as f: 892 f.write(gw_return_object.output_file) 893 894 # Write output report 895 if hasattr(gw_return_object, "output_report"): 896 if isinstance(output_report, str): 897 os.makedirs(os.path.dirname(output_report), exist_ok=True) 898 with open(output_report, "wb") as f: 899 f.write(gw_return_object.output_report) 900 901 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 902 if gw_return_object.status not in successes.success_codes: 903 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 904 if raise_unsupported: 905 raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) 906 else: 907 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") 908 909 self.release() 910 911 return gw_return_object
Imports an archive using the Glasswall engine.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
913 def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: bool = True): 914 """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory. 915 916 Args: 917 input_directory (str): The input directory containing archives to import. 918 output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. 919 output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. 920 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. 921 include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. 922 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 923 924 Returns: 925 imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) 926 """ 927 imported_archives_dict = {} 928 # Call import_archive on each file in input_directory to output_directory 929 for input_file in utils.list_file_paths(input_directory): 930 relative_path = os.path.relpath(input_file, input_directory) 931 # Construct paths for output file and output report 932 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 933 output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") 934 935 result = self.import_archive( 936 input_file=input_file, 937 output_file=output_file, 938 output_report=output_report, 939 content_management_policy=content_management_policy, 940 include_analysis_report=include_analysis_report, 941 raise_unsupported=raise_unsupported, 942 ) 943 944 imported_archives_dict[relative_path] = result 945 946 return imported_archives_dict
Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing archives to import. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)