glasswall.libraries.rebuild.rebuild
1import ctypes as ct 2import io 3import os 4from typing import Union 5 6import glasswall 7from glasswall import determine_file_type as dft 8from glasswall import utils 9from glasswall.config.logging import log 10from glasswall.libraries.library import Library 11from glasswall.libraries.rebuild import errors, successes 12 13 14class Rebuild(Library): 15 """ A high level Python wrapper for Glasswall Rebuild / Classic. """ 16 17 def __init__(self, library_path: str): 18 super().__init__(library_path=library_path) 19 self.library = self.load_library(os.path.abspath(library_path)) 20 21 # Set content management configuration to default 22 self.set_content_management_policy(input_file=None) 23 24 # Validate killswitch has not activated 25 self.validate_licence() 26 27 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") 28 29 def validate_licence(self): 30 """ Validates the licence of the library by attempting to call protect_file on a known supported file. 31 32 Raises: 33 RebuildError: If the licence could not be validated. 34 """ 35 # Call protect file on a known good bitmap to see if licence has expired 36 try: 37 self.protect_file( 38 input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00", 39 raise_unsupported=True 40 ) 41 log.debug(f"{self.__class__.__name__} licence validated successfully.") 42 except errors.RebuildError: 43 log.error(f"{self.__class__.__name__} licence validation failed.") 44 raise 45 46 def version(self): 47 """ Returns the Glasswall library version. 48 49 Returns: 50 version (str): The Glasswall library version. 51 """ 52 53 # Declare the return type 54 self.library.GWFileVersion.restype = ct.c_wchar_p 55 56 # API call 57 version = self.library.GWFileVersion() 58 59 return version 60 61 def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True): 62 """ Returns an int representing the file type / file format of a file. 63 64 Args: 65 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path. 66 as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. 67 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 68 69 Returns: 70 file_type (Union[int, str]): The file format. 71 """ 72 73 if isinstance(input_file, str): 74 if not os.path.isfile(input_file): 75 raise FileNotFoundError(input_file) 76 77 self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p] 78 self.library.GWDetermineFileTypeFromFile.restype = ct.c_int 79 80 # convert to ct.c_wchar_p 81 ct_input_file = ct.c_wchar_p(input_file) 82 83 # API call 84 file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file) 85 86 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 87 self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t] 88 self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int 89 90 # convert to bytes 91 bytes_input_file = utils.as_bytes(input_file) 92 93 # ctypes conversion 94 ct_input_buffer = ct.c_char_p(bytes_input_file) 95 ct_butter_length = ct.c_size_t(len(bytes_input_file)) 96 97 # API call 98 file_type = self.library.GWDetermineFileTypeFromFileInMem( 99 ct_input_buffer, 100 ct_butter_length 101 ) 102 103 file_type_as_string = dft.file_type_int_to_str(file_type) 104 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 105 106 if not dft.is_success(file_type): 107 if raise_unsupported: 108 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 109 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 110 else: 111 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 112 else: 113 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 114 115 if as_string: 116 return file_type_as_string 117 118 return file_type 119 120 def get_content_management_policy(self): 121 """ Gets the current content management configuration. 122 123 Returns: 124 xml_string (str): The XML string of the current content management configuration. 125 """ 126 127 # Declare argument types 128 self.library.GWFileConfigGet.argtypes = [ 129 ct.POINTER(ct.POINTER(ct.c_wchar)), 130 ct.POINTER(ct.c_size_t) 131 ] 132 133 # Variable initialisation 134 ct_input_buffer = ct.POINTER(ct.c_wchar)() 135 ct_input_size = ct.c_size_t(0) 136 137 # API call 138 status = self.library.GWFileConfigGet( 139 ct.byref(ct_input_buffer), 140 ct.byref(ct_input_size) 141 ) 142 143 if status not in successes.success_codes: 144 log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 145 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 146 else: 147 log.debug(f"\n\tstatus: {status}") 148 149 # As string 150 xml_string = utils.validate_xml(ct.wstring_at(ct_input_buffer)) 151 152 return xml_string 153 154 def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None): 155 """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied. 156 157 Args: 158 input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 159 160 Returns: 161 status (int): The result of the Glasswall API call. 162 """ 163 # Validate type 164 if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 165 raise TypeError(input_file) 166 167 # self.library.GWFileConfigRevertToDefaults doesn't work, load default instead 168 # Set input_file to default if input_file is None 169 if input_file is None: 170 input_file = glasswall.content_management.policies.Rebuild(default="sanitise") 171 172 # Validate xml content is parsable 173 xml_string = utils.validate_xml(input_file) 174 175 # Declare argument types 176 self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p] 177 178 # API call 179 status = self.library.GWFileConfigXML( 180 ct.c_wchar_p(xml_string) 181 ) 182 183 if status not in successes.success_codes: 184 log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 185 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 186 else: 187 log.debug(f"\n\tstatus: {status}") 188 189 return status 190 191 def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 192 """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided. 193 194 Args: 195 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 196 output_file (Union[None, str], optional): The output file path where the protected file will be written. 197 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 198 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 199 200 Returns: 201 file_bytes (bytes): The protected file bytes. 202 """ 203 # Validate arg types 204 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 205 raise TypeError(input_file) 206 if not isinstance(output_file, (type(None), str)): 207 raise TypeError(output_file) 208 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 209 raise TypeError(content_management_policy) 210 if not isinstance(raise_unsupported, bool): 211 raise TypeError(raise_unsupported) 212 213 # Convert string path arguments to absolute paths 214 if isinstance(input_file, str): 215 if not os.path.isfile(input_file): 216 raise FileNotFoundError(input_file) 217 input_file = os.path.abspath(input_file) 218 if isinstance(output_file, str): 219 output_file = os.path.abspath(output_file) 220 # make directories that do not exist 221 os.makedirs(os.path.dirname(output_file), exist_ok=True) 222 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 223 content_management_policy = os.path.abspath(content_management_policy) 224 225 # Convert memory inputs to bytes 226 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 227 input_file = utils.as_bytes(input_file) 228 229 # Check that file type is supported 230 try: 231 file_type = self.determine_file_type(input_file=input_file) 232 except dft.errors.FileTypeEnumError: 233 if raise_unsupported: 234 raise 235 else: 236 return None 237 238 with utils.CwdHandler(self.library_path): 239 # Set content management policy 240 self.set_content_management_policy(content_management_policy) 241 242 # file to file 243 if isinstance(input_file, str) and isinstance(output_file, str): 244 # API function declaration 245 self.library.GWFileToFileProtect.argtypes = [ 246 ct.c_wchar_p, 247 ct.c_wchar_p, 248 ct.c_wchar_p 249 ] 250 251 # Variable initialisation 252 ct_input_file = ct.c_wchar_p(input_file) 253 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 254 ct_output_file = ct.c_wchar_p(output_file) 255 256 # API call 257 status = self.library.GWFileToFileProtect( 258 ct_input_file, 259 ct_file_type, 260 ct_output_file 261 ) 262 263 # file to memory 264 elif isinstance(input_file, str) and output_file is None: 265 # API function declaration 266 self.library.GWFileProtect.argtypes = [ 267 ct.c_wchar_p, 268 ct.c_wchar_p, 269 ct.POINTER(ct.c_void_p), 270 ct.POINTER(ct.c_size_t) 271 ] 272 273 # Variable initialisation 274 ct_input_file = ct.c_wchar_p(input_file) 275 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 276 ct_output_buffer = ct.c_void_p(0) 277 ct_output_size = ct.c_size_t(0) 278 279 # API call 280 status = self.library.GWFileProtect( 281 ct_input_file, 282 ct_file_type, 283 ct.byref(ct_output_buffer), 284 ct.byref(ct_output_size) 285 ) 286 287 # memory to memory and memory to file 288 elif isinstance(input_file, bytes): 289 # API function declaration 290 self.library.GWMemoryToMemoryProtect.argtypes = [ 291 ct.c_void_p, 292 ct.c_size_t, 293 ct.c_wchar_p, 294 ct.POINTER(ct.c_void_p), 295 ct.POINTER(ct.c_size_t) 296 ] 297 298 # Variable initialization 299 bytearray_buffer = bytearray(input_file) 300 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 301 ct_input_size = ct.c_size_t(len(input_file)) 302 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 303 ct_output_buffer = ct.c_void_p(0) 304 ct_output_size = ct.c_size_t(0) 305 306 status = self.library.GWMemoryToMemoryProtect( 307 ct_input_buffer, 308 ct_input_size, 309 ct_file_type, 310 ct.byref(ct_output_buffer), 311 ct.byref(ct_output_size) 312 ) 313 314 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 315 if status not in successes.success_codes: 316 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 317 if raise_unsupported: 318 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 319 else: 320 file_bytes = None 321 else: 322 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 323 if isinstance(input_file, str) and isinstance(output_file, str): 324 # file to file, read the bytes of the file that Rebuild has already written 325 if not os.path.isfile(output_file): 326 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 327 file_bytes = None 328 else: 329 with open(output_file, "rb") as f: 330 file_bytes = f.read() 331 else: 332 # file to memory, memory to memory 333 file_bytes = utils.buffer_to_bytes( 334 ct_output_buffer, 335 ct_output_size 336 ) 337 if isinstance(output_file, str): 338 # memory to file 339 # no Rebuild function exists for memory to file, write the memory to file ourselves 340 with open(output_file, "wb") as f: 341 f.write(file_bytes) 342 343 return file_bytes 344 345 def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 346 """ Recursively processes all files in a directory in protect mode using the given content management policy. 347 The protected files are written to output_directory maintaining the same directory structure as input_directory. 348 349 Args: 350 input_directory (str): The input directory containing files to protect. 351 output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files. 352 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 353 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 354 355 Returns: 356 protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 357 """ 358 protected_files_dict = {} 359 # Call protect_file on each file in input_directory to output_directory 360 for input_file in utils.list_file_paths(input_directory): 361 relative_path = os.path.relpath(input_file, input_directory) 362 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 363 364 protected_bytes = self.protect_file( 365 input_file=input_file, 366 output_file=output_file, 367 raise_unsupported=raise_unsupported, 368 content_management_policy=content_management_policy, 369 ) 370 371 protected_files_dict[relative_path] = protected_bytes 372 373 return protected_files_dict 374 375 def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 376 """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided. 377 378 Args: 379 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 380 output_file (Union[None, str], optional): The output file path where the analysis file will be written. 381 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 382 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 383 384 Returns: 385 file_bytes (bytes): The analysis file bytes. 386 """ 387 # Validate arg types 388 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 389 raise TypeError(input_file) 390 if not isinstance(output_file, (type(None), str)): 391 raise TypeError(output_file) 392 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 393 raise TypeError(content_management_policy) 394 if not isinstance(raise_unsupported, bool): 395 raise TypeError(raise_unsupported) 396 397 # Convert string path arguments to absolute paths 398 if isinstance(input_file, str): 399 if not os.path.isfile(input_file): 400 raise FileNotFoundError(input_file) 401 input_file = os.path.abspath(input_file) 402 if isinstance(output_file, str): 403 output_file = os.path.abspath(output_file) 404 # make directories that do not exist 405 os.makedirs(os.path.dirname(output_file), exist_ok=True) 406 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 407 content_management_policy = os.path.abspath(content_management_policy) 408 409 # Convert memory inputs to bytes 410 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 411 input_file = utils.as_bytes(input_file) 412 413 # Check that file type is supported 414 try: 415 file_type = self.determine_file_type(input_file=input_file) 416 except dft.errors.FileTypeEnumError: 417 if raise_unsupported: 418 raise 419 else: 420 return None 421 422 with utils.CwdHandler(self.library_path): 423 # Set content management policy 424 self.set_content_management_policy(content_management_policy) 425 426 # file to file 427 if isinstance(input_file, str) and isinstance(output_file, str): 428 # API function declaration 429 self.library.GWFileToFileAnalysisAudit.argtypes = [ 430 ct.c_wchar_p, 431 ct.c_wchar_p, 432 ct.c_wchar_p 433 ] 434 435 # Variable initialisation 436 ct_input_file = ct.c_wchar_p(input_file) 437 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 438 ct_output_file = ct.c_wchar_p(output_file) 439 440 # API call 441 status = self.library.GWFileToFileAnalysisAudit( 442 ct_input_file, 443 ct_file_type, 444 ct_output_file 445 ) 446 447 # file to memory 448 elif isinstance(input_file, str) and output_file is None: 449 # API function declaration 450 self.library.GWFileAnalysisAudit.argtypes = [ 451 ct.c_wchar_p, 452 ct.c_wchar_p, 453 ct.POINTER(ct.c_void_p), 454 ct.POINTER(ct.c_size_t) 455 ] 456 457 # Variable initialisation 458 ct_input_file = ct.c_wchar_p(input_file) 459 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 460 ct_output_buffer = ct.c_void_p(0) 461 ct_output_size = ct.c_size_t(0) 462 463 # API call 464 status = self.library.GWFileAnalysisAudit( 465 ct_input_file, 466 ct_file_type, 467 ct.byref(ct_output_buffer), 468 ct.byref(ct_output_size) 469 ) 470 471 # memory to memory and memory to file 472 elif isinstance(input_file, bytes): 473 # API function declaration 474 self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [ 475 ct.c_void_p, 476 ct.c_size_t, 477 ct.c_wchar_p, 478 ct.POINTER(ct.c_void_p), 479 ct.POINTER(ct.c_size_t) 480 ] 481 482 # Variable initialization 483 bytearray_buffer = bytearray(input_file) 484 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 485 ct_input_size = ct.c_size_t(len(input_file)) 486 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 487 ct_output_buffer = ct.c_void_p(0) 488 ct_output_size = ct.c_size_t(0) 489 490 status = self.library.GWMemoryToMemoryAnalysisAudit( 491 ct.byref(ct_input_buffer), 492 ct_input_size, 493 ct_file_type, 494 ct.byref(ct_output_buffer), 495 ct.byref(ct_output_size) 496 ) 497 498 file_bytes = None 499 if isinstance(input_file, str) and isinstance(output_file, str): 500 # file to file, read the bytes of the file that Rebuild has already written 501 if os.path.isfile(output_file): 502 with open(output_file, "rb") as f: 503 file_bytes = f.read() 504 else: 505 # file to memory, memory to memory 506 if ct_output_buffer and ct_output_size: 507 file_bytes = utils.buffer_to_bytes( 508 ct_output_buffer, 509 ct_output_size 510 ) 511 if isinstance(output_file, str): 512 # memory to file 513 # no Rebuild function exists for memory to file, write the memory to file ourselves 514 with open(output_file, "wb") as f: 515 f.write(file_bytes) 516 517 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 518 if status not in successes.success_codes: 519 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 520 if raise_unsupported: 521 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 522 else: 523 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 524 525 return file_bytes 526 527 def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 528 """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory. 529 530 Args: 531 input_directory (str): The input directory containing files to analyse. 532 output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files. 533 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 534 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 535 536 Returns: 537 analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 538 """ 539 analysis_files_dict = {} 540 # Call analyse_file on each file in input_directory to output_directory 541 for input_file in utils.list_file_paths(input_directory): 542 relative_path = os.path.relpath(input_file, input_directory) + ".xml" 543 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 544 545 analysis_bytes = self.analyse_file( 546 input_file=input_file, 547 output_file=output_file, 548 raise_unsupported=raise_unsupported, 549 content_management_policy=content_management_policy, 550 ) 551 552 analysis_files_dict[relative_path] = analysis_bytes 553 554 return analysis_files_dict 555 556 def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 557 """ Export a file, returning the .zip file bytes. The .zip file is written to output_file. 558 559 Args: 560 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 561 output_file (Union[None, str], optional): The output file path where the .zip file will be written. 562 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 563 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 564 565 Returns: 566 file_bytes (bytes): The exported .zip file. 567 """ 568 # Validate arg types 569 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 570 raise TypeError(input_file) 571 if not isinstance(output_file, (type(None), str)): 572 raise TypeError(output_file) 573 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 574 raise TypeError(content_management_policy) 575 if not isinstance(raise_unsupported, bool): 576 raise TypeError(raise_unsupported) 577 578 # Convert string path arguments to absolute paths 579 if isinstance(input_file, str): 580 if not os.path.isfile(input_file): 581 raise FileNotFoundError(input_file) 582 input_file = os.path.abspath(input_file) 583 if isinstance(output_file, str): 584 output_file = os.path.abspath(output_file) 585 # make directories that do not exist 586 os.makedirs(os.path.dirname(output_file), exist_ok=True) 587 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 588 content_management_policy = os.path.abspath(content_management_policy) 589 590 # Convert memory inputs to bytes 591 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 592 input_file = utils.as_bytes(input_file) 593 594 # Check that file type is supported 595 try: 596 self.determine_file_type(input_file=input_file) 597 except dft.errors.FileTypeEnumError: 598 if raise_unsupported: 599 raise 600 else: 601 return None 602 603 with utils.CwdHandler(self.library_path): 604 # Set content management policy 605 self.set_content_management_policy(content_management_policy) 606 607 # file to file 608 if isinstance(input_file, str) and isinstance(output_file, str): 609 # API function declaration 610 self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [ 611 ct.c_wchar_p, 612 ct.c_wchar_p 613 ] 614 615 # Variable initialisation 616 ct_input_file = ct.c_wchar_p(input_file) 617 ct_output_file = ct.c_wchar_p(output_file) 618 619 # API call 620 status = self.library.GWFileToFileAnalysisProtectAndExport( 621 ct_input_file, 622 ct_output_file 623 ) 624 625 # file to memory 626 elif isinstance(input_file, str) and output_file is None: 627 # API function declaration 628 self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [ 629 ct.c_wchar_p, 630 ct.POINTER(ct.c_void_p), 631 ct.POINTER(ct.c_size_t) 632 ] 633 634 # Variable initialisation 635 ct_input_file = ct.c_wchar_p(input_file) 636 ct_output_buffer = ct.c_void_p(0) 637 ct_output_size = ct.c_size_t(0) 638 639 # API call 640 status = self.library.GWFileToMemoryAnalysisProtectAndExport( 641 ct_input_file, 642 ct.byref(ct_output_buffer), 643 ct.byref(ct_output_size) 644 ) 645 646 # memory to memory and memory to file 647 elif isinstance(input_file, bytes): 648 # API function declaration 649 self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [ 650 ct.c_void_p, 651 ct.c_size_t, 652 ct.POINTER(ct.c_void_p), 653 ct.POINTER(ct.c_size_t) 654 ] 655 656 # Variable initialization 657 bytearray_buffer = bytearray(input_file) 658 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 659 ct_input_size = ct.c_size_t(len(input_file)) 660 ct_output_buffer = ct.c_void_p(0) 661 ct_output_size = ct.c_size_t(0) 662 663 status = self.library.GWMemoryToMemoryAnalysisProtectAndExport( 664 ct_input_buffer, 665 ct_input_size, 666 ct.byref(ct_output_buffer), 667 ct.byref(ct_output_size) 668 ) 669 670 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 671 if status not in successes.success_codes: 672 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 673 if raise_unsupported: 674 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 675 else: 676 file_bytes = None 677 else: 678 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 679 if isinstance(input_file, str) and isinstance(output_file, str): 680 # file to file, read the bytes of the file that Rebuild has already written 681 if not os.path.isfile(output_file): 682 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 683 file_bytes = None 684 else: 685 with open(output_file, "rb") as f: 686 file_bytes = f.read() 687 else: 688 # file to memory, memory to memory 689 file_bytes = utils.buffer_to_bytes( 690 ct_output_buffer, 691 ct_output_size 692 ) 693 if isinstance(output_file, str): 694 # memory to file 695 # no Rebuild function exists for memory to file, write the memory to file ourselves 696 with open(output_file, "wb") as f: 697 f.write(file_bytes) 698 699 return file_bytes 700 701 def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 702 """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory. 703 704 Args: 705 input_directory (str): The input directory containing files to export. 706 output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files. 707 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 708 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 709 710 Returns: 711 export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 712 """ 713 export_files_dict = {} 714 # Call export_file on each file in input_directory to output_directory 715 for input_file in utils.list_file_paths(input_directory): 716 relative_path = os.path.relpath(input_file, input_directory) + ".zip" 717 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 718 719 export_bytes = self.export_file( 720 input_file=input_file, 721 output_file=output_file, 722 raise_unsupported=raise_unsupported, 723 content_management_policy=content_management_policy, 724 ) 725 726 export_files_dict[relative_path] = export_bytes 727 728 return export_files_dict 729 730 def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 731 """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided. 732 733 Args: 734 input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. 735 output_file (Union[None, str], optional): The output file path where the constructed file will be written. 736 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 737 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 738 739 Returns: 740 file_bytes (bytes): The imported file bytes. 741 """ 742 # Validate arg types 743 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 744 raise TypeError(input_file) 745 if not isinstance(output_file, (type(None), str)): 746 raise TypeError(output_file) 747 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 748 raise TypeError(content_management_policy) 749 if not isinstance(raise_unsupported, bool): 750 raise TypeError(raise_unsupported) 751 752 # Convert string path arguments to absolute paths 753 if isinstance(input_file, str): 754 if not os.path.isfile(input_file): 755 raise FileNotFoundError(input_file) 756 input_file = os.path.abspath(input_file) 757 if isinstance(output_file, str): 758 output_file = os.path.abspath(output_file) 759 # make directories that do not exist 760 os.makedirs(os.path.dirname(output_file), exist_ok=True) 761 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 762 content_management_policy = os.path.abspath(content_management_policy) 763 764 # Convert memory inputs to bytes 765 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 766 input_file = utils.as_bytes(input_file) 767 768 # Check that file type is supported 769 try: 770 self.determine_file_type(input_file=input_file) 771 except dft.errors.FileTypeEnumError: 772 if raise_unsupported: 773 raise 774 else: 775 return None 776 777 with utils.CwdHandler(self.library_path): 778 # Set content management policy 779 self.set_content_management_policy(content_management_policy) 780 781 # file to file 782 if isinstance(input_file, str) and isinstance(output_file, str): 783 # API function declaration 784 self.library.GWFileToFileProtectAndImport.argtypes = [ 785 ct.c_wchar_p, 786 ct.c_wchar_p 787 ] 788 789 # Variable initialisation 790 ct_input_file = ct.c_wchar_p(input_file) 791 ct_output_file = ct.c_wchar_p(output_file) 792 793 # API call 794 status = self.library.GWFileToFileProtectAndImport( 795 ct_input_file, 796 ct_output_file 797 ) 798 799 # file to memory 800 elif isinstance(input_file, str) and output_file is None: 801 # API function declaration 802 self.library.GWFileToMemoryProtectAndImport.argtypes = [ 803 ct.c_wchar_p, 804 ct.POINTER(ct.c_void_p), 805 ct.POINTER(ct.c_size_t) 806 ] 807 808 # Variable initialisation 809 ct_input_file = ct.c_wchar_p(input_file) 810 ct_output_buffer = ct.c_void_p(0) 811 ct_output_size = ct.c_size_t(0) 812 813 # API call 814 status = self.library.GWFileToMemoryProtectAndImport( 815 ct_input_file, 816 ct.byref(ct_output_buffer), 817 ct.byref(ct_output_size) 818 ) 819 820 # memory to memory and memory to file 821 elif isinstance(input_file, bytes): 822 # API function declaration 823 self.library.GWMemoryToMemoryProtectAndImport.argtypes = [ 824 ct.c_void_p, 825 ct.c_size_t, 826 ct.POINTER(ct.c_void_p), 827 ct.POINTER(ct.c_size_t) 828 ] 829 830 # Variable initialization 831 bytearray_buffer = bytearray(input_file) 832 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 833 ct_input_size = ct.c_size_t(len(input_file)) 834 ct_output_buffer = ct.c_void_p(0) 835 ct_output_size = ct.c_size_t(0) 836 837 status = self.library.GWMemoryToMemoryProtectAndImport( 838 ct_input_buffer, 839 ct_input_size, 840 ct.byref(ct_output_buffer), 841 ct.byref(ct_output_size) 842 ) 843 844 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 845 if status not in successes.success_codes: 846 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 847 if raise_unsupported: 848 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 849 else: 850 file_bytes = None 851 else: 852 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 853 if isinstance(input_file, str) and isinstance(output_file, str): 854 # file to file, read the bytes of the file that Rebuild has already written 855 if not os.path.isfile(output_file): 856 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 857 file_bytes = None 858 else: 859 with open(output_file, "rb") as f: 860 file_bytes = f.read() 861 else: 862 # file to memory, memory to memory 863 file_bytes = utils.buffer_to_bytes( 864 ct_output_buffer, 865 ct_output_size 866 ) 867 if isinstance(output_file, str): 868 # memory to file 869 # no Rebuild function exists for memory to file, write the memory to file ourselves 870 with open(output_file, "wb") as f: 871 f.write(file_bytes) 872 873 return file_bytes 874 875 def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 876 """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. 877 The constructed files are written to output_directory maintaining the same directory structure as input_directory. 878 879 Args: 880 input_directory (str): The input directory containing files to import. 881 output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files. 882 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 883 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 884 885 Returns: 886 import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 887 """ 888 import_files_dict = {} 889 # Call import_file on each file in input_directory to output_directory 890 for input_file in utils.list_file_paths(input_directory): 891 relative_path = os.path.relpath(input_file, input_directory) 892 # Remove .zip extension from relative_path 893 relative_path = os.path.splitext(relative_path)[0] 894 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 895 896 import_bytes = self.import_file( 897 input_file=input_file, 898 output_file=output_file, 899 raise_unsupported=raise_unsupported, 900 content_management_policy=content_management_policy, 901 ) 902 903 import_files_dict[relative_path] = import_bytes 904 905 return import_files_dict 906 907 def GWFileErrorMsg(self): 908 """ Retrieve the Glasswall Process error message. 909 910 Returns: 911 error_message (str): The Glasswall Process error message. 912 """ 913 # Declare the return type 914 self.library.GWFileErrorMsg.restype = ct.c_wchar_p 915 916 # API call 917 error_message = self.library.GWFileErrorMsg() 918 919 return error_message 920 921 def _GWFileToFileAnalysisAndProtect(self, input_file: str, file_type: str, output_file: str, output_analysis_report: str): 922 """ This function Manages the specified file and carries out an Analysis Audit, saving the results to the specified file locations. 923 924 Args: 925 input_file (str): The input file path or bytes. 926 output_file (str): The output file path where the protected file will be written. 927 output_analysis_report (str): The output file path where the analysis report will be written. 928 929 Returns: 930 status (int): The result of the Glasswall API call. 931 """ 932 # API function declaration 933 self.library.GWFileToFileAnalysisAndProtect.argtypes = [ 934 ct.c_wchar_p, # wchar_t * inputFilePathName 935 ct.c_wchar_p, # wchar_t* wcType 936 ct.c_wchar_p, # wchar_t * outputFilePathName 937 ct.c_wchar_p # wchar_t * analysisFilePathName 938 ] 939 940 # Variable initialisation 941 ct_input_file = ct.c_wchar_p(input_file) 942 ct_file_type = ct.c_wchar_p(file_type) 943 ct_output_file = ct.c_wchar_p(output_file) 944 ct_output_analysis_report = ct.c_wchar_p(output_analysis_report) 945 946 # API call 947 status = self.library.GWFileToFileAnalysisAndProtect( 948 ct_input_file, 949 ct_file_type, 950 ct_output_file, 951 ct_output_analysis_report 952 ) 953 954 return status 955 956 def _GWFileAnalysisAndProtect(self, input_file: str, file_type: str): 957 """ This function Manages the specified file and carries out an Analysis Audit, returning both outputs to the specified memory locations. 958 959 Args: 960 input_file (str): The input file path or bytes. 961 962 Returns: 963 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'input_file', 'file_type', 'output_file_buffer', 'output_file_buffer_length', 'output_report_buffer', 'output_report_buffer_length', 'output_file', 'analysis_file'. 964 """ 965 # API function declaration 966 self.library.GWFileAnalysisAndProtect.argtypes = [ 967 ct.c_wchar_p, # wchar_t * inputFilePathName 968 ct.c_wchar_p, # wchar_t* wcType 969 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 970 ct.POINTER(ct.c_size_t), # size_t *outputLength 971 ct.POINTER(ct.c_void_p), # void **analysisFileBuffer 972 ct.POINTER(ct.c_size_t) # size_t *analysisFileBufferLength 973 ] 974 975 # Variable initialisation 976 gw_return_object = glasswall.GwReturnObj() 977 gw_return_object.input_file = ct.c_wchar_p(input_file) 978 gw_return_object.file_type = ct.c_wchar_p(file_type) 979 gw_return_object.output_file_buffer = ct.c_void_p(0) 980 gw_return_object.output_file_buffer_length = ct.c_size_t(0) 981 gw_return_object.output_report_buffer = ct.c_void_p(0) 982 gw_return_object.output_report_buffer_length = ct.c_size_t(0) 983 984 # API call 985 gw_return_object.status = self.library.GWFileAnalysisAndProtect( 986 gw_return_object.input_file, 987 gw_return_object.file_type, 988 ct.byref(gw_return_object.output_file_buffer), 989 ct.byref(gw_return_object.output_file_buffer_length), 990 ct.byref(gw_return_object.output_report_buffer), 991 ct.byref(gw_return_object.output_report_buffer_length) 992 ) 993 994 gw_return_object.output_file = ct.string_at(gw_return_object.output_file_buffer, gw_return_object.output_file_buffer_length.value) 995 gw_return_object.analysis_file = ct.string_at(gw_return_object.output_report_buffer, gw_return_object.output_report_buffer_length.value) 996 997 return gw_return_object
17class Rebuild(Library): 18 """ A high level Python wrapper for Glasswall Rebuild / Classic. """ 19 20 def __init__(self, library_path: str): 21 super().__init__(library_path=library_path) 22 self.library = self.load_library(os.path.abspath(library_path)) 23 24 # Set content management configuration to default 25 self.set_content_management_policy(input_file=None) 26 27 # Validate killswitch has not activated 28 self.validate_licence() 29 30 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") 31 32 def validate_licence(self): 33 """ Validates the licence of the library by attempting to call protect_file on a known supported file. 34 35 Raises: 36 RebuildError: If the licence could not be validated. 37 """ 38 # Call protect file on a known good bitmap to see if licence has expired 39 try: 40 self.protect_file( 41 input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00", 42 raise_unsupported=True 43 ) 44 log.debug(f"{self.__class__.__name__} licence validated successfully.") 45 except errors.RebuildError: 46 log.error(f"{self.__class__.__name__} licence validation failed.") 47 raise 48 49 def version(self): 50 """ Returns the Glasswall library version. 51 52 Returns: 53 version (str): The Glasswall library version. 54 """ 55 56 # Declare the return type 57 self.library.GWFileVersion.restype = ct.c_wchar_p 58 59 # API call 60 version = self.library.GWFileVersion() 61 62 return version 63 64 def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True): 65 """ Returns an int representing the file type / file format of a file. 66 67 Args: 68 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path. 69 as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. 70 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 71 72 Returns: 73 file_type (Union[int, str]): The file format. 74 """ 75 76 if isinstance(input_file, str): 77 if not os.path.isfile(input_file): 78 raise FileNotFoundError(input_file) 79 80 self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p] 81 self.library.GWDetermineFileTypeFromFile.restype = ct.c_int 82 83 # convert to ct.c_wchar_p 84 ct_input_file = ct.c_wchar_p(input_file) 85 86 # API call 87 file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file) 88 89 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 90 self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t] 91 self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int 92 93 # convert to bytes 94 bytes_input_file = utils.as_bytes(input_file) 95 96 # ctypes conversion 97 ct_input_buffer = ct.c_char_p(bytes_input_file) 98 ct_butter_length = ct.c_size_t(len(bytes_input_file)) 99 100 # API call 101 file_type = self.library.GWDetermineFileTypeFromFileInMem( 102 ct_input_buffer, 103 ct_butter_length 104 ) 105 106 file_type_as_string = dft.file_type_int_to_str(file_type) 107 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 108 109 if not dft.is_success(file_type): 110 if raise_unsupported: 111 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 112 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 113 else: 114 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 115 else: 116 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 117 118 if as_string: 119 return file_type_as_string 120 121 return file_type 122 123 def get_content_management_policy(self): 124 """ Gets the current content management configuration. 125 126 Returns: 127 xml_string (str): The XML string of the current content management configuration. 128 """ 129 130 # Declare argument types 131 self.library.GWFileConfigGet.argtypes = [ 132 ct.POINTER(ct.POINTER(ct.c_wchar)), 133 ct.POINTER(ct.c_size_t) 134 ] 135 136 # Variable initialisation 137 ct_input_buffer = ct.POINTER(ct.c_wchar)() 138 ct_input_size = ct.c_size_t(0) 139 140 # API call 141 status = self.library.GWFileConfigGet( 142 ct.byref(ct_input_buffer), 143 ct.byref(ct_input_size) 144 ) 145 146 if status not in successes.success_codes: 147 log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 148 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 149 else: 150 log.debug(f"\n\tstatus: {status}") 151 152 # As string 153 xml_string = utils.validate_xml(ct.wstring_at(ct_input_buffer)) 154 155 return xml_string 156 157 def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None): 158 """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied. 159 160 Args: 161 input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 162 163 Returns: 164 status (int): The result of the Glasswall API call. 165 """ 166 # Validate type 167 if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 168 raise TypeError(input_file) 169 170 # self.library.GWFileConfigRevertToDefaults doesn't work, load default instead 171 # Set input_file to default if input_file is None 172 if input_file is None: 173 input_file = glasswall.content_management.policies.Rebuild(default="sanitise") 174 175 # Validate xml content is parsable 176 xml_string = utils.validate_xml(input_file) 177 178 # Declare argument types 179 self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p] 180 181 # API call 182 status = self.library.GWFileConfigXML( 183 ct.c_wchar_p(xml_string) 184 ) 185 186 if status not in successes.success_codes: 187 log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 188 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 189 else: 190 log.debug(f"\n\tstatus: {status}") 191 192 return status 193 194 def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 195 """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided. 196 197 Args: 198 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 199 output_file (Union[None, str], optional): The output file path where the protected file will be written. 200 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 201 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 202 203 Returns: 204 file_bytes (bytes): The protected file bytes. 205 """ 206 # Validate arg types 207 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 208 raise TypeError(input_file) 209 if not isinstance(output_file, (type(None), str)): 210 raise TypeError(output_file) 211 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 212 raise TypeError(content_management_policy) 213 if not isinstance(raise_unsupported, bool): 214 raise TypeError(raise_unsupported) 215 216 # Convert string path arguments to absolute paths 217 if isinstance(input_file, str): 218 if not os.path.isfile(input_file): 219 raise FileNotFoundError(input_file) 220 input_file = os.path.abspath(input_file) 221 if isinstance(output_file, str): 222 output_file = os.path.abspath(output_file) 223 # make directories that do not exist 224 os.makedirs(os.path.dirname(output_file), exist_ok=True) 225 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 226 content_management_policy = os.path.abspath(content_management_policy) 227 228 # Convert memory inputs to bytes 229 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 230 input_file = utils.as_bytes(input_file) 231 232 # Check that file type is supported 233 try: 234 file_type = self.determine_file_type(input_file=input_file) 235 except dft.errors.FileTypeEnumError: 236 if raise_unsupported: 237 raise 238 else: 239 return None 240 241 with utils.CwdHandler(self.library_path): 242 # Set content management policy 243 self.set_content_management_policy(content_management_policy) 244 245 # file to file 246 if isinstance(input_file, str) and isinstance(output_file, str): 247 # API function declaration 248 self.library.GWFileToFileProtect.argtypes = [ 249 ct.c_wchar_p, 250 ct.c_wchar_p, 251 ct.c_wchar_p 252 ] 253 254 # Variable initialisation 255 ct_input_file = ct.c_wchar_p(input_file) 256 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 257 ct_output_file = ct.c_wchar_p(output_file) 258 259 # API call 260 status = self.library.GWFileToFileProtect( 261 ct_input_file, 262 ct_file_type, 263 ct_output_file 264 ) 265 266 # file to memory 267 elif isinstance(input_file, str) and output_file is None: 268 # API function declaration 269 self.library.GWFileProtect.argtypes = [ 270 ct.c_wchar_p, 271 ct.c_wchar_p, 272 ct.POINTER(ct.c_void_p), 273 ct.POINTER(ct.c_size_t) 274 ] 275 276 # Variable initialisation 277 ct_input_file = ct.c_wchar_p(input_file) 278 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 279 ct_output_buffer = ct.c_void_p(0) 280 ct_output_size = ct.c_size_t(0) 281 282 # API call 283 status = self.library.GWFileProtect( 284 ct_input_file, 285 ct_file_type, 286 ct.byref(ct_output_buffer), 287 ct.byref(ct_output_size) 288 ) 289 290 # memory to memory and memory to file 291 elif isinstance(input_file, bytes): 292 # API function declaration 293 self.library.GWMemoryToMemoryProtect.argtypes = [ 294 ct.c_void_p, 295 ct.c_size_t, 296 ct.c_wchar_p, 297 ct.POINTER(ct.c_void_p), 298 ct.POINTER(ct.c_size_t) 299 ] 300 301 # Variable initialization 302 bytearray_buffer = bytearray(input_file) 303 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 304 ct_input_size = ct.c_size_t(len(input_file)) 305 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 306 ct_output_buffer = ct.c_void_p(0) 307 ct_output_size = ct.c_size_t(0) 308 309 status = self.library.GWMemoryToMemoryProtect( 310 ct_input_buffer, 311 ct_input_size, 312 ct_file_type, 313 ct.byref(ct_output_buffer), 314 ct.byref(ct_output_size) 315 ) 316 317 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 318 if status not in successes.success_codes: 319 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 320 if raise_unsupported: 321 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 322 else: 323 file_bytes = None 324 else: 325 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 326 if isinstance(input_file, str) and isinstance(output_file, str): 327 # file to file, read the bytes of the file that Rebuild has already written 328 if not os.path.isfile(output_file): 329 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 330 file_bytes = None 331 else: 332 with open(output_file, "rb") as f: 333 file_bytes = f.read() 334 else: 335 # file to memory, memory to memory 336 file_bytes = utils.buffer_to_bytes( 337 ct_output_buffer, 338 ct_output_size 339 ) 340 if isinstance(output_file, str): 341 # memory to file 342 # no Rebuild function exists for memory to file, write the memory to file ourselves 343 with open(output_file, "wb") as f: 344 f.write(file_bytes) 345 346 return file_bytes 347 348 def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 349 """ Recursively processes all files in a directory in protect mode using the given content management policy. 350 The protected files are written to output_directory maintaining the same directory structure as input_directory. 351 352 Args: 353 input_directory (str): The input directory containing files to protect. 354 output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files. 355 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 356 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 357 358 Returns: 359 protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 360 """ 361 protected_files_dict = {} 362 # Call protect_file on each file in input_directory to output_directory 363 for input_file in utils.list_file_paths(input_directory): 364 relative_path = os.path.relpath(input_file, input_directory) 365 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 366 367 protected_bytes = self.protect_file( 368 input_file=input_file, 369 output_file=output_file, 370 raise_unsupported=raise_unsupported, 371 content_management_policy=content_management_policy, 372 ) 373 374 protected_files_dict[relative_path] = protected_bytes 375 376 return protected_files_dict 377 378 def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 379 """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided. 380 381 Args: 382 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 383 output_file (Union[None, str], optional): The output file path where the analysis file will be written. 384 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 385 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 386 387 Returns: 388 file_bytes (bytes): The analysis file bytes. 389 """ 390 # Validate arg types 391 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 392 raise TypeError(input_file) 393 if not isinstance(output_file, (type(None), str)): 394 raise TypeError(output_file) 395 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 396 raise TypeError(content_management_policy) 397 if not isinstance(raise_unsupported, bool): 398 raise TypeError(raise_unsupported) 399 400 # Convert string path arguments to absolute paths 401 if isinstance(input_file, str): 402 if not os.path.isfile(input_file): 403 raise FileNotFoundError(input_file) 404 input_file = os.path.abspath(input_file) 405 if isinstance(output_file, str): 406 output_file = os.path.abspath(output_file) 407 # make directories that do not exist 408 os.makedirs(os.path.dirname(output_file), exist_ok=True) 409 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 410 content_management_policy = os.path.abspath(content_management_policy) 411 412 # Convert memory inputs to bytes 413 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 414 input_file = utils.as_bytes(input_file) 415 416 # Check that file type is supported 417 try: 418 file_type = self.determine_file_type(input_file=input_file) 419 except dft.errors.FileTypeEnumError: 420 if raise_unsupported: 421 raise 422 else: 423 return None 424 425 with utils.CwdHandler(self.library_path): 426 # Set content management policy 427 self.set_content_management_policy(content_management_policy) 428 429 # file to file 430 if isinstance(input_file, str) and isinstance(output_file, str): 431 # API function declaration 432 self.library.GWFileToFileAnalysisAudit.argtypes = [ 433 ct.c_wchar_p, 434 ct.c_wchar_p, 435 ct.c_wchar_p 436 ] 437 438 # Variable initialisation 439 ct_input_file = ct.c_wchar_p(input_file) 440 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 441 ct_output_file = ct.c_wchar_p(output_file) 442 443 # API call 444 status = self.library.GWFileToFileAnalysisAudit( 445 ct_input_file, 446 ct_file_type, 447 ct_output_file 448 ) 449 450 # file to memory 451 elif isinstance(input_file, str) and output_file is None: 452 # API function declaration 453 self.library.GWFileAnalysisAudit.argtypes = [ 454 ct.c_wchar_p, 455 ct.c_wchar_p, 456 ct.POINTER(ct.c_void_p), 457 ct.POINTER(ct.c_size_t) 458 ] 459 460 # Variable initialisation 461 ct_input_file = ct.c_wchar_p(input_file) 462 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 463 ct_output_buffer = ct.c_void_p(0) 464 ct_output_size = ct.c_size_t(0) 465 466 # API call 467 status = self.library.GWFileAnalysisAudit( 468 ct_input_file, 469 ct_file_type, 470 ct.byref(ct_output_buffer), 471 ct.byref(ct_output_size) 472 ) 473 474 # memory to memory and memory to file 475 elif isinstance(input_file, bytes): 476 # API function declaration 477 self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [ 478 ct.c_void_p, 479 ct.c_size_t, 480 ct.c_wchar_p, 481 ct.POINTER(ct.c_void_p), 482 ct.POINTER(ct.c_size_t) 483 ] 484 485 # Variable initialization 486 bytearray_buffer = bytearray(input_file) 487 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 488 ct_input_size = ct.c_size_t(len(input_file)) 489 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 490 ct_output_buffer = ct.c_void_p(0) 491 ct_output_size = ct.c_size_t(0) 492 493 status = self.library.GWMemoryToMemoryAnalysisAudit( 494 ct.byref(ct_input_buffer), 495 ct_input_size, 496 ct_file_type, 497 ct.byref(ct_output_buffer), 498 ct.byref(ct_output_size) 499 ) 500 501 file_bytes = None 502 if isinstance(input_file, str) and isinstance(output_file, str): 503 # file to file, read the bytes of the file that Rebuild has already written 504 if os.path.isfile(output_file): 505 with open(output_file, "rb") as f: 506 file_bytes = f.read() 507 else: 508 # file to memory, memory to memory 509 if ct_output_buffer and ct_output_size: 510 file_bytes = utils.buffer_to_bytes( 511 ct_output_buffer, 512 ct_output_size 513 ) 514 if isinstance(output_file, str): 515 # memory to file 516 # no Rebuild function exists for memory to file, write the memory to file ourselves 517 with open(output_file, "wb") as f: 518 f.write(file_bytes) 519 520 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 521 if status not in successes.success_codes: 522 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 523 if raise_unsupported: 524 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 525 else: 526 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 527 528 return file_bytes 529 530 def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 531 """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory. 532 533 Args: 534 input_directory (str): The input directory containing files to analyse. 535 output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files. 536 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 537 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 538 539 Returns: 540 analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 541 """ 542 analysis_files_dict = {} 543 # Call analyse_file on each file in input_directory to output_directory 544 for input_file in utils.list_file_paths(input_directory): 545 relative_path = os.path.relpath(input_file, input_directory) + ".xml" 546 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 547 548 analysis_bytes = self.analyse_file( 549 input_file=input_file, 550 output_file=output_file, 551 raise_unsupported=raise_unsupported, 552 content_management_policy=content_management_policy, 553 ) 554 555 analysis_files_dict[relative_path] = analysis_bytes 556 557 return analysis_files_dict 558 559 def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 560 """ Export a file, returning the .zip file bytes. The .zip file is written to output_file. 561 562 Args: 563 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 564 output_file (Union[None, str], optional): The output file path where the .zip file will be written. 565 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 566 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 567 568 Returns: 569 file_bytes (bytes): The exported .zip file. 570 """ 571 # Validate arg types 572 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 573 raise TypeError(input_file) 574 if not isinstance(output_file, (type(None), str)): 575 raise TypeError(output_file) 576 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 577 raise TypeError(content_management_policy) 578 if not isinstance(raise_unsupported, bool): 579 raise TypeError(raise_unsupported) 580 581 # Convert string path arguments to absolute paths 582 if isinstance(input_file, str): 583 if not os.path.isfile(input_file): 584 raise FileNotFoundError(input_file) 585 input_file = os.path.abspath(input_file) 586 if isinstance(output_file, str): 587 output_file = os.path.abspath(output_file) 588 # make directories that do not exist 589 os.makedirs(os.path.dirname(output_file), exist_ok=True) 590 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 591 content_management_policy = os.path.abspath(content_management_policy) 592 593 # Convert memory inputs to bytes 594 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 595 input_file = utils.as_bytes(input_file) 596 597 # Check that file type is supported 598 try: 599 self.determine_file_type(input_file=input_file) 600 except dft.errors.FileTypeEnumError: 601 if raise_unsupported: 602 raise 603 else: 604 return None 605 606 with utils.CwdHandler(self.library_path): 607 # Set content management policy 608 self.set_content_management_policy(content_management_policy) 609 610 # file to file 611 if isinstance(input_file, str) and isinstance(output_file, str): 612 # API function declaration 613 self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [ 614 ct.c_wchar_p, 615 ct.c_wchar_p 616 ] 617 618 # Variable initialisation 619 ct_input_file = ct.c_wchar_p(input_file) 620 ct_output_file = ct.c_wchar_p(output_file) 621 622 # API call 623 status = self.library.GWFileToFileAnalysisProtectAndExport( 624 ct_input_file, 625 ct_output_file 626 ) 627 628 # file to memory 629 elif isinstance(input_file, str) and output_file is None: 630 # API function declaration 631 self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [ 632 ct.c_wchar_p, 633 ct.POINTER(ct.c_void_p), 634 ct.POINTER(ct.c_size_t) 635 ] 636 637 # Variable initialisation 638 ct_input_file = ct.c_wchar_p(input_file) 639 ct_output_buffer = ct.c_void_p(0) 640 ct_output_size = ct.c_size_t(0) 641 642 # API call 643 status = self.library.GWFileToMemoryAnalysisProtectAndExport( 644 ct_input_file, 645 ct.byref(ct_output_buffer), 646 ct.byref(ct_output_size) 647 ) 648 649 # memory to memory and memory to file 650 elif isinstance(input_file, bytes): 651 # API function declaration 652 self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [ 653 ct.c_void_p, 654 ct.c_size_t, 655 ct.POINTER(ct.c_void_p), 656 ct.POINTER(ct.c_size_t) 657 ] 658 659 # Variable initialization 660 bytearray_buffer = bytearray(input_file) 661 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 662 ct_input_size = ct.c_size_t(len(input_file)) 663 ct_output_buffer = ct.c_void_p(0) 664 ct_output_size = ct.c_size_t(0) 665 666 status = self.library.GWMemoryToMemoryAnalysisProtectAndExport( 667 ct_input_buffer, 668 ct_input_size, 669 ct.byref(ct_output_buffer), 670 ct.byref(ct_output_size) 671 ) 672 673 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 674 if status not in successes.success_codes: 675 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 676 if raise_unsupported: 677 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 678 else: 679 file_bytes = None 680 else: 681 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 682 if isinstance(input_file, str) and isinstance(output_file, str): 683 # file to file, read the bytes of the file that Rebuild has already written 684 if not os.path.isfile(output_file): 685 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 686 file_bytes = None 687 else: 688 with open(output_file, "rb") as f: 689 file_bytes = f.read() 690 else: 691 # file to memory, memory to memory 692 file_bytes = utils.buffer_to_bytes( 693 ct_output_buffer, 694 ct_output_size 695 ) 696 if isinstance(output_file, str): 697 # memory to file 698 # no Rebuild function exists for memory to file, write the memory to file ourselves 699 with open(output_file, "wb") as f: 700 f.write(file_bytes) 701 702 return file_bytes 703 704 def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 705 """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory. 706 707 Args: 708 input_directory (str): The input directory containing files to export. 709 output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files. 710 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 711 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 712 713 Returns: 714 export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 715 """ 716 export_files_dict = {} 717 # Call export_file on each file in input_directory to output_directory 718 for input_file in utils.list_file_paths(input_directory): 719 relative_path = os.path.relpath(input_file, input_directory) + ".zip" 720 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 721 722 export_bytes = self.export_file( 723 input_file=input_file, 724 output_file=output_file, 725 raise_unsupported=raise_unsupported, 726 content_management_policy=content_management_policy, 727 ) 728 729 export_files_dict[relative_path] = export_bytes 730 731 return export_files_dict 732 733 def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 734 """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided. 735 736 Args: 737 input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. 738 output_file (Union[None, str], optional): The output file path where the constructed file will be written. 739 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 740 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 741 742 Returns: 743 file_bytes (bytes): The imported file bytes. 744 """ 745 # Validate arg types 746 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 747 raise TypeError(input_file) 748 if not isinstance(output_file, (type(None), str)): 749 raise TypeError(output_file) 750 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 751 raise TypeError(content_management_policy) 752 if not isinstance(raise_unsupported, bool): 753 raise TypeError(raise_unsupported) 754 755 # Convert string path arguments to absolute paths 756 if isinstance(input_file, str): 757 if not os.path.isfile(input_file): 758 raise FileNotFoundError(input_file) 759 input_file = os.path.abspath(input_file) 760 if isinstance(output_file, str): 761 output_file = os.path.abspath(output_file) 762 # make directories that do not exist 763 os.makedirs(os.path.dirname(output_file), exist_ok=True) 764 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 765 content_management_policy = os.path.abspath(content_management_policy) 766 767 # Convert memory inputs to bytes 768 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 769 input_file = utils.as_bytes(input_file) 770 771 # Check that file type is supported 772 try: 773 self.determine_file_type(input_file=input_file) 774 except dft.errors.FileTypeEnumError: 775 if raise_unsupported: 776 raise 777 else: 778 return None 779 780 with utils.CwdHandler(self.library_path): 781 # Set content management policy 782 self.set_content_management_policy(content_management_policy) 783 784 # file to file 785 if isinstance(input_file, str) and isinstance(output_file, str): 786 # API function declaration 787 self.library.GWFileToFileProtectAndImport.argtypes = [ 788 ct.c_wchar_p, 789 ct.c_wchar_p 790 ] 791 792 # Variable initialisation 793 ct_input_file = ct.c_wchar_p(input_file) 794 ct_output_file = ct.c_wchar_p(output_file) 795 796 # API call 797 status = self.library.GWFileToFileProtectAndImport( 798 ct_input_file, 799 ct_output_file 800 ) 801 802 # file to memory 803 elif isinstance(input_file, str) and output_file is None: 804 # API function declaration 805 self.library.GWFileToMemoryProtectAndImport.argtypes = [ 806 ct.c_wchar_p, 807 ct.POINTER(ct.c_void_p), 808 ct.POINTER(ct.c_size_t) 809 ] 810 811 # Variable initialisation 812 ct_input_file = ct.c_wchar_p(input_file) 813 ct_output_buffer = ct.c_void_p(0) 814 ct_output_size = ct.c_size_t(0) 815 816 # API call 817 status = self.library.GWFileToMemoryProtectAndImport( 818 ct_input_file, 819 ct.byref(ct_output_buffer), 820 ct.byref(ct_output_size) 821 ) 822 823 # memory to memory and memory to file 824 elif isinstance(input_file, bytes): 825 # API function declaration 826 self.library.GWMemoryToMemoryProtectAndImport.argtypes = [ 827 ct.c_void_p, 828 ct.c_size_t, 829 ct.POINTER(ct.c_void_p), 830 ct.POINTER(ct.c_size_t) 831 ] 832 833 # Variable initialization 834 bytearray_buffer = bytearray(input_file) 835 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 836 ct_input_size = ct.c_size_t(len(input_file)) 837 ct_output_buffer = ct.c_void_p(0) 838 ct_output_size = ct.c_size_t(0) 839 840 status = self.library.GWMemoryToMemoryProtectAndImport( 841 ct_input_buffer, 842 ct_input_size, 843 ct.byref(ct_output_buffer), 844 ct.byref(ct_output_size) 845 ) 846 847 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 848 if status not in successes.success_codes: 849 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 850 if raise_unsupported: 851 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 852 else: 853 file_bytes = None 854 else: 855 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 856 if isinstance(input_file, str) and isinstance(output_file, str): 857 # file to file, read the bytes of the file that Rebuild has already written 858 if not os.path.isfile(output_file): 859 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 860 file_bytes = None 861 else: 862 with open(output_file, "rb") as f: 863 file_bytes = f.read() 864 else: 865 # file to memory, memory to memory 866 file_bytes = utils.buffer_to_bytes( 867 ct_output_buffer, 868 ct_output_size 869 ) 870 if isinstance(output_file, str): 871 # memory to file 872 # no Rebuild function exists for memory to file, write the memory to file ourselves 873 with open(output_file, "wb") as f: 874 f.write(file_bytes) 875 876 return file_bytes 877 878 def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 879 """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. 880 The constructed files are written to output_directory maintaining the same directory structure as input_directory. 881 882 Args: 883 input_directory (str): The input directory containing files to import. 884 output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files. 885 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 886 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 887 888 Returns: 889 import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 890 """ 891 import_files_dict = {} 892 # Call import_file on each file in input_directory to output_directory 893 for input_file in utils.list_file_paths(input_directory): 894 relative_path = os.path.relpath(input_file, input_directory) 895 # Remove .zip extension from relative_path 896 relative_path = os.path.splitext(relative_path)[0] 897 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 898 899 import_bytes = self.import_file( 900 input_file=input_file, 901 output_file=output_file, 902 raise_unsupported=raise_unsupported, 903 content_management_policy=content_management_policy, 904 ) 905 906 import_files_dict[relative_path] = import_bytes 907 908 return import_files_dict 909 910 def GWFileErrorMsg(self): 911 """ Retrieve the Glasswall Process error message. 912 913 Returns: 914 error_message (str): The Glasswall Process error message. 915 """ 916 # Declare the return type 917 self.library.GWFileErrorMsg.restype = ct.c_wchar_p 918 919 # API call 920 error_message = self.library.GWFileErrorMsg() 921 922 return error_message 923 924 def _GWFileToFileAnalysisAndProtect(self, input_file: str, file_type: str, output_file: str, output_analysis_report: str): 925 """ This function Manages the specified file and carries out an Analysis Audit, saving the results to the specified file locations. 926 927 Args: 928 input_file (str): The input file path or bytes. 929 output_file (str): The output file path where the protected file will be written. 930 output_analysis_report (str): The output file path where the analysis report will be written. 931 932 Returns: 933 status (int): The result of the Glasswall API call. 934 """ 935 # API function declaration 936 self.library.GWFileToFileAnalysisAndProtect.argtypes = [ 937 ct.c_wchar_p, # wchar_t * inputFilePathName 938 ct.c_wchar_p, # wchar_t* wcType 939 ct.c_wchar_p, # wchar_t * outputFilePathName 940 ct.c_wchar_p # wchar_t * analysisFilePathName 941 ] 942 943 # Variable initialisation 944 ct_input_file = ct.c_wchar_p(input_file) 945 ct_file_type = ct.c_wchar_p(file_type) 946 ct_output_file = ct.c_wchar_p(output_file) 947 ct_output_analysis_report = ct.c_wchar_p(output_analysis_report) 948 949 # API call 950 status = self.library.GWFileToFileAnalysisAndProtect( 951 ct_input_file, 952 ct_file_type, 953 ct_output_file, 954 ct_output_analysis_report 955 ) 956 957 return status 958 959 def _GWFileAnalysisAndProtect(self, input_file: str, file_type: str): 960 """ This function Manages the specified file and carries out an Analysis Audit, returning both outputs to the specified memory locations. 961 962 Args: 963 input_file (str): The input file path or bytes. 964 965 Returns: 966 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'input_file', 'file_type', 'output_file_buffer', 'output_file_buffer_length', 'output_report_buffer', 'output_report_buffer_length', 'output_file', 'analysis_file'. 967 """ 968 # API function declaration 969 self.library.GWFileAnalysisAndProtect.argtypes = [ 970 ct.c_wchar_p, # wchar_t * inputFilePathName 971 ct.c_wchar_p, # wchar_t* wcType 972 ct.POINTER(ct.c_void_p), # void **outputFileBuffer 973 ct.POINTER(ct.c_size_t), # size_t *outputLength 974 ct.POINTER(ct.c_void_p), # void **analysisFileBuffer 975 ct.POINTER(ct.c_size_t) # size_t *analysisFileBufferLength 976 ] 977 978 # Variable initialisation 979 gw_return_object = glasswall.GwReturnObj() 980 gw_return_object.input_file = ct.c_wchar_p(input_file) 981 gw_return_object.file_type = ct.c_wchar_p(file_type) 982 gw_return_object.output_file_buffer = ct.c_void_p(0) 983 gw_return_object.output_file_buffer_length = ct.c_size_t(0) 984 gw_return_object.output_report_buffer = ct.c_void_p(0) 985 gw_return_object.output_report_buffer_length = ct.c_size_t(0) 986 987 # API call 988 gw_return_object.status = self.library.GWFileAnalysisAndProtect( 989 gw_return_object.input_file, 990 gw_return_object.file_type, 991 ct.byref(gw_return_object.output_file_buffer), 992 ct.byref(gw_return_object.output_file_buffer_length), 993 ct.byref(gw_return_object.output_report_buffer), 994 ct.byref(gw_return_object.output_report_buffer_length) 995 ) 996 997 gw_return_object.output_file = ct.string_at(gw_return_object.output_file_buffer, gw_return_object.output_file_buffer_length.value) 998 gw_return_object.analysis_file = ct.string_at(gw_return_object.output_report_buffer, gw_return_object.output_report_buffer_length.value) 999 1000 return gw_return_object
A high level Python wrapper for Glasswall Rebuild / Classic.
20 def __init__(self, library_path: str): 21 super().__init__(library_path=library_path) 22 self.library = self.load_library(os.path.abspath(library_path)) 23 24 # Set content management configuration to default 25 self.set_content_management_policy(input_file=None) 26 27 # Validate killswitch has not activated 28 self.validate_licence() 29 30 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
32 def validate_licence(self): 33 """ Validates the licence of the library by attempting to call protect_file on a known supported file. 34 35 Raises: 36 RebuildError: If the licence could not be validated. 37 """ 38 # Call protect file on a known good bitmap to see if licence has expired 39 try: 40 self.protect_file( 41 input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00", 42 raise_unsupported=True 43 ) 44 log.debug(f"{self.__class__.__name__} licence validated successfully.") 45 except errors.RebuildError: 46 log.error(f"{self.__class__.__name__} licence validation failed.") 47 raise
Validates the licence of the library by attempting to call protect_file on a known supported file.
Raises: RebuildError: If the licence could not be validated.
49 def version(self): 50 """ Returns the Glasswall library version. 51 52 Returns: 53 version (str): The Glasswall library version. 54 """ 55 56 # Declare the return type 57 self.library.GWFileVersion.restype = ct.c_wchar_p 58 59 # API call 60 version = self.library.GWFileVersion() 61 62 return version
Returns the Glasswall library version.
Returns: version (str): The Glasswall library version.
64 def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True): 65 """ Returns an int representing the file type / file format of a file. 66 67 Args: 68 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path. 69 as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. 70 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 71 72 Returns: 73 file_type (Union[int, str]): The file format. 74 """ 75 76 if isinstance(input_file, str): 77 if not os.path.isfile(input_file): 78 raise FileNotFoundError(input_file) 79 80 self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p] 81 self.library.GWDetermineFileTypeFromFile.restype = ct.c_int 82 83 # convert to ct.c_wchar_p 84 ct_input_file = ct.c_wchar_p(input_file) 85 86 # API call 87 file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file) 88 89 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 90 self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t] 91 self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int 92 93 # convert to bytes 94 bytes_input_file = utils.as_bytes(input_file) 95 96 # ctypes conversion 97 ct_input_buffer = ct.c_char_p(bytes_input_file) 98 ct_butter_length = ct.c_size_t(len(bytes_input_file)) 99 100 # API call 101 file_type = self.library.GWDetermineFileTypeFromFileInMem( 102 ct_input_buffer, 103 ct_butter_length 104 ) 105 106 file_type_as_string = dft.file_type_int_to_str(file_type) 107 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 108 109 if not dft.is_success(file_type): 110 if raise_unsupported: 111 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 112 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 113 else: 114 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 115 else: 116 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 117 118 if as_string: 119 return file_type_as_string 120 121 return file_type
Returns an int representing the file type / file format of a file.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path. as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_type (Union[int, str]): The file format.
123 def get_content_management_policy(self): 124 """ Gets the current content management configuration. 125 126 Returns: 127 xml_string (str): The XML string of the current content management configuration. 128 """ 129 130 # Declare argument types 131 self.library.GWFileConfigGet.argtypes = [ 132 ct.POINTER(ct.POINTER(ct.c_wchar)), 133 ct.POINTER(ct.c_size_t) 134 ] 135 136 # Variable initialisation 137 ct_input_buffer = ct.POINTER(ct.c_wchar)() 138 ct_input_size = ct.c_size_t(0) 139 140 # API call 141 status = self.library.GWFileConfigGet( 142 ct.byref(ct_input_buffer), 143 ct.byref(ct_input_size) 144 ) 145 146 if status not in successes.success_codes: 147 log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 148 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 149 else: 150 log.debug(f"\n\tstatus: {status}") 151 152 # As string 153 xml_string = utils.validate_xml(ct.wstring_at(ct_input_buffer)) 154 155 return xml_string
Gets the current content management configuration.
Returns: xml_string (str): The XML string of the current content management configuration.
157 def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None): 158 """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied. 159 160 Args: 161 input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 162 163 Returns: 164 status (int): The result of the Glasswall API call. 165 """ 166 # Validate type 167 if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 168 raise TypeError(input_file) 169 170 # self.library.GWFileConfigRevertToDefaults doesn't work, load default instead 171 # Set input_file to default if input_file is None 172 if input_file is None: 173 input_file = glasswall.content_management.policies.Rebuild(default="sanitise") 174 175 # Validate xml content is parsable 176 xml_string = utils.validate_xml(input_file) 177 178 # Declare argument types 179 self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p] 180 181 # API call 182 status = self.library.GWFileConfigXML( 183 ct.c_wchar_p(xml_string) 184 ) 185 186 if status not in successes.success_codes: 187 log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 188 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 189 else: 190 log.debug(f"\n\tstatus: {status}") 191 192 return status
Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
Args: input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
Returns: status (int): The result of the Glasswall API call.
194 def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 195 """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided. 196 197 Args: 198 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 199 output_file (Union[None, str], optional): The output file path where the protected file will be written. 200 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 201 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 202 203 Returns: 204 file_bytes (bytes): The protected file bytes. 205 """ 206 # Validate arg types 207 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 208 raise TypeError(input_file) 209 if not isinstance(output_file, (type(None), str)): 210 raise TypeError(output_file) 211 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 212 raise TypeError(content_management_policy) 213 if not isinstance(raise_unsupported, bool): 214 raise TypeError(raise_unsupported) 215 216 # Convert string path arguments to absolute paths 217 if isinstance(input_file, str): 218 if not os.path.isfile(input_file): 219 raise FileNotFoundError(input_file) 220 input_file = os.path.abspath(input_file) 221 if isinstance(output_file, str): 222 output_file = os.path.abspath(output_file) 223 # make directories that do not exist 224 os.makedirs(os.path.dirname(output_file), exist_ok=True) 225 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 226 content_management_policy = os.path.abspath(content_management_policy) 227 228 # Convert memory inputs to bytes 229 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 230 input_file = utils.as_bytes(input_file) 231 232 # Check that file type is supported 233 try: 234 file_type = self.determine_file_type(input_file=input_file) 235 except dft.errors.FileTypeEnumError: 236 if raise_unsupported: 237 raise 238 else: 239 return None 240 241 with utils.CwdHandler(self.library_path): 242 # Set content management policy 243 self.set_content_management_policy(content_management_policy) 244 245 # file to file 246 if isinstance(input_file, str) and isinstance(output_file, str): 247 # API function declaration 248 self.library.GWFileToFileProtect.argtypes = [ 249 ct.c_wchar_p, 250 ct.c_wchar_p, 251 ct.c_wchar_p 252 ] 253 254 # Variable initialisation 255 ct_input_file = ct.c_wchar_p(input_file) 256 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 257 ct_output_file = ct.c_wchar_p(output_file) 258 259 # API call 260 status = self.library.GWFileToFileProtect( 261 ct_input_file, 262 ct_file_type, 263 ct_output_file 264 ) 265 266 # file to memory 267 elif isinstance(input_file, str) and output_file is None: 268 # API function declaration 269 self.library.GWFileProtect.argtypes = [ 270 ct.c_wchar_p, 271 ct.c_wchar_p, 272 ct.POINTER(ct.c_void_p), 273 ct.POINTER(ct.c_size_t) 274 ] 275 276 # Variable initialisation 277 ct_input_file = ct.c_wchar_p(input_file) 278 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 279 ct_output_buffer = ct.c_void_p(0) 280 ct_output_size = ct.c_size_t(0) 281 282 # API call 283 status = self.library.GWFileProtect( 284 ct_input_file, 285 ct_file_type, 286 ct.byref(ct_output_buffer), 287 ct.byref(ct_output_size) 288 ) 289 290 # memory to memory and memory to file 291 elif isinstance(input_file, bytes): 292 # API function declaration 293 self.library.GWMemoryToMemoryProtect.argtypes = [ 294 ct.c_void_p, 295 ct.c_size_t, 296 ct.c_wchar_p, 297 ct.POINTER(ct.c_void_p), 298 ct.POINTER(ct.c_size_t) 299 ] 300 301 # Variable initialization 302 bytearray_buffer = bytearray(input_file) 303 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 304 ct_input_size = ct.c_size_t(len(input_file)) 305 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 306 ct_output_buffer = ct.c_void_p(0) 307 ct_output_size = ct.c_size_t(0) 308 309 status = self.library.GWMemoryToMemoryProtect( 310 ct_input_buffer, 311 ct_input_size, 312 ct_file_type, 313 ct.byref(ct_output_buffer), 314 ct.byref(ct_output_size) 315 ) 316 317 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 318 if status not in successes.success_codes: 319 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 320 if raise_unsupported: 321 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 322 else: 323 file_bytes = None 324 else: 325 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 326 if isinstance(input_file, str) and isinstance(output_file, str): 327 # file to file, read the bytes of the file that Rebuild has already written 328 if not os.path.isfile(output_file): 329 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 330 file_bytes = None 331 else: 332 with open(output_file, "rb") as f: 333 file_bytes = f.read() 334 else: 335 # file to memory, memory to memory 336 file_bytes = utils.buffer_to_bytes( 337 ct_output_buffer, 338 ct_output_size 339 ) 340 if isinstance(output_file, str): 341 # memory to file 342 # no Rebuild function exists for memory to file, write the memory to file ourselves 343 with open(output_file, "wb") as f: 344 f.write(file_bytes) 345 346 return file_bytes
Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the protected file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The protected file bytes.
348 def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 349 """ Recursively processes all files in a directory in protect mode using the given content management policy. 350 The protected files are written to output_directory maintaining the same directory structure as input_directory. 351 352 Args: 353 input_directory (str): The input directory containing files to protect. 354 output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files. 355 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 356 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 357 358 Returns: 359 protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 360 """ 361 protected_files_dict = {} 362 # Call protect_file on each file in input_directory to output_directory 363 for input_file in utils.list_file_paths(input_directory): 364 relative_path = os.path.relpath(input_file, input_directory) 365 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 366 367 protected_bytes = self.protect_file( 368 input_file=input_file, 369 output_file=output_file, 370 raise_unsupported=raise_unsupported, 371 content_management_policy=content_management_policy, 372 ) 373 374 protected_files_dict[relative_path] = protected_bytes 375 376 return protected_files_dict
Recursively processes all files in a directory in protect mode using the given content management policy. The protected files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to protect. output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
378 def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 379 """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided. 380 381 Args: 382 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 383 output_file (Union[None, str], optional): The output file path where the analysis file will be written. 384 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 385 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 386 387 Returns: 388 file_bytes (bytes): The analysis file bytes. 389 """ 390 # Validate arg types 391 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 392 raise TypeError(input_file) 393 if not isinstance(output_file, (type(None), str)): 394 raise TypeError(output_file) 395 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 396 raise TypeError(content_management_policy) 397 if not isinstance(raise_unsupported, bool): 398 raise TypeError(raise_unsupported) 399 400 # Convert string path arguments to absolute paths 401 if isinstance(input_file, str): 402 if not os.path.isfile(input_file): 403 raise FileNotFoundError(input_file) 404 input_file = os.path.abspath(input_file) 405 if isinstance(output_file, str): 406 output_file = os.path.abspath(output_file) 407 # make directories that do not exist 408 os.makedirs(os.path.dirname(output_file), exist_ok=True) 409 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 410 content_management_policy = os.path.abspath(content_management_policy) 411 412 # Convert memory inputs to bytes 413 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 414 input_file = utils.as_bytes(input_file) 415 416 # Check that file type is supported 417 try: 418 file_type = self.determine_file_type(input_file=input_file) 419 except dft.errors.FileTypeEnumError: 420 if raise_unsupported: 421 raise 422 else: 423 return None 424 425 with utils.CwdHandler(self.library_path): 426 # Set content management policy 427 self.set_content_management_policy(content_management_policy) 428 429 # file to file 430 if isinstance(input_file, str) and isinstance(output_file, str): 431 # API function declaration 432 self.library.GWFileToFileAnalysisAudit.argtypes = [ 433 ct.c_wchar_p, 434 ct.c_wchar_p, 435 ct.c_wchar_p 436 ] 437 438 # Variable initialisation 439 ct_input_file = ct.c_wchar_p(input_file) 440 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 441 ct_output_file = ct.c_wchar_p(output_file) 442 443 # API call 444 status = self.library.GWFileToFileAnalysisAudit( 445 ct_input_file, 446 ct_file_type, 447 ct_output_file 448 ) 449 450 # file to memory 451 elif isinstance(input_file, str) and output_file is None: 452 # API function declaration 453 self.library.GWFileAnalysisAudit.argtypes = [ 454 ct.c_wchar_p, 455 ct.c_wchar_p, 456 ct.POINTER(ct.c_void_p), 457 ct.POINTER(ct.c_size_t) 458 ] 459 460 # Variable initialisation 461 ct_input_file = ct.c_wchar_p(input_file) 462 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 463 ct_output_buffer = ct.c_void_p(0) 464 ct_output_size = ct.c_size_t(0) 465 466 # API call 467 status = self.library.GWFileAnalysisAudit( 468 ct_input_file, 469 ct_file_type, 470 ct.byref(ct_output_buffer), 471 ct.byref(ct_output_size) 472 ) 473 474 # memory to memory and memory to file 475 elif isinstance(input_file, bytes): 476 # API function declaration 477 self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [ 478 ct.c_void_p, 479 ct.c_size_t, 480 ct.c_wchar_p, 481 ct.POINTER(ct.c_void_p), 482 ct.POINTER(ct.c_size_t) 483 ] 484 485 # Variable initialization 486 bytearray_buffer = bytearray(input_file) 487 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 488 ct_input_size = ct.c_size_t(len(input_file)) 489 ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) 490 ct_output_buffer = ct.c_void_p(0) 491 ct_output_size = ct.c_size_t(0) 492 493 status = self.library.GWMemoryToMemoryAnalysisAudit( 494 ct.byref(ct_input_buffer), 495 ct_input_size, 496 ct_file_type, 497 ct.byref(ct_output_buffer), 498 ct.byref(ct_output_size) 499 ) 500 501 file_bytes = None 502 if isinstance(input_file, str) and isinstance(output_file, str): 503 # file to file, read the bytes of the file that Rebuild has already written 504 if os.path.isfile(output_file): 505 with open(output_file, "rb") as f: 506 file_bytes = f.read() 507 else: 508 # file to memory, memory to memory 509 if ct_output_buffer and ct_output_size: 510 file_bytes = utils.buffer_to_bytes( 511 ct_output_buffer, 512 ct_output_size 513 ) 514 if isinstance(output_file, str): 515 # memory to file 516 # no Rebuild function exists for memory to file, write the memory to file ourselves 517 with open(output_file, "wb") as f: 518 f.write(file_bytes) 519 520 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 521 if status not in successes.success_codes: 522 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 523 if raise_unsupported: 524 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 525 else: 526 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 527 528 return file_bytes
Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the analysis file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The analysis file bytes.
530 def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 531 """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory. 532 533 Args: 534 input_directory (str): The input directory containing files to analyse. 535 output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files. 536 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 537 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 538 539 Returns: 540 analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 541 """ 542 analysis_files_dict = {} 543 # Call analyse_file on each file in input_directory to output_directory 544 for input_file in utils.list_file_paths(input_directory): 545 relative_path = os.path.relpath(input_file, input_directory) + ".xml" 546 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 547 548 analysis_bytes = self.analyse_file( 549 input_file=input_file, 550 output_file=output_file, 551 raise_unsupported=raise_unsupported, 552 content_management_policy=content_management_policy, 553 ) 554 555 analysis_files_dict[relative_path] = analysis_bytes 556 557 return analysis_files_dict
Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to analyse. output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
559 def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 560 """ Export a file, returning the .zip file bytes. The .zip file is written to output_file. 561 562 Args: 563 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 564 output_file (Union[None, str], optional): The output file path where the .zip file will be written. 565 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 566 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 567 568 Returns: 569 file_bytes (bytes): The exported .zip file. 570 """ 571 # Validate arg types 572 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 573 raise TypeError(input_file) 574 if not isinstance(output_file, (type(None), str)): 575 raise TypeError(output_file) 576 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 577 raise TypeError(content_management_policy) 578 if not isinstance(raise_unsupported, bool): 579 raise TypeError(raise_unsupported) 580 581 # Convert string path arguments to absolute paths 582 if isinstance(input_file, str): 583 if not os.path.isfile(input_file): 584 raise FileNotFoundError(input_file) 585 input_file = os.path.abspath(input_file) 586 if isinstance(output_file, str): 587 output_file = os.path.abspath(output_file) 588 # make directories that do not exist 589 os.makedirs(os.path.dirname(output_file), exist_ok=True) 590 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 591 content_management_policy = os.path.abspath(content_management_policy) 592 593 # Convert memory inputs to bytes 594 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 595 input_file = utils.as_bytes(input_file) 596 597 # Check that file type is supported 598 try: 599 self.determine_file_type(input_file=input_file) 600 except dft.errors.FileTypeEnumError: 601 if raise_unsupported: 602 raise 603 else: 604 return None 605 606 with utils.CwdHandler(self.library_path): 607 # Set content management policy 608 self.set_content_management_policy(content_management_policy) 609 610 # file to file 611 if isinstance(input_file, str) and isinstance(output_file, str): 612 # API function declaration 613 self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [ 614 ct.c_wchar_p, 615 ct.c_wchar_p 616 ] 617 618 # Variable initialisation 619 ct_input_file = ct.c_wchar_p(input_file) 620 ct_output_file = ct.c_wchar_p(output_file) 621 622 # API call 623 status = self.library.GWFileToFileAnalysisProtectAndExport( 624 ct_input_file, 625 ct_output_file 626 ) 627 628 # file to memory 629 elif isinstance(input_file, str) and output_file is None: 630 # API function declaration 631 self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [ 632 ct.c_wchar_p, 633 ct.POINTER(ct.c_void_p), 634 ct.POINTER(ct.c_size_t) 635 ] 636 637 # Variable initialisation 638 ct_input_file = ct.c_wchar_p(input_file) 639 ct_output_buffer = ct.c_void_p(0) 640 ct_output_size = ct.c_size_t(0) 641 642 # API call 643 status = self.library.GWFileToMemoryAnalysisProtectAndExport( 644 ct_input_file, 645 ct.byref(ct_output_buffer), 646 ct.byref(ct_output_size) 647 ) 648 649 # memory to memory and memory to file 650 elif isinstance(input_file, bytes): 651 # API function declaration 652 self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [ 653 ct.c_void_p, 654 ct.c_size_t, 655 ct.POINTER(ct.c_void_p), 656 ct.POINTER(ct.c_size_t) 657 ] 658 659 # Variable initialization 660 bytearray_buffer = bytearray(input_file) 661 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 662 ct_input_size = ct.c_size_t(len(input_file)) 663 ct_output_buffer = ct.c_void_p(0) 664 ct_output_size = ct.c_size_t(0) 665 666 status = self.library.GWMemoryToMemoryAnalysisProtectAndExport( 667 ct_input_buffer, 668 ct_input_size, 669 ct.byref(ct_output_buffer), 670 ct.byref(ct_output_size) 671 ) 672 673 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 674 if status not in successes.success_codes: 675 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 676 if raise_unsupported: 677 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 678 else: 679 file_bytes = None 680 else: 681 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 682 if isinstance(input_file, str) and isinstance(output_file, str): 683 # file to file, read the bytes of the file that Rebuild has already written 684 if not os.path.isfile(output_file): 685 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 686 file_bytes = None 687 else: 688 with open(output_file, "rb") as f: 689 file_bytes = f.read() 690 else: 691 # file to memory, memory to memory 692 file_bytes = utils.buffer_to_bytes( 693 ct_output_buffer, 694 ct_output_size 695 ) 696 if isinstance(output_file, str): 697 # memory to file 698 # no Rebuild function exists for memory to file, write the memory to file ourselves 699 with open(output_file, "wb") as f: 700 f.write(file_bytes) 701 702 return file_bytes
Export a file, returning the .zip file bytes. The .zip file is written to output_file.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the .zip file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The exported .zip file.
704 def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 705 """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory. 706 707 Args: 708 input_directory (str): The input directory containing files to export. 709 output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files. 710 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 711 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 712 713 Returns: 714 export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 715 """ 716 export_files_dict = {} 717 # Call export_file on each file in input_directory to output_directory 718 for input_file in utils.list_file_paths(input_directory): 719 relative_path = os.path.relpath(input_file, input_directory) + ".zip" 720 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 721 722 export_bytes = self.export_file( 723 input_file=input_file, 724 output_file=output_file, 725 raise_unsupported=raise_unsupported, 726 content_management_policy=content_management_policy, 727 ) 728 729 export_files_dict[relative_path] = export_bytes 730 731 return export_files_dict
Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to export. output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
733 def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 734 """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided. 735 736 Args: 737 input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. 738 output_file (Union[None, str], optional): The output file path where the constructed file will be written. 739 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 740 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 741 742 Returns: 743 file_bytes (bytes): The imported file bytes. 744 """ 745 # Validate arg types 746 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 747 raise TypeError(input_file) 748 if not isinstance(output_file, (type(None), str)): 749 raise TypeError(output_file) 750 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 751 raise TypeError(content_management_policy) 752 if not isinstance(raise_unsupported, bool): 753 raise TypeError(raise_unsupported) 754 755 # Convert string path arguments to absolute paths 756 if isinstance(input_file, str): 757 if not os.path.isfile(input_file): 758 raise FileNotFoundError(input_file) 759 input_file = os.path.abspath(input_file) 760 if isinstance(output_file, str): 761 output_file = os.path.abspath(output_file) 762 # make directories that do not exist 763 os.makedirs(os.path.dirname(output_file), exist_ok=True) 764 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 765 content_management_policy = os.path.abspath(content_management_policy) 766 767 # Convert memory inputs to bytes 768 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 769 input_file = utils.as_bytes(input_file) 770 771 # Check that file type is supported 772 try: 773 self.determine_file_type(input_file=input_file) 774 except dft.errors.FileTypeEnumError: 775 if raise_unsupported: 776 raise 777 else: 778 return None 779 780 with utils.CwdHandler(self.library_path): 781 # Set content management policy 782 self.set_content_management_policy(content_management_policy) 783 784 # file to file 785 if isinstance(input_file, str) and isinstance(output_file, str): 786 # API function declaration 787 self.library.GWFileToFileProtectAndImport.argtypes = [ 788 ct.c_wchar_p, 789 ct.c_wchar_p 790 ] 791 792 # Variable initialisation 793 ct_input_file = ct.c_wchar_p(input_file) 794 ct_output_file = ct.c_wchar_p(output_file) 795 796 # API call 797 status = self.library.GWFileToFileProtectAndImport( 798 ct_input_file, 799 ct_output_file 800 ) 801 802 # file to memory 803 elif isinstance(input_file, str) and output_file is None: 804 # API function declaration 805 self.library.GWFileToMemoryProtectAndImport.argtypes = [ 806 ct.c_wchar_p, 807 ct.POINTER(ct.c_void_p), 808 ct.POINTER(ct.c_size_t) 809 ] 810 811 # Variable initialisation 812 ct_input_file = ct.c_wchar_p(input_file) 813 ct_output_buffer = ct.c_void_p(0) 814 ct_output_size = ct.c_size_t(0) 815 816 # API call 817 status = self.library.GWFileToMemoryProtectAndImport( 818 ct_input_file, 819 ct.byref(ct_output_buffer), 820 ct.byref(ct_output_size) 821 ) 822 823 # memory to memory and memory to file 824 elif isinstance(input_file, bytes): 825 # API function declaration 826 self.library.GWMemoryToMemoryProtectAndImport.argtypes = [ 827 ct.c_void_p, 828 ct.c_size_t, 829 ct.POINTER(ct.c_void_p), 830 ct.POINTER(ct.c_size_t) 831 ] 832 833 # Variable initialization 834 bytearray_buffer = bytearray(input_file) 835 ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) 836 ct_input_size = ct.c_size_t(len(input_file)) 837 ct_output_buffer = ct.c_void_p(0) 838 ct_output_size = ct.c_size_t(0) 839 840 status = self.library.GWMemoryToMemoryProtectAndImport( 841 ct_input_buffer, 842 ct_input_size, 843 ct.byref(ct_output_buffer), 844 ct.byref(ct_output_size) 845 ) 846 847 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 848 if status not in successes.success_codes: 849 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") 850 if raise_unsupported: 851 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 852 else: 853 file_bytes = None 854 else: 855 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") 856 if isinstance(input_file, str) and isinstance(output_file, str): 857 # file to file, read the bytes of the file that Rebuild has already written 858 if not os.path.isfile(output_file): 859 log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") 860 file_bytes = None 861 else: 862 with open(output_file, "rb") as f: 863 file_bytes = f.read() 864 else: 865 # file to memory, memory to memory 866 file_bytes = utils.buffer_to_bytes( 867 ct_output_buffer, 868 ct_output_size 869 ) 870 if isinstance(output_file, str): 871 # memory to file 872 # no Rebuild function exists for memory to file, write the memory to file ourselves 873 with open(output_file, "wb") as f: 874 f.write(file_bytes) 875 876 return file_bytes
Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. output_file (Union[None, str], optional): The output file path where the constructed file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The imported file bytes.
878 def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 879 """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. 880 The constructed files are written to output_directory maintaining the same directory structure as input_directory. 881 882 Args: 883 input_directory (str): The input directory containing files to import. 884 output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files. 885 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 886 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 887 888 Returns: 889 import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 890 """ 891 import_files_dict = {} 892 # Call import_file on each file in input_directory to output_directory 893 for input_file in utils.list_file_paths(input_directory): 894 relative_path = os.path.relpath(input_file, input_directory) 895 # Remove .zip extension from relative_path 896 relative_path = os.path.splitext(relative_path)[0] 897 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 898 899 import_bytes = self.import_file( 900 input_file=input_file, 901 output_file=output_file, 902 raise_unsupported=raise_unsupported, 903 content_management_policy=content_management_policy, 904 ) 905 906 import_files_dict[relative_path] = import_bytes 907 908 return import_files_dict
Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. The constructed files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to import. output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
910 def GWFileErrorMsg(self): 911 """ Retrieve the Glasswall Process error message. 912 913 Returns: 914 error_message (str): The Glasswall Process error message. 915 """ 916 # Declare the return type 917 self.library.GWFileErrorMsg.restype = ct.c_wchar_p 918 919 # API call 920 error_message = self.library.GWFileErrorMsg() 921 922 return error_message
Retrieve the Glasswall Process error message.
Returns: error_message (str): The Glasswall Process error message.