Coverage for skema/program_analysis/CAST/fortran/preprocessor/preprocess.py: 48%
127 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
1import argparse
2import re
3import os
4import shutil
5import logging
6from typing import List, Optional
7from pathlib import Path
8from subprocess import run, PIPE
10from tree_sitter import Parser, Node, Language, Tree
12from skema.program_analysis.tree_sitter_parsers.build_parsers import (
13 INSTALLED_LANGUAGES_FILEPATH,
14)
15from skema.program_analysis.CAST.fortran.preprocessor.fixed2free import convertToFree
def preprocess(
    source_path: Path,
    out_dir=None,
    overwrite=False,
    out_missing_includes=False,
    out_gcc=False,
    out_unsupported=False,
    out_free=False,
) -> str:
    """Run the full preprocessing pipeline for Fortran->Tree-Sitter->CAST

    Takes the path of the original source as input and returns the preprocessed
    source text (the text that is meant to be handed to tree-sitter afterwards).

    When out_dir is given, the following intermediate products can be written to it,
    each one controlled by its own flag:
    1. missing_includes.txt (out_missing_includes): A log of missing files that are included by preprocessor directives in the source code
    2. gcc.F (out_gcc): The intermediary product from running the c-preprocessor
    3. unsupported_idioms.txt (out_unsupported): A log of unsupported idioms
    4. corrected.F (out_free): The source code converted to free-form

    Args:
        source_path: Path of the Fortran source file to preprocess.
        out_dir: Directory for the intermediate products (Path or str). It is created if needed.
            When it is not given, the out flags are ignored with a warning.
        overwrite: Currently unused by this function; accepted for interface compatibility.
        out_missing_includes: Write the missing includes log.
        out_gcc: Write the source as it looks after the c-preprocessor step.
        out_unsupported: Write the unsupported idioms log.
        out_free: Write the source as it looks after the free-form conversion step.

    Returns:
        The preprocessed source text.

    Raises:
        SystemExit: If the source contains include statements and required included files are missing.
    """
    # NOTE: The order of preprocessing steps does matter. We have to run the GCC preprocessor before correcting the continuation lines or there could be issues

    # TODO: Create single location for generating include base path
    source = source_path.read_text()

    # Get paths for intermediate products
    if out_dir:
        out_dir = Path(out_dir)  # Accept plain strings as well as Path objects
        if not (out_missing_includes or out_gcc or out_unsupported or out_free):
            logging.warning("out_dir is specified, but no out flags are set")

        out_dir.mkdir(parents=True, exist_ok=True)

        missing_includes_path = Path(out_dir, "missing_includes.txt")
        gcc_path = Path(out_dir, "gcc.F")
        unsupported_path = Path(out_dir, "unsupported_idioms.txt")
        free_path = Path(out_dir, "corrected.F")
    else:
        # Without an output directory there is nowhere to write the intermediate products.
        # Disable the flags so that the steps below never touch an undefined path.
        if out_missing_includes or out_gcc or out_unsupported or out_free:
            logging.warning(
                "out flags are set, but out_dir is not specified; no intermediate products will be written"
            )
        out_missing_includes = out_gcc = out_unsupported = out_free = False

    # Step 1: Check for missing included files
    # Many source files won't have includes. We only need to check missing includes if a source contains an include statement.
    if produce_include_summary(source):
        missing_includes = check_for_missing_includes(source_path)
        if out_missing_includes:
            # Entries may be Path objects (missing include directory), so stringify before joining
            missing_includes_path.write_text(
                "\n".join(str(include) for include in missing_includes)
            )

        if missing_includes:
            logging.error("Missing required included files, missing files were:")
            for include in missing_includes:
                logging.error(include)
            # Raise SystemExit directly rather than relying on the site-provided exit() helper,
            # and use a non-zero status because this is a failure.
            raise SystemExit(1)
    elif out_missing_includes:
        missing_includes_path.write_text("Source file contains no include statements")

    # Step 2: Correct include directives to remove system references
    source = fix_include_directives(source)

    # Step 3: Process with gcc c-preprocessor
    # Fall back to the directory of the source file if there is no dedicated include directory
    include_base_directory = Path(source_path.parent, f"include_{source_path.stem}")
    if not include_base_directory.exists():
        include_base_directory = include_base_directory.parent
    source = run_c_preprocessor(source, include_base_directory)
    if out_gcc:
        gcc_path.write_text(source)

    # Step 4: Prepare for tree-sitter
    # Any preprocessor directive added or not removed by GCC is turned into a Fortran comment by prefixing it with "!"
    source = "\n".join(
        "!" + line if line.startswith("#") else line for line in source.splitlines()
    )

    # Step 5: Check for unsupported idioms
    # NOTE(review): "idioms_regex.txt" is resolved relative to the current working directory - confirm this is intended
    if out_unsupported:
        unsupported_path.write_text(
            "\n".join(search_for_unsupported_idioms(source, "idioms_regex.txt"))
        )

    # Step 6 : Convert to free-form for tree-sitter parsing
    source = convert_to_free_form(source)
    if out_free:
        free_path.write_text(source)

    return source
def produce_include_summary(source: str) -> List:
    """Uses regex to produce a list of all included files in a source

    Args:
        source: The source text to scan.

    Returns:
        The names of all system includes (#include <...>) in order of appearance,
        followed by the names of all local includes (#include "...") in order of appearance.
    """
    # Raw strings keep "\s" a regex escape instead of an invalid string escape.
    # The non-greedy groups stop at the first closing delimiter, so trailing text
    # on the same line (e.g. a comment containing ">" or a quote) is not captured.
    system_re = r"#include\s+<(.*?)>"
    local_re = r'#include\s+"(.*?)"'

    includes = [match.group(1) for match in re.finditer(system_re, source)]
    includes.extend(match.group(1) for match in re.finditer(local_re, source))

    return includes
def check_for_missing_includes(source_path: Path) -> List[str]:
    """Gathers all required includes and check if they have been added to the include_SOURCE directory

    The include directory is expected next to the source file and to be named
    include_<stem of the source file>.

    Args:
        source_path: Path of the source file whose includes are checked.

    Returns:
        A list of strings. If the include directory itself does not exist, the list holds only
        the path of that directory. Otherwise it holds every included file name that cannot be
        found relative to the include directory, each one listed once.

    Side effects:
        Copies the source file into the include directory when that directory exists.
    """
    # First we will check for the include directory
    include_base_directory = Path(source_path.parent, f"include_{source_path.stem}")
    if not include_base_directory.exists():
        # Reported as a string, like every other entry, so that the result can be "\n".join()-ed
        return [str(include_base_directory)]

    # Add original source to includes directory
    shutil.copy2(source_path, include_base_directory)

    # Next gather all includes in each source file
    includes = []
    for dirpath, _, filenames in os.walk(include_base_directory):
        for file in filenames:
            file_source = Path(dirpath, file).read_text()
            includes.extend(produce_include_summary(file_source))

    # Check for missing files
    missing_files = []
    already_checked = set()
    for include in includes:
        if include in already_checked:
            continue
        already_checked.add(include)
        if not Path(include_base_directory, include).exists():
            missing_files.append(include)
    return missing_files
def search_for_unsupported_idioms(source: str, idioms_regex_path: str):
    """Check source string for unsupported idioms using regex. Returns a log of found matches as well as line information

    Args:
        source: The source text to search.
        idioms_regex_path: Path of a text file holding one regular expression per line.

    Returns:
        A list of log lines. Every match adds three entries: the regex that matched,
        the matched text, and the number of newlines that precede the match in source
        (i.e. a zero-based line number).
    """
    # The context manager closes the file handle as soon as the patterns are read
    with open(idioms_regex_path, "r") as regex_file:
        patterns = regex_file.read().splitlines()

    log = []
    for pattern in patterns:
        # A blank line would be an empty regex, which matches at every position of the source
        if not pattern.strip():
            continue
        for match in re.finditer(pattern, source, flags=re.MULTILINE):
            line_number = source[: match.start()].count("\n")
            log.append(f"Found unsupported idiom matching regex: {pattern}")
            log.append(f"Match was: {match.group(0)}")
            log.append(f"Line was: {line_number}")
    return log
def fix_include_directives(source: str) -> str:
    """There are a few corrections we need to make to the include statements
    1. Convert system level includes to local includes

    Args:
        source: The source text to correct.

    Returns:
        The source with every `#include <file>` rewritten to `#include "file"`.
        Lines are re-joined with "\\n", so a trailing newline is not kept.
    """
    # Only the angle brackets that belong to the directive itself are rewritten.
    # Any other "<" or ">" (comparison operators, trailing comments) is left alone.
    system_include = re.compile(r"(#include\s*)<(.*?)>")
    processed_lines = [
        system_include.sub(r'\1"\2"', line) for line in source.splitlines()
    ]
    return "\n".join(processed_lines)
def run_c_preprocessor(source: str, include_base_path: Path) -> str:
    """Run the gcc c-preprocessor. Its run from the context of the include_base_path, so that it can find all included files

    The preprocessor is invoked through gfortran and receives the source on stdin.

    Args:
        source: The source text to preprocess.
        include_base_path: Working directory for the preprocessor run.

    Returns:
        Whatever the preprocessor wrote to stdout. A non-zero exit status is logged
        as a warning together with stderr; the captured stdout is still returned.
    """
    result = run(
        ["gfortran", "-cpp", "-E", "-x", "f95", "-"],
        input=source,
        text=True,  # universal_newlines is only an alias of text, so it is not repeated here
        capture_output=True,
        cwd=include_base_path,
    )
    if result.returncode != 0:
        # Surface the failure instead of silently continuing with partial or empty output
        logging.warning(
            "gfortran preprocessor exited with status %s: %s",
            result.returncode,
            result.stderr.strip(),
        )
    return result.stdout
def convert_assigned(source: str) -> str:
    """Convert ASSIGNED GO TO to a traditional GO TO

    NOTE: The conversion is not implemented yet. The source is returned unchanged,
    so the function honours its declared return type and is safe to use in a pipeline.

    Args:
        source: The source text to convert.

    Returns:
        The source text, currently unmodified.
    """
    # TODO: Implement the rewrite. The original draft searched (case-insensitively) for
    # "assign (.*?) to (.*?)", but its trailing lazy group can only ever match the empty
    # string, so the pattern has to be anchored before it can capture the target variable.
    return source
def convert_to_free_form(source: str) -> str:
    """If fixed-form Fortran source, convert to free-form

    Args:
        source: The Fortran source text.

    Returns:
        The source unchanged if tree-sitter parses it without errors. Otherwise the
        free-form conversion of the source if that conversion parses without errors.
        If neither parses cleanly, the original source is returned.
    """
    # Build the parser once; it is used for both validation passes
    parser = Parser()
    parser.set_language(Language(INSTALLED_LANGUAGES_FILEPATH, "fortran"))

    def validate_parse_tree(candidate: str) -> bool:
        """Parse candidate with tree-sitter and check if an error is returned."""
        tree = parser.parse(bytes(candidate, encoding="utf-8"))
        return "ERROR" not in tree.root_node.sexp()

    # We don't know for sure if a source is meant to be fixed-form or free-form
    # So, we will run the parser first to check
    if validate_parse_tree(source):
        return source

    # convertToFree takes a stream as input and returns a generator
    free_source = "".join(convertToFree(source.splitlines(keepends=True)))
    if validate_parse_tree(free_source):
        return free_source

    # Neither form parses cleanly, so hand back the source as it was given
    return source
def main():
    """Run the preprocessor as a script

    Parses the command line arguments and forwards them to preprocess().
    """
    parser = argparse.ArgumentParser(description="Fortran preprocessing script")
    parser.add_argument("source_path", type=str, help="Path to the source file")
    parser.add_argument("out_dir", type=str, help="Output directory path")
    parser.add_argument(
        "-o",
        "--overwrite",
        action="store_true",
        help="Overwrite existing output directory",
    )
    parser.add_argument(
        "--out_missing_includes",
        action="store_true",
        help="Output missing includes log",
    )
    parser.add_argument(
        "--out_gcc",
        action="store_true",
        help="Output source after running the GCC preprocessor",
    )
    parser.add_argument(
        "--out_unsupported",
        action="store_true",
        help="Output unsupported idioms log",
    )
    parser.add_argument(
        "--out_free",
        action="store_true",
        # This flag writes the result of the free-form conversion step
        help="Output source after converting to free-form",
    )
    args = parser.parse_args()

    # Keyword arguments keep the row of boolean flags from being mixed up
    preprocess(
        Path(args.source_path),
        out_dir=Path(args.out_dir),
        overwrite=args.overwrite,
        out_missing_includes=args.out_missing_includes,
        out_gcc=args.out_gcc,
        out_unsupported=args.out_unsupported,
        out_free=args.out_free,
    )


if __name__ == "__main__":
    main()