glasswall.libraries.word_search.word_search

  1import ctypes as ct
  2import io
  3import os
  4from typing import Optional, Union
  5
  6import glasswall
  7from glasswall import utils
  8from glasswall.config.logging import log
  9from glasswall.libraries.library import Library
 10from glasswall.libraries.word_search import errors, successes
 11
 12
 13class WordSearch(Library):
 14    """ A high level Python wrapper for Glasswall WordSearch. """
 15
 16    def __init__(self, library_path: str):
 17        super().__init__(library_path=library_path)
 18        self.library = self.load_library(os.path.abspath(library_path))
 19
 20        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
 21
 22    def version(self):
 23        """ Returns the Glasswall library version.
 24
 25        Returns:
 26            version (str): The Glasswall library version.
 27        """
 28        # API function declaration
 29        self.library.GwWordSearchVersion.restype = ct.c_char_p
 30
 31        # API call
 32        version = self.library.GwWordSearchVersion()
 33
 34        # Convert to Python string
 35        version = ct.string_at(version).decode()
 36
 37        return version
 38
 39    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
 40    def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
 41        """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.
 42
 43        Args:
 44            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 45            content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
 46            output_file (Union[None, str], optional): Default None. If str, write output_file to that path.
 47            output_report (Union[None, str], optional): Default None. If str, write output_file to that path.
 48            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs json file path or bytes.
 49            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 50
 51        Returns:
 52            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
 53        """
 54        # Validate arg types
 55        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 56            raise TypeError(input_file)
 57        if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 58            raise TypeError(content_management_policy)
 59        if not isinstance(output_file, (type(None), str)):
 60            raise TypeError(output_file)
 61        if not isinstance(output_report, (type(None), str)):
 62            raise TypeError(output_report)
 63        if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)):
 64            raise TypeError(homoglyphs)
 65
 66        # Convert string path arguments to absolute paths
 67        if isinstance(output_file, str):
 68            output_file = os.path.abspath(output_file)
 69
 70        if isinstance(output_report, str):
 71            output_report = os.path.abspath(output_report)
 72
 73        # Convert inputs to bytes
 74        if isinstance(input_file, str):
 75            with open(input_file, "rb") as f:
 76                input_file_bytes = f.read()
 77        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 78            input_file_bytes = utils.as_bytes(input_file)
 79        # warn if input_file is 0 bytes
 80        if not input_file_bytes:
 81            log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}")
 82
 83        if isinstance(homoglyphs, str):
 84            with open(homoglyphs, "rb") as f:
 85                homoglyphs_bytes = f.read()
 86        elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)):
 87            homoglyphs_bytes = utils.as_bytes(homoglyphs)
 88        elif isinstance(homoglyphs, type(None)):
 89            # Load default
 90            with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f:
 91                homoglyphs_bytes = f.read()
 92
 93        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 94            with open(content_management_policy, "rb") as f:
 95                content_management_policy = f.read()
 96        content_management_policy = utils.validate_xml(content_management_policy)
 97
 98        # Variable initialisation
 99        ct_input_buffer = ct.c_char_p(input_file_bytes)
100        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))
101        ct_output_buffer = ct.c_void_p()
102        ct_output_buffer_length = ct.c_size_t()
103        ct_output_report_buffer = ct.c_void_p()
104        ct_output_report_buffer_length = ct.c_size_t()
105        ct_homoglyphs = ct.c_char_p(homoglyphs_bytes)
106        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())
107        gw_return_object = glasswall.GwReturnObj()
108
109        with utils.CwdHandler(new_cwd=self.library_path):
110            gw_return_object.status = self.library.GwWordSearch(
111                ct_input_buffer,
112                ct_input_buffer_length,
113                ct.byref(ct_output_buffer),
114                ct.byref(ct_output_buffer_length),
115                ct.byref(ct_output_report_buffer),
116                ct.byref(ct_output_report_buffer_length),
117                ct_homoglyphs,
118                ct_content_management_policy
119            )
120
121        gw_return_object.output_file = utils.buffer_to_bytes(
122            ct_output_buffer,
123            ct_output_buffer_length
124        )
125        gw_return_object.output_report = utils.buffer_to_bytes(
126            ct_output_report_buffer,
127            ct_output_report_buffer_length
128        )
129
130        # Write output report
131        if gw_return_object.output_report:
132            if isinstance(output_report, str):
133                os.makedirs(os.path.dirname(output_report), exist_ok=True)
134                with open(output_report, "wb") as f:
135                    f.write(gw_return_object.output_report)
136
137        input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file
138        output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}"
139        output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}"
140        homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs
141
142        if gw_return_object.status not in successes.success_codes:
143            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
144            if raise_unsupported:
145                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
146        else:
147            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
148
149        # Write output file
150        if gw_return_object.output_file:
151            if isinstance(output_file, str):
152                os.makedirs(os.path.dirname(output_file), exist_ok=True)
153                with open(output_file, "wb") as f:
154                    f.write(gw_return_object.output_file)
155
156        if input_file_bytes and not gw_return_object.output_file:
157            # input_file_bytes was not empty but output_file is unexpectedly empty
158            log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}")
159            if raise_unsupported:
160                raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}")
161
162        if input_file_bytes and not gw_return_object.output_report:
163            # input_file_bytes was not empty but output_report is unexpectedly empty
164            log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}")
165            if raise_unsupported:
166                raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}")
167
168        return gw_return_object
169
170    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
171    def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
172        """ Redacts all files in a directory and it's subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.
173
174        Args:
175            input_directory (str): The input directory containing files to redact.
176            output_directory (str): The output directory where the redacted files will be written.
177            output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written.
178            content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
179            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs file path, str, or bytes.
180            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
181
182        Returns:
183            redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
184        """
185        redacted_files_dict = {}
186        # Call redact_file on each file in input_directory
187        for input_file in utils.list_file_paths(input_directory):
188            relative_path = os.path.relpath(input_file, input_directory)
189            # Construct paths for output file and output report
190            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
191            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
192
193            result = self.redact_file(
194                input_file=input_file,
195                output_file=output_file,
196                output_report=output_report,
197                homoglyphs=homoglyphs,
198                content_management_policy=content_management_policy,
199                raise_unsupported=raise_unsupported,
200            )
201
202            redacted_files_dict[relative_path] = result
203
204        return redacted_files_dict
class WordSearch(glasswall.libraries.library.Library):
 16class WordSearch(Library):
 17    """ A high level Python wrapper for Glasswall WordSearch. """
 18
 19    def __init__(self, library_path: str):
 20        super().__init__(library_path=library_path)
 21        self.library = self.load_library(os.path.abspath(library_path))
 22
 23        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
 24
 25    def version(self):
 26        """ Returns the Glasswall library version.
 27
 28        Returns:
 29            version (str): The Glasswall library version.
 30        """
 31        # API function declaration
 32        self.library.GwWordSearchVersion.restype = ct.c_char_p
 33
 34        # API call
 35        version = self.library.GwWordSearchVersion()
 36
 37        # Convert to Python string
 38        version = ct.string_at(version).decode()
 39
 40        return version
 41
 42    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
 43    def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
 44        """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.
 45
 46        Args:
 47            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 48            content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
 49            output_file (Union[None, str], optional): Default None. If str, write output_file to that path.
 50            output_report (Union[None, str], optional): Default None. If str, write output_file to that path.
 51            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs json file path or bytes.
 52            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 53
 54        Returns:
 55            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
 56        """
 57        # Validate arg types
 58        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 59            raise TypeError(input_file)
 60        if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 61            raise TypeError(content_management_policy)
 62        if not isinstance(output_file, (type(None), str)):
 63            raise TypeError(output_file)
 64        if not isinstance(output_report, (type(None), str)):
 65            raise TypeError(output_report)
 66        if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)):
 67            raise TypeError(homoglyphs)
 68
 69        # Convert string path arguments to absolute paths
 70        if isinstance(output_file, str):
 71            output_file = os.path.abspath(output_file)
 72
 73        if isinstance(output_report, str):
 74            output_report = os.path.abspath(output_report)
 75
 76        # Convert inputs to bytes
 77        if isinstance(input_file, str):
 78            with open(input_file, "rb") as f:
 79                input_file_bytes = f.read()
 80        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 81            input_file_bytes = utils.as_bytes(input_file)
 82        # warn if input_file is 0 bytes
 83        if not input_file_bytes:
 84            log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}")
 85
 86        if isinstance(homoglyphs, str):
 87            with open(homoglyphs, "rb") as f:
 88                homoglyphs_bytes = f.read()
 89        elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)):
 90            homoglyphs_bytes = utils.as_bytes(homoglyphs)
 91        elif isinstance(homoglyphs, type(None)):
 92            # Load default
 93            with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f:
 94                homoglyphs_bytes = f.read()
 95
 96        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 97            with open(content_management_policy, "rb") as f:
 98                content_management_policy = f.read()
 99        content_management_policy = utils.validate_xml(content_management_policy)
100
101        # Variable initialisation
102        ct_input_buffer = ct.c_char_p(input_file_bytes)
103        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))
104        ct_output_buffer = ct.c_void_p()
105        ct_output_buffer_length = ct.c_size_t()
106        ct_output_report_buffer = ct.c_void_p()
107        ct_output_report_buffer_length = ct.c_size_t()
108        ct_homoglyphs = ct.c_char_p(homoglyphs_bytes)
109        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())
110        gw_return_object = glasswall.GwReturnObj()
111
112        with utils.CwdHandler(new_cwd=self.library_path):
113            gw_return_object.status = self.library.GwWordSearch(
114                ct_input_buffer,
115                ct_input_buffer_length,
116                ct.byref(ct_output_buffer),
117                ct.byref(ct_output_buffer_length),
118                ct.byref(ct_output_report_buffer),
119                ct.byref(ct_output_report_buffer_length),
120                ct_homoglyphs,
121                ct_content_management_policy
122            )
123
124        gw_return_object.output_file = utils.buffer_to_bytes(
125            ct_output_buffer,
126            ct_output_buffer_length
127        )
128        gw_return_object.output_report = utils.buffer_to_bytes(
129            ct_output_report_buffer,
130            ct_output_report_buffer_length
131        )
132
133        # Write output report
134        if gw_return_object.output_report:
135            if isinstance(output_report, str):
136                os.makedirs(os.path.dirname(output_report), exist_ok=True)
137                with open(output_report, "wb") as f:
138                    f.write(gw_return_object.output_report)
139
140        input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file
141        output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}"
142        output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}"
143        homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs
144
145        if gw_return_object.status not in successes.success_codes:
146            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
147            if raise_unsupported:
148                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
149        else:
150            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
151
152        # Write output file
153        if gw_return_object.output_file:
154            if isinstance(output_file, str):
155                os.makedirs(os.path.dirname(output_file), exist_ok=True)
156                with open(output_file, "wb") as f:
157                    f.write(gw_return_object.output_file)
158
159        if input_file_bytes and not gw_return_object.output_file:
160            # input_file_bytes was not empty but output_file is unexpectedly empty
161            log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}")
162            if raise_unsupported:
163                raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}")
164
165        if input_file_bytes and not gw_return_object.output_report:
166            # input_file_bytes was not empty but output_report is unexpectedly empty
167            log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}")
168            if raise_unsupported:
169                raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}")
170
171        return gw_return_object
172
173    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
174    def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
175        """ Redacts all files in a directory and it's subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.
176
177        Args:
178            input_directory (str): The input directory containing files to redact.
179            output_directory (str): The output directory where the redacted files will be written.
180            output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written.
181            content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
182            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs file path, str, or bytes.
183            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
184
185        Returns:
186            redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
187        """
188        redacted_files_dict = {}
189        # Call redact_file on each file in input_directory
190        for input_file in utils.list_file_paths(input_directory):
191            relative_path = os.path.relpath(input_file, input_directory)
192            # Construct paths for output file and output report
193            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
194            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
195
196            result = self.redact_file(
197                input_file=input_file,
198                output_file=output_file,
199                output_report=output_report,
200                homoglyphs=homoglyphs,
201                content_management_policy=content_management_policy,
202                raise_unsupported=raise_unsupported,
203            )
204
205            redacted_files_dict[relative_path] = result
206
207        return redacted_files_dict

A high level Python wrapper for Glasswall WordSearch.

WordSearch(library_path: str)
19    def __init__(self, library_path: str):
20        super().__init__(library_path=library_path)
21        self.library = self.load_library(os.path.abspath(library_path))
22
23        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
library
def version(self):
25    def version(self):
26        """ Returns the Glasswall library version.
27
28        Returns:
29            version (str): The Glasswall library version.
30        """
31        # API function declaration
32        self.library.GwWordSearchVersion.restype = ct.c_char_p
33
34        # API call
35        version = self.library.GwWordSearchVersion()
36
37        # Convert to Python string
38        version = ct.string_at(version).decode()
39
40        return version

Returns the Glasswall library version.

Returns: version (str): The Glasswall library version.

@glasswall.utils.deprecated_alias(xml_config='content_management_policy')
def redact_file( self, input_file: Union[str, bytes, bytearray, _io.BytesIO], content_management_policy: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, homoglyphs: Union[NoneType, str, bytes, bytearray, _io.BytesIO] = None, raise_unsupported: bool = True):
 42    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
 43    def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
 44        """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.
 45
 46        Args:
 47            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
 48            content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
 49            output_file (Union[None, str], optional): Default None. If str, write output_file to that path.
 50            output_report (Union[None, str], optional): Default None. If str, write output_file to that path.
 51            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs json file path or bytes.
 52            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 53
 54        Returns:
 55            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
 56        """
 57        # Validate arg types
 58        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
 59            raise TypeError(input_file)
 60        if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
 61            raise TypeError(content_management_policy)
 62        if not isinstance(output_file, (type(None), str)):
 63            raise TypeError(output_file)
 64        if not isinstance(output_report, (type(None), str)):
 65            raise TypeError(output_report)
 66        if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)):
 67            raise TypeError(homoglyphs)
 68
 69        # Convert string path arguments to absolute paths
 70        if isinstance(output_file, str):
 71            output_file = os.path.abspath(output_file)
 72
 73        if isinstance(output_report, str):
 74            output_report = os.path.abspath(output_report)
 75
 76        # Convert inputs to bytes
 77        if isinstance(input_file, str):
 78            with open(input_file, "rb") as f:
 79                input_file_bytes = f.read()
 80        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
 81            input_file_bytes = utils.as_bytes(input_file)
 82        # warn if input_file is 0 bytes
 83        if not input_file_bytes:
 84            log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}")
 85
 86        if isinstance(homoglyphs, str):
 87            with open(homoglyphs, "rb") as f:
 88                homoglyphs_bytes = f.read()
 89        elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)):
 90            homoglyphs_bytes = utils.as_bytes(homoglyphs)
 91        elif isinstance(homoglyphs, type(None)):
 92            # Load default
 93            with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f:
 94                homoglyphs_bytes = f.read()
 95
 96        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
 97            with open(content_management_policy, "rb") as f:
 98                content_management_policy = f.read()
 99        content_management_policy = utils.validate_xml(content_management_policy)
100
101        # Variable initialisation
102        ct_input_buffer = ct.c_char_p(input_file_bytes)
103        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))
104        ct_output_buffer = ct.c_void_p()
105        ct_output_buffer_length = ct.c_size_t()
106        ct_output_report_buffer = ct.c_void_p()
107        ct_output_report_buffer_length = ct.c_size_t()
108        ct_homoglyphs = ct.c_char_p(homoglyphs_bytes)
109        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())
110        gw_return_object = glasswall.GwReturnObj()
111
112        with utils.CwdHandler(new_cwd=self.library_path):
113            gw_return_object.status = self.library.GwWordSearch(
114                ct_input_buffer,
115                ct_input_buffer_length,
116                ct.byref(ct_output_buffer),
117                ct.byref(ct_output_buffer_length),
118                ct.byref(ct_output_report_buffer),
119                ct.byref(ct_output_report_buffer_length),
120                ct_homoglyphs,
121                ct_content_management_policy
122            )
123
124        gw_return_object.output_file = utils.buffer_to_bytes(
125            ct_output_buffer,
126            ct_output_buffer_length
127        )
128        gw_return_object.output_report = utils.buffer_to_bytes(
129            ct_output_report_buffer,
130            ct_output_report_buffer_length
131        )
132
133        # Write output report
134        if gw_return_object.output_report:
135            if isinstance(output_report, str):
136                os.makedirs(os.path.dirname(output_report), exist_ok=True)
137                with open(output_report, "wb") as f:
138                    f.write(gw_return_object.output_report)
139
140        input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file
141        output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}"
142        output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}"
143        homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs
144
145        if gw_return_object.status not in successes.success_codes:
146            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
147            if raise_unsupported:
148                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
149        else:
150            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
151
152        # Write output file
153        if gw_return_object.output_file:
154            if isinstance(output_file, str):
155                os.makedirs(os.path.dirname(output_file), exist_ok=True)
156                with open(output_file, "wb") as f:
157                    f.write(gw_return_object.output_file)
158
159        if input_file_bytes and not gw_return_object.output_file:
160            # input_file_bytes was not empty but output_file is unexpectedly empty
161            log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}")
162            if raise_unsupported:
163                raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}")
164
165        if input_file_bytes and not gw_return_object.output_report:
166            # input_file_bytes was not empty but output_report is unexpectedly empty
167            log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}")
168            if raise_unsupported:
169                raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}")
170
171        return gw_return_object

Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.

Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. content_management_policy (Union[str, bytes, bytearray, io.BytesIO]): The content management policy to apply. output_file (Union[None, str], optional): Default None. If str, write output_file to that path. output_report (Union[None, str], optional): Default None. If str, write output_report to that path. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs json file path or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

@glasswall.utils.deprecated_alias(xml_config='content_management_policy')
def redact_directory( self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, _io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[NoneType, str, bytes, bytearray, _io.BytesIO] = None, raise_unsupported: bool = True):
173    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
174    def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
175        """ Redacts all files in a directory and it's subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.
176
177        Args:
178            input_directory (str): The input directory containing files to redact.
179            output_directory (str): The output directory where the redacted files will be written.
180            output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written.
181            content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
182            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs file path, str, or bytes.
183            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
184
185        Returns:
186            redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
187        """
188        redacted_files_dict = {}
189        # Call redact_file on each file in input_directory
190        for input_file in utils.list_file_paths(input_directory):
191            relative_path = os.path.relpath(input_file, input_directory)
192            # Construct paths for output file and output report
193            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
194            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
195
196            result = self.redact_file(
197                input_file=input_file,
198                output_file=output_file,
199                output_report=output_report,
200                homoglyphs=homoglyphs,
201                content_management_policy=content_management_policy,
202                raise_unsupported=raise_unsupported,
203            )
204
205            redacted_files_dict[relative_path] = result
206
207        return redacted_files_dict

Redacts all files in a directory and its subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.

Args: input_directory (str): The input directory containing files to redact. output_directory (Optional[str], optional): Default None. If str, the output directory where the redacted files will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written. content_management_policy (Union[str, bytes, bytearray, io.BytesIO]): The content management policy to apply. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs file path, str, or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)