glasswall.libraries.rebuild.rebuild

  1import ctypes as ct
  2import io
  3import os
  4from typing import Union
  5
  6import glasswall
  7from glasswall import determine_file_type as dft
  8from glasswall import utils
  9from glasswall.config.logging import log
 10from glasswall.libraries.library import Library
 11from glasswall.libraries.rebuild import errors, successes
 12
 13
 14class Rebuild(Library):
 15    """ A high level Python wrapper for Glasswall Rebuild / Classic. """
 16
 17    def __init__(self, library_path: str):
 18        super().__init__(library_path=library_path)
 19        self.library = self.load_library(os.path.abspath(library_path))
 20
 21        # Set content management configuration to default
 22        self.set_content_management_policy(input_file=None)
 23
 24        # Validate killswitch has not activated
 25        self.validate_licence()
 26
 27        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
 28
 29    def validate_licence(self):
 30        """ Validates the licence of the library by attempting to call protect_file on a known supported file.
 31
 32        Raises:
 33            RebuildError: If the licence could not be validated.
 34        """
 35        # Call protect file on a known good bitmap to see if licence has expired
 36        try:
 37            self.protect_file(
 38                input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00",
 39                raise_unsupported=True
 40            )
 41            log.debug(f"{self.__class__.__name__} licence validated successfully.")
 42        except errors.RebuildError:
 43            log.error(f"{self.__class__.__name__} licence validation failed.")
 44            raise
 45
 46    def version(self):
 47        """ Returns the Glasswall library version.
 48
 49        Returns:
 50            version (str): The Glasswall library version.
 51        """
 52
 53        # Declare the return type
 54        self.library.GWFileVersion.restype = ct.c_wchar_p
 55
 56        # API call
 57        version = self.library.GWFileVersion()
 58
 59        return version
 60
 61    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True):
 62        """ Returns an int representing the file type / file format of a file.
 63
 64        Args:
 65            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
 66            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.
 67            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 68
 69        Returns:
 70            file_type (Union[int, str]): The file format.
 71        """
 72
 73        if isinstance(input_file, str):
 74            if not os.path.isfile(input_file):
 75                raise FileNotFoundError(input_file)
 76
 77            self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p]
 78            self.library.GWDetermineFileTypeFromFile.restype = ct.c_int
 79
 80            # convert to ct.c_wchar_p
 81            ct_input_file = ct.c_wchar_p(input_file)
 82
 83            # API call
 84            file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file)
 85
 86        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 87            self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t]
 88            self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int
 89
 90            # convert to bytes
 91            bytes_input_file = utils.as_bytes(input_file)
 92
 93            # ctypes conversion
 94            ct_input_buffer = ct.c_char_p(bytes_input_file)
 95            ct_butter_length = ct.c_size_t(len(bytes_input_file))
 96
 97            # API call
 98            file_type = self.library.GWDetermineFileTypeFromFileInMem(
 99                ct_input_buffer,
100                ct_butter_length
101            )
102
103        file_type_as_string = dft.file_type_int_to_str(file_type)
104        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
105
106        if not dft.is_success(file_type):
107            if raise_unsupported:
108                log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
109                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
110            else:
111                log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
112        else:
113            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
114
115        if as_string:
116            return file_type_as_string
117
118        return file_type
119
120    def get_content_management_policy(self):
121        """ Gets the current content management configuration.
122
123        Returns:
124            xml_string (str): The XML string of the current content management configuration.
125        """
126
127        # Declare argument types
128        self.library.GWFileConfigGet.argtypes = [
129            ct.POINTER(ct.POINTER(ct.c_wchar)),
130            ct.POINTER(ct.c_size_t)
131        ]
132
133        # Variable initialisation
134        ct_input_buffer = ct.POINTER(ct.c_wchar)()
135        ct_input_size = ct.c_size_t(0)
136
137        # API call
138        status = self.library.GWFileConfigGet(
139            ct.byref(ct_input_buffer),
140            ct.byref(ct_input_size)
141        )
142
143        if status not in successes.success_codes:
144            log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
145            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
146        else:
147            log.debug(f"\n\tstatus: {status}")
148
149        # As string
150        xml_string = utils.validate_xml(ct.wstring_at(ct_input_buffer))
151
152        return xml_string
153
154    def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None):
155        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
156
157        Args:
158            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
159
160        Returns:
161            status (int): The result of the Glasswall API call.
162        """
163        # Validate type
164        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
165            raise TypeError(input_file)
166
167        # self.library.GWFileConfigRevertToDefaults doesn't work, load default instead
168        # Set input_file to default if input_file is None
169        if input_file is None:
170            input_file = glasswall.content_management.policies.Rebuild(default="sanitise")
171
172        # Validate xml content is parsable
173        xml_string = utils.validate_xml(input_file)
174
175        # Declare argument types
176        self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p]
177
178        # API call
179        status = self.library.GWFileConfigXML(
180            ct.c_wchar_p(xml_string)
181        )
182
183        if status not in successes.success_codes:
184            log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
185            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
186        else:
187            log.debug(f"\n\tstatus: {status}")
188
189        return status
190
191    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
192        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
193
194        Args:
195            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
196            output_file (Union[None, str], optional): The output file path where the protected file will be written.
197            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
198            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
199
200        Returns:
201            file_bytes (bytes): The protected file bytes.
202        """
203        # Validate arg types
204        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
205            raise TypeError(input_file)
206        if not isinstance(output_file, (type(None), str)):
207            raise TypeError(output_file)
208        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
209            raise TypeError(content_management_policy)
210        if not isinstance(raise_unsupported, bool):
211            raise TypeError(raise_unsupported)
212
213        # Convert string path arguments to absolute paths
214        if isinstance(input_file, str):
215            if not os.path.isfile(input_file):
216                raise FileNotFoundError(input_file)
217            input_file = os.path.abspath(input_file)
218        if isinstance(output_file, str):
219            output_file = os.path.abspath(output_file)
220            # make directories that do not exist
221            os.makedirs(os.path.dirname(output_file), exist_ok=True)
222        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
223            content_management_policy = os.path.abspath(content_management_policy)
224
225        # Convert memory inputs to bytes
226        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
227            input_file = utils.as_bytes(input_file)
228
229        # Check that file type is supported
230        try:
231            file_type = self.determine_file_type(input_file=input_file)
232        except dft.errors.FileTypeEnumError:
233            if raise_unsupported:
234                raise
235            else:
236                return None
237
238        with utils.CwdHandler(self.library_path):
239            # Set content management policy
240            self.set_content_management_policy(content_management_policy)
241
242            # file to file
243            if isinstance(input_file, str) and isinstance(output_file, str):
244                # API function declaration
245                self.library.GWFileToFileProtect.argtypes = [
246                    ct.c_wchar_p,
247                    ct.c_wchar_p,
248                    ct.c_wchar_p
249                ]
250
251                # Variable initialisation
252                ct_input_file = ct.c_wchar_p(input_file)
253                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
254                ct_output_file = ct.c_wchar_p(output_file)
255
256                # API call
257                status = self.library.GWFileToFileProtect(
258                    ct_input_file,
259                    ct_file_type,
260                    ct_output_file
261                )
262
263            # file to memory
264            elif isinstance(input_file, str) and output_file is None:
265                # API function declaration
266                self.library.GWFileProtect.argtypes = [
267                    ct.c_wchar_p,
268                    ct.c_wchar_p,
269                    ct.POINTER(ct.c_void_p),
270                    ct.POINTER(ct.c_size_t)
271                ]
272
273                # Variable initialisation
274                ct_input_file = ct.c_wchar_p(input_file)
275                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
276                ct_output_buffer = ct.c_void_p(0)
277                ct_output_size = ct.c_size_t(0)
278
279                # API call
280                status = self.library.GWFileProtect(
281                    ct_input_file,
282                    ct_file_type,
283                    ct.byref(ct_output_buffer),
284                    ct.byref(ct_output_size)
285                )
286
287            # memory to memory and memory to file
288            elif isinstance(input_file, bytes):
289                # API function declaration
290                self.library.GWMemoryToMemoryProtect.argtypes = [
291                    ct.c_void_p,
292                    ct.c_size_t,
293                    ct.c_wchar_p,
294                    ct.POINTER(ct.c_void_p),
295                    ct.POINTER(ct.c_size_t)
296                ]
297
298                # Variable initialization
299                bytearray_buffer = bytearray(input_file)
300                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
301                ct_input_size = ct.c_size_t(len(input_file))
302                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
303                ct_output_buffer = ct.c_void_p(0)
304                ct_output_size = ct.c_size_t(0)
305
306                status = self.library.GWMemoryToMemoryProtect(
307                    ct_input_buffer,
308                    ct_input_size,
309                    ct_file_type,
310                    ct.byref(ct_output_buffer),
311                    ct.byref(ct_output_size)
312                )
313
314            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
315            if status not in successes.success_codes:
316                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
317                if raise_unsupported:
318                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
319                else:
320                    file_bytes = None
321            else:
322                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
323                if isinstance(input_file, str) and isinstance(output_file, str):
324                    # file to file, read the bytes of the file that Rebuild has already written
325                    if not os.path.isfile(output_file):
326                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
327                        file_bytes = None
328                    else:
329                        with open(output_file, "rb") as f:
330                            file_bytes = f.read()
331                else:
332                    # file to memory, memory to memory
333                    file_bytes = utils.buffer_to_bytes(
334                        ct_output_buffer,
335                        ct_output_size
336                    )
337                    if isinstance(output_file, str):
338                        # memory to file
339                        # no Rebuild function exists for memory to file, write the memory to file ourselves
340                        with open(output_file, "wb") as f:
341                            f.write(file_bytes)
342
343            return file_bytes
344
345    def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
346        """ Recursively processes all files in a directory in protect mode using the given content management policy.
347        The protected files are written to output_directory maintaining the same directory structure as input_directory.
348
349        Args:
350            input_directory (str): The input directory containing files to protect.
351            output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files.
352            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
353            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
354
355        Returns:
356            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
357        """
358        protected_files_dict = {}
359        # Call protect_file on each file in input_directory to output_directory
360        for input_file in utils.list_file_paths(input_directory):
361            relative_path = os.path.relpath(input_file, input_directory)
362            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
363
364            protected_bytes = self.protect_file(
365                input_file=input_file,
366                output_file=output_file,
367                raise_unsupported=raise_unsupported,
368                content_management_policy=content_management_policy,
369            )
370
371            protected_files_dict[relative_path] = protected_bytes
372
373        return protected_files_dict
374
375    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
376        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
377
378        Args:
379            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
380            output_file (Union[None, str], optional): The output file path where the analysis file will be written.
381            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
382            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
383
384        Returns:
385            file_bytes (bytes): The analysis file bytes.
386        """
387        # Validate arg types
388        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
389            raise TypeError(input_file)
390        if not isinstance(output_file, (type(None), str)):
391            raise TypeError(output_file)
392        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
393            raise TypeError(content_management_policy)
394        if not isinstance(raise_unsupported, bool):
395            raise TypeError(raise_unsupported)
396
397        # Convert string path arguments to absolute paths
398        if isinstance(input_file, str):
399            if not os.path.isfile(input_file):
400                raise FileNotFoundError(input_file)
401            input_file = os.path.abspath(input_file)
402        if isinstance(output_file, str):
403            output_file = os.path.abspath(output_file)
404            # make directories that do not exist
405            os.makedirs(os.path.dirname(output_file), exist_ok=True)
406        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
407            content_management_policy = os.path.abspath(content_management_policy)
408
409        # Convert memory inputs to bytes
410        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
411            input_file = utils.as_bytes(input_file)
412
413        # Check that file type is supported
414        try:
415            file_type = self.determine_file_type(input_file=input_file)
416        except dft.errors.FileTypeEnumError:
417            if raise_unsupported:
418                raise
419            else:
420                return None
421
422        with utils.CwdHandler(self.library_path):
423            # Set content management policy
424            self.set_content_management_policy(content_management_policy)
425
426            # file to file
427            if isinstance(input_file, str) and isinstance(output_file, str):
428                # API function declaration
429                self.library.GWFileToFileAnalysisAudit.argtypes = [
430                    ct.c_wchar_p,
431                    ct.c_wchar_p,
432                    ct.c_wchar_p
433                ]
434
435                # Variable initialisation
436                ct_input_file = ct.c_wchar_p(input_file)
437                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
438                ct_output_file = ct.c_wchar_p(output_file)
439
440                # API call
441                status = self.library.GWFileToFileAnalysisAudit(
442                    ct_input_file,
443                    ct_file_type,
444                    ct_output_file
445                )
446
447            # file to memory
448            elif isinstance(input_file, str) and output_file is None:
449                # API function declaration
450                self.library.GWFileAnalysisAudit.argtypes = [
451                    ct.c_wchar_p,
452                    ct.c_wchar_p,
453                    ct.POINTER(ct.c_void_p),
454                    ct.POINTER(ct.c_size_t)
455                ]
456
457                # Variable initialisation
458                ct_input_file = ct.c_wchar_p(input_file)
459                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
460                ct_output_buffer = ct.c_void_p(0)
461                ct_output_size = ct.c_size_t(0)
462
463                # API call
464                status = self.library.GWFileAnalysisAudit(
465                    ct_input_file,
466                    ct_file_type,
467                    ct.byref(ct_output_buffer),
468                    ct.byref(ct_output_size)
469                )
470
471            # memory to memory and memory to file
472            elif isinstance(input_file, bytes):
473                # API function declaration
474                self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [
475                    ct.c_void_p,
476                    ct.c_size_t,
477                    ct.c_wchar_p,
478                    ct.POINTER(ct.c_void_p),
479                    ct.POINTER(ct.c_size_t)
480                ]
481
482                # Variable initialization
483                bytearray_buffer = bytearray(input_file)
484                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
485                ct_input_size = ct.c_size_t(len(input_file))
486                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
487                ct_output_buffer = ct.c_void_p(0)
488                ct_output_size = ct.c_size_t(0)
489
490                status = self.library.GWMemoryToMemoryAnalysisAudit(
491                    ct.byref(ct_input_buffer),
492                    ct_input_size,
493                    ct_file_type,
494                    ct.byref(ct_output_buffer),
495                    ct.byref(ct_output_size)
496                )
497
498            file_bytes = None
499            if isinstance(input_file, str) and isinstance(output_file, str):
500                # file to file, read the bytes of the file that Rebuild has already written
501                if os.path.isfile(output_file):
502                    with open(output_file, "rb") as f:
503                        file_bytes = f.read()
504            else:
505                # file to memory, memory to memory
506                if ct_output_buffer and ct_output_size:
507                    file_bytes = utils.buffer_to_bytes(
508                        ct_output_buffer,
509                        ct_output_size
510                    )
511                    if isinstance(output_file, str):
512                        # memory to file
513                        # no Rebuild function exists for memory to file, write the memory to file ourselves
514                        with open(output_file, "wb") as f:
515                            f.write(file_bytes)
516
517            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
518            if status not in successes.success_codes:
519                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
520                if raise_unsupported:
521                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
522            else:
523                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
524
525            return file_bytes
526
527    def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
528        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
529
530        Args:
531            input_directory (str): The input directory containing files to analyse.
532            output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
533            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
534            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
535
536        Returns:
537            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
538        """
539        analysis_files_dict = {}
540        # Call analyse_file on each file in input_directory to output_directory
541        for input_file in utils.list_file_paths(input_directory):
542            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
543            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
544
545            analysis_bytes = self.analyse_file(
546                input_file=input_file,
547                output_file=output_file,
548                raise_unsupported=raise_unsupported,
549                content_management_policy=content_management_policy,
550            )
551
552            analysis_files_dict[relative_path] = analysis_bytes
553
554        return analysis_files_dict
555
556    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
557        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file.
558
559        Args:
560            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
561            output_file (Union[None, str], optional): The output file path where the .zip file will be written.
562            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
563            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
564
565        Returns:
566            file_bytes (bytes): The exported .zip file.
567        """
568        # Validate arg types
569        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
570            raise TypeError(input_file)
571        if not isinstance(output_file, (type(None), str)):
572            raise TypeError(output_file)
573        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
574            raise TypeError(content_management_policy)
575        if not isinstance(raise_unsupported, bool):
576            raise TypeError(raise_unsupported)
577
578        # Convert string path arguments to absolute paths
579        if isinstance(input_file, str):
580            if not os.path.isfile(input_file):
581                raise FileNotFoundError(input_file)
582            input_file = os.path.abspath(input_file)
583        if isinstance(output_file, str):
584            output_file = os.path.abspath(output_file)
585            # make directories that do not exist
586            os.makedirs(os.path.dirname(output_file), exist_ok=True)
587        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
588            content_management_policy = os.path.abspath(content_management_policy)
589
590        # Convert memory inputs to bytes
591        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
592            input_file = utils.as_bytes(input_file)
593
594        # Check that file type is supported
595        try:
596            self.determine_file_type(input_file=input_file)
597        except dft.errors.FileTypeEnumError:
598            if raise_unsupported:
599                raise
600            else:
601                return None
602
603        with utils.CwdHandler(self.library_path):
604            # Set content management policy
605            self.set_content_management_policy(content_management_policy)
606
607            # file to file
608            if isinstance(input_file, str) and isinstance(output_file, str):
609                # API function declaration
610                self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [
611                    ct.c_wchar_p,
612                    ct.c_wchar_p
613                ]
614
615                # Variable initialisation
616                ct_input_file = ct.c_wchar_p(input_file)
617                ct_output_file = ct.c_wchar_p(output_file)
618
619                # API call
620                status = self.library.GWFileToFileAnalysisProtectAndExport(
621                    ct_input_file,
622                    ct_output_file
623                )
624
625            # file to memory
626            elif isinstance(input_file, str) and output_file is None:
627                # API function declaration
628                self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [
629                    ct.c_wchar_p,
630                    ct.POINTER(ct.c_void_p),
631                    ct.POINTER(ct.c_size_t)
632                ]
633
634                # Variable initialisation
635                ct_input_file = ct.c_wchar_p(input_file)
636                ct_output_buffer = ct.c_void_p(0)
637                ct_output_size = ct.c_size_t(0)
638
639                # API call
640                status = self.library.GWFileToMemoryAnalysisProtectAndExport(
641                    ct_input_file,
642                    ct.byref(ct_output_buffer),
643                    ct.byref(ct_output_size)
644                )
645
646            # memory to memory and memory to file
647            elif isinstance(input_file, bytes):
648                # API function declaration
649                self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [
650                    ct.c_void_p,
651                    ct.c_size_t,
652                    ct.POINTER(ct.c_void_p),
653                    ct.POINTER(ct.c_size_t)
654                ]
655
656                # Variable initialization
657                bytearray_buffer = bytearray(input_file)
658                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
659                ct_input_size = ct.c_size_t(len(input_file))
660                ct_output_buffer = ct.c_void_p(0)
661                ct_output_size = ct.c_size_t(0)
662
663                status = self.library.GWMemoryToMemoryAnalysisProtectAndExport(
664                    ct_input_buffer,
665                    ct_input_size,
666                    ct.byref(ct_output_buffer),
667                    ct.byref(ct_output_size)
668                )
669
670            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
671            if status not in successes.success_codes:
672                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
673                if raise_unsupported:
674                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
675                else:
676                    file_bytes = None
677            else:
678                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
679                if isinstance(input_file, str) and isinstance(output_file, str):
680                    # file to file, read the bytes of the file that Rebuild has already written
681                    if not os.path.isfile(output_file):
682                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
683                        file_bytes = None
684                    else:
685                        with open(output_file, "rb") as f:
686                            file_bytes = f.read()
687                else:
688                    # file to memory, memory to memory
689                    file_bytes = utils.buffer_to_bytes(
690                        ct_output_buffer,
691                        ct_output_size
692                    )
693                    if isinstance(output_file, str):
694                        # memory to file
695                        # no Rebuild function exists for memory to file, write the memory to file ourselves
696                        with open(output_file, "wb") as f:
697                            f.write(file_bytes)
698
699            return file_bytes
700
701    def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
702        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
703
704        Args:
705            input_directory (str): The input directory containing files to export.
706            output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
707            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
708            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
709
710        Returns:
711            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
712        """
713        export_files_dict = {}
714        # Call export_file on each file in input_directory to output_directory
715        for input_file in utils.list_file_paths(input_directory):
716            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
717            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
718
719            export_bytes = self.export_file(
720                input_file=input_file,
721                output_file=output_file,
722                raise_unsupported=raise_unsupported,
723                content_management_policy=content_management_policy,
724            )
725
726            export_files_dict[relative_path] = export_bytes
727
728        return export_files_dict
729
730    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
731        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
732
733        Args:
734            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
735            output_file (Union[None, str], optional): The output file path where the constructed file will be written.
736            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
737            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
738
739        Returns:
740            file_bytes (bytes): The imported file bytes.
741        """
742        # Validate arg types
743        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
744            raise TypeError(input_file)
745        if not isinstance(output_file, (type(None), str)):
746            raise TypeError(output_file)
747        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
748            raise TypeError(content_management_policy)
749        if not isinstance(raise_unsupported, bool):
750            raise TypeError(raise_unsupported)
751
752        # Convert string path arguments to absolute paths
753        if isinstance(input_file, str):
754            if not os.path.isfile(input_file):
755                raise FileNotFoundError(input_file)
756            input_file = os.path.abspath(input_file)
757        if isinstance(output_file, str):
758            output_file = os.path.abspath(output_file)
759            # make directories that do not exist
760            os.makedirs(os.path.dirname(output_file), exist_ok=True)
761        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
762            content_management_policy = os.path.abspath(content_management_policy)
763
764        # Convert memory inputs to bytes
765        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
766            input_file = utils.as_bytes(input_file)
767
768        # Check that file type is supported
769        try:
770            self.determine_file_type(input_file=input_file)
771        except dft.errors.FileTypeEnumError:
772            if raise_unsupported:
773                raise
774            else:
775                return None
776
777        with utils.CwdHandler(self.library_path):
778            # Set content management policy
779            self.set_content_management_policy(content_management_policy)
780
781            # file to file
782            if isinstance(input_file, str) and isinstance(output_file, str):
783                # API function declaration
784                self.library.GWFileToFileProtectAndImport.argtypes = [
785                    ct.c_wchar_p,
786                    ct.c_wchar_p
787                ]
788
789                # Variable initialisation
790                ct_input_file = ct.c_wchar_p(input_file)
791                ct_output_file = ct.c_wchar_p(output_file)
792
793                # API call
794                status = self.library.GWFileToFileProtectAndImport(
795                    ct_input_file,
796                    ct_output_file
797                )
798
799            # file to memory
800            elif isinstance(input_file, str) and output_file is None:
801                # API function declaration
802                self.library.GWFileToMemoryProtectAndImport.argtypes = [
803                    ct.c_wchar_p,
804                    ct.POINTER(ct.c_void_p),
805                    ct.POINTER(ct.c_size_t)
806                ]
807
808                # Variable initialisation
809                ct_input_file = ct.c_wchar_p(input_file)
810                ct_output_buffer = ct.c_void_p(0)
811                ct_output_size = ct.c_size_t(0)
812
813                # API call
814                status = self.library.GWFileToMemoryProtectAndImport(
815                    ct_input_file,
816                    ct.byref(ct_output_buffer),
817                    ct.byref(ct_output_size)
818                )
819
820            # memory to memory and memory to file
821            elif isinstance(input_file, bytes):
822                # API function declaration
823                self.library.GWMemoryToMemoryProtectAndImport.argtypes = [
824                    ct.c_void_p,
825                    ct.c_size_t,
826                    ct.POINTER(ct.c_void_p),
827                    ct.POINTER(ct.c_size_t)
828                ]
829
830                # Variable initialization
831                bytearray_buffer = bytearray(input_file)
832                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
833                ct_input_size = ct.c_size_t(len(input_file))
834                ct_output_buffer = ct.c_void_p(0)
835                ct_output_size = ct.c_size_t(0)
836
837                status = self.library.GWMemoryToMemoryProtectAndImport(
838                    ct_input_buffer,
839                    ct_input_size,
840                    ct.byref(ct_output_buffer),
841                    ct.byref(ct_output_size)
842                )
843
844            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
845            if status not in successes.success_codes:
846                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
847                if raise_unsupported:
848                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
849                else:
850                    file_bytes = None
851            else:
852                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
853                if isinstance(input_file, str) and isinstance(output_file, str):
854                    # file to file, read the bytes of the file that Rebuild has already written
855                    if not os.path.isfile(output_file):
856                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
857                        file_bytes = None
858                    else:
859                        with open(output_file, "rb") as f:
860                            file_bytes = f.read()
861                else:
862                    # file to memory, memory to memory
863                    file_bytes = utils.buffer_to_bytes(
864                        ct_output_buffer,
865                        ct_output_size
866                    )
867                    if isinstance(output_file, str):
868                        # memory to file
869                        # no Rebuild function exists for memory to file, write the memory to file ourselves
870                        with open(output_file, "wb") as f:
871                            f.write(file_bytes)
872
873            return file_bytes
874
875    def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
876        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
877        The constructed files are written to output_directory maintaining the same directory structure as input_directory.
878
879        Args:
880            input_directory (str): The input directory containing files to import.
881            output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files.
882            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
883            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
884
885        Returns:
886            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
887        """
888        import_files_dict = {}
889        # Call import_file on each file in input_directory to output_directory
890        for input_file in utils.list_file_paths(input_directory):
891            relative_path = os.path.relpath(input_file, input_directory)
892            # Remove .zip extension from relative_path
893            relative_path = os.path.splitext(relative_path)[0]
894            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
895
896            import_bytes = self.import_file(
897                input_file=input_file,
898                output_file=output_file,
899                raise_unsupported=raise_unsupported,
900                content_management_policy=content_management_policy,
901            )
902
903            import_files_dict[relative_path] = import_bytes
904
905        return import_files_dict
906
907    def GWFileErrorMsg(self):
908        """ Retrieve the Glasswall Process error message.
909
910        Returns:
911            error_message (str): The Glasswall Process error message.
912        """
913        # Declare the return type
914        self.library.GWFileErrorMsg.restype = ct.c_wchar_p
915
916        # API call
917        error_message = self.library.GWFileErrorMsg()
918
919        return error_message
920
921    def _GWFileToFileAnalysisAndProtect(self, input_file: str, file_type: str, output_file: str, output_analysis_report: str):
922        """ This function Manages the specified file and carries out an Analysis Audit, saving the results to the specified file locations.
923
924        Args:
925            input_file (str): The input file path or bytes.
926            output_file (str): The output file path where the protected file will be written.
927            output_analysis_report (str): The output file path where the analysis report will be written.
928
929        Returns:
930            status (int): The result of the Glasswall API call.
931        """
932        # API function declaration
933        self.library.GWFileToFileAnalysisAndProtect.argtypes = [
934            ct.c_wchar_p,  # wchar_t * inputFilePathName
935            ct.c_wchar_p,  # wchar_t* wcType
936            ct.c_wchar_p,  # wchar_t * outputFilePathName
937            ct.c_wchar_p  # wchar_t * analysisFilePathName
938        ]
939
940        # Variable initialisation
941        ct_input_file = ct.c_wchar_p(input_file)
942        ct_file_type = ct.c_wchar_p(file_type)
943        ct_output_file = ct.c_wchar_p(output_file)
944        ct_output_analysis_report = ct.c_wchar_p(output_analysis_report)
945
946        # API call
947        status = self.library.GWFileToFileAnalysisAndProtect(
948            ct_input_file,
949            ct_file_type,
950            ct_output_file,
951            ct_output_analysis_report
952        )
953
954        return status
955
956    def _GWFileAnalysisAndProtect(self, input_file: str, file_type: str):
957        """ This function Manages the specified file and carries out an Analysis Audit, returning both outputs to the specified memory locations.
958
959        Args:
960            input_file (str): The input file path or bytes.
961
962        Returns:
963            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes  'input_file', 'file_type', 'output_file_buffer', 'output_file_buffer_length', 'output_report_buffer', 'output_report_buffer_length', 'output_file', 'analysis_file'.
964        """
965        # API function declaration
966        self.library.GWFileAnalysisAndProtect.argtypes = [
967            ct.c_wchar_p,  # wchar_t * inputFilePathName
968            ct.c_wchar_p,  # wchar_t* wcType
969            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
970            ct.POINTER(ct.c_size_t),  # size_t *outputLength
971            ct.POINTER(ct.c_void_p),  # void **analysisFileBuffer
972            ct.POINTER(ct.c_size_t)  # size_t *analysisFileBufferLength
973        ]
974
975        # Variable initialisation
976        gw_return_object = glasswall.GwReturnObj()
977        gw_return_object.input_file = ct.c_wchar_p(input_file)
978        gw_return_object.file_type = ct.c_wchar_p(file_type)
979        gw_return_object.output_file_buffer = ct.c_void_p(0)
980        gw_return_object.output_file_buffer_length = ct.c_size_t(0)
981        gw_return_object.output_report_buffer = ct.c_void_p(0)
982        gw_return_object.output_report_buffer_length = ct.c_size_t(0)
983
984        # API call
985        gw_return_object.status = self.library.GWFileAnalysisAndProtect(
986            gw_return_object.input_file,
987            gw_return_object.file_type,
988            ct.byref(gw_return_object.output_file_buffer),
989            ct.byref(gw_return_object.output_file_buffer_length),
990            ct.byref(gw_return_object.output_report_buffer),
991            ct.byref(gw_return_object.output_report_buffer_length)
992        )
993
994        gw_return_object.output_file = ct.string_at(gw_return_object.output_file_buffer, gw_return_object.output_file_buffer_length.value)
995        gw_return_object.analysis_file = ct.string_at(gw_return_object.output_report_buffer, gw_return_object.output_report_buffer_length.value)
996
997        return gw_return_object
class Rebuild(glasswall.libraries.library.Library):
  17class Rebuild(Library):
  18    """ A high level Python wrapper for Glasswall Rebuild / Classic. """
  19
  20    def __init__(self, library_path: str):
  21        super().__init__(library_path=library_path)
  22        self.library = self.load_library(os.path.abspath(library_path))
  23
  24        # Set content management configuration to default
  25        self.set_content_management_policy(input_file=None)
  26
  27        # Validate killswitch has not activated
  28        self.validate_licence()
  29
  30        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
  31
  32    def validate_licence(self):
  33        """ Validates the licence of the library by attempting to call protect_file on a known supported file.
  34
  35        Raises:
  36            RebuildError: If the licence could not be validated.
  37        """
  38        # Call protect file on a known good bitmap to see if licence has expired
  39        try:
  40            self.protect_file(
  41                input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00",
  42                raise_unsupported=True
  43            )
  44            log.debug(f"{self.__class__.__name__} licence validated successfully.")
  45        except errors.RebuildError:
  46            log.error(f"{self.__class__.__name__} licence validation failed.")
  47            raise
  48
  49    def version(self):
  50        """ Returns the Glasswall library version.
  51
  52        Returns:
  53            version (str): The Glasswall library version.
  54        """
  55
  56        # Declare the return type
  57        self.library.GWFileVersion.restype = ct.c_wchar_p
  58
  59        # API call
  60        version = self.library.GWFileVersion()
  61
  62        return version
  63
  64    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True):
  65        """ Returns an int representing the file type / file format of a file.
  66
  67        Args:
  68            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
  69            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.
  70            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
  71
  72        Returns:
  73            file_type (Union[int, str]): The file format.
  74        """
  75
  76        if isinstance(input_file, str):
  77            if not os.path.isfile(input_file):
  78                raise FileNotFoundError(input_file)
  79
  80            self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p]
  81            self.library.GWDetermineFileTypeFromFile.restype = ct.c_int
  82
  83            # convert to ct.c_wchar_p
  84            ct_input_file = ct.c_wchar_p(input_file)
  85
  86            # API call
  87            file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file)
  88
  89        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
  90            self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t]
  91            self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int
  92
  93            # convert to bytes
  94            bytes_input_file = utils.as_bytes(input_file)
  95
  96            # ctypes conversion
  97            ct_input_buffer = ct.c_char_p(bytes_input_file)
  98            ct_butter_length = ct.c_size_t(len(bytes_input_file))
  99
 100            # API call
 101            file_type = self.library.GWDetermineFileTypeFromFileInMem(
 102                ct_input_buffer,
 103                ct_butter_length
 104            )
 105
 106        file_type_as_string = dft.file_type_int_to_str(file_type)
 107        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 108
 109        if not dft.is_success(file_type):
 110            if raise_unsupported:
 111                log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
 112                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
 113            else:
 114                log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
 115        else:
 116            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
 117
 118        if as_string:
 119            return file_type_as_string
 120
 121        return file_type
 122
 123    def get_content_management_policy(self):
 124        """ Gets the current content management configuration.
 125
 126        Returns:
 127            xml_string (str): The XML string of the current content management configuration.
 128        """
 129
 130        # Declare argument types
 131        self.library.GWFileConfigGet.argtypes = [
 132            ct.POINTER(ct.POINTER(ct.c_wchar)),
 133            ct.POINTER(ct.c_size_t)
 134        ]
 135
 136        # Variable initialisation
 137        ct_input_buffer = ct.POINTER(ct.c_wchar)()
 138        ct_input_size = ct.c_size_t(0)
 139
 140        # API call
 141        status = self.library.GWFileConfigGet(
 142            ct.byref(ct_input_buffer),
 143            ct.byref(ct_input_size)
 144        )
 145
 146        if status not in successes.success_codes:
 147            log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
 148            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 149        else:
 150            log.debug(f"\n\tstatus: {status}")
 151
 152        # As string
 153        xml_string = utils.validate_xml(ct.wstring_at(ct_input_buffer))
 154
 155        return xml_string
 156
 157    def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None):
 158        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
 159
 160        Args:
 161            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 162
 163        Returns:
 164            status (int): The result of the Glasswall API call.
 165        """
 166        # Validate type
 167        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 168            raise TypeError(input_file)
 169
 170        # self.library.GWFileConfigRevertToDefaults doesn't work, load default instead
 171        # Set input_file to default if input_file is None
 172        if input_file is None:
 173            input_file = glasswall.content_management.policies.Rebuild(default="sanitise")
 174
 175        # Validate xml content is parsable
 176        xml_string = utils.validate_xml(input_file)
 177
 178        # Declare argument types
 179        self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p]
 180
 181        # API call
 182        status = self.library.GWFileConfigXML(
 183            ct.c_wchar_p(xml_string)
 184        )
 185
 186        if status not in successes.success_codes:
 187            log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
 188            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 189        else:
 190            log.debug(f"\n\tstatus: {status}")
 191
 192        return status
 193
 194    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 195        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
 196
 197        Args:
 198            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 199            output_file (Union[None, str], optional): The output file path where the protected file will be written.
 200            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
 201            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 202
 203        Returns:
 204            file_bytes (bytes): The protected file bytes.
 205        """
 206        # Validate arg types
 207        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 208            raise TypeError(input_file)
 209        if not isinstance(output_file, (type(None), str)):
 210            raise TypeError(output_file)
 211        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 212            raise TypeError(content_management_policy)
 213        if not isinstance(raise_unsupported, bool):
 214            raise TypeError(raise_unsupported)
 215
 216        # Convert string path arguments to absolute paths
 217        if isinstance(input_file, str):
 218            if not os.path.isfile(input_file):
 219                raise FileNotFoundError(input_file)
 220            input_file = os.path.abspath(input_file)
 221        if isinstance(output_file, str):
 222            output_file = os.path.abspath(output_file)
 223            # make directories that do not exist
 224            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 225        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 226            content_management_policy = os.path.abspath(content_management_policy)
 227
 228        # Convert memory inputs to bytes
 229        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 230            input_file = utils.as_bytes(input_file)
 231
 232        # Check that file type is supported
 233        try:
 234            file_type = self.determine_file_type(input_file=input_file)
 235        except dft.errors.FileTypeEnumError:
 236            if raise_unsupported:
 237                raise
 238            else:
 239                return None
 240
 241        with utils.CwdHandler(self.library_path):
 242            # Set content management policy
 243            self.set_content_management_policy(content_management_policy)
 244
 245            # file to file
 246            if isinstance(input_file, str) and isinstance(output_file, str):
 247                # API function declaration
 248                self.library.GWFileToFileProtect.argtypes = [
 249                    ct.c_wchar_p,
 250                    ct.c_wchar_p,
 251                    ct.c_wchar_p
 252                ]
 253
 254                # Variable initialisation
 255                ct_input_file = ct.c_wchar_p(input_file)
 256                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
 257                ct_output_file = ct.c_wchar_p(output_file)
 258
 259                # API call
 260                status = self.library.GWFileToFileProtect(
 261                    ct_input_file,
 262                    ct_file_type,
 263                    ct_output_file
 264                )
 265
 266            # file to memory
 267            elif isinstance(input_file, str) and output_file is None:
 268                # API function declaration
 269                self.library.GWFileProtect.argtypes = [
 270                    ct.c_wchar_p,
 271                    ct.c_wchar_p,
 272                    ct.POINTER(ct.c_void_p),
 273                    ct.POINTER(ct.c_size_t)
 274                ]
 275
 276                # Variable initialisation
 277                ct_input_file = ct.c_wchar_p(input_file)
 278                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
 279                ct_output_buffer = ct.c_void_p(0)
 280                ct_output_size = ct.c_size_t(0)
 281
 282                # API call
 283                status = self.library.GWFileProtect(
 284                    ct_input_file,
 285                    ct_file_type,
 286                    ct.byref(ct_output_buffer),
 287                    ct.byref(ct_output_size)
 288                )
 289
 290            # memory to memory and memory to file
 291            elif isinstance(input_file, bytes):
 292                # API function declaration
 293                self.library.GWMemoryToMemoryProtect.argtypes = [
 294                    ct.c_void_p,
 295                    ct.c_size_t,
 296                    ct.c_wchar_p,
 297                    ct.POINTER(ct.c_void_p),
 298                    ct.POINTER(ct.c_size_t)
 299                ]
 300
 301                # Variable initialization
 302                bytearray_buffer = bytearray(input_file)
 303                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
 304                ct_input_size = ct.c_size_t(len(input_file))
 305                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
 306                ct_output_buffer = ct.c_void_p(0)
 307                ct_output_size = ct.c_size_t(0)
 308
 309                status = self.library.GWMemoryToMemoryProtect(
 310                    ct_input_buffer,
 311                    ct_input_size,
 312                    ct_file_type,
 313                    ct.byref(ct_output_buffer),
 314                    ct.byref(ct_output_size)
 315                )
 316
 317            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 318            if status not in successes.success_codes:
 319                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
 320                if raise_unsupported:
 321                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 322                else:
 323                    file_bytes = None
 324            else:
 325                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
 326                if isinstance(input_file, str) and isinstance(output_file, str):
 327                    # file to file, read the bytes of the file that Rebuild has already written
 328                    if not os.path.isfile(output_file):
 329                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
 330                        file_bytes = None
 331                    else:
 332                        with open(output_file, "rb") as f:
 333                            file_bytes = f.read()
 334                else:
 335                    # file to memory, memory to memory
 336                    file_bytes = utils.buffer_to_bytes(
 337                        ct_output_buffer,
 338                        ct_output_size
 339                    )
 340                    if isinstance(output_file, str):
 341                        # memory to file
 342                        # no Rebuild function exists for memory to file, write the memory to file ourselves
 343                        with open(output_file, "wb") as f:
 344                            f.write(file_bytes)
 345
 346            return file_bytes
 347
 348    def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 349        """ Recursively processes all files in a directory in protect mode using the given content management policy.
 350        The protected files are written to output_directory maintaining the same directory structure as input_directory.
 351
 352        Args:
 353            input_directory (str): The input directory containing files to protect.
 354            output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files.
 355            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 356            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 357
 358        Returns:
 359            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 360        """
 361        protected_files_dict = {}
 362        # Call protect_file on each file in input_directory to output_directory
 363        for input_file in utils.list_file_paths(input_directory):
 364            relative_path = os.path.relpath(input_file, input_directory)
 365            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 366
 367            protected_bytes = self.protect_file(
 368                input_file=input_file,
 369                output_file=output_file,
 370                raise_unsupported=raise_unsupported,
 371                content_management_policy=content_management_policy,
 372            )
 373
 374            protected_files_dict[relative_path] = protected_bytes
 375
 376        return protected_files_dict
 377
 378    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 379        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
 380
 381        Args:
 382            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 383            output_file (Union[None, str], optional): The output file path where the analysis file will be written.
 384            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
 385            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 386
 387        Returns:
 388            file_bytes (bytes): The analysis file bytes.
 389        """
 390        # Validate arg types
 391        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 392            raise TypeError(input_file)
 393        if not isinstance(output_file, (type(None), str)):
 394            raise TypeError(output_file)
 395        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 396            raise TypeError(content_management_policy)
 397        if not isinstance(raise_unsupported, bool):
 398            raise TypeError(raise_unsupported)
 399
 400        # Convert string path arguments to absolute paths
 401        if isinstance(input_file, str):
 402            if not os.path.isfile(input_file):
 403                raise FileNotFoundError(input_file)
 404            input_file = os.path.abspath(input_file)
 405        if isinstance(output_file, str):
 406            output_file = os.path.abspath(output_file)
 407            # make directories that do not exist
 408            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 409        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 410            content_management_policy = os.path.abspath(content_management_policy)
 411
 412        # Convert memory inputs to bytes
 413        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 414            input_file = utils.as_bytes(input_file)
 415
 416        # Check that file type is supported
 417        try:
 418            file_type = self.determine_file_type(input_file=input_file)
 419        except dft.errors.FileTypeEnumError:
 420            if raise_unsupported:
 421                raise
 422            else:
 423                return None
 424
 425        with utils.CwdHandler(self.library_path):
 426            # Set content management policy
 427            self.set_content_management_policy(content_management_policy)
 428
 429            # file to file
 430            if isinstance(input_file, str) and isinstance(output_file, str):
 431                # API function declaration
 432                self.library.GWFileToFileAnalysisAudit.argtypes = [
 433                    ct.c_wchar_p,
 434                    ct.c_wchar_p,
 435                    ct.c_wchar_p
 436                ]
 437
 438                # Variable initialisation
 439                ct_input_file = ct.c_wchar_p(input_file)
 440                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
 441                ct_output_file = ct.c_wchar_p(output_file)
 442
 443                # API call
 444                status = self.library.GWFileToFileAnalysisAudit(
 445                    ct_input_file,
 446                    ct_file_type,
 447                    ct_output_file
 448                )
 449
 450            # file to memory
 451            elif isinstance(input_file, str) and output_file is None:
 452                # API function declaration
 453                self.library.GWFileAnalysisAudit.argtypes = [
 454                    ct.c_wchar_p,
 455                    ct.c_wchar_p,
 456                    ct.POINTER(ct.c_void_p),
 457                    ct.POINTER(ct.c_size_t)
 458                ]
 459
 460                # Variable initialisation
 461                ct_input_file = ct.c_wchar_p(input_file)
 462                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
 463                ct_output_buffer = ct.c_void_p(0)
 464                ct_output_size = ct.c_size_t(0)
 465
 466                # API call
 467                status = self.library.GWFileAnalysisAudit(
 468                    ct_input_file,
 469                    ct_file_type,
 470                    ct.byref(ct_output_buffer),
 471                    ct.byref(ct_output_size)
 472                )
 473
 474            # memory to memory and memory to file
 475            elif isinstance(input_file, bytes):
 476                # API function declaration
 477                self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [
 478                    ct.c_void_p,
 479                    ct.c_size_t,
 480                    ct.c_wchar_p,
 481                    ct.POINTER(ct.c_void_p),
 482                    ct.POINTER(ct.c_size_t)
 483                ]
 484
 485                # Variable initialization
 486                bytearray_buffer = bytearray(input_file)
 487                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
 488                ct_input_size = ct.c_size_t(len(input_file))
 489                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
 490                ct_output_buffer = ct.c_void_p(0)
 491                ct_output_size = ct.c_size_t(0)
 492
 493                status = self.library.GWMemoryToMemoryAnalysisAudit(
 494                    ct.byref(ct_input_buffer),
 495                    ct_input_size,
 496                    ct_file_type,
 497                    ct.byref(ct_output_buffer),
 498                    ct.byref(ct_output_size)
 499                )
 500
 501            file_bytes = None
 502            if isinstance(input_file, str) and isinstance(output_file, str):
 503                # file to file, read the bytes of the file that Rebuild has already written
 504                if os.path.isfile(output_file):
 505                    with open(output_file, "rb") as f:
 506                        file_bytes = f.read()
 507            else:
 508                # file to memory, memory to memory
 509                if ct_output_buffer and ct_output_size:
 510                    file_bytes = utils.buffer_to_bytes(
 511                        ct_output_buffer,
 512                        ct_output_size
 513                    )
 514                    if isinstance(output_file, str):
 515                        # memory to file
 516                        # no Rebuild function exists for memory to file, write the memory to file ourselves
 517                        with open(output_file, "wb") as f:
 518                            f.write(file_bytes)
 519
 520            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 521            if status not in successes.success_codes:
 522                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
 523                if raise_unsupported:
 524                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 525            else:
 526                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
 527
 528            return file_bytes
 529
 530    def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 531        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
 532
 533        Args:
 534            input_directory (str): The input directory containing files to analyse.
 535            output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
 536            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 537            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 538
 539        Returns:
 540            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 541        """
 542        analysis_files_dict = {}
 543        # Call analyse_file on each file in input_directory to output_directory
 544        for input_file in utils.list_file_paths(input_directory):
 545            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
 546            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 547
 548            analysis_bytes = self.analyse_file(
 549                input_file=input_file,
 550                output_file=output_file,
 551                raise_unsupported=raise_unsupported,
 552                content_management_policy=content_management_policy,
 553            )
 554
 555            analysis_files_dict[relative_path] = analysis_bytes
 556
 557        return analysis_files_dict
 558
 559    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 560        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file.
 561
 562        Args:
 563            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 564            output_file (Union[None, str], optional): The output file path where the .zip file will be written.
 565            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
 566            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 567
 568        Returns:
 569            file_bytes (bytes): The exported .zip file.
 570        """
 571        # Validate arg types
 572        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 573            raise TypeError(input_file)
 574        if not isinstance(output_file, (type(None), str)):
 575            raise TypeError(output_file)
 576        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 577            raise TypeError(content_management_policy)
 578        if not isinstance(raise_unsupported, bool):
 579            raise TypeError(raise_unsupported)
 580
 581        # Convert string path arguments to absolute paths
 582        if isinstance(input_file, str):
 583            if not os.path.isfile(input_file):
 584                raise FileNotFoundError(input_file)
 585            input_file = os.path.abspath(input_file)
 586        if isinstance(output_file, str):
 587            output_file = os.path.abspath(output_file)
 588            # make directories that do not exist
 589            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 590        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 591            content_management_policy = os.path.abspath(content_management_policy)
 592
 593        # Convert memory inputs to bytes
 594        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 595            input_file = utils.as_bytes(input_file)
 596
 597        # Check that file type is supported
 598        try:
 599            self.determine_file_type(input_file=input_file)
 600        except dft.errors.FileTypeEnumError:
 601            if raise_unsupported:
 602                raise
 603            else:
 604                return None
 605
 606        with utils.CwdHandler(self.library_path):
 607            # Set content management policy
 608            self.set_content_management_policy(content_management_policy)
 609
 610            # file to file
 611            if isinstance(input_file, str) and isinstance(output_file, str):
 612                # API function declaration
 613                self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [
 614                    ct.c_wchar_p,
 615                    ct.c_wchar_p
 616                ]
 617
 618                # Variable initialisation
 619                ct_input_file = ct.c_wchar_p(input_file)
 620                ct_output_file = ct.c_wchar_p(output_file)
 621
 622                # API call
 623                status = self.library.GWFileToFileAnalysisProtectAndExport(
 624                    ct_input_file,
 625                    ct_output_file
 626                )
 627
 628            # file to memory
 629            elif isinstance(input_file, str) and output_file is None:
 630                # API function declaration
 631                self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [
 632                    ct.c_wchar_p,
 633                    ct.POINTER(ct.c_void_p),
 634                    ct.POINTER(ct.c_size_t)
 635                ]
 636
 637                # Variable initialisation
 638                ct_input_file = ct.c_wchar_p(input_file)
 639                ct_output_buffer = ct.c_void_p(0)
 640                ct_output_size = ct.c_size_t(0)
 641
 642                # API call
 643                status = self.library.GWFileToMemoryAnalysisProtectAndExport(
 644                    ct_input_file,
 645                    ct.byref(ct_output_buffer),
 646                    ct.byref(ct_output_size)
 647                )
 648
 649            # memory to memory and memory to file
 650            elif isinstance(input_file, bytes):
 651                # API function declaration
 652                self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [
 653                    ct.c_void_p,
 654                    ct.c_size_t,
 655                    ct.POINTER(ct.c_void_p),
 656                    ct.POINTER(ct.c_size_t)
 657                ]
 658
 659                # Variable initialization
 660                bytearray_buffer = bytearray(input_file)
 661                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
 662                ct_input_size = ct.c_size_t(len(input_file))
 663                ct_output_buffer = ct.c_void_p(0)
 664                ct_output_size = ct.c_size_t(0)
 665
 666                status = self.library.GWMemoryToMemoryAnalysisProtectAndExport(
 667                    ct_input_buffer,
 668                    ct_input_size,
 669                    ct.byref(ct_output_buffer),
 670                    ct.byref(ct_output_size)
 671                )
 672
 673            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 674            if status not in successes.success_codes:
 675                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
 676                if raise_unsupported:
 677                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 678                else:
 679                    file_bytes = None
 680            else:
 681                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
 682                if isinstance(input_file, str) and isinstance(output_file, str):
 683                    # file to file, read the bytes of the file that Rebuild has already written
 684                    if not os.path.isfile(output_file):
 685                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
 686                        file_bytes = None
 687                    else:
 688                        with open(output_file, "rb") as f:
 689                            file_bytes = f.read()
 690                else:
 691                    # file to memory, memory to memory
 692                    file_bytes = utils.buffer_to_bytes(
 693                        ct_output_buffer,
 694                        ct_output_size
 695                    )
 696                    if isinstance(output_file, str):
 697                        # memory to file
 698                        # no Rebuild function exists for memory to file, write the memory to file ourselves
 699                        with open(output_file, "wb") as f:
 700                            f.write(file_bytes)
 701
 702            return file_bytes
 703
 704    def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 705        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
 706
 707        Args:
 708            input_directory (str): The input directory containing files to export.
 709            output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
 710            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 711            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 712
 713        Returns:
 714            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 715        """
 716        export_files_dict = {}
 717        # Call export_file on each file in input_directory to output_directory
 718        for input_file in utils.list_file_paths(input_directory):
 719            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
 720            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 721
 722            export_bytes = self.export_file(
 723                input_file=input_file,
 724                output_file=output_file,
 725                raise_unsupported=raise_unsupported,
 726                content_management_policy=content_management_policy,
 727            )
 728
 729            export_files_dict[relative_path] = export_bytes
 730
 731        return export_files_dict
 732
 733    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 734        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
 735
 736        Args:
 737            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
 738            output_file (Union[None, str], optional): The output file path where the constructed file will be written.
 739            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
 740            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 741
 742        Returns:
 743            file_bytes (bytes): The imported file bytes.
 744        """
 745        # Validate arg types
 746        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 747            raise TypeError(input_file)
 748        if not isinstance(output_file, (type(None), str)):
 749            raise TypeError(output_file)
 750        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 751            raise TypeError(content_management_policy)
 752        if not isinstance(raise_unsupported, bool):
 753            raise TypeError(raise_unsupported)
 754
 755        # Convert string path arguments to absolute paths
 756        if isinstance(input_file, str):
 757            if not os.path.isfile(input_file):
 758                raise FileNotFoundError(input_file)
 759            input_file = os.path.abspath(input_file)
 760        if isinstance(output_file, str):
 761            output_file = os.path.abspath(output_file)
 762            # make directories that do not exist
 763            os.makedirs(os.path.dirname(output_file), exist_ok=True)
 764        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 765            content_management_policy = os.path.abspath(content_management_policy)
 766
 767        # Convert memory inputs to bytes
 768        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 769            input_file = utils.as_bytes(input_file)
 770
 771        # Check that file type is supported
 772        try:
 773            self.determine_file_type(input_file=input_file)
 774        except dft.errors.FileTypeEnumError:
 775            if raise_unsupported:
 776                raise
 777            else:
 778                return None
 779
 780        with utils.CwdHandler(self.library_path):
 781            # Set content management policy
 782            self.set_content_management_policy(content_management_policy)
 783
 784            # file to file
 785            if isinstance(input_file, str) and isinstance(output_file, str):
 786                # API function declaration
 787                self.library.GWFileToFileProtectAndImport.argtypes = [
 788                    ct.c_wchar_p,
 789                    ct.c_wchar_p
 790                ]
 791
 792                # Variable initialisation
 793                ct_input_file = ct.c_wchar_p(input_file)
 794                ct_output_file = ct.c_wchar_p(output_file)
 795
 796                # API call
 797                status = self.library.GWFileToFileProtectAndImport(
 798                    ct_input_file,
 799                    ct_output_file
 800                )
 801
 802            # file to memory
 803            elif isinstance(input_file, str) and output_file is None:
 804                # API function declaration
 805                self.library.GWFileToMemoryProtectAndImport.argtypes = [
 806                    ct.c_wchar_p,
 807                    ct.POINTER(ct.c_void_p),
 808                    ct.POINTER(ct.c_size_t)
 809                ]
 810
 811                # Variable initialisation
 812                ct_input_file = ct.c_wchar_p(input_file)
 813                ct_output_buffer = ct.c_void_p(0)
 814                ct_output_size = ct.c_size_t(0)
 815
 816                # API call
 817                status = self.library.GWFileToMemoryProtectAndImport(
 818                    ct_input_file,
 819                    ct.byref(ct_output_buffer),
 820                    ct.byref(ct_output_size)
 821                )
 822
 823            # memory to memory and memory to file
 824            elif isinstance(input_file, bytes):
 825                # API function declaration
 826                self.library.GWMemoryToMemoryProtectAndImport.argtypes = [
 827                    ct.c_void_p,
 828                    ct.c_size_t,
 829                    ct.POINTER(ct.c_void_p),
 830                    ct.POINTER(ct.c_size_t)
 831                ]
 832
 833                # Variable initialization
 834                bytearray_buffer = bytearray(input_file)
 835                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
 836                ct_input_size = ct.c_size_t(len(input_file))
 837                ct_output_buffer = ct.c_void_p(0)
 838                ct_output_size = ct.c_size_t(0)
 839
 840                status = self.library.GWMemoryToMemoryProtectAndImport(
 841                    ct_input_buffer,
 842                    ct_input_size,
 843                    ct.byref(ct_output_buffer),
 844                    ct.byref(ct_output_size)
 845                )
 846
 847            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
 848            if status not in successes.success_codes:
 849                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
 850                if raise_unsupported:
 851                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 852                else:
 853                    file_bytes = None
 854            else:
 855                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
 856                if isinstance(input_file, str) and isinstance(output_file, str):
 857                    # file to file, read the bytes of the file that Rebuild has already written
 858                    if not os.path.isfile(output_file):
 859                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
 860                        file_bytes = None
 861                    else:
 862                        with open(output_file, "rb") as f:
 863                            file_bytes = f.read()
 864                else:
 865                    # file to memory, memory to memory
 866                    file_bytes = utils.buffer_to_bytes(
 867                        ct_output_buffer,
 868                        ct_output_size
 869                    )
 870                    if isinstance(output_file, str):
 871                        # memory to file
 872                        # no Rebuild function exists for memory to file, write the memory to file ourselves
 873                        with open(output_file, "wb") as f:
 874                            f.write(file_bytes)
 875
 876            return file_bytes
 877
 878    def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
 879        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
 880        The constructed files are written to output_directory maintaining the same directory structure as input_directory.
 881
 882        Args:
 883            input_directory (str): The input directory containing files to import.
 884            output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files.
 885            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
 886            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 887
 888        Returns:
 889            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
 890        """
 891        import_files_dict = {}
 892        # Call import_file on each file in input_directory to output_directory
 893        for input_file in utils.list_file_paths(input_directory):
 894            relative_path = os.path.relpath(input_file, input_directory)
 895            # Remove .zip extension from relative_path
 896            relative_path = os.path.splitext(relative_path)[0]
 897            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
 898
 899            import_bytes = self.import_file(
 900                input_file=input_file,
 901                output_file=output_file,
 902                raise_unsupported=raise_unsupported,
 903                content_management_policy=content_management_policy,
 904            )
 905
 906            import_files_dict[relative_path] = import_bytes
 907
 908        return import_files_dict
 909
 910    def GWFileErrorMsg(self):
 911        """ Retrieve the Glasswall Process error message.
 912
 913        Returns:
 914            error_message (str): The Glasswall Process error message.
 915        """
 916        # Declare the return type
 917        self.library.GWFileErrorMsg.restype = ct.c_wchar_p
 918
 919        # API call
 920        error_message = self.library.GWFileErrorMsg()
 921
 922        return error_message
 923
 924    def _GWFileToFileAnalysisAndProtect(self, input_file: str, file_type: str, output_file: str, output_analysis_report: str):
 925        """ This function Manages the specified file and carries out an Analysis Audit, saving the results to the specified file locations.
 926
 927        Args:
 928            input_file (str): The input file path or bytes.
 929            output_file (str): The output file path where the protected file will be written.
 930            output_analysis_report (str): The output file path where the analysis report will be written.
 931
 932        Returns:
 933            status (int): The result of the Glasswall API call.
 934        """
 935        # API function declaration
 936        self.library.GWFileToFileAnalysisAndProtect.argtypes = [
 937            ct.c_wchar_p,  # wchar_t * inputFilePathName
 938            ct.c_wchar_p,  # wchar_t* wcType
 939            ct.c_wchar_p,  # wchar_t * outputFilePathName
 940            ct.c_wchar_p  # wchar_t * analysisFilePathName
 941        ]
 942
 943        # Variable initialisation
 944        ct_input_file = ct.c_wchar_p(input_file)
 945        ct_file_type = ct.c_wchar_p(file_type)
 946        ct_output_file = ct.c_wchar_p(output_file)
 947        ct_output_analysis_report = ct.c_wchar_p(output_analysis_report)
 948
 949        # API call
 950        status = self.library.GWFileToFileAnalysisAndProtect(
 951            ct_input_file,
 952            ct_file_type,
 953            ct_output_file,
 954            ct_output_analysis_report
 955        )
 956
 957        return status
 958
 959    def _GWFileAnalysisAndProtect(self, input_file: str, file_type: str):
 960        """ This function Manages the specified file and carries out an Analysis Audit, returning both outputs to the specified memory locations.
 961
 962        Args:
 963            input_file (str): The input file path or bytes.
 964
 965        Returns:
 966            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attributes  'input_file', 'file_type', 'output_file_buffer', 'output_file_buffer_length', 'output_report_buffer', 'output_report_buffer_length', 'output_file', 'analysis_file'.
 967        """
 968        # API function declaration
 969        self.library.GWFileAnalysisAndProtect.argtypes = [
 970            ct.c_wchar_p,  # wchar_t * inputFilePathName
 971            ct.c_wchar_p,  # wchar_t* wcType
 972            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
 973            ct.POINTER(ct.c_size_t),  # size_t *outputLength
 974            ct.POINTER(ct.c_void_p),  # void **analysisFileBuffer
 975            ct.POINTER(ct.c_size_t)  # size_t *analysisFileBufferLength
 976        ]
 977
 978        # Variable initialisation
 979        gw_return_object = glasswall.GwReturnObj()
 980        gw_return_object.input_file = ct.c_wchar_p(input_file)
 981        gw_return_object.file_type = ct.c_wchar_p(file_type)
 982        gw_return_object.output_file_buffer = ct.c_void_p(0)
 983        gw_return_object.output_file_buffer_length = ct.c_size_t(0)
 984        gw_return_object.output_report_buffer = ct.c_void_p(0)
 985        gw_return_object.output_report_buffer_length = ct.c_size_t(0)
 986
 987        # API call
 988        gw_return_object.status = self.library.GWFileAnalysisAndProtect(
 989            gw_return_object.input_file,
 990            gw_return_object.file_type,
 991            ct.byref(gw_return_object.output_file_buffer),
 992            ct.byref(gw_return_object.output_file_buffer_length),
 993            ct.byref(gw_return_object.output_report_buffer),
 994            ct.byref(gw_return_object.output_report_buffer_length)
 995        )
 996
 997        gw_return_object.output_file = ct.string_at(gw_return_object.output_file_buffer, gw_return_object.output_file_buffer_length.value)
 998        gw_return_object.analysis_file = ct.string_at(gw_return_object.output_report_buffer, gw_return_object.output_report_buffer_length.value)
 999
1000        return gw_return_object

A high level Python wrapper for Glasswall Rebuild / Classic.

Rebuild(library_path: str)
20    def __init__(self, library_path: str):
21        super().__init__(library_path=library_path)
22        self.library = self.load_library(os.path.abspath(library_path))
23
24        # Set content management configuration to default
25        self.set_content_management_policy(input_file=None)
26
27        # Validate killswitch has not activated
28        self.validate_licence()
29
30        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
library
def validate_licence(self):
32    def validate_licence(self):
33        """ Validates the licence of the library by attempting to call protect_file on a known supported file.
34
35        Raises:
36            RebuildError: If the licence could not be validated.
37        """
38        # Call protect file on a known good bitmap to see if licence has expired
39        try:
40            self.protect_file(
41                input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00",
42                raise_unsupported=True
43            )
44            log.debug(f"{self.__class__.__name__} licence validated successfully.")
45        except errors.RebuildError:
46            log.error(f"{self.__class__.__name__} licence validation failed.")
47            raise

Validates the licence of the library by attempting to call protect_file on a known supported file.

Raises: RebuildError: If the licence could not be validated.

def version(self):
49    def version(self):
50        """ Returns the Glasswall library version.
51
52        Returns:
53            version (str): The Glasswall library version.
54        """
55
56        # Declare the return type
57        self.library.GWFileVersion.restype = ct.c_wchar_p
58
59        # API call
60        version = self.library.GWFileVersion()
61
62        return version

Returns the Glasswall library version.

Returns: version (str): The Glasswall library version.

def determine_file_type( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], as_string: bool = False, raise_unsupported: bool = True):
 64    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False, raise_unsupported: bool = True):
 65        """ Returns an int representing the file type / file format of a file.
 66
 67        Args:
 68            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
 69            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.
 70            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 71
 72        Returns:
 73            file_type (Union[int, str]): The file format.
 74        """
 75
 76        if isinstance(input_file, str):
 77            if not os.path.isfile(input_file):
 78                raise FileNotFoundError(input_file)
 79
 80            self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p]
 81            self.library.GWDetermineFileTypeFromFile.restype = ct.c_int
 82
 83            # convert to ct.c_wchar_p
 84            ct_input_file = ct.c_wchar_p(input_file)
 85
 86            # API call
 87            file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file)
 88
 89        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 90            self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t]
 91            self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int
 92
 93            # convert to bytes
 94            bytes_input_file = utils.as_bytes(input_file)
 95
 96            # ctypes conversion
 97            ct_input_buffer = ct.c_char_p(bytes_input_file)
 98            ct_butter_length = ct.c_size_t(len(bytes_input_file))
 99
100            # API call
101            file_type = self.library.GWDetermineFileTypeFromFileInMem(
102                ct_input_buffer,
103                ct_butter_length
104            )
105
106        file_type_as_string = dft.file_type_int_to_str(file_type)
107        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
108
109        if not dft.is_success(file_type):
110            if raise_unsupported:
111                log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
112                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
113            else:
114                log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
115        else:
116            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
117
118        if as_string:
119            return file_type_as_string
120
121        return file_type

Returns an int representing the file type / file format of a file.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path. as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_type (Union[int, str]): The file format.

def get_content_management_policy(self):
123    def get_content_management_policy(self):
124        """ Gets the current content management configuration.
125
126        Returns:
127            xml_string (str): The XML string of the current content management configuration.
128        """
129
130        # Declare argument types
131        self.library.GWFileConfigGet.argtypes = [
132            ct.POINTER(ct.POINTER(ct.c_wchar)),
133            ct.POINTER(ct.c_size_t)
134        ]
135
136        # Variable initialisation
137        ct_input_buffer = ct.POINTER(ct.c_wchar)()
138        ct_input_size = ct.c_size_t(0)
139
140        # API call
141        status = self.library.GWFileConfigGet(
142            ct.byref(ct_input_buffer),
143            ct.byref(ct_input_size)
144        )
145
146        if status not in successes.success_codes:
147            log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
148            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
149        else:
150            log.debug(f"\n\tstatus: {status}")
151
152        # As string
153        xml_string = utils.validate_xml(ct.wstring_at(ct_input_buffer))
154
155        return xml_string

Gets the current content management configuration.

Returns: xml_string (str): The XML string of the current content management configuration.

def set_content_management_policy( self, input_file: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None):
157    def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None):
158        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
159
160        Args:
161            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
162
163        Returns:
164            status (int): The result of the Glasswall API call.
165        """
166        # Validate type
167        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
168            raise TypeError(input_file)
169
170        # self.library.GWFileConfigRevertToDefaults doesn't work, load default instead
171        # Set input_file to default if input_file is None
172        if input_file is None:
173            input_file = glasswall.content_management.policies.Rebuild(default="sanitise")
174
175        # Validate xml content is parsable
176        xml_string = utils.validate_xml(input_file)
177
178        # Declare argument types
179        self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p]
180
181        # API call
182        status = self.library.GWFileConfigXML(
183            ct.c_wchar_p(xml_string)
184        )
185
186        if status not in successes.success_codes:
187            log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
188            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
189        else:
190            log.debug(f"\n\tstatus: {status}")
191
192        return status

Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.

Args: input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.

Returns: status (int): The result of the Glasswall API call.

def protect_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
194    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
195        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.
196
197        Args:
198            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
199            output_file (Union[None, str], optional): The output file path where the protected file will be written.
200            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
201            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
202
203        Returns:
204            file_bytes (bytes): The protected file bytes.
205        """
206        # Validate arg types
207        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
208            raise TypeError(input_file)
209        if not isinstance(output_file, (type(None), str)):
210            raise TypeError(output_file)
211        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
212            raise TypeError(content_management_policy)
213        if not isinstance(raise_unsupported, bool):
214            raise TypeError(raise_unsupported)
215
216        # Convert string path arguments to absolute paths
217        if isinstance(input_file, str):
218            if not os.path.isfile(input_file):
219                raise FileNotFoundError(input_file)
220            input_file = os.path.abspath(input_file)
221        if isinstance(output_file, str):
222            output_file = os.path.abspath(output_file)
223            # make directories that do not exist
224            os.makedirs(os.path.dirname(output_file), exist_ok=True)
225        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
226            content_management_policy = os.path.abspath(content_management_policy)
227
228        # Convert memory inputs to bytes
229        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
230            input_file = utils.as_bytes(input_file)
231
232        # Check that file type is supported
233        try:
234            file_type = self.determine_file_type(input_file=input_file)
235        except dft.errors.FileTypeEnumError:
236            if raise_unsupported:
237                raise
238            else:
239                return None
240
241        with utils.CwdHandler(self.library_path):
242            # Set content management policy
243            self.set_content_management_policy(content_management_policy)
244
245            # file to file
246            if isinstance(input_file, str) and isinstance(output_file, str):
247                # API function declaration
248                self.library.GWFileToFileProtect.argtypes = [
249                    ct.c_wchar_p,
250                    ct.c_wchar_p,
251                    ct.c_wchar_p
252                ]
253
254                # Variable initialisation
255                ct_input_file = ct.c_wchar_p(input_file)
256                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
257                ct_output_file = ct.c_wchar_p(output_file)
258
259                # API call
260                status = self.library.GWFileToFileProtect(
261                    ct_input_file,
262                    ct_file_type,
263                    ct_output_file
264                )
265
266            # file to memory
267            elif isinstance(input_file, str) and output_file is None:
268                # API function declaration
269                self.library.GWFileProtect.argtypes = [
270                    ct.c_wchar_p,
271                    ct.c_wchar_p,
272                    ct.POINTER(ct.c_void_p),
273                    ct.POINTER(ct.c_size_t)
274                ]
275
276                # Variable initialisation
277                ct_input_file = ct.c_wchar_p(input_file)
278                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
279                ct_output_buffer = ct.c_void_p(0)
280                ct_output_size = ct.c_size_t(0)
281
282                # API call
283                status = self.library.GWFileProtect(
284                    ct_input_file,
285                    ct_file_type,
286                    ct.byref(ct_output_buffer),
287                    ct.byref(ct_output_size)
288                )
289
290            # memory to memory and memory to file
291            elif isinstance(input_file, bytes):
292                # API function declaration
293                self.library.GWMemoryToMemoryProtect.argtypes = [
294                    ct.c_void_p,
295                    ct.c_size_t,
296                    ct.c_wchar_p,
297                    ct.POINTER(ct.c_void_p),
298                    ct.POINTER(ct.c_size_t)
299                ]
300
301                # Variable initialization
302                bytearray_buffer = bytearray(input_file)
303                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
304                ct_input_size = ct.c_size_t(len(input_file))
305                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
306                ct_output_buffer = ct.c_void_p(0)
307                ct_output_size = ct.c_size_t(0)
308
309                status = self.library.GWMemoryToMemoryProtect(
310                    ct_input_buffer,
311                    ct_input_size,
312                    ct_file_type,
313                    ct.byref(ct_output_buffer),
314                    ct.byref(ct_output_size)
315                )
316
317            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
318            if status not in successes.success_codes:
319                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
320                if raise_unsupported:
321                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
322                else:
323                    file_bytes = None
324            else:
325                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
326                if isinstance(input_file, str) and isinstance(output_file, str):
327                    # file to file, read the bytes of the file that Rebuild has already written
328                    if not os.path.isfile(output_file):
329                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
330                        file_bytes = None
331                    else:
332                        with open(output_file, "rb") as f:
333                            file_bytes = f.read()
334                else:
335                    # file to memory, memory to memory
336                    file_bytes = utils.buffer_to_bytes(
337                        ct_output_buffer,
338                        ct_output_size
339                    )
340                    if isinstance(output_file, str):
341                        # memory to file
342                        # no Rebuild function exists for memory to file, write the memory to file ourselves
343                        with open(output_file, "wb") as f:
344                            f.write(file_bytes)
345
346            return file_bytes

Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the protected file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The protected file bytes.

def protect_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
348    def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
349        """ Recursively processes all files in a directory in protect mode using the given content management policy.
350        The protected files are written to output_directory maintaining the same directory structure as input_directory.
351
352        Args:
353            input_directory (str): The input directory containing files to protect.
354            output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files.
355            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
356            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
357
358        Returns:
359            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
360        """
361        protected_files_dict = {}
362        # Call protect_file on each file in input_directory to output_directory
363        for input_file in utils.list_file_paths(input_directory):
364            relative_path = os.path.relpath(input_file, input_directory)
365            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
366
367            protected_bytes = self.protect_file(
368                input_file=input_file,
369                output_file=output_file,
370                raise_unsupported=raise_unsupported,
371                content_management_policy=content_management_policy,
372            )
373
374            protected_files_dict[relative_path] = protected_bytes
375
376        return protected_files_dict

Recursively processes all files in a directory in protect mode using the given content management policy. The protected files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to protect. output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

def analyse_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
378    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
379        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
380
381        Args:
382            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
383            output_file (Union[None, str], optional): The output file path where the analysis file will be written.
384            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
385            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
386
387        Returns:
388            file_bytes (bytes): The analysis file bytes.
389        """
390        # Validate arg types
391        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
392            raise TypeError(input_file)
393        if not isinstance(output_file, (type(None), str)):
394            raise TypeError(output_file)
395        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
396            raise TypeError(content_management_policy)
397        if not isinstance(raise_unsupported, bool):
398            raise TypeError(raise_unsupported)
399
400        # Convert string path arguments to absolute paths
401        if isinstance(input_file, str):
402            if not os.path.isfile(input_file):
403                raise FileNotFoundError(input_file)
404            input_file = os.path.abspath(input_file)
405        if isinstance(output_file, str):
406            output_file = os.path.abspath(output_file)
407            # make directories that do not exist
408            os.makedirs(os.path.dirname(output_file), exist_ok=True)
409        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
410            content_management_policy = os.path.abspath(content_management_policy)
411
412        # Convert memory inputs to bytes
413        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
414            input_file = utils.as_bytes(input_file)
415
416        # Check that file type is supported
417        try:
418            file_type = self.determine_file_type(input_file=input_file)
419        except dft.errors.FileTypeEnumError:
420            if raise_unsupported:
421                raise
422            else:
423                return None
424
425        with utils.CwdHandler(self.library_path):
426            # Set content management policy
427            self.set_content_management_policy(content_management_policy)
428
429            # file to file
430            if isinstance(input_file, str) and isinstance(output_file, str):
431                # API function declaration
432                self.library.GWFileToFileAnalysisAudit.argtypes = [
433                    ct.c_wchar_p,
434                    ct.c_wchar_p,
435                    ct.c_wchar_p
436                ]
437
438                # Variable initialisation
439                ct_input_file = ct.c_wchar_p(input_file)
440                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
441                ct_output_file = ct.c_wchar_p(output_file)
442
443                # API call
444                status = self.library.GWFileToFileAnalysisAudit(
445                    ct_input_file,
446                    ct_file_type,
447                    ct_output_file
448                )
449
450            # file to memory
451            elif isinstance(input_file, str) and output_file is None:
452                # API function declaration
453                self.library.GWFileAnalysisAudit.argtypes = [
454                    ct.c_wchar_p,
455                    ct.c_wchar_p,
456                    ct.POINTER(ct.c_void_p),
457                    ct.POINTER(ct.c_size_t)
458                ]
459
460                # Variable initialisation
461                ct_input_file = ct.c_wchar_p(input_file)
462                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
463                ct_output_buffer = ct.c_void_p(0)
464                ct_output_size = ct.c_size_t(0)
465
466                # API call
467                status = self.library.GWFileAnalysisAudit(
468                    ct_input_file,
469                    ct_file_type,
470                    ct.byref(ct_output_buffer),
471                    ct.byref(ct_output_size)
472                )
473
474            # memory to memory and memory to file
475            elif isinstance(input_file, bytes):
476                # API function declaration
477                self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [
478                    ct.c_void_p,
479                    ct.c_size_t,
480                    ct.c_wchar_p,
481                    ct.POINTER(ct.c_void_p),
482                    ct.POINTER(ct.c_size_t)
483                ]
484
485                # Variable initialization
486                bytearray_buffer = bytearray(input_file)
487                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
488                ct_input_size = ct.c_size_t(len(input_file))
489                ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
490                ct_output_buffer = ct.c_void_p(0)
491                ct_output_size = ct.c_size_t(0)
492
493                status = self.library.GWMemoryToMemoryAnalysisAudit(
494                    ct.byref(ct_input_buffer),
495                    ct_input_size,
496                    ct_file_type,
497                    ct.byref(ct_output_buffer),
498                    ct.byref(ct_output_size)
499                )
500
501            file_bytes = None
502            if isinstance(input_file, str) and isinstance(output_file, str):
503                # file to file, read the bytes of the file that Rebuild has already written
504                if os.path.isfile(output_file):
505                    with open(output_file, "rb") as f:
506                        file_bytes = f.read()
507            else:
508                # file to memory, memory to memory
509                if ct_output_buffer and ct_output_size:
510                    file_bytes = utils.buffer_to_bytes(
511                        ct_output_buffer,
512                        ct_output_size
513                    )
514                    if isinstance(output_file, str):
515                        # memory to file
516                        # no Rebuild function exists for memory to file, write the memory to file ourselves
517                        with open(output_file, "wb") as f:
518                            f.write(file_bytes)
519
520            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
521            if status not in successes.success_codes:
522                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
523                if raise_unsupported:
524                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
525            else:
526                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
527
528            return file_bytes

Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the analysis file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The analysis file bytes.

def analyse_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
530    def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
531        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
532
533        Args:
534            input_directory (str): The input directory containing files to analyse.
535            output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
536            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
537            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
538
539        Returns:
540            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
541        """
542        analysis_files_dict = {}
543        # Call analyse_file on each file in input_directory to output_directory
544        for input_file in utils.list_file_paths(input_directory):
545            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
546            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
547
548            analysis_bytes = self.analyse_file(
549                input_file=input_file,
550                output_file=output_file,
551                raise_unsupported=raise_unsupported,
552                content_management_policy=content_management_policy,
553            )
554
555            analysis_files_dict[relative_path] = analysis_bytes
556
557        return analysis_files_dict

Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to analyse. output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

def export_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
559    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
560        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file.
561
562        Args:
563            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
564            output_file (Union[None, str], optional): The output file path where the .zip file will be written.
565            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
566            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
567
568        Returns:
569            file_bytes (bytes): The exported .zip file.
570        """
571        # Validate arg types
572        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
573            raise TypeError(input_file)
574        if not isinstance(output_file, (type(None), str)):
575            raise TypeError(output_file)
576        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
577            raise TypeError(content_management_policy)
578        if not isinstance(raise_unsupported, bool):
579            raise TypeError(raise_unsupported)
580
581        # Convert string path arguments to absolute paths
582        if isinstance(input_file, str):
583            if not os.path.isfile(input_file):
584                raise FileNotFoundError(input_file)
585            input_file = os.path.abspath(input_file)
586        if isinstance(output_file, str):
587            output_file = os.path.abspath(output_file)
588            # make directories that do not exist
589            os.makedirs(os.path.dirname(output_file), exist_ok=True)
590        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
591            content_management_policy = os.path.abspath(content_management_policy)
592
593        # Convert memory inputs to bytes
594        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
595            input_file = utils.as_bytes(input_file)
596
597        # Check that file type is supported
598        try:
599            self.determine_file_type(input_file=input_file)
600        except dft.errors.FileTypeEnumError:
601            if raise_unsupported:
602                raise
603            else:
604                return None
605
606        with utils.CwdHandler(self.library_path):
607            # Set content management policy
608            self.set_content_management_policy(content_management_policy)
609
610            # file to file
611            if isinstance(input_file, str) and isinstance(output_file, str):
612                # API function declaration
613                self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [
614                    ct.c_wchar_p,
615                    ct.c_wchar_p
616                ]
617
618                # Variable initialisation
619                ct_input_file = ct.c_wchar_p(input_file)
620                ct_output_file = ct.c_wchar_p(output_file)
621
622                # API call
623                status = self.library.GWFileToFileAnalysisProtectAndExport(
624                    ct_input_file,
625                    ct_output_file
626                )
627
628            # file to memory
629            elif isinstance(input_file, str) and output_file is None:
630                # API function declaration
631                self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [
632                    ct.c_wchar_p,
633                    ct.POINTER(ct.c_void_p),
634                    ct.POINTER(ct.c_size_t)
635                ]
636
637                # Variable initialisation
638                ct_input_file = ct.c_wchar_p(input_file)
639                ct_output_buffer = ct.c_void_p(0)
640                ct_output_size = ct.c_size_t(0)
641
642                # API call
643                status = self.library.GWFileToMemoryAnalysisProtectAndExport(
644                    ct_input_file,
645                    ct.byref(ct_output_buffer),
646                    ct.byref(ct_output_size)
647                )
648
649            # memory to memory and memory to file
650            elif isinstance(input_file, bytes):
651                # API function declaration
652                self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [
653                    ct.c_void_p,
654                    ct.c_size_t,
655                    ct.POINTER(ct.c_void_p),
656                    ct.POINTER(ct.c_size_t)
657                ]
658
659                # Variable initialization
660                bytearray_buffer = bytearray(input_file)
661                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
662                ct_input_size = ct.c_size_t(len(input_file))
663                ct_output_buffer = ct.c_void_p(0)
664                ct_output_size = ct.c_size_t(0)
665
666                status = self.library.GWMemoryToMemoryAnalysisProtectAndExport(
667                    ct_input_buffer,
668                    ct_input_size,
669                    ct.byref(ct_output_buffer),
670                    ct.byref(ct_output_size)
671                )
672
673            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
674            if status not in successes.success_codes:
675                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
676                if raise_unsupported:
677                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
678                else:
679                    file_bytes = None
680            else:
681                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
682                if isinstance(input_file, str) and isinstance(output_file, str):
683                    # file to file, read the bytes of the file that Rebuild has already written
684                    if not os.path.isfile(output_file):
685                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
686                        file_bytes = None
687                    else:
688                        with open(output_file, "rb") as f:
689                            file_bytes = f.read()
690                else:
691                    # file to memory, memory to memory
692                    file_bytes = utils.buffer_to_bytes(
693                        ct_output_buffer,
694                        ct_output_size
695                    )
696                    if isinstance(output_file, str):
697                        # memory to file
698                        # no Rebuild function exists for memory to file, write the memory to file ourselves
699                        with open(output_file, "wb") as f:
700                            f.write(file_bytes)
701
702            return file_bytes

Export a file, returning the .zip file bytes. The .zip file is written to output_file.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the .zip file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The exported .zip file.

def export_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
704    def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
705        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
706
707        Args:
708            input_directory (str): The input directory containing files to export.
709            output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
710            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
711            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
712
713        Returns:
714            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
715        """
716        export_files_dict = {}
717        # Call export_file on each file in input_directory to output_directory
718        for input_file in utils.list_file_paths(input_directory):
719            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
720            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
721
722            export_bytes = self.export_file(
723                input_file=input_file,
724                output_file=output_file,
725                raise_unsupported=raise_unsupported,
726                content_management_policy=content_management_policy,
727            )
728
729            export_files_dict[relative_path] = export_bytes
730
731        return export_files_dict

Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to export. output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

def import_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
733    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
734        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.
735
736        Args:
737            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
738            output_file (Union[None, str], optional): The output file path where the constructed file will be written.
739            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
740            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
741
742        Returns:
743            file_bytes (bytes): The imported file bytes.
744        """
745        # Validate arg types
746        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
747            raise TypeError(input_file)
748        if not isinstance(output_file, (type(None), str)):
749            raise TypeError(output_file)
750        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
751            raise TypeError(content_management_policy)
752        if not isinstance(raise_unsupported, bool):
753            raise TypeError(raise_unsupported)
754
755        # Convert string path arguments to absolute paths
756        if isinstance(input_file, str):
757            if not os.path.isfile(input_file):
758                raise FileNotFoundError(input_file)
759            input_file = os.path.abspath(input_file)
760        if isinstance(output_file, str):
761            output_file = os.path.abspath(output_file)
762            # make directories that do not exist
763            os.makedirs(os.path.dirname(output_file), exist_ok=True)
764        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
765            content_management_policy = os.path.abspath(content_management_policy)
766
767        # Convert memory inputs to bytes
768        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
769            input_file = utils.as_bytes(input_file)
770
771        # Check that file type is supported
772        try:
773            self.determine_file_type(input_file=input_file)
774        except dft.errors.FileTypeEnumError:
775            if raise_unsupported:
776                raise
777            else:
778                return None
779
780        with utils.CwdHandler(self.library_path):
781            # Set content management policy
782            self.set_content_management_policy(content_management_policy)
783
784            # file to file
785            if isinstance(input_file, str) and isinstance(output_file, str):
786                # API function declaration
787                self.library.GWFileToFileProtectAndImport.argtypes = [
788                    ct.c_wchar_p,
789                    ct.c_wchar_p
790                ]
791
792                # Variable initialisation
793                ct_input_file = ct.c_wchar_p(input_file)
794                ct_output_file = ct.c_wchar_p(output_file)
795
796                # API call
797                status = self.library.GWFileToFileProtectAndImport(
798                    ct_input_file,
799                    ct_output_file
800                )
801
802            # file to memory
803            elif isinstance(input_file, str) and output_file is None:
804                # API function declaration
805                self.library.GWFileToMemoryProtectAndImport.argtypes = [
806                    ct.c_wchar_p,
807                    ct.POINTER(ct.c_void_p),
808                    ct.POINTER(ct.c_size_t)
809                ]
810
811                # Variable initialisation
812                ct_input_file = ct.c_wchar_p(input_file)
813                ct_output_buffer = ct.c_void_p(0)
814                ct_output_size = ct.c_size_t(0)
815
816                # API call
817                status = self.library.GWFileToMemoryProtectAndImport(
818                    ct_input_file,
819                    ct.byref(ct_output_buffer),
820                    ct.byref(ct_output_size)
821                )
822
823            # memory to memory and memory to file
824            elif isinstance(input_file, bytes):
825                # API function declaration
826                self.library.GWMemoryToMemoryProtectAndImport.argtypes = [
827                    ct.c_void_p,
828                    ct.c_size_t,
829                    ct.POINTER(ct.c_void_p),
830                    ct.POINTER(ct.c_size_t)
831                ]
832
833                # Variable initialization
834                bytearray_buffer = bytearray(input_file)
835                ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
836                ct_input_size = ct.c_size_t(len(input_file))
837                ct_output_buffer = ct.c_void_p(0)
838                ct_output_size = ct.c_size_t(0)
839
840                status = self.library.GWMemoryToMemoryProtectAndImport(
841                    ct_input_buffer,
842                    ct_input_size,
843                    ct.byref(ct_output_buffer),
844                    ct.byref(ct_output_size)
845                )
846
847            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
848            if status not in successes.success_codes:
849                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
850                if raise_unsupported:
851                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
852                else:
853                    file_bytes = None
854            else:
855                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
856                if isinstance(input_file, str) and isinstance(output_file, str):
857                    # file to file, read the bytes of the file that Rebuild has already written
858                    if not os.path.isfile(output_file):
859                        log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
860                        file_bytes = None
861                    else:
862                        with open(output_file, "rb") as f:
863                            file_bytes = f.read()
864                else:
865                    # file to memory, memory to memory
866                    file_bytes = utils.buffer_to_bytes(
867                        ct_output_buffer,
868                        ct_output_size
869                    )
870                    if isinstance(output_file, str):
871                        # memory to file
872                        # no Rebuild function exists for memory to file, write the memory to file ourselves
873                        with open(output_file, "wb") as f:
874                            f.write(file_bytes)
875
876            return file_bytes

Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. output_file (Union[None, str], optional): The output file path where the constructed file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_bytes (bytes): The imported file bytes.

def import_directory( self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy] = None, raise_unsupported: bool = True):
878    def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
879        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
880        The constructed files are written to output_directory maintaining the same directory structure as input_directory.
881
882        Args:
883            input_directory (str): The input directory containing files to import.
884            output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files.
885            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
886            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
887
888        Returns:
889            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
890        """
891        import_files_dict = {}
892        # Call import_file on each file in input_directory to output_directory
893        for input_file in utils.list_file_paths(input_directory):
894            relative_path = os.path.relpath(input_file, input_directory)
895            # Remove .zip extension from relative_path
896            relative_path = os.path.splitext(relative_path)[0]
897            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
898
899            import_bytes = self.import_file(
900                input_file=input_file,
901                output_file=output_file,
902                raise_unsupported=raise_unsupported,
903                content_management_policy=content_management_policy,
904            )
905
906            import_files_dict[relative_path] = import_bytes
907
908        return import_files_dict

Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. The constructed files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to import. output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

def GWFileErrorMsg(self):
910    def GWFileErrorMsg(self):
911        """ Retrieve the Glasswall Process error message.
912
913        Returns:
914            error_message (str): The Glasswall Process error message.
915        """
916        # Declare the return type
917        self.library.GWFileErrorMsg.restype = ct.c_wchar_p
918
919        # API call
920        error_message = self.library.GWFileErrorMsg()
921
922        return error_message

Retrieve the Glasswall Process error message.

Returns: error_message (str): The Glasswall Process error message.