glasswall.libraries.word_search.word_search
import ctypes as ct
import io
import os
from typing import Optional, Union

import glasswall
from glasswall import utils
from glasswall.config.logging import log
from glasswall.libraries.library import Library
from glasswall.libraries.word_search import errors, successes


class WordSearch(Library):
    """ A high level Python wrapper for Glasswall WordSearch. """

    def __init__(self, library_path: str):
        super().__init__(library_path=library_path)
        self.library = self.load_library(os.path.abspath(library_path))

        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")

    def version(self):
        """ Returns the Glasswall library version.

        Returns:
            version (str): The Glasswall library version.
        """
        # API function declaration
        self.library.GwWordSearchVersion.restype = ct.c_char_p

        # API call
        version = self.library.GwWordSearchVersion()

        # Convert to Python string
        version = ct.string_at(version).decode()

        return version

    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
    def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
        """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            content_management_policy (Union[str, bytes, bytearray, io.BytesIO, Policy]): The content management policy to apply. A str that is an existing file path is read from disk.
            output_file (Union[None, str], optional): Default None. If str, write output_file to that path.
            output_report (Union[None, str], optional): Default None. If str, write output_report to that path.
            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs json file path or bytes. If None, the homoglyphs.json under the package's config/word_search directory is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. If False, errors are logged but not raised.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If an argument is not one of its accepted types.
            errors.WordSearchError: If raise_unsupported is True and the status is not a success code (the class mapped in errors.error_codes, else errors.UnknownErrorCode), or the output file or report is empty for a non-empty input.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)):
            raise TypeError(homoglyphs)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file_bytes = utils.as_bytes(input_file)
        # warn if input_file is 0 bytes
        if not input_file_bytes:
            log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}")

        if isinstance(homoglyphs, str):
            with open(homoglyphs, "rb") as f:
                homoglyphs_bytes = f.read()
        elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)):
            homoglyphs_bytes = utils.as_bytes(homoglyphs)
        elif isinstance(homoglyphs, type(None)):
            # Load default
            with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f:
                homoglyphs_bytes = f.read()

        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        content_management_policy = utils.validate_xml(content_management_policy)

        # Variable initialisation
        ct_input_buffer = ct.c_char_p(input_file_bytes)
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))
        ct_output_buffer = ct.c_void_p()
        ct_output_buffer_length = ct.c_size_t()
        ct_output_report_buffer = ct.c_void_p()
        ct_output_report_buffer_length = ct.c_size_t()
        ct_homoglyphs = ct.c_char_p(homoglyphs_bytes)
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            gw_return_object.status = self.library.GwWordSearch(
                ct_input_buffer,
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_homoglyphs,
                ct_content_management_policy
            )

        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output report. Done before the status check so the report is
        # still written when a failure status raises below.
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)

        input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file
        output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}"
        output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}"
        homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs

        # The same summary is logged at error level on failure and debug level on success
        call_summary = f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}"

        if gw_return_object.status not in successes.success_codes:
            log.error(call_summary)
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(call_summary)

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        if input_file_bytes and not gw_return_object.output_file:
            # input_file_bytes was not empty but output_file is unexpectedly empty
            log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}")
            if raise_unsupported:
                raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}")

        if input_file_bytes and not gw_return_object.output_report:
            # input_file_bytes was not empty but output_report is unexpectedly empty
            log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}")
            if raise_unsupported:
                raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}")

        return gw_return_object

    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
    def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
        """ Redacts all files in a directory and its subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to redact.
            content_management_policy (Union[str, bytes, bytearray, io.BytesIO, Policy]): The content management policy to apply.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the redacted files will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written, named as the relative input path plus ".xml".
            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs file path, str, or bytes.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. If False, errors are logged but not raised.

        Returns:
            redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        # Resolve the output roots once; they do not change between files
        output_root = None if output_directory is None else os.path.abspath(output_directory)
        output_report_root = None if output_report_directory is None else os.path.abspath(output_report_directory)

        redacted_files_dict = {}
        # Call redact_file on each file in input_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Construct paths for output file and output report
            output_file = None if output_root is None else os.path.join(output_root, relative_path)
            output_report = None if output_report_root is None else os.path.join(output_report_root, relative_path + ".xml")

            redacted_files_dict[relative_path] = self.redact_file(
                input_file=input_file,
                output_file=output_file,
                output_report=output_report,
                homoglyphs=homoglyphs,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

        return redacted_files_dict
class WordSearch(Library):
    """ A high level Python wrapper for Glasswall WordSearch. """

    def __init__(self, library_path: str):
        super().__init__(library_path=library_path)
        self.library = self.load_library(os.path.abspath(library_path))

        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")

    def version(self):
        """ Returns the Glasswall library version.

        Returns:
            version (str): The Glasswall library version.
        """
        # API function declaration
        self.library.GwWordSearchVersion.restype = ct.c_char_p

        # API call
        version = self.library.GwWordSearchVersion()

        # Convert to Python string
        version = ct.string_at(version).decode()

        return version

    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
    def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
        """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            content_management_policy (Union[str, bytes, bytearray, io.BytesIO, Policy]): The content management policy to apply. A str that is an existing file path is read from disk.
            output_file (Union[None, str], optional): Default None. If str, write output_file to that path.
            output_report (Union[None, str], optional): Default None. If str, write output_report to that path.
            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs json file path or bytes. If None, the homoglyphs.json under the package's config/word_search directory is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. If False, errors are logged but not raised.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If an argument is not one of its accepted types.
            errors.WordSearchError: If raise_unsupported is True and the status is not a success code (the class mapped in errors.error_codes, else errors.UnknownErrorCode), or the output file or report is empty for a non-empty input.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)):
            raise TypeError(homoglyphs)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file_bytes = utils.as_bytes(input_file)
        # warn if input_file is 0 bytes
        if not input_file_bytes:
            log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}")

        if isinstance(homoglyphs, str):
            with open(homoglyphs, "rb") as f:
                homoglyphs_bytes = f.read()
        elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)):
            homoglyphs_bytes = utils.as_bytes(homoglyphs)
        elif isinstance(homoglyphs, type(None)):
            # Load default
            with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f:
                homoglyphs_bytes = f.read()

        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        content_management_policy = utils.validate_xml(content_management_policy)

        # Variable initialisation
        ct_input_buffer = ct.c_char_p(input_file_bytes)
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))
        ct_output_buffer = ct.c_void_p()
        ct_output_buffer_length = ct.c_size_t()
        ct_output_report_buffer = ct.c_void_p()
        ct_output_report_buffer_length = ct.c_size_t()
        ct_homoglyphs = ct.c_char_p(homoglyphs_bytes)
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            gw_return_object.status = self.library.GwWordSearch(
                ct_input_buffer,
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_homoglyphs,
                ct_content_management_policy
            )

        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output report. Done before the status check so the report is
        # still written when a failure status raises below.
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)

        input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file
        output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}"
        output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}"
        homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs

        # The same summary is logged at error level on failure and debug level on success
        call_summary = f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}"

        if gw_return_object.status not in successes.success_codes:
            log.error(call_summary)
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(call_summary)

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        if input_file_bytes and not gw_return_object.output_file:
            # input_file_bytes was not empty but output_file is unexpectedly empty
            log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}")
            if raise_unsupported:
                raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}")

        if input_file_bytes and not gw_return_object.output_report:
            # input_file_bytes was not empty but output_report is unexpectedly empty
            log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}")
            if raise_unsupported:
                raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}")

        return gw_return_object

    @glasswall.utils.deprecated_alias(xml_config="content_management_policy")
    def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
        """ Redacts all files in a directory and its subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to redact.
            content_management_policy (Union[str, bytes, bytearray, io.BytesIO, Policy]): The content management policy to apply.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the redacted files will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written, named as the relative input path plus ".xml".
            homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs file path, str, or bytes.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. If False, errors are logged but not raised.

        Returns:
            redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        # Resolve the output roots once; they do not change between files
        output_root = None if output_directory is None else os.path.abspath(output_directory)
        output_report_root = None if output_report_directory is None else os.path.abspath(output_report_directory)

        redacted_files_dict = {}
        # Call redact_file on each file in input_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Construct paths for output file and output report
            output_file = None if output_root is None else os.path.join(output_root, relative_path)
            output_report = None if output_report_root is None else os.path.join(output_report_root, relative_path + ".xml")

            redacted_files_dict[relative_path] = self.redact_file(
                input_file=input_file,
                output_file=output_file,
                output_report=output_report,
                homoglyphs=homoglyphs,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

        return redacted_files_dict
A high level Python wrapper for Glasswall WordSearch.
def version(self):
    """ Query the loaded Glasswall library for its version string.

    Returns:
        version (str): The Glasswall library version.
    """
    # Declare the C return type on the exported function before calling it
    get_version = self.library.GwWordSearchVersion
    get_version.restype = ct.c_char_p

    # Call the API and decode the returned C string into a Python str
    raw_version = get_version()
    return ct.string_at(raw_version).decode()
Returns the Glasswall library version.
Returns: version (str): The Glasswall library version.
@glasswall.utils.deprecated_alias(xml_config="content_management_policy")
def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
    """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        content_management_policy (Union[str, bytes, bytearray, io.BytesIO, Policy]): The content management policy to apply. A str that is an existing file path is read from disk.
        output_file (Union[None, str], optional): Default None. If str, write output_file to that path.
        output_report (Union[None, str], optional): Default None. If str, write output_report to that path.
        homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs json file path or bytes. If None, the homoglyphs.json under the package's config/word_search directory is used.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. If False, errors are logged but not raised.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of its accepted types.
        errors.WordSearchError: If raise_unsupported is True and the status is not a success code (the class mapped in errors.error_codes, else errors.UnknownErrorCode), or the output file or report is empty for a non-empty input.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)):
        raise TypeError(homoglyphs)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)
    # warn if input_file is 0 bytes
    if not input_file_bytes:
        log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}")

    if isinstance(homoglyphs, str):
        with open(homoglyphs, "rb") as f:
            homoglyphs_bytes = f.read()
    elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)):
        homoglyphs_bytes = utils.as_bytes(homoglyphs)
    elif isinstance(homoglyphs, type(None)):
        # Load default
        with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f:
            homoglyphs_bytes = f.read()

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    content_management_policy = utils.validate_xml(content_management_policy)

    # Variable initialisation
    ct_input_buffer = ct.c_char_p(input_file_bytes)
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))
    ct_output_buffer = ct.c_void_p()
    ct_output_buffer_length = ct.c_size_t()
    ct_output_report_buffer = ct.c_void_p()
    ct_output_report_buffer_length = ct.c_size_t()
    ct_homoglyphs = ct.c_char_p(homoglyphs_bytes)
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())
    gw_return_object = glasswall.GwReturnObj()

    with utils.CwdHandler(new_cwd=self.library_path):
        gw_return_object.status = self.library.GwWordSearch(
            ct_input_buffer,
            ct_input_buffer_length,
            ct.byref(ct_output_buffer),
            ct.byref(ct_output_buffer_length),
            ct.byref(ct_output_report_buffer),
            ct.byref(ct_output_report_buffer_length),
            ct_homoglyphs,
            ct_content_management_policy
        )

    gw_return_object.output_file = utils.buffer_to_bytes(
        ct_output_buffer,
        ct_output_buffer_length
    )
    gw_return_object.output_report = utils.buffer_to_bytes(
        ct_output_report_buffer,
        ct_output_report_buffer_length
    )

    # Write output report. Done before the status check so the report is
    # still written when a failure status raises below.
    if gw_return_object.output_report:
        if isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)

    input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file
    output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}"
    output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}"
    homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs

    # The same summary is logged at error level on failure and debug level on success
    call_summary = f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}"

    if gw_return_object.status not in successes.success_codes:
        log.error(call_summary)
        if raise_unsupported:
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
    else:
        log.debug(call_summary)

    # Write output file
    if gw_return_object.output_file:
        if isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

    if input_file_bytes and not gw_return_object.output_file:
        # input_file_bytes was not empty but output_file is unexpectedly empty
        log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}")
        if raise_unsupported:
            raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}")

    if input_file_bytes and not gw_return_object.output_report:
        # input_file_bytes was not empty but output_report is unexpectedly empty
        log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}")
        if raise_unsupported:
            raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}")

    return gw_return_object
Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.
Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. content_management_policy (Union[str, bytes, bytearray, io.BytesIO]): The content management policy to apply. output_file (Union[None, str], optional): Default None. If str, write output_file to that path. output_report (Union[None, str], optional): Default None. If str, write output_report to that path. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs json file path or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
@glasswall.utils.deprecated_alias(xml_config="content_management_policy")
def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
    """ Redacts all files in a directory and its subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing files to redact.
        content_management_policy (Union[str, bytes, bytearray, io.BytesIO, Policy]): The content management policy to apply.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the redacted files will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written, named as the relative input path plus ".xml".
        homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs file path, str, or bytes.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. If False, errors are logged but not raised.

    Returns:
        redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    # Resolve the output roots once; they do not change between files
    output_root = None if output_directory is None else os.path.abspath(output_directory)
    output_report_root = None if output_report_directory is None else os.path.abspath(output_report_directory)

    redacted_files_dict = {}
    # Call redact_file on each file in input_directory
    for input_file in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(input_file, input_directory)
        # Construct paths for output file and output report
        output_file = None if output_root is None else os.path.join(output_root, relative_path)
        output_report = None if output_report_root is None else os.path.join(output_report_root, relative_path + ".xml")

        redacted_files_dict[relative_path] = self.redact_file(
            input_file=input_file,
            output_file=output_file,
            output_report=output_report,
            homoglyphs=homoglyphs,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return redacted_files_dict
Redacts all files in a directory and its subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.
Args: input_directory (str): The input directory containing files to redact. output_directory (Optional[str], optional): Default None. If str, the output directory where the redacted files will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written. content_management_policy (Union[str, bytes, bytearray, io.BytesIO]): The content management policy to apply. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO], optional): Default None. The homoglyphs file path, str, or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns: redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)