glasswall.libraries.archive_manager.archive_manager

  1import ctypes as ct
  2import functools
  3import io
  4import os
  5from typing import Optional, Union
  6
  7import glasswall
  8from glasswall import determine_file_type as dft
  9from glasswall import utils
 10from glasswall.config.logging import log
 11from glasswall.libraries.archive_manager import errors, successes
 12from glasswall.libraries.library import Library
 13
 14
 15class ArchiveManager(Library):
 16    """ A high level Python wrapper for Glasswall Archive Manager. """
 17
 18    def __init__(self, library_path):
 19        super().__init__(library_path)
 20        self.library = self.load_library(os.path.abspath(library_path))
 21
 22        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
 23
 24    def version(self):
 25        """ Returns the Glasswall library version.
 26
 27        Returns:
 28            version (str): The Glasswall library version.
 29        """
 30        # API function declaration
 31        self.library.GwArchiveVersion.restype = ct.c_char_p
 32
 33        # API call
 34        version = self.library.GwArchiveVersion()
 35
 36        # Convert to Python string
 37        version = ct.string_at(version).decode()
 38
 39        return version
 40
 41    def release(self):
 42        """ Releases any resources held by the Glasswall Archive Manager library. """
 43        self.library.GwArchiveDone()
 44
 45    @property
 46    @functools.lru_cache()
 47    def supported_archives(self):
 48        """ Returns a list of supported archive file formats. """
 49
 50        # API function declaration
 51        self.library.GwSupportedFiletypes.restype = ct.c_char_p
 52
 53        # API call
 54        result = self.library.GwSupportedFiletypes()  # b'7z,bz2,gz,rar,tar,xz,zip,'
 55
 56        # Convert to Python string
 57        result = ct.string_at(result).decode()  # 7z,bz2,gz,rar,tar,xz,zip,
 58
 59        # Convert comma separated str to list, remove empty trailing element, sort
 60        result = sorted(filter(None, result.split(",")))
 61
 62        return result
 63
 64    @functools.lru_cache()
 65    def is_supported_archive(self, archive_type: str):
 66        """ Returns True if the archive type (e.g. `7z`) is supported. """
 67
 68        # API function declaration
 69        self.library.GwIsSupportedArchiveType.argtypes = [
 70            ct.c_char_p
 71        ]
 72        self.library.GwIsSupportedArchiveType.restype = ct.c_bool
 73
 74        ct_archive_type = ct.c_char_p(archive_type.encode())  # const char* type
 75
 76        result = self.library.GwIsSupportedArchiveType(ct_archive_type)
 77
 78        return result
 79
 80    def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
 81        """ Returns a list of file paths of supported archives in a directory and all of its subdirectories. """
 82        return [
 83            file_path
 84            for file_path in glasswall.utils.list_file_paths(
 85                directory=directory,
 86                recursive=recursive,
 87                absolute=absolute,
 88                followlinks=followlinks,
 89            )
 90            if self.is_supported_archive(self.determine_file_type(file_path, as_string=True, raise_unsupported=False))
 91        ]
 92
 93    def determine_file_type(self, input_file: str, as_string: bool = False, raise_unsupported: bool = True):
 94        """ Returns an int representing the file type of an archive.
 95
 96        Args:
 97            input_file (str) The input file path.
 98            as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False.
 99            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
100
101        Returns:
102            file_type (Union[int, str]): The file format.
103        """
104        if not os.path.isfile(input_file):
105            raise FileNotFoundError(input_file)
106
107        # API function declaration
108        self.library.GwDetermineArchiveTypeFromFile.argtypes = [
109            ct.c_char_p
110        ]
111
112        # Variable initialisation
113        ct_input_file = ct.c_char_p(input_file.encode())  # const char * inputFilePath)
114
115        with utils.CwdHandler(new_cwd=self.library_path):
116            # API call
117            file_type = self.library.GwDetermineArchiveTypeFromFile(
118                ct_input_file
119            )
120
121        file_type_as_string = dft.file_type_int_to_str(file_type)
122        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
123
124        if not dft.is_success(file_type):
125            if raise_unsupported:
126                log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
127                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
128            else:
129                log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
130        else:
131            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
132
133        if as_string:
134            return file_type_as_string
135
136        return file_type
137
138    def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
139        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
140
141        Args:
142            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
143            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
144            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
145            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
146            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
147
148        Returns:
149            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
150        """
151        # Validate arg types
152        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
153            raise TypeError(input_file)
154        if not isinstance(output_file, (type(None), str)):
155            raise TypeError(output_file)
156        if not isinstance(output_report, (type(None), str)):
157            raise TypeError(output_report)
158        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
159            raise TypeError(content_management_policy)
160
161        # Convert string path arguments to absolute paths
162        if isinstance(input_file, str):
163            input_file = os.path.abspath(input_file)
164        if isinstance(output_file, str):
165            output_file = os.path.abspath(output_file)
166        if isinstance(output_report, str):
167            output_report = os.path.abspath(output_report)
168
169        # Convert inputs to bytes
170        if isinstance(input_file, str):
171            if not os.path.isfile(input_file):
172                raise FileNotFoundError(input_file)
173            with open(input_file, "rb") as f:
174                input_file_bytes = f.read()
175        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
176            input_file_bytes = utils.as_bytes(input_file)
177
178        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
179            with open(content_management_policy, "rb") as f:
180                content_management_policy = f.read()
181        elif isinstance(content_management_policy, type(None)):
182            # Load default
183            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
184        content_management_policy = utils.validate_xml(content_management_policy)
185
186        # API function declaration
187        self.library.GwFileAnalysisArchive.argtypes = [
188            ct.c_void_p,  # void *inputBuffer
189            ct.c_size_t,  # size_t inputBufferLength
190            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
191            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
192            ct.POINTER(ct.c_void_p),  # void **outputAnalysisReportBuffer
193            ct.POINTER(ct.c_size_t),  # size_t *outputAnalysisReportBufferLength
194            ct.c_char_p  # const char *xmlConfigString
195        ]
196
197        # Variable initialisation
198        gw_return_object = glasswall.GwReturnObj()
199        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
200        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
201        gw_return_object.output_buffer = ct.c_void_p()
202        gw_return_object.output_buffer_length = ct.c_size_t()
203        gw_return_object.output_report_buffer = ct.c_void_p()
204        gw_return_object.output_report_buffer_length = ct.c_size_t()
205        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
206
207        with utils.CwdHandler(new_cwd=self.library_path):
208            # API call
209            gw_return_object.status = self.library.GwFileAnalysisArchive(
210                gw_return_object.input_buffer,
211                gw_return_object.input_buffer_length,
212                ct.byref(gw_return_object.output_buffer),
213                ct.byref(gw_return_object.output_buffer_length),
214                ct.byref(gw_return_object.output_report_buffer),
215                ct.byref(gw_return_object.output_report_buffer_length),
216                gw_return_object.content_management_policy
217            )
218
219        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
220            gw_return_object.output_file = utils.buffer_to_bytes(
221                gw_return_object.output_buffer,
222                gw_return_object.output_buffer_length
223            )
224        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
225            gw_return_object.output_report = utils.buffer_to_bytes(
226                gw_return_object.output_report_buffer,
227                gw_return_object.output_report_buffer_length
228            )
229
230        # Write output file
231        if hasattr(gw_return_object, "output_file"):
232            if isinstance(output_file, str):
233                os.makedirs(os.path.dirname(output_file), exist_ok=True)
234                with open(output_file, "wb") as f:
235                    f.write(gw_return_object.output_file)
236
237        # Write output report
238        if hasattr(gw_return_object, "output_report"):
239            if isinstance(output_report, str):
240                os.makedirs(os.path.dirname(output_report), exist_ok=True)
241                with open(output_report, "wb") as f:
242                    f.write(gw_return_object.output_report)
243
244        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
245        if gw_return_object.status not in successes.success_codes:
246            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
247            if raise_unsupported:
248                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
249        else:
250            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
251
252        self.release()
253
254        return gw_return_object
255
256    def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
257        """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory.
258
259        Args:
260            input_directory (str): The input directory containing archives to analyse.
261            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written.
262            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
263            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
264            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
265
266        Returns:
267            analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
268        """
269        analysed_archives_dict = {}
270        # Call analyse_archive on each file in input_directory
271        for input_file in utils.list_file_paths(input_directory):
272            relative_path = os.path.relpath(input_file, input_directory)
273            # Construct paths for output file and output report
274            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
275            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
276
277            result = self.analyse_archive(
278                input_file=input_file,
279                output_file=output_file,
280                output_report=output_report,
281                content_management_policy=content_management_policy,
282                raise_unsupported=raise_unsupported,
283            )
284
285            analysed_archives_dict[relative_path] = result
286
287        return analysed_archives_dict
288
289    def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
290        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
291
292        Args:
293            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
294            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
295            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
296            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
297            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
298
299        Returns:
300            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
301        """
302        # Validate arg types
303        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
304            raise TypeError(input_file)
305        if not isinstance(output_file, (type(None), str)):
306            raise TypeError(output_file)
307        if not isinstance(output_report, (type(None), str)):
308            raise TypeError(output_report)
309        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
310            raise TypeError(content_management_policy)
311
312        # Convert string path arguments to absolute paths
313        if isinstance(input_file, str):
314            input_file = os.path.abspath(input_file)
315        if isinstance(output_file, str):
316            output_file = os.path.abspath(output_file)
317        if isinstance(output_report, str):
318            output_report = os.path.abspath(output_report)
319
320        # Convert inputs to bytes
321        if isinstance(input_file, str):
322            if not os.path.isfile(input_file):
323                raise FileNotFoundError(input_file)
324            with open(input_file, "rb") as f:
325                input_file_bytes = f.read()
326        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
327            input_file_bytes = utils.as_bytes(input_file)
328
329        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
330            with open(content_management_policy, "rb") as f:
331                content_management_policy = f.read()
332        elif isinstance(content_management_policy, type(None)):
333            # Load default
334            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
335        content_management_policy = utils.validate_xml(content_management_policy)
336
337        # API function declaration
338        self.library.GwFileProtectAndReportArchive.argtypes = [
339            ct.c_void_p,  # void *inputBuffer
340            ct.c_size_t,  # size_t inputBufferLength
341            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
342            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
343            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
344            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
345            ct.c_char_p  # const char *xmlConfigString
346        ]
347        # Variable initialisation
348        gw_return_object = glasswall.GwReturnObj()
349        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
350        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
351        gw_return_object.output_buffer = ct.c_void_p()
352        gw_return_object.output_buffer_length = ct.c_size_t()
353        gw_return_object.output_report_buffer = ct.c_void_p()
354        gw_return_object.output_report_buffer_length = ct.c_size_t()
355        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
356
357        with utils.CwdHandler(new_cwd=self.library_path):
358            # API call
359            gw_return_object.status = self.library.GwFileProtectAndReportArchive(
360                ct.byref(gw_return_object.input_buffer),
361                gw_return_object.input_buffer_length,
362                ct.byref(gw_return_object.output_buffer),
363                ct.byref(gw_return_object.output_buffer_length),
364                ct.byref(gw_return_object.output_report_buffer),
365                ct.byref(gw_return_object.output_report_buffer_length),
366                gw_return_object.content_management_policy
367            )
368
369        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
370            gw_return_object.output_file = utils.buffer_to_bytes(
371                gw_return_object.output_buffer,
372                gw_return_object.output_buffer_length
373            )
374        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
375            gw_return_object.output_report = utils.buffer_to_bytes(
376                gw_return_object.output_report_buffer,
377                gw_return_object.output_report_buffer_length
378            )
379
380        # Write output file
381        if hasattr(gw_return_object, "output_file"):
382            if isinstance(output_file, str):
383                os.makedirs(os.path.dirname(output_file), exist_ok=True)
384                with open(output_file, "wb") as f:
385                    f.write(gw_return_object.output_file)
386
387        # Write output report
388        if hasattr(gw_return_object, "output_report"):
389            if isinstance(output_report, str):
390                os.makedirs(os.path.dirname(output_report), exist_ok=True)
391                with open(output_report, "wb") as f:
392                    f.write(gw_return_object.output_report)
393
394        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
395        if gw_return_object.status not in successes.success_codes:
396            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
397            if raise_unsupported:
398                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
399        else:
400            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
401
402        self.release()
403
404        return gw_return_object
405
406    def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
407        """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.
408
409        Args:
410            input_directory (str): The input directory containing archives to protect.
411            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
412            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
413            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
414            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
415
416        Returns:
417            protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
418        """
419        protected_archives_dict = {}
420        # Call protect_archive on each file in input_directory to output_directory
421        for input_file in utils.list_file_paths(input_directory):
422            relative_path = os.path.relpath(input_file, input_directory)
423            # Construct paths for output file and output report
424            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
425            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
426
427            result = self.protect_archive(
428                input_file=input_file,
429                output_file=output_file,
430                output_report=output_report,
431                content_management_policy=content_management_policy,
432                raise_unsupported=raise_unsupported,
433            )
434
435            protected_archives_dict[relative_path] = result
436
437        return protected_archives_dict
438
439    def file_to_file_unpack(self, input_file: str, output_directory: str, raise_unsupported: bool = True):
440        # Validate arg types
441        if not isinstance(input_file, str):
442            raise TypeError(input_file)
443        elif not os.path.isfile(input_file):
444            raise FileNotFoundError(input_file)
445        if not isinstance(output_directory, str):
446            raise TypeError(output_directory)
447
448        # API function declaration
449        self.library.GwFileToFileUnpack.argtypes = [
450            ct.c_char_p,
451            ct.c_char_p,
452        ]
453
454        # Variable initialisation
455        gw_return_object = glasswall.GwReturnObj()
456        gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
457        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
458
459        with utils.CwdHandler(new_cwd=self.library_path):
460            # API call
461            gw_return_object.status = self.library.GwFileToFileUnpack(
462                gw_return_object.ct_input_file,
463                gw_return_object.ct_output_directory,
464            )
465
466        if gw_return_object.status not in successes.success_codes:
467            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
468            if raise_unsupported:
469                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
470        else:
471            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
472
473        self.release()
474
475        return gw_return_object
476
477    def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True):
478        # Validate arg types
479        if not isinstance(input_directory, str):
480            raise TypeError(input_directory)
481        elif not os.path.isdir(input_directory):
482            raise NotADirectoryError(input_directory)
483        if not isinstance(output_directory, str):
484            raise TypeError(output_directory)
485        if not file_type:
486            file_type = utils.get_file_type(input_directory)
487
488        # Ensure output_directory exists
489        os.makedirs(output_directory, exist_ok=True)
490
491        # API function declaration
492        self.library.GwFileToFilePack.argtypes = [
493            ct.c_char_p,
494            ct.c_char_p,
495            ct.c_char_p,
496            ct.c_int,
497        ]
498
499        # Variable initialisation
500        gw_return_object = glasswall.GwReturnObj()
501        gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
502        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
503        gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType
504        gw_return_object.ct_add_extension = ct.c_int(int(add_extension))  # int addExtension
505
506        with utils.CwdHandler(new_cwd=self.library_path):
507            # API call
508            gw_return_object.status = self.library.GwFileToFilePack(
509                gw_return_object.ct_input_directory,
510                gw_return_object.ct_output_directory,
511                gw_return_object.ct_file_type,
512                gw_return_object.ct_add_extension,
513            )
514
515        if gw_return_object.status not in successes.success_codes:
516            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
517            if raise_unsupported:
518                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
519        else:
520            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
521
522        self.release()
523
524        return gw_return_object
525
526    def unpack(self, input_file: str, output_directory: str, recursive: bool = True, include_file_type: bool = False, raise_unsupported: bool = True, delete_origin: bool = False):
527        """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
528
529        Args:
530            input_file (str): The archive file path
531            output_directory (str): The output directory where the archive will be unpacked to a new directory.
532            recursive (bool, optional): Default True. Recursively unpack all nested archives.
533            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
534            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
535            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
536        """
537        # Convert to absolute paths
538        input_file = os.path.abspath(input_file)
539        output_directory = os.path.abspath(output_directory)
540
541        if include_file_type:
542            archive_name = os.path.basename(input_file)
543        else:
544            archive_name = os.path.splitext(os.path.basename(input_file))[0]
545        archive_output_directory = os.path.join(output_directory, archive_name)
546
547        # Unpack
548        log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
549        result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, raise_unsupported=raise_unsupported)
550        if result:
551            status = result.status
552        else:
553            status = None
554
555        if status not in successes.success_codes:
556            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
557            if raise_unsupported:
558                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
559        else:
560            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
561
562        if delete_origin:
563            os.remove(input_file)
564
565        if recursive:
566            # Unpack sub archives
567            for subarchive in self.list_archive_paths(archive_output_directory):
568                self.unpack(
569                    input_file=subarchive,
570                    output_directory=archive_output_directory,
571                    recursive=recursive,
572                    raise_unsupported=raise_unsupported,
573                    delete_origin=True
574                )
575
576        return status
577
578    def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
579        """ Unpack a directory of archives, maintaining directory structure.
580
581        Args:
582            input_directory (str): The input directory containing archives to unpack.
583            output_directory (str): The output directory where archives will be unpacked to a new directory.
584            recursive (bool, optional): Default True. Recursively unpack all nested archives.
585            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
586            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
587            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
588        """
589        # Convert to absolute paths
590        input_directory = os.path.abspath(input_directory)
591        output_directory = os.path.abspath(output_directory)
592
593        for archive_input_file in self.list_archive_paths(input_directory):
594            relative_path = os.path.relpath(archive_input_file, input_directory)
595            archive_output_file = os.path.dirname(os.path.join(output_directory, relative_path))
596            self.unpack(
597                input_file=archive_input_file,
598                output_directory=archive_output_file,
599                recursive=recursive,
600                include_file_type=include_file_type,
601                raise_unsupported=raise_unsupported,
602                delete_origin=delete_origin
603            )
604
605    def pack_directory(self, input_directory: str, output_directory: str, file_type: str, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True, delete_origin: Optional[bool] = False):
606        """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
607
608        Args:
609            input_directory (str): The input directory containing files to archive.
610            output_directory (str): The output directory to store the created archive.
611            file_type (str): The archive file type.
612            add_extension (bool, optional): Default: True. Archive file type extension to result file.
613            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
614            delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory.
615        """
616        # Convert to absolute paths
617        input_directory = os.path.abspath(input_directory)
618        output_directory = os.path.abspath(output_directory)
619
620        # Pack
621        log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
622        status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, add_extension=add_extension, raise_unsupported=raise_unsupported).status
623
624        if status not in successes.success_codes:
625            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
626            if raise_unsupported:
627                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
628        else:
629            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
630
631        if delete_origin:
632            utils.delete_directory(input_directory)
633
634        return status
635
636    def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
637        """ Exports an archive using the Glasswall engine.
638
639        Args:
640            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
641            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
642            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
643            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
644            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
645
646        Returns:
647            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
648        """
649        # Validate arg types
650        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
651            raise TypeError(input_file)
652        if not isinstance(output_file, (type(None), str)):
653            raise TypeError(output_file)
654        if not isinstance(output_report, (type(None), str)):
655            raise TypeError(output_report)
656        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
657            raise TypeError(content_management_policy)
658
659        # Convert string path arguments to absolute paths
660        if isinstance(input_file, str):
661            input_file = os.path.abspath(input_file)
662        if isinstance(output_file, str):
663            output_file = os.path.abspath(output_file)
664        if isinstance(output_report, str):
665            output_report = os.path.abspath(output_report)
666
667        # Convert inputs to bytes
668        if isinstance(input_file, str):
669            if not os.path.isfile(input_file):
670                raise FileNotFoundError(input_file)
671            with open(input_file, "rb") as f:
672                input_file_bytes = f.read()
673        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
674            input_file_bytes = utils.as_bytes(input_file)
675
676        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
677            with open(content_management_policy, "rb") as f:
678                content_management_policy = f.read()
679        elif isinstance(content_management_policy, type(None)):
680            # Load default
681            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
682        content_management_policy = utils.validate_xml(content_management_policy)
683
684        # API function declaration
685        self.library.GwFileExportArchive.argtypes = [
686            ct.c_void_p,  # void *inputBuffer
687            ct.c_size_t,  # size_t inputBufferLength
688            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
689            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
690            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
691            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
692            ct.c_char_p  # const char *xmlConfigString
693        ]
694
695        # Variable initialisation
696        gw_return_object = glasswall.GwReturnObj()
697        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
698        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
699        gw_return_object.output_buffer = ct.c_void_p()
700        gw_return_object.output_buffer_length = ct.c_size_t()
701        gw_return_object.output_report_buffer = ct.c_void_p()
702        gw_return_object.output_report_buffer_length = ct.c_size_t()
703        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
704
705        with utils.CwdHandler(new_cwd=self.library_path):
706            # API call
707            gw_return_object.status = self.library.GwFileExportArchive(
708                gw_return_object.input_buffer,
709                gw_return_object.input_buffer_length,
710                ct.byref(gw_return_object.output_buffer),
711                ct.byref(gw_return_object.output_buffer_length),
712                ct.byref(gw_return_object.output_report_buffer),
713                ct.byref(gw_return_object.output_report_buffer_length),
714                gw_return_object.content_management_policy
715            )
716
717        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
718            gw_return_object.output_file = utils.buffer_to_bytes(
719                gw_return_object.output_buffer,
720                gw_return_object.output_buffer_length
721            )
722        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
723            gw_return_object.output_report = utils.buffer_to_bytes(
724                gw_return_object.output_report_buffer,
725                gw_return_object.output_report_buffer_length
726            )
727
728        # Write output file
729        if hasattr(gw_return_object, "output_file"):
730            if isinstance(output_file, str):
731                os.makedirs(os.path.dirname(output_file), exist_ok=True)
732                with open(output_file, "wb") as f:
733                    f.write(gw_return_object.output_file)
734
735        # Write output report
736        if hasattr(gw_return_object, "output_report"):
737            if isinstance(output_report, str):
738                os.makedirs(os.path.dirname(output_report), exist_ok=True)
739                with open(output_report, "wb") as f:
740                    f.write(gw_return_object.output_report)
741
742        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
743        if gw_return_object.status not in successes.success_codes:
744            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
745            if raise_unsupported:
746                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
747        else:
748            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
749
750        self.release()
751
752        return gw_return_object
753
754    def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
755        """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.
756
757        Args:
758            input_directory (str): The input directory containing archives to export.
759            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
760            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
761            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
762            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
763
764        Returns:
765            exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
766        """
767        exported_archives_dict = {}
768        # Call export_archive on each file in input_directory to output_directory
769        for input_file in utils.list_file_paths(input_directory):
770            relative_path = os.path.relpath(input_file, input_directory)
771            # Construct paths for output file and output report
772            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
773            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
774
775            result = self.export_archive(
776                input_file=input_file,
777                output_file=output_file,
778                output_report=output_report,
779                content_management_policy=content_management_policy,
780                raise_unsupported=raise_unsupported,
781            )
782
783            exported_archives_dict[relative_path] = result
784
785        return exported_archives_dict
786
787    def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: Optional[bool] = True):
788        """ Imports an archive using the Glasswall engine.
789
790        Args:
791            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
792            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
793            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
794            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
795            include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive.
796            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
797
798        Returns:
799            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
800        """
801        # Validate arg types
802        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
803            raise TypeError(input_file)
804        if not isinstance(output_file, (type(None), str)):
805            raise TypeError(output_file)
806        if not isinstance(output_report, (type(None), str)):
807            raise TypeError(output_report)
808        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
809            raise TypeError(content_management_policy)
810
811        # Convert string path arguments to absolute paths
812        if isinstance(input_file, str):
813            input_file = os.path.abspath(input_file)
814        # Convert string path arguments to absolute paths
815        if isinstance(output_file, str):
816            output_file = os.path.abspath(output_file)
817        if isinstance(output_report, str):
818            output_report = os.path.abspath(output_report)
819
820        # Convert inputs to bytes
821        if isinstance(input_file, str):
822            if not os.path.isfile(input_file):
823                raise FileNotFoundError(input_file)
824            with open(input_file, "rb") as f:
825                input_file_bytes = f.read()
826        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
827            input_file_bytes = utils.as_bytes(input_file)
828
829        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
830            with open(content_management_policy, "rb") as f:
831                content_management_policy = f.read()
832        elif isinstance(content_management_policy, type(None)):
833            # Load default
834            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
835        content_management_policy = utils.validate_xml(content_management_policy)
836
837        # API function declaration
838        self.library.GwFileImportArchive.argtypes = [
839            ct.c_void_p,  # void *inputBuffer
840            ct.c_size_t,  # size_t inputBufferLength
841            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
842            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
843            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
844            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
845            ct.c_char_p,  # const char *xmlConfigString
846            ct.c_int  # int includeAnalysisReports
847        ]
848
849        # Variable initialisation
850        gw_return_object = glasswall.GwReturnObj()
851        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
852        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
853        gw_return_object.output_buffer = ct.c_void_p()
854        gw_return_object.output_buffer_length = ct.c_size_t()
855        gw_return_object.output_report_buffer = ct.c_void_p()
856        gw_return_object.output_report_buffer_length = ct.c_size_t()
857        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
858        gw_return_object.include_analysis_report = ct.c_int(int(include_analysis_report))
859
860        with utils.CwdHandler(new_cwd=self.library_path):
861            # API call
862            gw_return_object.status = self.library.GwFileImportArchive(
863                gw_return_object.input_buffer,
864                gw_return_object.input_buffer_length,
865                ct.byref(gw_return_object.output_buffer),
866                ct.byref(gw_return_object.output_buffer_length),
867                ct.byref(gw_return_object.output_report_buffer),
868                ct.byref(gw_return_object.output_report_buffer_length),
869                gw_return_object.content_management_policy,
870                gw_return_object.include_analysis_report
871            )
872
873        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
874            gw_return_object.output_file = utils.buffer_to_bytes(
875                gw_return_object.output_buffer,
876                gw_return_object.output_buffer_length
877            )
878        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
879            gw_return_object.output_report = utils.buffer_to_bytes(
880                gw_return_object.output_report_buffer,
881                gw_return_object.output_report_buffer_length
882            )
883
884        # Write output file
885        if hasattr(gw_return_object, "output_file"):
886            if isinstance(output_file, str):
887                os.makedirs(os.path.dirname(output_file), exist_ok=True)
888                with open(output_file, "wb") as f:
889                    f.write(gw_return_object.output_file)
890
891        # Write output report
892        if hasattr(gw_return_object, "output_report"):
893            if isinstance(output_report, str):
894                os.makedirs(os.path.dirname(output_report), exist_ok=True)
895                with open(output_report, "wb") as f:
896                    f.write(gw_return_object.output_report)
897
898        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
899        if gw_return_object.status not in successes.success_codes:
900            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
901            if raise_unsupported:
902                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
903        else:
904            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
905
906        self.release()
907
908        return gw_return_object
909
910    def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: bool = True):
911        """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.
912
913        Args:
914            input_directory (str): The input directory containing archives to import.
915            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
916            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
917            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
918            include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive.
919            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
920
921        Returns:
922            imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
923        """
924        imported_archives_dict = {}
925        # Call import_archive on each file in input_directory to output_directory
926        for input_file in utils.list_file_paths(input_directory):
927            relative_path = os.path.relpath(input_file, input_directory)
928            # Construct paths for output file and output report
929            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
930            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
931
932            result = self.import_archive(
933                input_file=input_file,
934                output_file=output_file,
935                output_report=output_report,
936                content_management_policy=content_management_policy,
937                include_analysis_report=include_analysis_report,
938                raise_unsupported=raise_unsupported,
939            )
940
941            imported_archives_dict[relative_path] = result
942
943        return imported_archives_dict
class ArchiveManager(glasswall.libraries.library.Library):
 18class ArchiveManager(Library):
 19    """ A high level Python wrapper for Glasswall Archive Manager. """
 20
 21    def __init__(self, library_path):
 22        super().__init__(library_path)
 23        self.library = self.load_library(os.path.abspath(library_path))
 24
 25        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
 26
 27    def version(self):
 28        """ Returns the Glasswall library version.
 29
 30        Returns:
 31            version (str): The Glasswall library version.
 32        """
 33        # API function declaration
 34        self.library.GwArchiveVersion.restype = ct.c_char_p
 35
 36        # API call
 37        version = self.library.GwArchiveVersion()
 38
 39        # Convert to Python string
 40        version = ct.string_at(version).decode()
 41
 42        return version
 43
 44    def release(self):
 45        """ Releases any resources held by the Glasswall Archive Manager library. """
 46        self.library.GwArchiveDone()
 47
 48    @property
 49    @functools.lru_cache()
 50    def supported_archives(self):
 51        """ Returns a list of supported archive file formats. """
 52
 53        # API function declaration
 54        self.library.GwSupportedFiletypes.restype = ct.c_char_p
 55
 56        # API call
 57        result = self.library.GwSupportedFiletypes()  # b'7z,bz2,gz,rar,tar,xz,zip,'
 58
 59        # Convert to Python string
 60        result = ct.string_at(result).decode()  # 7z,bz2,gz,rar,tar,xz,zip,
 61
 62        # Convert comma separated str to list, remove empty trailing element, sort
 63        result = sorted(filter(None, result.split(",")))
 64
 65        return result
 66
 67    @functools.lru_cache()
 68    def is_supported_archive(self, archive_type: str):
 69        """ Returns True if the archive type (e.g. `7z`) is supported. """
 70
 71        # API function declaration
 72        self.library.GwIsSupportedArchiveType.argtypes = [
 73            ct.c_char_p
 74        ]
 75        self.library.GwIsSupportedArchiveType.restype = ct.c_bool
 76
 77        ct_archive_type = ct.c_char_p(archive_type.encode())  # const char* type
 78
 79        result = self.library.GwIsSupportedArchiveType(ct_archive_type)
 80
 81        return result
 82
 83    def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
 84        """ Returns a list of file paths of supported archives in a directory and all of its subdirectories. """
 85        return [
 86            file_path
 87            for file_path in glasswall.utils.list_file_paths(
 88                directory=directory,
 89                recursive=recursive,
 90                absolute=absolute,
 91                followlinks=followlinks,
 92            )
 93            if self.is_supported_archive(self.determine_file_type(file_path, as_string=True, raise_unsupported=False))
 94        ]
 95
 96    def determine_file_type(self, input_file: str, as_string: bool = False, raise_unsupported: bool = True):
 97        """ Returns an int representing the file type of an archive.
 98
 99        Args:
100            input_file (str) The input file path.
101            as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False.
102            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
103
104        Returns:
105            file_type (Union[int, str]): The file format.
106        """
107        if not os.path.isfile(input_file):
108            raise FileNotFoundError(input_file)
109
110        # API function declaration
111        self.library.GwDetermineArchiveTypeFromFile.argtypes = [
112            ct.c_char_p
113        ]
114
115        # Variable initialisation
116        ct_input_file = ct.c_char_p(input_file.encode())  # const char * inputFilePath)
117
118        with utils.CwdHandler(new_cwd=self.library_path):
119            # API call
120            file_type = self.library.GwDetermineArchiveTypeFromFile(
121                ct_input_file
122            )
123
124        file_type_as_string = dft.file_type_int_to_str(file_type)
125        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
126
127        if not dft.is_success(file_type):
128            if raise_unsupported:
129                log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
130                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
131            else:
132                log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
133        else:
134            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
135
136        if as_string:
137            return file_type_as_string
138
139        return file_type
140
141    def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
142        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
143
144        Args:
145            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
146            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
147            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
148            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
149            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
150
151        Returns:
152            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
153        """
154        # Validate arg types
155        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
156            raise TypeError(input_file)
157        if not isinstance(output_file, (type(None), str)):
158            raise TypeError(output_file)
159        if not isinstance(output_report, (type(None), str)):
160            raise TypeError(output_report)
161        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
162            raise TypeError(content_management_policy)
163
164        # Convert string path arguments to absolute paths
165        if isinstance(input_file, str):
166            input_file = os.path.abspath(input_file)
167        if isinstance(output_file, str):
168            output_file = os.path.abspath(output_file)
169        if isinstance(output_report, str):
170            output_report = os.path.abspath(output_report)
171
172        # Convert inputs to bytes
173        if isinstance(input_file, str):
174            if not os.path.isfile(input_file):
175                raise FileNotFoundError(input_file)
176            with open(input_file, "rb") as f:
177                input_file_bytes = f.read()
178        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
179            input_file_bytes = utils.as_bytes(input_file)
180
181        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
182            with open(content_management_policy, "rb") as f:
183                content_management_policy = f.read()
184        elif isinstance(content_management_policy, type(None)):
185            # Load default
186            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
187        content_management_policy = utils.validate_xml(content_management_policy)
188
189        # API function declaration
190        self.library.GwFileAnalysisArchive.argtypes = [
191            ct.c_void_p,  # void *inputBuffer
192            ct.c_size_t,  # size_t inputBufferLength
193            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
194            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
195            ct.POINTER(ct.c_void_p),  # void **outputAnalysisReportBuffer
196            ct.POINTER(ct.c_size_t),  # size_t *outputAnalysisReportBufferLength
197            ct.c_char_p  # const char *xmlConfigString
198        ]
199
200        # Variable initialisation
201        gw_return_object = glasswall.GwReturnObj()
202        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
203        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
204        gw_return_object.output_buffer = ct.c_void_p()
205        gw_return_object.output_buffer_length = ct.c_size_t()
206        gw_return_object.output_report_buffer = ct.c_void_p()
207        gw_return_object.output_report_buffer_length = ct.c_size_t()
208        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
209
210        with utils.CwdHandler(new_cwd=self.library_path):
211            # API call
212            gw_return_object.status = self.library.GwFileAnalysisArchive(
213                gw_return_object.input_buffer,
214                gw_return_object.input_buffer_length,
215                ct.byref(gw_return_object.output_buffer),
216                ct.byref(gw_return_object.output_buffer_length),
217                ct.byref(gw_return_object.output_report_buffer),
218                ct.byref(gw_return_object.output_report_buffer_length),
219                gw_return_object.content_management_policy
220            )
221
222        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
223            gw_return_object.output_file = utils.buffer_to_bytes(
224                gw_return_object.output_buffer,
225                gw_return_object.output_buffer_length
226            )
227        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
228            gw_return_object.output_report = utils.buffer_to_bytes(
229                gw_return_object.output_report_buffer,
230                gw_return_object.output_report_buffer_length
231            )
232
233        # Write output file
234        if hasattr(gw_return_object, "output_file"):
235            if isinstance(output_file, str):
236                os.makedirs(os.path.dirname(output_file), exist_ok=True)
237                with open(output_file, "wb") as f:
238                    f.write(gw_return_object.output_file)
239
240        # Write output report
241        if hasattr(gw_return_object, "output_report"):
242            if isinstance(output_report, str):
243                os.makedirs(os.path.dirname(output_report), exist_ok=True)
244                with open(output_report, "wb") as f:
245                    f.write(gw_return_object.output_report)
246
247        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
248        if gw_return_object.status not in successes.success_codes:
249            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
250            if raise_unsupported:
251                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
252        else:
253            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
254
255        self.release()
256
257        return gw_return_object
258
259    def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
260        """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory.
261
262        Args:
263            input_directory (str): The input directory containing archives to analyse.
264            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written.
265            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
266            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
267            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
268
269        Returns:
270            analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
271        """
272        analysed_archives_dict = {}
273        # Call analyse_archive on each file in input_directory
274        for input_file in utils.list_file_paths(input_directory):
275            relative_path = os.path.relpath(input_file, input_directory)
276            # Construct paths for output file and output report
277            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
278            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
279
280            result = self.analyse_archive(
281                input_file=input_file,
282                output_file=output_file,
283                output_report=output_report,
284                content_management_policy=content_management_policy,
285                raise_unsupported=raise_unsupported,
286            )
287
288            analysed_archives_dict[relative_path] = result
289
290        return analysed_archives_dict
291
292    def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
293        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
294
295        Args:
296            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
297            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
298            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
299            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
300            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
301
302        Returns:
303            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
304        """
305        # Validate arg types
306        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
307            raise TypeError(input_file)
308        if not isinstance(output_file, (type(None), str)):
309            raise TypeError(output_file)
310        if not isinstance(output_report, (type(None), str)):
311            raise TypeError(output_report)
312        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
313            raise TypeError(content_management_policy)
314
315        # Convert string path arguments to absolute paths
316        if isinstance(input_file, str):
317            input_file = os.path.abspath(input_file)
318        if isinstance(output_file, str):
319            output_file = os.path.abspath(output_file)
320        if isinstance(output_report, str):
321            output_report = os.path.abspath(output_report)
322
323        # Convert inputs to bytes
324        if isinstance(input_file, str):
325            if not os.path.isfile(input_file):
326                raise FileNotFoundError(input_file)
327            with open(input_file, "rb") as f:
328                input_file_bytes = f.read()
329        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
330            input_file_bytes = utils.as_bytes(input_file)
331
332        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
333            with open(content_management_policy, "rb") as f:
334                content_management_policy = f.read()
335        elif isinstance(content_management_policy, type(None)):
336            # Load default
337            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
338        content_management_policy = utils.validate_xml(content_management_policy)
339
340        # API function declaration
341        self.library.GwFileProtectAndReportArchive.argtypes = [
342            ct.c_void_p,  # void *inputBuffer
343            ct.c_size_t,  # size_t inputBufferLength
344            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
345            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
346            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
347            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
348            ct.c_char_p  # const char *xmlConfigString
349        ]
350        # Variable initialisation
351        gw_return_object = glasswall.GwReturnObj()
352        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
353        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
354        gw_return_object.output_buffer = ct.c_void_p()
355        gw_return_object.output_buffer_length = ct.c_size_t()
356        gw_return_object.output_report_buffer = ct.c_void_p()
357        gw_return_object.output_report_buffer_length = ct.c_size_t()
358        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
359
360        with utils.CwdHandler(new_cwd=self.library_path):
361            # API call
362            gw_return_object.status = self.library.GwFileProtectAndReportArchive(
363                ct.byref(gw_return_object.input_buffer),
364                gw_return_object.input_buffer_length,
365                ct.byref(gw_return_object.output_buffer),
366                ct.byref(gw_return_object.output_buffer_length),
367                ct.byref(gw_return_object.output_report_buffer),
368                ct.byref(gw_return_object.output_report_buffer_length),
369                gw_return_object.content_management_policy
370            )
371
372        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
373            gw_return_object.output_file = utils.buffer_to_bytes(
374                gw_return_object.output_buffer,
375                gw_return_object.output_buffer_length
376            )
377        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
378            gw_return_object.output_report = utils.buffer_to_bytes(
379                gw_return_object.output_report_buffer,
380                gw_return_object.output_report_buffer_length
381            )
382
383        # Write output file
384        if hasattr(gw_return_object, "output_file"):
385            if isinstance(output_file, str):
386                os.makedirs(os.path.dirname(output_file), exist_ok=True)
387                with open(output_file, "wb") as f:
388                    f.write(gw_return_object.output_file)
389
390        # Write output report
391        if hasattr(gw_return_object, "output_report"):
392            if isinstance(output_report, str):
393                os.makedirs(os.path.dirname(output_report), exist_ok=True)
394                with open(output_report, "wb") as f:
395                    f.write(gw_return_object.output_report)
396
397        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
398        if gw_return_object.status not in successes.success_codes:
399            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
400            if raise_unsupported:
401                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
402        else:
403            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
404
405        self.release()
406
407        return gw_return_object
408
409    def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
410        """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.
411
412        Args:
413            input_directory (str): The input directory containing archives to protect.
414            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
415            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
416            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
417            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
418
419        Returns:
420            protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
421        """
422        protected_archives_dict = {}
423        # Call protect_archive on each file in input_directory to output_directory
424        for input_file in utils.list_file_paths(input_directory):
425            relative_path = os.path.relpath(input_file, input_directory)
426            # Construct paths for output file and output report
427            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
428            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
429
430            result = self.protect_archive(
431                input_file=input_file,
432                output_file=output_file,
433                output_report=output_report,
434                content_management_policy=content_management_policy,
435                raise_unsupported=raise_unsupported,
436            )
437
438            protected_archives_dict[relative_path] = result
439
440        return protected_archives_dict
441
442    def file_to_file_unpack(self, input_file: str, output_directory: str, raise_unsupported: bool = True):
443        # Validate arg types
444        if not isinstance(input_file, str):
445            raise TypeError(input_file)
446        elif not os.path.isfile(input_file):
447            raise FileNotFoundError(input_file)
448        if not isinstance(output_directory, str):
449            raise TypeError(output_directory)
450
451        # API function declaration
452        self.library.GwFileToFileUnpack.argtypes = [
453            ct.c_char_p,
454            ct.c_char_p,
455        ]
456
457        # Variable initialisation
458        gw_return_object = glasswall.GwReturnObj()
459        gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
460        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
461
462        with utils.CwdHandler(new_cwd=self.library_path):
463            # API call
464            gw_return_object.status = self.library.GwFileToFileUnpack(
465                gw_return_object.ct_input_file,
466                gw_return_object.ct_output_directory,
467            )
468
469        if gw_return_object.status not in successes.success_codes:
470            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
471            if raise_unsupported:
472                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
473        else:
474            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
475
476        self.release()
477
478        return gw_return_object
479
480    def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True):
481        # Validate arg types
482        if not isinstance(input_directory, str):
483            raise TypeError(input_directory)
484        elif not os.path.isdir(input_directory):
485            raise NotADirectoryError(input_directory)
486        if not isinstance(output_directory, str):
487            raise TypeError(output_directory)
488        if not file_type:
489            file_type = utils.get_file_type(input_directory)
490
491        # Ensure output_directory exists
492        os.makedirs(output_directory, exist_ok=True)
493
494        # API function declaration
495        self.library.GwFileToFilePack.argtypes = [
496            ct.c_char_p,
497            ct.c_char_p,
498            ct.c_char_p,
499            ct.c_int,
500        ]
501
502        # Variable initialisation
503        gw_return_object = glasswall.GwReturnObj()
504        gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
505        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
506        gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType
507        gw_return_object.ct_add_extension = ct.c_int(int(add_extension))  # int addExtension
508
509        with utils.CwdHandler(new_cwd=self.library_path):
510            # API call
511            gw_return_object.status = self.library.GwFileToFilePack(
512                gw_return_object.ct_input_directory,
513                gw_return_object.ct_output_directory,
514                gw_return_object.ct_file_type,
515                gw_return_object.ct_add_extension,
516            )
517
518        if gw_return_object.status not in successes.success_codes:
519            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
520            if raise_unsupported:
521                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
522        else:
523            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
524
525        self.release()
526
527        return gw_return_object
528
529    def unpack(self, input_file: str, output_directory: str, recursive: bool = True, include_file_type: bool = False, raise_unsupported: bool = True, delete_origin: bool = False):
530        """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
531
532        Args:
533            input_file (str): The archive file path
534            output_directory (str): The output directory where the archive will be unpacked to a new directory.
535            recursive (bool, optional): Default True. Recursively unpack all nested archives.
536            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
537            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
538            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
539        """
540        # Convert to absolute paths
541        input_file = os.path.abspath(input_file)
542        output_directory = os.path.abspath(output_directory)
543
544        if include_file_type:
545            archive_name = os.path.basename(input_file)
546        else:
547            archive_name = os.path.splitext(os.path.basename(input_file))[0]
548        archive_output_directory = os.path.join(output_directory, archive_name)
549
550        # Unpack
551        log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
552        result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, raise_unsupported=raise_unsupported)
553        if result:
554            status = result.status
555        else:
556            status = None
557
558        if status not in successes.success_codes:
559            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
560            if raise_unsupported:
561                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
562        else:
563            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
564
565        if delete_origin:
566            os.remove(input_file)
567
568        if recursive:
569            # Unpack sub archives
570            for subarchive in self.list_archive_paths(archive_output_directory):
571                self.unpack(
572                    input_file=subarchive,
573                    output_directory=archive_output_directory,
574                    recursive=recursive,
575                    raise_unsupported=raise_unsupported,
576                    delete_origin=True
577                )
578
579        return status
580
581    def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
582        """ Unpack a directory of archives, maintaining directory structure.
583
584        Args:
585            input_directory (str): The input directory containing archives to unpack.
586            output_directory (str): The output directory where archives will be unpacked to a new directory.
587            recursive (bool, optional): Default True. Recursively unpack all nested archives.
588            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
589            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
590            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
591        """
592        # Convert to absolute paths
593        input_directory = os.path.abspath(input_directory)
594        output_directory = os.path.abspath(output_directory)
595
596        for archive_input_file in self.list_archive_paths(input_directory):
597            relative_path = os.path.relpath(archive_input_file, input_directory)
598            archive_output_file = os.path.dirname(os.path.join(output_directory, relative_path))
599            self.unpack(
600                input_file=archive_input_file,
601                output_directory=archive_output_file,
602                recursive=recursive,
603                include_file_type=include_file_type,
604                raise_unsupported=raise_unsupported,
605                delete_origin=delete_origin
606            )
607
608    def pack_directory(self, input_directory: str, output_directory: str, file_type: str, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True, delete_origin: Optional[bool] = False):
609        """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
610
611        Args:
612            input_directory (str): The input directory containing files to archive.
613            output_directory (str): The output directory to store the created archive.
614            file_type (str): The archive file type.
615            add_extension (bool, optional): Default: True. Archive file type extension to result file.
616            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
617            delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory.
618        """
619        # Convert to absolute paths
620        input_directory = os.path.abspath(input_directory)
621        output_directory = os.path.abspath(output_directory)
622
623        # Pack
624        log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
625        status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, add_extension=add_extension, raise_unsupported=raise_unsupported).status
626
627        if status not in successes.success_codes:
628            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
629            if raise_unsupported:
630                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
631        else:
632            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
633
634        if delete_origin:
635            utils.delete_directory(input_directory)
636
637        return status
638
639    def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
640        """ Exports an archive using the Glasswall engine.
641
642        Args:
643            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
644            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
645            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
646            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
647            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
648
649        Returns:
650            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
651        """
652        # Validate arg types
653        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
654            raise TypeError(input_file)
655        if not isinstance(output_file, (type(None), str)):
656            raise TypeError(output_file)
657        if not isinstance(output_report, (type(None), str)):
658            raise TypeError(output_report)
659        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
660            raise TypeError(content_management_policy)
661
662        # Convert string path arguments to absolute paths
663        if isinstance(input_file, str):
664            input_file = os.path.abspath(input_file)
665        if isinstance(output_file, str):
666            output_file = os.path.abspath(output_file)
667        if isinstance(output_report, str):
668            output_report = os.path.abspath(output_report)
669
670        # Convert inputs to bytes
671        if isinstance(input_file, str):
672            if not os.path.isfile(input_file):
673                raise FileNotFoundError(input_file)
674            with open(input_file, "rb") as f:
675                input_file_bytes = f.read()
676        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
677            input_file_bytes = utils.as_bytes(input_file)
678
679        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
680            with open(content_management_policy, "rb") as f:
681                content_management_policy = f.read()
682        elif isinstance(content_management_policy, type(None)):
683            # Load default
684            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
685        content_management_policy = utils.validate_xml(content_management_policy)
686
687        # API function declaration
688        self.library.GwFileExportArchive.argtypes = [
689            ct.c_void_p,  # void *inputBuffer
690            ct.c_size_t,  # size_t inputBufferLength
691            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
692            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
693            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
694            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
695            ct.c_char_p  # const char *xmlConfigString
696        ]
697
698        # Variable initialisation
699        gw_return_object = glasswall.GwReturnObj()
700        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
701        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
702        gw_return_object.output_buffer = ct.c_void_p()
703        gw_return_object.output_buffer_length = ct.c_size_t()
704        gw_return_object.output_report_buffer = ct.c_void_p()
705        gw_return_object.output_report_buffer_length = ct.c_size_t()
706        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
707
708        with utils.CwdHandler(new_cwd=self.library_path):
709            # API call
710            gw_return_object.status = self.library.GwFileExportArchive(
711                gw_return_object.input_buffer,
712                gw_return_object.input_buffer_length,
713                ct.byref(gw_return_object.output_buffer),
714                ct.byref(gw_return_object.output_buffer_length),
715                ct.byref(gw_return_object.output_report_buffer),
716                ct.byref(gw_return_object.output_report_buffer_length),
717                gw_return_object.content_management_policy
718            )
719
720        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
721            gw_return_object.output_file = utils.buffer_to_bytes(
722                gw_return_object.output_buffer,
723                gw_return_object.output_buffer_length
724            )
725        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
726            gw_return_object.output_report = utils.buffer_to_bytes(
727                gw_return_object.output_report_buffer,
728                gw_return_object.output_report_buffer_length
729            )
730
731        # Write output file
732        if hasattr(gw_return_object, "output_file"):
733            if isinstance(output_file, str):
734                os.makedirs(os.path.dirname(output_file), exist_ok=True)
735                with open(output_file, "wb") as f:
736                    f.write(gw_return_object.output_file)
737
738        # Write output report
739        if hasattr(gw_return_object, "output_report"):
740            if isinstance(output_report, str):
741                os.makedirs(os.path.dirname(output_report), exist_ok=True)
742                with open(output_report, "wb") as f:
743                    f.write(gw_return_object.output_report)
744
745        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
746        if gw_return_object.status not in successes.success_codes:
747            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
748            if raise_unsupported:
749                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
750        else:
751            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
752
753        self.release()
754
755        return gw_return_object
756
757    def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
758        """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.
759
760        Args:
761            input_directory (str): The input directory containing archives to export.
762            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
763            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
764            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
765            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
766
767        Returns:
768            exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
769        """
770        exported_archives_dict = {}
771        # Call export_archive on each file in input_directory to output_directory
772        for input_file in utils.list_file_paths(input_directory):
773            relative_path = os.path.relpath(input_file, input_directory)
774            # Construct paths for output file and output report
775            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
776            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
777
778            result = self.export_archive(
779                input_file=input_file,
780                output_file=output_file,
781                output_report=output_report,
782                content_management_policy=content_management_policy,
783                raise_unsupported=raise_unsupported,
784            )
785
786            exported_archives_dict[relative_path] = result
787
788        return exported_archives_dict
789
790    def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: Optional[bool] = True):
791        """ Imports an archive using the Glasswall engine.
792
793        Args:
794            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
795            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
796            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
797            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
798            include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive.
799            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
800
801        Returns:
802            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
803        """
804        # Validate arg types
805        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
806            raise TypeError(input_file)
807        if not isinstance(output_file, (type(None), str)):
808            raise TypeError(output_file)
809        if not isinstance(output_report, (type(None), str)):
810            raise TypeError(output_report)
811        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
812            raise TypeError(content_management_policy)
813
814        # Convert string path arguments to absolute paths
815        if isinstance(input_file, str):
816            input_file = os.path.abspath(input_file)
817        # Convert string path arguments to absolute paths
818        if isinstance(output_file, str):
819            output_file = os.path.abspath(output_file)
820        if isinstance(output_report, str):
821            output_report = os.path.abspath(output_report)
822
823        # Convert inputs to bytes
824        if isinstance(input_file, str):
825            if not os.path.isfile(input_file):
826                raise FileNotFoundError(input_file)
827            with open(input_file, "rb") as f:
828                input_file_bytes = f.read()
829        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
830            input_file_bytes = utils.as_bytes(input_file)
831
832        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
833            with open(content_management_policy, "rb") as f:
834                content_management_policy = f.read()
835        elif isinstance(content_management_policy, type(None)):
836            # Load default
837            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
838        content_management_policy = utils.validate_xml(content_management_policy)
839
840        # API function declaration
841        self.library.GwFileImportArchive.argtypes = [
842            ct.c_void_p,  # void *inputBuffer
843            ct.c_size_t,  # size_t inputBufferLength
844            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
845            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
846            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
847            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
848            ct.c_char_p,  # const char *xmlConfigString
849            ct.c_int  # int includeAnalysisReports
850        ]
851
852        # Variable initialisation
853        gw_return_object = glasswall.GwReturnObj()
854        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
855        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
856        gw_return_object.output_buffer = ct.c_void_p()
857        gw_return_object.output_buffer_length = ct.c_size_t()
858        gw_return_object.output_report_buffer = ct.c_void_p()
859        gw_return_object.output_report_buffer_length = ct.c_size_t()
860        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
861        gw_return_object.include_analysis_report = ct.c_int(int(include_analysis_report))
862
863        with utils.CwdHandler(new_cwd=self.library_path):
864            # API call
865            gw_return_object.status = self.library.GwFileImportArchive(
866                gw_return_object.input_buffer,
867                gw_return_object.input_buffer_length,
868                ct.byref(gw_return_object.output_buffer),
869                ct.byref(gw_return_object.output_buffer_length),
870                ct.byref(gw_return_object.output_report_buffer),
871                ct.byref(gw_return_object.output_report_buffer_length),
872                gw_return_object.content_management_policy,
873                gw_return_object.include_analysis_report
874            )
875
876        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
877            gw_return_object.output_file = utils.buffer_to_bytes(
878                gw_return_object.output_buffer,
879                gw_return_object.output_buffer_length
880            )
881        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
882            gw_return_object.output_report = utils.buffer_to_bytes(
883                gw_return_object.output_report_buffer,
884                gw_return_object.output_report_buffer_length
885            )
886
887        # Write output file
888        if hasattr(gw_return_object, "output_file"):
889            if isinstance(output_file, str):
890                os.makedirs(os.path.dirname(output_file), exist_ok=True)
891                with open(output_file, "wb") as f:
892                    f.write(gw_return_object.output_file)
893
894        # Write output report
895        if hasattr(gw_return_object, "output_report"):
896            if isinstance(output_report, str):
897                os.makedirs(os.path.dirname(output_report), exist_ok=True)
898                with open(output_report, "wb") as f:
899                    f.write(gw_return_object.output_report)
900
901        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
902        if gw_return_object.status not in successes.success_codes:
903            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
904            if raise_unsupported:
905                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
906        else:
907            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
908
909        self.release()
910
911        return gw_return_object
912
913    def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: bool = True):
914        """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.
915
916        Args:
917            input_directory (str): The input directory containing archives to import.
918            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
919            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
920            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
921            include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive.
922            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
923
924        Returns:
925            imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
926        """
927        imported_archives_dict = {}
928        # Call import_archive on each file in input_directory to output_directory
929        for input_file in utils.list_file_paths(input_directory):
930            relative_path = os.path.relpath(input_file, input_directory)
931            # Construct paths for output file and output report
932            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
933            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
934
935            result = self.import_archive(
936                input_file=input_file,
937                output_file=output_file,
938                output_report=output_report,
939                content_management_policy=content_management_policy,
940                include_analysis_report=include_analysis_report,
941                raise_unsupported=raise_unsupported,
942            )
943
944            imported_archives_dict[relative_path] = result
945
946        return imported_archives_dict

A high level Python wrapper for Glasswall Archive Manager.

ArchiveManager(library_path)
21    def __init__(self, library_path):
22        super().__init__(library_path)
23        self.library = self.load_library(os.path.abspath(library_path))
24
25        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
library
def version(self):
27    def version(self):
28        """ Returns the Glasswall library version.
29
30        Returns:
31            version (str): The Glasswall library version.
32        """
33        # API function declaration
34        self.library.GwArchiveVersion.restype = ct.c_char_p
35
36        # API call
37        version = self.library.GwArchiveVersion()
38
39        # Convert to Python string
40        version = ct.string_at(version).decode()
41
42        return version

Returns the Glasswall library version.

Returns: version (str): The Glasswall library version.

def release(self):
44    def release(self):
45        """ Releases any resources held by the Glasswall Archive Manager library. """
46        self.library.GwArchiveDone()

Releases any resources held by the Glasswall Archive Manager library.

supported_archives
48    @property
49    @functools.lru_cache()
50    def supported_archives(self):
51        """ Returns a list of supported archive file formats. """
52
53        # API function declaration
54        self.library.GwSupportedFiletypes.restype = ct.c_char_p
55
56        # API call
57        result = self.library.GwSupportedFiletypes()  # b'7z,bz2,gz,rar,tar,xz,zip,'
58
59        # Convert to Python string
60        result = ct.string_at(result).decode()  # 7z,bz2,gz,rar,tar,xz,zip,
61
62        # Convert comma separated str to list, remove empty trailing element, sort
63        result = sorted(filter(None, result.split(",")))
64
65        return result

Returns a list of supported archive file formats.

@functools.lru_cache()
def is_supported_archive(self, archive_type: str):
67    @functools.lru_cache()
68    def is_supported_archive(self, archive_type: str):
69        """ Returns True if the archive type (e.g. `7z`) is supported. """
70
71        # API function declaration
72        self.library.GwIsSupportedArchiveType.argtypes = [
73            ct.c_char_p
74        ]
75        self.library.GwIsSupportedArchiveType.restype = ct.c_bool
76
77        ct_archive_type = ct.c_char_p(archive_type.encode())  # const char* type
78
79        result = self.library.GwIsSupportedArchiveType(ct_archive_type)
80
81        return result

Returns True if the archive type (e.g. 7z) is supported.

def list_archive_paths( self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
83    def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
84        """ Returns a list of file paths of supported archives in a directory and all of its subdirectories. """
85        return [
86            file_path
87            for file_path in glasswall.utils.list_file_paths(
88                directory=directory,
89                recursive=recursive,
90                absolute=absolute,
91                followlinks=followlinks,
92            )
93            if self.is_supported_archive(self.determine_file_type(file_path, as_string=True, raise_unsupported=False))
94        ]

Returns a list of file paths of supported archives in a directory and all of its subdirectories.

def determine_file_type( self, input_file: str, as_string: bool = False, raise_unsupported: bool = True):
 96    def determine_file_type(self, input_file: str, as_string: bool = False, raise_unsupported: bool = True):
 97        """ Returns an int representing the file type of an archive.
 98
 99        Args:
100            input_file (str) The input file path.
101            as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False.
102            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
103
104        Returns:
105            file_type (Union[int, str]): The file format.
106        """
107        if not os.path.isfile(input_file):
108            raise FileNotFoundError(input_file)
109
110        # API function declaration
111        self.library.GwDetermineArchiveTypeFromFile.argtypes = [
112            ct.c_char_p
113        ]
114
115        # Variable initialisation
116        ct_input_file = ct.c_char_p(input_file.encode())  # const char * inputFilePath)
117
118        with utils.CwdHandler(new_cwd=self.library_path):
119            # API call
120            file_type = self.library.GwDetermineArchiveTypeFromFile(
121                ct_input_file
122            )
123
124        file_type_as_string = dft.file_type_int_to_str(file_type)
125        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
126
127        if not dft.is_success(file_type):
128            if raise_unsupported:
129                log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
130                raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
131            else:
132                log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
133        else:
134            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
135
136        if as_string:
137            return file_type_as_string
138
139        return file_type

Returns an int representing the file type of an archive.

Args: input_file (str): The input file path. as_string (bool, optional): Return file type as string, eg: "xz" instead of: 262. Defaults to False. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: file_type (Union[int, str]): The file format.

def analyse_archive( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, raise_unsupported: bool = True):
141    def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
142        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
143
144        Args:
145            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
146            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
147            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
148            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
149            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
150
151        Returns:
152            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
153        """
154        # Validate arg types
155        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
156            raise TypeError(input_file)
157        if not isinstance(output_file, (type(None), str)):
158            raise TypeError(output_file)
159        if not isinstance(output_report, (type(None), str)):
160            raise TypeError(output_report)
161        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
162            raise TypeError(content_management_policy)
163
164        # Convert string path arguments to absolute paths
165        if isinstance(input_file, str):
166            input_file = os.path.abspath(input_file)
167        if isinstance(output_file, str):
168            output_file = os.path.abspath(output_file)
169        if isinstance(output_report, str):
170            output_report = os.path.abspath(output_report)
171
172        # Convert inputs to bytes
173        if isinstance(input_file, str):
174            if not os.path.isfile(input_file):
175                raise FileNotFoundError(input_file)
176            with open(input_file, "rb") as f:
177                input_file_bytes = f.read()
178        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
179            input_file_bytes = utils.as_bytes(input_file)
180
181        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
182            with open(content_management_policy, "rb") as f:
183                content_management_policy = f.read()
184        elif isinstance(content_management_policy, type(None)):
185            # Load default
186            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
187        content_management_policy = utils.validate_xml(content_management_policy)
188
189        # API function declaration
190        self.library.GwFileAnalysisArchive.argtypes = [
191            ct.c_void_p,  # void *inputBuffer
192            ct.c_size_t,  # size_t inputBufferLength
193            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
194            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
195            ct.POINTER(ct.c_void_p),  # void **outputAnalysisReportBuffer
196            ct.POINTER(ct.c_size_t),  # size_t *outputAnalysisReportBufferLength
197            ct.c_char_p  # const char *xmlConfigString
198        ]
199
200        # Variable initialisation
201        gw_return_object = glasswall.GwReturnObj()
202        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
203        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
204        gw_return_object.output_buffer = ct.c_void_p()
205        gw_return_object.output_buffer_length = ct.c_size_t()
206        gw_return_object.output_report_buffer = ct.c_void_p()
207        gw_return_object.output_report_buffer_length = ct.c_size_t()
208        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
209
210        with utils.CwdHandler(new_cwd=self.library_path):
211            # API call
212            gw_return_object.status = self.library.GwFileAnalysisArchive(
213                gw_return_object.input_buffer,
214                gw_return_object.input_buffer_length,
215                ct.byref(gw_return_object.output_buffer),
216                ct.byref(gw_return_object.output_buffer_length),
217                ct.byref(gw_return_object.output_report_buffer),
218                ct.byref(gw_return_object.output_report_buffer_length),
219                gw_return_object.content_management_policy
220            )
221
222        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
223            gw_return_object.output_file = utils.buffer_to_bytes(
224                gw_return_object.output_buffer,
225                gw_return_object.output_buffer_length
226            )
227        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
228            gw_return_object.output_report = utils.buffer_to_bytes(
229                gw_return_object.output_report_buffer,
230                gw_return_object.output_report_buffer_length
231            )
232
233        # Write output file
234        if hasattr(gw_return_object, "output_file"):
235            if isinstance(output_file, str):
236                os.makedirs(os.path.dirname(output_file), exist_ok=True)
237                with open(output_file, "wb") as f:
238                    f.write(gw_return_object.output_file)
239
240        # Write output report
241        if hasattr(gw_return_object, "output_report"):
242            if isinstance(output_report, str):
243                os.makedirs(os.path.dirname(output_report), exist_ok=True)
244                with open(output_report, "wb") as f:
245                    f.write(gw_return_object.output_report)
246
247        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
248        if gw_return_object.status not in successes.success_codes:
249            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
250            if raise_unsupported:
251                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
252        else:
253            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
254
255        self.release()
256
257        return gw_return_object

Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)

def analyse_directory( self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, raise_unsupported: bool = True):
259    def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
260        """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory.
261
262        Args:
263            input_directory (str): The input directory containing archives to analyse.
264            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written.
265            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
266            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
267            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
268
269        Returns:
270            analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
271        """
272        analysed_archives_dict = {}
273        # Call analyse_archive on each file in input_directory
274        for input_file in utils.list_file_paths(input_directory):
275            relative_path = os.path.relpath(input_file, input_directory)
276            # Construct paths for output file and output report
277            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
278            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
279
280            result = self.analyse_archive(
281                input_file=input_file,
282                output_file=output_file,
283                output_report=output_report,
284                content_management_policy=content_management_policy,
285                raise_unsupported=raise_unsupported,
286            )
287
288            analysed_archives_dict[relative_path] = result
289
290        return analysed_archives_dict

Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing archives to analyse. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

def protect_archive( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, raise_unsupported: bool = True):
292    def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
293        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
294
295        Args:
296            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
297            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
298            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
299            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
300            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
301
302        Returns:
303            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
304        """
305        # Validate arg types
306        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
307            raise TypeError(input_file)
308        if not isinstance(output_file, (type(None), str)):
309            raise TypeError(output_file)
310        if not isinstance(output_report, (type(None), str)):
311            raise TypeError(output_report)
312        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
313            raise TypeError(content_management_policy)
314
315        # Convert string path arguments to absolute paths
316        if isinstance(input_file, str):
317            input_file = os.path.abspath(input_file)
318        if isinstance(output_file, str):
319            output_file = os.path.abspath(output_file)
320        if isinstance(output_report, str):
321            output_report = os.path.abspath(output_report)
322
323        # Convert inputs to bytes
324        if isinstance(input_file, str):
325            if not os.path.isfile(input_file):
326                raise FileNotFoundError(input_file)
327            with open(input_file, "rb") as f:
328                input_file_bytes = f.read()
329        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
330            input_file_bytes = utils.as_bytes(input_file)
331
332        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
333            with open(content_management_policy, "rb") as f:
334                content_management_policy = f.read()
335        elif isinstance(content_management_policy, type(None)):
336            # Load default
337            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
338        content_management_policy = utils.validate_xml(content_management_policy)
339
340        # API function declaration
341        self.library.GwFileProtectAndReportArchive.argtypes = [
342            ct.c_void_p,  # void *inputBuffer
343            ct.c_size_t,  # size_t inputBufferLength
344            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
345            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
346            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
347            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
348            ct.c_char_p  # const char *xmlConfigString
349        ]
350        # Variable initialisation
351        gw_return_object = glasswall.GwReturnObj()
352        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
353        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
354        gw_return_object.output_buffer = ct.c_void_p()
355        gw_return_object.output_buffer_length = ct.c_size_t()
356        gw_return_object.output_report_buffer = ct.c_void_p()
357        gw_return_object.output_report_buffer_length = ct.c_size_t()
358        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
359
360        with utils.CwdHandler(new_cwd=self.library_path):
361            # API call
362            gw_return_object.status = self.library.GwFileProtectAndReportArchive(
363                ct.byref(gw_return_object.input_buffer),
364                gw_return_object.input_buffer_length,
365                ct.byref(gw_return_object.output_buffer),
366                ct.byref(gw_return_object.output_buffer_length),
367                ct.byref(gw_return_object.output_report_buffer),
368                ct.byref(gw_return_object.output_report_buffer_length),
369                gw_return_object.content_management_policy
370            )
371
372        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
373            gw_return_object.output_file = utils.buffer_to_bytes(
374                gw_return_object.output_buffer,
375                gw_return_object.output_buffer_length
376            )
377        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
378            gw_return_object.output_report = utils.buffer_to_bytes(
379                gw_return_object.output_report_buffer,
380                gw_return_object.output_report_buffer_length
381            )
382
383        # Write output file
384        if hasattr(gw_return_object, "output_file"):
385            if isinstance(output_file, str):
386                os.makedirs(os.path.dirname(output_file), exist_ok=True)
387                with open(output_file, "wb") as f:
388                    f.write(gw_return_object.output_file)
389
390        # Write output report
391        if hasattr(gw_return_object, "output_report"):
392            if isinstance(output_report, str):
393                os.makedirs(os.path.dirname(output_report), exist_ok=True)
394                with open(output_report, "wb") as f:
395                    f.write(gw_return_object.output_report)
396
397        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
398        if gw_return_object.status not in successes.success_codes:
399            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
400            if raise_unsupported:
401                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
402        else:
403            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
404
405        self.release()
406
407        return gw_return_object

Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)

def protect_directory( self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, raise_unsupported: bool = True):
409    def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
410        """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.
411
412        Args:
413            input_directory (str): The input directory containing archives to protect.
414            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
415            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
416            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
417            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
418
419        Returns:
420            protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
421        """
422        protected_archives_dict = {}
423        # Call protect_archive on each file in input_directory to output_directory
424        for input_file in utils.list_file_paths(input_directory):
425            relative_path = os.path.relpath(input_file, input_directory)
426            # Construct paths for output file and output report
427            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
428            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
429
430            result = self.protect_archive(
431                input_file=input_file,
432                output_file=output_file,
433                output_report=output_report,
434                content_management_policy=content_management_policy,
435                raise_unsupported=raise_unsupported,
436            )
437
438            protected_archives_dict[relative_path] = result
439
440        return protected_archives_dict

Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing archives to protect. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

def file_to_file_unpack( self, input_file: str, output_directory: str, raise_unsupported: bool = True):
442    def file_to_file_unpack(self, input_file: str, output_directory: str, raise_unsupported: bool = True):
443        # Validate arg types
444        if not isinstance(input_file, str):
445            raise TypeError(input_file)
446        elif not os.path.isfile(input_file):
447            raise FileNotFoundError(input_file)
448        if not isinstance(output_directory, str):
449            raise TypeError(output_directory)
450
451        # API function declaration
452        self.library.GwFileToFileUnpack.argtypes = [
453            ct.c_char_p,
454            ct.c_char_p,
455        ]
456
457        # Variable initialisation
458        gw_return_object = glasswall.GwReturnObj()
459        gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
460        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
461
462        with utils.CwdHandler(new_cwd=self.library_path):
463            # API call
464            gw_return_object.status = self.library.GwFileToFileUnpack(
465                gw_return_object.ct_input_file,
466                gw_return_object.ct_output_directory,
467            )
468
469        if gw_return_object.status not in successes.success_codes:
470            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
471            if raise_unsupported:
472                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
473        else:
474            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
475
476        self.release()
477
478        return gw_return_object
def file_to_file_pack( self, input_directory: str, output_directory: str, file_type: Optional[str] = None, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True):
480    def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True):
481        # Validate arg types
482        if not isinstance(input_directory, str):
483            raise TypeError(input_directory)
484        elif not os.path.isdir(input_directory):
485            raise NotADirectoryError(input_directory)
486        if not isinstance(output_directory, str):
487            raise TypeError(output_directory)
488        if not file_type:
489            file_type = utils.get_file_type(input_directory)
490
491        # Ensure output_directory exists
492        os.makedirs(output_directory, exist_ok=True)
493
494        # API function declaration
495        self.library.GwFileToFilePack.argtypes = [
496            ct.c_char_p,
497            ct.c_char_p,
498            ct.c_char_p,
499            ct.c_int,
500        ]
501
502        # Variable initialisation
503        gw_return_object = glasswall.GwReturnObj()
504        gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
505        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
506        gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType
507        gw_return_object.ct_add_extension = ct.c_int(int(add_extension))  # int addExtension
508
509        with utils.CwdHandler(new_cwd=self.library_path):
510            # API call
511            gw_return_object.status = self.library.GwFileToFilePack(
512                gw_return_object.ct_input_directory,
513                gw_return_object.ct_output_directory,
514                gw_return_object.ct_file_type,
515                gw_return_object.ct_add_extension,
516            )
517
518        if gw_return_object.status not in successes.success_codes:
519            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
520            if raise_unsupported:
521                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
522        else:
523            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
524
525        self.release()
526
527        return gw_return_object
def unpack( self, input_file: str, output_directory: str, recursive: bool = True, include_file_type: bool = False, raise_unsupported: bool = True, delete_origin: bool = False):
529    def unpack(self, input_file: str, output_directory: str, recursive: bool = True, include_file_type: bool = False, raise_unsupported: bool = True, delete_origin: bool = False):
530        """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
531
532        Args:
533            input_file (str): The archive file path
534            output_directory (str): The output directory where the archive will be unpacked to a new directory.
535            recursive (bool, optional): Default True. Recursively unpack all nested archives.
536            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
537            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
538            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
539        """
540        # Convert to absolute paths
541        input_file = os.path.abspath(input_file)
542        output_directory = os.path.abspath(output_directory)
543
544        if include_file_type:
545            archive_name = os.path.basename(input_file)
546        else:
547            archive_name = os.path.splitext(os.path.basename(input_file))[0]
548        archive_output_directory = os.path.join(output_directory, archive_name)
549
550        # Unpack
551        log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
552        result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, raise_unsupported=raise_unsupported)
553        if result:
554            status = result.status
555        else:
556            status = None
557
558        if status not in successes.success_codes:
559            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
560            if raise_unsupported:
561                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
562        else:
563            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
564
565        if delete_origin:
566            os.remove(input_file)
567
568        if recursive:
569            # Unpack sub archives
570            for subarchive in self.list_archive_paths(archive_output_directory):
571                self.unpack(
572                    input_file=subarchive,
573                    output_directory=archive_output_directory,
574                    recursive=recursive,
575                    raise_unsupported=raise_unsupported,
576                    delete_origin=True
577                )
578
579        return status

Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".

Args: input_file (str): The archive file path output_directory (str): The output directory where the archive will be unpacked to a new directory. recursive (bool, optional): Default True. Recursively unpack all nested archives. include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.

def unpack_directory( self, input_directory: str, output_directory: str, recursive: bool = True, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
581    def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
582        """ Unpack a directory of archives, maintaining directory structure.
583
584        Args:
585            input_directory (str): The input directory containing archives to unpack.
586            output_directory (str): The output directory where archives will be unpacked to a new directory.
587            recursive (bool, optional): Default True. Recursively unpack all nested archives.
588            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
589            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
590            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
591        """
592        # Convert to absolute paths
593        input_directory = os.path.abspath(input_directory)
594        output_directory = os.path.abspath(output_directory)
595
596        for archive_input_file in self.list_archive_paths(input_directory):
597            relative_path = os.path.relpath(archive_input_file, input_directory)
598            archive_output_file = os.path.dirname(os.path.join(output_directory, relative_path))
599            self.unpack(
600                input_file=archive_input_file,
601                output_directory=archive_output_file,
602                recursive=recursive,
603                include_file_type=include_file_type,
604                raise_unsupported=raise_unsupported,
605                delete_origin=delete_origin
606            )

Unpack a directory of archives, maintaining directory structure.

Args: input_directory (str): The input directory containing archives to unpack. output_directory (str): The output directory where archives will be unpacked to a new directory. recursive (bool, optional): Default True. Recursively unpack all nested archives. include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete each archive found in input_directory after unpacking it to output_directory.

def pack_directory( self, input_directory: str, output_directory: str, file_type: str, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True, delete_origin: Optional[bool] = False):
608    def pack_directory(self, input_directory: str, output_directory: str, file_type: str, add_extension: Optional[bool] = True, raise_unsupported: Optional[bool] = True, delete_origin: Optional[bool] = False):
609        """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".
610
611        Args:
612            input_directory (str): The input directory containing files to archive.
613            output_directory (str): The output directory to store the created archive.
614            file_type (str): The archive file type.
615            add_extension (bool, optional): Default: True. Archive file type extension to result file.
616            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
617            delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory.
618        """
619        # Convert to absolute paths
620        input_directory = os.path.abspath(input_directory)
621        output_directory = os.path.abspath(output_directory)
622
623        # Pack
624        log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
625        status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, add_extension=add_extension, raise_unsupported=raise_unsupported).status
626
627        if status not in successes.success_codes:
628            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
629            if raise_unsupported:
630                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
631        else:
632            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
633
634        if delete_origin:
635            utils.delete_directory(input_directory)
636
637        return status

Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "xz", "zip".

Args: input_directory (str): The input directory containing files to archive. output_directory (str): The output directory to store the created archive. file_type (str): The archive file type. add_extension (bool, optional): Default True. Append the archive file type extension to the resulting file. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory.

def export_archive( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, raise_unsupported: bool = True):
639    def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
640        """ Exports an archive using the Glasswall engine.
641
642        Args:
643            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
644            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
645            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
646            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
647            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
648
649        Returns:
650            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
651        """
652        # Validate arg types
653        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
654            raise TypeError(input_file)
655        if not isinstance(output_file, (type(None), str)):
656            raise TypeError(output_file)
657        if not isinstance(output_report, (type(None), str)):
658            raise TypeError(output_report)
659        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
660            raise TypeError(content_management_policy)
661
662        # Convert string path arguments to absolute paths
663        if isinstance(input_file, str):
664            input_file = os.path.abspath(input_file)
665        if isinstance(output_file, str):
666            output_file = os.path.abspath(output_file)
667        if isinstance(output_report, str):
668            output_report = os.path.abspath(output_report)
669
670        # Convert inputs to bytes
671        if isinstance(input_file, str):
672            if not os.path.isfile(input_file):
673                raise FileNotFoundError(input_file)
674            with open(input_file, "rb") as f:
675                input_file_bytes = f.read()
676        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
677            input_file_bytes = utils.as_bytes(input_file)
678
679        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
680            with open(content_management_policy, "rb") as f:
681                content_management_policy = f.read()
682        elif isinstance(content_management_policy, type(None)):
683            # Load default
684            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
685        content_management_policy = utils.validate_xml(content_management_policy)
686
687        # API function declaration
688        self.library.GwFileExportArchive.argtypes = [
689            ct.c_void_p,  # void *inputBuffer
690            ct.c_size_t,  # size_t inputBufferLength
691            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
692            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
693            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
694            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
695            ct.c_char_p  # const char *xmlConfigString
696        ]
697
698        # Variable initialisation
699        gw_return_object = glasswall.GwReturnObj()
700        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
701        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
702        gw_return_object.output_buffer = ct.c_void_p()
703        gw_return_object.output_buffer_length = ct.c_size_t()
704        gw_return_object.output_report_buffer = ct.c_void_p()
705        gw_return_object.output_report_buffer_length = ct.c_size_t()
706        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
707
708        with utils.CwdHandler(new_cwd=self.library_path):
709            # API call
710            gw_return_object.status = self.library.GwFileExportArchive(
711                gw_return_object.input_buffer,
712                gw_return_object.input_buffer_length,
713                ct.byref(gw_return_object.output_buffer),
714                ct.byref(gw_return_object.output_buffer_length),
715                ct.byref(gw_return_object.output_report_buffer),
716                ct.byref(gw_return_object.output_report_buffer_length),
717                gw_return_object.content_management_policy
718            )
719
720        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
721            gw_return_object.output_file = utils.buffer_to_bytes(
722                gw_return_object.output_buffer,
723                gw_return_object.output_buffer_length
724            )
725        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
726            gw_return_object.output_report = utils.buffer_to_bytes(
727                gw_return_object.output_report_buffer,
728                gw_return_object.output_report_buffer_length
729            )
730
731        # Write output file
732        if hasattr(gw_return_object, "output_file"):
733            if isinstance(output_file, str):
734                os.makedirs(os.path.dirname(output_file), exist_ok=True)
735                with open(output_file, "wb") as f:
736                    f.write(gw_return_object.output_file)
737
738        # Write output report
739        if hasattr(gw_return_object, "output_report"):
740            if isinstance(output_report, str):
741                os.makedirs(os.path.dirname(output_report), exist_ok=True)
742                with open(output_report, "wb") as f:
743                    f.write(gw_return_object.output_report)
744
745        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
746        if gw_return_object.status not in successes.success_codes:
747            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
748            if raise_unsupported:
749                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
750        else:
751            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
752
753        self.release()
754
755        return gw_return_object

Exports an archive using the Glasswall engine.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)

def export_directory( self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, raise_unsupported: bool = True):
757    def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
758        """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.
759
760        Args:
761            input_directory (str): The input directory containing archives to export.
762            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
763            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
764            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
765            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
766
767        Returns:
768            exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
769        """
770        exported_archives_dict = {}
771        # Call export_archive on each file in input_directory to output_directory
772        for input_file in utils.list_file_paths(input_directory):
773            relative_path = os.path.relpath(input_file, input_directory)
774            # Construct paths for output file and output report
775            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
776            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
777
778            result = self.export_archive(
779                input_file=input_file,
780                output_file=output_file,
781                output_report=output_report,
782                content_management_policy=content_management_policy,
783                raise_unsupported=raise_unsupported,
784            )
785
786            exported_archives_dict[relative_path] = result
787
788        return exported_archives_dict

Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing archives to export. output_directory (Optional[str]): If str, the output directory where the archives will be written. If None, no archives are written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

def import_archive( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: Optional[bool] = True):
790    def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: Optional[bool] = True):
791        """ Imports an archive using the Glasswall engine.
792
793        Args:
794            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
795            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
796            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
797            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
798            include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive.
799            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
800
801        Returns:
802            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)
803        """
804        # Validate arg types
805        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
806            raise TypeError(input_file)
807        if not isinstance(output_file, (type(None), str)):
808            raise TypeError(output_file)
809        if not isinstance(output_report, (type(None), str)):
810            raise TypeError(output_report)
811        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
812            raise TypeError(content_management_policy)
813
814        # Convert string path arguments to absolute paths
815        if isinstance(input_file, str):
816            input_file = os.path.abspath(input_file)
817        # Convert string path arguments to absolute paths
818        if isinstance(output_file, str):
819            output_file = os.path.abspath(output_file)
820        if isinstance(output_report, str):
821            output_report = os.path.abspath(output_report)
822
823        # Convert inputs to bytes
824        if isinstance(input_file, str):
825            if not os.path.isfile(input_file):
826                raise FileNotFoundError(input_file)
827            with open(input_file, "rb") as f:
828                input_file_bytes = f.read()
829        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
830            input_file_bytes = utils.as_bytes(input_file)
831
832        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
833            with open(content_management_policy, "rb") as f:
834                content_management_policy = f.read()
835        elif isinstance(content_management_policy, type(None)):
836            # Load default
837            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
838        content_management_policy = utils.validate_xml(content_management_policy)
839
840        # API function declaration
841        self.library.GwFileImportArchive.argtypes = [
842            ct.c_void_p,  # void *inputBuffer
843            ct.c_size_t,  # size_t inputBufferLength
844            ct.POINTER(ct.c_void_p),  # void **outputFileBuffer
845            ct.POINTER(ct.c_size_t),  # size_t *outputFileBufferLength
846            ct.POINTER(ct.c_void_p),  # void **outputReportBuffer
847            ct.POINTER(ct.c_size_t),  # size_t *outputReportBufferLength
848            ct.c_char_p,  # const char *xmlConfigString
849            ct.c_int  # int includeAnalysisReports
850        ]
851
852        # Variable initialisation
853        gw_return_object = glasswall.GwReturnObj()
854        gw_return_object.input_buffer = ct.create_string_buffer(input_file_bytes)
855        gw_return_object.input_buffer_length = ct.c_size_t(len(input_file_bytes))
856        gw_return_object.output_buffer = ct.c_void_p()
857        gw_return_object.output_buffer_length = ct.c_size_t()
858        gw_return_object.output_report_buffer = ct.c_void_p()
859        gw_return_object.output_report_buffer_length = ct.c_size_t()
860        gw_return_object.content_management_policy = ct.c_char_p(content_management_policy.encode())
861        gw_return_object.include_analysis_report = ct.c_int(int(include_analysis_report))
862
863        with utils.CwdHandler(new_cwd=self.library_path):
864            # API call
865            gw_return_object.status = self.library.GwFileImportArchive(
866                gw_return_object.input_buffer,
867                gw_return_object.input_buffer_length,
868                ct.byref(gw_return_object.output_buffer),
869                ct.byref(gw_return_object.output_buffer_length),
870                ct.byref(gw_return_object.output_report_buffer),
871                ct.byref(gw_return_object.output_report_buffer_length),
872                gw_return_object.content_management_policy,
873                gw_return_object.include_analysis_report
874            )
875
876        if gw_return_object.output_buffer and gw_return_object.output_buffer_length:
877            gw_return_object.output_file = utils.buffer_to_bytes(
878                gw_return_object.output_buffer,
879                gw_return_object.output_buffer_length
880            )
881        if gw_return_object.output_report_buffer and gw_return_object.output_report_buffer_length:
882            gw_return_object.output_report = utils.buffer_to_bytes(
883                gw_return_object.output_report_buffer,
884                gw_return_object.output_report_buffer_length
885            )
886
887        # Write output file
888        if hasattr(gw_return_object, "output_file"):
889            if isinstance(output_file, str):
890                os.makedirs(os.path.dirname(output_file), exist_ok=True)
891                with open(output_file, "wb") as f:
892                    f.write(gw_return_object.output_file)
893
894        # Write output report
895        if hasattr(gw_return_object, "output_report"):
896            if isinstance(output_report, str):
897                os.makedirs(os.path.dirname(output_report), exist_ok=True)
898                with open(output_report, "wb") as f:
899                    f.write(gw_return_object.output_report)
900
901        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
902        if gw_return_object.status not in successes.success_codes:
903            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
904            if raise_unsupported:
905                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
906        else:
907            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
908
909        self.release()
910
911        return gw_return_object

Imports an archive using the Glasswall engine.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes including: "status" (int), "output_file" (bytes), "output_report" (bytes)

def import_directory( self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[NoneType, str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.archive_manager.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: bool = True):
913    def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[bool] = False, raise_unsupported: bool = True):
914        """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.
915
916        Args:
917            input_directory (str): The input directory containing archives to import.
918            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
919            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
920            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
921            include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive.
922            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
923
924        Returns:
925            imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
926        """
927        imported_archives_dict = {}
928        # Call import_archive on each file in input_directory to output_directory
929        for input_file in utils.list_file_paths(input_directory):
930            relative_path = os.path.relpath(input_file, input_directory)
931            # Construct paths for output file and output report
932            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
933            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
934
935            result = self.import_archive(
936                input_file=input_file,
937                output_file=output_file,
938                output_report=output_report,
939                content_management_policy=content_management_policy,
940                include_analysis_report=include_analysis_report,
941                raise_unsupported=raise_unsupported,
942            )
943
944            imported_archives_dict[relative_path] = result
945
946        return imported_archives_dict

Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing archives to import. output_directory (Optional[str]): If str, the output directory where the archives will be written. If None, no archives are written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. include_analysis_report (Optional[bool], optional): Default False. If True, write the analysis report into the imported archive. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)