glasswall.libraries.editor.editor
1import ctypes as ct 2import functools 3import io 4import os 5from contextlib import contextmanager 6from typing import Union, Optional 7 8import glasswall 9from glasswall import determine_file_type as dft 10from glasswall import utils 11from glasswall.config.logging import log, format_object 12from glasswall.libraries.editor import errors, successes 13from glasswall.libraries.library import Library 14 15 16class Editor(Library): 17 """ A high level Python wrapper for Glasswall Editor / Core2. """ 18 19 def __init__(self, library_path: str, licence: Union[str, bytes, bytearray, io.BytesIO] = None): 20 """ Initialise the Editor instance. 21 22 Args: 23 library_path (str): The file or directory path to the Editor library. 24 licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be: 25 - A string representing the file path to the licence. 26 - A `bytes` or `bytearray` object containing the licence data. 27 - An `io.BytesIO` object for in-memory licence data. 28 If not specified, it is assumed that the licence file is located in the same directory as the `library_path`. 29 """ 30 super().__init__(library_path) 31 self.library = self.load_library(os.path.abspath(library_path)) 32 self.licence = licence 33 34 # Validate killswitch has not activated 35 self.validate_licence() 36 37 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") 38 39 def validate_licence(self): 40 """ Validates the licence of the library by checking the licence details. 41 42 Raises: 43 LicenceExpired: If the licence has expired or could not be validated. 44 """ 45 licence_details = self.licence_details() 46 47 bad_details = [ 48 "Unable to Read Licence Key File", 49 "Licence File Missing Required Contents", 50 "Licence Expired", 51 ] 52 53 if any(bad_detail.lower() in licence_details.lower() for bad_detail in bad_details): 54 # bad_details found in licence_details 55 log.error(f"{self.__class__.__name__} licence validation failed. Licence details:\n{licence_details}") 56 raise errors.LicenceExpired(licence_details) 57 else: 58 log.debug(f"{self.__class__.__name__} licence validated successfully. Licence details:\n{licence_details}") 59 60 def version(self): 61 """ Returns the Glasswall library version. 62 63 Returns: 64 version (str): The Glasswall library version. 65 """ 66 # API function declaration 67 self.library.GW2LibVersion.restype = ct.c_char_p 68 69 # API call 70 version = self.library.GW2LibVersion() 71 72 # Convert to Python string 73 version = ct.string_at(version).decode() 74 75 return version 76 77 def open_session(self): 78 """ Open a new Glasswall session. 79 80 Returns: 81 session (int): An incrementing integer repsenting the current session. 82 """ 83 # API call 84 session = self.library.GW2OpenSession() 85 86 log.debug(f"\n\tsession: {session}") 87 88 if self.licence: 89 # Must register licence on each GW2OpenSession if the licence is not in the same directory as the library 90 self.register_licence(session, self.licence) 91 92 return session 93 94 def close_session(self, session: int) -> int: 95 """ Close the Glasswall session. All resources allocated by the session will be destroyed. 96 97 Args: 98 session (int): The session to close. 99 100 Returns: 101 status (int): The status code of the function call. 102 """ 103 if not isinstance(session, int): 104 raise TypeError(session) 105 106 # API function declaration 107 self.library.GW2CloseSession.argtypes = [ct.c_size_t] 108 109 # Variable initialisation 110 ct_session = ct.c_size_t(session) 111 112 # API call 113 status = self.library.GW2CloseSession(ct_session) 114 115 if status not in successes.success_codes: 116 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 117 else: 118 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 119 120 return status 121 122 @contextmanager 123 def new_session(self): 124 """ Context manager. Opens a new session on entry and closes the session on exit. """ 125 try: 126 session = self.open_session() 127 yield session 128 finally: 129 self.close_session(session) 130 131 def run_session(self, session): 132 """ Runs the Glasswall session and begins processing of a file. 133 134 Args: 135 session (int): The session to run. 136 137 Returns: 138 status (int): The status code of the function call. 139 """ 140 # API function declaration 141 self.library.GW2RunSession.argtypes = [ct.c_size_t] 142 143 # Variable initialisation 144 ct_session = ct.c_size_t(session) 145 146 # API call 147 status = self.library.GW2RunSession(ct_session) 148 149 if status not in successes.success_codes: 150 log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}") 151 else: 152 log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}") 153 154 return status 155 156 def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True) -> Union[int, str]: 157 """ Determine the file type of a given input file, either as an integer identifier or a string. 158 159 Args: 160 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. 161 as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. 162 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 163 164 Returns: 165 file_type (Union[int, str]): The file type. 166 """ 167 if isinstance(input_file, str): 168 if not os.path.isfile(input_file): 169 raise FileNotFoundError(input_file) 170 171 # convert to ct.c_char_p of bytes 172 ct_input_file = ct.c_char_p(input_file.encode("utf-8")) 173 174 # API call 175 file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file) 176 177 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 178 # convert to bytes 179 bytes_input_file = utils.as_bytes(input_file) 180 181 # ctypes conversion 182 ct_buffer = ct.c_char_p(bytes_input_file) 183 ct_buffer_length = ct.c_size_t(len(bytes_input_file)) 184 185 # API call 186 file_type = self.library.GW2DetermineFileTypeFromMemory( 187 ct_buffer, 188 ct_buffer_length 189 ) 190 191 else: 192 raise TypeError(input_file) 193 194 file_type_as_string = dft.file_type_int_to_str(file_type) 195 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 196 197 if not dft.is_success(file_type): 198 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 199 if raise_unsupported: 200 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 201 else: 202 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 203 204 if as_string: 205 return file_type_as_string 206 207 return file_type 208 209 def _GW2GetPolicySettings(self, session: int, policy_format: int = 0): 210 """ Get current policy settings for the given session. 211 212 Args: 213 session (int): The session integer. 214 policy_format (int): The format of the content management policy. 0=XML. 215 216 Returns: 217 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status', 'policy'. 218 """ 219 # API function declaration 220 self.library.GW2GetPolicySettings.argtypes = [ 221 ct.c_size_t, # Session_Handle session 222 ct.POINTER(ct.c_void_p), # char ** policiesBuffer 223 ct.POINTER(ct.c_size_t), # size_t * policiesLength 224 ct.c_int, # Policy_Format format 225 ] 226 227 # Variable initialisation 228 gw_return_object = glasswall.GwReturnObj() 229 gw_return_object.session = ct.c_size_t(session) 230 gw_return_object.buffer = ct.c_void_p() 231 gw_return_object.buffer_length = ct.c_size_t() 232 gw_return_object.policy_format = ct.c_int(policy_format) 233 234 # API Call 235 gw_return_object.status = self.library.GW2GetPolicySettings( 236 gw_return_object.session, 237 ct.byref(gw_return_object.buffer), 238 ct.byref(gw_return_object.buffer_length), 239 gw_return_object.policy_format 240 ) 241 242 # Editor wrote to a buffer, convert it to bytes 243 policy_bytes = utils.buffer_to_bytes( 244 gw_return_object.buffer, 245 gw_return_object.buffer_length 246 ) 247 248 gw_return_object.policy = policy_bytes.decode() 249 250 return gw_return_object 251 252 def get_content_management_policy(self, session: int): 253 """ Returns the content management configuration for a given session. 254 255 Args: 256 session (int): The session integer. 257 258 Returns: 259 xml_string (str): The XML string of the current content management configuration. 260 """ 261 # NOTE GW2GetPolicySettings is current not implemented in editor 262 263 # set xml_string as loaded default config 264 xml_string = glasswall.content_management.policies.Editor(default="sanitise").text, 265 266 # log.debug(f"xml_string:\n{xml_string}") 267 268 return xml_string 269 270 # # API function declaration 271 # self.library.GW2GetPolicySettings.argtypes = [ 272 # ct.c_size_t, 273 # ct.c_void_p, 274 # ] 275 276 # # Variable initialisation 277 # ct_session = ct.c_size_t(session) 278 # ct_buffer = ct.c_void_p() 279 # ct_buffer_length = ct.c_size_t() 280 # # ct_file_format = ct.c_int(file_format) 281 282 # # API Call 283 # status = self.library.GW2GetPolicySettings( 284 # ct_session, 285 # ct.byref(ct_buffer), 286 # ct.byref(ct_buffer_length) 287 # ) 288 289 # print("GW2GetPolicySettings status:", status) 290 291 # file_bytes = utils.buffer_to_bytes( 292 # ct_buffer, 293 # ct_buffer_length, 294 # ) 295 296 # return file_bytes 297 298 def _GW2RegisterPoliciesFile(self, session: int, input_file: str, policy_format: int = 0): 299 """ Registers the policies to be used by Glasswall when processing files. 300 301 Args: 302 session (int): The session integer. 303 input_file (str): The content management policy input file path. 304 policy_format (int): The format of the content management policy. 0=XML. 305 306 Returns: 307 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'. 308 """ 309 # API function declaration 310 self.library.GW2RegisterPoliciesFile.argtypes = [ 311 ct.c_size_t, # Session_Handle session 312 ct.c_char_p, # const char *filename 313 ct.c_int, # Policy_Format format 314 ] 315 316 # Variable initialisation 317 gw_return_object = glasswall.GwReturnObj() 318 gw_return_object.session = ct.c_size_t(session) 319 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 320 gw_return_object.policy_format = ct.c_int(policy_format) 321 322 gw_return_object.status = self.library.GW2RegisterPoliciesFile( 323 gw_return_object.session, 324 gw_return_object.input_file, 325 gw_return_object.policy_format 326 ) 327 328 return gw_return_object 329 330 def _GW2RegisterPoliciesMemory(self, session: int, input_file: bytes, policy_format: int = 0): 331 """ Registers the policies in memory to be used by Glasswall when processing files. 332 333 Args: 334 session (int): The session integer. 335 input_file (str): The content management policy input file bytes. 336 policy_format (int): The format of the content management policy. 0=XML. 337 338 Returns: 339 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'. 340 """ 341 # API function declaration 342 self.library.GW2RegisterPoliciesMemory.argtype = [ 343 ct.c_size_t, # Session_Handle session 344 ct.c_char_p, # const char *policies 345 ct.c_size_t, # size_t policiesLength 346 ct.c_int # Policy_Format format 347 ] 348 349 # Variable initialisation 350 gw_return_object = glasswall.GwReturnObj() 351 gw_return_object.session = ct.c_size_t(session) 352 gw_return_object.buffer = ct.c_char_p(input_file) 353 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 354 gw_return_object.policy_format = ct.c_int(policy_format) 355 356 # API Call 357 gw_return_object.status = self.library.GW2RegisterPoliciesMemory( 358 gw_return_object.session, 359 gw_return_object.buffer, 360 gw_return_object.buffer_length, 361 gw_return_object.policy_format 362 ) 363 364 return gw_return_object 365 366 def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, policy_format=0): 367 """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied. 368 369 Args: 370 session (int): The session integer. 371 input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 372 policy_format (int): The format of the content management policy. 0=XML. 373 374 Returns: 375 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 376 - If input_file is a str file path: 377 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'. 378 379 - If input_file is a file in memory: 380 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'. 381 """ 382 # Validate type 383 if not isinstance(session, int): 384 raise TypeError(session) 385 if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 386 raise TypeError(input_file) 387 if not isinstance(policy_format, int): 388 raise TypeError(policy_format) 389 390 # Set input_file to default if input_file is None 391 if input_file is None: 392 input_file = glasswall.content_management.policies.Editor(default="sanitise") 393 394 # Validate xml content is parsable 395 utils.validate_xml(input_file) 396 397 # From file 398 if isinstance(input_file, str) and os.path.isfile(input_file): 399 input_file = os.path.abspath(input_file) 400 401 result = self._GW2RegisterPoliciesFile(session, input_file) 402 403 # From memory 404 elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 405 # Convert bytearray, io.BytesIO to bytes 406 if isinstance(input_file, (bytearray, io.BytesIO)): 407 input_file = utils.as_bytes(input_file) 408 # Convert string xml or Policy to bytes 409 if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)): 410 input_file = input_file.encode("utf-8") 411 412 result = self._GW2RegisterPoliciesMemory(session, input_file) 413 414 if result.status not in successes.success_codes: 415 log.error(format_object(result)) 416 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 417 else: 418 log.debug(format_object(result)) 419 420 return result 421 422 def _GW2RegisterInputFile(self, session: int, input_file: str): 423 """ Register an input file for the given session. 424 425 Args: 426 session (int): The session integer. 427 input_file (str): The input file path. 428 429 Returns: 430 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 431 """ 432 # API function declaration 433 self.library.GW2RegisterInputFile.argtypes = [ 434 ct.c_size_t, # Session_Handle session 435 ct.c_char_p # const char * inputFilePath 436 ] 437 438 # Variable initialisation 439 gw_return_object = glasswall.GwReturnObj() 440 gw_return_object.session = ct.c_size_t(session) 441 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 442 gw_return_object.input_file_path = input_file 443 444 # API call 445 gw_return_object.status = self.library.GW2RegisterInputFile( 446 gw_return_object.session, 447 gw_return_object.input_file 448 ) 449 450 return gw_return_object 451 452 def _GW2RegisterInputMemory(self, session: int, input_file: bytes): 453 """ Register an input file in memory for the given session. 454 455 Args: 456 session (int): The session integer. 457 input_file (bytes): The input file in memory. 458 459 Returns: 460 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 461 """ 462 # API function declaration 463 self.library.GW2RegisterInputMemory.argtypes = [ 464 ct.c_size_t, # Session_Handle session 465 ct.c_char_p, # const char * inputFileBuffer 466 ct.c_size_t, # size_t inputLength 467 ] 468 469 # Variable initialisation 470 gw_return_object = glasswall.GwReturnObj() 471 gw_return_object.session = ct.c_size_t(session) 472 gw_return_object.buffer = ct.c_char_p(input_file) 473 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 474 475 # API call 476 gw_return_object.status = self.library.GW2RegisterInputMemory( 477 gw_return_object.session, 478 gw_return_object.buffer, 479 gw_return_object.buffer_length 480 ) 481 482 return gw_return_object 483 484 def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 485 """ Register an input file or bytes for the given session. 486 487 Args: 488 session (int): The session integer. 489 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 490 491 Returns: 492 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 493 - If input_file is a str file path: 494 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 495 496 - If input_file is a file in memory: 497 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 498 """ 499 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)): 500 raise TypeError(input_file) 501 502 if isinstance(input_file, str): 503 if not os.path.isfile(input_file): 504 raise FileNotFoundError(input_file) 505 506 input_file = os.path.abspath(input_file) 507 508 result = self._GW2RegisterInputFile(session, input_file) 509 510 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 511 # Convert bytearray and io.BytesIO to bytes 512 input_file = utils.as_bytes(input_file) 513 514 result = self._GW2RegisterInputMemory(session, input_file) 515 516 if result.status not in successes.success_codes: 517 log.error(format_object(result)) 518 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 519 else: 520 log.debug(format_object(result)) 521 522 return result 523 524 def _GW2RegisterOutFile(self, session: int, output_file: str): 525 """ Register an output file for the given session. 526 527 Args: 528 session (int): The session integer. 529 output_file (str): The output file path. 530 531 Returns: 532 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 533 """ 534 # API function declaration 535 self.library.GW2RegisterOutFile.argtypes = [ 536 ct.c_size_t, # Session_Handle session 537 ct.c_char_p # const char * outputFilePath 538 ] 539 540 # Variable initialisation 541 gw_return_object = glasswall.GwReturnObj() 542 gw_return_object.session = ct.c_size_t(session) 543 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 544 545 # API call 546 gw_return_object.status = self.library.GW2RegisterOutFile( 547 gw_return_object.session, 548 gw_return_object.output_file 549 ) 550 551 return gw_return_object 552 553 def _GW2RegisterOutputMemory(self, session: int): 554 """ Register an output file in memory for the given session. 555 556 Args: 557 session (int): The session integer. 558 559 Returns: 560 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 561 """ 562 # API function declaration 563 self.library.GW2RegisterOutputMemory.argtypes = [ 564 ct.c_size_t, # Session_Handle session 565 ct.POINTER(ct.c_void_p), # char ** outputBuffer 566 ct.POINTER(ct.c_size_t) # size_t * outputLength 567 ] 568 569 # Variable initialisation 570 gw_return_object = glasswall.GwReturnObj() 571 gw_return_object.session = ct.c_size_t(session) 572 gw_return_object.buffer = ct.c_void_p() 573 gw_return_object.buffer_length = ct.c_size_t() 574 575 # API call 576 gw_return_object.status = self.library.GW2RegisterOutputMemory( 577 gw_return_object.session, 578 ct.byref(gw_return_object.buffer), 579 ct.byref(gw_return_object.buffer_length) 580 ) 581 582 return gw_return_object 583 584 def register_output(self, session, output_file: Optional[str] = None): 585 """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes. 586 587 Args: 588 session (int): The session integer. 589 output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes. 590 591 Returns: 592 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 593 """ 594 if not isinstance(output_file, (type(None), str)): 595 raise TypeError(output_file) 596 597 if isinstance(output_file, str): 598 output_file = os.path.abspath(output_file) 599 600 result = self._GW2RegisterOutFile(session, output_file) 601 602 elif isinstance(output_file, type(None)): 603 result = self._GW2RegisterOutputMemory(session) 604 605 if result.status not in successes.success_codes: 606 log.error(format_object(result)) 607 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 608 else: 609 log.debug(format_object(result)) 610 611 return result 612 613 def _GW2RegisterAnalysisFile(self, session: int, output_file: str, analysis_format: int = 0): 614 """ Register an analysis output file for the given session. 615 616 Args: 617 session (int): The session integer. 618 output_file (str): The analysis output file path. 619 analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended 620 621 Returns: 622 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'analysis_format', 'status'. 623 """ 624 # API function declaration 625 self.library.GW2RegisterAnalysisFile.argtypes = [ 626 ct.c_size_t, # Session_Handle session 627 ct.c_char_p, # const char * analysisFilePathName 628 ct.c_int, # Analysis_Format format 629 ] 630 631 # Variable initialisation 632 gw_return_object = glasswall.GwReturnObj() 633 gw_return_object.session = ct.c_size_t(session) 634 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 635 gw_return_object.analysis_format = ct.c_int() 636 637 # API call 638 gw_return_object.status = self.library.GW2RegisterAnalysisFile( 639 gw_return_object.session, 640 gw_return_object.output_file, 641 gw_return_object.analysis_format 642 ) 643 644 return gw_return_object 645 646 def _GW2RegisterAnalysisMemory(self, session: int, analysis_format: int = 0): 647 """ Register an analysis output file in memory for the given session. 648 649 Args: 650 session (int): The session integer. 651 analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended 652 653 Returns: 654 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status'. 655 """ 656 # API function declaration 657 self.library.GW2RegisterAnalysisMemory.argtypes = [ 658 ct.c_size_t, # Session_Handle session 659 ct.POINTER(ct.c_void_p), # char ** analysisFileBuffer 660 ct.POINTER(ct.c_size_t), # size_t * analysisoutputLength 661 ct.c_int # Analysis_Format format 662 ] 663 664 # Variable initialisation 665 gw_return_object = glasswall.GwReturnObj() 666 gw_return_object.session = ct.c_size_t(session) 667 gw_return_object.buffer = ct.c_void_p() 668 gw_return_object.buffer_length = ct.c_size_t() 669 gw_return_object.analysis_format = ct.c_int() 670 671 # API call 672 gw_return_object.status = self.library.GW2RegisterAnalysisMemory( 673 gw_return_object.session, 674 ct.byref(gw_return_object.buffer), 675 ct.byref(gw_return_object.buffer_length), 676 gw_return_object.analysis_format 677 ) 678 679 return gw_return_object 680 681 def register_analysis(self, session: int, output_file: Optional[str] = None): 682 """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call. 683 684 Args: 685 session (int): The session integer. 686 output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes. 687 688 Returns: 689 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included. 690 """ 691 if not isinstance(output_file, (type(None), str)): 692 raise TypeError(output_file) 693 694 if isinstance(output_file, str): 695 output_file = os.path.abspath(output_file) 696 697 result = self._GW2RegisterAnalysisFile(session, output_file) 698 699 elif isinstance(output_file, type(None)): 700 result = self._GW2RegisterAnalysisMemory(session) 701 702 if result.status not in successes.success_codes: 703 log.error(format_object(result)) 704 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 705 else: 706 log.debug(format_object(result)) 707 708 return result 709 710 def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 711 """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided. 712 713 Args: 714 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 715 output_file (Optional[str]): The output file path where the protected file will be written. 716 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 717 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 718 719 Returns: 720 file_bytes (bytes): The protected file bytes. 721 """ 722 # Validate arg types 723 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 724 raise TypeError(input_file) 725 if not isinstance(output_file, (type(None), str)): 726 raise TypeError(output_file) 727 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 728 raise TypeError(content_management_policy) 729 if not isinstance(raise_unsupported, bool): 730 raise TypeError(raise_unsupported) 731 732 # Convert string path arguments to absolute paths 733 if isinstance(input_file, str): 734 if not os.path.isfile(input_file): 735 raise FileNotFoundError(input_file) 736 input_file = os.path.abspath(input_file) 737 if isinstance(output_file, str): 738 output_file = os.path.abspath(output_file) 739 # make directories that do not exist 740 os.makedirs(os.path.dirname(output_file), exist_ok=True) 741 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 742 content_management_policy = os.path.abspath(content_management_policy) 743 744 # Convert memory inputs to bytes 745 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 746 input_file = utils.as_bytes(input_file) 747 748 with utils.CwdHandler(self.library_path): 749 with self.new_session() as session: 750 content_management_policy = self.set_content_management_policy(session, content_management_policy) 751 register_input = self.register_input(session, input_file) 752 register_output = self.register_output(session, output_file=output_file) 753 status = self.run_session(session) 754 755 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 756 if status not in successes.success_codes: 757 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 758 if raise_unsupported: 759 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 760 else: 761 file_bytes = None 762 else: 763 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 764 # Get file bytes 765 if isinstance(output_file, str): 766 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 767 if not os.path.isfile(output_file): 768 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 769 file_bytes = None 770 else: 771 with open(output_file, "rb") as f: 772 file_bytes = f.read() 773 else: 774 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 775 file_bytes = utils.buffer_to_bytes( 776 register_output.buffer, 777 register_output.buffer_length 778 ) 779 780 # Ensure memory allocated is not garbage collected 781 content_management_policy, register_input, register_output 782 783 return file_bytes 784 785 def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 786 """ Recursively processes all files in a directory in protect mode using the given content management policy. 787 The protected files are written to output_directory maintaining the same directory structure as input_directory. 788 789 Args: 790 input_directory (str): The input directory containing files to protect. 791 output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files. 792 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 793 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 794 795 Returns: 796 protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 797 """ 798 protected_files_dict = {} 799 # Call protect_file on each file in input_directory to output_directory 800 for input_file in utils.list_file_paths(input_directory): 801 relative_path = os.path.relpath(input_file, input_directory) 802 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 803 804 protected_bytes = self.protect_file( 805 input_file=input_file, 806 output_file=output_file, 807 raise_unsupported=raise_unsupported, 808 content_management_policy=content_management_policy, 809 ) 810 811 protected_files_dict[relative_path] = protected_bytes 812 813 return protected_files_dict 814 815 def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 816 """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided. 817 818 Args: 819 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 820 output_file (Optional[str]): The output file path where the analysis file will be written. 821 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 822 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 823 824 Returns: 825 file_bytes (bytes): The analysis file bytes. 826 """ 827 # Validate arg types 828 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 829 raise TypeError(input_file) 830 if not isinstance(output_file, (type(None), str)): 831 raise TypeError(output_file) 832 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 833 raise TypeError(content_management_policy) 834 if not isinstance(raise_unsupported, bool): 835 raise TypeError(raise_unsupported) 836 837 # Convert string path arguments to absolute paths 838 if isinstance(input_file, str): 839 if not os.path.isfile(input_file): 840 raise FileNotFoundError(input_file) 841 input_file = os.path.abspath(input_file) 842 if isinstance(output_file, str): 843 output_file = os.path.abspath(output_file) 844 # make directories that do not exist 845 os.makedirs(os.path.dirname(output_file), exist_ok=True) 846 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 847 content_management_policy = os.path.abspath(content_management_policy) 848 849 # Convert memory inputs to bytes 850 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 851 input_file = utils.as_bytes(input_file) 852 853 with utils.CwdHandler(self.library_path): 854 with self.new_session() as session: 855 content_management_policy = self.set_content_management_policy(session, content_management_policy) 856 register_input = self.register_input(session, input_file) 857 register_analysis = self.register_analysis(session, output_file) 858 status = self.run_session(session) 859 860 file_bytes = None 861 if isinstance(output_file, str): 862 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 863 if os.path.isfile(output_file): 864 with open(output_file, "rb") as f: 865 file_bytes = f.read() 866 else: 867 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 868 if register_analysis.buffer and register_analysis.buffer_length: 869 file_bytes = utils.buffer_to_bytes( 870 register_analysis.buffer, 871 register_analysis.buffer_length 872 ) 873 874 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 875 if status not in successes.success_codes: 876 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 877 if raise_unsupported: 878 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 879 else: 880 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 881 882 # Ensure memory allocated is not garbage collected 883 content_management_policy, register_input, register_analysis 884 885 return file_bytes 886 887 def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 888 """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory. 889 890 Args: 891 input_directory (str): The input directory containing files to analyse. 892 output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files. 893 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 894 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 895 896 Returns: 897 analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 898 """ 899 analysis_files_dict = {} 900 # Call analyse_file on each file in input_directory to output_directory 901 for input_file in utils.list_file_paths(input_directory): 902 relative_path = os.path.relpath(input_file, input_directory) + ".xml" 903 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 904 905 analysis_bytes = self.analyse_file( 906 input_file=input_file, 907 output_file=output_file, 908 raise_unsupported=raise_unsupported, 909 content_management_policy=content_management_policy, 910 ) 911 912 analysis_files_dict[relative_path] = analysis_bytes 913 914 return analysis_files_dict 915 916 def protect_and_analyse_file( 917 self, 918 input_file: Union[str, bytes, bytearray, io.BytesIO], 919 output_file: Optional[str] = None, 920 output_analysis_report: Optional[str] = None, 921 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 922 raise_unsupported: bool = True, 923 ): 924 """ Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes. 925 926 Args: 927 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 928 output_file (Optional[str]): The output file path where the protected file will be written. 929 output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. 930 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 931 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 932 933 Returns: 934 Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes). 935 """ 936 # Validate arg types 937 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 938 raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}") 939 if not isinstance(output_file, (type(None), str)): 940 raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}") 941 if not isinstance(output_analysis_report, (type(None), str)): 942 raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}") 943 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 944 raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}") 945 if not isinstance(raise_unsupported, bool): 946 raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}") 947 948 # Convert string path arguments to absolute paths 949 if isinstance(input_file, str): 950 if not os.path.isfile(input_file): 951 raise FileNotFoundError(input_file) 952 input_file = os.path.abspath(input_file) 953 if isinstance(output_file, str): 954 output_file = os.path.abspath(output_file) 955 os.makedirs(os.path.dirname(output_file), exist_ok=True) 956 if isinstance(output_analysis_report, str): 957 output_analysis_report = os.path.abspath(output_analysis_report) 958 os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True) 959 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 960 content_management_policy = os.path.abspath(content_management_policy) 961 962 # Convert memory inputs to bytes 963 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 964 input_file = utils.as_bytes(input_file) 965 966 with utils.CwdHandler(self.library_path): 967 with self.new_session() as session: 968 content_management_policy = self.set_content_management_policy(session, content_management_policy) 969 register_input = self.register_input(session, input_file) 970 register_output = self.register_output(session, output_file) 971 register_analysis = self.register_analysis(session, output_analysis_report) 972 973 status = self.run_session(session) 974 975 protected_file_bytes = None 976 analysis_report_bytes = None 977 978 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 979 if status not in successes.success_codes: 980 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 981 if raise_unsupported: 982 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 983 984 # Get analysis report file bytes, even on processing failure 985 if isinstance(output_analysis_report, str): 986 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 987 if not os.path.isfile(output_analysis_report): 988 log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}") 989 else: 990 with open(output_analysis_report, "rb") as f: 991 analysis_report_bytes = f.read() 992 else: 993 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 994 if register_analysis.buffer and register_analysis.buffer_length: 995 analysis_report_bytes = utils.buffer_to_bytes( 996 register_analysis.buffer, 997 register_analysis.buffer_length 998 ) 999 1000 # On success, get protected file bytes 1001 if status in successes.success_codes: 1002 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1003 # Get file bytes 1004 if isinstance(output_file, str): 1005 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1006 if not os.path.isfile(output_file): 1007 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1008 else: 1009 with open(output_file, "rb") as f: 1010 protected_file_bytes = f.read() 1011 else: 1012 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1013 protected_file_bytes = utils.buffer_to_bytes( 1014 register_output.buffer, 1015 register_output.buffer_length 1016 ) 1017 1018 # Ensure memory allocated is not garbage collected 1019 content_management_policy, register_input, register_output, register_analysis 1020 1021 return protected_file_bytes, analysis_report_bytes 1022 1023 def protect_and_analyse_directory( 1024 self, 1025 input_directory: str, 1026 output_directory: Optional[str], 1027 analysis_directory: Optional[str], 1028 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1029 raise_unsupported: bool = True, 1030 ): 1031 """ Recursively processes all files in a directory using protect and analyse mode with the given content management policy. 1032 Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory. 1033 1034 Args: 1035 input_directory (str): The input directory containing files to process. 1036 output_directory (Optional[str]): The output directory for protected files. 1037 analysis_directory (Optional[str]): The output directory for XML analysis reports. 1038 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 1039 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1040 1041 Returns: 1042 result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes). 1043 """ 1044 result_dict = {} 1045 for input_file in utils.list_file_paths(input_directory): 1046 relative_path = os.path.relpath(input_file, input_directory) 1047 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1048 output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml") 1049 1050 protected_file_bytes, analysis_report_bytes = self.protect_and_analyse_file( 1051 input_file=input_file, 1052 output_file=output_file, 1053 output_analysis_report=output_analysis_report, 1054 content_management_policy=content_management_policy, 1055 raise_unsupported=raise_unsupported, 1056 ) 1057 1058 result_dict[relative_path] = (protected_file_bytes, analysis_report_bytes) 1059 1060 return result_dict 1061 1062 def _GW2RegisterExportFile(self, session: int, output_file: str): 1063 """ Register an export output file for the given session. 1064 1065 Args: 1066 session (int): The session integer. 1067 output_file (str): The export output file path. 1068 1069 Returns: 1070 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1071 """ 1072 # API function declaration 1073 self.library.GW2RegisterExportFile.argtypes = [ 1074 ct.c_size_t, # Session_Handle session 1075 ct.c_char_p # const char * exportFilePath 1076 ] 1077 1078 # Variable initialisation 1079 gw_return_object = glasswall.GwReturnObj() 1080 gw_return_object.session = ct.c_size_t(session) 1081 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 1082 1083 # API Call 1084 gw_return_object.status = self.library.GW2RegisterExportFile( 1085 gw_return_object.session, 1086 gw_return_object.output_file 1087 ) 1088 1089 return gw_return_object 1090 1091 def _GW2RegisterExportMemory(self, session: int): 1092 """ Register an export output file in memory for the given session. 1093 1094 Args: 1095 session (int): The session integer. 1096 1097 Returns: 1098 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 1099 """ 1100 # API function declaration 1101 self.library.GW2RegisterExportMemory.argtypes = [ 1102 ct.c_size_t, # Session_Handle session 1103 ct.POINTER(ct.c_void_p), # char ** exportFileBuffer 1104 ct.POINTER(ct.c_size_t) # size_t * exportLength 1105 ] 1106 1107 # Variable initialisation 1108 gw_return_object = glasswall.GwReturnObj() 1109 gw_return_object.session = ct.c_size_t(session) 1110 gw_return_object.buffer = ct.c_void_p() 1111 gw_return_object.buffer_length = ct.c_size_t() 1112 1113 # API call 1114 gw_return_object.status = self.library.GW2RegisterExportMemory( 1115 gw_return_object.session, 1116 ct.byref(gw_return_object.buffer), 1117 ct.byref(gw_return_object.buffer_length) 1118 ) 1119 1120 return gw_return_object 1121 1122 def register_export(self, session: int, output_file: Optional[str] = None): 1123 """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call. 1124 1125 Args: 1126 session (int): The session integer. 1127 output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory. 1128 1129 Returns: 1130 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 1131 """ 1132 if not isinstance(output_file, (type(None), str)): 1133 raise TypeError(output_file) 1134 1135 if isinstance(output_file, str): 1136 output_file = os.path.abspath(output_file) 1137 1138 result = self._GW2RegisterExportFile(session, output_file) 1139 1140 elif isinstance(output_file, type(None)): 1141 result = self._GW2RegisterExportMemory(session) 1142 1143 if result.status not in successes.success_codes: 1144 log.error(format_object(result)) 1145 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1146 else: 1147 log.debug(format_object(result)) 1148 1149 return result 1150 1151 def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1152 """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided. 1153 1154 Args: 1155 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 1156 output_file (Optional[str]): The output file path where the .zip file will be written. 1157 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1158 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1159 1160 Returns: 1161 file_bytes (bytes): The exported .zip file. 1162 """ 1163 # Validate arg types 1164 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1165 raise TypeError(input_file) 1166 if not isinstance(output_file, (type(None), str)): 1167 raise TypeError(output_file) 1168 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1169 raise TypeError(content_management_policy) 1170 if not isinstance(raise_unsupported, bool): 1171 raise TypeError(raise_unsupported) 1172 1173 # Convert string path arguments to absolute paths 1174 if isinstance(input_file, str): 1175 if not os.path.isfile(input_file): 1176 raise FileNotFoundError(input_file) 1177 input_file = os.path.abspath(input_file) 1178 if isinstance(output_file, str): 1179 output_file = os.path.abspath(output_file) 1180 # make directories that do not exist 1181 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1182 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1183 content_management_policy = os.path.abspath(content_management_policy) 1184 1185 # Convert memory inputs to bytes 1186 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1187 input_file = utils.as_bytes(input_file) 1188 1189 with utils.CwdHandler(self.library_path): 1190 with self.new_session() as session: 1191 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1192 register_input = self.register_input(session, input_file) 1193 register_export = self.register_export(session, output_file) 1194 status = self.run_session(session) 1195 1196 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1197 if status not in successes.success_codes: 1198 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1199 if raise_unsupported: 1200 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1201 else: 1202 file_bytes = None 1203 else: 1204 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1205 # Get file bytes 1206 if isinstance(output_file, str): 1207 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1208 if not os.path.isfile(output_file): 1209 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1210 file_bytes = None 1211 else: 1212 with open(output_file, "rb") as f: 1213 file_bytes = f.read() 1214 else: 1215 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1216 file_bytes = utils.buffer_to_bytes( 1217 register_export.buffer, 1218 register_export.buffer_length 1219 ) 1220 1221 # Ensure memory allocated is not garbage collected 1222 content_management_policy, register_input, register_export 1223 1224 return file_bytes 1225 1226 def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1227 """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory. 1228 1229 Args: 1230 input_directory (str): The input directory containing files to export. 1231 output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files. 1232 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 1233 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1234 1235 Returns: 1236 export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 1237 """ 1238 export_files_dict = {} 1239 # Call export_file on each file in input_directory to output_directory 1240 for input_file in utils.list_file_paths(input_directory): 1241 relative_path = os.path.relpath(input_file, input_directory) + ".zip" 1242 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1243 1244 export_bytes = self.export_file( 1245 input_file=input_file, 1246 output_file=output_file, 1247 raise_unsupported=raise_unsupported, 1248 content_management_policy=content_management_policy, 1249 ) 1250 1251 export_files_dict[relative_path] = export_bytes 1252 1253 return export_files_dict 1254 1255 def export_and_analyse_file( 1256 self, 1257 input_file: Union[str, bytes, bytearray, io.BytesIO], 1258 output_file: Optional[str] = None, 1259 output_analysis_report: Optional[str] = None, 1260 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1261 raise_unsupported: bool = True, 1262 ): 1263 """ Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes. 1264 1265 Args: 1266 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 1267 output_file (Optional[str]): The output file path where the .zip export will be written. 1268 output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. 1269 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1270 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1271 1272 Returns: 1273 Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes). 1274 """ 1275 # Validate arg types 1276 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1277 raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}") 1278 if not isinstance(output_file, (type(None), str)): 1279 raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}") 1280 if not isinstance(output_analysis_report, (type(None), str)): 1281 raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}") 1282 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1283 raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}") 1284 if not isinstance(raise_unsupported, bool): 1285 raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}") 1286 1287 # Convert string path arguments to absolute paths 1288 if isinstance(input_file, str): 1289 if not os.path.isfile(input_file): 1290 raise FileNotFoundError(input_file) 1291 input_file = os.path.abspath(input_file) 1292 if isinstance(output_file, str): 1293 output_file = os.path.abspath(output_file) 1294 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1295 if isinstance(output_analysis_report, str): 1296 output_analysis_report = os.path.abspath(output_analysis_report) 1297 os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True) 1298 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1299 content_management_policy = os.path.abspath(content_management_policy) 1300 1301 # Convert memory inputs to bytes 1302 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1303 input_file = utils.as_bytes(input_file) 1304 1305 with utils.CwdHandler(self.library_path): 1306 with self.new_session() as session: 1307 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1308 register_input = self.register_input(session, input_file) 1309 register_export = self.register_export(session, output_file) 1310 register_analysis = self.register_analysis(session, output_analysis_report) 1311 1312 status = self.run_session(session) 1313 1314 export_file_bytes = None 1315 analysis_report_bytes = None 1316 1317 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1318 if status not in successes.success_codes: 1319 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1320 if raise_unsupported: 1321 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1322 1323 # Get analysis report file bytes, even on processing failure 1324 if isinstance(output_analysis_report, str): 1325 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1326 if not os.path.isfile(output_analysis_report): 1327 log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}") 1328 else: 1329 with open(output_analysis_report, "rb") as f: 1330 analysis_report_bytes = f.read() 1331 else: 1332 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1333 if register_analysis.buffer and register_analysis.buffer_length: 1334 analysis_report_bytes = utils.buffer_to_bytes( 1335 register_analysis.buffer, 1336 register_analysis.buffer_length 1337 ) 1338 1339 # On success, get export file bytes 1340 if status in successes.success_codes: 1341 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1342 # Get export file bytes 1343 if isinstance(output_file, str): 1344 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1345 if not os.path.isfile(output_file): 1346 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1347 else: 1348 with open(output_file, "rb") as f: 1349 export_file_bytes = f.read() 1350 else: 1351 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1352 export_file_bytes = utils.buffer_to_bytes( 1353 register_export.buffer, 1354 register_export.buffer_length 1355 ) 1356 1357 # Ensure memory allocated is not garbage collected 1358 content_management_policy, register_input, register_export, register_analysis 1359 1360 return export_file_bytes, analysis_report_bytes 1361 1362 def export_and_analyse_directory( 1363 self, 1364 input_directory: str, 1365 output_directory: Optional[str], 1366 analysis_directory: Optional[str], 1367 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1368 raise_unsupported: bool = True, 1369 ): 1370 """ Recursively processes all files in a directory using export and analyse mode with the given content management policy. 1371 Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory. 1372 1373 Args: 1374 input_directory (str): The input directory containing files to process. 1375 output_directory (Optional[str]): The output directory for exported .zip files. 1376 analysis_directory (Optional[str]): The output directory for XML analysis reports. 1377 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 1378 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1379 1380 Returns: 1381 result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes). 1382 """ 1383 result_dict = {} 1384 1385 for input_file in utils.list_file_paths(input_directory): 1386 relative_path = os.path.relpath(input_file, input_directory) 1387 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path + ".zip") 1388 output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml") 1389 1390 export_file_bytes, analysis_report_bytes = self.export_and_analyse_file( 1391 input_file=input_file, 1392 output_file=output_file, 1393 output_analysis_report=output_analysis_report, 1394 content_management_policy=content_management_policy, 1395 raise_unsupported=raise_unsupported, 1396 ) 1397 1398 result_dict[relative_path] = (export_file_bytes, analysis_report_bytes) 1399 1400 return result_dict 1401 1402 def _GW2RegisterImportFile(self, session: int, input_file: str): 1403 """ Register an import input file for the given session. 1404 1405 Args: 1406 session (int): The session integer. 1407 input_file (str): The input import file path. 1408 1409 Returns: 1410 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1411 """ 1412 # API function declaration 1413 self.library.GW2RegisterImportFile.argtypes = [ 1414 ct.c_size_t, # Session_Handle session 1415 ct.c_char_p # const char * importFilePath 1416 ] 1417 1418 # Variable initialisation 1419 gw_return_object = glasswall.GwReturnObj() 1420 gw_return_object.session = ct.c_size_t(session) 1421 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 1422 1423 # API Call 1424 gw_return_object.status = self.library.GW2RegisterImportFile( 1425 gw_return_object.session, 1426 gw_return_object.input_file 1427 ) 1428 1429 return gw_return_object 1430 1431 def _GW2RegisterImportMemory(self, session: int, input_file: bytes): 1432 """ Register an import input file in memory for the given session. 1433 1434 Args: 1435 session (int): The session integer. 1436 input_file (str): The input import file in memory. 1437 1438 Returns: 1439 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 1440 """ 1441 # API function declaration 1442 self.library.GW2RegisterImportMemory.argtypes = [ 1443 ct.c_size_t, # Session_Handle session 1444 ct.c_void_p, # char * importFileBuffer 1445 ct.c_size_t # size_t importLength 1446 ] 1447 1448 # Variable initialisation 1449 gw_return_object = glasswall.GwReturnObj() 1450 gw_return_object.session = ct.c_size_t(session) 1451 gw_return_object.buffer = ct.c_char_p(input_file) 1452 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 1453 1454 # API call 1455 gw_return_object.status = self.library.GW2RegisterImportMemory( 1456 gw_return_object.session, 1457 gw_return_object.buffer, 1458 gw_return_object.buffer_length 1459 ) 1460 1461 return gw_return_object 1462 1463 def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 1464 """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call. 1465 1466 Args: 1467 session (int): The session integer. 1468 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes. 1469 1470 Returns: 1471 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 1472 """ 1473 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)): 1474 raise TypeError(input_file) 1475 1476 if isinstance(input_file, str): 1477 if not os.path.isfile(input_file): 1478 raise FileNotFoundError(input_file) 1479 1480 input_file = os.path.abspath(input_file) 1481 1482 result = self._GW2RegisterImportFile(session, input_file) 1483 1484 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 1485 # Convert bytearray and io.BytesIO to bytes 1486 input_file = utils.as_bytes(input_file) 1487 1488 result = self._GW2RegisterImportMemory(session, input_file) 1489 1490 if result.status not in successes.success_codes: 1491 log.error(format_object(result)) 1492 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1493 else: 1494 log.debug(format_object(result)) 1495 1496 return result 1497 1498 def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1499 """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided. 1500 1501 Args: 1502 input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. 1503 output_file (Optional[str]): The output file path where the constructed file will be written. 1504 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1505 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1506 1507 Returns: 1508 file_bytes (bytes): The imported file bytes. 1509 """ 1510 # Validate arg types 1511 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1512 raise TypeError(input_file) 1513 if not isinstance(output_file, (type(None), str)): 1514 raise TypeError(output_file) 1515 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1516 raise TypeError(content_management_policy) 1517 if not isinstance(raise_unsupported, bool): 1518 raise TypeError(raise_unsupported) 1519 1520 # Convert string path arguments to absolute paths 1521 if isinstance(input_file, str): 1522 if not os.path.isfile(input_file): 1523 raise FileNotFoundError(input_file) 1524 input_file = os.path.abspath(input_file) 1525 if isinstance(output_file, str): 1526 output_file = os.path.abspath(output_file) 1527 # make directories that do not exist 1528 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1529 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1530 content_management_policy = os.path.abspath(content_management_policy) 1531 1532 # Convert memory inputs to bytes 1533 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1534 input_file = utils.as_bytes(input_file) 1535 1536 with utils.CwdHandler(self.library_path): 1537 with self.new_session() as session: 1538 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1539 register_import = self.register_import(session, input_file) 1540 register_output = self.register_output(session, output_file) 1541 status = self.run_session(session) 1542 1543 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1544 if status not in successes.success_codes: 1545 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1546 if raise_unsupported: 1547 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1548 else: 1549 file_bytes = None 1550 else: 1551 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1552 # Get file bytes 1553 if isinstance(output_file, str): 1554 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1555 if not os.path.isfile(output_file): 1556 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1557 file_bytes = None 1558 else: 1559 with open(output_file, "rb") as f: 1560 file_bytes = f.read() 1561 else: 1562 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1563 file_bytes = utils.buffer_to_bytes( 1564 register_output.buffer, 1565 register_output.buffer_length 1566 ) 1567 1568 # Ensure memory allocated is not garbage collected 1569 content_management_policy, register_import, register_output 1570 1571 return file_bytes 1572 1573 def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1574 """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. 1575 The constructed files are written to output_directory maintaining the same directory structure as input_directory. 1576 1577 Args: 1578 input_directory (str): The input directory containing files to import. 1579 output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files. 1580 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 1581 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1582 1583 Returns: 1584 import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 1585 """ 1586 import_files_dict = {} 1587 # Call import_file on each file in input_directory to output_directory 1588 for input_file in utils.list_file_paths(input_directory): 1589 relative_path = os.path.relpath(input_file, input_directory) 1590 # Remove .zip extension from relative_path 1591 relative_path = os.path.splitext(relative_path)[0] 1592 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1593 1594 import_bytes = self.import_file( 1595 input_file=input_file, 1596 output_file=output_file, 1597 raise_unsupported=raise_unsupported, 1598 content_management_policy=content_management_policy, 1599 ) 1600 1601 import_files_dict[relative_path] = import_bytes 1602 1603 return import_files_dict 1604 1605 def _GW2FileErrorMsg(self, session: int): 1606 """ Retrieve the Glasswall Session Process error message. 1607 1608 Args: 1609 session (int): The session integer. 1610 1611 Returns: 1612 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status', 'error_message'. 1613 """ 1614 # API function declaration 1615 self.library.GW2FileErrorMsg.argtypes = [ 1616 ct.c_size_t, # Session_Handle session 1617 ct.POINTER(ct.c_void_p), # char **errorMsgBuffer 1618 ct.POINTER(ct.c_size_t) # size_t *errorMsgBufferLength 1619 ] 1620 1621 # Variable initialisation 1622 gw_return_object = glasswall.GwReturnObj() 1623 gw_return_object.session = ct.c_size_t(session) 1624 gw_return_object.buffer = ct.c_void_p() 1625 gw_return_object.buffer_length = ct.c_size_t() 1626 1627 # API call 1628 gw_return_object.status = self.library.GW2FileErrorMsg( 1629 gw_return_object.session, 1630 ct.byref(gw_return_object.buffer), 1631 ct.byref(gw_return_object.buffer_length) 1632 ) 1633 1634 # Editor wrote to a buffer, convert it to bytes 1635 error_bytes = utils.buffer_to_bytes( 1636 gw_return_object.buffer, 1637 gw_return_object.buffer_length 1638 ) 1639 1640 gw_return_object.error_message = error_bytes.decode() 1641 1642 return gw_return_object 1643 1644 @functools.lru_cache() 1645 def file_error_message(self, session: int) -> str: 1646 """ Retrieve the Glasswall Session Process error message. 1647 1648 Args: 1649 session (int): The session integer. 1650 1651 Returns: 1652 error_message (str): The Glasswall Session Process error message. 1653 """ 1654 # Validate arg types 1655 if not isinstance(session, int): 1656 raise TypeError(session) 1657 1658 result = self._GW2FileErrorMsg(session) 1659 1660 if result.status not in successes.success_codes: 1661 log.error(format_object(result)) 1662 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1663 else: 1664 log.debug(format_object(result)) 1665 1666 return result.error_message 1667 1668 def GW2GetFileType(self, session: int, file_type_id): 1669 """ Retrieve the file type as a string. 1670 1671 Args: 1672 session (int): The session integer. 1673 file_type_id (int): The file type id. 1674 1675 Returns: 1676 file_type (str): The formal file name for the corresponding file id. 1677 """ 1678 # Validate arg types 1679 if not isinstance(session, int): 1680 raise TypeError(session) 1681 1682 # API function declaration 1683 self.library.GW2GetFileType.argtypes = [ 1684 ct.c_size_t, 1685 ct.c_size_t, 1686 ct.POINTER(ct.c_size_t), 1687 ct.POINTER(ct.c_void_p) 1688 ] 1689 1690 # Variable initialisation 1691 ct_session = ct.c_size_t(session) 1692 ct_file_type = ct.c_size_t(file_type_id) 1693 ct_buffer_length = ct.c_size_t() 1694 ct_buffer = ct.c_void_p() 1695 1696 # API call 1697 status = self.library.GW2GetFileType( 1698 ct_session, 1699 ct_file_type, 1700 ct.byref(ct_buffer_length), 1701 ct.byref(ct_buffer) 1702 ) 1703 1704 if status not in successes.success_codes: 1705 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 1706 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1707 else: 1708 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 1709 1710 # Editor wrote to a buffer, convert it to bytes 1711 file_type_bytes = utils.buffer_to_bytes( 1712 ct_buffer, 1713 ct_buffer_length 1714 ) 1715 1716 file_type = file_type_bytes.decode() 1717 1718 return file_type 1719 1720 def GW2GetFileTypeID(self, session: int, file_type_str): 1721 """ Retrieve the Glasswall file type id given a file type string. 1722 1723 Args: 1724 session (int): The session integer. 1725 file_type_str (str): The file type as a string. 1726 1727 Returns: 1728 file_type_id (str): The Glasswall file type id for the specified file type. 1729 """ 1730 # Validate arg types 1731 if not isinstance(session, int): 1732 raise TypeError(session) 1733 1734 # API function declaration 1735 self.library.GW2GetFileTypeID.argtypes = [ 1736 ct.c_size_t, 1737 ct.c_char_p, 1738 ct.POINTER(ct.c_size_t), 1739 ct.POINTER(ct.c_void_p) 1740 ] 1741 1742 # Variable initialisation 1743 ct_session = ct.c_size_t(session) 1744 ct_file_type = ct.c_char_p(file_type_str.encode('utf-8')) 1745 ct_buffer_length = ct.c_size_t() 1746 ct_buffer = ct.c_void_p() 1747 1748 # API call 1749 status = self.library.GW2GetFileTypeID( 1750 ct_session, 1751 ct_file_type, 1752 ct.byref(ct_buffer_length), 1753 ct.byref(ct_buffer) 1754 ) 1755 1756 if status not in successes.success_codes: 1757 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 1758 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1759 else: 1760 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 1761 1762 # Editor wrote to a buffer, convert it to bytes 1763 file_type_bytes = utils.buffer_to_bytes( 1764 ct_buffer, 1765 ct_buffer_length 1766 ) 1767 1768 file_type_id = file_type_bytes.decode() 1769 1770 return file_type_id 1771 1772 def get_file_type_info(self, file_type: Union[str, int]): 1773 """ Retrieve information about a file type based on its identifier. 1774 1775 Args: 1776 file_type (Union[str, int]): The file type identifier. This can be either a string representing a file 1777 extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29). 1778 1779 Returns: 1780 - file_type_info (Union[int, str]): Depending on the input 'file_type': 1781 - If `file_type` is a string (e.g. 'bmp'): 1782 - If the file type is recognised, returns an integer corresponding to that file type. 1783 - If the file type is not recognised, returns 0. 1784 - If `file_type` is an integer (e.g. 29): 1785 - If the integer corresponds to a recognised file type, returns a more detailed string description 1786 of the file type (e.g. 'BMP Image'). 1787 - If the integer does not match any recognised file type, returns an empty string. 1788 """ 1789 # Validate arg types 1790 if not isinstance(file_type, (str, int)): 1791 raise TypeError(file_type) 1792 1793 with utils.CwdHandler(self.library_path): 1794 with self.new_session() as session: 1795 1796 if isinstance(file_type, int): 1797 file_type_info = self.GW2GetFileType(session, file_type) 1798 if isinstance(file_type, str): 1799 file_type_info = self.GW2GetFileTypeID(session, file_type) 1800 1801 return file_type_info 1802 1803 @utils.deprecated_function(replacement_function=get_file_type_info) 1804 def get_file_info(self, *args, **kwargs): 1805 """ Deprecated in 1.0.6. Use get_file_type_info. """ 1806 pass 1807 1808 def _GW2RegisterReportFile(self, session: int, output_file: str): 1809 """ Register an output report file path for the given session. 1810 1811 Args: 1812 session (int): The session integer. 1813 output_file (str): The file path of the output report file. 1814 1815 Returns: 1816 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1817 """ 1818 # API function declaration 1819 self.library.GW2RegisterReportFile.argtypes = [ 1820 ct.c_size_t, # Session_Handle session 1821 ct.c_char_p, # const char * reportFilePathName 1822 ] 1823 1824 # Variable initialisation 1825 gw_return_object = glasswall.GwReturnObj() 1826 gw_return_object.session = ct.c_size_t(session) 1827 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 1828 1829 # API call 1830 gw_return_object.status = self.library.GW2RegisterReportFile( 1831 gw_return_object.session, 1832 gw_return_object.output_file 1833 ) 1834 1835 return gw_return_object 1836 1837 def register_report_file(self, session: int, output_file: str): 1838 """ Register the report file path for the given session. 1839 1840 Args: 1841 session (int): The session integer. 1842 output_file (str): The file path of the report file. 1843 1844 Returns: 1845 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1846 """ 1847 # Validate arg types 1848 if not isinstance(session, int): 1849 raise TypeError(session) 1850 if not isinstance(output_file, (type(None), str)): 1851 raise TypeError(output_file) 1852 1853 result = self._GW2RegisterReportFile(session, output_file) 1854 1855 if result.status not in successes.success_codes: 1856 log.error(format_object(result)) 1857 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1858 else: 1859 log.debug(format_object(result)) 1860 1861 return result 1862 1863 def _GW2GetIdInfo(self, session: int, issue_id: int): 1864 """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances" 1865 1866 Args: 1867 session (int): The session integer. 1868 issue_id (int): The issue id. 1869 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1870 1871 Returns: 1872 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'issue_id', 'buffer_length', 'buffer', 'status', 'id_info'. 1873 """ 1874 # API function declaration 1875 self.library.GW2GetIdInfo.argtypes = [ 1876 ct.c_size_t, # Session_Handle session 1877 ct.c_size_t, # size_t issueId 1878 ct.POINTER(ct.c_size_t), # size_t * bufferLength 1879 ct.POINTER(ct.c_void_p) # char ** outputBuffer 1880 ] 1881 1882 # Variable initialisation 1883 gw_return_object = glasswall.GwReturnObj() 1884 gw_return_object.session = ct.c_size_t(session) 1885 gw_return_object.issue_id = ct.c_size_t(issue_id) 1886 gw_return_object.buffer_length = ct.c_size_t() 1887 gw_return_object.buffer = ct.c_void_p() 1888 1889 # API call 1890 gw_return_object.status = self.library.GW2GetIdInfo( 1891 gw_return_object.session, 1892 gw_return_object.issue_id, 1893 ct.byref(gw_return_object.buffer_length), 1894 ct.byref(gw_return_object.buffer) 1895 ) 1896 1897 # Editor wrote to a buffer, convert it to bytes 1898 id_info_bytes = utils.buffer_to_bytes( 1899 gw_return_object.buffer, 1900 gw_return_object.buffer_length 1901 ) 1902 1903 gw_return_object.id_info = id_info_bytes.decode() 1904 1905 return gw_return_object 1906 1907 def get_id_info(self, issue_id: int, raise_unsupported: bool = True): 1908 """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances" 1909 1910 Args: 1911 issue_id (int): The issue id. 1912 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1913 1914 Returns: 1915 id_info (str): The group description for the given Issue ID. 1916 """ 1917 # Validate arg types 1918 if not isinstance(issue_id, int): 1919 raise TypeError(issue_id) 1920 1921 with utils.CwdHandler(self.library_path): 1922 with self.new_session() as session: 1923 result = self._GW2GetIdInfo(session, issue_id) 1924 1925 if result.status not in successes.success_codes: 1926 log.error(format_object(result)) 1927 if raise_unsupported: 1928 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1929 else: 1930 log.debug(format_object(result)) 1931 1932 return result.id_info 1933 1934 def _GW2GetAllIdInfo(self, session: int): 1935 """ Retrieves the XML containing all the Issue ID ranges with their group descriptions 1936 1937 Args: 1938 session (int): The session integer. 1939 1940 Returns: 1941 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status', 'all_id_info'. 1942 """ 1943 1944 # API function declaration 1945 self.library.GW2GetAllIdInfo.argtypes = [ 1946 ct.c_size_t, # Session_Handle session 1947 ct.POINTER(ct.c_size_t), # size_t * bufferLength 1948 ct.POINTER(ct.c_void_p) # char ** outputBuffer 1949 ] 1950 1951 # Variable initialisation 1952 # The extracted issue Id information is stored in the analysis report, register an analysis session. 1953 gw_return_object = self._GW2RegisterAnalysisMemory(session) 1954 1955 # API call 1956 gw_return_object.status = self.library.GW2GetAllIdInfo( 1957 gw_return_object.session, 1958 ct.byref(gw_return_object.buffer_length), 1959 ct.byref(gw_return_object.buffer) 1960 ) 1961 1962 # Editor wrote to a buffer, convert it to bytes 1963 all_id_info_bytes = utils.buffer_to_bytes( 1964 gw_return_object.buffer, 1965 gw_return_object.buffer_length 1966 ) 1967 1968 gw_return_object.all_id_info = all_id_info_bytes.decode() 1969 1970 return gw_return_object 1971 1972 def get_all_id_info(self, output_file: Optional[str] = None, raise_unsupported: bool = True) -> str: 1973 """ Retrieves the XML containing all the Issue ID ranges with their group descriptions 1974 1975 Args: 1976 output_file (Optional[str]): The output file path where the analysis file will be written. 1977 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1978 1979 Returns: 1980 all_id_info (str): A string XML analysis report containing all id info. 1981 """ 1982 # Validate arg types 1983 if not isinstance(output_file, (type(None), str)): 1984 raise TypeError(output_file) 1985 if isinstance(output_file, str): 1986 output_file = os.path.abspath(output_file) 1987 1988 with utils.CwdHandler(self.library_path): 1989 with self.new_session() as session: 1990 result = self._GW2GetAllIdInfo(session) 1991 1992 if result.status not in successes.success_codes: 1993 log.error(format_object(result)) 1994 if raise_unsupported: 1995 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1996 else: 1997 log.debug(format_object(result)) 1998 1999 if isinstance(output_file, str): 2000 # GW2GetAllIdInfo is memory only, write to file 2001 # make directories that do not exist 2002 os.makedirs(os.path.dirname(output_file), exist_ok=True) 2003 with open(output_file, "w") as f: 2004 f.write(result.all_id_info) 2005 2006 return result.all_id_info 2007 2008 def _GW2FileSessionStatus(self, session: int): 2009 """ Retrieves the Glasswall Session Status. Also gives a high level indication of the processing that was carried out on the last document processed by the library 2010 2011 Args: 2012 session (int): The session integer. 2013 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 2014 2015 Returns: 2016 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'session_status', 'buffer', 'buffer_length', 'status', 'message'. 2017 """ 2018 # API function declaration 2019 self.library.GW2FileSessionStatus.argtypes = [ 2020 ct.c_size_t, # Session_Handle session 2021 ct.POINTER(ct.c_int), # int *glasswallSessionStatus 2022 ct.POINTER(ct.c_void_p), # char **statusMsgBuffer 2023 ct.POINTER(ct.c_size_t) # size_t *statusbufferLength 2024 ] 2025 2026 # Variable initialisation 2027 gw_return_object = glasswall.GwReturnObj() 2028 gw_return_object.session = ct.c_size_t(session) 2029 gw_return_object.session_status = ct.c_int() 2030 gw_return_object.buffer = ct.c_void_p() 2031 gw_return_object.buffer_length = ct.c_size_t() 2032 2033 # API call 2034 gw_return_object.status = self.library.GW2FileSessionStatus( 2035 gw_return_object.session, 2036 ct.byref(gw_return_object.session_status), 2037 ct.byref(gw_return_object.buffer), 2038 ct.byref(gw_return_object.buffer_length) 2039 ) 2040 2041 # Convert session_status to int 2042 gw_return_object.session_status = gw_return_object.session_status.value 2043 2044 # Editor wrote to a buffer, convert it to bytes 2045 message_bytes = utils.buffer_to_bytes( 2046 gw_return_object.buffer, 2047 gw_return_object.buffer_length 2048 ) 2049 gw_return_object.message = message_bytes.decode() 2050 2051 return gw_return_object 2052 2053 def file_session_status_message(self, session: int, raise_unsupported: bool = True) -> str: 2054 """ Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out. 2055 2056 Args: 2057 session (int): The session integer. 2058 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 2059 2060 Returns: 2061 result.message (str):The file session status message. 2062 """ 2063 # Validate arg types 2064 if not isinstance(session, int): 2065 raise TypeError(session) 2066 2067 result = self._GW2FileSessionStatus(session) 2068 2069 if result.status not in successes.success_codes: 2070 log.error(format_object(result)) 2071 if raise_unsupported: 2072 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 2073 else: 2074 log.debug(format_object(result)) 2075 2076 return result.message 2077 2078 def _GW2LicenceDetails(self, session: int): 2079 """ Returns a human readable string containing licence details. 2080 2081 Args: 2082 session (int): The session integer. 2083 2084 Returns: 2085 licence_details (str): A human readable string representing the relevant information contained in the licence. 2086 """ 2087 # API function declaration 2088 self.library.GW2LicenceDetails.argtypes = [ct.c_size_t] 2089 self.library.GW2LicenceDetails.restype = ct.c_char_p 2090 2091 # Variable initialisation 2092 ct_session = ct.c_size_t(session) 2093 2094 # API call 2095 licence_details = self.library.GW2LicenceDetails(ct_session) 2096 2097 # Convert to Python string 2098 licence_details = ct.string_at(licence_details).decode() 2099 2100 return licence_details 2101 2102 def licence_details(self): 2103 """ Returns a string containing details of the licence. 2104 2105 Returns: 2106 result (str): A string containing details of the licence. 2107 """ 2108 with self.new_session() as session: 2109 result = self._GW2LicenceDetails(session) 2110 2111 log.debug(f"\n\tsession: {session}\n\tGW2LicenceDetails: {result}") 2112 2113 return result 2114 2115 def _GW2RegisterExportTextDumpMemory(self, session: int): 2116 """ Registers an export text dump to be written in memory. 2117 2118 Args: 2119 session (int): The session integer. 2120 2121 Returns: 2122 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 2123 """ 2124 # API function declaration 2125 self.library.GW2RegisterExportTextDumpMemory.argtypes = [ 2126 ct.c_size_t, # Session_Handle session 2127 ct.POINTER(ct.c_void_p), # char ** exportTextDumpFileBuffer 2128 ct.POINTER(ct.c_size_t) # size_t * exportTextDumpLength 2129 ] 2130 2131 # Variable initialisation 2132 gw_return_object = glasswall.GwReturnObj() 2133 gw_return_object.session = ct.c_size_t(session) 2134 gw_return_object.buffer = ct.c_void_p() 2135 gw_return_object.buffer_length = ct.c_size_t() 2136 2137 # API call 2138 gw_return_object.status = self.library.GW2RegisterExportTextDumpMemory( 2139 gw_return_object.session, 2140 ct.byref(gw_return_object.buffer), 2141 ct.byref(gw_return_object.buffer_length) 2142 ) 2143 2144 return gw_return_object 2145 2146 def _GW2RegisterExportTextDumpFile(self, session: int, output_file: str): 2147 """ Registers an export text dump to be written to file. 2148 2149 Args: 2150 session (int): The session integer. 2151 output_file (str): The file path of the text dump file. 2152 2153 Returns: 2154 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 2155 """ 2156 # API function declaration 2157 self.library.GW2RegisterExportTextDumpFile.argtypes = [ 2158 ct.c_size_t, # Session_Handle session 2159 ct.c_char_p # const char * textDumpFilePathName 2160 ] 2161 2162 # Variable initialisation 2163 gw_return_object = glasswall.GwReturnObj() 2164 gw_return_object.session = ct.c_size_t(session) 2165 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 2166 2167 # API call 2168 gw_return_object.status = self.library.GW2RegisterExportTextDumpFile( 2169 gw_return_object.session, 2170 gw_return_object.output_file 2171 ) 2172 2173 return gw_return_object 2174 2175 def _GW2RegisterLicenceFile(self, session: int, input_file: str): 2176 """ Registers a "gwkey.lic" licence from file path. 2177 2178 Args: 2179 session (int): The session integer. 2180 input_file (str): The "gwkey.lic" licence input file path. 2181 2182 Returns: 2183 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 2184 """ 2185 # API function declaration 2186 self.library.GW2RegisterLicenceFile.argtypes = [ 2187 ct.c_size_t, # Session_Handle session 2188 ct.c_char_p, # const char *filename 2189 ] 2190 2191 # Variable initialisation 2192 gw_return_object = glasswall.GwReturnObj() 2193 gw_return_object.session = ct.c_size_t(session) 2194 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 2195 2196 # API call 2197 gw_return_object.status = self.library.GW2RegisterLicenceFile( 2198 gw_return_object.session, 2199 gw_return_object.input_file, 2200 ) 2201 2202 return gw_return_object 2203 2204 def _GW2RegisterLicenceMemory(self, session: int, input_file: bytes): 2205 """ Registers a "gwkey.lic" licence from memory. 2206 2207 Args: 2208 session (int): The session integer. 2209 input_file (bytes): The "gwkey.lic" licence input file. 2210 2211 Returns: 2212 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 2213 """ 2214 # API function declaration 2215 self.library.GW2RegisterLicenceMemory.argtypes = [ 2216 ct.c_size_t, # Session_Handle session 2217 ct.c_char_p, # const char *filename 2218 ct.c_size_t, # size_t licenceLength 2219 ] 2220 2221 # Variable initialisation 2222 gw_return_object = glasswall.GwReturnObj() 2223 gw_return_object.session = ct.c_size_t(session) 2224 gw_return_object.buffer = ct.c_char_p(input_file) 2225 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 2226 2227 # API call 2228 gw_return_object.status = self.library.GW2RegisterLicenceMemory( 2229 gw_return_object.session, 2230 gw_return_object.buffer, 2231 gw_return_object.buffer_length 2232 ) 2233 2234 return gw_return_object 2235 2236 def register_licence(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 2237 """ Registers a "gwkey.lic" licence from file path or memory. 2238 2239 Args: 2240 session (int): The session integer. 2241 input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. 2242 2243 Returns: 2244 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 2245 - If input_file is a str file path: 2246 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 2247 2248 - If input_file is a file in memory: 2249 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 2250 """ 2251 if isinstance(input_file, str): 2252 if not os.path.isfile(input_file): 2253 raise FileNotFoundError(input_file) 2254 2255 input_file = os.path.abspath(input_file) 2256 2257 result = self._GW2RegisterLicenceFile(session, input_file) 2258 2259 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 2260 # Convert bytearray and io.BytesIO to bytes 2261 input_file = utils.as_bytes(input_file) 2262 2263 result = self._GW2RegisterLicenceMemory(session, input_file) 2264 2265 else: 2266 raise TypeError(input_file) 2267 2268 if result.status not in successes.success_codes: 2269 log.error(format_object(result)) 2270 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 2271 else: 2272 log.debug(format_object(result)) 2273 2274 return result
19class Editor(Library): 20 """ A high level Python wrapper for Glasswall Editor / Core2. """ 21 22 def __init__(self, library_path: str, licence: Union[str, bytes, bytearray, io.BytesIO] = None): 23 """ Initialise the Editor instance. 24 25 Args: 26 library_path (str): The file or directory path to the Editor library. 27 licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be: 28 - A string representing the file path to the licence. 29 - A `bytes` or `bytearray` object containing the licence data. 30 - An `io.BytesIO` object for in-memory licence data. 31 If not specified, it is assumed that the licence file is located in the same directory as the `library_path`. 32 """ 33 super().__init__(library_path) 34 self.library = self.load_library(os.path.abspath(library_path)) 35 self.licence = licence 36 37 # Validate killswitch has not activated 38 self.validate_licence() 39 40 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") 41 42 def validate_licence(self): 43 """ Validates the licence of the library by checking the licence details. 44 45 Raises: 46 LicenceExpired: If the licence has expired or could not be validated. 47 """ 48 licence_details = self.licence_details() 49 50 bad_details = [ 51 "Unable to Read Licence Key File", 52 "Licence File Missing Required Contents", 53 "Licence Expired", 54 ] 55 56 if any(bad_detail.lower() in licence_details.lower() for bad_detail in bad_details): 57 # bad_details found in licence_details 58 log.error(f"{self.__class__.__name__} licence validation failed. Licence details:\n{licence_details}") 59 raise errors.LicenceExpired(licence_details) 60 else: 61 log.debug(f"{self.__class__.__name__} licence validated successfully. Licence details:\n{licence_details}") 62 63 def version(self): 64 """ Returns the Glasswall library version. 65 66 Returns: 67 version (str): The Glasswall library version. 68 """ 69 # API function declaration 70 self.library.GW2LibVersion.restype = ct.c_char_p 71 72 # API call 73 version = self.library.GW2LibVersion() 74 75 # Convert to Python string 76 version = ct.string_at(version).decode() 77 78 return version 79 80 def open_session(self): 81 """ Open a new Glasswall session. 82 83 Returns: 84 session (int): An incrementing integer repsenting the current session. 85 """ 86 # API call 87 session = self.library.GW2OpenSession() 88 89 log.debug(f"\n\tsession: {session}") 90 91 if self.licence: 92 # Must register licence on each GW2OpenSession if the licence is not in the same directory as the library 93 self.register_licence(session, self.licence) 94 95 return session 96 97 def close_session(self, session: int) -> int: 98 """ Close the Glasswall session. All resources allocated by the session will be destroyed. 99 100 Args: 101 session (int): The session to close. 102 103 Returns: 104 status (int): The status code of the function call. 105 """ 106 if not isinstance(session, int): 107 raise TypeError(session) 108 109 # API function declaration 110 self.library.GW2CloseSession.argtypes = [ct.c_size_t] 111 112 # Variable initialisation 113 ct_session = ct.c_size_t(session) 114 115 # API call 116 status = self.library.GW2CloseSession(ct_session) 117 118 if status not in successes.success_codes: 119 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 120 else: 121 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 122 123 return status 124 125 @contextmanager 126 def new_session(self): 127 """ Context manager. Opens a new session on entry and closes the session on exit. """ 128 try: 129 session = self.open_session() 130 yield session 131 finally: 132 self.close_session(session) 133 134 def run_session(self, session): 135 """ Runs the Glasswall session and begins processing of a file. 136 137 Args: 138 session (int): The session to run. 139 140 Returns: 141 status (int): The status code of the function call. 142 """ 143 # API function declaration 144 self.library.GW2RunSession.argtypes = [ct.c_size_t] 145 146 # Variable initialisation 147 ct_session = ct.c_size_t(session) 148 149 # API call 150 status = self.library.GW2RunSession(ct_session) 151 152 if status not in successes.success_codes: 153 log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}") 154 else: 155 log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}") 156 157 return status 158 159 def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True) -> Union[int, str]: 160 """ Determine the file type of a given input file, either as an integer identifier or a string. 161 162 Args: 163 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. 164 as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. 165 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 166 167 Returns: 168 file_type (Union[int, str]): The file type. 169 """ 170 if isinstance(input_file, str): 171 if not os.path.isfile(input_file): 172 raise FileNotFoundError(input_file) 173 174 # convert to ct.c_char_p of bytes 175 ct_input_file = ct.c_char_p(input_file.encode("utf-8")) 176 177 # API call 178 file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file) 179 180 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 181 # convert to bytes 182 bytes_input_file = utils.as_bytes(input_file) 183 184 # ctypes conversion 185 ct_buffer = ct.c_char_p(bytes_input_file) 186 ct_buffer_length = ct.c_size_t(len(bytes_input_file)) 187 188 # API call 189 file_type = self.library.GW2DetermineFileTypeFromMemory( 190 ct_buffer, 191 ct_buffer_length 192 ) 193 194 else: 195 raise TypeError(input_file) 196 197 file_type_as_string = dft.file_type_int_to_str(file_type) 198 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 199 200 if not dft.is_success(file_type): 201 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 202 if raise_unsupported: 203 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 204 else: 205 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 206 207 if as_string: 208 return file_type_as_string 209 210 return file_type 211 212 def _GW2GetPolicySettings(self, session: int, policy_format: int = 0): 213 """ Get current policy settings for the given session. 214 215 Args: 216 session (int): The session integer. 217 policy_format (int): The format of the content management policy. 0=XML. 218 219 Returns: 220 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status', 'policy'. 221 """ 222 # API function declaration 223 self.library.GW2GetPolicySettings.argtypes = [ 224 ct.c_size_t, # Session_Handle session 225 ct.POINTER(ct.c_void_p), # char ** policiesBuffer 226 ct.POINTER(ct.c_size_t), # size_t * policiesLength 227 ct.c_int, # Policy_Format format 228 ] 229 230 # Variable initialisation 231 gw_return_object = glasswall.GwReturnObj() 232 gw_return_object.session = ct.c_size_t(session) 233 gw_return_object.buffer = ct.c_void_p() 234 gw_return_object.buffer_length = ct.c_size_t() 235 gw_return_object.policy_format = ct.c_int(policy_format) 236 237 # API Call 238 gw_return_object.status = self.library.GW2GetPolicySettings( 239 gw_return_object.session, 240 ct.byref(gw_return_object.buffer), 241 ct.byref(gw_return_object.buffer_length), 242 gw_return_object.policy_format 243 ) 244 245 # Editor wrote to a buffer, convert it to bytes 246 policy_bytes = utils.buffer_to_bytes( 247 gw_return_object.buffer, 248 gw_return_object.buffer_length 249 ) 250 251 gw_return_object.policy = policy_bytes.decode() 252 253 return gw_return_object 254 255 def get_content_management_policy(self, session: int): 256 """ Returns the content management configuration for a given session. 257 258 Args: 259 session (int): The session integer. 260 261 Returns: 262 xml_string (str): The XML string of the current content management configuration. 263 """ 264 # NOTE GW2GetPolicySettings is current not implemented in editor 265 266 # set xml_string as loaded default config 267 xml_string = glasswall.content_management.policies.Editor(default="sanitise").text, 268 269 # log.debug(f"xml_string:\n{xml_string}") 270 271 return xml_string 272 273 # # API function declaration 274 # self.library.GW2GetPolicySettings.argtypes = [ 275 # ct.c_size_t, 276 # ct.c_void_p, 277 # ] 278 279 # # Variable initialisation 280 # ct_session = ct.c_size_t(session) 281 # ct_buffer = ct.c_void_p() 282 # ct_buffer_length = ct.c_size_t() 283 # # ct_file_format = ct.c_int(file_format) 284 285 # # API Call 286 # status = self.library.GW2GetPolicySettings( 287 # ct_session, 288 # ct.byref(ct_buffer), 289 # ct.byref(ct_buffer_length) 290 # ) 291 292 # print("GW2GetPolicySettings status:", status) 293 294 # file_bytes = utils.buffer_to_bytes( 295 # ct_buffer, 296 # ct_buffer_length, 297 # ) 298 299 # return file_bytes 300 301 def _GW2RegisterPoliciesFile(self, session: int, input_file: str, policy_format: int = 0): 302 """ Registers the policies to be used by Glasswall when processing files. 303 304 Args: 305 session (int): The session integer. 306 input_file (str): The content management policy input file path. 307 policy_format (int): The format of the content management policy. 0=XML. 308 309 Returns: 310 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'. 311 """ 312 # API function declaration 313 self.library.GW2RegisterPoliciesFile.argtypes = [ 314 ct.c_size_t, # Session_Handle session 315 ct.c_char_p, # const char *filename 316 ct.c_int, # Policy_Format format 317 ] 318 319 # Variable initialisation 320 gw_return_object = glasswall.GwReturnObj() 321 gw_return_object.session = ct.c_size_t(session) 322 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 323 gw_return_object.policy_format = ct.c_int(policy_format) 324 325 gw_return_object.status = self.library.GW2RegisterPoliciesFile( 326 gw_return_object.session, 327 gw_return_object.input_file, 328 gw_return_object.policy_format 329 ) 330 331 return gw_return_object 332 333 def _GW2RegisterPoliciesMemory(self, session: int, input_file: bytes, policy_format: int = 0): 334 """ Registers the policies in memory to be used by Glasswall when processing files. 335 336 Args: 337 session (int): The session integer. 338 input_file (str): The content management policy input file bytes. 339 policy_format (int): The format of the content management policy. 0=XML. 340 341 Returns: 342 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'. 343 """ 344 # API function declaration 345 self.library.GW2RegisterPoliciesMemory.argtype = [ 346 ct.c_size_t, # Session_Handle session 347 ct.c_char_p, # const char *policies 348 ct.c_size_t, # size_t policiesLength 349 ct.c_int # Policy_Format format 350 ] 351 352 # Variable initialisation 353 gw_return_object = glasswall.GwReturnObj() 354 gw_return_object.session = ct.c_size_t(session) 355 gw_return_object.buffer = ct.c_char_p(input_file) 356 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 357 gw_return_object.policy_format = ct.c_int(policy_format) 358 359 # API Call 360 gw_return_object.status = self.library.GW2RegisterPoliciesMemory( 361 gw_return_object.session, 362 gw_return_object.buffer, 363 gw_return_object.buffer_length, 364 gw_return_object.policy_format 365 ) 366 367 return gw_return_object 368 369 def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, policy_format=0): 370 """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied. 371 372 Args: 373 session (int): The session integer. 374 input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 375 policy_format (int): The format of the content management policy. 0=XML. 376 377 Returns: 378 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 379 - If input_file is a str file path: 380 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'. 381 382 - If input_file is a file in memory: 383 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'. 384 """ 385 # Validate type 386 if not isinstance(session, int): 387 raise TypeError(session) 388 if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 389 raise TypeError(input_file) 390 if not isinstance(policy_format, int): 391 raise TypeError(policy_format) 392 393 # Set input_file to default if input_file is None 394 if input_file is None: 395 input_file = glasswall.content_management.policies.Editor(default="sanitise") 396 397 # Validate xml content is parsable 398 utils.validate_xml(input_file) 399 400 # From file 401 if isinstance(input_file, str) and os.path.isfile(input_file): 402 input_file = os.path.abspath(input_file) 403 404 result = self._GW2RegisterPoliciesFile(session, input_file) 405 406 # From memory 407 elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 408 # Convert bytearray, io.BytesIO to bytes 409 if isinstance(input_file, (bytearray, io.BytesIO)): 410 input_file = utils.as_bytes(input_file) 411 # Convert string xml or Policy to bytes 412 if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)): 413 input_file = input_file.encode("utf-8") 414 415 result = self._GW2RegisterPoliciesMemory(session, input_file) 416 417 if result.status not in successes.success_codes: 418 log.error(format_object(result)) 419 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 420 else: 421 log.debug(format_object(result)) 422 423 return result 424 425 def _GW2RegisterInputFile(self, session: int, input_file: str): 426 """ Register an input file for the given session. 427 428 Args: 429 session (int): The session integer. 430 input_file (str): The input file path. 431 432 Returns: 433 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 434 """ 435 # API function declaration 436 self.library.GW2RegisterInputFile.argtypes = [ 437 ct.c_size_t, # Session_Handle session 438 ct.c_char_p # const char * inputFilePath 439 ] 440 441 # Variable initialisation 442 gw_return_object = glasswall.GwReturnObj() 443 gw_return_object.session = ct.c_size_t(session) 444 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 445 gw_return_object.input_file_path = input_file 446 447 # API call 448 gw_return_object.status = self.library.GW2RegisterInputFile( 449 gw_return_object.session, 450 gw_return_object.input_file 451 ) 452 453 return gw_return_object 454 455 def _GW2RegisterInputMemory(self, session: int, input_file: bytes): 456 """ Register an input file in memory for the given session. 457 458 Args: 459 session (int): The session integer. 460 input_file (bytes): The input file in memory. 461 462 Returns: 463 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 464 """ 465 # API function declaration 466 self.library.GW2RegisterInputMemory.argtypes = [ 467 ct.c_size_t, # Session_Handle session 468 ct.c_char_p, # const char * inputFileBuffer 469 ct.c_size_t, # size_t inputLength 470 ] 471 472 # Variable initialisation 473 gw_return_object = glasswall.GwReturnObj() 474 gw_return_object.session = ct.c_size_t(session) 475 gw_return_object.buffer = ct.c_char_p(input_file) 476 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 477 478 # API call 479 gw_return_object.status = self.library.GW2RegisterInputMemory( 480 gw_return_object.session, 481 gw_return_object.buffer, 482 gw_return_object.buffer_length 483 ) 484 485 return gw_return_object 486 487 def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 488 """ Register an input file or bytes for the given session. 489 490 Args: 491 session (int): The session integer. 492 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 493 494 Returns: 495 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 496 - If input_file is a str file path: 497 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 498 499 - If input_file is a file in memory: 500 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 501 """ 502 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)): 503 raise TypeError(input_file) 504 505 if isinstance(input_file, str): 506 if not os.path.isfile(input_file): 507 raise FileNotFoundError(input_file) 508 509 input_file = os.path.abspath(input_file) 510 511 result = self._GW2RegisterInputFile(session, input_file) 512 513 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 514 # Convert bytearray and io.BytesIO to bytes 515 input_file = utils.as_bytes(input_file) 516 517 result = self._GW2RegisterInputMemory(session, input_file) 518 519 if result.status not in successes.success_codes: 520 log.error(format_object(result)) 521 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 522 else: 523 log.debug(format_object(result)) 524 525 return result 526 527 def _GW2RegisterOutFile(self, session: int, output_file: str): 528 """ Register an output file for the given session. 529 530 Args: 531 session (int): The session integer. 532 output_file (str): The output file path. 533 534 Returns: 535 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 536 """ 537 # API function declaration 538 self.library.GW2RegisterOutFile.argtypes = [ 539 ct.c_size_t, # Session_Handle session 540 ct.c_char_p # const char * outputFilePath 541 ] 542 543 # Variable initialisation 544 gw_return_object = glasswall.GwReturnObj() 545 gw_return_object.session = ct.c_size_t(session) 546 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 547 548 # API call 549 gw_return_object.status = self.library.GW2RegisterOutFile( 550 gw_return_object.session, 551 gw_return_object.output_file 552 ) 553 554 return gw_return_object 555 556 def _GW2RegisterOutputMemory(self, session: int): 557 """ Register an output file in memory for the given session. 558 559 Args: 560 session (int): The session integer. 561 562 Returns: 563 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 564 """ 565 # API function declaration 566 self.library.GW2RegisterOutputMemory.argtypes = [ 567 ct.c_size_t, # Session_Handle session 568 ct.POINTER(ct.c_void_p), # char ** outputBuffer 569 ct.POINTER(ct.c_size_t) # size_t * outputLength 570 ] 571 572 # Variable initialisation 573 gw_return_object = glasswall.GwReturnObj() 574 gw_return_object.session = ct.c_size_t(session) 575 gw_return_object.buffer = ct.c_void_p() 576 gw_return_object.buffer_length = ct.c_size_t() 577 578 # API call 579 gw_return_object.status = self.library.GW2RegisterOutputMemory( 580 gw_return_object.session, 581 ct.byref(gw_return_object.buffer), 582 ct.byref(gw_return_object.buffer_length) 583 ) 584 585 return gw_return_object 586 587 def register_output(self, session, output_file: Optional[str] = None): 588 """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes. 589 590 Args: 591 session (int): The session integer. 592 output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes. 593 594 Returns: 595 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 596 """ 597 if not isinstance(output_file, (type(None), str)): 598 raise TypeError(output_file) 599 600 if isinstance(output_file, str): 601 output_file = os.path.abspath(output_file) 602 603 result = self._GW2RegisterOutFile(session, output_file) 604 605 elif isinstance(output_file, type(None)): 606 result = self._GW2RegisterOutputMemory(session) 607 608 if result.status not in successes.success_codes: 609 log.error(format_object(result)) 610 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 611 else: 612 log.debug(format_object(result)) 613 614 return result 615 616 def _GW2RegisterAnalysisFile(self, session: int, output_file: str, analysis_format: int = 0): 617 """ Register an analysis output file for the given session. 618 619 Args: 620 session (int): The session integer. 621 output_file (str): The analysis output file path. 622 analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended 623 624 Returns: 625 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'analysis_format', 'status'. 626 """ 627 # API function declaration 628 self.library.GW2RegisterAnalysisFile.argtypes = [ 629 ct.c_size_t, # Session_Handle session 630 ct.c_char_p, # const char * analysisFilePathName 631 ct.c_int, # Analysis_Format format 632 ] 633 634 # Variable initialisation 635 gw_return_object = glasswall.GwReturnObj() 636 gw_return_object.session = ct.c_size_t(session) 637 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 638 gw_return_object.analysis_format = ct.c_int() 639 640 # API call 641 gw_return_object.status = self.library.GW2RegisterAnalysisFile( 642 gw_return_object.session, 643 gw_return_object.output_file, 644 gw_return_object.analysis_format 645 ) 646 647 return gw_return_object 648 649 def _GW2RegisterAnalysisMemory(self, session: int, analysis_format: int = 0): 650 """ Register an analysis output file in memory for the given session. 651 652 Args: 653 session (int): The session integer. 654 analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended 655 656 Returns: 657 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status'. 658 """ 659 # API function declaration 660 self.library.GW2RegisterAnalysisMemory.argtypes = [ 661 ct.c_size_t, # Session_Handle session 662 ct.POINTER(ct.c_void_p), # char ** analysisFileBuffer 663 ct.POINTER(ct.c_size_t), # size_t * analysisoutputLength 664 ct.c_int # Analysis_Format format 665 ] 666 667 # Variable initialisation 668 gw_return_object = glasswall.GwReturnObj() 669 gw_return_object.session = ct.c_size_t(session) 670 gw_return_object.buffer = ct.c_void_p() 671 gw_return_object.buffer_length = ct.c_size_t() 672 gw_return_object.analysis_format = ct.c_int() 673 674 # API call 675 gw_return_object.status = self.library.GW2RegisterAnalysisMemory( 676 gw_return_object.session, 677 ct.byref(gw_return_object.buffer), 678 ct.byref(gw_return_object.buffer_length), 679 gw_return_object.analysis_format 680 ) 681 682 return gw_return_object 683 684 def register_analysis(self, session: int, output_file: Optional[str] = None): 685 """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call. 686 687 Args: 688 session (int): The session integer. 689 output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes. 690 691 Returns: 692 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included. 693 """ 694 if not isinstance(output_file, (type(None), str)): 695 raise TypeError(output_file) 696 697 if isinstance(output_file, str): 698 output_file = os.path.abspath(output_file) 699 700 result = self._GW2RegisterAnalysisFile(session, output_file) 701 702 elif isinstance(output_file, type(None)): 703 result = self._GW2RegisterAnalysisMemory(session) 704 705 if result.status not in successes.success_codes: 706 log.error(format_object(result)) 707 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 708 else: 709 log.debug(format_object(result)) 710 711 return result 712 713 def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 714 """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided. 715 716 Args: 717 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 718 output_file (Optional[str]): The output file path where the protected file will be written. 719 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 720 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 721 722 Returns: 723 file_bytes (bytes): The protected file bytes. 724 """ 725 # Validate arg types 726 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 727 raise TypeError(input_file) 728 if not isinstance(output_file, (type(None), str)): 729 raise TypeError(output_file) 730 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 731 raise TypeError(content_management_policy) 732 if not isinstance(raise_unsupported, bool): 733 raise TypeError(raise_unsupported) 734 735 # Convert string path arguments to absolute paths 736 if isinstance(input_file, str): 737 if not os.path.isfile(input_file): 738 raise FileNotFoundError(input_file) 739 input_file = os.path.abspath(input_file) 740 if isinstance(output_file, str): 741 output_file = os.path.abspath(output_file) 742 # make directories that do not exist 743 os.makedirs(os.path.dirname(output_file), exist_ok=True) 744 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 745 content_management_policy = os.path.abspath(content_management_policy) 746 747 # Convert memory inputs to bytes 748 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 749 input_file = utils.as_bytes(input_file) 750 751 with utils.CwdHandler(self.library_path): 752 with self.new_session() as session: 753 content_management_policy = self.set_content_management_policy(session, content_management_policy) 754 register_input = self.register_input(session, input_file) 755 register_output = self.register_output(session, output_file=output_file) 756 status = self.run_session(session) 757 758 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 759 if status not in successes.success_codes: 760 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 761 if raise_unsupported: 762 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 763 else: 764 file_bytes = None 765 else: 766 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 767 # Get file bytes 768 if isinstance(output_file, str): 769 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 770 if not os.path.isfile(output_file): 771 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 772 file_bytes = None 773 else: 774 with open(output_file, "rb") as f: 775 file_bytes = f.read() 776 else: 777 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 778 file_bytes = utils.buffer_to_bytes( 779 register_output.buffer, 780 register_output.buffer_length 781 ) 782 783 # Ensure memory allocated is not garbage collected 784 content_management_policy, register_input, register_output 785 786 return file_bytes 787 788 def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 789 """ Recursively processes all files in a directory in protect mode using the given content management policy. 790 The protected files are written to output_directory maintaining the same directory structure as input_directory. 791 792 Args: 793 input_directory (str): The input directory containing files to protect. 794 output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files. 795 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 796 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 797 798 Returns: 799 protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 800 """ 801 protected_files_dict = {} 802 # Call protect_file on each file in input_directory to output_directory 803 for input_file in utils.list_file_paths(input_directory): 804 relative_path = os.path.relpath(input_file, input_directory) 805 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 806 807 protected_bytes = self.protect_file( 808 input_file=input_file, 809 output_file=output_file, 810 raise_unsupported=raise_unsupported, 811 content_management_policy=content_management_policy, 812 ) 813 814 protected_files_dict[relative_path] = protected_bytes 815 816 return protected_files_dict 817 818 def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 819 """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided. 820 821 Args: 822 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 823 output_file (Optional[str]): The output file path where the analysis file will be written. 824 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 825 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 826 827 Returns: 828 file_bytes (bytes): The analysis file bytes. 829 """ 830 # Validate arg types 831 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 832 raise TypeError(input_file) 833 if not isinstance(output_file, (type(None), str)): 834 raise TypeError(output_file) 835 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 836 raise TypeError(content_management_policy) 837 if not isinstance(raise_unsupported, bool): 838 raise TypeError(raise_unsupported) 839 840 # Convert string path arguments to absolute paths 841 if isinstance(input_file, str): 842 if not os.path.isfile(input_file): 843 raise FileNotFoundError(input_file) 844 input_file = os.path.abspath(input_file) 845 if isinstance(output_file, str): 846 output_file = os.path.abspath(output_file) 847 # make directories that do not exist 848 os.makedirs(os.path.dirname(output_file), exist_ok=True) 849 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 850 content_management_policy = os.path.abspath(content_management_policy) 851 852 # Convert memory inputs to bytes 853 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 854 input_file = utils.as_bytes(input_file) 855 856 with utils.CwdHandler(self.library_path): 857 with self.new_session() as session: 858 content_management_policy = self.set_content_management_policy(session, content_management_policy) 859 register_input = self.register_input(session, input_file) 860 register_analysis = self.register_analysis(session, output_file) 861 status = self.run_session(session) 862 863 file_bytes = None 864 if isinstance(output_file, str): 865 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 866 if os.path.isfile(output_file): 867 with open(output_file, "rb") as f: 868 file_bytes = f.read() 869 else: 870 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 871 if register_analysis.buffer and register_analysis.buffer_length: 872 file_bytes = utils.buffer_to_bytes( 873 register_analysis.buffer, 874 register_analysis.buffer_length 875 ) 876 877 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 878 if status not in successes.success_codes: 879 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 880 if raise_unsupported: 881 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 882 else: 883 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 884 885 # Ensure memory allocated is not garbage collected 886 content_management_policy, register_input, register_analysis 887 888 return file_bytes 889 890 def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 891 """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory. 892 893 Args: 894 input_directory (str): The input directory containing files to analyse. 895 output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files. 896 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 897 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 898 899 Returns: 900 analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 901 """ 902 analysis_files_dict = {} 903 # Call analyse_file on each file in input_directory to output_directory 904 for input_file in utils.list_file_paths(input_directory): 905 relative_path = os.path.relpath(input_file, input_directory) + ".xml" 906 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 907 908 analysis_bytes = self.analyse_file( 909 input_file=input_file, 910 output_file=output_file, 911 raise_unsupported=raise_unsupported, 912 content_management_policy=content_management_policy, 913 ) 914 915 analysis_files_dict[relative_path] = analysis_bytes 916 917 return analysis_files_dict 918 919 def protect_and_analyse_file( 920 self, 921 input_file: Union[str, bytes, bytearray, io.BytesIO], 922 output_file: Optional[str] = None, 923 output_analysis_report: Optional[str] = None, 924 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 925 raise_unsupported: bool = True, 926 ): 927 """ Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes. 928 929 Args: 930 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 931 output_file (Optional[str]): The output file path where the protected file will be written. 932 output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. 933 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 934 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 935 936 Returns: 937 Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes). 938 """ 939 # Validate arg types 940 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 941 raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}") 942 if not isinstance(output_file, (type(None), str)): 943 raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}") 944 if not isinstance(output_analysis_report, (type(None), str)): 945 raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}") 946 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 947 raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}") 948 if not isinstance(raise_unsupported, bool): 949 raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}") 950 951 # Convert string path arguments to absolute paths 952 if isinstance(input_file, str): 953 if not os.path.isfile(input_file): 954 raise FileNotFoundError(input_file) 955 input_file = os.path.abspath(input_file) 956 if isinstance(output_file, str): 957 output_file = os.path.abspath(output_file) 958 os.makedirs(os.path.dirname(output_file), exist_ok=True) 959 if isinstance(output_analysis_report, str): 960 output_analysis_report = os.path.abspath(output_analysis_report) 961 os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True) 962 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 963 content_management_policy = os.path.abspath(content_management_policy) 964 965 # Convert memory inputs to bytes 966 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 967 input_file = utils.as_bytes(input_file) 968 969 with utils.CwdHandler(self.library_path): 970 with self.new_session() as session: 971 content_management_policy = self.set_content_management_policy(session, content_management_policy) 972 register_input = self.register_input(session, input_file) 973 register_output = self.register_output(session, output_file) 974 register_analysis = self.register_analysis(session, output_analysis_report) 975 976 status = self.run_session(session) 977 978 protected_file_bytes = None 979 analysis_report_bytes = None 980 981 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 982 if status not in successes.success_codes: 983 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 984 if raise_unsupported: 985 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 986 987 # Get analysis report file bytes, even on processing failure 988 if isinstance(output_analysis_report, str): 989 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 990 if not os.path.isfile(output_analysis_report): 991 log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}") 992 else: 993 with open(output_analysis_report, "rb") as f: 994 analysis_report_bytes = f.read() 995 else: 996 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 997 if register_analysis.buffer and register_analysis.buffer_length: 998 analysis_report_bytes = utils.buffer_to_bytes( 999 register_analysis.buffer, 1000 register_analysis.buffer_length 1001 ) 1002 1003 # On success, get protected file bytes 1004 if status in successes.success_codes: 1005 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1006 # Get file bytes 1007 if isinstance(output_file, str): 1008 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1009 if not os.path.isfile(output_file): 1010 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1011 else: 1012 with open(output_file, "rb") as f: 1013 protected_file_bytes = f.read() 1014 else: 1015 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1016 protected_file_bytes = utils.buffer_to_bytes( 1017 register_output.buffer, 1018 register_output.buffer_length 1019 ) 1020 1021 # Ensure memory allocated is not garbage collected 1022 content_management_policy, register_input, register_output, register_analysis 1023 1024 return protected_file_bytes, analysis_report_bytes 1025 1026 def protect_and_analyse_directory( 1027 self, 1028 input_directory: str, 1029 output_directory: Optional[str], 1030 analysis_directory: Optional[str], 1031 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1032 raise_unsupported: bool = True, 1033 ): 1034 """ Recursively processes all files in a directory using protect and analyse mode with the given content management policy. 1035 Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory. 1036 1037 Args: 1038 input_directory (str): The input directory containing files to process. 1039 output_directory (Optional[str]): The output directory for protected files. 1040 analysis_directory (Optional[str]): The output directory for XML analysis reports. 1041 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 1042 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1043 1044 Returns: 1045 result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes). 1046 """ 1047 result_dict = {} 1048 for input_file in utils.list_file_paths(input_directory): 1049 relative_path = os.path.relpath(input_file, input_directory) 1050 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1051 output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml") 1052 1053 protected_file_bytes, analysis_report_bytes = self.protect_and_analyse_file( 1054 input_file=input_file, 1055 output_file=output_file, 1056 output_analysis_report=output_analysis_report, 1057 content_management_policy=content_management_policy, 1058 raise_unsupported=raise_unsupported, 1059 ) 1060 1061 result_dict[relative_path] = (protected_file_bytes, analysis_report_bytes) 1062 1063 return result_dict 1064 1065 def _GW2RegisterExportFile(self, session: int, output_file: str): 1066 """ Register an export output file for the given session. 1067 1068 Args: 1069 session (int): The session integer. 1070 output_file (str): The export output file path. 1071 1072 Returns: 1073 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1074 """ 1075 # API function declaration 1076 self.library.GW2RegisterExportFile.argtypes = [ 1077 ct.c_size_t, # Session_Handle session 1078 ct.c_char_p # const char * exportFilePath 1079 ] 1080 1081 # Variable initialisation 1082 gw_return_object = glasswall.GwReturnObj() 1083 gw_return_object.session = ct.c_size_t(session) 1084 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 1085 1086 # API Call 1087 gw_return_object.status = self.library.GW2RegisterExportFile( 1088 gw_return_object.session, 1089 gw_return_object.output_file 1090 ) 1091 1092 return gw_return_object 1093 1094 def _GW2RegisterExportMemory(self, session: int): 1095 """ Register an export output file in memory for the given session. 1096 1097 Args: 1098 session (int): The session integer. 1099 1100 Returns: 1101 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 1102 """ 1103 # API function declaration 1104 self.library.GW2RegisterExportMemory.argtypes = [ 1105 ct.c_size_t, # Session_Handle session 1106 ct.POINTER(ct.c_void_p), # char ** exportFileBuffer 1107 ct.POINTER(ct.c_size_t) # size_t * exportLength 1108 ] 1109 1110 # Variable initialisation 1111 gw_return_object = glasswall.GwReturnObj() 1112 gw_return_object.session = ct.c_size_t(session) 1113 gw_return_object.buffer = ct.c_void_p() 1114 gw_return_object.buffer_length = ct.c_size_t() 1115 1116 # API call 1117 gw_return_object.status = self.library.GW2RegisterExportMemory( 1118 gw_return_object.session, 1119 ct.byref(gw_return_object.buffer), 1120 ct.byref(gw_return_object.buffer_length) 1121 ) 1122 1123 return gw_return_object 1124 1125 def register_export(self, session: int, output_file: Optional[str] = None): 1126 """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call. 1127 1128 Args: 1129 session (int): The session integer. 1130 output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory. 1131 1132 Returns: 1133 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 1134 """ 1135 if not isinstance(output_file, (type(None), str)): 1136 raise TypeError(output_file) 1137 1138 if isinstance(output_file, str): 1139 output_file = os.path.abspath(output_file) 1140 1141 result = self._GW2RegisterExportFile(session, output_file) 1142 1143 elif isinstance(output_file, type(None)): 1144 result = self._GW2RegisterExportMemory(session) 1145 1146 if result.status not in successes.success_codes: 1147 log.error(format_object(result)) 1148 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1149 else: 1150 log.debug(format_object(result)) 1151 1152 return result 1153 1154 def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1155 """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided. 1156 1157 Args: 1158 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 1159 output_file (Optional[str]): The output file path where the .zip file will be written. 1160 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1161 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1162 1163 Returns: 1164 file_bytes (bytes): The exported .zip file. 1165 """ 1166 # Validate arg types 1167 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1168 raise TypeError(input_file) 1169 if not isinstance(output_file, (type(None), str)): 1170 raise TypeError(output_file) 1171 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1172 raise TypeError(content_management_policy) 1173 if not isinstance(raise_unsupported, bool): 1174 raise TypeError(raise_unsupported) 1175 1176 # Convert string path arguments to absolute paths 1177 if isinstance(input_file, str): 1178 if not os.path.isfile(input_file): 1179 raise FileNotFoundError(input_file) 1180 input_file = os.path.abspath(input_file) 1181 if isinstance(output_file, str): 1182 output_file = os.path.abspath(output_file) 1183 # make directories that do not exist 1184 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1185 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1186 content_management_policy = os.path.abspath(content_management_policy) 1187 1188 # Convert memory inputs to bytes 1189 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1190 input_file = utils.as_bytes(input_file) 1191 1192 with utils.CwdHandler(self.library_path): 1193 with self.new_session() as session: 1194 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1195 register_input = self.register_input(session, input_file) 1196 register_export = self.register_export(session, output_file) 1197 status = self.run_session(session) 1198 1199 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1200 if status not in successes.success_codes: 1201 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1202 if raise_unsupported: 1203 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1204 else: 1205 file_bytes = None 1206 else: 1207 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1208 # Get file bytes 1209 if isinstance(output_file, str): 1210 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1211 if not os.path.isfile(output_file): 1212 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1213 file_bytes = None 1214 else: 1215 with open(output_file, "rb") as f: 1216 file_bytes = f.read() 1217 else: 1218 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1219 file_bytes = utils.buffer_to_bytes( 1220 register_export.buffer, 1221 register_export.buffer_length 1222 ) 1223 1224 # Ensure memory allocated is not garbage collected 1225 content_management_policy, register_input, register_export 1226 1227 return file_bytes 1228 1229 def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1230 """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory. 1231 1232 Args: 1233 input_directory (str): The input directory containing files to export. 1234 output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files. 1235 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 1236 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1237 1238 Returns: 1239 export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 1240 """ 1241 export_files_dict = {} 1242 # Call export_file on each file in input_directory to output_directory 1243 for input_file in utils.list_file_paths(input_directory): 1244 relative_path = os.path.relpath(input_file, input_directory) + ".zip" 1245 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1246 1247 export_bytes = self.export_file( 1248 input_file=input_file, 1249 output_file=output_file, 1250 raise_unsupported=raise_unsupported, 1251 content_management_policy=content_management_policy, 1252 ) 1253 1254 export_files_dict[relative_path] = export_bytes 1255 1256 return export_files_dict 1257 1258 def export_and_analyse_file( 1259 self, 1260 input_file: Union[str, bytes, bytearray, io.BytesIO], 1261 output_file: Optional[str] = None, 1262 output_analysis_report: Optional[str] = None, 1263 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1264 raise_unsupported: bool = True, 1265 ): 1266 """ Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes. 1267 1268 Args: 1269 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 1270 output_file (Optional[str]): The output file path where the .zip export will be written. 1271 output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. 1272 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1273 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1274 1275 Returns: 1276 Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes). 1277 """ 1278 # Validate arg types 1279 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1280 raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}") 1281 if not isinstance(output_file, (type(None), str)): 1282 raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}") 1283 if not isinstance(output_analysis_report, (type(None), str)): 1284 raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}") 1285 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1286 raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}") 1287 if not isinstance(raise_unsupported, bool): 1288 raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}") 1289 1290 # Convert string path arguments to absolute paths 1291 if isinstance(input_file, str): 1292 if not os.path.isfile(input_file): 1293 raise FileNotFoundError(input_file) 1294 input_file = os.path.abspath(input_file) 1295 if isinstance(output_file, str): 1296 output_file = os.path.abspath(output_file) 1297 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1298 if isinstance(output_analysis_report, str): 1299 output_analysis_report = os.path.abspath(output_analysis_report) 1300 os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True) 1301 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1302 content_management_policy = os.path.abspath(content_management_policy) 1303 1304 # Convert memory inputs to bytes 1305 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1306 input_file = utils.as_bytes(input_file) 1307 1308 with utils.CwdHandler(self.library_path): 1309 with self.new_session() as session: 1310 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1311 register_input = self.register_input(session, input_file) 1312 register_export = self.register_export(session, output_file) 1313 register_analysis = self.register_analysis(session, output_analysis_report) 1314 1315 status = self.run_session(session) 1316 1317 export_file_bytes = None 1318 analysis_report_bytes = None 1319 1320 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1321 if status not in successes.success_codes: 1322 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1323 if raise_unsupported: 1324 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1325 1326 # Get analysis report file bytes, even on processing failure 1327 if isinstance(output_analysis_report, str): 1328 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1329 if not os.path.isfile(output_analysis_report): 1330 log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}") 1331 else: 1332 with open(output_analysis_report, "rb") as f: 1333 analysis_report_bytes = f.read() 1334 else: 1335 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1336 if register_analysis.buffer and register_analysis.buffer_length: 1337 analysis_report_bytes = utils.buffer_to_bytes( 1338 register_analysis.buffer, 1339 register_analysis.buffer_length 1340 ) 1341 1342 # On success, get export file bytes 1343 if status in successes.success_codes: 1344 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1345 # Get export file bytes 1346 if isinstance(output_file, str): 1347 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1348 if not os.path.isfile(output_file): 1349 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1350 else: 1351 with open(output_file, "rb") as f: 1352 export_file_bytes = f.read() 1353 else: 1354 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1355 export_file_bytes = utils.buffer_to_bytes( 1356 register_export.buffer, 1357 register_export.buffer_length 1358 ) 1359 1360 # Ensure memory allocated is not garbage collected 1361 content_management_policy, register_input, register_export, register_analysis 1362 1363 return export_file_bytes, analysis_report_bytes 1364 1365 def export_and_analyse_directory( 1366 self, 1367 input_directory: str, 1368 output_directory: Optional[str], 1369 analysis_directory: Optional[str], 1370 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1371 raise_unsupported: bool = True, 1372 ): 1373 """ Recursively processes all files in a directory using export and analyse mode with the given content management policy. 1374 Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory. 1375 1376 Args: 1377 input_directory (str): The input directory containing files to process. 1378 output_directory (Optional[str]): The output directory for exported .zip files. 1379 analysis_directory (Optional[str]): The output directory for XML analysis reports. 1380 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 1381 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1382 1383 Returns: 1384 result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes). 1385 """ 1386 result_dict = {} 1387 1388 for input_file in utils.list_file_paths(input_directory): 1389 relative_path = os.path.relpath(input_file, input_directory) 1390 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path + ".zip") 1391 output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml") 1392 1393 export_file_bytes, analysis_report_bytes = self.export_and_analyse_file( 1394 input_file=input_file, 1395 output_file=output_file, 1396 output_analysis_report=output_analysis_report, 1397 content_management_policy=content_management_policy, 1398 raise_unsupported=raise_unsupported, 1399 ) 1400 1401 result_dict[relative_path] = (export_file_bytes, analysis_report_bytes) 1402 1403 return result_dict 1404 1405 def _GW2RegisterImportFile(self, session: int, input_file: str): 1406 """ Register an import input file for the given session. 1407 1408 Args: 1409 session (int): The session integer. 1410 input_file (str): The input import file path. 1411 1412 Returns: 1413 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1414 """ 1415 # API function declaration 1416 self.library.GW2RegisterImportFile.argtypes = [ 1417 ct.c_size_t, # Session_Handle session 1418 ct.c_char_p # const char * importFilePath 1419 ] 1420 1421 # Variable initialisation 1422 gw_return_object = glasswall.GwReturnObj() 1423 gw_return_object.session = ct.c_size_t(session) 1424 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 1425 1426 # API Call 1427 gw_return_object.status = self.library.GW2RegisterImportFile( 1428 gw_return_object.session, 1429 gw_return_object.input_file 1430 ) 1431 1432 return gw_return_object 1433 1434 def _GW2RegisterImportMemory(self, session: int, input_file: bytes): 1435 """ Register an import input file in memory for the given session. 1436 1437 Args: 1438 session (int): The session integer. 1439 input_file (str): The input import file in memory. 1440 1441 Returns: 1442 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 1443 """ 1444 # API function declaration 1445 self.library.GW2RegisterImportMemory.argtypes = [ 1446 ct.c_size_t, # Session_Handle session 1447 ct.c_void_p, # char * importFileBuffer 1448 ct.c_size_t # size_t importLength 1449 ] 1450 1451 # Variable initialisation 1452 gw_return_object = glasswall.GwReturnObj() 1453 gw_return_object.session = ct.c_size_t(session) 1454 gw_return_object.buffer = ct.c_char_p(input_file) 1455 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 1456 1457 # API call 1458 gw_return_object.status = self.library.GW2RegisterImportMemory( 1459 gw_return_object.session, 1460 gw_return_object.buffer, 1461 gw_return_object.buffer_length 1462 ) 1463 1464 return gw_return_object 1465 1466 def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 1467 """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call. 1468 1469 Args: 1470 session (int): The session integer. 1471 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes. 1472 1473 Returns: 1474 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 1475 """ 1476 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)): 1477 raise TypeError(input_file) 1478 1479 if isinstance(input_file, str): 1480 if not os.path.isfile(input_file): 1481 raise FileNotFoundError(input_file) 1482 1483 input_file = os.path.abspath(input_file) 1484 1485 result = self._GW2RegisterImportFile(session, input_file) 1486 1487 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 1488 # Convert bytearray and io.BytesIO to bytes 1489 input_file = utils.as_bytes(input_file) 1490 1491 result = self._GW2RegisterImportMemory(session, input_file) 1492 1493 if result.status not in successes.success_codes: 1494 log.error(format_object(result)) 1495 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1496 else: 1497 log.debug(format_object(result)) 1498 1499 return result 1500 1501 def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1502 """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided. 1503 1504 Args: 1505 input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. 1506 output_file (Optional[str]): The output file path where the constructed file will be written. 1507 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1508 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1509 1510 Returns: 1511 file_bytes (bytes): The imported file bytes. 1512 """ 1513 # Validate arg types 1514 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1515 raise TypeError(input_file) 1516 if not isinstance(output_file, (type(None), str)): 1517 raise TypeError(output_file) 1518 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1519 raise TypeError(content_management_policy) 1520 if not isinstance(raise_unsupported, bool): 1521 raise TypeError(raise_unsupported) 1522 1523 # Convert string path arguments to absolute paths 1524 if isinstance(input_file, str): 1525 if not os.path.isfile(input_file): 1526 raise FileNotFoundError(input_file) 1527 input_file = os.path.abspath(input_file) 1528 if isinstance(output_file, str): 1529 output_file = os.path.abspath(output_file) 1530 # make directories that do not exist 1531 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1532 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1533 content_management_policy = os.path.abspath(content_management_policy) 1534 1535 # Convert memory inputs to bytes 1536 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1537 input_file = utils.as_bytes(input_file) 1538 1539 with utils.CwdHandler(self.library_path): 1540 with self.new_session() as session: 1541 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1542 register_import = self.register_import(session, input_file) 1543 register_output = self.register_output(session, output_file) 1544 status = self.run_session(session) 1545 1546 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1547 if status not in successes.success_codes: 1548 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1549 if raise_unsupported: 1550 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1551 else: 1552 file_bytes = None 1553 else: 1554 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1555 # Get file bytes 1556 if isinstance(output_file, str): 1557 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1558 if not os.path.isfile(output_file): 1559 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1560 file_bytes = None 1561 else: 1562 with open(output_file, "rb") as f: 1563 file_bytes = f.read() 1564 else: 1565 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1566 file_bytes = utils.buffer_to_bytes( 1567 register_output.buffer, 1568 register_output.buffer_length 1569 ) 1570 1571 # Ensure memory allocated is not garbage collected 1572 content_management_policy, register_import, register_output 1573 1574 return file_bytes 1575 1576 def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1577 """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. 1578 The constructed files are written to output_directory maintaining the same directory structure as input_directory. 1579 1580 Args: 1581 input_directory (str): The input directory containing files to import. 1582 output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files. 1583 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 1584 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1585 1586 Returns: 1587 import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 1588 """ 1589 import_files_dict = {} 1590 # Call import_file on each file in input_directory to output_directory 1591 for input_file in utils.list_file_paths(input_directory): 1592 relative_path = os.path.relpath(input_file, input_directory) 1593 # Remove .zip extension from relative_path 1594 relative_path = os.path.splitext(relative_path)[0] 1595 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1596 1597 import_bytes = self.import_file( 1598 input_file=input_file, 1599 output_file=output_file, 1600 raise_unsupported=raise_unsupported, 1601 content_management_policy=content_management_policy, 1602 ) 1603 1604 import_files_dict[relative_path] = import_bytes 1605 1606 return import_files_dict 1607 1608 def _GW2FileErrorMsg(self, session: int): 1609 """ Retrieve the Glasswall Session Process error message. 1610 1611 Args: 1612 session (int): The session integer. 1613 1614 Returns: 1615 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status', 'error_message'. 1616 """ 1617 # API function declaration 1618 self.library.GW2FileErrorMsg.argtypes = [ 1619 ct.c_size_t, # Session_Handle session 1620 ct.POINTER(ct.c_void_p), # char **errorMsgBuffer 1621 ct.POINTER(ct.c_size_t) # size_t *errorMsgBufferLength 1622 ] 1623 1624 # Variable initialisation 1625 gw_return_object = glasswall.GwReturnObj() 1626 gw_return_object.session = ct.c_size_t(session) 1627 gw_return_object.buffer = ct.c_void_p() 1628 gw_return_object.buffer_length = ct.c_size_t() 1629 1630 # API call 1631 gw_return_object.status = self.library.GW2FileErrorMsg( 1632 gw_return_object.session, 1633 ct.byref(gw_return_object.buffer), 1634 ct.byref(gw_return_object.buffer_length) 1635 ) 1636 1637 # Editor wrote to a buffer, convert it to bytes 1638 error_bytes = utils.buffer_to_bytes( 1639 gw_return_object.buffer, 1640 gw_return_object.buffer_length 1641 ) 1642 1643 gw_return_object.error_message = error_bytes.decode() 1644 1645 return gw_return_object 1646 1647 @functools.lru_cache() 1648 def file_error_message(self, session: int) -> str: 1649 """ Retrieve the Glasswall Session Process error message. 1650 1651 Args: 1652 session (int): The session integer. 1653 1654 Returns: 1655 error_message (str): The Glasswall Session Process error message. 1656 """ 1657 # Validate arg types 1658 if not isinstance(session, int): 1659 raise TypeError(session) 1660 1661 result = self._GW2FileErrorMsg(session) 1662 1663 if result.status not in successes.success_codes: 1664 log.error(format_object(result)) 1665 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1666 else: 1667 log.debug(format_object(result)) 1668 1669 return result.error_message 1670 1671 def GW2GetFileType(self, session: int, file_type_id): 1672 """ Retrieve the file type as a string. 1673 1674 Args: 1675 session (int): The session integer. 1676 file_type_id (int): The file type id. 1677 1678 Returns: 1679 file_type (str): The formal file name for the corresponding file id. 1680 """ 1681 # Validate arg types 1682 if not isinstance(session, int): 1683 raise TypeError(session) 1684 1685 # API function declaration 1686 self.library.GW2GetFileType.argtypes = [ 1687 ct.c_size_t, 1688 ct.c_size_t, 1689 ct.POINTER(ct.c_size_t), 1690 ct.POINTER(ct.c_void_p) 1691 ] 1692 1693 # Variable initialisation 1694 ct_session = ct.c_size_t(session) 1695 ct_file_type = ct.c_size_t(file_type_id) 1696 ct_buffer_length = ct.c_size_t() 1697 ct_buffer = ct.c_void_p() 1698 1699 # API call 1700 status = self.library.GW2GetFileType( 1701 ct_session, 1702 ct_file_type, 1703 ct.byref(ct_buffer_length), 1704 ct.byref(ct_buffer) 1705 ) 1706 1707 if status not in successes.success_codes: 1708 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 1709 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1710 else: 1711 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 1712 1713 # Editor wrote to a buffer, convert it to bytes 1714 file_type_bytes = utils.buffer_to_bytes( 1715 ct_buffer, 1716 ct_buffer_length 1717 ) 1718 1719 file_type = file_type_bytes.decode() 1720 1721 return file_type 1722 1723 def GW2GetFileTypeID(self, session: int, file_type_str): 1724 """ Retrieve the Glasswall file type id given a file type string. 1725 1726 Args: 1727 session (int): The session integer. 1728 file_type_str (str): The file type as a string. 1729 1730 Returns: 1731 file_type_id (str): The Glasswall file type id for the specified file type. 1732 """ 1733 # Validate arg types 1734 if not isinstance(session, int): 1735 raise TypeError(session) 1736 1737 # API function declaration 1738 self.library.GW2GetFileTypeID.argtypes = [ 1739 ct.c_size_t, 1740 ct.c_char_p, 1741 ct.POINTER(ct.c_size_t), 1742 ct.POINTER(ct.c_void_p) 1743 ] 1744 1745 # Variable initialisation 1746 ct_session = ct.c_size_t(session) 1747 ct_file_type = ct.c_char_p(file_type_str.encode('utf-8')) 1748 ct_buffer_length = ct.c_size_t() 1749 ct_buffer = ct.c_void_p() 1750 1751 # API call 1752 status = self.library.GW2GetFileTypeID( 1753 ct_session, 1754 ct_file_type, 1755 ct.byref(ct_buffer_length), 1756 ct.byref(ct_buffer) 1757 ) 1758 1759 if status not in successes.success_codes: 1760 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 1761 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1762 else: 1763 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 1764 1765 # Editor wrote to a buffer, convert it to bytes 1766 file_type_bytes = utils.buffer_to_bytes( 1767 ct_buffer, 1768 ct_buffer_length 1769 ) 1770 1771 file_type_id = file_type_bytes.decode() 1772 1773 return file_type_id 1774 1775 def get_file_type_info(self, file_type: Union[str, int]): 1776 """ Retrieve information about a file type based on its identifier. 1777 1778 Args: 1779 file_type (Union[str, int]): The file type identifier. This can be either a string representing a file 1780 extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29). 1781 1782 Returns: 1783 - file_type_info (Union[int, str]): Depending on the input 'file_type': 1784 - If `file_type` is a string (e.g. 'bmp'): 1785 - If the file type is recognised, returns an integer corresponding to that file type. 1786 - If the file type is not recognised, returns 0. 1787 - If `file_type` is an integer (e.g. 29): 1788 - If the integer corresponds to a recognised file type, returns a more detailed string description 1789 of the file type (e.g. 'BMP Image'). 1790 - If the integer does not match any recognised file type, returns an empty string. 1791 """ 1792 # Validate arg types 1793 if not isinstance(file_type, (str, int)): 1794 raise TypeError(file_type) 1795 1796 with utils.CwdHandler(self.library_path): 1797 with self.new_session() as session: 1798 1799 if isinstance(file_type, int): 1800 file_type_info = self.GW2GetFileType(session, file_type) 1801 if isinstance(file_type, str): 1802 file_type_info = self.GW2GetFileTypeID(session, file_type) 1803 1804 return file_type_info 1805 1806 @utils.deprecated_function(replacement_function=get_file_type_info) 1807 def get_file_info(self, *args, **kwargs): 1808 """ Deprecated in 1.0.6. Use get_file_type_info. """ 1809 pass 1810 1811 def _GW2RegisterReportFile(self, session: int, output_file: str): 1812 """ Register an output report file path for the given session. 1813 1814 Args: 1815 session (int): The session integer. 1816 output_file (str): The file path of the output report file. 1817 1818 Returns: 1819 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1820 """ 1821 # API function declaration 1822 self.library.GW2RegisterReportFile.argtypes = [ 1823 ct.c_size_t, # Session_Handle session 1824 ct.c_char_p, # const char * reportFilePathName 1825 ] 1826 1827 # Variable initialisation 1828 gw_return_object = glasswall.GwReturnObj() 1829 gw_return_object.session = ct.c_size_t(session) 1830 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 1831 1832 # API call 1833 gw_return_object.status = self.library.GW2RegisterReportFile( 1834 gw_return_object.session, 1835 gw_return_object.output_file 1836 ) 1837 1838 return gw_return_object 1839 1840 def register_report_file(self, session: int, output_file: str): 1841 """ Register the report file path for the given session. 1842 1843 Args: 1844 session (int): The session integer. 1845 output_file (str): The file path of the report file. 1846 1847 Returns: 1848 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1849 """ 1850 # Validate arg types 1851 if not isinstance(session, int): 1852 raise TypeError(session) 1853 if not isinstance(output_file, (type(None), str)): 1854 raise TypeError(output_file) 1855 1856 result = self._GW2RegisterReportFile(session, output_file) 1857 1858 if result.status not in successes.success_codes: 1859 log.error(format_object(result)) 1860 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1861 else: 1862 log.debug(format_object(result)) 1863 1864 return result 1865 1866 def _GW2GetIdInfo(self, session: int, issue_id: int): 1867 """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances" 1868 1869 Args: 1870 session (int): The session integer. 1871 issue_id (int): The issue id. 1872 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1873 1874 Returns: 1875 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'issue_id', 'buffer_length', 'buffer', 'status', 'id_info'. 1876 """ 1877 # API function declaration 1878 self.library.GW2GetIdInfo.argtypes = [ 1879 ct.c_size_t, # Session_Handle session 1880 ct.c_size_t, # size_t issueId 1881 ct.POINTER(ct.c_size_t), # size_t * bufferLength 1882 ct.POINTER(ct.c_void_p) # char ** outputBuffer 1883 ] 1884 1885 # Variable initialisation 1886 gw_return_object = glasswall.GwReturnObj() 1887 gw_return_object.session = ct.c_size_t(session) 1888 gw_return_object.issue_id = ct.c_size_t(issue_id) 1889 gw_return_object.buffer_length = ct.c_size_t() 1890 gw_return_object.buffer = ct.c_void_p() 1891 1892 # API call 1893 gw_return_object.status = self.library.GW2GetIdInfo( 1894 gw_return_object.session, 1895 gw_return_object.issue_id, 1896 ct.byref(gw_return_object.buffer_length), 1897 ct.byref(gw_return_object.buffer) 1898 ) 1899 1900 # Editor wrote to a buffer, convert it to bytes 1901 id_info_bytes = utils.buffer_to_bytes( 1902 gw_return_object.buffer, 1903 gw_return_object.buffer_length 1904 ) 1905 1906 gw_return_object.id_info = id_info_bytes.decode() 1907 1908 return gw_return_object 1909 1910 def get_id_info(self, issue_id: int, raise_unsupported: bool = True): 1911 """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances" 1912 1913 Args: 1914 issue_id (int): The issue id. 1915 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1916 1917 Returns: 1918 id_info (str): The group description for the given Issue ID. 1919 """ 1920 # Validate arg types 1921 if not isinstance(issue_id, int): 1922 raise TypeError(issue_id) 1923 1924 with utils.CwdHandler(self.library_path): 1925 with self.new_session() as session: 1926 result = self._GW2GetIdInfo(session, issue_id) 1927 1928 if result.status not in successes.success_codes: 1929 log.error(format_object(result)) 1930 if raise_unsupported: 1931 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1932 else: 1933 log.debug(format_object(result)) 1934 1935 return result.id_info 1936 1937 def _GW2GetAllIdInfo(self, session: int): 1938 """ Retrieves the XML containing all the Issue ID ranges with their group descriptions 1939 1940 Args: 1941 session (int): The session integer. 1942 1943 Returns: 1944 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status', 'all_id_info'. 1945 """ 1946 1947 # API function declaration 1948 self.library.GW2GetAllIdInfo.argtypes = [ 1949 ct.c_size_t, # Session_Handle session 1950 ct.POINTER(ct.c_size_t), # size_t * bufferLength 1951 ct.POINTER(ct.c_void_p) # char ** outputBuffer 1952 ] 1953 1954 # Variable initialisation 1955 # The extracted issue Id information is stored in the analysis report, register an analysis session. 1956 gw_return_object = self._GW2RegisterAnalysisMemory(session) 1957 1958 # API call 1959 gw_return_object.status = self.library.GW2GetAllIdInfo( 1960 gw_return_object.session, 1961 ct.byref(gw_return_object.buffer_length), 1962 ct.byref(gw_return_object.buffer) 1963 ) 1964 1965 # Editor wrote to a buffer, convert it to bytes 1966 all_id_info_bytes = utils.buffer_to_bytes( 1967 gw_return_object.buffer, 1968 gw_return_object.buffer_length 1969 ) 1970 1971 gw_return_object.all_id_info = all_id_info_bytes.decode() 1972 1973 return gw_return_object 1974 1975 def get_all_id_info(self, output_file: Optional[str] = None, raise_unsupported: bool = True) -> str: 1976 """ Retrieves the XML containing all the Issue ID ranges with their group descriptions 1977 1978 Args: 1979 output_file (Optional[str]): The output file path where the analysis file will be written. 1980 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1981 1982 Returns: 1983 all_id_info (str): A string XML analysis report containing all id info. 1984 """ 1985 # Validate arg types 1986 if not isinstance(output_file, (type(None), str)): 1987 raise TypeError(output_file) 1988 if isinstance(output_file, str): 1989 output_file = os.path.abspath(output_file) 1990 1991 with utils.CwdHandler(self.library_path): 1992 with self.new_session() as session: 1993 result = self._GW2GetAllIdInfo(session) 1994 1995 if result.status not in successes.success_codes: 1996 log.error(format_object(result)) 1997 if raise_unsupported: 1998 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1999 else: 2000 log.debug(format_object(result)) 2001 2002 if isinstance(output_file, str): 2003 # GW2GetAllIdInfo is memory only, write to file 2004 # make directories that do not exist 2005 os.makedirs(os.path.dirname(output_file), exist_ok=True) 2006 with open(output_file, "w") as f: 2007 f.write(result.all_id_info) 2008 2009 return result.all_id_info 2010 2011 def _GW2FileSessionStatus(self, session: int): 2012 """ Retrieves the Glasswall Session Status. Also gives a high level indication of the processing that was carried out on the last document processed by the library 2013 2014 Args: 2015 session (int): The session integer. 2016 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 2017 2018 Returns: 2019 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'session_status', 'buffer', 'buffer_length', 'status', 'message'. 2020 """ 2021 # API function declaration 2022 self.library.GW2FileSessionStatus.argtypes = [ 2023 ct.c_size_t, # Session_Handle session 2024 ct.POINTER(ct.c_int), # int *glasswallSessionStatus 2025 ct.POINTER(ct.c_void_p), # char **statusMsgBuffer 2026 ct.POINTER(ct.c_size_t) # size_t *statusbufferLength 2027 ] 2028 2029 # Variable initialisation 2030 gw_return_object = glasswall.GwReturnObj() 2031 gw_return_object.session = ct.c_size_t(session) 2032 gw_return_object.session_status = ct.c_int() 2033 gw_return_object.buffer = ct.c_void_p() 2034 gw_return_object.buffer_length = ct.c_size_t() 2035 2036 # API call 2037 gw_return_object.status = self.library.GW2FileSessionStatus( 2038 gw_return_object.session, 2039 ct.byref(gw_return_object.session_status), 2040 ct.byref(gw_return_object.buffer), 2041 ct.byref(gw_return_object.buffer_length) 2042 ) 2043 2044 # Convert session_status to int 2045 gw_return_object.session_status = gw_return_object.session_status.value 2046 2047 # Editor wrote to a buffer, convert it to bytes 2048 message_bytes = utils.buffer_to_bytes( 2049 gw_return_object.buffer, 2050 gw_return_object.buffer_length 2051 ) 2052 gw_return_object.message = message_bytes.decode() 2053 2054 return gw_return_object 2055 2056 def file_session_status_message(self, session: int, raise_unsupported: bool = True) -> str: 2057 """ Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out. 2058 2059 Args: 2060 session (int): The session integer. 2061 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 2062 2063 Returns: 2064 result.message (str):The file session status message. 2065 """ 2066 # Validate arg types 2067 if not isinstance(session, int): 2068 raise TypeError(session) 2069 2070 result = self._GW2FileSessionStatus(session) 2071 2072 if result.status not in successes.success_codes: 2073 log.error(format_object(result)) 2074 if raise_unsupported: 2075 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 2076 else: 2077 log.debug(format_object(result)) 2078 2079 return result.message 2080 2081 def _GW2LicenceDetails(self, session: int): 2082 """ Returns a human readable string containing licence details. 2083 2084 Args: 2085 session (int): The session integer. 2086 2087 Returns: 2088 licence_details (str): A human readable string representing the relevant information contained in the licence. 2089 """ 2090 # API function declaration 2091 self.library.GW2LicenceDetails.argtypes = [ct.c_size_t] 2092 self.library.GW2LicenceDetails.restype = ct.c_char_p 2093 2094 # Variable initialisation 2095 ct_session = ct.c_size_t(session) 2096 2097 # API call 2098 licence_details = self.library.GW2LicenceDetails(ct_session) 2099 2100 # Convert to Python string 2101 licence_details = ct.string_at(licence_details).decode() 2102 2103 return licence_details 2104 2105 def licence_details(self): 2106 """ Returns a string containing details of the licence. 2107 2108 Returns: 2109 result (str): A string containing details of the licence. 2110 """ 2111 with self.new_session() as session: 2112 result = self._GW2LicenceDetails(session) 2113 2114 log.debug(f"\n\tsession: {session}\n\tGW2LicenceDetails: {result}") 2115 2116 return result 2117 2118 def _GW2RegisterExportTextDumpMemory(self, session: int): 2119 """ Registers an export text dump to be written in memory. 2120 2121 Args: 2122 session (int): The session integer. 2123 2124 Returns: 2125 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 2126 """ 2127 # API function declaration 2128 self.library.GW2RegisterExportTextDumpMemory.argtypes = [ 2129 ct.c_size_t, # Session_Handle session 2130 ct.POINTER(ct.c_void_p), # char ** exportTextDumpFileBuffer 2131 ct.POINTER(ct.c_size_t) # size_t * exportTextDumpLength 2132 ] 2133 2134 # Variable initialisation 2135 gw_return_object = glasswall.GwReturnObj() 2136 gw_return_object.session = ct.c_size_t(session) 2137 gw_return_object.buffer = ct.c_void_p() 2138 gw_return_object.buffer_length = ct.c_size_t() 2139 2140 # API call 2141 gw_return_object.status = self.library.GW2RegisterExportTextDumpMemory( 2142 gw_return_object.session, 2143 ct.byref(gw_return_object.buffer), 2144 ct.byref(gw_return_object.buffer_length) 2145 ) 2146 2147 return gw_return_object 2148 2149 def _GW2RegisterExportTextDumpFile(self, session: int, output_file: str): 2150 """ Registers an export text dump to be written to file. 2151 2152 Args: 2153 session (int): The session integer. 2154 output_file (str): The file path of the text dump file. 2155 2156 Returns: 2157 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 2158 """ 2159 # API function declaration 2160 self.library.GW2RegisterExportTextDumpFile.argtypes = [ 2161 ct.c_size_t, # Session_Handle session 2162 ct.c_char_p # const char * textDumpFilePathName 2163 ] 2164 2165 # Variable initialisation 2166 gw_return_object = glasswall.GwReturnObj() 2167 gw_return_object.session = ct.c_size_t(session) 2168 gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8")) 2169 2170 # API call 2171 gw_return_object.status = self.library.GW2RegisterExportTextDumpFile( 2172 gw_return_object.session, 2173 gw_return_object.output_file 2174 ) 2175 2176 return gw_return_object 2177 2178 def _GW2RegisterLicenceFile(self, session: int, input_file: str): 2179 """ Registers a "gwkey.lic" licence from file path. 2180 2181 Args: 2182 session (int): The session integer. 2183 input_file (str): The "gwkey.lic" licence input file path. 2184 2185 Returns: 2186 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 2187 """ 2188 # API function declaration 2189 self.library.GW2RegisterLicenceFile.argtypes = [ 2190 ct.c_size_t, # Session_Handle session 2191 ct.c_char_p, # const char *filename 2192 ] 2193 2194 # Variable initialisation 2195 gw_return_object = glasswall.GwReturnObj() 2196 gw_return_object.session = ct.c_size_t(session) 2197 gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8")) 2198 2199 # API call 2200 gw_return_object.status = self.library.GW2RegisterLicenceFile( 2201 gw_return_object.session, 2202 gw_return_object.input_file, 2203 ) 2204 2205 return gw_return_object 2206 2207 def _GW2RegisterLicenceMemory(self, session: int, input_file: bytes): 2208 """ Registers a "gwkey.lic" licence from memory. 2209 2210 Args: 2211 session (int): The session integer. 2212 input_file (bytes): The "gwkey.lic" licence input file. 2213 2214 Returns: 2215 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 2216 """ 2217 # API function declaration 2218 self.library.GW2RegisterLicenceMemory.argtypes = [ 2219 ct.c_size_t, # Session_Handle session 2220 ct.c_char_p, # const char *filename 2221 ct.c_size_t, # size_t licenceLength 2222 ] 2223 2224 # Variable initialisation 2225 gw_return_object = glasswall.GwReturnObj() 2226 gw_return_object.session = ct.c_size_t(session) 2227 gw_return_object.buffer = ct.c_char_p(input_file) 2228 gw_return_object.buffer_length = ct.c_size_t(len(input_file)) 2229 2230 # API call 2231 gw_return_object.status = self.library.GW2RegisterLicenceMemory( 2232 gw_return_object.session, 2233 gw_return_object.buffer, 2234 gw_return_object.buffer_length 2235 ) 2236 2237 return gw_return_object 2238 2239 def register_licence(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 2240 """ Registers a "gwkey.lic" licence from file path or memory. 2241 2242 Args: 2243 session (int): The session integer. 2244 input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. 2245 2246 Returns: 2247 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 2248 - If input_file is a str file path: 2249 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 2250 2251 - If input_file is a file in memory: 2252 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 2253 """ 2254 if isinstance(input_file, str): 2255 if not os.path.isfile(input_file): 2256 raise FileNotFoundError(input_file) 2257 2258 input_file = os.path.abspath(input_file) 2259 2260 result = self._GW2RegisterLicenceFile(session, input_file) 2261 2262 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 2263 # Convert bytearray and io.BytesIO to bytes 2264 input_file = utils.as_bytes(input_file) 2265 2266 result = self._GW2RegisterLicenceMemory(session, input_file) 2267 2268 else: 2269 raise TypeError(input_file) 2270 2271 if result.status not in successes.success_codes: 2272 log.error(format_object(result)) 2273 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 2274 else: 2275 log.debug(format_object(result)) 2276 2277 return result
A high level Python wrapper for Glasswall Editor / Core2.
22 def __init__(self, library_path: str, licence: Union[str, bytes, bytearray, io.BytesIO] = None): 23 """ Initialise the Editor instance. 24 25 Args: 26 library_path (str): The file or directory path to the Editor library. 27 licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be: 28 - A string representing the file path to the licence. 29 - A `bytes` or `bytearray` object containing the licence data. 30 - An `io.BytesIO` object for in-memory licence data. 31 If not specified, it is assumed that the licence file is located in the same directory as the `library_path`. 32 """ 33 super().__init__(library_path) 34 self.library = self.load_library(os.path.abspath(library_path)) 35 self.licence = licence 36 37 # Validate killswitch has not activated 38 self.validate_licence() 39 40 log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
Initialise the Editor instance.
Args:
library_path (str): The file or directory path to the Editor library.
licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be:
- A string representing the file path to the licence.
- A bytes
or bytearray
object containing the licence data.
- An io.BytesIO
object for in-memory licence data.
If not specified, it is assumed that the licence file is located in the same directory as the library_path
.
42 def validate_licence(self): 43 """ Validates the licence of the library by checking the licence details. 44 45 Raises: 46 LicenceExpired: If the licence has expired or could not be validated. 47 """ 48 licence_details = self.licence_details() 49 50 bad_details = [ 51 "Unable to Read Licence Key File", 52 "Licence File Missing Required Contents", 53 "Licence Expired", 54 ] 55 56 if any(bad_detail.lower() in licence_details.lower() for bad_detail in bad_details): 57 # bad_details found in licence_details 58 log.error(f"{self.__class__.__name__} licence validation failed. Licence details:\n{licence_details}") 59 raise errors.LicenceExpired(licence_details) 60 else: 61 log.debug(f"{self.__class__.__name__} licence validated successfully. Licence details:\n{licence_details}")
Validates the licence of the library by checking the licence details.
Raises: LicenceExpired: If the licence has expired or could not be validated.
63 def version(self): 64 """ Returns the Glasswall library version. 65 66 Returns: 67 version (str): The Glasswall library version. 68 """ 69 # API function declaration 70 self.library.GW2LibVersion.restype = ct.c_char_p 71 72 # API call 73 version = self.library.GW2LibVersion() 74 75 # Convert to Python string 76 version = ct.string_at(version).decode() 77 78 return version
Returns the Glasswall library version.
Returns: version (str): The Glasswall library version.
80 def open_session(self): 81 """ Open a new Glasswall session. 82 83 Returns: 84 session (int): An incrementing integer repsenting the current session. 85 """ 86 # API call 87 session = self.library.GW2OpenSession() 88 89 log.debug(f"\n\tsession: {session}") 90 91 if self.licence: 92 # Must register licence on each GW2OpenSession if the licence is not in the same directory as the library 93 self.register_licence(session, self.licence) 94 95 return session
Open a new Glasswall session.
Returns: session (int): An incrementing integer repsenting the current session.
97 def close_session(self, session: int) -> int: 98 """ Close the Glasswall session. All resources allocated by the session will be destroyed. 99 100 Args: 101 session (int): The session to close. 102 103 Returns: 104 status (int): The status code of the function call. 105 """ 106 if not isinstance(session, int): 107 raise TypeError(session) 108 109 # API function declaration 110 self.library.GW2CloseSession.argtypes = [ct.c_size_t] 111 112 # Variable initialisation 113 ct_session = ct.c_size_t(session) 114 115 # API call 116 status = self.library.GW2CloseSession(ct_session) 117 118 if status not in successes.success_codes: 119 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 120 else: 121 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 122 123 return status
Close the Glasswall session. All resources allocated by the session will be destroyed.
Args: session (int): The session to close.
Returns: status (int): The status code of the function call.
125 @contextmanager 126 def new_session(self): 127 """ Context manager. Opens a new session on entry and closes the session on exit. """ 128 try: 129 session = self.open_session() 130 yield session 131 finally: 132 self.close_session(session)
Context manager. Opens a new session on entry and closes the session on exit.
134 def run_session(self, session): 135 """ Runs the Glasswall session and begins processing of a file. 136 137 Args: 138 session (int): The session to run. 139 140 Returns: 141 status (int): The status code of the function call. 142 """ 143 # API function declaration 144 self.library.GW2RunSession.argtypes = [ct.c_size_t] 145 146 # Variable initialisation 147 ct_session = ct.c_size_t(session) 148 149 # API call 150 status = self.library.GW2RunSession(ct_session) 151 152 if status not in successes.success_codes: 153 log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}") 154 else: 155 log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}") 156 157 return status
Runs the Glasswall session and begins processing of a file.
Args: session (int): The session to run.
Returns: status (int): The status code of the function call.
159 def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True) -> Union[int, str]: 160 """ Determine the file type of a given input file, either as an integer identifier or a string. 161 162 Args: 163 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. 164 as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. 165 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 166 167 Returns: 168 file_type (Union[int, str]): The file type. 169 """ 170 if isinstance(input_file, str): 171 if not os.path.isfile(input_file): 172 raise FileNotFoundError(input_file) 173 174 # convert to ct.c_char_p of bytes 175 ct_input_file = ct.c_char_p(input_file.encode("utf-8")) 176 177 # API call 178 file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file) 179 180 elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): 181 # convert to bytes 182 bytes_input_file = utils.as_bytes(input_file) 183 184 # ctypes conversion 185 ct_buffer = ct.c_char_p(bytes_input_file) 186 ct_buffer_length = ct.c_size_t(len(bytes_input_file)) 187 188 # API call 189 file_type = self.library.GW2DetermineFileTypeFromMemory( 190 ct_buffer, 191 ct_buffer_length 192 ) 193 194 else: 195 raise TypeError(input_file) 196 197 file_type_as_string = dft.file_type_int_to_str(file_type) 198 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 199 200 if not dft.is_success(file_type): 201 log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 202 if raise_unsupported: 203 raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) 204 else: 205 log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") 206 207 if as_string: 208 return file_type_as_string 209 210 return file_type
Determine the file type of a given input file, either as an integer identifier or a string.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_type (Union[int, str]): The file type.
255 def get_content_management_policy(self, session: int): 256 """ Returns the content management configuration for a given session. 257 258 Args: 259 session (int): The session integer. 260 261 Returns: 262 xml_string (str): The XML string of the current content management configuration. 263 """ 264 # NOTE GW2GetPolicySettings is current not implemented in editor 265 266 # set xml_string as loaded default config 267 xml_string = glasswall.content_management.policies.Editor(default="sanitise").text, 268 269 # log.debug(f"xml_string:\n{xml_string}") 270 271 return xml_string 272 273 # # API function declaration 274 # self.library.GW2GetPolicySettings.argtypes = [ 275 # ct.c_size_t, 276 # ct.c_void_p, 277 # ] 278 279 # # Variable initialisation 280 # ct_session = ct.c_size_t(session) 281 # ct_buffer = ct.c_void_p() 282 # ct_buffer_length = ct.c_size_t() 283 # # ct_file_format = ct.c_int(file_format) 284 285 # # API Call 286 # status = self.library.GW2GetPolicySettings( 287 # ct_session, 288 # ct.byref(ct_buffer), 289 # ct.byref(ct_buffer_length) 290 # ) 291 292 # print("GW2GetPolicySettings status:", status) 293 294 # file_bytes = utils.buffer_to_bytes( 295 # ct_buffer, 296 # ct_buffer_length, 297 # ) 298 299 # return file_bytes
Returns the content management configuration for a given session.
Args: session (int): The session integer.
Returns: xml_string (str): The XML string of the current content management configuration.
369 def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, policy_format=0): 370 """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied. 371 372 Args: 373 session (int): The session integer. 374 input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 375 policy_format (int): The format of the content management policy. 0=XML. 376 377 Returns: 378 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 379 - If input_file is a str file path: 380 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'. 381 382 - If input_file is a file in memory: 383 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'. 384 """ 385 # Validate type 386 if not isinstance(session, int): 387 raise TypeError(session) 388 if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 389 raise TypeError(input_file) 390 if not isinstance(policy_format, int): 391 raise TypeError(policy_format) 392 393 # Set input_file to default if input_file is None 394 if input_file is None: 395 input_file = glasswall.content_management.policies.Editor(default="sanitise") 396 397 # Validate xml content is parsable 398 utils.validate_xml(input_file) 399 400 # From file 401 if isinstance(input_file, str) and os.path.isfile(input_file): 402 input_file = os.path.abspath(input_file) 403 404 result = self._GW2RegisterPoliciesFile(session, input_file) 405 406 # From memory 407 elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 408 # Convert bytearray, io.BytesIO to bytes 409 if isinstance(input_file, (bytearray, io.BytesIO)): 410 input_file = utils.as_bytes(input_file) 411 # Convert string xml or Policy to bytes 412 if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)): 413 input_file = input_file.encode("utf-8") 414 415 result = self._GW2RegisterPoliciesMemory(session, input_file) 416 417 if result.status not in successes.success_codes: 418 log.error(format_object(result)) 419 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 420 else: 421 log.debug(format_object(result)) 422 423 return result
Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
Args: session (int): The session integer. input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. policy_format (int): The format of the content management policy. 0=XML.
Returns: - result (glasswall.GwReturnObj): Depending on the input 'input_file': - If input_file is a str file path: - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'.
- If input_file is a file in memory:
- gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'.
487 def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 488 """ Register an input file or bytes for the given session. 489 490 Args: 491 session (int): The session integer. 492 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 493 494 Returns: 495 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 496 - If input_file is a str file path: 497 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 498 499 - If input_file is a file in memory: 500 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 501 """ 502 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)): 503 raise TypeError(input_file) 504 505 if isinstance(input_file, str): 506 if not os.path.isfile(input_file): 507 raise FileNotFoundError(input_file) 508 509 input_file = os.path.abspath(input_file) 510 511 result = self._GW2RegisterInputFile(session, input_file) 512 513 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 514 # Convert bytearray and io.BytesIO to bytes 515 input_file = utils.as_bytes(input_file) 516 517 result = self._GW2RegisterInputMemory(session, input_file) 518 519 if result.status not in successes.success_codes: 520 log.error(format_object(result)) 521 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 522 else: 523 log.debug(format_object(result)) 524 525 return result
Register an input file or bytes for the given session.
Args: session (int): The session integer. input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
Returns: - result (glasswall.GwReturnObj): Depending on the input 'input_file': - If input_file is a str file path: - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
- If input_file is a file in memory:
- gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
587 def register_output(self, session, output_file: Optional[str] = None): 588 """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes. 589 590 Args: 591 session (int): The session integer. 592 output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes. 593 594 Returns: 595 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 596 """ 597 if not isinstance(output_file, (type(None), str)): 598 raise TypeError(output_file) 599 600 if isinstance(output_file, str): 601 output_file = os.path.abspath(output_file) 602 603 result = self._GW2RegisterOutFile(session, output_file) 604 605 elif isinstance(output_file, type(None)): 606 result = self._GW2RegisterOutputMemory(session) 607 608 if result.status not in successes.success_codes: 609 log.error(format_object(result)) 610 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 611 else: 612 log.debug(format_object(result)) 613 614 return result
Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.
Args: session (int): The session integer. output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.
Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
684 def register_analysis(self, session: int, output_file: Optional[str] = None): 685 """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call. 686 687 Args: 688 session (int): The session integer. 689 output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes. 690 691 Returns: 692 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included. 693 """ 694 if not isinstance(output_file, (type(None), str)): 695 raise TypeError(output_file) 696 697 if isinstance(output_file, str): 698 output_file = os.path.abspath(output_file) 699 700 result = self._GW2RegisterAnalysisFile(session, output_file) 701 702 elif isinstance(output_file, type(None)): 703 result = self._GW2RegisterAnalysisMemory(session) 704 705 if result.status not in successes.success_codes: 706 log.error(format_object(result)) 707 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 708 else: 709 log.debug(format_object(result)) 710 711 return result
Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.
Args: session (int): The session integer. output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes.
Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included.
713 def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 714 """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided. 715 716 Args: 717 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 718 output_file (Optional[str]): The output file path where the protected file will be written. 719 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 720 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 721 722 Returns: 723 file_bytes (bytes): The protected file bytes. 724 """ 725 # Validate arg types 726 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 727 raise TypeError(input_file) 728 if not isinstance(output_file, (type(None), str)): 729 raise TypeError(output_file) 730 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 731 raise TypeError(content_management_policy) 732 if not isinstance(raise_unsupported, bool): 733 raise TypeError(raise_unsupported) 734 735 # Convert string path arguments to absolute paths 736 if isinstance(input_file, str): 737 if not os.path.isfile(input_file): 738 raise FileNotFoundError(input_file) 739 input_file = os.path.abspath(input_file) 740 if isinstance(output_file, str): 741 output_file = os.path.abspath(output_file) 742 # make directories that do not exist 743 os.makedirs(os.path.dirname(output_file), exist_ok=True) 744 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 745 content_management_policy = os.path.abspath(content_management_policy) 746 747 # Convert memory inputs to bytes 748 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 749 input_file = utils.as_bytes(input_file) 750 751 with utils.CwdHandler(self.library_path): 752 with self.new_session() as session: 753 content_management_policy = self.set_content_management_policy(session, content_management_policy) 754 register_input = self.register_input(session, input_file) 755 register_output = self.register_output(session, output_file=output_file) 756 status = self.run_session(session) 757 758 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 759 if status not in successes.success_codes: 760 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 761 if raise_unsupported: 762 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 763 else: 764 file_bytes = None 765 else: 766 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 767 # Get file bytes 768 if isinstance(output_file, str): 769 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 770 if not os.path.isfile(output_file): 771 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 772 file_bytes = None 773 else: 774 with open(output_file, "rb") as f: 775 file_bytes = f.read() 776 else: 777 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 778 file_bytes = utils.buffer_to_bytes( 779 register_output.buffer, 780 register_output.buffer_length 781 ) 782 783 # Ensure memory allocated is not garbage collected 784 content_management_policy, register_input, register_output 785 786 return file_bytes
Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the protected file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The protected file bytes.
788 def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 789 """ Recursively processes all files in a directory in protect mode using the given content management policy. 790 The protected files are written to output_directory maintaining the same directory structure as input_directory. 791 792 Args: 793 input_directory (str): The input directory containing files to protect. 794 output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files. 795 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 796 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 797 798 Returns: 799 protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 800 """ 801 protected_files_dict = {} 802 # Call protect_file on each file in input_directory to output_directory 803 for input_file in utils.list_file_paths(input_directory): 804 relative_path = os.path.relpath(input_file, input_directory) 805 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 806 807 protected_bytes = self.protect_file( 808 input_file=input_file, 809 output_file=output_file, 810 raise_unsupported=raise_unsupported, 811 content_management_policy=content_management_policy, 812 ) 813 814 protected_files_dict[relative_path] = protected_bytes 815 816 return protected_files_dict
Recursively processes all files in a directory in protect mode using the given content management policy. The protected files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to protect. output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
818 def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 819 """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided. 820 821 Args: 822 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 823 output_file (Optional[str]): The output file path where the analysis file will be written. 824 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 825 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 826 827 Returns: 828 file_bytes (bytes): The analysis file bytes. 829 """ 830 # Validate arg types 831 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 832 raise TypeError(input_file) 833 if not isinstance(output_file, (type(None), str)): 834 raise TypeError(output_file) 835 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 836 raise TypeError(content_management_policy) 837 if not isinstance(raise_unsupported, bool): 838 raise TypeError(raise_unsupported) 839 840 # Convert string path arguments to absolute paths 841 if isinstance(input_file, str): 842 if not os.path.isfile(input_file): 843 raise FileNotFoundError(input_file) 844 input_file = os.path.abspath(input_file) 845 if isinstance(output_file, str): 846 output_file = os.path.abspath(output_file) 847 # make directories that do not exist 848 os.makedirs(os.path.dirname(output_file), exist_ok=True) 849 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 850 content_management_policy = os.path.abspath(content_management_policy) 851 852 # Convert memory inputs to bytes 853 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 854 input_file = utils.as_bytes(input_file) 855 856 with utils.CwdHandler(self.library_path): 857 with self.new_session() as session: 858 content_management_policy = self.set_content_management_policy(session, content_management_policy) 859 register_input = self.register_input(session, input_file) 860 register_analysis = self.register_analysis(session, output_file) 861 status = self.run_session(session) 862 863 file_bytes = None 864 if isinstance(output_file, str): 865 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 866 if os.path.isfile(output_file): 867 with open(output_file, "rb") as f: 868 file_bytes = f.read() 869 else: 870 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 871 if register_analysis.buffer and register_analysis.buffer_length: 872 file_bytes = utils.buffer_to_bytes( 873 register_analysis.buffer, 874 register_analysis.buffer_length 875 ) 876 877 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 878 if status not in successes.success_codes: 879 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 880 if raise_unsupported: 881 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 882 else: 883 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 884 885 # Ensure memory allocated is not garbage collected 886 content_management_policy, register_input, register_analysis 887 888 return file_bytes
Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the analysis file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The analysis file bytes.
890 def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 891 """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory. 892 893 Args: 894 input_directory (str): The input directory containing files to analyse. 895 output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files. 896 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 897 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 898 899 Returns: 900 analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 901 """ 902 analysis_files_dict = {} 903 # Call analyse_file on each file in input_directory to output_directory 904 for input_file in utils.list_file_paths(input_directory): 905 relative_path = os.path.relpath(input_file, input_directory) + ".xml" 906 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 907 908 analysis_bytes = self.analyse_file( 909 input_file=input_file, 910 output_file=output_file, 911 raise_unsupported=raise_unsupported, 912 content_management_policy=content_management_policy, 913 ) 914 915 analysis_files_dict[relative_path] = analysis_bytes 916 917 return analysis_files_dict
Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to analyse. output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
919 def protect_and_analyse_file( 920 self, 921 input_file: Union[str, bytes, bytearray, io.BytesIO], 922 output_file: Optional[str] = None, 923 output_analysis_report: Optional[str] = None, 924 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 925 raise_unsupported: bool = True, 926 ): 927 """ Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes. 928 929 Args: 930 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 931 output_file (Optional[str]): The output file path where the protected file will be written. 932 output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. 933 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 934 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 935 936 Returns: 937 Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes). 938 """ 939 # Validate arg types 940 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 941 raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}") 942 if not isinstance(output_file, (type(None), str)): 943 raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}") 944 if not isinstance(output_analysis_report, (type(None), str)): 945 raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}") 946 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 947 raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}") 948 if not isinstance(raise_unsupported, bool): 949 raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}") 950 951 # Convert string path arguments to absolute paths 952 if isinstance(input_file, str): 953 if not os.path.isfile(input_file): 954 raise FileNotFoundError(input_file) 955 input_file = os.path.abspath(input_file) 956 if isinstance(output_file, str): 957 output_file = os.path.abspath(output_file) 958 os.makedirs(os.path.dirname(output_file), exist_ok=True) 959 if isinstance(output_analysis_report, str): 960 output_analysis_report = os.path.abspath(output_analysis_report) 961 os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True) 962 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 963 content_management_policy = os.path.abspath(content_management_policy) 964 965 # Convert memory inputs to bytes 966 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 967 input_file = utils.as_bytes(input_file) 968 969 with utils.CwdHandler(self.library_path): 970 with self.new_session() as session: 971 content_management_policy = self.set_content_management_policy(session, content_management_policy) 972 register_input = self.register_input(session, input_file) 973 register_output = self.register_output(session, output_file) 974 register_analysis = self.register_analysis(session, output_analysis_report) 975 976 status = self.run_session(session) 977 978 protected_file_bytes = None 979 analysis_report_bytes = None 980 981 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 982 if status not in successes.success_codes: 983 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 984 if raise_unsupported: 985 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 986 987 # Get analysis report file bytes, even on processing failure 988 if isinstance(output_analysis_report, str): 989 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 990 if not os.path.isfile(output_analysis_report): 991 log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}") 992 else: 993 with open(output_analysis_report, "rb") as f: 994 analysis_report_bytes = f.read() 995 else: 996 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 997 if register_analysis.buffer and register_analysis.buffer_length: 998 analysis_report_bytes = utils.buffer_to_bytes( 999 register_analysis.buffer, 1000 register_analysis.buffer_length 1001 ) 1002 1003 # On success, get protected file bytes 1004 if status in successes.success_codes: 1005 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1006 # Get file bytes 1007 if isinstance(output_file, str): 1008 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1009 if not os.path.isfile(output_file): 1010 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1011 else: 1012 with open(output_file, "rb") as f: 1013 protected_file_bytes = f.read() 1014 else: 1015 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1016 protected_file_bytes = utils.buffer_to_bytes( 1017 register_output.buffer, 1018 register_output.buffer_length 1019 ) 1020 1021 # Ensure memory allocated is not garbage collected 1022 content_management_policy, register_input, register_output, register_analysis 1023 1024 return protected_file_bytes, analysis_report_bytes
Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the protected file will be written. output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes).
1026 def protect_and_analyse_directory( 1027 self, 1028 input_directory: str, 1029 output_directory: Optional[str], 1030 analysis_directory: Optional[str], 1031 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1032 raise_unsupported: bool = True, 1033 ): 1034 """ Recursively processes all files in a directory using protect and analyse mode with the given content management policy. 1035 Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory. 1036 1037 Args: 1038 input_directory (str): The input directory containing files to process. 1039 output_directory (Optional[str]): The output directory for protected files. 1040 analysis_directory (Optional[str]): The output directory for XML analysis reports. 1041 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 1042 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1043 1044 Returns: 1045 result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes). 1046 """ 1047 result_dict = {} 1048 for input_file in utils.list_file_paths(input_directory): 1049 relative_path = os.path.relpath(input_file, input_directory) 1050 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1051 output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml") 1052 1053 protected_file_bytes, analysis_report_bytes = self.protect_and_analyse_file( 1054 input_file=input_file, 1055 output_file=output_file, 1056 output_analysis_report=output_analysis_report, 1057 content_management_policy=content_management_policy, 1058 raise_unsupported=raise_unsupported, 1059 ) 1060 1061 result_dict[relative_path] = (protected_file_bytes, analysis_report_bytes) 1062 1063 return result_dict
Recursively processes all files in a directory using protect and analyse mode with the given content management policy. Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
Args: input_directory (str): The input directory containing files to process. output_directory (Optional[str]): The output directory for protected files. analysis_directory (Optional[str]): The output directory for XML analysis reports. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes).
1125 def register_export(self, session: int, output_file: Optional[str] = None): 1126 """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call. 1127 1128 Args: 1129 session (int): The session integer. 1130 output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory. 1131 1132 Returns: 1133 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 1134 """ 1135 if not isinstance(output_file, (type(None), str)): 1136 raise TypeError(output_file) 1137 1138 if isinstance(output_file, str): 1139 output_file = os.path.abspath(output_file) 1140 1141 result = self._GW2RegisterExportFile(session, output_file) 1142 1143 elif isinstance(output_file, type(None)): 1144 result = self._GW2RegisterExportMemory(session) 1145 1146 if result.status not in successes.success_codes: 1147 log.error(format_object(result)) 1148 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1149 else: 1150 log.debug(format_object(result)) 1151 1152 return result
Registers a file to be exported for the given session. The export file will be created during the session's run_session call.
Args: session (int): The session integer. output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory.
Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1154 def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1155 """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided. 1156 1157 Args: 1158 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 1159 output_file (Optional[str]): The output file path where the .zip file will be written. 1160 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1161 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1162 1163 Returns: 1164 file_bytes (bytes): The exported .zip file. 1165 """ 1166 # Validate arg types 1167 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1168 raise TypeError(input_file) 1169 if not isinstance(output_file, (type(None), str)): 1170 raise TypeError(output_file) 1171 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1172 raise TypeError(content_management_policy) 1173 if not isinstance(raise_unsupported, bool): 1174 raise TypeError(raise_unsupported) 1175 1176 # Convert string path arguments to absolute paths 1177 if isinstance(input_file, str): 1178 if not os.path.isfile(input_file): 1179 raise FileNotFoundError(input_file) 1180 input_file = os.path.abspath(input_file) 1181 if isinstance(output_file, str): 1182 output_file = os.path.abspath(output_file) 1183 # make directories that do not exist 1184 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1185 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1186 content_management_policy = os.path.abspath(content_management_policy) 1187 1188 # Convert memory inputs to bytes 1189 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1190 input_file = utils.as_bytes(input_file) 1191 1192 with utils.CwdHandler(self.library_path): 1193 with self.new_session() as session: 1194 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1195 register_input = self.register_input(session, input_file) 1196 register_export = self.register_export(session, output_file) 1197 status = self.run_session(session) 1198 1199 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1200 if status not in successes.success_codes: 1201 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1202 if raise_unsupported: 1203 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1204 else: 1205 file_bytes = None 1206 else: 1207 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1208 # Get file bytes 1209 if isinstance(output_file, str): 1210 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1211 if not os.path.isfile(output_file): 1212 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1213 file_bytes = None 1214 else: 1215 with open(output_file, "rb") as f: 1216 file_bytes = f.read() 1217 else: 1218 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1219 file_bytes = utils.buffer_to_bytes( 1220 register_export.buffer, 1221 register_export.buffer_length 1222 ) 1223 1224 # Ensure memory allocated is not garbage collected 1225 content_management_policy, register_input, register_export 1226 1227 return file_bytes
Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the .zip file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The exported .zip file.
1229 def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1230 """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory. 1231 1232 Args: 1233 input_directory (str): The input directory containing files to export. 1234 output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files. 1235 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 1236 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1237 1238 Returns: 1239 export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 1240 """ 1241 export_files_dict = {} 1242 # Call export_file on each file in input_directory to output_directory 1243 for input_file in utils.list_file_paths(input_directory): 1244 relative_path = os.path.relpath(input_file, input_directory) + ".zip" 1245 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1246 1247 export_bytes = self.export_file( 1248 input_file=input_file, 1249 output_file=output_file, 1250 raise_unsupported=raise_unsupported, 1251 content_management_policy=content_management_policy, 1252 ) 1253 1254 export_files_dict[relative_path] = export_bytes 1255 1256 return export_files_dict
Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to export. output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1258 def export_and_analyse_file( 1259 self, 1260 input_file: Union[str, bytes, bytearray, io.BytesIO], 1261 output_file: Optional[str] = None, 1262 output_analysis_report: Optional[str] = None, 1263 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1264 raise_unsupported: bool = True, 1265 ): 1266 """ Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes. 1267 1268 Args: 1269 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. 1270 output_file (Optional[str]): The output file path where the .zip export will be written. 1271 output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. 1272 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1273 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1274 1275 Returns: 1276 Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes). 1277 """ 1278 # Validate arg types 1279 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1280 raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}") 1281 if not isinstance(output_file, (type(None), str)): 1282 raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}") 1283 if not isinstance(output_analysis_report, (type(None), str)): 1284 raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}") 1285 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1286 raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}") 1287 if not isinstance(raise_unsupported, bool): 1288 raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}") 1289 1290 # Convert string path arguments to absolute paths 1291 if isinstance(input_file, str): 1292 if not os.path.isfile(input_file): 1293 raise FileNotFoundError(input_file) 1294 input_file = os.path.abspath(input_file) 1295 if isinstance(output_file, str): 1296 output_file = os.path.abspath(output_file) 1297 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1298 if isinstance(output_analysis_report, str): 1299 output_analysis_report = os.path.abspath(output_analysis_report) 1300 os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True) 1301 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1302 content_management_policy = os.path.abspath(content_management_policy) 1303 1304 # Convert memory inputs to bytes 1305 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1306 input_file = utils.as_bytes(input_file) 1307 1308 with utils.CwdHandler(self.library_path): 1309 with self.new_session() as session: 1310 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1311 register_input = self.register_input(session, input_file) 1312 register_export = self.register_export(session, output_file) 1313 register_analysis = self.register_analysis(session, output_analysis_report) 1314 1315 status = self.run_session(session) 1316 1317 export_file_bytes = None 1318 analysis_report_bytes = None 1319 1320 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1321 if status not in successes.success_codes: 1322 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1323 if raise_unsupported: 1324 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1325 1326 # Get analysis report file bytes, even on processing failure 1327 if isinstance(output_analysis_report, str): 1328 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1329 if not os.path.isfile(output_analysis_report): 1330 log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}") 1331 else: 1332 with open(output_analysis_report, "rb") as f: 1333 analysis_report_bytes = f.read() 1334 else: 1335 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1336 if register_analysis.buffer and register_analysis.buffer_length: 1337 analysis_report_bytes = utils.buffer_to_bytes( 1338 register_analysis.buffer, 1339 register_analysis.buffer_length 1340 ) 1341 1342 # On success, get export file bytes 1343 if status in successes.success_codes: 1344 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}") 1345 # Get export file bytes 1346 if isinstance(output_file, str): 1347 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1348 if not os.path.isfile(output_file): 1349 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1350 else: 1351 with open(output_file, "rb") as f: 1352 export_file_bytes = f.read() 1353 else: 1354 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1355 export_file_bytes = utils.buffer_to_bytes( 1356 register_export.buffer, 1357 register_export.buffer_length 1358 ) 1359 1360 # Ensure memory allocated is not garbage collected 1361 content_management_policy, register_input, register_export, register_analysis 1362 1363 return export_file_bytes, analysis_report_bytes
Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the .zip export will be written. output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes).
1365 def export_and_analyse_directory( 1366 self, 1367 input_directory: str, 1368 output_directory: Optional[str], 1369 analysis_directory: Optional[str], 1370 content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, 1371 raise_unsupported: bool = True, 1372 ): 1373 """ Recursively processes all files in a directory using export and analyse mode with the given content management policy. 1374 Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory. 1375 1376 Args: 1377 input_directory (str): The input directory containing files to process. 1378 output_directory (Optional[str]): The output directory for exported .zip files. 1379 analysis_directory (Optional[str]): The output directory for XML analysis reports. 1380 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. 1381 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1382 1383 Returns: 1384 result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes). 1385 """ 1386 result_dict = {} 1387 1388 for input_file in utils.list_file_paths(input_directory): 1389 relative_path = os.path.relpath(input_file, input_directory) 1390 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path + ".zip") 1391 output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml") 1392 1393 export_file_bytes, analysis_report_bytes = self.export_and_analyse_file( 1394 input_file=input_file, 1395 output_file=output_file, 1396 output_analysis_report=output_analysis_report, 1397 content_management_policy=content_management_policy, 1398 raise_unsupported=raise_unsupported, 1399 ) 1400 1401 result_dict[relative_path] = (export_file_bytes, analysis_report_bytes) 1402 1403 return result_dict
Recursively processes all files in a directory using export and analyse mode with the given content management policy. Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
Args: input_directory (str): The input directory containing files to process. output_directory (Optional[str]): The output directory for exported .zip files. analysis_directory (Optional[str]): The output directory for XML analysis reports. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes).
1466 def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 1467 """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call. 1468 1469 Args: 1470 session (int): The session integer. 1471 input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes. 1472 1473 Returns: 1474 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. 1475 """ 1476 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)): 1477 raise TypeError(input_file) 1478 1479 if isinstance(input_file, str): 1480 if not os.path.isfile(input_file): 1481 raise FileNotFoundError(input_file) 1482 1483 input_file = os.path.abspath(input_file) 1484 1485 result = self._GW2RegisterImportFile(session, input_file) 1486 1487 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 1488 # Convert bytearray and io.BytesIO to bytes 1489 input_file = utils.as_bytes(input_file) 1490 1491 result = self._GW2RegisterImportMemory(session, input_file) 1492 1493 if result.status not in successes.success_codes: 1494 log.error(format_object(result)) 1495 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1496 else: 1497 log.debug(format_object(result)) 1498 1499 return result
Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.
Args: session (int): The session integer. input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes.
Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1501 def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1502 """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided. 1503 1504 Args: 1505 input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. 1506 output_file (Optional[str]): The output file path where the constructed file will be written. 1507 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. 1508 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1509 1510 Returns: 1511 file_bytes (bytes): The imported file bytes. 1512 """ 1513 # Validate arg types 1514 if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): 1515 raise TypeError(input_file) 1516 if not isinstance(output_file, (type(None), str)): 1517 raise TypeError(output_file) 1518 if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): 1519 raise TypeError(content_management_policy) 1520 if not isinstance(raise_unsupported, bool): 1521 raise TypeError(raise_unsupported) 1522 1523 # Convert string path arguments to absolute paths 1524 if isinstance(input_file, str): 1525 if not os.path.isfile(input_file): 1526 raise FileNotFoundError(input_file) 1527 input_file = os.path.abspath(input_file) 1528 if isinstance(output_file, str): 1529 output_file = os.path.abspath(output_file) 1530 # make directories that do not exist 1531 os.makedirs(os.path.dirname(output_file), exist_ok=True) 1532 if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): 1533 content_management_policy = os.path.abspath(content_management_policy) 1534 1535 # Convert memory inputs to bytes 1536 if isinstance(input_file, (bytes, bytearray, io.BytesIO)): 1537 input_file = utils.as_bytes(input_file) 1538 1539 with utils.CwdHandler(self.library_path): 1540 with self.new_session() as session: 1541 content_management_policy = self.set_content_management_policy(session, content_management_policy) 1542 register_import = self.register_import(session, input_file) 1543 register_output = self.register_output(session, output_file) 1544 status = self.run_session(session) 1545 1546 input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file 1547 if status not in successes.success_codes: 1548 log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1549 if raise_unsupported: 1550 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1551 else: 1552 file_bytes = None 1553 else: 1554 log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}") 1555 # Get file bytes 1556 if isinstance(output_file, str): 1557 # File to file and memory to file, Editor wrote to a file, read it to get the file bytes 1558 if not os.path.isfile(output_file): 1559 log.error(f"Editor returned success code: {status} but no output file was found: {output_file}") 1560 file_bytes = None 1561 else: 1562 with open(output_file, "rb") as f: 1563 file_bytes = f.read() 1564 else: 1565 # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes 1566 file_bytes = utils.buffer_to_bytes( 1567 register_output.buffer, 1568 register_output.buffer_length 1569 ) 1570 1571 # Ensure memory allocated is not garbage collected 1572 content_management_policy, register_import, register_output 1573 1574 return file_bytes
Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. output_file (Optional[str]): The output file path where the constructed file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: file_bytes (bytes): The imported file bytes.
1576 def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): 1577 """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. 1578 The constructed files are written to output_directory maintaining the same directory structure as input_directory. 1579 1580 Args: 1581 input_directory (str): The input directory containing files to import. 1582 output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files. 1583 content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. 1584 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1585 1586 Returns: 1587 import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 1588 """ 1589 import_files_dict = {} 1590 # Call import_file on each file in input_directory to output_directory 1591 for input_file in utils.list_file_paths(input_directory): 1592 relative_path = os.path.relpath(input_file, input_directory) 1593 # Remove .zip extension from relative_path 1594 relative_path = os.path.splitext(relative_path)[0] 1595 output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) 1596 1597 import_bytes = self.import_file( 1598 input_file=input_file, 1599 output_file=output_file, 1600 raise_unsupported=raise_unsupported, 1601 content_management_policy=content_management_policy, 1602 ) 1603 1604 import_files_dict[relative_path] = import_bytes 1605 1606 return import_files_dict
Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. The constructed files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to import. output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1647 @functools.lru_cache() 1648 def file_error_message(self, session: int) -> str: 1649 """ Retrieve the Glasswall Session Process error message. 1650 1651 Args: 1652 session (int): The session integer. 1653 1654 Returns: 1655 error_message (str): The Glasswall Session Process error message. 1656 """ 1657 # Validate arg types 1658 if not isinstance(session, int): 1659 raise TypeError(session) 1660 1661 result = self._GW2FileErrorMsg(session) 1662 1663 if result.status not in successes.success_codes: 1664 log.error(format_object(result)) 1665 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1666 else: 1667 log.debug(format_object(result)) 1668 1669 return result.error_message
Retrieve the Glasswall Session Process error message.
Args: session (int): The session integer.
Returns: error_message (str): The Glasswall Session Process error message.
1671 def GW2GetFileType(self, session: int, file_type_id): 1672 """ Retrieve the file type as a string. 1673 1674 Args: 1675 session (int): The session integer. 1676 file_type_id (int): The file type id. 1677 1678 Returns: 1679 file_type (str): The formal file name for the corresponding file id. 1680 """ 1681 # Validate arg types 1682 if not isinstance(session, int): 1683 raise TypeError(session) 1684 1685 # API function declaration 1686 self.library.GW2GetFileType.argtypes = [ 1687 ct.c_size_t, 1688 ct.c_size_t, 1689 ct.POINTER(ct.c_size_t), 1690 ct.POINTER(ct.c_void_p) 1691 ] 1692 1693 # Variable initialisation 1694 ct_session = ct.c_size_t(session) 1695 ct_file_type = ct.c_size_t(file_type_id) 1696 ct_buffer_length = ct.c_size_t() 1697 ct_buffer = ct.c_void_p() 1698 1699 # API call 1700 status = self.library.GW2GetFileType( 1701 ct_session, 1702 ct_file_type, 1703 ct.byref(ct_buffer_length), 1704 ct.byref(ct_buffer) 1705 ) 1706 1707 if status not in successes.success_codes: 1708 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 1709 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1710 else: 1711 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 1712 1713 # Editor wrote to a buffer, convert it to bytes 1714 file_type_bytes = utils.buffer_to_bytes( 1715 ct_buffer, 1716 ct_buffer_length 1717 ) 1718 1719 file_type = file_type_bytes.decode() 1720 1721 return file_type
Retrieve the file type as a string.
Args: session (int): The session integer. file_type_id (int): The file type id.
Returns: file_type (str): The formal file name for the corresponding file id.
1723 def GW2GetFileTypeID(self, session: int, file_type_str): 1724 """ Retrieve the Glasswall file type id given a file type string. 1725 1726 Args: 1727 session (int): The session integer. 1728 file_type_str (str): The file type as a string. 1729 1730 Returns: 1731 file_type_id (str): The Glasswall file type id for the specified file type. 1732 """ 1733 # Validate arg types 1734 if not isinstance(session, int): 1735 raise TypeError(session) 1736 1737 # API function declaration 1738 self.library.GW2GetFileTypeID.argtypes = [ 1739 ct.c_size_t, 1740 ct.c_char_p, 1741 ct.POINTER(ct.c_size_t), 1742 ct.POINTER(ct.c_void_p) 1743 ] 1744 1745 # Variable initialisation 1746 ct_session = ct.c_size_t(session) 1747 ct_file_type = ct.c_char_p(file_type_str.encode('utf-8')) 1748 ct_buffer_length = ct.c_size_t() 1749 ct_buffer = ct.c_void_p() 1750 1751 # API call 1752 status = self.library.GW2GetFileTypeID( 1753 ct_session, 1754 ct_file_type, 1755 ct.byref(ct_buffer_length), 1756 ct.byref(ct_buffer) 1757 ) 1758 1759 if status not in successes.success_codes: 1760 log.error(f"\n\tsession: {session}\n\tstatus: {status}") 1761 raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) 1762 else: 1763 log.debug(f"\n\tsession: {session}\n\tstatus: {status}") 1764 1765 # Editor wrote to a buffer, convert it to bytes 1766 file_type_bytes = utils.buffer_to_bytes( 1767 ct_buffer, 1768 ct_buffer_length 1769 ) 1770 1771 file_type_id = file_type_bytes.decode() 1772 1773 return file_type_id
Retrieve the Glasswall file type id given a file type string.
Args: session (int): The session integer. file_type_str (str): The file type as a string.
Returns: file_type_id (str): The Glasswall file type id for the specified file type.
1775 def get_file_type_info(self, file_type: Union[str, int]): 1776 """ Retrieve information about a file type based on its identifier. 1777 1778 Args: 1779 file_type (Union[str, int]): The file type identifier. This can be either a string representing a file 1780 extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29). 1781 1782 Returns: 1783 - file_type_info (Union[int, str]): Depending on the input 'file_type': 1784 - If `file_type` is a string (e.g. 'bmp'): 1785 - If the file type is recognised, returns an integer corresponding to that file type. 1786 - If the file type is not recognised, returns 0. 1787 - If `file_type` is an integer (e.g. 29): 1788 - If the integer corresponds to a recognised file type, returns a more detailed string description 1789 of the file type (e.g. 'BMP Image'). 1790 - If the integer does not match any recognised file type, returns an empty string. 1791 """ 1792 # Validate arg types 1793 if not isinstance(file_type, (str, int)): 1794 raise TypeError(file_type) 1795 1796 with utils.CwdHandler(self.library_path): 1797 with self.new_session() as session: 1798 1799 if isinstance(file_type, int): 1800 file_type_info = self.GW2GetFileType(session, file_type) 1801 if isinstance(file_type, str): 1802 file_type_info = self.GW2GetFileTypeID(session, file_type) 1803 1804 return file_type_info
Retrieve information about a file type based on its identifier.
Args: file_type (Union[str, int]): The file type identifier. This can be either a string representing a file extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29).
Returns:
- file_type_info (Union[int, str]): Depending on the input 'file_type':
- If file_type
is a string (e.g. 'bmp'):
- If the file type is recognised, returns an integer corresponding to that file type.
- If the file type is not recognised, returns 0.
- If file_type
is an integer (e.g. 29):
- If the integer corresponds to a recognised file type, returns a more detailed string description
of the file type (e.g. 'BMP Image').
- If the integer does not match any recognised file type, returns an empty string.
1806 @utils.deprecated_function(replacement_function=get_file_type_info) 1807 def get_file_info(self, *args, **kwargs): 1808 """ Deprecated in 1.0.6. Use get_file_type_info. """ 1809 pass
Deprecated in 1.0.6. Use get_file_type_info.
1840 def register_report_file(self, session: int, output_file: str): 1841 """ Register the report file path for the given session. 1842 1843 Args: 1844 session (int): The session integer. 1845 output_file (str): The file path of the report file. 1846 1847 Returns: 1848 gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'. 1849 """ 1850 # Validate arg types 1851 if not isinstance(session, int): 1852 raise TypeError(session) 1853 if not isinstance(output_file, (type(None), str)): 1854 raise TypeError(output_file) 1855 1856 result = self._GW2RegisterReportFile(session, output_file) 1857 1858 if result.status not in successes.success_codes: 1859 log.error(format_object(result)) 1860 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1861 else: 1862 log.debug(format_object(result)) 1863 1864 return result
Register the report file path for the given session.
Args: session (int): The session integer. output_file (str): The file path of the report file.
Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1910 def get_id_info(self, issue_id: int, raise_unsupported: bool = True): 1911 """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances" 1912 1913 Args: 1914 issue_id (int): The issue id. 1915 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1916 1917 Returns: 1918 id_info (str): The group description for the given Issue ID. 1919 """ 1920 # Validate arg types 1921 if not isinstance(issue_id, int): 1922 raise TypeError(issue_id) 1923 1924 with utils.CwdHandler(self.library_path): 1925 with self.new_session() as session: 1926 result = self._GW2GetIdInfo(session, issue_id) 1927 1928 if result.status not in successes.success_codes: 1929 log.error(format_object(result)) 1930 if raise_unsupported: 1931 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1932 else: 1933 log.debug(format_object(result)) 1934 1935 return result.id_info
Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances"
Args: issue_id (int): The issue id. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: id_info (str): The group description for the given Issue ID.
1975 def get_all_id_info(self, output_file: Optional[str] = None, raise_unsupported: bool = True) -> str: 1976 """ Retrieves the XML containing all the Issue ID ranges with their group descriptions 1977 1978 Args: 1979 output_file (Optional[str]): The output file path where the analysis file will be written. 1980 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 1981 1982 Returns: 1983 all_id_info (str): A string XML analysis report containing all id info. 1984 """ 1985 # Validate arg types 1986 if not isinstance(output_file, (type(None), str)): 1987 raise TypeError(output_file) 1988 if isinstance(output_file, str): 1989 output_file = os.path.abspath(output_file) 1990 1991 with utils.CwdHandler(self.library_path): 1992 with self.new_session() as session: 1993 result = self._GW2GetAllIdInfo(session) 1994 1995 if result.status not in successes.success_codes: 1996 log.error(format_object(result)) 1997 if raise_unsupported: 1998 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 1999 else: 2000 log.debug(format_object(result)) 2001 2002 if isinstance(output_file, str): 2003 # GW2GetAllIdInfo is memory only, write to file 2004 # make directories that do not exist 2005 os.makedirs(os.path.dirname(output_file), exist_ok=True) 2006 with open(output_file, "w") as f: 2007 f.write(result.all_id_info) 2008 2009 return result.all_id_info
Retrieves the XML containing all the Issue ID ranges with their group descriptions
Args: output_file (Optional[str]): The output file path where the analysis file will be written. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: all_id_info (str): A string XML analysis report containing all id info.
2056 def file_session_status_message(self, session: int, raise_unsupported: bool = True) -> str: 2057 """ Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out. 2058 2059 Args: 2060 session (int): The session integer. 2061 raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 2062 2063 Returns: 2064 result.message (str):The file session status message. 2065 """ 2066 # Validate arg types 2067 if not isinstance(session, int): 2068 raise TypeError(session) 2069 2070 result = self._GW2FileSessionStatus(session) 2071 2072 if result.status not in successes.success_codes: 2073 log.error(format_object(result)) 2074 if raise_unsupported: 2075 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 2076 else: 2077 log.debug(format_object(result)) 2078 2079 return result.message
Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out.
Args: session (int): The session integer. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: result.message (str):The file session status message.
2105 def licence_details(self): 2106 """ Returns a string containing details of the licence. 2107 2108 Returns: 2109 result (str): A string containing details of the licence. 2110 """ 2111 with self.new_session() as session: 2112 result = self._GW2LicenceDetails(session) 2113 2114 log.debug(f"\n\tsession: {session}\n\tGW2LicenceDetails: {result}") 2115 2116 return result
Returns a string containing details of the licence.
Returns: result (str): A string containing details of the licence.
2239 def register_licence(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]): 2240 """ Registers a "gwkey.lic" licence from file path or memory. 2241 2242 Args: 2243 session (int): The session integer. 2244 input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. 2245 2246 Returns: 2247 - result (glasswall.GwReturnObj): Depending on the input 'input_file': 2248 - If input_file is a str file path: 2249 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'. 2250 2251 - If input_file is a file in memory: 2252 - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'. 2253 """ 2254 if isinstance(input_file, str): 2255 if not os.path.isfile(input_file): 2256 raise FileNotFoundError(input_file) 2257 2258 input_file = os.path.abspath(input_file) 2259 2260 result = self._GW2RegisterLicenceFile(session, input_file) 2261 2262 elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)): 2263 # Convert bytearray and io.BytesIO to bytes 2264 input_file = utils.as_bytes(input_file) 2265 2266 result = self._GW2RegisterLicenceMemory(session, input_file) 2267 2268 else: 2269 raise TypeError(input_file) 2270 2271 if result.status not in successes.success_codes: 2272 log.error(format_object(result)) 2273 raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status) 2274 else: 2275 log.debug(format_object(result)) 2276 2277 return result
Registers a "gwkey.lic" licence from file path or memory.
Args: session (int): The session integer. input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.
Returns: - result (glasswall.GwReturnObj): Depending on the input 'input_file': - If input_file is a str file path: - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
- If input_file is a file in memory:
- gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.