glasswall.libraries.security_tagging.security_tagging

  1import ctypes as ct
  2import os
  3import sys
  4
  5from glasswall import utils
  6from glasswall.config.logging import log
  7from glasswall.libraries.library import Library
  8from glasswall.libraries.security_tagging import errors, successes
  9
 10
 11class SecurityTagging(Library):
 12    """ A high level Python wrapper for Glasswall Security Tagging. """
 13
 14    def __init__(self, library_path: str):
 15        super().__init__(library_path=library_path)
 16        self.library = self.load_library(os.path.abspath(library_path))
 17
 18        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
 19
 20    def version(self):
 21        """ Returns the Glasswall library version.
 22
 23        Returns:
 24            version (str): The Glasswall library version.
 25        """
 26        # TODO security tagging currently has no version function
 27        return "NOT_IMPLEMENTED"
 28
 29    def tag_file(self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True):
 30        """ Tags the input_file with xml loaded from tags_path, writing to output_file.
 31
 32        Args:
 33            tags_path (str): The path to the .xml file containing tags to add.
 34            input_file (str): The path to the input file.
 35            output_file (str): The path to the output file where the tagged input_file will be written to.
 36            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 37
 38        Returns:
 39            status (int): An integer indicating the file process status.
 40
 41        Raises:
 42            TypeError: If any arguments are of incorrect type.
 43            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
 44        """
 45        # Validate arg types
 46        if not isinstance(tags_path, str):
 47            raise TypeError(tags_path)
 48        elif not os.path.isfile(tags_path):
 49            raise FileNotFoundError(tags_path)
 50
 51        if not isinstance(input_file, str):
 52            raise TypeError(input_file)
 53        elif not os.path.isfile(input_file):
 54            raise FileNotFoundError(input_file)
 55
 56        if not isinstance(output_file, str):
 57            raise TypeError(output_file)
 58
 59        if not isinstance(raise_unsupported, bool):
 60            raise TypeError(raise_unsupported)
 61
 62        # Convert paths to absolute paths
 63        tags_path = os.path.abspath(tags_path)
 64        input_file = os.path.abspath(input_file)
 65        output_file = os.path.abspath(output_file)
 66
 67        # Make output directory if it does not exist
 68        os.makedirs(os.path.dirname(output_file), exist_ok=True)
 69
 70        with utils.CwdHandler(self.library_path):
 71            # API function declaration
 72            self.library.GWSecuTag_TagFile.argtypes = [ct.c_char_p]
 73
 74            # Variable initialisation
 75            ct_tags_path = ct.c_char_p(tags_path.encode("utf-8"))
 76            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
 77            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
 78
 79            # API call
 80            status = self.library.GWSecuTag_TagFile(
 81                ct_input_file,
 82                ct_tags_path,
 83                ct_output_file,
 84            )
 85
 86            if status not in successes.success_codes:
 87                log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
 88                if raise_unsupported:
 89                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 90            else:
 91                log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
 92
 93            # TODO remove, temp fix - check if tags are retrievable, if not delete the file just created by glasswall
 94            # if status not in successes.success_codes: # status is currently incorrect in the library
 95            if not os.path.isfile(output_file):
 96                log.error(f"\n\toutput file does not exist: {output_file}")
 97                status = "OUTPUT_NOT_CREATED"
 98            elif os.path.isfile(output_file):
 99                with utils.TempFilePath() as temp_file:
100                    self.retrieve_tags(
101                        input_file=output_file,
102                        output_file=temp_file,
103                        raise_unsupported=False
104                    )
105                    try:
106                        dict_ = utils.xml_as_dict(temp_file)
107                    except ValueError:
108                        dict_ = {}
109                    if not dict_:
110                        os.remove(output_file)
111                        log.debug(f"\n\tunable to retrieve tags, deleted output file\n\toutput_file: {output_file}\n\t")
112                        status = "OUTPUT_NOT_RETRIEVABLE"
113
114            return status
115
116    def tag_directory(self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True):
117        """ Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure.
118
119        Args:
120            tags_path (str): The path to the .xml file containing tags to add.
121            input_directory (str): The path to the input directory.
122            output_directory (str): The path to the output directory where the tagged files will be written to.
123            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
124
125        Returns:
126            status (int): An integer indicating the file process status.
127
128        Raises:
129            TypeError: If any arguments are of incorrect type.
130            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
131        """
132        # Validate arg types
133        if not isinstance(tags_path, str):
134            raise TypeError(tags_path)
135        elif not os.path.isfile(tags_path):
136            raise FileNotFoundError(tags_path)
137
138        if not isinstance(input_directory, str):
139            raise TypeError(input_directory)
140        elif not os.path.isdir(input_directory):
141            raise NotADirectoryError(input_directory)
142
143        if not isinstance(output_directory, str):
144            raise TypeError(output_directory)
145
146        if not isinstance(raise_unsupported, bool):
147            raise TypeError(raise_unsupported)
148
149        for relative_path in utils.list_file_paths(input_directory, absolute=False):
150            # construct absolute paths
151            input_file = os.path.abspath(os.path.join(input_directory, relative_path))
152            output_file = os.path.abspath(os.path.join(output_directory, relative_path))
153
154            # call tag_file on each file in input to output
155            self.tag_file(
156                tags_path=tags_path,
157                input_file=input_file,
158                output_file=output_file,
159                raise_unsupported=raise_unsupported,
160            )
161
162        utils.delete_empty_subdirectories(output_directory)
163
164    def retrieve_tags(self, input_file: str, output_file: str, raise_unsupported=True):
165        """ Retrieves the xml tags of the input_file and writes it to output_file.
166
167        Args:
168            input_file (str): The path to the input file.
169            output_file (str): The path to the output file where the xml tags will be written to.
170            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
171
172        Returns:
173            status (int): An integer indicating the file process status.
174
175        Raises:
176            TypeError: If any arguments are of incorrect type.
177            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
178        """
179        if not isinstance(input_file, str):
180            raise TypeError(input_file)
181        elif not os.path.isfile(input_file):
182            raise FileNotFoundError(input_file)
183
184        if not isinstance(output_file, str):
185            raise TypeError(output_file)
186
187        # Convert paths to absolute paths
188        input_file = os.path.abspath(input_file)
189        output_file = os.path.abspath(output_file)
190
191        # Make output directory if it does not exist
192        os.makedirs(os.path.dirname(output_file), exist_ok=True)
193
194        log.debug(f"Attempting {sys._getframe().f_code.co_name}:\n\tinput_file: {input_file}\n\toutput_file: {output_file}")
195
196        with utils.CwdHandler(self.library_path):
197            # API function declaration
198            self.library.GWSecuTag_RetrieveTagFile.argtypes = [ct.c_char_p]
199
200            # Variable initialisation
201            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
202            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
203
204            # API call
205            status = self.library.GWSecuTag_RetrieveTagFile(
206                ct_input_file,
207                ct_output_file,
208            )
209
210            if status not in successes.success_codes:
211                log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
212                if raise_unsupported:
213                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
214            else:
215                log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
216
217            return status
218
219    def retrieve_tags_directory(self, input_directory: str, output_directory: str, raise_unsupported=True):
220        """ Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure.
221
222        Args:
223            input_directory (str): The path to the input directory.
224            output_directory (str): The path to the output directory where the tagged files will be written to.
225            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
226
227        Returns:
228            status (int): The status of the function call.
229
230        Raises:
231            TypeError: If any arguments are of incorrect type.
232            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
233        """
234        # Validate arg types
235        if not isinstance(input_directory, str):
236            raise TypeError(input_directory)
237        elif not os.path.isdir(input_directory):
238            raise NotADirectoryError(input_directory)
239
240        if not isinstance(output_directory, str):
241            raise TypeError(output_directory)
242
243        if not isinstance(raise_unsupported, bool):
244            raise TypeError(raise_unsupported)
245
246        for relative_path in utils.list_file_paths(input_directory, absolute=False):
247            # construct absolute paths
248            input_file = os.path.abspath(os.path.join(input_directory, relative_path))
249            output_file = os.path.abspath(os.path.join(output_directory, relative_path + ".xml"))
250
251            # call retrieve_tags on each file in input to output
252            self.retrieve_tags(
253                input_file=input_file,
254                output_file=output_file,
255                raise_unsupported=raise_unsupported,
256            )
257
258        utils.delete_empty_subdirectories(output_directory)
class SecurityTagging(glasswall.libraries.library.Library):
 14class SecurityTagging(Library):
 15    """ A high level Python wrapper for Glasswall Security Tagging. """
 16
 17    def __init__(self, library_path: str):
 18        super().__init__(library_path=library_path)
 19        self.library = self.load_library(os.path.abspath(library_path))
 20
 21        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
 22
 23    def version(self):
 24        """ Returns the Glasswall library version.
 25
 26        Returns:
 27            version (str): The Glasswall library version.
 28        """
 29        # TODO security tagging currently has no version function
 30        return "NOT_IMPLEMENTED"
 31
 32    def tag_file(self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True):
 33        """ Tags the input_file with xml loaded from tags_path, writing to output_file.
 34
 35        Args:
 36            tags_path (str): The path to the .xml file containing tags to add.
 37            input_file (str): The path to the input file.
 38            output_file (str): The path to the output file where the tagged input_file will be written to.
 39            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 40
 41        Returns:
 42            status (int): An integer indicating the file process status.
 43
 44        Raises:
 45            TypeError: If any arguments are of incorrect type.
 46            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
 47        """
 48        # Validate arg types
 49        if not isinstance(tags_path, str):
 50            raise TypeError(tags_path)
 51        elif not os.path.isfile(tags_path):
 52            raise FileNotFoundError(tags_path)
 53
 54        if not isinstance(input_file, str):
 55            raise TypeError(input_file)
 56        elif not os.path.isfile(input_file):
 57            raise FileNotFoundError(input_file)
 58
 59        if not isinstance(output_file, str):
 60            raise TypeError(output_file)
 61
 62        if not isinstance(raise_unsupported, bool):
 63            raise TypeError(raise_unsupported)
 64
 65        # Convert paths to absolute paths
 66        tags_path = os.path.abspath(tags_path)
 67        input_file = os.path.abspath(input_file)
 68        output_file = os.path.abspath(output_file)
 69
 70        # Make output directory if it does not exist
 71        os.makedirs(os.path.dirname(output_file), exist_ok=True)
 72
 73        with utils.CwdHandler(self.library_path):
 74            # API function declaration
 75            self.library.GWSecuTag_TagFile.argtypes = [ct.c_char_p]
 76
 77            # Variable initialisation
 78            ct_tags_path = ct.c_char_p(tags_path.encode("utf-8"))
 79            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
 80            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
 81
 82            # API call
 83            status = self.library.GWSecuTag_TagFile(
 84                ct_input_file,
 85                ct_tags_path,
 86                ct_output_file,
 87            )
 88
 89            if status not in successes.success_codes:
 90                log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
 91                if raise_unsupported:
 92                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 93            else:
 94                log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
 95
 96            # TODO remove, temp fix - check if tags are retrievable, if not delete the file just created by glasswall
 97            # if status not in successes.success_codes: # status is currently incorrect in the library
 98            if not os.path.isfile(output_file):
 99                log.error(f"\n\toutput file does not exist: {output_file}")
100                status = "OUTPUT_NOT_CREATED"
101            elif os.path.isfile(output_file):
102                with utils.TempFilePath() as temp_file:
103                    self.retrieve_tags(
104                        input_file=output_file,
105                        output_file=temp_file,
106                        raise_unsupported=False
107                    )
108                    try:
109                        dict_ = utils.xml_as_dict(temp_file)
110                    except ValueError:
111                        dict_ = {}
112                    if not dict_:
113                        os.remove(output_file)
114                        log.debug(f"\n\tunable to retrieve tags, deleted output file\n\toutput_file: {output_file}\n\t")
115                        status = "OUTPUT_NOT_RETRIEVABLE"
116
117            return status
118
119    def tag_directory(self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True):
120        """ Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure.
121
122        Args:
123            tags_path (str): The path to the .xml file containing tags to add.
124            input_directory (str): The path to the input directory.
125            output_directory (str): The path to the output directory where the tagged files will be written to.
126            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
127
128        Returns:
129            status (int): An integer indicating the file process status.
130
131        Raises:
132            TypeError: If any arguments are of incorrect type.
133            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
134        """
135        # Validate arg types
136        if not isinstance(tags_path, str):
137            raise TypeError(tags_path)
138        elif not os.path.isfile(tags_path):
139            raise FileNotFoundError(tags_path)
140
141        if not isinstance(input_directory, str):
142            raise TypeError(input_directory)
143        elif not os.path.isdir(input_directory):
144            raise NotADirectoryError(input_directory)
145
146        if not isinstance(output_directory, str):
147            raise TypeError(output_directory)
148
149        if not isinstance(raise_unsupported, bool):
150            raise TypeError(raise_unsupported)
151
152        for relative_path in utils.list_file_paths(input_directory, absolute=False):
153            # construct absolute paths
154            input_file = os.path.abspath(os.path.join(input_directory, relative_path))
155            output_file = os.path.abspath(os.path.join(output_directory, relative_path))
156
157            # call tag_file on each file in input to output
158            self.tag_file(
159                tags_path=tags_path,
160                input_file=input_file,
161                output_file=output_file,
162                raise_unsupported=raise_unsupported,
163            )
164
165        utils.delete_empty_subdirectories(output_directory)
166
167    def retrieve_tags(self, input_file: str, output_file: str, raise_unsupported=True):
168        """ Retrieves the xml tags of the input_file and writes it to output_file.
169
170        Args:
171            input_file (str): The path to the input file.
172            output_file (str): The path to the output file where the xml tags will be written to.
173            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
174
175        Returns:
176            status (int): An integer indicating the file process status.
177
178        Raises:
179            TypeError: If any arguments are of incorrect type.
180            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
181        """
182        if not isinstance(input_file, str):
183            raise TypeError(input_file)
184        elif not os.path.isfile(input_file):
185            raise FileNotFoundError(input_file)
186
187        if not isinstance(output_file, str):
188            raise TypeError(output_file)
189
190        # Convert paths to absolute paths
191        input_file = os.path.abspath(input_file)
192        output_file = os.path.abspath(output_file)
193
194        # Make output directory if it does not exist
195        os.makedirs(os.path.dirname(output_file), exist_ok=True)
196
197        log.debug(f"Attempting {sys._getframe().f_code.co_name}:\n\tinput_file: {input_file}\n\toutput_file: {output_file}")
198
199        with utils.CwdHandler(self.library_path):
200            # API function declaration
201            self.library.GWSecuTag_RetrieveTagFile.argtypes = [ct.c_char_p]
202
203            # Variable initialisation
204            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
205            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
206
207            # API call
208            status = self.library.GWSecuTag_RetrieveTagFile(
209                ct_input_file,
210                ct_output_file,
211            )
212
213            if status not in successes.success_codes:
214                log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
215                if raise_unsupported:
216                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
217            else:
218                log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
219
220            return status
221
222    def retrieve_tags_directory(self, input_directory: str, output_directory: str, raise_unsupported=True):
223        """ Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure.
224
225        Args:
226            input_directory (str): The path to the input directory.
227            output_directory (str): The path to the output directory where the tagged files will be written to.
228            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
229
230        Returns:
231            status (int): The status of the function call.
232
233        Raises:
234            TypeError: If any arguments are of incorrect type.
235            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
236        """
237        # Validate arg types
238        if not isinstance(input_directory, str):
239            raise TypeError(input_directory)
240        elif not os.path.isdir(input_directory):
241            raise NotADirectoryError(input_directory)
242
243        if not isinstance(output_directory, str):
244            raise TypeError(output_directory)
245
246        if not isinstance(raise_unsupported, bool):
247            raise TypeError(raise_unsupported)
248
249        for relative_path in utils.list_file_paths(input_directory, absolute=False):
250            # construct absolute paths
251            input_file = os.path.abspath(os.path.join(input_directory, relative_path))
252            output_file = os.path.abspath(os.path.join(output_directory, relative_path + ".xml"))
253
254            # call retrieve_tags on each file in input to output
255            self.retrieve_tags(
256                input_file=input_file,
257                output_file=output_file,
258                raise_unsupported=raise_unsupported,
259            )
260
261        utils.delete_empty_subdirectories(output_directory)

A high level Python wrapper for Glasswall Security Tagging.

SecurityTagging(library_path: str)
    def __init__(self, library_path: str):
        """ Loads the Security Tagging library and logs the version that was loaded.

        Args:
            library_path (str): The path to the Glasswall Security Tagging library. It is passed to Library.__init__ and, made absolute, to load_library.
        """
        super().__init__(library_path=library_path)
        self.library = self.load_library(os.path.abspath(library_path))

        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
library
def version(self):
    def version(self) -> str:
        """ Returns the Glasswall library version.

        Returns:
            version (str): Currently always the placeholder "NOT_IMPLEMENTED"; no library function is called.
        """
        # TODO security tagging currently has no version function
        return "NOT_IMPLEMENTED"

Returns the Glasswall library version.

Returns: version (str): The Glasswall library version.

def tag_file( self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True):
 32    def tag_file(self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True):
 33        """ Tags the input_file with xml loaded from tags_path, writing to output_file.
 34
 35        Args:
 36            tags_path (str): The path to the .xml file containing tags to add.
 37            input_file (str): The path to the input file.
 38            output_file (str): The path to the output file where the tagged input_file will be written to.
 39            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
 40
 41        Returns:
 42            status (int): An integer indicating the file process status.
 43
 44        Raises:
 45            TypeError: If any arguments are of incorrect type.
 46            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
 47        """
 48        # Validate arg types
 49        if not isinstance(tags_path, str):
 50            raise TypeError(tags_path)
 51        elif not os.path.isfile(tags_path):
 52            raise FileNotFoundError(tags_path)
 53
 54        if not isinstance(input_file, str):
 55            raise TypeError(input_file)
 56        elif not os.path.isfile(input_file):
 57            raise FileNotFoundError(input_file)
 58
 59        if not isinstance(output_file, str):
 60            raise TypeError(output_file)
 61
 62        if not isinstance(raise_unsupported, bool):
 63            raise TypeError(raise_unsupported)
 64
 65        # Convert paths to absolute paths
 66        tags_path = os.path.abspath(tags_path)
 67        input_file = os.path.abspath(input_file)
 68        output_file = os.path.abspath(output_file)
 69
 70        # Make output directory if it does not exist
 71        os.makedirs(os.path.dirname(output_file), exist_ok=True)
 72
 73        with utils.CwdHandler(self.library_path):
 74            # API function declaration
 75            self.library.GWSecuTag_TagFile.argtypes = [ct.c_char_p]
 76
 77            # Variable initialisation
 78            ct_tags_path = ct.c_char_p(tags_path.encode("utf-8"))
 79            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
 80            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
 81
 82            # API call
 83            status = self.library.GWSecuTag_TagFile(
 84                ct_input_file,
 85                ct_tags_path,
 86                ct_output_file,
 87            )
 88
 89            if status not in successes.success_codes:
 90                log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
 91                if raise_unsupported:
 92                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
 93            else:
 94                log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
 95
 96            # TODO remove, temp fix - check if tags are retrievable, if not delete the file just created by glasswall
 97            # if status not in successes.success_codes: # status is currently incorrect in the library
 98            if not os.path.isfile(output_file):
 99                log.error(f"\n\toutput file does not exist: {output_file}")
100                status = "OUTPUT_NOT_CREATED"
101            elif os.path.isfile(output_file):
102                with utils.TempFilePath() as temp_file:
103                    self.retrieve_tags(
104                        input_file=output_file,
105                        output_file=temp_file,
106                        raise_unsupported=False
107                    )
108                    try:
109                        dict_ = utils.xml_as_dict(temp_file)
110                    except ValueError:
111                        dict_ = {}
112                    if not dict_:
113                        os.remove(output_file)
114                        log.debug(f"\n\tunable to retrieve tags, deleted output file\n\toutput_file: {output_file}\n\t")
115                        status = "OUTPUT_NOT_RETRIEVABLE"
116
117            return status

Tags the input_file with xml loaded from tags_path, writing to output_file.

Args: tags_path (str): The path to the .xml file containing tags to add. input_file (str): The path to the input file. output_file (str): The path to the output file where the tagged input_file will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

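Returns: status (int or str): The integer status returned by GWSecuTag_TagFile, or the string "OUTPUT_NOT_CREATED" or "OUTPUT_NOT_RETRIEVABLE" when the output file is missing or its tags cannot be read back.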

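Raises: TypeError: If any arguments are of incorrect type. FileNotFoundError: If tags_path or input_file is not an existing file. An error class looked up in errors.error_codes (errors.UnknownErrorCode if the status is unmapped): If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not in successes.success_codes.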

def tag_directory( self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True):
    def tag_directory(self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True) -> None:
        """ Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure.

        Args:
            tags_path (str): The path to the .xml file containing tags to add.
            input_directory (str): The path to the input directory.
            output_directory (str): The path to the output directory where the tagged files will be written to.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            None: The status of each tag_file call is not collected.

        Raises:
            TypeError: If any arguments are of incorrect type.
            FileNotFoundError: If tags_path is not an existing file.
            NotADirectoryError: If input_directory is not an existing directory.
            Exception: Anything raised by tag_file for an individual file when raise_unsupported is True.
        """
        # Validate arg types
        if not isinstance(tags_path, str):
            raise TypeError(tags_path)
        elif not os.path.isfile(tags_path):
            raise FileNotFoundError(tags_path)

        if not isinstance(input_directory, str):
            raise TypeError(input_directory)
        elif not os.path.isdir(input_directory):
            raise NotADirectoryError(input_directory)

        if not isinstance(output_directory, str):
            raise TypeError(output_directory)

        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        for relative_path in utils.list_file_paths(input_directory, absolute=False):
            # construct absolute paths
            input_file = os.path.abspath(os.path.join(input_directory, relative_path))
            output_file = os.path.abspath(os.path.join(output_directory, relative_path))

            # call tag_file on each file in input to output
            self.tag_file(
                tags_path=tags_path,
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
            )

        # tag_file creates the output directory before tagging, so clean up directories left without any file
        utils.delete_empty_subdirectories(output_directory)

Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure.

Args: tags_path (str): The path to the .xml file containing tags to add. input_directory (str): The path to the input directory. output_directory (str): The path to the output directory where the tagged files will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: None. The status of each tag_file call is not collected.

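Raises: TypeError: If any arguments are of incorrect type. FileNotFoundError: If tags_path is not an existing file. NotADirectoryError: If input_directory is not an existing directory. An error class looked up in errors.error_codes (errors.UnknownErrorCode if the status is unmapped): If raise_unsupported is True and the status of calling GWSecuTag_TagFile for any file is not in successes.success_codes.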

def retrieve_tags(self, input_file: str, output_file: str, raise_unsupported=True):
167    def retrieve_tags(self, input_file: str, output_file: str, raise_unsupported=True):
168        """ Retrieves the xml tags of the input_file and writes it to output_file.
169
170        Args:
171            input_file (str): The path to the input file.
172            output_file (str): The path to the output file where the xml tags will be written to.
173            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
174
175        Returns:
176            status (int): An integer indicating the file process status.
177
178        Raises:
179            TypeError: If any arguments are of incorrect type.
180            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
181        """
182        if not isinstance(input_file, str):
183            raise TypeError(input_file)
184        elif not os.path.isfile(input_file):
185            raise FileNotFoundError(input_file)
186
187        if not isinstance(output_file, str):
188            raise TypeError(output_file)
189
190        # Convert paths to absolute paths
191        input_file = os.path.abspath(input_file)
192        output_file = os.path.abspath(output_file)
193
194        # Make output directory if it does not exist
195        os.makedirs(os.path.dirname(output_file), exist_ok=True)
196
197        log.debug(f"Attempting {sys._getframe().f_code.co_name}:\n\tinput_file: {input_file}\n\toutput_file: {output_file}")
198
199        with utils.CwdHandler(self.library_path):
200            # API function declaration
201            self.library.GWSecuTag_RetrieveTagFile.argtypes = [ct.c_char_p]
202
203            # Variable initialisation
204            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
205            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
206
207            # API call
208            status = self.library.GWSecuTag_RetrieveTagFile(
209                ct_input_file,
210                ct_output_file,
211            )
212
213            if status not in successes.success_codes:
214                log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
215                if raise_unsupported:
216                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
217            else:
218                log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
219
220            return status

Retrieves the xml tags of the input_file and writes it to output_file.

Args: input_file (str): The path to the input file. output_file (str): The path to the output file where the xml tags will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: status (int): An integer indicating the file process status.

Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)

def retrieve_tags_directory(self, input_directory: str, output_directory: str, raise_unsupported=True):
222    def retrieve_tags_directory(self, input_directory: str, output_directory: str, raise_unsupported=True):
223        """ Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure.
224
225        Args:
226            input_directory (str): The path to the input directory.
227            output_directory (str): The path to the output directory where the tagged files will be written to.
228            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
229
230        Returns:
231            status (int): The status of the function call.
232
233        Raises:
234            TypeError: If any arguments are of incorrect type.
235            NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
236        """
237        # Validate arg types
238        if not isinstance(input_directory, str):
239            raise TypeError(input_directory)
240        elif not os.path.isdir(input_directory):
241            raise NotADirectoryError(input_directory)
242
243        if not isinstance(output_directory, str):
244            raise TypeError(output_directory)
245
246        if not isinstance(raise_unsupported, bool):
247            raise TypeError(raise_unsupported)
248
249        for relative_path in utils.list_file_paths(input_directory, absolute=False):
250            # construct absolute paths
251            input_file = os.path.abspath(os.path.join(input_directory, relative_path))
252            output_file = os.path.abspath(os.path.join(output_directory, relative_path + ".xml"))
253
254            # call retrieve_tags on each file in input to output
255            self.retrieve_tags(
256                input_file=input_file,
257                output_file=output_file,
258                raise_unsupported=raise_unsupported,
259            )
260
261        utils.delete_empty_subdirectories(output_directory)

Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure.

Args: input_directory (str): The path to the input directory. output_directory (str): The path to the output directory where the retrieved .xml tag files will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns: None. The status returned by retrieve_tags for each file is not collected or returned.

Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)