glasswall.libraries.editor.editor

   1import ctypes as ct
   2import functools
   3import io
   4import os
   5from contextlib import contextmanager
   6from typing import Union, Optional
   7
   8import glasswall
   9from glasswall import determine_file_type as dft
  10from glasswall import utils
  11from glasswall.config.logging import log, format_object
  12from glasswall.libraries.editor import errors, successes
  13from glasswall.libraries.library import Library
  14
  15
  16class Editor(Library):
  17    """ A high level Python wrapper for Glasswall Editor / Core2. """
  18
  19    def __init__(self, library_path: str, licence: Union[str, bytes, bytearray, io.BytesIO] = None):
  20        """ Initialise the Editor instance.
  21
  22        Args:
  23            library_path (str): The file or directory path to the Editor library.
  24            licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be:
  25                - A string representing the file path to the licence.
  26                - A `bytes` or `bytearray` object containing the licence data.
  27                - An `io.BytesIO` object for in-memory licence data.
  28                If not specified, it is assumed that the licence file is located in the same directory as the `library_path`.
  29        """
  30        super().__init__(library_path)
  31        self.library = self.load_library(os.path.abspath(library_path))
  32        self.licence = licence
  33
  34        # Validate killswitch has not activated
  35        self.validate_licence()
  36
  37        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
  38
  39    def validate_licence(self):
  40        """ Validates the licence of the library by checking the licence details.
  41
  42        Raises:
  43            LicenceExpired: If the licence has expired or could not be validated.
  44        """
  45        licence_details = self.licence_details()
  46
  47        bad_details = [
  48            "Unable to Read Licence Key File",
  49            "Licence File Missing Required Contents",
  50            "Licence Expired",
  51        ]
  52
  53        if any(bad_detail.lower() in licence_details.lower() for bad_detail in bad_details):
  54            # bad_details found in licence_details
  55            log.error(f"{self.__class__.__name__} licence validation failed. Licence details:\n{licence_details}")
  56            raise errors.LicenceExpired(licence_details)
  57        else:
  58            log.debug(f"{self.__class__.__name__} licence validated successfully. Licence details:\n{licence_details}")
  59
  60    def version(self):
  61        """ Returns the Glasswall library version.
  62
  63        Returns:
  64            version (str): The Glasswall library version.
  65        """
  66        # API function declaration
  67        self.library.GW2LibVersion.restype = ct.c_char_p
  68
  69        # API call
  70        version = self.library.GW2LibVersion()
  71
  72        # Convert to Python string
  73        version = ct.string_at(version).decode()
  74
  75        return version
  76
  77    def open_session(self):
  78        """ Open a new Glasswall session.
  79
  80        Returns:
  81            session (int): An incrementing integer repsenting the current session.
  82        """
  83        # API call
  84        session = self.library.GW2OpenSession()
  85
  86        log.debug(f"\n\tsession: {session}")
  87
  88        if self.licence:
  89            # Must register licence on each GW2OpenSession if the licence is not in the same directory as the library
  90            self.register_licence(session, self.licence)
  91
  92        return session
  93
  94    def close_session(self, session: int) -> int:
  95        """ Close the Glasswall session. All resources allocated by the session will be destroyed.
  96
  97        Args:
  98            session (int): The session to close.
  99
 100        Returns:
 101            status (int): The status code of the function call.
 102        """
 103        if not isinstance(session, int):
 104            raise TypeError(session)
 105
 106        # API function declaration
 107        self.library.GW2CloseSession.argtypes = [ct.c_size_t]
 108
 109        # Variable initialisation
 110        ct_session = ct.c_size_t(session)
 111
 112        # API call
 113        status = self.library.GW2CloseSession(ct_session)
 114
 115        if status not in successes.success_codes:
 116            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
 117        else:
 118            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
 119
 120        return status
 121
 122    @contextmanager
 123    def new_session(self):
 124        """ Context manager. Opens a new session on entry and closes the session on exit. """
 125        try:
 126            session = self.open_session()
 127            yield session
 128        finally:
 129            self.close_session(session)
 130
 131    def run_session(self, session):
 132        """ Runs the Glasswall session and begins processing of a file.
 133
 134        Args:
 135            session (int): The session to run.
 136
 137        Returns:
 138            status (int): The status code of the function call.
 139        """
 140        # API function declaration
 141        self.library.GW2RunSession.argtypes = [ct.c_size_t]
 142
 143        # Variable initialisation
 144        ct_session = ct.c_size_t(session)
 145
 146        # API call
 147        status = self.library.GW2RunSession(ct_session)
 148
 149        if status not in successes.success_codes:
 150            log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}")
 151        else:
 152            log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}")
 153
 154        return status
 155
 156    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True) -> Union[int, str]:
 157        """ Determine the file type of a given input file, either as an integer identifier or a string.
 158
 159        Args:
 160            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.
 161            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.
 162            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 163
 164        Returns:
 165            file_type (Union[int, str]): The file type.
 166        """
 167        if isinstance(input_file, str):
 168            if not os.path.isfile(input_file):
 169                raise FileNotFoundError(input_file)
 170
 171            # convert to ct.c_char_p of bytes
 172            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
 173
 174            # API call
 175            file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file)
 176
 177        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 178            # convert to bytes
 179            bytes_input_file = utils.as_bytes(input_file)
 180
 181            # ctypes conversion
 182            ct_buffer = ct.c_char_p(bytes_input_file)
 183            ct_buffer_length = ct.c_size_t(len(bytes_input_file))
 184
 185            # API call
 186            file_type = self.library.GW2DetermineFileTypeFromMemory(
 187                ct_buffer,
 188                ct_buffer_length
 189            )
 190
 191        else:
 192            raise TypeError(input_file)
 193
 194        file_type_as_string = dft.file_type_int_to_str(file_type)
 195        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 196
 197        if not dft.is_success(file_type):
 198            log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
 199            if raise_unsupported:
 200                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
 201        else:
 202            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
 203
 204        if as_string:
 205            return file_type_as_string
 206
 207        return file_type
 208
 209    def _GW2GetPolicySettings(self, session: int, policy_format: int = 0):
 210        """ Get current policy settings for the given session.
 211
 212        Args:
 213            session (int): The session integer.
 214            policy_format (int): The format of the content management policy. 0=XML.
 215
 216        Returns:
 217            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status', 'policy'.
 218        """
 219        # API function declaration
 220        self.library.GW2GetPolicySettings.argtypes = [
 221            ct.c_size_t,  # Session_Handle session
 222            ct.POINTER(ct.c_void_p),  # char ** policiesBuffer
 223            ct.POINTER(ct.c_size_t),  # size_t * policiesLength
 224            ct.c_int,  # Policy_Format format
 225        ]
 226
 227        # Variable initialisation
 228        gw_return_object = glasswall.GwReturnObj()
 229        gw_return_object.session = ct.c_size_t(session)
 230        gw_return_object.buffer = ct.c_void_p()
 231        gw_return_object.buffer_length = ct.c_size_t()
 232        gw_return_object.policy_format = ct.c_int(policy_format)
 233
 234        # API Call
 235        gw_return_object.status = self.library.GW2GetPolicySettings(
 236            gw_return_object.session,
 237            ct.byref(gw_return_object.buffer),
 238            ct.byref(gw_return_object.buffer_length),
 239            gw_return_object.policy_format
 240        )
 241
 242        # Editor wrote to a buffer, convert it to bytes
 243        policy_bytes = utils.buffer_to_bytes(
 244            gw_return_object.buffer,
 245            gw_return_object.buffer_length
 246        )
 247
 248        gw_return_object.policy = policy_bytes.decode()
 249
 250        return gw_return_object
 251
 252    def get_content_management_policy(self, session: int):
 253        """ Returns the content management configuration for a given session.
 254
 255        Args:
 256            session (int): The session integer.
 257
 258        Returns:
 259            xml_string (str): The XML string of the current content management configuration.
 260        """
 261        # NOTE GW2GetPolicySettings is current not implemented in editor
 262
 263        # set xml_string as loaded default config
 264        xml_string = glasswall.content_management.policies.Editor(default="sanitise").text,
 265
 266        # log.debug(f"xml_string:\n{xml_string}")
 267
 268        return xml_string
 269
 270        # # API function declaration
 271        # self.library.GW2GetPolicySettings.argtypes = [
 272        #     ct.c_size_t,
 273        #     ct.c_void_p,
 274        # ]
 275
 276        # # Variable initialisation
 277        # ct_session = ct.c_size_t(session)
 278        # ct_buffer = ct.c_void_p()
 279        # ct_buffer_length = ct.c_size_t()
 280        # # ct_file_format = ct.c_int(file_format)
 281
 282        # # API Call
 283        # status = self.library.GW2GetPolicySettings(
 284        #     ct_session,
 285        #     ct.byref(ct_buffer),
 286        #     ct.byref(ct_buffer_length)
 287        # )
 288
 289        # print("GW2GetPolicySettings status:", status)
 290
 291        # file_bytes = utils.buffer_to_bytes(
 292        #     ct_buffer,
 293        #     ct_buffer_length,
 294        # )
 295
 296        # return file_bytes
 297
 298    def _GW2RegisterPoliciesFile(self, session: int, input_file: str, policy_format: int = 0):
 299        """ Registers the policies to be used by Glasswall when processing files.
 300
 301        Args:
 302            session (int): The session integer.
 303            input_file (str): The content management policy input file path.
 304            policy_format (int): The format of the content management policy. 0=XML.
 305
 306        Returns:
 307            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'.
 308        """
 309        # API function declaration
 310        self.library.GW2RegisterPoliciesFile.argtypes = [
 311            ct.c_size_t,  # Session_Handle session
 312            ct.c_char_p,  # const char *filename
 313            ct.c_int,  # Policy_Format format
 314        ]
 315
 316        # Variable initialisation
 317        gw_return_object = glasswall.GwReturnObj()
 318        gw_return_object.session = ct.c_size_t(session)
 319        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
 320        gw_return_object.policy_format = ct.c_int(policy_format)
 321
 322        gw_return_object.status = self.library.GW2RegisterPoliciesFile(
 323            gw_return_object.session,
 324            gw_return_object.input_file,
 325            gw_return_object.policy_format
 326        )
 327
 328        return gw_return_object
 329
 330    def _GW2RegisterPoliciesMemory(self, session: int, input_file: bytes, policy_format: int = 0):
 331        """ Registers the policies in memory to be used by Glasswall when processing files.
 332
 333        Args:
 334            session (int): The session integer.
 335            input_file (str): The content management policy input file bytes.
 336            policy_format (int): The format of the content management policy. 0=XML.
 337
 338        Returns:
 339            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'.
 340        """
 341        # API function declaration
 342        self.library.GW2RegisterPoliciesMemory.argtype = [
 343            ct.c_size_t,  # Session_Handle session
 344            ct.c_char_p,  # const char *policies
 345            ct.c_size_t,  # size_t policiesLength
 346            ct.c_int  # Policy_Format format
 347        ]
 348
 349        # Variable initialisation
 350        gw_return_object = glasswall.GwReturnObj()
 351        gw_return_object.session = ct.c_size_t(session)
 352        gw_return_object.buffer = ct.c_char_p(input_file)
 353        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
 354        gw_return_object.policy_format = ct.c_int(policy_format)
 355
 356        # API Call
 357        gw_return_object.status = self.library.GW2RegisterPoliciesMemory(
 358            gw_return_object.session,
 359            gw_return_object.buffer,
 360            gw_return_object.buffer_length,
 361            gw_return_object.policy_format
 362        )
 363
 364        return gw_return_object
 365
 366    def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, policy_format=0):
 367        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
 368
 369        Args:
 370            session (int): The session integer.
 371            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 372            policy_format (int): The format of the content management policy. 0=XML.
 373
 374        Returns:
 375            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
 376                - If input_file is a str file path:
 377                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'.
 378
 379                - If input_file is a file in memory:
 380                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'.
 381        """
 382        # Validate type
 383        if not isinstance(session, int):
 384            raise TypeError(session)
 385        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 386            raise TypeError(input_file)
 387        if not isinstance(policy_format, int):
 388            raise TypeError(policy_format)
 389
 390        # Set input_file to default if input_file is None
 391        if input_file is None:
 392            input_file = glasswall.content_management.policies.Editor(default="sanitise")
 393
 394        # Validate xml content is parsable
 395        utils.validate_xml(input_file)
 396
 397        # From file
 398        if isinstance(input_file, str) and os.path.isfile(input_file):
 399            input_file = os.path.abspath(input_file)
 400
 401            result = self._GW2RegisterPoliciesFile(session, input_file)
 402
 403        # From memory
 404        elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 405            # Convert bytearray, io.BytesIO to bytes
 406            if isinstance(input_file, (bytearray, io.BytesIO)):
 407                input_file = utils.as_bytes(input_file)
 408            # Convert string xml or Policy to bytes
 409            if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)):
 410                input_file = input_file.encode("utf-8")
 411
 412            result = self._GW2RegisterPoliciesMemory(session, input_file)
 413
 414        if result.status not in successes.success_codes:
 415            log.error(format_object(result))
 416            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 417        else:
 418            log.debug(format_object(result))
 419
 420        return result
 421
 422    def _GW2RegisterInputFile(self, session: int, input_file: str):
 423        """ Register an input file for the given session.
 424
 425        Args:
 426            session (int): The session integer.
 427            input_file (str): The input file path.
 428
 429        Returns:
 430            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
 431        """
 432        # API function declaration
 433        self.library.GW2RegisterInputFile.argtypes = [
 434            ct.c_size_t,  # Session_Handle session
 435            ct.c_char_p  # const char * inputFilePath
 436        ]
 437
 438        # Variable initialisation
 439        gw_return_object = glasswall.GwReturnObj()
 440        gw_return_object.session = ct.c_size_t(session)
 441        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
 442        gw_return_object.input_file_path = input_file
 443
 444        # API call
 445        gw_return_object.status = self.library.GW2RegisterInputFile(
 446            gw_return_object.session,
 447            gw_return_object.input_file
 448        )
 449
 450        return gw_return_object
 451
 452    def _GW2RegisterInputMemory(self, session: int, input_file: bytes):
 453        """ Register an input file in memory for the given session.
 454
 455        Args:
 456            session (int): The session integer.
 457            input_file (bytes): The input file in memory.
 458
 459        Returns:
 460            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
 461        """
 462        # API function declaration
 463        self.library.GW2RegisterInputMemory.argtypes = [
 464            ct.c_size_t,  # Session_Handle session
 465            ct.c_char_p,  # const char * inputFileBuffer
 466            ct.c_size_t,  # size_t inputLength
 467        ]
 468
 469        # Variable initialisation
 470        gw_return_object = glasswall.GwReturnObj()
 471        gw_return_object.session = ct.c_size_t(session)
 472        gw_return_object.buffer = ct.c_char_p(input_file)
 473        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
 474
 475        # API call
 476        gw_return_object.status = self.library.GW2RegisterInputMemory(
 477            gw_return_object.session,
 478            gw_return_object.buffer,
 479            gw_return_object.buffer_length
 480        )
 481
 482        return gw_return_object
 483
 484    def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
 485        """ Register an input file or bytes for the given session.
 486
 487        Args:
 488            session (int): The session integer.
 489            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 490
 491        Returns:
 492            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
 493                - If input_file is a str file path:
 494                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
 495
 496                - If input_file is a file in memory:
 497                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
 498        """
 499        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
 500            raise TypeError(input_file)
 501
 502        if isinstance(input_file, str):
 503            if not os.path.isfile(input_file):
 504                raise FileNotFoundError(input_file)
 505
 506            input_file = os.path.abspath(input_file)
 507
 508            result = self._GW2RegisterInputFile(session, input_file)
 509
 510        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
 511            # Convert bytearray and io.BytesIO to bytes
 512            input_file = utils.as_bytes(input_file)
 513
 514            result = self._GW2RegisterInputMemory(session, input_file)
 515
 516        if result.status not in successes.success_codes:
 517            log.error(format_object(result))
 518            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 519        else:
 520            log.debug(format_object(result))
 521
 522        return result
 523
 524    def _GW2RegisterOutFile(self, session: int, output_file: str):
 525        """ Register an output file for the given session.
 526
 527        Args:
 528            session (int): The session integer.
 529            output_file (str): The output file path.
 530
 531        Returns:
 532            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
 533        """
 534        # API function declaration
 535        self.library.GW2RegisterOutFile.argtypes = [
 536            ct.c_size_t,  # Session_Handle session
 537            ct.c_char_p  # const char * outputFilePath
 538        ]
 539
 540        # Variable initialisation
 541        gw_return_object = glasswall.GwReturnObj()
 542        gw_return_object.session = ct.c_size_t(session)
 543        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
 544
 545        # API call
 546        gw_return_object.status = self.library.GW2RegisterOutFile(
 547            gw_return_object.session,
 548            gw_return_object.output_file
 549        )
 550
 551        return gw_return_object
 552
 553    def _GW2RegisterOutputMemory(self, session: int):
 554        """ Register an output file in memory for the given session.
 555
 556        Args:
 557            session (int): The session integer.
 558
 559        Returns:
 560            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
 561        """
 562        # API function declaration
 563        self.library.GW2RegisterOutputMemory.argtypes = [
 564            ct.c_size_t,  # Session_Handle session
 565            ct.POINTER(ct.c_void_p),  # char ** outputBuffer
 566            ct.POINTER(ct.c_size_t)  # size_t * outputLength
 567        ]
 568
 569        # Variable initialisation
 570        gw_return_object = glasswall.GwReturnObj()
 571        gw_return_object.session = ct.c_size_t(session)
 572        gw_return_object.buffer = ct.c_void_p()
 573        gw_return_object.buffer_length = ct.c_size_t()
 574
 575        # API call
 576        gw_return_object.status = self.library.GW2RegisterOutputMemory(
 577            gw_return_object.session,
 578            ct.byref(gw_return_object.buffer),
 579            ct.byref(gw_return_object.buffer_length)
 580        )
 581
 582        return gw_return_object
 583
 584    def register_output(self, session, output_file: Optional[str] = None):
 585        """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.
 586
 587        Args:
 588            session (int): The session integer.
 589            output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.
 590
 591        Returns:
 592            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
 593        """
 594        if not isinstance(output_file, (type(None), str)):
 595            raise TypeError(output_file)
 596
 597        if isinstance(output_file, str):
 598            output_file = os.path.abspath(output_file)
 599
 600            result = self._GW2RegisterOutFile(session, output_file)
 601
 602        elif isinstance(output_file, type(None)):
 603            result = self._GW2RegisterOutputMemory(session)
 604
 605        if result.status not in successes.success_codes:
 606            log.error(format_object(result))
 607            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 608        else:
 609            log.debug(format_object(result))
 610
 611        return result
 612
 613    def _GW2RegisterAnalysisFile(self, session: int, output_file: str, analysis_format: int = 0):
 614        """ Register an analysis output file for the given session.
 615
 616        Args:
 617            session (int): The session integer.
 618            output_file (str): The analysis output file path.
 619            analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended
 620
 621        Returns:
 622            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'analysis_format', 'status'.
 623        """
 624        # API function declaration
 625        self.library.GW2RegisterAnalysisFile.argtypes = [
 626            ct.c_size_t,  # Session_Handle session
 627            ct.c_char_p,  # const char * analysisFilePathName
 628            ct.c_int,  # Analysis_Format format
 629        ]
 630
 631        # Variable initialisation
 632        gw_return_object = glasswall.GwReturnObj()
 633        gw_return_object.session = ct.c_size_t(session)
 634        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
 635        gw_return_object.analysis_format = ct.c_int()
 636
 637        # API call
 638        gw_return_object.status = self.library.GW2RegisterAnalysisFile(
 639            gw_return_object.session,
 640            gw_return_object.output_file,
 641            gw_return_object.analysis_format
 642        )
 643
 644        return gw_return_object
 645
 646    def _GW2RegisterAnalysisMemory(self, session: int, analysis_format: int = 0):
 647        """ Register an analysis output file in memory for the given session.
 648
 649        Args:
 650            session (int): The session integer.
 651            analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended
 652
 653        Returns:
 654            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status'.
 655        """
 656        # API function declaration
 657        self.library.GW2RegisterAnalysisMemory.argtypes = [
 658            ct.c_size_t,  # Session_Handle session
 659            ct.POINTER(ct.c_void_p),  # char ** analysisFileBuffer
 660            ct.POINTER(ct.c_size_t),  # size_t * analysisoutputLength
 661            ct.c_int  # Analysis_Format format
 662        ]
 663
 664        # Variable initialisation
 665        gw_return_object = glasswall.GwReturnObj()
 666        gw_return_object.session = ct.c_size_t(session)
 667        gw_return_object.buffer = ct.c_void_p()
 668        gw_return_object.buffer_length = ct.c_size_t()
 669        gw_return_object.analysis_format = ct.c_int()
 670
 671        # API call
 672        gw_return_object.status = self.library.GW2RegisterAnalysisMemory(
 673            gw_return_object.session,
 674            ct.byref(gw_return_object.buffer),
 675            ct.byref(gw_return_object.buffer_length),
 676            gw_return_object.analysis_format
 677        )
 678
 679        return gw_return_object
 680
 681    def register_analysis(self, session: int, output_file: Optional[str] = None):
 682        """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.
 683
 684        Args:
 685            session (int): The session integer.
 686            output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes.
 687
 688        Returns:
 689            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included.
 690        """
 691        if not isinstance(output_file, (type(None), str)):
 692            raise TypeError(output_file)
 693
 694        if isinstance(output_file, str):
 695            output_file = os.path.abspath(output_file)
 696
 697            result = self._GW2RegisterAnalysisFile(session, output_file)
 698
 699        elif isinstance(output_file, type(None)):
 700            result = self._GW2RegisterAnalysisMemory(session)
 701
 702        if result.status not in successes.success_codes:
 703            log.error(format_object(result))
 704            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 705        else:
 706            log.debug(format_object(result))
 707
 708        return result
 709
 710    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 711        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
 712
 713        Args:
 714            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 715            output_file (Optional[str]): The output file path where the protected file will be written.
 716            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 717            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 718
 719        Returns:
 720            file_bytes (bytes): The protected file bytes.
 721        """
 722        # Validate arg types
 723        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 724            raise TypeError(input_file)
 725        if not isinstance(output_file, (type(None), str)):
 726            raise TypeError(output_file)
 727        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 728            raise TypeError(content_management_policy)
 729        if not isinstance(raise_unsupported, bool):
 730            raise TypeError(raise_unsupported)
 731
 732        # Convert string path arguments to absolute paths
 733        if isinstance(input_file, str):
 734            if not os.path.isfile(input_file):
 735                raise FileNotFoundError(input_file)
 736            input_file = os.path.abspath(input_file)
 737        if isinstance(output_file, str):
 738            output_file = os.path.abspath(output_file)
 739            # make directories that do not exist
 740            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 741        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 742            content_management_policy = os.path.abspath(content_management_policy)
 743
 744        # Convert memory inputs to bytes
 745        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 746            input_file = utils.as_bytes(input_file)
 747
 748        with utils.CwdHandler(self.library_path):
 749            with self.new_session() as session:
 750                content_management_policy = self.set_content_management_policy(session, content_management_policy)
 751                register_input = self.register_input(session, input_file)
 752                register_output = self.register_output(session, output_file=output_file)
 753                status = self.run_session(session)
 754
 755                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 756                if status not in successes.success_codes:
 757                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 758                    if raise_unsupported:
 759                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 760                    else:
 761                        file_bytes = None
 762                else:
 763                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 764                    # Get file bytes
 765                    if isinstance(output_file, str):
 766                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
 767                        if not os.path.isfile(output_file):
 768                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
 769                            file_bytes = None
 770                        else:
 771                            with open(output_file, "rb") as f:
 772                                file_bytes = f.read()
 773                    else:
 774                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
 775                        file_bytes = utils.buffer_to_bytes(
 776                            register_output.buffer,
 777                            register_output.buffer_length
 778                        )
 779
 780                # Ensure memory allocated is not garbage collected
 781                content_management_policy, register_input, register_output
 782
 783                return file_bytes
 784
 785    def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 786        """ Recursively processes all files in a directory in protect mode using the given content management policy.
 787        The protected files are written to output_directory maintaining the same directory structure as input_directory.
 788
 789        Args:
 790            input_directory (str): The input directory containing files to protect.
 791            output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files.
 792            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 793            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 794
 795        Returns:
 796            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 797        """
 798        protected_files_dict = {}
 799        # Call protect_file on each file in input_directory to output_directory
 800        for input_file in utils.list_file_paths(input_directory):
 801            relative_path = os.path.relpath(input_file, input_directory)
 802            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 803
 804            protected_bytes = self.protect_file(
 805                input_file=input_file,
 806                output_file=output_file,
 807                raise_unsupported=raise_unsupported,
 808                content_management_policy=content_management_policy,
 809            )
 810
 811            protected_files_dict[relative_path] = protected_bytes
 812
 813        return protected_files_dict
 814
 815    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 816        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
 817
 818        Args:
 819            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 820            output_file (Optional[str]): The output file path where the analysis file will be written.
 821            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 822            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 823
 824        Returns:
 825            file_bytes (bytes): The analysis file bytes.
 826        """
 827        # Validate arg types
 828        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 829            raise TypeError(input_file)
 830        if not isinstance(output_file, (type(None), str)):
 831            raise TypeError(output_file)
 832        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 833            raise TypeError(content_management_policy)
 834        if not isinstance(raise_unsupported, bool):
 835            raise TypeError(raise_unsupported)
 836
 837        # Convert string path arguments to absolute paths
 838        if isinstance(input_file, str):
 839            if not os.path.isfile(input_file):
 840                raise FileNotFoundError(input_file)
 841            input_file = os.path.abspath(input_file)
 842        if isinstance(output_file, str):
 843            output_file = os.path.abspath(output_file)
 844            # make directories that do not exist
 845            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 846        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 847            content_management_policy = os.path.abspath(content_management_policy)
 848
 849        # Convert memory inputs to bytes
 850        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 851            input_file = utils.as_bytes(input_file)
 852
 853        with utils.CwdHandler(self.library_path):
 854            with self.new_session() as session:
 855                content_management_policy = self.set_content_management_policy(session, content_management_policy)
 856                register_input = self.register_input(session, input_file)
 857                register_analysis = self.register_analysis(session, output_file)
 858                status = self.run_session(session)
 859
 860                file_bytes = None
 861                if isinstance(output_file, str):
 862                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
 863                    if os.path.isfile(output_file):
 864                        with open(output_file, "rb") as f:
 865                            file_bytes = f.read()
 866                else:
 867                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
 868                    if register_analysis.buffer and register_analysis.buffer_length:
 869                        file_bytes = utils.buffer_to_bytes(
 870                            register_analysis.buffer,
 871                            register_analysis.buffer_length
 872                        )
 873
 874                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 875                if status not in successes.success_codes:
 876                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 877                    if raise_unsupported:
 878                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 879                else:
 880                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 881
 882                # Ensure memory allocated is not garbage collected
 883                content_management_policy, register_input, register_analysis
 884
 885                return file_bytes
 886
 887    def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 888        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
 889
 890        Args:
 891            input_directory (str): The input directory containing files to analyse.
 892            output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files.
 893            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 894            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 895
 896        Returns:
 897            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 898        """
 899        analysis_files_dict = {}
 900        # Call analyse_file on each file in input_directory to output_directory
 901        for input_file in utils.list_file_paths(input_directory):
 902            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
 903            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 904
 905            analysis_bytes = self.analyse_file(
 906                input_file=input_file,
 907                output_file=output_file,
 908                raise_unsupported=raise_unsupported,
 909                content_management_policy=content_management_policy,
 910            )
 911
 912            analysis_files_dict[relative_path] = analysis_bytes
 913
 914        return analysis_files_dict
 915
 916    def protect_and_analyse_file(
 917        self,
 918        input_file: Union[str, bytes, bytearray, io.BytesIO],
 919        output_file: Optional[str] = None,
 920        output_analysis_report: Optional[str] = None,
 921        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
 922        raise_unsupported: bool = True,
 923    ):
 924        """ Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes.
 925
 926        Args:
 927            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 928            output_file (Optional[str]): The output file path where the protected file will be written.
 929            output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written.
 930            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 931            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 932
 933        Returns:
 934            Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes).
 935        """
 936        # Validate arg types
 937        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 938            raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}")
 939        if not isinstance(output_file, (type(None), str)):
 940            raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}")
 941        if not isinstance(output_analysis_report, (type(None), str)):
 942            raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}")
 943        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 944            raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}")
 945        if not isinstance(raise_unsupported, bool):
 946            raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}")
 947
 948        # Convert string path arguments to absolute paths
 949        if isinstance(input_file, str):
 950            if not os.path.isfile(input_file):
 951                raise FileNotFoundError(input_file)
 952            input_file = os.path.abspath(input_file)
 953        if isinstance(output_file, str):
 954            output_file = os.path.abspath(output_file)
 955            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 956        if isinstance(output_analysis_report, str):
 957            output_analysis_report = os.path.abspath(output_analysis_report)
 958            os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True)
 959        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 960            content_management_policy = os.path.abspath(content_management_policy)
 961
 962        # Convert memory inputs to bytes
 963        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 964            input_file = utils.as_bytes(input_file)
 965
 966        with utils.CwdHandler(self.library_path):
 967            with self.new_session() as session:
 968                content_management_policy = self.set_content_management_policy(session, content_management_policy)
 969                register_input = self.register_input(session, input_file)
 970                register_output = self.register_output(session, output_file)
 971                register_analysis = self.register_analysis(session, output_analysis_report)
 972
 973                status = self.run_session(session)
 974
 975                protected_file_bytes = None
 976                analysis_report_bytes = None
 977
 978                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 979                if status not in successes.success_codes:
 980                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
 981                    if raise_unsupported:
 982                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 983
 984                # Get analysis report file bytes, even on processing failure
 985                if isinstance(output_analysis_report, str):
 986                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
 987                    if not os.path.isfile(output_analysis_report):
 988                        log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}")
 989                    else:
 990                        with open(output_analysis_report, "rb") as f:
 991                            analysis_report_bytes = f.read()
 992                else:
 993                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
 994                    if register_analysis.buffer and register_analysis.buffer_length:
 995                        analysis_report_bytes = utils.buffer_to_bytes(
 996                            register_analysis.buffer,
 997                            register_analysis.buffer_length
 998                        )
 999
1000                # On success, get protected file bytes
1001                if status in successes.success_codes:
1002                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1003                    # Get file bytes
1004                    if isinstance(output_file, str):
1005                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1006                        if not os.path.isfile(output_file):
1007                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1008                        else:
1009                            with open(output_file, "rb") as f:
1010                                protected_file_bytes = f.read()
1011                    else:
1012                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1013                        protected_file_bytes = utils.buffer_to_bytes(
1014                            register_output.buffer,
1015                            register_output.buffer_length
1016                        )
1017
1018                # Ensure memory allocated is not garbage collected
1019                content_management_policy, register_input, register_output, register_analysis
1020
1021                return protected_file_bytes, analysis_report_bytes
1022
1023    def protect_and_analyse_directory(
1024        self,
1025        input_directory: str,
1026        output_directory: Optional[str],
1027        analysis_directory: Optional[str],
1028        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1029        raise_unsupported: bool = True,
1030    ):
1031        """ Recursively processes all files in a directory using protect and analyse mode with the given content management policy.
1032        Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
1033
1034        Args:
1035            input_directory (str): The input directory containing files to process.
1036            output_directory (Optional[str]): The output directory for protected files.
1037            analysis_directory (Optional[str]): The output directory for XML analysis reports.
1038            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
1039            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1040
1041        Returns:
1042            result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes).
1043        """
1044        result_dict = {}
1045        for input_file in utils.list_file_paths(input_directory):
1046            relative_path = os.path.relpath(input_file, input_directory)
1047            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1048            output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml")
1049
1050            protected_file_bytes, analysis_report_bytes = self.protect_and_analyse_file(
1051                input_file=input_file,
1052                output_file=output_file,
1053                output_analysis_report=output_analysis_report,
1054                content_management_policy=content_management_policy,
1055                raise_unsupported=raise_unsupported,
1056            )
1057
1058            result_dict[relative_path] = (protected_file_bytes, analysis_report_bytes)
1059
1060        return result_dict
1061
1062    def _GW2RegisterExportFile(self, session: int, output_file: str):
1063        """ Register an export output file for the given session.
1064
1065        Args:
1066            session (int): The session integer.
1067            output_file (str): The export output file path.
1068
1069        Returns:
1070            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1071        """
1072        # API function declaration
1073        self.library.GW2RegisterExportFile.argtypes = [
1074            ct.c_size_t,  # Session_Handle session
1075            ct.c_char_p  # const char * exportFilePath
1076        ]
1077
1078        # Variable initialisation
1079        gw_return_object = glasswall.GwReturnObj()
1080        gw_return_object.session = ct.c_size_t(session)
1081        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
1082
1083        # API Call
1084        gw_return_object.status = self.library.GW2RegisterExportFile(
1085            gw_return_object.session,
1086            gw_return_object.output_file
1087        )
1088
1089        return gw_return_object
1090
1091    def _GW2RegisterExportMemory(self, session: int):
1092        """ Register an export output file in memory for the given session.
1093
1094        Args:
1095            session (int): The session integer.
1096
1097        Returns:
1098            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
1099        """
1100        # API function declaration
1101        self.library.GW2RegisterExportMemory.argtypes = [
1102            ct.c_size_t,  # Session_Handle session
1103            ct.POINTER(ct.c_void_p),  # char ** exportFileBuffer
1104            ct.POINTER(ct.c_size_t)  # size_t * exportLength
1105        ]
1106
1107        # Variable initialisation
1108        gw_return_object = glasswall.GwReturnObj()
1109        gw_return_object.session = ct.c_size_t(session)
1110        gw_return_object.buffer = ct.c_void_p()
1111        gw_return_object.buffer_length = ct.c_size_t()
1112
1113        # API call
1114        gw_return_object.status = self.library.GW2RegisterExportMemory(
1115            gw_return_object.session,
1116            ct.byref(gw_return_object.buffer),
1117            ct.byref(gw_return_object.buffer_length)
1118        )
1119
1120        return gw_return_object
1121
1122    def register_export(self, session: int, output_file: Optional[str] = None):
1123        """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call.
1124
1125        Args:
1126            session (int): The session integer.
1127            output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory.
1128
1129        Returns:
1130            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1131        """
1132        if not isinstance(output_file, (type(None), str)):
1133            raise TypeError(output_file)
1134
1135        if isinstance(output_file, str):
1136            output_file = os.path.abspath(output_file)
1137
1138            result = self._GW2RegisterExportFile(session, output_file)
1139
1140        elif isinstance(output_file, type(None)):
1141            result = self._GW2RegisterExportMemory(session)
1142
1143        if result.status not in successes.success_codes:
1144            log.error(format_object(result))
1145            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1146        else:
1147            log.debug(format_object(result))
1148
1149        return result
1150
1151    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1152        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.
1153
1154        Args:
1155            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
1156            output_file (Optional[str]): The output file path where the .zip file will be written.
1157            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1158            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1159
1160        Returns:
1161            file_bytes (bytes): The exported .zip file.
1162        """
1163        # Validate arg types
1164        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1165            raise TypeError(input_file)
1166        if not isinstance(output_file, (type(None), str)):
1167            raise TypeError(output_file)
1168        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1169            raise TypeError(content_management_policy)
1170        if not isinstance(raise_unsupported, bool):
1171            raise TypeError(raise_unsupported)
1172
1173        # Convert string path arguments to absolute paths
1174        if isinstance(input_file, str):
1175            if not os.path.isfile(input_file):
1176                raise FileNotFoundError(input_file)
1177            input_file = os.path.abspath(input_file)
1178        if isinstance(output_file, str):
1179            output_file = os.path.abspath(output_file)
1180            # make directories that do not exist
1181            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1182        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1183            content_management_policy = os.path.abspath(content_management_policy)
1184
1185        # Convert memory inputs to bytes
1186        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1187            input_file = utils.as_bytes(input_file)
1188
1189        with utils.CwdHandler(self.library_path):
1190            with self.new_session() as session:
1191                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1192                register_input = self.register_input(session, input_file)
1193                register_export = self.register_export(session, output_file)
1194                status = self.run_session(session)
1195
1196                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1197                if status not in successes.success_codes:
1198                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1199                    if raise_unsupported:
1200                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1201                    else:
1202                        file_bytes = None
1203                else:
1204                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1205                    # Get file bytes
1206                    if isinstance(output_file, str):
1207                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1208                        if not os.path.isfile(output_file):
1209                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1210                            file_bytes = None
1211                        else:
1212                            with open(output_file, "rb") as f:
1213                                file_bytes = f.read()
1214                    else:
1215                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1216                        file_bytes = utils.buffer_to_bytes(
1217                            register_export.buffer,
1218                            register_export.buffer_length
1219                        )
1220
1221                # Ensure memory allocated is not garbage collected
1222                content_management_policy, register_input, register_export
1223
1224                return file_bytes
1225
1226    def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1227        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
1228
1229        Args:
1230            input_directory (str): The input directory containing files to export.
1231            output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files.
1232            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
1233            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1234
1235        Returns:
1236            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1237        """
1238        export_files_dict = {}
1239        # Call export_file on each file in input_directory to output_directory
1240        for input_file in utils.list_file_paths(input_directory):
1241            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
1242            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1243
1244            export_bytes = self.export_file(
1245                input_file=input_file,
1246                output_file=output_file,
1247                raise_unsupported=raise_unsupported,
1248                content_management_policy=content_management_policy,
1249            )
1250
1251            export_files_dict[relative_path] = export_bytes
1252
1253        return export_files_dict
1254
1255    def export_and_analyse_file(
1256        self,
1257        input_file: Union[str, bytes, bytearray, io.BytesIO],
1258        output_file: Optional[str] = None,
1259        output_analysis_report: Optional[str] = None,
1260        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1261        raise_unsupported: bool = True,
1262    ):
1263        """ Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes.
1264
1265        Args:
1266            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
1267            output_file (Optional[str]): The output file path where the .zip export will be written.
1268            output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written.
1269            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1270            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1271
1272        Returns:
1273            Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes).
1274        """
1275        # Validate arg types
1276        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1277            raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}")
1278        if not isinstance(output_file, (type(None), str)):
1279            raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}")
1280        if not isinstance(output_analysis_report, (type(None), str)):
1281            raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}")
1282        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1283            raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}")
1284        if not isinstance(raise_unsupported, bool):
1285            raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}")
1286
1287        # Convert string path arguments to absolute paths
1288        if isinstance(input_file, str):
1289            if not os.path.isfile(input_file):
1290                raise FileNotFoundError(input_file)
1291            input_file = os.path.abspath(input_file)
1292        if isinstance(output_file, str):
1293            output_file = os.path.abspath(output_file)
1294            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1295        if isinstance(output_analysis_report, str):
1296            output_analysis_report = os.path.abspath(output_analysis_report)
1297            os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True)
1298        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1299            content_management_policy = os.path.abspath(content_management_policy)
1300
1301        # Convert memory inputs to bytes
1302        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1303            input_file = utils.as_bytes(input_file)
1304
1305        with utils.CwdHandler(self.library_path):
1306            with self.new_session() as session:
1307                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1308                register_input = self.register_input(session, input_file)
1309                register_export = self.register_export(session, output_file)
1310                register_analysis = self.register_analysis(session, output_analysis_report)
1311
1312                status = self.run_session(session)
1313
1314                export_file_bytes = None
1315                analysis_report_bytes = None
1316
1317                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1318                if status not in successes.success_codes:
1319                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1320                    if raise_unsupported:
1321                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1322
1323                # Get analysis report file bytes, even on processing failure
1324                if isinstance(output_analysis_report, str):
1325                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1326                    if not os.path.isfile(output_analysis_report):
1327                        log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}")
1328                    else:
1329                        with open(output_analysis_report, "rb") as f:
1330                            analysis_report_bytes = f.read()
1331                else:
1332                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1333                    if register_analysis.buffer and register_analysis.buffer_length:
1334                        analysis_report_bytes = utils.buffer_to_bytes(
1335                            register_analysis.buffer,
1336                            register_analysis.buffer_length
1337                        )
1338
1339                # On success, get export file bytes
1340                if status in successes.success_codes:
1341                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1342                    # Get export file bytes
1343                    if isinstance(output_file, str):
1344                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1345                        if not os.path.isfile(output_file):
1346                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1347                        else:
1348                            with open(output_file, "rb") as f:
1349                                export_file_bytes = f.read()
1350                    else:
1351                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1352                        export_file_bytes = utils.buffer_to_bytes(
1353                            register_export.buffer,
1354                            register_export.buffer_length
1355                        )
1356
1357                # Ensure memory allocated is not garbage collected
1358                content_management_policy, register_input, register_export, register_analysis
1359
1360                return export_file_bytes, analysis_report_bytes
1361
1362    def export_and_analyse_directory(
1363        self,
1364        input_directory: str,
1365        output_directory: Optional[str],
1366        analysis_directory: Optional[str],
1367        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1368        raise_unsupported: bool = True,
1369    ):
1370        """ Recursively processes all files in a directory using export and analyse mode with the given content management policy.
1371        Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
1372
1373        Args:
1374            input_directory (str): The input directory containing files to process.
1375            output_directory (Optional[str]): The output directory for exported .zip files.
1376            analysis_directory (Optional[str]): The output directory for XML analysis reports.
1377            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
1378            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1379
1380        Returns:
1381            result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes).
1382        """
1383        result_dict = {}
1384
1385        for input_file in utils.list_file_paths(input_directory):
1386            relative_path = os.path.relpath(input_file, input_directory)
1387            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path + ".zip")
1388            output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml")
1389
1390            export_file_bytes, analysis_report_bytes = self.export_and_analyse_file(
1391                input_file=input_file,
1392                output_file=output_file,
1393                output_analysis_report=output_analysis_report,
1394                content_management_policy=content_management_policy,
1395                raise_unsupported=raise_unsupported,
1396            )
1397
1398            result_dict[relative_path] = (export_file_bytes, analysis_report_bytes)
1399
1400        return result_dict
1401
1402    def _GW2RegisterImportFile(self, session: int, input_file: str):
1403        """ Register an import input file for the given session.
1404
1405        Args:
1406            session (int): The session integer.
1407            input_file (str): The input import file path.
1408
1409        Returns:
1410            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1411        """
1412        # API function declaration
1413        self.library.GW2RegisterImportFile.argtypes = [
1414            ct.c_size_t,  # Session_Handle session
1415            ct.c_char_p  # const char * importFilePath
1416        ]
1417
1418        # Variable initialisation
1419        gw_return_object = glasswall.GwReturnObj()
1420        gw_return_object.session = ct.c_size_t(session)
1421        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
1422
1423        # API Call
1424        gw_return_object.status = self.library.GW2RegisterImportFile(
1425            gw_return_object.session,
1426            gw_return_object.input_file
1427        )
1428
1429        return gw_return_object
1430
1431    def _GW2RegisterImportMemory(self, session: int, input_file: bytes):
1432        """ Register an import input file in memory for the given session.
1433
1434        Args:
1435            session (int): The session integer.
1436            input_file (str): The input import file in memory.
1437
1438        Returns:
1439            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
1440        """
1441        # API function declaration
1442        self.library.GW2RegisterImportMemory.argtypes = [
1443            ct.c_size_t,  # Session_Handle session
1444            ct.c_void_p,  # char * importFileBuffer
1445            ct.c_size_t  # size_t importLength
1446        ]
1447
1448        # Variable initialisation
1449        gw_return_object = glasswall.GwReturnObj()
1450        gw_return_object.session = ct.c_size_t(session)
1451        gw_return_object.buffer = ct.c_char_p(input_file)
1452        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
1453
1454        # API call
1455        gw_return_object.status = self.library.GW2RegisterImportMemory(
1456            gw_return_object.session,
1457            gw_return_object.buffer,
1458            gw_return_object.buffer_length
1459        )
1460
1461        return gw_return_object
1462
1463    def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
1464        """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.
1465
1466        Args:
1467            session (int): The session integer.
1468            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes.
1469
1470        Returns:
1471            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1472        """
1473        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
1474            raise TypeError(input_file)
1475
1476        if isinstance(input_file, str):
1477            if not os.path.isfile(input_file):
1478                raise FileNotFoundError(input_file)
1479
1480            input_file = os.path.abspath(input_file)
1481
1482            result = self._GW2RegisterImportFile(session, input_file)
1483
1484        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
1485            # Convert bytearray and io.BytesIO to bytes
1486            input_file = utils.as_bytes(input_file)
1487
1488            result = self._GW2RegisterImportMemory(session, input_file)
1489
1490        if result.status not in successes.success_codes:
1491            log.error(format_object(result))
1492            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1493        else:
1494            log.debug(format_object(result))
1495
1496        return result
1497
1498    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1499        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
1500
1501        Args:
1502            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
1503            output_file (Optional[str]): The output file path where the constructed file will be written.
1504            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1505            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1506
1507        Returns:
1508            file_bytes (bytes): The imported file bytes.
1509        """
1510        # Validate arg types
1511        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1512            raise TypeError(input_file)
1513        if not isinstance(output_file, (type(None), str)):
1514            raise TypeError(output_file)
1515        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1516            raise TypeError(content_management_policy)
1517        if not isinstance(raise_unsupported, bool):
1518            raise TypeError(raise_unsupported)
1519
1520        # Convert string path arguments to absolute paths
1521        if isinstance(input_file, str):
1522            if not os.path.isfile(input_file):
1523                raise FileNotFoundError(input_file)
1524            input_file = os.path.abspath(input_file)
1525        if isinstance(output_file, str):
1526            output_file = os.path.abspath(output_file)
1527            # make directories that do not exist
1528            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1529        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1530            content_management_policy = os.path.abspath(content_management_policy)
1531
1532        # Convert memory inputs to bytes
1533        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1534            input_file = utils.as_bytes(input_file)
1535
1536        with utils.CwdHandler(self.library_path):
1537            with self.new_session() as session:
1538                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1539                register_import = self.register_import(session, input_file)
1540                register_output = self.register_output(session, output_file)
1541                status = self.run_session(session)
1542
1543                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1544                if status not in successes.success_codes:
1545                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1546                    if raise_unsupported:
1547                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1548                    else:
1549                        file_bytes = None
1550                else:
1551                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1552                    # Get file bytes
1553                    if isinstance(output_file, str):
1554                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1555                        if not os.path.isfile(output_file):
1556                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1557                            file_bytes = None
1558                        else:
1559                            with open(output_file, "rb") as f:
1560                                file_bytes = f.read()
1561                    else:
1562                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1563                        file_bytes = utils.buffer_to_bytes(
1564                            register_output.buffer,
1565                            register_output.buffer_length
1566                        )
1567
1568                # Ensure memory allocated is not garbage collected
1569                content_management_policy, register_import, register_output
1570
1571                return file_bytes
1572
1573    def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1574        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
1575        The constructed files are written to output_directory maintaining the same directory structure as input_directory.
1576
1577        Args:
1578            input_directory (str): The input directory containing files to import.
1579            output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files.
1580            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
1581            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1582
1583        Returns:
1584            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1585        """
1586        import_files_dict = {}
1587        # Call import_file on each file in input_directory to output_directory
1588        for input_file in utils.list_file_paths(input_directory):
1589            relative_path = os.path.relpath(input_file, input_directory)
1590            # Remove .zip extension from relative_path
1591            relative_path = os.path.splitext(relative_path)[0]
1592            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1593
1594            import_bytes = self.import_file(
1595                input_file=input_file,
1596                output_file=output_file,
1597                raise_unsupported=raise_unsupported,
1598                content_management_policy=content_management_policy,
1599            )
1600
1601            import_files_dict[relative_path] = import_bytes
1602
1603        return import_files_dict
1604
1605    def _GW2FileErrorMsg(self, session: int):
1606        """ Retrieve the Glasswall Session Process error message.
1607
1608        Args:
1609            session (int): The session integer.
1610
1611        Returns:
1612            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status', 'error_message'.
1613        """
1614        # API function declaration
1615        self.library.GW2FileErrorMsg.argtypes = [
1616            ct.c_size_t,  # Session_Handle session
1617            ct.POINTER(ct.c_void_p),  # char **errorMsgBuffer
1618            ct.POINTER(ct.c_size_t)  # size_t *errorMsgBufferLength
1619        ]
1620
1621        # Variable initialisation
1622        gw_return_object = glasswall.GwReturnObj()
1623        gw_return_object.session = ct.c_size_t(session)
1624        gw_return_object.buffer = ct.c_void_p()
1625        gw_return_object.buffer_length = ct.c_size_t()
1626
1627        # API call
1628        gw_return_object.status = self.library.GW2FileErrorMsg(
1629            gw_return_object.session,
1630            ct.byref(gw_return_object.buffer),
1631            ct.byref(gw_return_object.buffer_length)
1632        )
1633
1634        # Editor wrote to a buffer, convert it to bytes
1635        error_bytes = utils.buffer_to_bytes(
1636            gw_return_object.buffer,
1637            gw_return_object.buffer_length
1638        )
1639
1640        gw_return_object.error_message = error_bytes.decode()
1641
1642        return gw_return_object
1643
1644    @functools.lru_cache()
1645    def file_error_message(self, session: int) -> str:
1646        """ Retrieve the Glasswall Session Process error message.
1647
1648        Args:
1649            session (int): The session integer.
1650
1651        Returns:
1652            error_message (str): The Glasswall Session Process error message.
1653        """
1654        # Validate arg types
1655        if not isinstance(session, int):
1656            raise TypeError(session)
1657
1658        result = self._GW2FileErrorMsg(session)
1659
1660        if result.status not in successes.success_codes:
1661            log.error(format_object(result))
1662            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1663        else:
1664            log.debug(format_object(result))
1665
1666        return result.error_message
1667
1668    def GW2GetFileType(self, session: int, file_type_id):
1669        """ Retrieve the file type as a string.
1670
1671        Args:
1672            session (int): The session integer.
1673            file_type_id (int): The file type id.
1674
1675        Returns:
1676            file_type (str): The formal file name for the corresponding file id.
1677        """
1678        # Validate arg types
1679        if not isinstance(session, int):
1680            raise TypeError(session)
1681
1682        # API function declaration
1683        self.library.GW2GetFileType.argtypes = [
1684            ct.c_size_t,
1685            ct.c_size_t,
1686            ct.POINTER(ct.c_size_t),
1687            ct.POINTER(ct.c_void_p)
1688        ]
1689
1690        # Variable initialisation
1691        ct_session = ct.c_size_t(session)
1692        ct_file_type = ct.c_size_t(file_type_id)
1693        ct_buffer_length = ct.c_size_t()
1694        ct_buffer = ct.c_void_p()
1695
1696        # API call
1697        status = self.library.GW2GetFileType(
1698            ct_session,
1699            ct_file_type,
1700            ct.byref(ct_buffer_length),
1701            ct.byref(ct_buffer)
1702        )
1703
1704        if status not in successes.success_codes:
1705            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
1706            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1707        else:
1708            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
1709
1710        # Editor wrote to a buffer, convert it to bytes
1711        file_type_bytes = utils.buffer_to_bytes(
1712            ct_buffer,
1713            ct_buffer_length
1714        )
1715
1716        file_type = file_type_bytes.decode()
1717
1718        return file_type
1719
1720    def GW2GetFileTypeID(self, session: int, file_type_str):
1721        """ Retrieve the Glasswall file type id given a file type string.
1722
1723        Args:
1724            session (int): The session integer.
1725            file_type_str (str): The file type as a string.
1726
1727        Returns:
1728            file_type_id (str): The Glasswall file type id for the specified file type.
1729        """
1730        # Validate arg types
1731        if not isinstance(session, int):
1732            raise TypeError(session)
1733
1734        # API function declaration
1735        self.library.GW2GetFileTypeID.argtypes = [
1736            ct.c_size_t,
1737            ct.c_char_p,
1738            ct.POINTER(ct.c_size_t),
1739            ct.POINTER(ct.c_void_p)
1740        ]
1741
1742        # Variable initialisation
1743        ct_session = ct.c_size_t(session)
1744        ct_file_type = ct.c_char_p(file_type_str.encode('utf-8'))
1745        ct_buffer_length = ct.c_size_t()
1746        ct_buffer = ct.c_void_p()
1747
1748        # API call
1749        status = self.library.GW2GetFileTypeID(
1750            ct_session,
1751            ct_file_type,
1752            ct.byref(ct_buffer_length),
1753            ct.byref(ct_buffer)
1754        )
1755
1756        if status not in successes.success_codes:
1757            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
1758            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1759        else:
1760            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
1761
1762        # Editor wrote to a buffer, convert it to bytes
1763        file_type_bytes = utils.buffer_to_bytes(
1764            ct_buffer,
1765            ct_buffer_length
1766        )
1767
1768        file_type_id = file_type_bytes.decode()
1769
1770        return file_type_id
1771
1772    def get_file_type_info(self, file_type: Union[str, int]):
1773        """ Retrieve information about a file type based on its identifier.
1774
1775        Args:
1776            file_type (Union[str, int]): The file type identifier. This can be either a string representing a file
1777            extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29).
1778
1779        Returns:
1780            - file_type_info (Union[int, str]): Depending on the input 'file_type':
1781                - If `file_type` is a string (e.g. 'bmp'):
1782                    - If the file type is recognised, returns an integer corresponding to that file type.
1783                    - If the file type is not recognised, returns 0.
1784                - If `file_type` is an integer (e.g. 29):
1785                    - If the integer corresponds to a recognised file type, returns a more detailed string description
1786                        of the file type (e.g. 'BMP Image').
1787                    - If the integer does not match any recognised file type, returns an empty string.
1788        """
1789        # Validate arg types
1790        if not isinstance(file_type, (str, int)):
1791            raise TypeError(file_type)
1792
1793        with utils.CwdHandler(self.library_path):
1794            with self.new_session() as session:
1795
1796                if isinstance(file_type, int):
1797                    file_type_info = self.GW2GetFileType(session, file_type)
1798                if isinstance(file_type, str):
1799                    file_type_info = self.GW2GetFileTypeID(session, file_type)
1800
1801                return file_type_info
1802
1803    @utils.deprecated_function(replacement_function=get_file_type_info)
1804    def get_file_info(self, *args, **kwargs):
1805        """ Deprecated in 1.0.6. Use get_file_type_info. """
1806        pass
1807
1808    def _GW2RegisterReportFile(self, session: int, output_file: str):
1809        """ Register an output report file path for the given session.
1810
1811        Args:
1812            session (int): The session integer.
1813            output_file (str): The file path of the output report file.
1814
1815        Returns:
1816            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1817        """
1818        # API function declaration
1819        self.library.GW2RegisterReportFile.argtypes = [
1820            ct.c_size_t,  # Session_Handle session
1821            ct.c_char_p,  # const char * reportFilePathName
1822        ]
1823
1824        # Variable initialisation
1825        gw_return_object = glasswall.GwReturnObj()
1826        gw_return_object.session = ct.c_size_t(session)
1827        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
1828
1829        # API call
1830        gw_return_object.status = self.library.GW2RegisterReportFile(
1831            gw_return_object.session,
1832            gw_return_object.output_file
1833        )
1834
1835        return gw_return_object
1836
1837    def register_report_file(self, session: int, output_file: str):
1838        """ Register the report file path for the given session.
1839
1840        Args:
1841            session (int): The session integer.
1842            output_file (str): The file path of the report file.
1843
1844        Returns:
1845            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1846        """
1847        # Validate arg types
1848        if not isinstance(session, int):
1849            raise TypeError(session)
1850        if not isinstance(output_file, (type(None), str)):
1851            raise TypeError(output_file)
1852
1853        result = self._GW2RegisterReportFile(session, output_file)
1854
1855        if result.status not in successes.success_codes:
1856            log.error(format_object(result))
1857            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1858        else:
1859            log.debug(format_object(result))
1860
1861        return result
1862
1863    def _GW2GetIdInfo(self, session: int, issue_id: int):
1864        """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances"
1865
1866        Args:
1867            session (int): The session integer.
1868            issue_id (int): The issue id.
1869            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1870
1871        Returns:
1872            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'issue_id', 'buffer_length', 'buffer', 'status', 'id_info'.
1873        """
1874        # API function declaration
1875        self.library.GW2GetIdInfo.argtypes = [
1876            ct.c_size_t,  # Session_Handle session
1877            ct.c_size_t,  # size_t issueId
1878            ct.POINTER(ct.c_size_t),  # size_t * bufferLength
1879            ct.POINTER(ct.c_void_p)  # char ** outputBuffer
1880        ]
1881
1882        # Variable initialisation
1883        gw_return_object = glasswall.GwReturnObj()
1884        gw_return_object.session = ct.c_size_t(session)
1885        gw_return_object.issue_id = ct.c_size_t(issue_id)
1886        gw_return_object.buffer_length = ct.c_size_t()
1887        gw_return_object.buffer = ct.c_void_p()
1888
1889        # API call
1890        gw_return_object.status = self.library.GW2GetIdInfo(
1891            gw_return_object.session,
1892            gw_return_object.issue_id,
1893            ct.byref(gw_return_object.buffer_length),
1894            ct.byref(gw_return_object.buffer)
1895        )
1896
1897        # Editor wrote to a buffer, convert it to bytes
1898        id_info_bytes = utils.buffer_to_bytes(
1899            gw_return_object.buffer,
1900            gw_return_object.buffer_length
1901        )
1902
1903        gw_return_object.id_info = id_info_bytes.decode()
1904
1905        return gw_return_object
1906
1907    def get_id_info(self, issue_id: int, raise_unsupported: bool = True):
1908        """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances"
1909
1910        Args:
1911            issue_id (int): The issue id.
1912            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1913
1914        Returns:
1915            id_info (str): The group description for the given Issue ID.
1916        """
1917        # Validate arg types
1918        if not isinstance(issue_id, int):
1919            raise TypeError(issue_id)
1920
1921        with utils.CwdHandler(self.library_path):
1922            with self.new_session() as session:
1923                result = self._GW2GetIdInfo(session, issue_id)
1924
1925                if result.status not in successes.success_codes:
1926                    log.error(format_object(result))
1927                    if raise_unsupported:
1928                        raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1929                else:
1930                    log.debug(format_object(result))
1931
1932                return result.id_info
1933
1934    def _GW2GetAllIdInfo(self, session: int):
1935        """ Retrieves the XML containing all the Issue ID ranges with their group descriptions
1936
1937        Args:
1938            session (int): The session integer.
1939
1940        Returns:
1941            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status', 'all_id_info'.
1942        """
1943
1944        # API function declaration
1945        self.library.GW2GetAllIdInfo.argtypes = [
1946            ct.c_size_t,  # Session_Handle session
1947            ct.POINTER(ct.c_size_t),  # size_t * bufferLength
1948            ct.POINTER(ct.c_void_p)  # char ** outputBuffer
1949        ]
1950
1951        # Variable initialisation
1952        # The extracted issue Id information is stored in the analysis report, register an analysis session.
1953        gw_return_object = self._GW2RegisterAnalysisMemory(session)
1954
1955        # API call
1956        gw_return_object.status = self.library.GW2GetAllIdInfo(
1957            gw_return_object.session,
1958            ct.byref(gw_return_object.buffer_length),
1959            ct.byref(gw_return_object.buffer)
1960        )
1961
1962        # Editor wrote to a buffer, convert it to bytes
1963        all_id_info_bytes = utils.buffer_to_bytes(
1964            gw_return_object.buffer,
1965            gw_return_object.buffer_length
1966        )
1967
1968        gw_return_object.all_id_info = all_id_info_bytes.decode()
1969
1970        return gw_return_object
1971
1972    def get_all_id_info(self, output_file: Optional[str] = None, raise_unsupported: bool = True) -> str:
1973        """ Retrieves the XML containing all the Issue ID ranges with their group descriptions
1974
1975        Args:
1976            output_file (Optional[str]): The output file path where the analysis file will be written.
1977            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1978
1979        Returns:
1980            all_id_info (str): A string XML analysis report containing all id info.
1981        """
1982        # Validate arg types
1983        if not isinstance(output_file, (type(None), str)):
1984            raise TypeError(output_file)
1985        if isinstance(output_file, str):
1986            output_file = os.path.abspath(output_file)
1987
1988        with utils.CwdHandler(self.library_path):
1989            with self.new_session() as session:
1990                result = self._GW2GetAllIdInfo(session)
1991
1992                if result.status not in successes.success_codes:
1993                    log.error(format_object(result))
1994                    if raise_unsupported:
1995                        raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1996                else:
1997                    log.debug(format_object(result))
1998
1999                if isinstance(output_file, str):
2000                    # GW2GetAllIdInfo is memory only, write to file
2001                    # make directories that do not exist
2002                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
2003                    with open(output_file, "w") as f:
2004                        f.write(result.all_id_info)
2005
2006                return result.all_id_info
2007
2008    def _GW2FileSessionStatus(self, session: int):
2009        """ Retrieves the Glasswall Session Status. Also gives a high level indication of the processing that was carried out on the last document processed by the library
2010
2011        Args:
2012            session (int): The session integer.
2013            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
2014
2015        Returns:
2016            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'session_status', 'buffer', 'buffer_length', 'status', 'message'.
2017        """
2018        # API function declaration
2019        self.library.GW2FileSessionStatus.argtypes = [
2020            ct.c_size_t,  # Session_Handle session
2021            ct.POINTER(ct.c_int),  # int *glasswallSessionStatus
2022            ct.POINTER(ct.c_void_p),  # char **statusMsgBuffer
2023            ct.POINTER(ct.c_size_t)  # size_t *statusbufferLength
2024        ]
2025
2026        # Variable initialisation
2027        gw_return_object = glasswall.GwReturnObj()
2028        gw_return_object.session = ct.c_size_t(session)
2029        gw_return_object.session_status = ct.c_int()
2030        gw_return_object.buffer = ct.c_void_p()
2031        gw_return_object.buffer_length = ct.c_size_t()
2032
2033        # API call
2034        gw_return_object.status = self.library.GW2FileSessionStatus(
2035            gw_return_object.session,
2036            ct.byref(gw_return_object.session_status),
2037            ct.byref(gw_return_object.buffer),
2038            ct.byref(gw_return_object.buffer_length)
2039        )
2040
2041        # Convert session_status to int
2042        gw_return_object.session_status = gw_return_object.session_status.value
2043
2044        # Editor wrote to a buffer, convert it to bytes
2045        message_bytes = utils.buffer_to_bytes(
2046            gw_return_object.buffer,
2047            gw_return_object.buffer_length
2048        )
2049        gw_return_object.message = message_bytes.decode()
2050
2051        return gw_return_object
2052
2053    def file_session_status_message(self, session: int, raise_unsupported: bool = True) -> str:
2054        """ Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out.
2055
2056        Args:
2057            session (int): The session integer.
2058            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
2059
2060        Returns:
2061            result.message (str):The file session status message.
2062        """
2063        # Validate arg types
2064        if not isinstance(session, int):
2065            raise TypeError(session)
2066
2067        result = self._GW2FileSessionStatus(session)
2068
2069        if result.status not in successes.success_codes:
2070            log.error(format_object(result))
2071            if raise_unsupported:
2072                raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
2073        else:
2074            log.debug(format_object(result))
2075
2076        return result.message
2077
2078    def _GW2LicenceDetails(self, session: int):
2079        """ Returns a human readable string containing licence details.
2080
2081        Args:
2082            session (int): The session integer.
2083
2084        Returns:
2085            licence_details (str): A human readable string representing the relevant information contained in the licence.
2086        """
2087        # API function declaration
2088        self.library.GW2LicenceDetails.argtypes = [ct.c_size_t]
2089        self.library.GW2LicenceDetails.restype = ct.c_char_p
2090
2091        # Variable initialisation
2092        ct_session = ct.c_size_t(session)
2093
2094        # API call
2095        licence_details = self.library.GW2LicenceDetails(ct_session)
2096
2097        # Convert to Python string
2098        licence_details = ct.string_at(licence_details).decode()
2099
2100        return licence_details
2101
2102    def licence_details(self):
2103        """ Returns a string containing details of the licence.
2104
2105        Returns:
2106            result (str): A string containing details of the licence.
2107        """
2108        with self.new_session() as session:
2109            result = self._GW2LicenceDetails(session)
2110
2111            log.debug(f"\n\tsession: {session}\n\tGW2LicenceDetails: {result}")
2112
2113        return result
2114
2115    def _GW2RegisterExportTextDumpMemory(self, session: int):
2116        """ Registers an export text dump to be written in memory.
2117
2118        Args:
2119            session (int): The session integer.
2120
2121        Returns:
2122            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
2123        """
2124        # API function declaration
2125        self.library.GW2RegisterExportTextDumpMemory.argtypes = [
2126            ct.c_size_t,  # Session_Handle session
2127            ct.POINTER(ct.c_void_p),  # char ** exportTextDumpFileBuffer
2128            ct.POINTER(ct.c_size_t)  # size_t * exportTextDumpLength
2129        ]
2130
2131        # Variable initialisation
2132        gw_return_object = glasswall.GwReturnObj()
2133        gw_return_object.session = ct.c_size_t(session)
2134        gw_return_object.buffer = ct.c_void_p()
2135        gw_return_object.buffer_length = ct.c_size_t()
2136
2137        # API call
2138        gw_return_object.status = self.library.GW2RegisterExportTextDumpMemory(
2139            gw_return_object.session,
2140            ct.byref(gw_return_object.buffer),
2141            ct.byref(gw_return_object.buffer_length)
2142        )
2143
2144        return gw_return_object
2145
2146    def _GW2RegisterExportTextDumpFile(self, session: int, output_file: str):
2147        """ Registers an export text dump to be written to file.
2148
2149        Args:
2150            session (int): The session integer.
2151            output_file (str): The file path of the text dump file.
2152
2153        Returns:
2154            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
2155        """
2156        # API function declaration
2157        self.library.GW2RegisterExportTextDumpFile.argtypes = [
2158            ct.c_size_t,  # Session_Handle session
2159            ct.c_char_p  # const char * textDumpFilePathName
2160        ]
2161
2162        # Variable initialisation
2163        gw_return_object = glasswall.GwReturnObj()
2164        gw_return_object.session = ct.c_size_t(session)
2165        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
2166
2167        # API call
2168        gw_return_object.status = self.library.GW2RegisterExportTextDumpFile(
2169            gw_return_object.session,
2170            gw_return_object.output_file
2171        )
2172
2173        return gw_return_object
2174
2175    def _GW2RegisterLicenceFile(self, session: int, input_file: str):
2176        """ Registers a "gwkey.lic" licence from file path.
2177
2178        Args:
2179            session (int): The session integer.
2180            input_file (str): The "gwkey.lic" licence input file path.
2181
2182        Returns:
2183            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
2184        """
2185        # API function declaration
2186        self.library.GW2RegisterLicenceFile.argtypes = [
2187            ct.c_size_t,  # Session_Handle session
2188            ct.c_char_p,  # const char *filename
2189        ]
2190
2191        # Variable initialisation
2192        gw_return_object = glasswall.GwReturnObj()
2193        gw_return_object.session = ct.c_size_t(session)
2194        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
2195
2196        # API call
2197        gw_return_object.status = self.library.GW2RegisterLicenceFile(
2198            gw_return_object.session,
2199            gw_return_object.input_file,
2200        )
2201
2202        return gw_return_object
2203
2204    def _GW2RegisterLicenceMemory(self, session: int, input_file: bytes):
2205        """ Registers a "gwkey.lic" licence from memory.
2206
2207        Args:
2208            session (int): The session integer.
2209            input_file (bytes): The "gwkey.lic" licence input file.
2210
2211        Returns:
2212            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
2213        """
2214        # API function declaration
2215        self.library.GW2RegisterLicenceMemory.argtypes = [
2216            ct.c_size_t,  # Session_Handle session
2217            ct.c_char_p,  # const char *filename
2218            ct.c_size_t,  # size_t licenceLength
2219        ]
2220
2221        # Variable initialisation
2222        gw_return_object = glasswall.GwReturnObj()
2223        gw_return_object.session = ct.c_size_t(session)
2224        gw_return_object.buffer = ct.c_char_p(input_file)
2225        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
2226
2227        # API call
2228        gw_return_object.status = self.library.GW2RegisterLicenceMemory(
2229            gw_return_object.session,
2230            gw_return_object.buffer,
2231            gw_return_object.buffer_length
2232        )
2233
2234        return gw_return_object
2235
2236    def register_licence(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
2237        """ Registers a "gwkey.lic" licence from file path or memory.
2238
2239        Args:
2240            session (int): The session integer.
2241            input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.
2242
2243        Returns:
2244            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
2245                - If input_file is a str file path:
2246                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
2247
2248                - If input_file is a file in memory:
2249                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
2250        """
2251        if isinstance(input_file, str):
2252            if not os.path.isfile(input_file):
2253                raise FileNotFoundError(input_file)
2254
2255            input_file = os.path.abspath(input_file)
2256
2257            result = self._GW2RegisterLicenceFile(session, input_file)
2258
2259        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
2260            # Convert bytearray and io.BytesIO to bytes
2261            input_file = utils.as_bytes(input_file)
2262
2263            result = self._GW2RegisterLicenceMemory(session, input_file)
2264
2265        else:
2266            raise TypeError(input_file)
2267
2268        if result.status not in successes.success_codes:
2269            log.error(format_object(result))
2270            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
2271        else:
2272            log.debug(format_object(result))
2273
2274        return result
class Editor(glasswall.libraries.library.Library):
  19class Editor(Library):
  20    """ A high level Python wrapper for Glasswall Editor / Core2. """
  21
  22    def __init__(self, library_path: str, licence: Union[str, bytes, bytearray, io.BytesIO] = None):
  23        """ Initialise the Editor instance.
  24
  25        Args:
  26            library_path (str): The file or directory path to the Editor library.
  27            licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be:
  28                - A string representing the file path to the licence.
  29                - A `bytes` or `bytearray` object containing the licence data.
  30                - An `io.BytesIO` object for in-memory licence data.
  31                If not specified, it is assumed that the licence file is located in the same directory as the `library_path`.
  32        """
  33        super().__init__(library_path)
  34        self.library = self.load_library(os.path.abspath(library_path))
  35        self.licence = licence
  36
  37        # Validate killswitch has not activated
  38        self.validate_licence()
  39
  40        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
  41
  42    def validate_licence(self):
  43        """ Validates the licence of the library by checking the licence details.
  44
  45        Raises:
  46            LicenceExpired: If the licence has expired or could not be validated.
  47        """
  48        licence_details = self.licence_details()
  49
  50        bad_details = [
  51            "Unable to Read Licence Key File",
  52            "Licence File Missing Required Contents",
  53            "Licence Expired",
  54        ]
  55
  56        if any(bad_detail.lower() in licence_details.lower() for bad_detail in bad_details):
  57            # bad_details found in licence_details
  58            log.error(f"{self.__class__.__name__} licence validation failed. Licence details:\n{licence_details}")
  59            raise errors.LicenceExpired(licence_details)
  60        else:
  61            log.debug(f"{self.__class__.__name__} licence validated successfully. Licence details:\n{licence_details}")
  62
  63    def version(self):
  64        """ Returns the Glasswall library version.
  65
  66        Returns:
  67            version (str): The Glasswall library version.
  68        """
  69        # API function declaration
  70        self.library.GW2LibVersion.restype = ct.c_char_p
  71
  72        # API call
  73        version = self.library.GW2LibVersion()
  74
  75        # Convert to Python string
  76        version = ct.string_at(version).decode()
  77
  78        return version
  79
  80    def open_session(self):
  81        """ Open a new Glasswall session.
  82
  83        Returns:
  84            session (int): An incrementing integer repsenting the current session.
  85        """
  86        # API call
  87        session = self.library.GW2OpenSession()
  88
  89        log.debug(f"\n\tsession: {session}")
  90
  91        if self.licence:
  92            # Must register licence on each GW2OpenSession if the licence is not in the same directory as the library
  93            self.register_licence(session, self.licence)
  94
  95        return session
  96
  97    def close_session(self, session: int) -> int:
  98        """ Close the Glasswall session. All resources allocated by the session will be destroyed.
  99
 100        Args:
 101            session (int): The session to close.
 102
 103        Returns:
 104            status (int): The status code of the function call.
 105        """
 106        if not isinstance(session, int):
 107            raise TypeError(session)
 108
 109        # API function declaration
 110        self.library.GW2CloseSession.argtypes = [ct.c_size_t]
 111
 112        # Variable initialisation
 113        ct_session = ct.c_size_t(session)
 114
 115        # API call
 116        status = self.library.GW2CloseSession(ct_session)
 117
 118        if status not in successes.success_codes:
 119            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
 120        else:
 121            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
 122
 123        return status
 124
 125    @contextmanager
 126    def new_session(self):
 127        """ Context manager. Opens a new session on entry and closes the session on exit. """
 128        try:
 129            session = self.open_session()
 130            yield session
 131        finally:
 132            self.close_session(session)
 133
 134    def run_session(self, session):
 135        """ Runs the Glasswall session and begins processing of a file.
 136
 137        Args:
 138            session (int): The session to run.
 139
 140        Returns:
 141            status (int): The status code of the function call.
 142        """
 143        # API function declaration
 144        self.library.GW2RunSession.argtypes = [ct.c_size_t]
 145
 146        # Variable initialisation
 147        ct_session = ct.c_size_t(session)
 148
 149        # API call
 150        status = self.library.GW2RunSession(ct_session)
 151
 152        if status not in successes.success_codes:
 153            log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}")
 154        else:
 155            log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}")
 156
 157        return status
 158
 159    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True) -> Union[int, str]:
 160        """ Determine the file type of a given input file, either as an integer identifier or a string.
 161
 162        Args:
 163            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.
 164            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.
 165            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 166
 167        Returns:
 168            file_type (Union[int, str]): The file type.
 169        """
 170        if isinstance(input_file, str):
 171            if not os.path.isfile(input_file):
 172                raise FileNotFoundError(input_file)
 173
 174            # convert to ct.c_char_p of bytes
 175            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
 176
 177            # API call
 178            file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file)
 179
 180        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 181            # convert to bytes
 182            bytes_input_file = utils.as_bytes(input_file)
 183
 184            # ctypes conversion
 185            ct_buffer = ct.c_char_p(bytes_input_file)
 186            ct_buffer_length = ct.c_size_t(len(bytes_input_file))
 187
 188            # API call
 189            file_type = self.library.GW2DetermineFileTypeFromMemory(
 190                ct_buffer,
 191                ct_buffer_length
 192            )
 193
 194        else:
 195            raise TypeError(input_file)
 196
 197        file_type_as_string = dft.file_type_int_to_str(file_type)
 198        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 199
 200        if not dft.is_success(file_type):
 201            log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
 202            if raise_unsupported:
 203                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
 204        else:
 205            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
 206
 207        if as_string:
 208            return file_type_as_string
 209
 210        return file_type
 211
 212    def _GW2GetPolicySettings(self, session: int, policy_format: int = 0):
 213        """ Get current policy settings for the given session.
 214
 215        Args:
 216            session (int): The session integer.
 217            policy_format (int): The format of the content management policy. 0=XML.
 218
 219        Returns:
 220            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status', 'policy'.
 221        """
 222        # API function declaration
 223        self.library.GW2GetPolicySettings.argtypes = [
 224            ct.c_size_t,  # Session_Handle session
 225            ct.POINTER(ct.c_void_p),  # char ** policiesBuffer
 226            ct.POINTER(ct.c_size_t),  # size_t * policiesLength
 227            ct.c_int,  # Policy_Format format
 228        ]
 229
 230        # Variable initialisation
 231        gw_return_object = glasswall.GwReturnObj()
 232        gw_return_object.session = ct.c_size_t(session)
 233        gw_return_object.buffer = ct.c_void_p()
 234        gw_return_object.buffer_length = ct.c_size_t()
 235        gw_return_object.policy_format = ct.c_int(policy_format)
 236
 237        # API Call
 238        gw_return_object.status = self.library.GW2GetPolicySettings(
 239            gw_return_object.session,
 240            ct.byref(gw_return_object.buffer),
 241            ct.byref(gw_return_object.buffer_length),
 242            gw_return_object.policy_format
 243        )
 244
 245        # Editor wrote to a buffer, convert it to bytes
 246        policy_bytes = utils.buffer_to_bytes(
 247            gw_return_object.buffer,
 248            gw_return_object.buffer_length
 249        )
 250
 251        gw_return_object.policy = policy_bytes.decode()
 252
 253        return gw_return_object
 254
 255    def get_content_management_policy(self, session: int):
 256        """ Returns the content management configuration for a given session.
 257
 258        Args:
 259            session (int): The session integer.
 260
 261        Returns:
 262            xml_string (str): The XML string of the current content management configuration.
 263        """
 264        # NOTE GW2GetPolicySettings is current not implemented in editor
 265
 266        # set xml_string as loaded default config
 267        xml_string = glasswall.content_management.policies.Editor(default="sanitise").text,
 268
 269        # log.debug(f"xml_string:\n{xml_string}")
 270
 271        return xml_string
 272
 273        # # API function declaration
 274        # self.library.GW2GetPolicySettings.argtypes = [
 275        #     ct.c_size_t,
 276        #     ct.c_void_p,
 277        # ]
 278
 279        # # Variable initialisation
 280        # ct_session = ct.c_size_t(session)
 281        # ct_buffer = ct.c_void_p()
 282        # ct_buffer_length = ct.c_size_t()
 283        # # ct_file_format = ct.c_int(file_format)
 284
 285        # # API Call
 286        # status = self.library.GW2GetPolicySettings(
 287        #     ct_session,
 288        #     ct.byref(ct_buffer),
 289        #     ct.byref(ct_buffer_length)
 290        # )
 291
 292        # print("GW2GetPolicySettings status:", status)
 293
 294        # file_bytes = utils.buffer_to_bytes(
 295        #     ct_buffer,
 296        #     ct_buffer_length,
 297        # )
 298
 299        # return file_bytes
 300
 301    def _GW2RegisterPoliciesFile(self, session: int, input_file: str, policy_format: int = 0):
 302        """ Registers the policies to be used by Glasswall when processing files.
 303
 304        Args:
 305            session (int): The session integer.
 306            input_file (str): The content management policy input file path.
 307            policy_format (int): The format of the content management policy. 0=XML.
 308
 309        Returns:
 310            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'.
 311        """
 312        # API function declaration
 313        self.library.GW2RegisterPoliciesFile.argtypes = [
 314            ct.c_size_t,  # Session_Handle session
 315            ct.c_char_p,  # const char *filename
 316            ct.c_int,  # Policy_Format format
 317        ]
 318
 319        # Variable initialisation
 320        gw_return_object = glasswall.GwReturnObj()
 321        gw_return_object.session = ct.c_size_t(session)
 322        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
 323        gw_return_object.policy_format = ct.c_int(policy_format)
 324
 325        gw_return_object.status = self.library.GW2RegisterPoliciesFile(
 326            gw_return_object.session,
 327            gw_return_object.input_file,
 328            gw_return_object.policy_format
 329        )
 330
 331        return gw_return_object
 332
 333    def _GW2RegisterPoliciesMemory(self, session: int, input_file: bytes, policy_format: int = 0):
 334        """ Registers the policies in memory to be used by Glasswall when processing files.
 335
 336        Args:
 337            session (int): The session integer.
 338            input_file (str): The content management policy input file bytes.
 339            policy_format (int): The format of the content management policy. 0=XML.
 340
 341        Returns:
 342            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'.
 343        """
 344        # API function declaration
 345        self.library.GW2RegisterPoliciesMemory.argtype = [
 346            ct.c_size_t,  # Session_Handle session
 347            ct.c_char_p,  # const char *policies
 348            ct.c_size_t,  # size_t policiesLength
 349            ct.c_int  # Policy_Format format
 350        ]
 351
 352        # Variable initialisation
 353        gw_return_object = glasswall.GwReturnObj()
 354        gw_return_object.session = ct.c_size_t(session)
 355        gw_return_object.buffer = ct.c_char_p(input_file)
 356        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
 357        gw_return_object.policy_format = ct.c_int(policy_format)
 358
 359        # API Call
 360        gw_return_object.status = self.library.GW2RegisterPoliciesMemory(
 361            gw_return_object.session,
 362            gw_return_object.buffer,
 363            gw_return_object.buffer_length,
 364            gw_return_object.policy_format
 365        )
 366
 367        return gw_return_object
 368
 369    def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, policy_format=0):
 370        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
 371
 372        Args:
 373            session (int): The session integer.
 374            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 375            policy_format (int): The format of the content management policy. 0=XML.
 376
 377        Returns:
 378            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
 379                - If input_file is a str file path:
 380                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'.
 381
 382                - If input_file is a file in memory:
 383                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'.
 384        """
 385        # Validate type
 386        if not isinstance(session, int):
 387            raise TypeError(session)
 388        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 389            raise TypeError(input_file)
 390        if not isinstance(policy_format, int):
 391            raise TypeError(policy_format)
 392
 393        # Set input_file to default if input_file is None
 394        if input_file is None:
 395            input_file = glasswall.content_management.policies.Editor(default="sanitise")
 396
 397        # Validate xml content is parsable
 398        utils.validate_xml(input_file)
 399
 400        # From file
 401        if isinstance(input_file, str) and os.path.isfile(input_file):
 402            input_file = os.path.abspath(input_file)
 403
 404            result = self._GW2RegisterPoliciesFile(session, input_file)
 405
 406        # From memory
 407        elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 408            # Convert bytearray, io.BytesIO to bytes
 409            if isinstance(input_file, (bytearray, io.BytesIO)):
 410                input_file = utils.as_bytes(input_file)
 411            # Convert string xml or Policy to bytes
 412            if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)):
 413                input_file = input_file.encode("utf-8")
 414
 415            result = self._GW2RegisterPoliciesMemory(session, input_file)
 416
 417        if result.status not in successes.success_codes:
 418            log.error(format_object(result))
 419            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 420        else:
 421            log.debug(format_object(result))
 422
 423        return result
 424
 425    def _GW2RegisterInputFile(self, session: int, input_file: str):
 426        """ Register an input file for the given session.
 427
 428        Args:
 429            session (int): The session integer.
 430            input_file (str): The input file path.
 431
 432        Returns:
 433            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
 434        """
 435        # API function declaration
 436        self.library.GW2RegisterInputFile.argtypes = [
 437            ct.c_size_t,  # Session_Handle session
 438            ct.c_char_p  # const char * inputFilePath
 439        ]
 440
 441        # Variable initialisation
 442        gw_return_object = glasswall.GwReturnObj()
 443        gw_return_object.session = ct.c_size_t(session)
 444        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
 445        gw_return_object.input_file_path = input_file
 446
 447        # API call
 448        gw_return_object.status = self.library.GW2RegisterInputFile(
 449            gw_return_object.session,
 450            gw_return_object.input_file
 451        )
 452
 453        return gw_return_object
 454
 455    def _GW2RegisterInputMemory(self, session: int, input_file: bytes):
 456        """ Register an input file in memory for the given session.
 457
 458        Args:
 459            session (int): The session integer.
 460            input_file (bytes): The input file in memory.
 461
 462        Returns:
 463            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
 464        """
 465        # API function declaration
 466        self.library.GW2RegisterInputMemory.argtypes = [
 467            ct.c_size_t,  # Session_Handle session
 468            ct.c_char_p,  # const char * inputFileBuffer
 469            ct.c_size_t,  # size_t inputLength
 470        ]
 471
 472        # Variable initialisation
 473        gw_return_object = glasswall.GwReturnObj()
 474        gw_return_object.session = ct.c_size_t(session)
 475        gw_return_object.buffer = ct.c_char_p(input_file)
 476        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
 477
 478        # API call
 479        gw_return_object.status = self.library.GW2RegisterInputMemory(
 480            gw_return_object.session,
 481            gw_return_object.buffer,
 482            gw_return_object.buffer_length
 483        )
 484
 485        return gw_return_object
 486
 487    def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
 488        """ Register an input file or bytes for the given session.
 489
 490        Args:
 491            session (int): The session integer.
 492            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 493
 494        Returns:
 495            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
 496                - If input_file is a str file path:
 497                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
 498
 499                - If input_file is a file in memory:
 500                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
 501        """
 502        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
 503            raise TypeError(input_file)
 504
 505        if isinstance(input_file, str):
 506            if not os.path.isfile(input_file):
 507                raise FileNotFoundError(input_file)
 508
 509            input_file = os.path.abspath(input_file)
 510
 511            result = self._GW2RegisterInputFile(session, input_file)
 512
 513        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
 514            # Convert bytearray and io.BytesIO to bytes
 515            input_file = utils.as_bytes(input_file)
 516
 517            result = self._GW2RegisterInputMemory(session, input_file)
 518
 519        if result.status not in successes.success_codes:
 520            log.error(format_object(result))
 521            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 522        else:
 523            log.debug(format_object(result))
 524
 525        return result
 526
 527    def _GW2RegisterOutFile(self, session: int, output_file: str):
 528        """ Register an output file for the given session.
 529
 530        Args:
 531            session (int): The session integer.
 532            output_file (str): The output file path.
 533
 534        Returns:
 535            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
 536        """
 537        # API function declaration
 538        self.library.GW2RegisterOutFile.argtypes = [
 539            ct.c_size_t,  # Session_Handle session
 540            ct.c_char_p  # const char * outputFilePath
 541        ]
 542
 543        # Variable initialisation
 544        gw_return_object = glasswall.GwReturnObj()
 545        gw_return_object.session = ct.c_size_t(session)
 546        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
 547
 548        # API call
 549        gw_return_object.status = self.library.GW2RegisterOutFile(
 550            gw_return_object.session,
 551            gw_return_object.output_file
 552        )
 553
 554        return gw_return_object
 555
 556    def _GW2RegisterOutputMemory(self, session: int):
 557        """ Register an output file in memory for the given session.
 558
 559        Args:
 560            session (int): The session integer.
 561
 562        Returns:
 563            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
 564        """
 565        # API function declaration
 566        self.library.GW2RegisterOutputMemory.argtypes = [
 567            ct.c_size_t,  # Session_Handle session
 568            ct.POINTER(ct.c_void_p),  # char ** outputBuffer
 569            ct.POINTER(ct.c_size_t)  # size_t * outputLength
 570        ]
 571
 572        # Variable initialisation
 573        gw_return_object = glasswall.GwReturnObj()
 574        gw_return_object.session = ct.c_size_t(session)
 575        gw_return_object.buffer = ct.c_void_p()
 576        gw_return_object.buffer_length = ct.c_size_t()
 577
 578        # API call
 579        gw_return_object.status = self.library.GW2RegisterOutputMemory(
 580            gw_return_object.session,
 581            ct.byref(gw_return_object.buffer),
 582            ct.byref(gw_return_object.buffer_length)
 583        )
 584
 585        return gw_return_object
 586
 587    def register_output(self, session, output_file: Optional[str] = None):
 588        """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.
 589
 590        Args:
 591            session (int): The session integer.
 592            output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.
 593
 594        Returns:
 595            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
 596        """
 597        if not isinstance(output_file, (type(None), str)):
 598            raise TypeError(output_file)
 599
 600        if isinstance(output_file, str):
 601            output_file = os.path.abspath(output_file)
 602
 603            result = self._GW2RegisterOutFile(session, output_file)
 604
 605        elif isinstance(output_file, type(None)):
 606            result = self._GW2RegisterOutputMemory(session)
 607
 608        if result.status not in successes.success_codes:
 609            log.error(format_object(result))
 610            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 611        else:
 612            log.debug(format_object(result))
 613
 614        return result
 615
 616    def _GW2RegisterAnalysisFile(self, session: int, output_file: str, analysis_format: int = 0):
 617        """ Register an analysis output file for the given session.
 618
 619        Args:
 620            session (int): The session integer.
 621            output_file (str): The analysis output file path.
 622            analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended
 623
 624        Returns:
 625            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'analysis_format', 'status'.
 626        """
 627        # API function declaration
 628        self.library.GW2RegisterAnalysisFile.argtypes = [
 629            ct.c_size_t,  # Session_Handle session
 630            ct.c_char_p,  # const char * analysisFilePathName
 631            ct.c_int,  # Analysis_Format format
 632        ]
 633
 634        # Variable initialisation
 635        gw_return_object = glasswall.GwReturnObj()
 636        gw_return_object.session = ct.c_size_t(session)
 637        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
 638        gw_return_object.analysis_format = ct.c_int()
 639
 640        # API call
 641        gw_return_object.status = self.library.GW2RegisterAnalysisFile(
 642            gw_return_object.session,
 643            gw_return_object.output_file,
 644            gw_return_object.analysis_format
 645        )
 646
 647        return gw_return_object
 648
 649    def _GW2RegisterAnalysisMemory(self, session: int, analysis_format: int = 0):
 650        """ Register an analysis output file in memory for the given session.
 651
 652        Args:
 653            session (int): The session integer.
 654            analysis_format (int): The format of the analysis report. 0=XML, 1=XMLExtended
 655
 656        Returns:
 657            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status'.
 658        """
 659        # API function declaration
 660        self.library.GW2RegisterAnalysisMemory.argtypes = [
 661            ct.c_size_t,  # Session_Handle session
 662            ct.POINTER(ct.c_void_p),  # char ** analysisFileBuffer
 663            ct.POINTER(ct.c_size_t),  # size_t * analysisoutputLength
 664            ct.c_int  # Analysis_Format format
 665        ]
 666
 667        # Variable initialisation
 668        gw_return_object = glasswall.GwReturnObj()
 669        gw_return_object.session = ct.c_size_t(session)
 670        gw_return_object.buffer = ct.c_void_p()
 671        gw_return_object.buffer_length = ct.c_size_t()
 672        gw_return_object.analysis_format = ct.c_int()
 673
 674        # API call
 675        gw_return_object.status = self.library.GW2RegisterAnalysisMemory(
 676            gw_return_object.session,
 677            ct.byref(gw_return_object.buffer),
 678            ct.byref(gw_return_object.buffer_length),
 679            gw_return_object.analysis_format
 680        )
 681
 682        return gw_return_object
 683
 684    def register_analysis(self, session: int, output_file: Optional[str] = None):
 685        """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.
 686
 687        Args:
 688            session (int): The session integer.
 689            output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes.
 690
 691        Returns:
 692            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included.
 693        """
 694        if not isinstance(output_file, (type(None), str)):
 695            raise TypeError(output_file)
 696
 697        if isinstance(output_file, str):
 698            output_file = os.path.abspath(output_file)
 699
 700            result = self._GW2RegisterAnalysisFile(session, output_file)
 701
 702        elif isinstance(output_file, type(None)):
 703            result = self._GW2RegisterAnalysisMemory(session)
 704
 705        if result.status not in successes.success_codes:
 706            log.error(format_object(result))
 707            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
 708        else:
 709            log.debug(format_object(result))
 710
 711        return result
 712
 713    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 714        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
 715
 716        Args:
 717            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 718            output_file (Optional[str]): The output file path where the protected file will be written.
 719            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 720            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 721
 722        Returns:
 723            file_bytes (bytes): The protected file bytes.
 724        """
 725        # Validate arg types
 726        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 727            raise TypeError(input_file)
 728        if not isinstance(output_file, (type(None), str)):
 729            raise TypeError(output_file)
 730        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 731            raise TypeError(content_management_policy)
 732        if not isinstance(raise_unsupported, bool):
 733            raise TypeError(raise_unsupported)
 734
 735        # Convert string path arguments to absolute paths
 736        if isinstance(input_file, str):
 737            if not os.path.isfile(input_file):
 738                raise FileNotFoundError(input_file)
 739            input_file = os.path.abspath(input_file)
 740        if isinstance(output_file, str):
 741            output_file = os.path.abspath(output_file)
 742            # make directories that do not exist
 743            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 744        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 745            content_management_policy = os.path.abspath(content_management_policy)
 746
 747        # Convert memory inputs to bytes
 748        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 749            input_file = utils.as_bytes(input_file)
 750
 751        with utils.CwdHandler(self.library_path):
 752            with self.new_session() as session:
 753                content_management_policy = self.set_content_management_policy(session, content_management_policy)
 754                register_input = self.register_input(session, input_file)
 755                register_output = self.register_output(session, output_file=output_file)
 756                status = self.run_session(session)
 757
 758                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 759                if status not in successes.success_codes:
 760                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 761                    if raise_unsupported:
 762                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 763                    else:
 764                        file_bytes = None
 765                else:
 766                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 767                    # Get file bytes
 768                    if isinstance(output_file, str):
 769                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
 770                        if not os.path.isfile(output_file):
 771                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
 772                            file_bytes = None
 773                        else:
 774                            with open(output_file, "rb") as f:
 775                                file_bytes = f.read()
 776                    else:
 777                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
 778                        file_bytes = utils.buffer_to_bytes(
 779                            register_output.buffer,
 780                            register_output.buffer_length
 781                        )
 782
 783                # Ensure memory allocated is not garbage collected
 784                content_management_policy, register_input, register_output
 785
 786                return file_bytes
 787
 788    def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 789        """ Recursively processes all files in a directory in protect mode using the given content management policy.
 790        The protected files are written to output_directory maintaining the same directory structure as input_directory.
 791
 792        Args:
 793            input_directory (str): The input directory containing files to protect.
 794            output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files.
 795            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 796            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 797
 798        Returns:
 799            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 800        """
 801        protected_files_dict = {}
 802        # Call protect_file on each file in input_directory to output_directory
 803        for input_file in utils.list_file_paths(input_directory):
 804            relative_path = os.path.relpath(input_file, input_directory)
 805            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 806
 807            protected_bytes = self.protect_file(
 808                input_file=input_file,
 809                output_file=output_file,
 810                raise_unsupported=raise_unsupported,
 811                content_management_policy=content_management_policy,
 812            )
 813
 814            protected_files_dict[relative_path] = protected_bytes
 815
 816        return protected_files_dict
 817
 818    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 819        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
 820
 821        Args:
 822            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 823            output_file (Optional[str]): The output file path where the analysis file will be written.
 824            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 825            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 826
 827        Returns:
 828            file_bytes (bytes): The analysis file bytes.
 829        """
 830        # Validate arg types
 831        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 832            raise TypeError(input_file)
 833        if not isinstance(output_file, (type(None), str)):
 834            raise TypeError(output_file)
 835        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 836            raise TypeError(content_management_policy)
 837        if not isinstance(raise_unsupported, bool):
 838            raise TypeError(raise_unsupported)
 839
 840        # Convert string path arguments to absolute paths
 841        if isinstance(input_file, str):
 842            if not os.path.isfile(input_file):
 843                raise FileNotFoundError(input_file)
 844            input_file = os.path.abspath(input_file)
 845        if isinstance(output_file, str):
 846            output_file = os.path.abspath(output_file)
 847            # make directories that do not exist
 848            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 849        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 850            content_management_policy = os.path.abspath(content_management_policy)
 851
 852        # Convert memory inputs to bytes
 853        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 854            input_file = utils.as_bytes(input_file)
 855
 856        with utils.CwdHandler(self.library_path):
 857            with self.new_session() as session:
 858                content_management_policy = self.set_content_management_policy(session, content_management_policy)
 859                register_input = self.register_input(session, input_file)
 860                register_analysis = self.register_analysis(session, output_file)
 861                status = self.run_session(session)
 862
 863                file_bytes = None
 864                if isinstance(output_file, str):
 865                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
 866                    if os.path.isfile(output_file):
 867                        with open(output_file, "rb") as f:
 868                            file_bytes = f.read()
 869                else:
 870                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
 871                    if register_analysis.buffer and register_analysis.buffer_length:
 872                        file_bytes = utils.buffer_to_bytes(
 873                            register_analysis.buffer,
 874                            register_analysis.buffer_length
 875                        )
 876
 877                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 878                if status not in successes.success_codes:
 879                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 880                    if raise_unsupported:
 881                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 882                else:
 883                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
 884
 885                # Ensure memory allocated is not garbage collected
 886                content_management_policy, register_input, register_analysis
 887
 888                return file_bytes
 889
 890    def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 891        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
 892
 893        Args:
 894            input_directory (str): The input directory containing files to analyse.
 895            output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files.
 896            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 897            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 898
 899        Returns:
 900            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 901        """
 902        analysis_files_dict = {}
 903        # Call analyse_file on each file in input_directory to output_directory
 904        for input_file in utils.list_file_paths(input_directory):
 905            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
 906            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 907
 908            analysis_bytes = self.analyse_file(
 909                input_file=input_file,
 910                output_file=output_file,
 911                raise_unsupported=raise_unsupported,
 912                content_management_policy=content_management_policy,
 913            )
 914
 915            analysis_files_dict[relative_path] = analysis_bytes
 916
 917        return analysis_files_dict
 918
 919    def protect_and_analyse_file(
 920        self,
 921        input_file: Union[str, bytes, bytearray, io.BytesIO],
 922        output_file: Optional[str] = None,
 923        output_analysis_report: Optional[str] = None,
 924        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
 925        raise_unsupported: bool = True,
 926    ):
 927        """ Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes.
 928
 929        Args:
 930            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 931            output_file (Optional[str]): The output file path where the protected file will be written.
 932            output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written.
 933            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 934            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 935
 936        Returns:
 937            Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes).
 938        """
 939        # Validate arg types
 940        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 941            raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}")
 942        if not isinstance(output_file, (type(None), str)):
 943            raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}")
 944        if not isinstance(output_analysis_report, (type(None), str)):
 945            raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}")
 946        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 947            raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}")
 948        if not isinstance(raise_unsupported, bool):
 949            raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}")
 950
 951        # Convert string path arguments to absolute paths
 952        if isinstance(input_file, str):
 953            if not os.path.isfile(input_file):
 954                raise FileNotFoundError(input_file)
 955            input_file = os.path.abspath(input_file)
 956        if isinstance(output_file, str):
 957            output_file = os.path.abspath(output_file)
 958            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 959        if isinstance(output_analysis_report, str):
 960            output_analysis_report = os.path.abspath(output_analysis_report)
 961            os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True)
 962        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 963            content_management_policy = os.path.abspath(content_management_policy)
 964
 965        # Convert memory inputs to bytes
 966        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 967            input_file = utils.as_bytes(input_file)
 968
 969        with utils.CwdHandler(self.library_path):
 970            with self.new_session() as session:
 971                content_management_policy = self.set_content_management_policy(session, content_management_policy)
 972                register_input = self.register_input(session, input_file)
 973                register_output = self.register_output(session, output_file)
 974                register_analysis = self.register_analysis(session, output_analysis_report)
 975
 976                status = self.run_session(session)
 977
 978                protected_file_bytes = None
 979                analysis_report_bytes = None
 980
 981                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 982                if status not in successes.success_codes:
 983                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
 984                    if raise_unsupported:
 985                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 986
 987                # Get analysis report file bytes, even on processing failure
 988                if isinstance(output_analysis_report, str):
 989                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
 990                    if not os.path.isfile(output_analysis_report):
 991                        log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}")
 992                    else:
 993                        with open(output_analysis_report, "rb") as f:
 994                            analysis_report_bytes = f.read()
 995                else:
 996                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
 997                    if register_analysis.buffer and register_analysis.buffer_length:
 998                        analysis_report_bytes = utils.buffer_to_bytes(
 999                            register_analysis.buffer,
1000                            register_analysis.buffer_length
1001                        )
1002
1003                # On success, get protected file bytes
1004                if status in successes.success_codes:
1005                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1006                    # Get file bytes
1007                    if isinstance(output_file, str):
1008                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1009                        if not os.path.isfile(output_file):
1010                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1011                        else:
1012                            with open(output_file, "rb") as f:
1013                                protected_file_bytes = f.read()
1014                    else:
1015                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1016                        protected_file_bytes = utils.buffer_to_bytes(
1017                            register_output.buffer,
1018                            register_output.buffer_length
1019                        )
1020
1021                # Ensure memory allocated is not garbage collected
1022                content_management_policy, register_input, register_output, register_analysis
1023
1024                return protected_file_bytes, analysis_report_bytes
1025
1026    def protect_and_analyse_directory(
1027        self,
1028        input_directory: str,
1029        output_directory: Optional[str],
1030        analysis_directory: Optional[str],
1031        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1032        raise_unsupported: bool = True,
1033    ):
1034        """ Recursively processes all files in a directory using protect and analyse mode with the given content management policy.
1035        Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
1036
1037        Args:
1038            input_directory (str): The input directory containing files to process.
1039            output_directory (Optional[str]): The output directory for protected files.
1040            analysis_directory (Optional[str]): The output directory for XML analysis reports.
1041            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
1042            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1043
1044        Returns:
1045            result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes).
1046        """
1047        result_dict = {}
1048        for input_file in utils.list_file_paths(input_directory):
1049            relative_path = os.path.relpath(input_file, input_directory)
1050            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1051            output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml")
1052
1053            protected_file_bytes, analysis_report_bytes = self.protect_and_analyse_file(
1054                input_file=input_file,
1055                output_file=output_file,
1056                output_analysis_report=output_analysis_report,
1057                content_management_policy=content_management_policy,
1058                raise_unsupported=raise_unsupported,
1059            )
1060
1061            result_dict[relative_path] = (protected_file_bytes, analysis_report_bytes)
1062
1063        return result_dict
1064
1065    def _GW2RegisterExportFile(self, session: int, output_file: str):
1066        """ Register an export output file for the given session.
1067
1068        Args:
1069            session (int): The session integer.
1070            output_file (str): The export output file path.
1071
1072        Returns:
1073            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1074        """
1075        # API function declaration
1076        self.library.GW2RegisterExportFile.argtypes = [
1077            ct.c_size_t,  # Session_Handle session
1078            ct.c_char_p  # const char * exportFilePath
1079        ]
1080
1081        # Variable initialisation
1082        gw_return_object = glasswall.GwReturnObj()
1083        gw_return_object.session = ct.c_size_t(session)
1084        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
1085
1086        # API Call
1087        gw_return_object.status = self.library.GW2RegisterExportFile(
1088            gw_return_object.session,
1089            gw_return_object.output_file
1090        )
1091
1092        return gw_return_object
1093
1094    def _GW2RegisterExportMemory(self, session: int):
1095        """ Register an export output file in memory for the given session.
1096
1097        Args:
1098            session (int): The session integer.
1099
1100        Returns:
1101            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
1102        """
1103        # API function declaration
1104        self.library.GW2RegisterExportMemory.argtypes = [
1105            ct.c_size_t,  # Session_Handle session
1106            ct.POINTER(ct.c_void_p),  # char ** exportFileBuffer
1107            ct.POINTER(ct.c_size_t)  # size_t * exportLength
1108        ]
1109
1110        # Variable initialisation
1111        gw_return_object = glasswall.GwReturnObj()
1112        gw_return_object.session = ct.c_size_t(session)
1113        gw_return_object.buffer = ct.c_void_p()
1114        gw_return_object.buffer_length = ct.c_size_t()
1115
1116        # API call
1117        gw_return_object.status = self.library.GW2RegisterExportMemory(
1118            gw_return_object.session,
1119            ct.byref(gw_return_object.buffer),
1120            ct.byref(gw_return_object.buffer_length)
1121        )
1122
1123        return gw_return_object
1124
1125    def register_export(self, session: int, output_file: Optional[str] = None):
1126        """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call.
1127
1128        Args:
1129            session (int): The session integer.
1130            output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory.
1131
1132        Returns:
1133            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1134        """
1135        if not isinstance(output_file, (type(None), str)):
1136            raise TypeError(output_file)
1137
1138        if isinstance(output_file, str):
1139            output_file = os.path.abspath(output_file)
1140
1141            result = self._GW2RegisterExportFile(session, output_file)
1142
1143        elif isinstance(output_file, type(None)):
1144            result = self._GW2RegisterExportMemory(session)
1145
1146        if result.status not in successes.success_codes:
1147            log.error(format_object(result))
1148            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1149        else:
1150            log.debug(format_object(result))
1151
1152        return result
1153
1154    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1155        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.
1156
1157        Args:
1158            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
1159            output_file (Optional[str]): The output file path where the .zip file will be written.
1160            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1161            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1162
1163        Returns:
1164            file_bytes (bytes): The exported .zip file.
1165        """
1166        # Validate arg types
1167        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1168            raise TypeError(input_file)
1169        if not isinstance(output_file, (type(None), str)):
1170            raise TypeError(output_file)
1171        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1172            raise TypeError(content_management_policy)
1173        if not isinstance(raise_unsupported, bool):
1174            raise TypeError(raise_unsupported)
1175
1176        # Convert string path arguments to absolute paths
1177        if isinstance(input_file, str):
1178            if not os.path.isfile(input_file):
1179                raise FileNotFoundError(input_file)
1180            input_file = os.path.abspath(input_file)
1181        if isinstance(output_file, str):
1182            output_file = os.path.abspath(output_file)
1183            # make directories that do not exist
1184            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1185        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1186            content_management_policy = os.path.abspath(content_management_policy)
1187
1188        # Convert memory inputs to bytes
1189        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1190            input_file = utils.as_bytes(input_file)
1191
1192        with utils.CwdHandler(self.library_path):
1193            with self.new_session() as session:
1194                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1195                register_input = self.register_input(session, input_file)
1196                register_export = self.register_export(session, output_file)
1197                status = self.run_session(session)
1198
1199                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1200                if status not in successes.success_codes:
1201                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1202                    if raise_unsupported:
1203                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1204                    else:
1205                        file_bytes = None
1206                else:
1207                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1208                    # Get file bytes
1209                    if isinstance(output_file, str):
1210                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1211                        if not os.path.isfile(output_file):
1212                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1213                            file_bytes = None
1214                        else:
1215                            with open(output_file, "rb") as f:
1216                                file_bytes = f.read()
1217                    else:
1218                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1219                        file_bytes = utils.buffer_to_bytes(
1220                            register_export.buffer,
1221                            register_export.buffer_length
1222                        )
1223
1224                # Ensure memory allocated is not garbage collected
1225                content_management_policy, register_input, register_export
1226
1227                return file_bytes
1228
1229    def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1230        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
1231
1232        Args:
1233            input_directory (str): The input directory containing files to export.
1234            output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files.
1235            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
1236            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1237
1238        Returns:
1239            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1240        """
1241        export_files_dict = {}
1242        # Call export_file on each file in input_directory to output_directory
1243        for input_file in utils.list_file_paths(input_directory):
1244            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
1245            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1246
1247            export_bytes = self.export_file(
1248                input_file=input_file,
1249                output_file=output_file,
1250                raise_unsupported=raise_unsupported,
1251                content_management_policy=content_management_policy,
1252            )
1253
1254            export_files_dict[relative_path] = export_bytes
1255
1256        return export_files_dict
1257
1258    def export_and_analyse_file(
1259        self,
1260        input_file: Union[str, bytes, bytearray, io.BytesIO],
1261        output_file: Optional[str] = None,
1262        output_analysis_report: Optional[str] = None,
1263        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1264        raise_unsupported: bool = True,
1265    ):
1266        """ Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes.
1267
1268        Args:
1269            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
1270            output_file (Optional[str]): The output file path where the .zip export will be written.
1271            output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written.
1272            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1273            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1274
1275        Returns:
1276            Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes).
1277        """
1278        # Validate arg types
1279        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1280            raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}")
1281        if not isinstance(output_file, (type(None), str)):
1282            raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}")
1283        if not isinstance(output_analysis_report, (type(None), str)):
1284            raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}")
1285        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1286            raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}")
1287        if not isinstance(raise_unsupported, bool):
1288            raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}")
1289
1290        # Convert string path arguments to absolute paths
1291        if isinstance(input_file, str):
1292            if not os.path.isfile(input_file):
1293                raise FileNotFoundError(input_file)
1294            input_file = os.path.abspath(input_file)
1295        if isinstance(output_file, str):
1296            output_file = os.path.abspath(output_file)
1297            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1298        if isinstance(output_analysis_report, str):
1299            output_analysis_report = os.path.abspath(output_analysis_report)
1300            os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True)
1301        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1302            content_management_policy = os.path.abspath(content_management_policy)
1303
1304        # Convert memory inputs to bytes
1305        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1306            input_file = utils.as_bytes(input_file)
1307
1308        with utils.CwdHandler(self.library_path):
1309            with self.new_session() as session:
1310                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1311                register_input = self.register_input(session, input_file)
1312                register_export = self.register_export(session, output_file)
1313                register_analysis = self.register_analysis(session, output_analysis_report)
1314
1315                status = self.run_session(session)
1316
1317                export_file_bytes = None
1318                analysis_report_bytes = None
1319
1320                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1321                if status not in successes.success_codes:
1322                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1323                    if raise_unsupported:
1324                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1325
1326                # Get analysis report file bytes, even on processing failure
1327                if isinstance(output_analysis_report, str):
1328                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1329                    if not os.path.isfile(output_analysis_report):
1330                        log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}")
1331                    else:
1332                        with open(output_analysis_report, "rb") as f:
1333                            analysis_report_bytes = f.read()
1334                else:
1335                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1336                    if register_analysis.buffer and register_analysis.buffer_length:
1337                        analysis_report_bytes = utils.buffer_to_bytes(
1338                            register_analysis.buffer,
1339                            register_analysis.buffer_length
1340                        )
1341
1342                # On success, get export file bytes
1343                if status in successes.success_codes:
1344                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1345                    # Get export file bytes
1346                    if isinstance(output_file, str):
1347                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1348                        if not os.path.isfile(output_file):
1349                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1350                        else:
1351                            with open(output_file, "rb") as f:
1352                                export_file_bytes = f.read()
1353                    else:
1354                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1355                        export_file_bytes = utils.buffer_to_bytes(
1356                            register_export.buffer,
1357                            register_export.buffer_length
1358                        )
1359
1360                # Ensure memory allocated is not garbage collected
1361                content_management_policy, register_input, register_export, register_analysis
1362
1363                return export_file_bytes, analysis_report_bytes
1364
1365    def export_and_analyse_directory(
1366        self,
1367        input_directory: str,
1368        output_directory: Optional[str],
1369        analysis_directory: Optional[str],
1370        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1371        raise_unsupported: bool = True,
1372    ):
1373        """ Recursively processes all files in a directory using export and analyse mode with the given content management policy.
1374        Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
1375
1376        Args:
1377            input_directory (str): The input directory containing files to process.
1378            output_directory (Optional[str]): The output directory for exported .zip files.
1379            analysis_directory (Optional[str]): The output directory for XML analysis reports.
1380            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
1381            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1382
1383        Returns:
1384            result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes).
1385        """
1386        result_dict = {}
1387
1388        for input_file in utils.list_file_paths(input_directory):
1389            relative_path = os.path.relpath(input_file, input_directory)
1390            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path + ".zip")
1391            output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml")
1392
1393            export_file_bytes, analysis_report_bytes = self.export_and_analyse_file(
1394                input_file=input_file,
1395                output_file=output_file,
1396                output_analysis_report=output_analysis_report,
1397                content_management_policy=content_management_policy,
1398                raise_unsupported=raise_unsupported,
1399            )
1400
1401            result_dict[relative_path] = (export_file_bytes, analysis_report_bytes)
1402
1403        return result_dict
1404
1405    def _GW2RegisterImportFile(self, session: int, input_file: str):
1406        """ Register an import input file for the given session.
1407
1408        Args:
1409            session (int): The session integer.
1410            input_file (str): The input import file path.
1411
1412        Returns:
1413            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1414        """
1415        # API function declaration
1416        self.library.GW2RegisterImportFile.argtypes = [
1417            ct.c_size_t,  # Session_Handle session
1418            ct.c_char_p  # const char * importFilePath
1419        ]
1420
1421        # Variable initialisation
1422        gw_return_object = glasswall.GwReturnObj()
1423        gw_return_object.session = ct.c_size_t(session)
1424        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
1425
1426        # API Call
1427        gw_return_object.status = self.library.GW2RegisterImportFile(
1428            gw_return_object.session,
1429            gw_return_object.input_file
1430        )
1431
1432        return gw_return_object
1433
1434    def _GW2RegisterImportMemory(self, session: int, input_file: bytes):
1435        """ Register an import input file in memory for the given session.
1436
1437        Args:
1438            session (int): The session integer.
1439            input_file (str): The input import file in memory.
1440
1441        Returns:
1442            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
1443        """
1444        # API function declaration
1445        self.library.GW2RegisterImportMemory.argtypes = [
1446            ct.c_size_t,  # Session_Handle session
1447            ct.c_void_p,  # char * importFileBuffer
1448            ct.c_size_t  # size_t importLength
1449        ]
1450
1451        # Variable initialisation
1452        gw_return_object = glasswall.GwReturnObj()
1453        gw_return_object.session = ct.c_size_t(session)
1454        gw_return_object.buffer = ct.c_char_p(input_file)
1455        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
1456
1457        # API call
1458        gw_return_object.status = self.library.GW2RegisterImportMemory(
1459            gw_return_object.session,
1460            gw_return_object.buffer,
1461            gw_return_object.buffer_length
1462        )
1463
1464        return gw_return_object
1465
1466    def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
1467        """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.
1468
1469        Args:
1470            session (int): The session integer.
1471            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes.
1472
1473        Returns:
1474            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1475        """
1476        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
1477            raise TypeError(input_file)
1478
1479        if isinstance(input_file, str):
1480            if not os.path.isfile(input_file):
1481                raise FileNotFoundError(input_file)
1482
1483            input_file = os.path.abspath(input_file)
1484
1485            result = self._GW2RegisterImportFile(session, input_file)
1486
1487        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
1488            # Convert bytearray and io.BytesIO to bytes
1489            input_file = utils.as_bytes(input_file)
1490
1491            result = self._GW2RegisterImportMemory(session, input_file)
1492
1493        if result.status not in successes.success_codes:
1494            log.error(format_object(result))
1495            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1496        else:
1497            log.debug(format_object(result))
1498
1499        return result
1500
1501    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1502        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
1503
1504        Args:
1505            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
1506            output_file (Optional[str]): The output file path where the constructed file will be written.
1507            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1508            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1509
1510        Returns:
1511            file_bytes (bytes): The imported file bytes.
1512        """
1513        # Validate arg types
1514        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1515            raise TypeError(input_file)
1516        if not isinstance(output_file, (type(None), str)):
1517            raise TypeError(output_file)
1518        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1519            raise TypeError(content_management_policy)
1520        if not isinstance(raise_unsupported, bool):
1521            raise TypeError(raise_unsupported)
1522
1523        # Convert string path arguments to absolute paths
1524        if isinstance(input_file, str):
1525            if not os.path.isfile(input_file):
1526                raise FileNotFoundError(input_file)
1527            input_file = os.path.abspath(input_file)
1528        if isinstance(output_file, str):
1529            output_file = os.path.abspath(output_file)
1530            # make directories that do not exist
1531            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1532        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1533            content_management_policy = os.path.abspath(content_management_policy)
1534
1535        # Convert memory inputs to bytes
1536        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1537            input_file = utils.as_bytes(input_file)
1538
1539        with utils.CwdHandler(self.library_path):
1540            with self.new_session() as session:
1541                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1542                register_import = self.register_import(session, input_file)
1543                register_output = self.register_output(session, output_file)
1544                status = self.run_session(session)
1545
1546                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1547                if status not in successes.success_codes:
1548                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1549                    if raise_unsupported:
1550                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1551                    else:
1552                        file_bytes = None
1553                else:
1554                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1555                    # Get file bytes
1556                    if isinstance(output_file, str):
1557                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1558                        if not os.path.isfile(output_file):
1559                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1560                            file_bytes = None
1561                        else:
1562                            with open(output_file, "rb") as f:
1563                                file_bytes = f.read()
1564                    else:
1565                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1566                        file_bytes = utils.buffer_to_bytes(
1567                            register_output.buffer,
1568                            register_output.buffer_length
1569                        )
1570
1571                # Ensure memory allocated is not garbage collected
1572                content_management_policy, register_import, register_output
1573
1574                return file_bytes
1575
1576    def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1577        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
1578        The constructed files are written to output_directory maintaining the same directory structure as input_directory.
1579
1580        Args:
1581            input_directory (str): The input directory containing files to import.
1582            output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files.
1583            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
1584            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1585
1586        Returns:
1587            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1588        """
1589        import_files_dict = {}
1590        # Call import_file on each file in input_directory to output_directory
1591        for input_file in utils.list_file_paths(input_directory):
1592            relative_path = os.path.relpath(input_file, input_directory)
1593            # Remove .zip extension from relative_path
1594            relative_path = os.path.splitext(relative_path)[0]
1595            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1596
1597            import_bytes = self.import_file(
1598                input_file=input_file,
1599                output_file=output_file,
1600                raise_unsupported=raise_unsupported,
1601                content_management_policy=content_management_policy,
1602            )
1603
1604            import_files_dict[relative_path] = import_bytes
1605
1606        return import_files_dict
1607
1608    def _GW2FileErrorMsg(self, session: int):
1609        """ Retrieve the Glasswall Session Process error message.
1610
1611        Args:
1612            session (int): The session integer.
1613
1614        Returns:
1615            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status', 'error_message'.
1616        """
1617        # API function declaration
1618        self.library.GW2FileErrorMsg.argtypes = [
1619            ct.c_size_t,  # Session_Handle session
1620            ct.POINTER(ct.c_void_p),  # char **errorMsgBuffer
1621            ct.POINTER(ct.c_size_t)  # size_t *errorMsgBufferLength
1622        ]
1623
1624        # Variable initialisation
1625        gw_return_object = glasswall.GwReturnObj()
1626        gw_return_object.session = ct.c_size_t(session)
1627        gw_return_object.buffer = ct.c_void_p()
1628        gw_return_object.buffer_length = ct.c_size_t()
1629
1630        # API call
1631        gw_return_object.status = self.library.GW2FileErrorMsg(
1632            gw_return_object.session,
1633            ct.byref(gw_return_object.buffer),
1634            ct.byref(gw_return_object.buffer_length)
1635        )
1636
1637        # Editor wrote to a buffer, convert it to bytes
1638        error_bytes = utils.buffer_to_bytes(
1639            gw_return_object.buffer,
1640            gw_return_object.buffer_length
1641        )
1642
1643        gw_return_object.error_message = error_bytes.decode()
1644
1645        return gw_return_object
1646
1647    @functools.lru_cache()
1648    def file_error_message(self, session: int) -> str:
1649        """ Retrieve the Glasswall Session Process error message.
1650
1651        Args:
1652            session (int): The session integer.
1653
1654        Returns:
1655            error_message (str): The Glasswall Session Process error message.
1656        """
1657        # Validate arg types
1658        if not isinstance(session, int):
1659            raise TypeError(session)
1660
1661        result = self._GW2FileErrorMsg(session)
1662
1663        if result.status not in successes.success_codes:
1664            log.error(format_object(result))
1665            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1666        else:
1667            log.debug(format_object(result))
1668
1669        return result.error_message
1670
1671    def GW2GetFileType(self, session: int, file_type_id):
1672        """ Retrieve the file type as a string.
1673
1674        Args:
1675            session (int): The session integer.
1676            file_type_id (int): The file type id.
1677
1678        Returns:
1679            file_type (str): The formal file name for the corresponding file id.
1680        """
1681        # Validate arg types
1682        if not isinstance(session, int):
1683            raise TypeError(session)
1684
1685        # API function declaration
1686        self.library.GW2GetFileType.argtypes = [
1687            ct.c_size_t,
1688            ct.c_size_t,
1689            ct.POINTER(ct.c_size_t),
1690            ct.POINTER(ct.c_void_p)
1691        ]
1692
1693        # Variable initialisation
1694        ct_session = ct.c_size_t(session)
1695        ct_file_type = ct.c_size_t(file_type_id)
1696        ct_buffer_length = ct.c_size_t()
1697        ct_buffer = ct.c_void_p()
1698
1699        # API call
1700        status = self.library.GW2GetFileType(
1701            ct_session,
1702            ct_file_type,
1703            ct.byref(ct_buffer_length),
1704            ct.byref(ct_buffer)
1705        )
1706
1707        if status not in successes.success_codes:
1708            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
1709            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1710        else:
1711            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
1712
1713        # Editor wrote to a buffer, convert it to bytes
1714        file_type_bytes = utils.buffer_to_bytes(
1715            ct_buffer,
1716            ct_buffer_length
1717        )
1718
1719        file_type = file_type_bytes.decode()
1720
1721        return file_type
1722
1723    def GW2GetFileTypeID(self, session: int, file_type_str):
1724        """ Retrieve the Glasswall file type id given a file type string.
1725
1726        Args:
1727            session (int): The session integer.
1728            file_type_str (str): The file type as a string.
1729
1730        Returns:
1731            file_type_id (str): The Glasswall file type id for the specified file type.
1732        """
1733        # Validate arg types
1734        if not isinstance(session, int):
1735            raise TypeError(session)
1736
1737        # API function declaration
1738        self.library.GW2GetFileTypeID.argtypes = [
1739            ct.c_size_t,
1740            ct.c_char_p,
1741            ct.POINTER(ct.c_size_t),
1742            ct.POINTER(ct.c_void_p)
1743        ]
1744
1745        # Variable initialisation
1746        ct_session = ct.c_size_t(session)
1747        ct_file_type = ct.c_char_p(file_type_str.encode('utf-8'))
1748        ct_buffer_length = ct.c_size_t()
1749        ct_buffer = ct.c_void_p()
1750
1751        # API call
1752        status = self.library.GW2GetFileTypeID(
1753            ct_session,
1754            ct_file_type,
1755            ct.byref(ct_buffer_length),
1756            ct.byref(ct_buffer)
1757        )
1758
1759        if status not in successes.success_codes:
1760            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
1761            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1762        else:
1763            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
1764
1765        # Editor wrote to a buffer, convert it to bytes
1766        file_type_bytes = utils.buffer_to_bytes(
1767            ct_buffer,
1768            ct_buffer_length
1769        )
1770
1771        file_type_id = file_type_bytes.decode()
1772
1773        return file_type_id
1774
1775    def get_file_type_info(self, file_type: Union[str, int]):
1776        """ Retrieve information about a file type based on its identifier.
1777
1778        Args:
1779            file_type (Union[str, int]): The file type identifier. This can be either a string representing a file
1780            extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29).
1781
1782        Returns:
1783            - file_type_info (Union[int, str]): Depending on the input 'file_type':
1784                - If `file_type` is a string (e.g. 'bmp'):
1785                    - If the file type is recognised, returns an integer corresponding to that file type.
1786                    - If the file type is not recognised, returns 0.
1787                - If `file_type` is an integer (e.g. 29):
1788                    - If the integer corresponds to a recognised file type, returns a more detailed string description
1789                        of the file type (e.g. 'BMP Image').
1790                    - If the integer does not match any recognised file type, returns an empty string.
1791        """
1792        # Validate arg types
1793        if not isinstance(file_type, (str, int)):
1794            raise TypeError(file_type)
1795
1796        with utils.CwdHandler(self.library_path):
1797            with self.new_session() as session:
1798
1799                if isinstance(file_type, int):
1800                    file_type_info = self.GW2GetFileType(session, file_type)
1801                if isinstance(file_type, str):
1802                    file_type_info = self.GW2GetFileTypeID(session, file_type)
1803
1804                return file_type_info
1805
1806    @utils.deprecated_function(replacement_function=get_file_type_info)
1807    def get_file_info(self, *args, **kwargs):
1808        """ Deprecated in 1.0.6. Use get_file_type_info. """
1809        pass
1810
1811    def _GW2RegisterReportFile(self, session: int, output_file: str):
1812        """ Register an output report file path for the given session.
1813
1814        Args:
1815            session (int): The session integer.
1816            output_file (str): The file path of the output report file.
1817
1818        Returns:
1819            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1820        """
1821        # API function declaration
1822        self.library.GW2RegisterReportFile.argtypes = [
1823            ct.c_size_t,  # Session_Handle session
1824            ct.c_char_p,  # const char * reportFilePathName
1825        ]
1826
1827        # Variable initialisation
1828        gw_return_object = glasswall.GwReturnObj()
1829        gw_return_object.session = ct.c_size_t(session)
1830        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
1831
1832        # API call
1833        gw_return_object.status = self.library.GW2RegisterReportFile(
1834            gw_return_object.session,
1835            gw_return_object.output_file
1836        )
1837
1838        return gw_return_object
1839
1840    def register_report_file(self, session: int, output_file: str):
1841        """ Register the report file path for the given session.
1842
1843        Args:
1844            session (int): The session integer.
1845            output_file (str): The file path of the report file.
1846
1847        Returns:
1848            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1849        """
1850        # Validate arg types
1851        if not isinstance(session, int):
1852            raise TypeError(session)
1853        if not isinstance(output_file, (type(None), str)):
1854            raise TypeError(output_file)
1855
1856        result = self._GW2RegisterReportFile(session, output_file)
1857
1858        if result.status not in successes.success_codes:
1859            log.error(format_object(result))
1860            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1861        else:
1862            log.debug(format_object(result))
1863
1864        return result
1865
1866    def _GW2GetIdInfo(self, session: int, issue_id: int):
1867        """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances"
1868
1869        Args:
1870            session (int): The session integer.
1871            issue_id (int): The issue id.
1872            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1873
1874        Returns:
1875            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'issue_id', 'buffer_length', 'buffer', 'status', 'id_info'.
1876        """
1877        # API function declaration
1878        self.library.GW2GetIdInfo.argtypes = [
1879            ct.c_size_t,  # Session_Handle session
1880            ct.c_size_t,  # size_t issueId
1881            ct.POINTER(ct.c_size_t),  # size_t * bufferLength
1882            ct.POINTER(ct.c_void_p)  # char ** outputBuffer
1883        ]
1884
1885        # Variable initialisation
1886        gw_return_object = glasswall.GwReturnObj()
1887        gw_return_object.session = ct.c_size_t(session)
1888        gw_return_object.issue_id = ct.c_size_t(issue_id)
1889        gw_return_object.buffer_length = ct.c_size_t()
1890        gw_return_object.buffer = ct.c_void_p()
1891
1892        # API call
1893        gw_return_object.status = self.library.GW2GetIdInfo(
1894            gw_return_object.session,
1895            gw_return_object.issue_id,
1896            ct.byref(gw_return_object.buffer_length),
1897            ct.byref(gw_return_object.buffer)
1898        )
1899
1900        # Editor wrote to a buffer, convert it to bytes
1901        id_info_bytes = utils.buffer_to_bytes(
1902            gw_return_object.buffer,
1903            gw_return_object.buffer_length
1904        )
1905
1906        gw_return_object.id_info = id_info_bytes.decode()
1907
1908        return gw_return_object
1909
1910    def get_id_info(self, issue_id: int, raise_unsupported: bool = True):
1911        """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances"
1912
1913        Args:
1914            issue_id (int): The issue id.
1915            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1916
1917        Returns:
1918            id_info (str): The group description for the given Issue ID.
1919        """
1920        # Validate arg types
1921        if not isinstance(issue_id, int):
1922            raise TypeError(issue_id)
1923
1924        with utils.CwdHandler(self.library_path):
1925            with self.new_session() as session:
1926                result = self._GW2GetIdInfo(session, issue_id)
1927
1928                if result.status not in successes.success_codes:
1929                    log.error(format_object(result))
1930                    if raise_unsupported:
1931                        raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1932                else:
1933                    log.debug(format_object(result))
1934
1935                return result.id_info
1936
1937    def _GW2GetAllIdInfo(self, session: int):
1938        """ Retrieves the XML containing all the Issue ID ranges with their group descriptions
1939
1940        Args:
1941            session (int): The session integer.
1942
1943        Returns:
1944            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'analysis_format', 'status', 'all_id_info'.
1945        """
1946
1947        # API function declaration
1948        self.library.GW2GetAllIdInfo.argtypes = [
1949            ct.c_size_t,  # Session_Handle session
1950            ct.POINTER(ct.c_size_t),  # size_t * bufferLength
1951            ct.POINTER(ct.c_void_p)  # char ** outputBuffer
1952        ]
1953
1954        # Variable initialisation
1955        # The extracted issue Id information is stored in the analysis report, register an analysis session.
1956        gw_return_object = self._GW2RegisterAnalysisMemory(session)
1957
1958        # API call
1959        gw_return_object.status = self.library.GW2GetAllIdInfo(
1960            gw_return_object.session,
1961            ct.byref(gw_return_object.buffer_length),
1962            ct.byref(gw_return_object.buffer)
1963        )
1964
1965        # Editor wrote to a buffer, convert it to bytes
1966        all_id_info_bytes = utils.buffer_to_bytes(
1967            gw_return_object.buffer,
1968            gw_return_object.buffer_length
1969        )
1970
1971        gw_return_object.all_id_info = all_id_info_bytes.decode()
1972
1973        return gw_return_object
1974
1975    def get_all_id_info(self, output_file: Optional[str] = None, raise_unsupported: bool = True) -> str:
1976        """ Retrieves the XML containing all the Issue ID ranges with their group descriptions
1977
1978        Args:
1979            output_file (Optional[str]): The output file path where the analysis file will be written.
1980            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1981
1982        Returns:
1983            all_id_info (str): A string XML analysis report containing all id info.
1984        """
1985        # Validate arg types
1986        if not isinstance(output_file, (type(None), str)):
1987            raise TypeError(output_file)
1988        if isinstance(output_file, str):
1989            output_file = os.path.abspath(output_file)
1990
1991        with utils.CwdHandler(self.library_path):
1992            with self.new_session() as session:
1993                result = self._GW2GetAllIdInfo(session)
1994
1995                if result.status not in successes.success_codes:
1996                    log.error(format_object(result))
1997                    if raise_unsupported:
1998                        raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1999                else:
2000                    log.debug(format_object(result))
2001
2002                if isinstance(output_file, str):
2003                    # GW2GetAllIdInfo is memory only, write to file
2004                    # make directories that do not exist
2005                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
2006                    with open(output_file, "w") as f:
2007                        f.write(result.all_id_info)
2008
2009                return result.all_id_info
2010
2011    def _GW2FileSessionStatus(self, session: int):
2012        """ Retrieves the Glasswall Session Status. Also gives a high level indication of the processing that was carried out on the last document processed by the library
2013
2014        Args:
2015            session (int): The session integer.
2016            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
2017
2018        Returns:
2019            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'session_status', 'buffer', 'buffer_length', 'status', 'message'.
2020        """
2021        # API function declaration
2022        self.library.GW2FileSessionStatus.argtypes = [
2023            ct.c_size_t,  # Session_Handle session
2024            ct.POINTER(ct.c_int),  # int *glasswallSessionStatus
2025            ct.POINTER(ct.c_void_p),  # char **statusMsgBuffer
2026            ct.POINTER(ct.c_size_t)  # size_t *statusbufferLength
2027        ]
2028
2029        # Variable initialisation
2030        gw_return_object = glasswall.GwReturnObj()
2031        gw_return_object.session = ct.c_size_t(session)
2032        gw_return_object.session_status = ct.c_int()
2033        gw_return_object.buffer = ct.c_void_p()
2034        gw_return_object.buffer_length = ct.c_size_t()
2035
2036        # API call
2037        gw_return_object.status = self.library.GW2FileSessionStatus(
2038            gw_return_object.session,
2039            ct.byref(gw_return_object.session_status),
2040            ct.byref(gw_return_object.buffer),
2041            ct.byref(gw_return_object.buffer_length)
2042        )
2043
2044        # Convert session_status to int
2045        gw_return_object.session_status = gw_return_object.session_status.value
2046
2047        # Editor wrote to a buffer, convert it to bytes
2048        message_bytes = utils.buffer_to_bytes(
2049            gw_return_object.buffer,
2050            gw_return_object.buffer_length
2051        )
2052        gw_return_object.message = message_bytes.decode()
2053
2054        return gw_return_object
2055
2056    def file_session_status_message(self, session: int, raise_unsupported: bool = True) -> str:
2057        """ Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out.
2058
2059        Args:
2060            session (int): The session integer.
2061            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
2062
2063        Returns:
2064            result.message (str):The file session status message.
2065        """
2066        # Validate arg types
2067        if not isinstance(session, int):
2068            raise TypeError(session)
2069
2070        result = self._GW2FileSessionStatus(session)
2071
2072        if result.status not in successes.success_codes:
2073            log.error(format_object(result))
2074            if raise_unsupported:
2075                raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
2076        else:
2077            log.debug(format_object(result))
2078
2079        return result.message
2080
2081    def _GW2LicenceDetails(self, session: int):
2082        """ Returns a human readable string containing licence details.
2083
2084        Args:
2085            session (int): The session integer.
2086
2087        Returns:
2088            licence_details (str): A human readable string representing the relevant information contained in the licence.
2089        """
2090        # API function declaration
2091        self.library.GW2LicenceDetails.argtypes = [ct.c_size_t]
2092        self.library.GW2LicenceDetails.restype = ct.c_char_p
2093
2094        # Variable initialisation
2095        ct_session = ct.c_size_t(session)
2096
2097        # API call
2098        licence_details = self.library.GW2LicenceDetails(ct_session)
2099
2100        # Convert to Python string
2101        licence_details = ct.string_at(licence_details).decode()
2102
2103        return licence_details
2104
2105    def licence_details(self):
2106        """ Returns a string containing details of the licence.
2107
2108        Returns:
2109            result (str): A string containing details of the licence.
2110        """
2111        with self.new_session() as session:
2112            result = self._GW2LicenceDetails(session)
2113
2114            log.debug(f"\n\tsession: {session}\n\tGW2LicenceDetails: {result}")
2115
2116        return result
2117
2118    def _GW2RegisterExportTextDumpMemory(self, session: int):
2119        """ Registers an export text dump to be written in memory.
2120
2121        Args:
2122            session (int): The session integer.
2123
2124        Returns:
2125            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
2126        """
2127        # API function declaration
2128        self.library.GW2RegisterExportTextDumpMemory.argtypes = [
2129            ct.c_size_t,  # Session_Handle session
2130            ct.POINTER(ct.c_void_p),  # char ** exportTextDumpFileBuffer
2131            ct.POINTER(ct.c_size_t)  # size_t * exportTextDumpLength
2132        ]
2133
2134        # Variable initialisation
2135        gw_return_object = glasswall.GwReturnObj()
2136        gw_return_object.session = ct.c_size_t(session)
2137        gw_return_object.buffer = ct.c_void_p()
2138        gw_return_object.buffer_length = ct.c_size_t()
2139
2140        # API call
2141        gw_return_object.status = self.library.GW2RegisterExportTextDumpMemory(
2142            gw_return_object.session,
2143            ct.byref(gw_return_object.buffer),
2144            ct.byref(gw_return_object.buffer_length)
2145        )
2146
2147        return gw_return_object
2148
2149    def _GW2RegisterExportTextDumpFile(self, session: int, output_file: str):
2150        """ Registers an export text dump to be written to file.
2151
2152        Args:
2153            session (int): The session integer.
2154            output_file (str): The file path of the text dump file.
2155
2156        Returns:
2157            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
2158        """
2159        # API function declaration
2160        self.library.GW2RegisterExportTextDumpFile.argtypes = [
2161            ct.c_size_t,  # Session_Handle session
2162            ct.c_char_p  # const char * textDumpFilePathName
2163        ]
2164
2165        # Variable initialisation
2166        gw_return_object = glasswall.GwReturnObj()
2167        gw_return_object.session = ct.c_size_t(session)
2168        gw_return_object.output_file = ct.c_char_p(output_file.encode("utf-8"))
2169
2170        # API call
2171        gw_return_object.status = self.library.GW2RegisterExportTextDumpFile(
2172            gw_return_object.session,
2173            gw_return_object.output_file
2174        )
2175
2176        return gw_return_object
2177
2178    def _GW2RegisterLicenceFile(self, session: int, input_file: str):
2179        """ Registers a "gwkey.lic" licence from file path.
2180
2181        Args:
2182            session (int): The session integer.
2183            input_file (str): The "gwkey.lic" licence input file path.
2184
2185        Returns:
2186            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
2187        """
2188        # API function declaration
2189        self.library.GW2RegisterLicenceFile.argtypes = [
2190            ct.c_size_t,  # Session_Handle session
2191            ct.c_char_p,  # const char *filename
2192        ]
2193
2194        # Variable initialisation
2195        gw_return_object = glasswall.GwReturnObj()
2196        gw_return_object.session = ct.c_size_t(session)
2197        gw_return_object.input_file = ct.c_char_p(input_file.encode("utf-8"))
2198
2199        # API call
2200        gw_return_object.status = self.library.GW2RegisterLicenceFile(
2201            gw_return_object.session,
2202            gw_return_object.input_file,
2203        )
2204
2205        return gw_return_object
2206
2207    def _GW2RegisterLicenceMemory(self, session: int, input_file: bytes):
2208        """ Registers a "gwkey.lic" licence from memory.
2209
2210        Args:
2211            session (int): The session integer.
2212            input_file (bytes): The "gwkey.lic" licence input file.
2213
2214        Returns:
2215            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
2216        """
2217        # API function declaration
2218        self.library.GW2RegisterLicenceMemory.argtypes = [
2219            ct.c_size_t,  # Session_Handle session
2220            ct.c_char_p,  # const char *filename
2221            ct.c_size_t,  # size_t licenceLength
2222        ]
2223
2224        # Variable initialisation
2225        gw_return_object = glasswall.GwReturnObj()
2226        gw_return_object.session = ct.c_size_t(session)
2227        gw_return_object.buffer = ct.c_char_p(input_file)
2228        gw_return_object.buffer_length = ct.c_size_t(len(input_file))
2229
2230        # API call
2231        gw_return_object.status = self.library.GW2RegisterLicenceMemory(
2232            gw_return_object.session,
2233            gw_return_object.buffer,
2234            gw_return_object.buffer_length
2235        )
2236
2237        return gw_return_object
2238
2239    def register_licence(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
2240        """ Registers a "gwkey.lic" licence from file path or memory.
2241
2242        Args:
2243            session (int): The session integer.
2244            input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.
2245
2246        Returns:
2247            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
2248                - If input_file is a str file path:
2249                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
2250
2251                - If input_file is a file in memory:
2252                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
2253        """
2254        if isinstance(input_file, str):
2255            if not os.path.isfile(input_file):
2256                raise FileNotFoundError(input_file)
2257
2258            input_file = os.path.abspath(input_file)
2259
2260            result = self._GW2RegisterLicenceFile(session, input_file)
2261
2262        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
2263            # Convert bytearray and io.BytesIO to bytes
2264            input_file = utils.as_bytes(input_file)
2265
2266            result = self._GW2RegisterLicenceMemory(session, input_file)
2267
2268        else:
2269            raise TypeError(input_file)
2270
2271        if result.status not in successes.success_codes:
2272            log.error(format_object(result))
2273            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
2274        else:
2275            log.debug(format_object(result))
2276
2277        return result

A high level Python wrapper for Glasswall Editor / Core2.

Editor( library_path: str, licence: Union[str, bytes, bytearray, _io.BytesIO] = None)
22    def __init__(self, library_path: str, licence: Union[str, bytes, bytearray, io.BytesIO] = None):
23        """ Initialise the Editor instance.
24
25        Args:
26            library_path (str): The file or directory path to the Editor library.
27            licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be:
28                - A string representing the file path to the licence.
29                - A `bytes` or `bytearray` object containing the licence data.
30                - An `io.BytesIO` object for in-memory licence data.
31                If not specified, it is assumed that the licence file is located in the same directory as the `library_path`.
32        """
33        super().__init__(library_path)
34        self.library = self.load_library(os.path.abspath(library_path))
35        self.licence = licence
36
37        # Validate killswitch has not activated
38        self.validate_licence()
39
40        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")

Initialise the Editor instance.

Args: library_path (str): The file or directory path to the Editor library. licence (str, bytes, bytearray, or io.BytesIO, optional): The licence file content or path. This can be: - A string representing the file path to the licence. - A bytes or bytearray object containing the licence data. - An io.BytesIO object for in-memory licence data. If not specified, it is assumed that the licence file is located in the same directory as the library_path.

library
licence
def validate_licence(self):
42    def validate_licence(self):
43        """ Validates the licence of the library by checking the licence details.
44
45        Raises:
46            LicenceExpired: If the licence has expired or could not be validated.
47        """
48        licence_details = self.licence_details()
49
50        bad_details = [
51            "Unable to Read Licence Key File",
52            "Licence File Missing Required Contents",
53            "Licence Expired",
54        ]
55
56        if any(bad_detail.lower() in licence_details.lower() for bad_detail in bad_details):
57            # bad_details found in licence_details
58            log.error(f"{self.__class__.__name__} licence validation failed. Licence details:\n{licence_details}")
59            raise errors.LicenceExpired(licence_details)
60        else:
61            log.debug(f"{self.__class__.__name__} licence validated successfully. Licence details:\n{licence_details}")

Validates the licence of the library by checking the licence details.

Raises: LicenceExpired: If the licence has expired or could not be validated.

def version(self):
63    def version(self):
64        """ Returns the Glasswall library version.
65
66        Returns:
67            version (str): The Glasswall library version.
68        """
69        # API function declaration
70        self.library.GW2LibVersion.restype = ct.c_char_p
71
72        # API call
73        version = self.library.GW2LibVersion()
74
75        # Convert to Python string
76        version = ct.string_at(version).decode()
77
78        return version

Returns the Glasswall library version.

Returns: version (str): The Glasswall library version.

def open_session(self):
80    def open_session(self):
81        """ Open a new Glasswall session.
82
83        Returns:
84            session (int): An incrementing integer repsenting the current session.
85        """
86        # API call
87        session = self.library.GW2OpenSession()
88
89        log.debug(f"\n\tsession: {session}")
90
91        if self.licence:
92            # Must register licence on each GW2OpenSession if the licence is not in the same directory as the library
93            self.register_licence(session, self.licence)
94
95        return session

Open a new Glasswall session.

Returns: session (int): An incrementing integer repsenting the current session.

def close_session(self, session: int) -> int:
 97    def close_session(self, session: int) -> int:
 98        """ Close the Glasswall session. All resources allocated by the session will be destroyed.
 99
100        Args:
101            session (int): The session to close.
102
103        Returns:
104            status (int): The status code of the function call.
105        """
106        if not isinstance(session, int):
107            raise TypeError(session)
108
109        # API function declaration
110        self.library.GW2CloseSession.argtypes = [ct.c_size_t]
111
112        # Variable initialisation
113        ct_session = ct.c_size_t(session)
114
115        # API call
116        status = self.library.GW2CloseSession(ct_session)
117
118        if status not in successes.success_codes:
119            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
120        else:
121            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
122
123        return status

Close the Glasswall session. All resources allocated by the session will be destroyed.

Args: session (int): The session to close.

Returns: status (int): The status code of the function call.

@contextmanager
def new_session(self):
125    @contextmanager
126    def new_session(self):
127        """ Context manager. Opens a new session on entry and closes the session on exit. """
128        try:
129            session = self.open_session()
130            yield session
131        finally:
132            self.close_session(session)

Context manager. Opens a new session on entry and closes the session on exit.

def run_session(self, session):
134    def run_session(self, session):
135        """ Runs the Glasswall session and begins processing of a file.
136
137        Args:
138            session (int): The session to run.
139
140        Returns:
141            status (int): The status code of the function call.
142        """
143        # API function declaration
144        self.library.GW2RunSession.argtypes = [ct.c_size_t]
145
146        # Variable initialisation
147        ct_session = ct.c_size_t(session)
148
149        # API call
150        status = self.library.GW2RunSession(ct_session)
151
152        if status not in successes.success_codes:
153            log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}")
154        else:
155            log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.file_error_message(session)}")
156
157        return status

Runs the Glasswall session and begins processing of a file.

Args: session (int): The session to run.

Returns: status (int): The status code of the function call.

def determine_file_type( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], as_string: bool = False, raise_unsupported: bool = True) -> Union[int, str]:
159    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True) -> Union[int, str]:
160        """ Determine the file type of a given input file, either as an integer identifier or a string.
161
162        Args:
163            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.
164            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.
165            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
166
167        Returns:
168            file_type (Union[int, str]): The file type.
169        """
170        if isinstance(input_file, str):
171            if not os.path.isfile(input_file):
172                raise FileNotFoundError(input_file)
173
174            # convert to ct.c_char_p of bytes
175            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
176
177            # API call
178            file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file)
179
180        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
181            # convert to bytes
182            bytes_input_file = utils.as_bytes(input_file)
183
184            # ctypes conversion
185            ct_buffer = ct.c_char_p(bytes_input_file)
186            ct_buffer_length = ct.c_size_t(len(bytes_input_file))
187
188            # API call
189            file_type = self.library.GW2DetermineFileTypeFromMemory(
190                ct_buffer,
191                ct_buffer_length
192            )
193
194        else:
195            raise TypeError(input_file)
196
197        file_type_as_string = dft.file_type_int_to_str(file_type)
198        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
199
200        if not dft.is_success(file_type):
201            log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
202            if raise_unsupported:
203                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
204        else:
205            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
206
207        if as_string:
208            return file_type_as_string
209
210        return file_type

Determine the file type of a given input file, either as an integer identifier or a string.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file to analyse. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object. as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_type (Union[int, str]): The file type.

def get_content_management_policy(self, session: int):
255    def get_content_management_policy(self, session: int):
256        """ Returns the content management configuration for a given session.
257
258        Args:
259            session (int): The session integer.
260
261        Returns:
262            xml_string (str): The XML string of the current content management configuration.
263        """
264        # NOTE GW2GetPolicySettings is current not implemented in editor
265
266        # set xml_string as loaded default config
267        xml_string = glasswall.content_management.policies.Editor(default="sanitise").text,
268
269        # log.debug(f"xml_string:\n{xml_string}")
270
271        return xml_string
272
273        # # API function declaration
274        # self.library.GW2GetPolicySettings.argtypes = [
275        #     ct.c_size_t,
276        #     ct.c_void_p,
277        # ]
278
279        # # Variable initialisation
280        # ct_session = ct.c_size_t(session)
281        # ct_buffer = ct.c_void_p()
282        # ct_buffer_length = ct.c_size_t()
283        # # ct_file_format = ct.c_int(file_format)
284
285        # # API Call
286        # status = self.library.GW2GetPolicySettings(
287        #     ct_session,
288        #     ct.byref(ct_buffer),
289        #     ct.byref(ct_buffer_length)
290        # )
291
292        # print("GW2GetPolicySettings status:", status)
293
294        # file_bytes = utils.buffer_to_bytes(
295        #     ct_buffer,
296        #     ct_buffer_length,
297        # )
298
299        # return file_bytes

Returns the content management configuration for a given session.

Args: session (int): The session integer.

Returns: xml_string (str): The XML string of the current content management configuration.

def set_content_management_policy( self, session: int, input_file: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, policy_format=0):
369    def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, policy_format=0):
370        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
371
372        Args:
373            session (int): The session integer.
374            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
375            policy_format (int): The format of the content management policy. 0=XML.
376
377        Returns:
378            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
379                - If input_file is a str file path:
380                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'.
381
382                - If input_file is a file in memory:
383                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'.
384        """
385        # Validate type
386        if not isinstance(session, int):
387            raise TypeError(session)
388        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
389            raise TypeError(input_file)
390        if not isinstance(policy_format, int):
391            raise TypeError(policy_format)
392
393        # Set input_file to default if input_file is None
394        if input_file is None:
395            input_file = glasswall.content_management.policies.Editor(default="sanitise")
396
397        # Validate xml content is parsable
398        utils.validate_xml(input_file)
399
400        # From file
401        if isinstance(input_file, str) and os.path.isfile(input_file):
402            input_file = os.path.abspath(input_file)
403
404            result = self._GW2RegisterPoliciesFile(session, input_file)
405
406        # From memory
407        elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
408            # Convert bytearray, io.BytesIO to bytes
409            if isinstance(input_file, (bytearray, io.BytesIO)):
410                input_file = utils.as_bytes(input_file)
411            # Convert string xml or Policy to bytes
412            if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)):
413                input_file = input_file.encode("utf-8")
414
415            result = self._GW2RegisterPoliciesMemory(session, input_file)
416
417        if result.status not in successes.success_codes:
418            log.error(format_object(result))
419            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
420        else:
421            log.debug(format_object(result))
422
423        return result

Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.

Args: session (int): The session integer. input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. policy_format (int): The format of the content management policy. 0=XML.

Returns: - result (glasswall.GwReturnObj): Depending on the input 'input_file': - If input_file is a str file path: - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'policy_format', 'status'.

    - If input_file is a file in memory:
        - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'policy_format', 'status'.
def register_input( self, session: int, input_file: Union[str, bytes, bytearray, _io.BytesIO]):
487    def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
488        """ Register an input file or bytes for the given session.
489
490        Args:
491            session (int): The session integer.
492            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
493
494        Returns:
495            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
496                - If input_file is a str file path:
497                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
498
499                - If input_file is a file in memory:
500                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
501        """
502        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
503            raise TypeError(input_file)
504
505        if isinstance(input_file, str):
506            if not os.path.isfile(input_file):
507                raise FileNotFoundError(input_file)
508
509            input_file = os.path.abspath(input_file)
510
511            result = self._GW2RegisterInputFile(session, input_file)
512
513        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
514            # Convert bytearray and io.BytesIO to bytes
515            input_file = utils.as_bytes(input_file)
516
517            result = self._GW2RegisterInputMemory(session, input_file)
518
519        if result.status not in successes.success_codes:
520            log.error(format_object(result))
521            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
522        else:
523            log.debug(format_object(result))
524
525        return result

Register an input file or bytes for the given session.

Args: session (int): The session integer. input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.

Returns: - result (glasswall.GwReturnObj): Depending on the input 'input_file': - If input_file is a str file path: - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.

    - If input_file is a file in memory:
        - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
def register_output(self, session, output_file: Optional[str] = None):
587    def register_output(self, session, output_file: Optional[str] = None):
588        """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.
589
590        Args:
591            session (int): The session integer.
592            output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.
593
594        Returns:
595            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
596        """
597        if not isinstance(output_file, (type(None), str)):
598            raise TypeError(output_file)
599
600        if isinstance(output_file, str):
601            output_file = os.path.abspath(output_file)
602
603            result = self._GW2RegisterOutFile(session, output_file)
604
605        elif isinstance(output_file, type(None)):
606            result = self._GW2RegisterOutputMemory(session)
607
608        if result.status not in successes.success_codes:
609            log.error(format_object(result))
610            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
611        else:
612            log.debug(format_object(result))
613
614        return result

Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.

Args: session (int): The session integer. output_file (Optional[str]): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.

Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.

def register_analysis(self, session: int, output_file: Optional[str] = None):
684    def register_analysis(self, session: int, output_file: Optional[str] = None):
685        """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.
686
687        Args:
688            session (int): The session integer.
689            output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes.
690
691        Returns:
692            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included.
693        """
694        if not isinstance(output_file, (type(None), str)):
695            raise TypeError(output_file)
696
697        if isinstance(output_file, str):
698            output_file = os.path.abspath(output_file)
699
700            result = self._GW2RegisterAnalysisFile(session, output_file)
701
702        elif isinstance(output_file, type(None)):
703            result = self._GW2RegisterAnalysisMemory(session)
704
705        if result.status not in successes.success_codes:
706            log.error(format_object(result))
707            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
708        else:
709            log.debug(format_object(result))
710
711        return result

Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.

Args: session (int): The session integer. output_file (Optional[str]): Default None. The file path where the analysis will be written. None returns the analysis as bytes.

Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'status', 'session', 'analysis_format'. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size. If output_file is not None (file mode) 'output_file' is included.

def protect_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
713    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
714        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
715
716        Args:
717            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
718            output_file (Optional[str]): The output file path where the protected file will be written.
719            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
720            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
721
722        Returns:
723            file_bytes (bytes): The protected file bytes.
724        """
725        # Validate arg types
726        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
727            raise TypeError(input_file)
728        if not isinstance(output_file, (type(None), str)):
729            raise TypeError(output_file)
730        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
731            raise TypeError(content_management_policy)
732        if not isinstance(raise_unsupported, bool):
733            raise TypeError(raise_unsupported)
734
735        # Convert string path arguments to absolute paths
736        if isinstance(input_file, str):
737            if not os.path.isfile(input_file):
738                raise FileNotFoundError(input_file)
739            input_file = os.path.abspath(input_file)
740        if isinstance(output_file, str):
741            output_file = os.path.abspath(output_file)
742            # make directories that do not exist
743            os.makedirs(os.path.dirname(output_file), exist_ok=True)
744        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
745            content_management_policy = os.path.abspath(content_management_policy)
746
747        # Convert memory inputs to bytes
748        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
749            input_file = utils.as_bytes(input_file)
750
751        with utils.CwdHandler(self.library_path):
752            with self.new_session() as session:
753                content_management_policy = self.set_content_management_policy(session, content_management_policy)
754                register_input = self.register_input(session, input_file)
755                register_output = self.register_output(session, output_file=output_file)
756                status = self.run_session(session)
757
758                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
759                if status not in successes.success_codes:
760                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
761                    if raise_unsupported:
762                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
763                    else:
764                        file_bytes = None
765                else:
766                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
767                    # Get file bytes
768                    if isinstance(output_file, str):
769                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
770                        if not os.path.isfile(output_file):
771                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
772                            file_bytes = None
773                        else:
774                            with open(output_file, "rb") as f:
775                                file_bytes = f.read()
776                    else:
777                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
778                        file_bytes = utils.buffer_to_bytes(
779                            register_output.buffer,
780                            register_output.buffer_length
781                        )
782
783                # Ensure memory allocated is not garbage collected
784                content_management_policy, register_input, register_output
785
786                return file_bytes

Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the protected file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The protected file bytes.

def protect_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
788    def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
789        """ Recursively processes all files in a directory in protect mode using the given content management policy.
790        The protected files are written to output_directory maintaining the same directory structure as input_directory.
791
792        Args:
793            input_directory (str): The input directory containing files to protect.
794            output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files.
795            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
796            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
797
798        Returns:
799            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
800        """
801        protected_files_dict = {}
802        # Call protect_file on each file in input_directory to output_directory
803        for input_file in utils.list_file_paths(input_directory):
804            relative_path = os.path.relpath(input_file, input_directory)
805            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
806
807            protected_bytes = self.protect_file(
808                input_file=input_file,
809                output_file=output_file,
810                raise_unsupported=raise_unsupported,
811                content_management_policy=content_management_policy,
812            )
813
814            protected_files_dict[relative_path] = protected_bytes
815
816        return protected_files_dict

Recursively processes all files in a directory in protect mode using the given content management policy. The protected files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to protect. output_directory (Optional[str]): The output directory where the protected file will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

def analyse_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
818    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
819        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
820
821        Args:
822            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
823            output_file (Optional[str]): The output file path where the analysis file will be written.
824            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
825            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
826
827        Returns:
828            file_bytes (bytes): The analysis file bytes.
829        """
830        # Validate arg types
831        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
832            raise TypeError(input_file)
833        if not isinstance(output_file, (type(None), str)):
834            raise TypeError(output_file)
835        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
836            raise TypeError(content_management_policy)
837        if not isinstance(raise_unsupported, bool):
838            raise TypeError(raise_unsupported)
839
840        # Convert string path arguments to absolute paths
841        if isinstance(input_file, str):
842            if not os.path.isfile(input_file):
843                raise FileNotFoundError(input_file)
844            input_file = os.path.abspath(input_file)
845        if isinstance(output_file, str):
846            output_file = os.path.abspath(output_file)
847            # make directories that do not exist
848            os.makedirs(os.path.dirname(output_file), exist_ok=True)
849        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
850            content_management_policy = os.path.abspath(content_management_policy)
851
852        # Convert memory inputs to bytes
853        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
854            input_file = utils.as_bytes(input_file)
855
856        with utils.CwdHandler(self.library_path):
857            with self.new_session() as session:
858                content_management_policy = self.set_content_management_policy(session, content_management_policy)
859                register_input = self.register_input(session, input_file)
860                register_analysis = self.register_analysis(session, output_file)
861                status = self.run_session(session)
862
863                file_bytes = None
864                if isinstance(output_file, str):
865                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
866                    if os.path.isfile(output_file):
867                        with open(output_file, "rb") as f:
868                            file_bytes = f.read()
869                else:
870                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
871                    if register_analysis.buffer and register_analysis.buffer_length:
872                        file_bytes = utils.buffer_to_bytes(
873                            register_analysis.buffer,
874                            register_analysis.buffer_length
875                        )
876
877                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
878                if status not in successes.success_codes:
879                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
880                    if raise_unsupported:
881                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
882                else:
883                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
884
885                # Ensure memory allocated is not garbage collected
886                content_management_policy, register_input, register_analysis
887
888                return file_bytes

Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the analysis file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The analysis file bytes.

def analyse_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
890    def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
891        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
892
893        Args:
894            input_directory (str): The input directory containing files to analyse.
895            output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files.
896            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
897            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
898
899        Returns:
900            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
901        """
902        analysis_files_dict = {}
903        # Call analyse_file on each file in input_directory to output_directory
904        for input_file in utils.list_file_paths(input_directory):
905            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
906            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
907
908            analysis_bytes = self.analyse_file(
909                input_file=input_file,
910                output_file=output_file,
911                raise_unsupported=raise_unsupported,
912                content_management_policy=content_management_policy,
913            )
914
915            analysis_files_dict[relative_path] = analysis_bytes
916
917        return analysis_files_dict

Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to analyse. output_directory (Optional[str]): The output directory where the analysis files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

def protect_and_analyse_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_analysis_report: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
 919    def protect_and_analyse_file(
 920        self,
 921        input_file: Union[str, bytes, bytearray, io.BytesIO],
 922        output_file: Optional[str] = None,
 923        output_analysis_report: Optional[str] = None,
 924        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
 925        raise_unsupported: bool = True,
 926    ):
 927        """ Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes.
 928
 929        Args:
 930            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 931            output_file (Optional[str]): The output file path where the protected file will be written.
 932            output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written.
 933            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 934            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 935
 936        Returns:
 937            Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes).
 938        """
 939        # Validate arg types
 940        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 941            raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}")
 942        if not isinstance(output_file, (type(None), str)):
 943            raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}")
 944        if not isinstance(output_analysis_report, (type(None), str)):
 945            raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}")
 946        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 947            raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}")
 948        if not isinstance(raise_unsupported, bool):
 949            raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}")
 950
 951        # Convert string path arguments to absolute paths
 952        if isinstance(input_file, str):
 953            if not os.path.isfile(input_file):
 954                raise FileNotFoundError(input_file)
 955            input_file = os.path.abspath(input_file)
 956        if isinstance(output_file, str):
 957            output_file = os.path.abspath(output_file)
 958            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 959        if isinstance(output_analysis_report, str):
 960            output_analysis_report = os.path.abspath(output_analysis_report)
 961            os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True)
 962        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 963            content_management_policy = os.path.abspath(content_management_policy)
 964
 965        # Convert memory inputs to bytes
 966        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 967            input_file = utils.as_bytes(input_file)
 968
 969        with utils.CwdHandler(self.library_path):
 970            with self.new_session() as session:
 971                content_management_policy = self.set_content_management_policy(session, content_management_policy)
 972                register_input = self.register_input(session, input_file)
 973                register_output = self.register_output(session, output_file)
 974                register_analysis = self.register_analysis(session, output_analysis_report)
 975
 976                status = self.run_session(session)
 977
 978                protected_file_bytes = None
 979                analysis_report_bytes = None
 980
 981                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 982                if status not in successes.success_codes:
 983                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
 984                    if raise_unsupported:
 985                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 986
 987                # Get analysis report file bytes, even on processing failure
 988                if isinstance(output_analysis_report, str):
 989                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
 990                    if not os.path.isfile(output_analysis_report):
 991                        log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}")
 992                    else:
 993                        with open(output_analysis_report, "rb") as f:
 994                            analysis_report_bytes = f.read()
 995                else:
 996                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
 997                    if register_analysis.buffer and register_analysis.buffer_length:
 998                        analysis_report_bytes = utils.buffer_to_bytes(
 999                            register_analysis.buffer,
1000                            register_analysis.buffer_length
1001                        )
1002
1003                # On success, get protected file bytes
1004                if status in successes.success_codes:
1005                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1006                    # Get file bytes
1007                    if isinstance(output_file, str):
1008                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1009                        if not os.path.isfile(output_file):
1010                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1011                        else:
1012                            with open(output_file, "rb") as f:
1013                                protected_file_bytes = f.read()
1014                    else:
1015                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1016                        protected_file_bytes = utils.buffer_to_bytes(
1017                            register_output.buffer,
1018                            register_output.buffer_length
1019                        )
1020
1021                # Ensure memory allocated is not garbage collected
1022                content_management_policy, register_input, register_output, register_analysis
1023
1024                return protected_file_bytes, analysis_report_bytes

Protects and analyses a file in a single session, returning both protected file bytes and analysis report bytes.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the protected file will be written. output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: Tuple[Optional[bytes], Optional[bytes]]: A tuple of (protected_file_bytes, analysis_report_bytes).

def protect_and_analyse_directory( self, input_directory: str, output_directory: Optional[str], analysis_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
1026    def protect_and_analyse_directory(
1027        self,
1028        input_directory: str,
1029        output_directory: Optional[str],
1030        analysis_directory: Optional[str],
1031        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1032        raise_unsupported: bool = True,
1033    ):
1034        """ Recursively processes all files in a directory using protect and analyse mode with the given content management policy.
1035        Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
1036
1037        Args:
1038            input_directory (str): The input directory containing files to process.
1039            output_directory (Optional[str]): The output directory for protected files.
1040            analysis_directory (Optional[str]): The output directory for XML analysis reports.
1041            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
1042            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1043
1044        Returns:
1045            result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes).
1046        """
1047        result_dict = {}
1048        for input_file in utils.list_file_paths(input_directory):
1049            relative_path = os.path.relpath(input_file, input_directory)
1050            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1051            output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml")
1052
1053            protected_file_bytes, analysis_report_bytes = self.protect_and_analyse_file(
1054                input_file=input_file,
1055                output_file=output_file,
1056                output_analysis_report=output_analysis_report,
1057                content_management_policy=content_management_policy,
1058                raise_unsupported=raise_unsupported,
1059            )
1060
1061            result_dict[relative_path] = (protected_file_bytes, analysis_report_bytes)
1062
1063        return result_dict

Recursively processes all files in a directory using protect and analyse mode with the given content management policy. Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.

Args: input_directory (str): The input directory containing files to process. output_directory (Optional[str]): The output directory for protected files. analysis_directory (Optional[str]): The output directory for XML analysis reports. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: result_dict (dict): A dictionary mapping relative file paths to tuples of (protected_file_bytes, analysis_report_bytes).

def register_export(self, session: int, output_file: Optional[str] = None):
1125    def register_export(self, session: int, output_file: Optional[str] = None):
1126        """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call.
1127
1128        Args:
1129            session (int): The session integer.
1130            output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory.
1131
1132        Returns:
1133            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1134        """
1135        if not isinstance(output_file, (type(None), str)):
1136            raise TypeError(output_file)
1137
1138        if isinstance(output_file, str):
1139            output_file = os.path.abspath(output_file)
1140
1141            result = self._GW2RegisterExportFile(session, output_file)
1142
1143        elif isinstance(output_file, type(None)):
1144            result = self._GW2RegisterExportMemory(session)
1145
1146        if result.status not in successes.success_codes:
1147            log.error(format_object(result))
1148            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1149        else:
1150            log.debug(format_object(result))
1151
1152        return result

Registers a file to be exported for the given session. The export file will be created during the session's run_session call.

Args: session (int): The session integer. output_file (Optional[str]): Default None. The file path where the export will be written. None exports the file in memory.

Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call and 'session', the session integer. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.

def export_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
1154    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1155        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.
1156
1157        Args:
1158            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
1159            output_file (Optional[str]): The output file path where the .zip file will be written.
1160            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1161            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1162
1163        Returns:
1164            file_bytes (bytes): The exported .zip file.
1165        """
1166        # Validate arg types
1167        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1168            raise TypeError(input_file)
1169        if not isinstance(output_file, (type(None), str)):
1170            raise TypeError(output_file)
1171        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1172            raise TypeError(content_management_policy)
1173        if not isinstance(raise_unsupported, bool):
1174            raise TypeError(raise_unsupported)
1175
1176        # Convert string path arguments to absolute paths
1177        if isinstance(input_file, str):
1178            if not os.path.isfile(input_file):
1179                raise FileNotFoundError(input_file)
1180            input_file = os.path.abspath(input_file)
1181        if isinstance(output_file, str):
1182            output_file = os.path.abspath(output_file)
1183            # make directories that do not exist
1184            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1185        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1186            content_management_policy = os.path.abspath(content_management_policy)
1187
1188        # Convert memory inputs to bytes
1189        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1190            input_file = utils.as_bytes(input_file)
1191
1192        with utils.CwdHandler(self.library_path):
1193            with self.new_session() as session:
1194                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1195                register_input = self.register_input(session, input_file)
1196                register_export = self.register_export(session, output_file)
1197                status = self.run_session(session)
1198
1199                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1200                if status not in successes.success_codes:
1201                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1202                    if raise_unsupported:
1203                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1204                    else:
1205                        file_bytes = None
1206                else:
1207                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1208                    # Get file bytes
1209                    if isinstance(output_file, str):
1210                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1211                        if not os.path.isfile(output_file):
1212                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1213                            file_bytes = None
1214                        else:
1215                            with open(output_file, "rb") as f:
1216                                file_bytes = f.read()
1217                    else:
1218                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1219                        file_bytes = utils.buffer_to_bytes(
1220                            register_export.buffer,
1221                            register_export.buffer_length
1222                        )
1223
1224                # Ensure memory allocated is not garbage collected
1225                content_management_policy, register_input, register_export
1226
1227                return file_bytes

Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the .zip file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The exported .zip file.

def export_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
1229    def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1230        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
1231
1232        Args:
1233            input_directory (str): The input directory containing files to export.
1234            output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files.
1235            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
1236            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1237
1238        Returns:
1239            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1240        """
1241        export_files_dict = {}
1242        # Call export_file on each file in input_directory to output_directory
1243        for input_file in utils.list_file_paths(input_directory):
1244            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
1245            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1246
1247            export_bytes = self.export_file(
1248                input_file=input_file,
1249                output_file=output_file,
1250                raise_unsupported=raise_unsupported,
1251                content_management_policy=content_management_policy,
1252            )
1253
1254            export_files_dict[relative_path] = export_bytes
1255
1256        return export_files_dict

Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to export. output_directory (Optional[str]): The output directory where the export files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

def export_and_analyse_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_analysis_report: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
1258    def export_and_analyse_file(
1259        self,
1260        input_file: Union[str, bytes, bytearray, io.BytesIO],
1261        output_file: Optional[str] = None,
1262        output_analysis_report: Optional[str] = None,
1263        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1264        raise_unsupported: bool = True,
1265    ):
1266        """ Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes.
1267
1268        Args:
1269            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
1270            output_file (Optional[str]): The output file path where the .zip export will be written.
1271            output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written.
1272            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1273            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1274
1275        Returns:
1276            Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes).
1277        """
1278        # Validate arg types
1279        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1280            raise TypeError(f"input_file expected to be of type: str, bytes, bytearray, io.BytesIO. Got: {type(input_file).__name__}")
1281        if not isinstance(output_file, (type(None), str)):
1282            raise TypeError(f"output_file expected to be of type: None, str. Got: {type(output_file).__name__}")
1283        if not isinstance(output_analysis_report, (type(None), str)):
1284            raise TypeError(f"output_analysis_report expected to be of type: None, str. Got: {type(output_analysis_report).__name__}")
1285        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1286            raise TypeError(f"content_management_policy expected to be of type: None, str, bytes, bytearray, io.BytesIO, Policy. Got: {type(content_management_policy).__name__}")
1287        if not isinstance(raise_unsupported, bool):
1288            raise TypeError(f"raise_unsupported expected to be of type: bool. Got: {type(raise_unsupported).__name__}")
1289
1290        # Convert string path arguments to absolute paths
1291        if isinstance(input_file, str):
1292            if not os.path.isfile(input_file):
1293                raise FileNotFoundError(input_file)
1294            input_file = os.path.abspath(input_file)
1295        if isinstance(output_file, str):
1296            output_file = os.path.abspath(output_file)
1297            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1298        if isinstance(output_analysis_report, str):
1299            output_analysis_report = os.path.abspath(output_analysis_report)
1300            os.makedirs(os.path.dirname(output_analysis_report), exist_ok=True)
1301        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1302            content_management_policy = os.path.abspath(content_management_policy)
1303
1304        # Convert memory inputs to bytes
1305        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1306            input_file = utils.as_bytes(input_file)
1307
1308        with utils.CwdHandler(self.library_path):
1309            with self.new_session() as session:
1310                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1311                register_input = self.register_input(session, input_file)
1312                register_export = self.register_export(session, output_file)
1313                register_analysis = self.register_analysis(session, output_analysis_report)
1314
1315                status = self.run_session(session)
1316
1317                export_file_bytes = None
1318                analysis_report_bytes = None
1319
1320                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1321                if status not in successes.success_codes:
1322                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1323                    if raise_unsupported:
1324                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1325
1326                # Get analysis report file bytes, even on processing failure
1327                if isinstance(output_analysis_report, str):
1328                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1329                    if not os.path.isfile(output_analysis_report):
1330                        log.error(f"Editor returned success code: {status} but no output file was found: {output_analysis_report}")
1331                    else:
1332                        with open(output_analysis_report, "rb") as f:
1333                            analysis_report_bytes = f.read()
1334                else:
1335                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1336                    if register_analysis.buffer and register_analysis.buffer_length:
1337                        analysis_report_bytes = utils.buffer_to_bytes(
1338                            register_analysis.buffer,
1339                            register_analysis.buffer_length
1340                        )
1341
1342                # On success, get export file bytes
1343                if status in successes.success_codes:
1344                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\toutput_analysis_report: {output_analysis_report}\n\tsession: {session}\n\tstatus: {status}")
1345                    # Get export file bytes
1346                    if isinstance(output_file, str):
1347                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1348                        if not os.path.isfile(output_file):
1349                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1350                        else:
1351                            with open(output_file, "rb") as f:
1352                                export_file_bytes = f.read()
1353                    else:
1354                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1355                        export_file_bytes = utils.buffer_to_bytes(
1356                            register_export.buffer,
1357                            register_export.buffer_length
1358                        )
1359
1360                # Ensure memory allocated is not garbage collected
1361                content_management_policy, register_input, register_export, register_analysis
1362
1363                return export_file_bytes, analysis_report_bytes

Exports and analyses a file in a single session, returning both exported .zip bytes and analysis report bytes.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Optional[str]): The output file path where the .zip export will be written. output_analysis_report (Optional[str]): The output file path where the XML analysis report will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: Tuple[Optional[bytes], Optional[bytes]]: A tuple of (exported_file_bytes, analysis_report_bytes).

def export_and_analyse_directory( self, input_directory: str, output_directory: Optional[str], analysis_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
1365    def export_and_analyse_directory(
1366        self,
1367        input_directory: str,
1368        output_directory: Optional[str],
1369        analysis_directory: Optional[str],
1370        content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
1371        raise_unsupported: bool = True,
1372    ):
1373        """ Recursively processes all files in a directory using export and analyse mode with the given content management policy.
1374        Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.
1375
1376        Args:
1377            input_directory (str): The input directory containing files to process.
1378            output_directory (Optional[str]): The output directory for exported .zip files.
1379            analysis_directory (Optional[str]): The output directory for XML analysis reports.
1380            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
1381            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1382
1383        Returns:
1384            result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes).
1385        """
1386        result_dict = {}
1387
1388        for input_file in utils.list_file_paths(input_directory):
1389            relative_path = os.path.relpath(input_file, input_directory)
1390            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path + ".zip")
1391            output_analysis_report = None if analysis_directory is None else os.path.join(os.path.abspath(analysis_directory), relative_path + ".xml")
1392
1393            export_file_bytes, analysis_report_bytes = self.export_and_analyse_file(
1394                input_file=input_file,
1395                output_file=output_file,
1396                output_analysis_report=output_analysis_report,
1397                content_management_policy=content_management_policy,
1398                raise_unsupported=raise_unsupported,
1399            )
1400
1401            result_dict[relative_path] = (export_file_bytes, analysis_report_bytes)
1402
1403        return result_dict

Recursively processes all files in a directory using export and analyse mode with the given content management policy. Outputs are written to output_directory and analysis_directory maintaining the same structure as input_directory.

Args: input_directory (str): The input directory containing files to process. output_directory (Optional[str]): The output directory for exported .zip files. analysis_directory (Optional[str]): The output directory for XML analysis reports. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: result_dict (dict): A dictionary mapping relative file paths to tuples of (exported_file_bytes, analysis_report_bytes).

def register_import( self, session: int, input_file: Union[str, bytes, bytearray, _io.BytesIO]):
1466    def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
1467        """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.
1468
1469        Args:
1470            session (int): The session integer.
1471            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes.
1472
1473        Returns:
1474            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
1475        """
1476        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
1477            raise TypeError(input_file)
1478
1479        if isinstance(input_file, str):
1480            if not os.path.isfile(input_file):
1481                raise FileNotFoundError(input_file)
1482
1483            input_file = os.path.abspath(input_file)
1484
1485            result = self._GW2RegisterImportFile(session, input_file)
1486
1487        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
1488            # Convert bytearray and io.BytesIO to bytes
1489            input_file = utils.as_bytes(input_file)
1490
1491            result = self._GW2RegisterImportMemory(session, input_file)
1492
1493        if result.status not in successes.success_codes:
1494            log.error(format_object(result))
1495            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1496        else:
1497            log.debug(format_object(result))
1498
1499        return result

Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.

Args: session (int): The session integer. input_file (Union[str, bytes, bytearray, io.BytesIO]): The input import file path or bytes.

Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.

def import_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
1501    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1502        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
1503
1504        Args:
1505            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
1506            output_file (Optional[str]): The output file path where the constructed file will be written.
1507            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
1508            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1509
1510        Returns:
1511            file_bytes (bytes): The imported file bytes.
1512        """
1513        # Validate arg types
1514        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
1515            raise TypeError(input_file)
1516        if not isinstance(output_file, (type(None), str)):
1517            raise TypeError(output_file)
1518        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
1519            raise TypeError(content_management_policy)
1520        if not isinstance(raise_unsupported, bool):
1521            raise TypeError(raise_unsupported)
1522
1523        # Convert string path arguments to absolute paths
1524        if isinstance(input_file, str):
1525            if not os.path.isfile(input_file):
1526                raise FileNotFoundError(input_file)
1527            input_file = os.path.abspath(input_file)
1528        if isinstance(output_file, str):
1529            output_file = os.path.abspath(output_file)
1530            # make directories that do not exist
1531            os.makedirs(os.path.dirname(output_file), exist_ok=True)
1532        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
1533            content_management_policy = os.path.abspath(content_management_policy)
1534
1535        # Convert memory inputs to bytes
1536        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
1537            input_file = utils.as_bytes(input_file)
1538
1539        with utils.CwdHandler(self.library_path):
1540            with self.new_session() as session:
1541                content_management_policy = self.set_content_management_policy(session, content_management_policy)
1542                register_import = self.register_import(session, input_file)
1543                register_output = self.register_output(session, output_file)
1544                status = self.run_session(session)
1545
1546                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
1547                if status not in successes.success_codes:
1548                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1549                    if raise_unsupported:
1550                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1551                    else:
1552                        file_bytes = None
1553                else:
1554                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
1555                    # Get file bytes
1556                    if isinstance(output_file, str):
1557                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
1558                        if not os.path.isfile(output_file):
1559                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
1560                            file_bytes = None
1561                        else:
1562                            with open(output_file, "rb") as f:
1563                                file_bytes = f.read()
1564                    else:
1565                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
1566                        file_bytes = utils.buffer_to_bytes(
1567                            register_output.buffer,
1568                            register_output.buffer_length
1569                        )
1570
1571                # Ensure memory allocated is not garbage collected
1572                content_management_policy, register_import, register_output
1573
1574                return file_bytes

Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. output_file (Optional[str]): The output file path where the constructed file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The imported file bytes.

def import_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
1576    def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
1577        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
1578        The constructed files are written to output_directory maintaining the same directory structure as input_directory.
1579
1580        Args:
1581            input_directory (str): The input directory containing files to import.
1582            output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files.
1583            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
1584            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1585
1586        Returns:
1587            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
1588        """
1589        import_files_dict = {}
1590        # Call import_file on each file in input_directory to output_directory
1591        for input_file in utils.list_file_paths(input_directory):
1592            relative_path = os.path.relpath(input_file, input_directory)
1593            # Remove .zip extension from relative_path
1594            relative_path = os.path.splitext(relative_path)[0]
1595            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
1596
1597            import_bytes = self.import_file(
1598                input_file=input_file,
1599                output_file=output_file,
1600                raise_unsupported=raise_unsupported,
1601                content_management_policy=content_management_policy,
1602            )
1603
1604            import_files_dict[relative_path] = import_bytes
1605
1606        return import_files_dict

Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. The constructed files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to import. output_directory (Optional[str]): The output directory where the constructed files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

@functools.lru_cache()
def file_error_message(self, session: int) -> str:
1647    @functools.lru_cache()
1648    def file_error_message(self, session: int) -> str:
1649        """ Retrieve the Glasswall Session Process error message.
1650
1651        Args:
1652            session (int): The session integer.
1653
1654        Returns:
1655            error_message (str): The Glasswall Session Process error message.
1656        """
1657        # Validate arg types
1658        if not isinstance(session, int):
1659            raise TypeError(session)
1660
1661        result = self._GW2FileErrorMsg(session)
1662
1663        if result.status not in successes.success_codes:
1664            log.error(format_object(result))
1665            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1666        else:
1667            log.debug(format_object(result))
1668
1669        return result.error_message

Retrieve the Glasswall Session Process error message.

Args: session (int): The session integer.

Returns: error_message (str): The Glasswall Session Process error message.

def GW2GetFileType(self, session: int, file_type_id):
1671    def GW2GetFileType(self, session: int, file_type_id):
1672        """ Retrieve the file type as a string.
1673
1674        Args:
1675            session (int): The session integer.
1676            file_type_id (int): The file type id.
1677
1678        Returns:
1679            file_type (str): The formal file name for the corresponding file id.
1680        """
1681        # Validate arg types
1682        if not isinstance(session, int):
1683            raise TypeError(session)
1684
1685        # API function declaration
1686        self.library.GW2GetFileType.argtypes = [
1687            ct.c_size_t,
1688            ct.c_size_t,
1689            ct.POINTER(ct.c_size_t),
1690            ct.POINTER(ct.c_void_p)
1691        ]
1692
1693        # Variable initialisation
1694        ct_session = ct.c_size_t(session)
1695        ct_file_type = ct.c_size_t(file_type_id)
1696        ct_buffer_length = ct.c_size_t()
1697        ct_buffer = ct.c_void_p()
1698
1699        # API call
1700        status = self.library.GW2GetFileType(
1701            ct_session,
1702            ct_file_type,
1703            ct.byref(ct_buffer_length),
1704            ct.byref(ct_buffer)
1705        )
1706
1707        if status not in successes.success_codes:
1708            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
1709            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1710        else:
1711            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
1712
1713        # Editor wrote to a buffer, convert it to bytes
1714        file_type_bytes = utils.buffer_to_bytes(
1715            ct_buffer,
1716            ct_buffer_length
1717        )
1718
1719        file_type = file_type_bytes.decode()
1720
1721        return file_type

Retrieve the file type as a string.

Args: session (int): The session integer. file_type_id (int): The file type id.

Returns: file_type (str): The formal file name for the corresponding file id.

def GW2GetFileTypeID(self, session: int, file_type_str):
1723    def GW2GetFileTypeID(self, session: int, file_type_str):
1724        """ Retrieve the Glasswall file type id given a file type string.
1725
1726        Args:
1727            session (int): The session integer.
1728            file_type_str (str): The file type as a string.
1729
1730        Returns:
1731            file_type_id (str): The Glasswall file type id for the specified file type.
1732        """
1733        # Validate arg types
1734        if not isinstance(session, int):
1735            raise TypeError(session)
1736
1737        # API function declaration
1738        self.library.GW2GetFileTypeID.argtypes = [
1739            ct.c_size_t,
1740            ct.c_char_p,
1741            ct.POINTER(ct.c_size_t),
1742            ct.POINTER(ct.c_void_p)
1743        ]
1744
1745        # Variable initialisation
1746        ct_session = ct.c_size_t(session)
1747        ct_file_type = ct.c_char_p(file_type_str.encode('utf-8'))
1748        ct_buffer_length = ct.c_size_t()
1749        ct_buffer = ct.c_void_p()
1750
1751        # API call
1752        status = self.library.GW2GetFileTypeID(
1753            ct_session,
1754            ct_file_type,
1755            ct.byref(ct_buffer_length),
1756            ct.byref(ct_buffer)
1757        )
1758
1759        if status not in successes.success_codes:
1760            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
1761            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
1762        else:
1763            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")
1764
1765        # Editor wrote to a buffer, convert it to bytes
1766        file_type_bytes = utils.buffer_to_bytes(
1767            ct_buffer,
1768            ct_buffer_length
1769        )
1770
1771        file_type_id = file_type_bytes.decode()
1772
1773        return file_type_id

Retrieve the Glasswall file type id given a file type string.

Args: session (int): The session integer. file_type_str (str): The file type as a string.

Returns: file_type_id (str): The Glasswall file type id for the specified file type.

def get_file_type_info(self, file_type: Union[str, int]):
1775    def get_file_type_info(self, file_type: Union[str, int]):
1776        """ Retrieve information about a file type based on its identifier.
1777
1778        Args:
1779            file_type (Union[str, int]): The file type identifier. This can be either a string representing a file
1780            extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29).
1781
1782        Returns:
1783            - file_type_info (Union[int, str]): Depending on the input 'file_type':
1784                - If `file_type` is a string (e.g. 'bmp'):
1785                    - If the file type is recognised, returns an integer corresponding to that file type.
1786                    - If the file type is not recognised, returns 0.
1787                - If `file_type` is an integer (e.g. 29):
1788                    - If the integer corresponds to a recognised file type, returns a more detailed string description
1789                        of the file type (e.g. 'BMP Image').
1790                    - If the integer does not match any recognised file type, returns an empty string.
1791        """
1792        # Validate arg types
1793        if not isinstance(file_type, (str, int)):
1794            raise TypeError(file_type)
1795
1796        with utils.CwdHandler(self.library_path):
1797            with self.new_session() as session:
1798
1799                if isinstance(file_type, int):
1800                    file_type_info = self.GW2GetFileType(session, file_type)
1801                if isinstance(file_type, str):
1802                    file_type_info = self.GW2GetFileTypeID(session, file_type)
1803
1804                return file_type_info

Retrieve information about a file type based on its identifier.

Args: file_type (Union[str, int]): The file type identifier. This can be either a string representing a file extension (e.g. 'bmp') or an integer corresponding to a file type (e.g. 29).

Returns: - file_type_info (Union[int, str]): Depending on the input 'file_type': - If file_type is a string (e.g. 'bmp'): - If the file type is recognised, returns an integer corresponding to that file type. - If the file type is not recognised, returns 0. - If file_type is an integer (e.g. 29): - If the integer corresponds to a recognised file type, returns a more detailed string description of the file type (e.g. 'BMP Image'). - If the integer does not match any recognised file type, returns an empty string.

@utils.deprecated_function(replacement_function=get_file_type_info)
def get_file_info(self, *args, **kwargs):
1806    @utils.deprecated_function(replacement_function=get_file_type_info)
1807    def get_file_info(self, *args, **kwargs):
1808        """ Deprecated in 1.0.6. Use get_file_type_info. """
1809        pass

Deprecated in 1.0.6. Use get_file_type_info.

def register_report_file(self, session: int, output_file: str):
1840    def register_report_file(self, session: int, output_file: str):
1841        """ Register the report file path for the given session.
1842
1843        Args:
1844            session (int): The session integer.
1845            output_file (str): The file path of the report file.
1846
1847        Returns:
1848            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.
1849        """
1850        # Validate arg types
1851        if not isinstance(session, int):
1852            raise TypeError(session)
1853        if not isinstance(output_file, (type(None), str)):
1854            raise TypeError(output_file)
1855
1856        result = self._GW2RegisterReportFile(session, output_file)
1857
1858        if result.status not in successes.success_codes:
1859            log.error(format_object(result))
1860            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1861        else:
1862            log.debug(format_object(result))
1863
1864        return result

Register the report file path for the given session.

Args: session (int): The session integer. output_file (str): The file path of the report file.

Returns: gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'output_file', 'status'.

def get_id_info(self, issue_id: int, raise_unsupported: bool = True):
1910    def get_id_info(self, issue_id: int, raise_unsupported: bool = True):
1911        """ Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances"
1912
1913        Args:
1914            issue_id (int): The issue id.
1915            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1916
1917        Returns:
1918            id_info (str): The group description for the given Issue ID.
1919        """
1920        # Validate arg types
1921        if not isinstance(issue_id, int):
1922            raise TypeError(issue_id)
1923
1924        with utils.CwdHandler(self.library_path):
1925            with self.new_session() as session:
1926                result = self._GW2GetIdInfo(session, issue_id)
1927
1928                if result.status not in successes.success_codes:
1929                    log.error(format_object(result))
1930                    if raise_unsupported:
1931                        raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1932                else:
1933                    log.debug(format_object(result))
1934
1935                return result.id_info

Retrieves the group description for the given Issue ID. e.g. issue_id 96 returns "Document Processing Instances"

Args: issue_id (int): The issue id. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: id_info (str): The group description for the given Issue ID.

def get_all_id_info( self, output_file: Optional[str] = None, raise_unsupported: bool = True) -> str:
1975    def get_all_id_info(self, output_file: Optional[str] = None, raise_unsupported: bool = True) -> str:
1976        """ Retrieves the XML containing all the Issue ID ranges with their group descriptions
1977
1978        Args:
1979            output_file (Optional[str]): The output file path where the analysis file will be written.
1980            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
1981
1982        Returns:
1983            all_id_info (str): A string XML analysis report containing all id info.
1984        """
1985        # Validate arg types
1986        if not isinstance(output_file, (type(None), str)):
1987            raise TypeError(output_file)
1988        if isinstance(output_file, str):
1989            output_file = os.path.abspath(output_file)
1990
1991        with utils.CwdHandler(self.library_path):
1992            with self.new_session() as session:
1993                result = self._GW2GetAllIdInfo(session)
1994
1995                if result.status not in successes.success_codes:
1996                    log.error(format_object(result))
1997                    if raise_unsupported:
1998                        raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
1999                else:
2000                    log.debug(format_object(result))
2001
2002                if isinstance(output_file, str):
2003                    # GW2GetAllIdInfo is memory only, write to file
2004                    # make directories that do not exist
2005                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
2006                    with open(output_file, "w") as f:
2007                        f.write(result.all_id_info)
2008
2009                return result.all_id_info

Retrieves the XML containing all the Issue ID ranges with their group descriptions

Args: output_file (Optional[str]): The output file path where the analysis file will be written. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: all_id_info (str): A string XML analysis report containing all id info.

def file_session_status_message(self, session: int, raise_unsupported: bool = True) -> str:
2056    def file_session_status_message(self, session: int, raise_unsupported: bool = True) -> str:
2057        """ Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out.
2058
2059        Args:
2060            session (int): The session integer.
2061            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
2062
2063        Returns:
2064            result.message (str):The file session status message.
2065        """
2066        # Validate arg types
2067        if not isinstance(session, int):
2068            raise TypeError(session)
2069
2070        result = self._GW2FileSessionStatus(session)
2071
2072        if result.status not in successes.success_codes:
2073            log.error(format_object(result))
2074            if raise_unsupported:
2075                raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
2076        else:
2077            log.debug(format_object(result))
2078
2079        return result.message

Retrieves the Glasswall session status message. Gives a high level indication of the processing that was carried out.

Args: session (int): The session integer. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: result.message (str):The file session status message.

def licence_details(self):
2105    def licence_details(self):
2106        """ Returns a string containing details of the licence.
2107
2108        Returns:
2109            result (str): A string containing details of the licence.
2110        """
2111        with self.new_session() as session:
2112            result = self._GW2LicenceDetails(session)
2113
2114            log.debug(f"\n\tsession: {session}\n\tGW2LicenceDetails: {result}")
2115
2116        return result

Returns a string containing details of the licence.

Returns: result (str): A string containing details of the licence.

def register_licence( self, session: int, input_file: Union[str, bytes, bytearray, _io.BytesIO]):
2239    def register_licence(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
2240        """ Registers a "gwkey.lic" licence from file path or memory.
2241
2242        Args:
2243            session (int): The session integer.
2244            input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.
2245
2246        Returns:
2247            - result (glasswall.GwReturnObj): Depending on the input 'input_file':
2248                - If input_file is a str file path:
2249                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.
2250
2251                - If input_file is a file in memory:
2252                    - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.
2253        """
2254        if isinstance(input_file, str):
2255            if not os.path.isfile(input_file):
2256                raise FileNotFoundError(input_file)
2257
2258            input_file = os.path.abspath(input_file)
2259
2260            result = self._GW2RegisterLicenceFile(session, input_file)
2261
2262        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
2263            # Convert bytearray and io.BytesIO to bytes
2264            input_file = utils.as_bytes(input_file)
2265
2266            result = self._GW2RegisterLicenceMemory(session, input_file)
2267
2268        else:
2269            raise TypeError(input_file)
2270
2271        if result.status not in successes.success_codes:
2272            log.error(format_object(result))
2273            raise errors.error_codes.get(result.status, errors.UnknownErrorCode)(result.status)
2274        else:
2275            log.debug(format_object(result))
2276
2277        return result

Registers a "gwkey.lic" licence from file path or memory.

Args: session (int): The session integer. input_file (Union[str, bytes, bytearray, io.BytesIO]): The "gwkey.lic" licence. It can be provided as a file path (str), bytes, bytearray, or a BytesIO object.

Returns: - result (glasswall.GwReturnObj): Depending on the input 'input_file': - If input_file is a str file path: - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'input_file', 'status'.

    - If input_file is a file in memory:
        - gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes 'session', 'buffer', 'buffer_length', 'status'.