Coverage for skema/program_analysis/multi_file_ingester.py: 69%
106 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
1import argparse
2import glob
3import sys
4import os.path
5import yaml
6from pathlib import Path
7from typing import List
9from skema.gromet import GROMET_VERSION
10from skema.gromet.fn import (
11 GrometFNModuleCollection,
12)
14from skema.program_analysis.run_ann_cast_pipeline import ann_cast_pipeline
15from skema.program_analysis.python2cast import python_to_cast
16from skema.program_analysis.fortran2cast import fortran_to_cast
17from skema.program_analysis.matlab2cast import matlab_to_cast
18from skema.utils.fold import dictionary_to_gromet_json, del_nulls
19from skema.program_analysis.tree_sitter_parsers.build_parsers import LANGUAGES_YAML_FILEPATH
20from skema.program_analysis.module_locate import extract_imports
def get_args(argv=None):
    """Parse the command-line options of the multi-file ingester.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default), argparse reads the arguments from ``sys.argv[1:]``.

    Returns:
        The parsed ``argparse.Namespace`` with the attributes ``sysname``,
        ``path``, ``files`` and ``write``.

    Raises:
        SystemExit: Raised by argparse when ``--path`` or ``--files`` is
            missing or an unknown option is given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--sysname", type=str, help="The name of the system we're ingesting"
    )
    # --path and --files are required: process_file_system() calls
    # path.strip() and open(files) unconditionally, so omitting either one
    # could only ever end in a traceback further down the line.
    parser.add_argument(
        "--path",
        type=str,
        required=True,
        help="The path of source directory",
    )
    parser.add_argument(
        "--files",
        type=str,
        required=True,
        help="The path to a file containing a list of files to ingest",
    )
    parser.add_argument(
        "--write",
        action="store_true",
        help="If true, the script write the output to a JSON file",
    )

    options = parser.parse_args(argv)
    return options
def process_file_system(
    system_name, path, files, write_to_file=False, original_source=False, dependency_depth=0
) -> GrometFNModuleCollection:
    """Ingest a list of source files into one GroMEt module collection.

    Each listed file is converted to CAST (the language is picked by file
    extension), run through the annotated-CAST pipeline, and the resulting
    GroMEt is appended to the collection. A file that fails to ingest is
    reported on stderr and skipped; it does not abort the whole run.

    Args:
        system_name: Name stored on the collection; also the prefix of the
            JSON file written when ``write_to_file`` is true.
        path: Root directory of the source tree. Entries of ``files`` are
            joined onto it.
        files: Path to a text file listing one source file per line.
        write_to_file: When true, write the collection to
            ``{system_name}--Gromet-FN-auto.json`` in the current directory.
        original_source: When true, store the full text of each source file
            in the generated GroMEt's source metadata.
        dependency_depth: When greater than 0, resolvable dependencies are
            ingested too (with ``dependency_depth - 1``) and flattened onto
            this collection.

    Returns:
        The populated ``GrometFNModuleCollection``.
    """
    root_dir = path.strip()
    # Read the listing up front so the handle is closed before ingestion starts.
    with open(files, "r") as listing:
        file_list = listing.readlines()

    module_collection = GrometFNModuleCollection(
        schema_version=GROMET_VERSION,
        name=system_name,
        modules=[],
        module_index=[],
        module_dependencies=[],
        executables=[],
    )

    language_yaml_obj = yaml.safe_load(LANGUAGES_YAML_FILEPATH.read_text())
    cur_dir = os.getcwd()
    # We just need the last directory of the path, not the complete path.
    source_directory = os.path.basename(os.path.normpath(root_dir))

    for f in file_list:
        relative_name = f.strip("\n")
        full_file = os.path.join(os.path.normpath(root_dir), relative_name)
        full_file_obj = Path(full_file)

        try:
            # To maintain backwards compatibility for the process_file_system
            # function, for now we determine the language by file extension.
            suffix = full_file_obj.suffix
            if suffix in language_yaml_obj["python"]["extensions"]:
                cast = python_to_cast(full_file, cast_obj=True)
            elif suffix in language_yaml_obj["matlab"]["extensions"]:
                cast = matlab_to_cast(full_file, cast_obj=True)
            elif suffix in language_yaml_obj["fortran"]["extensions"]:
                cast = fortran_to_cast(full_file, cast_obj=True)
            else:
                print(f"File extension not supported for {full_file}")
                # Nothing was parsed for this file. Falling through would use
                # an unbound `cast`, or worse, re-ingest the CAST left over
                # from the previous file under this file's module name.
                continue

            # The Fortran CAST interface (CAST/fortran) can produce multiple
            # CAST modules, while the Python interface (python2cast) returns a
            # single module. Normalize to a list for consistent processing.
            cast_list = cast if isinstance(cast, list) else [cast]

            # Dotted path from the root (e.g. "model.view.sir"), as a module
            # would be named in Python: normalize the path across OSes, swap
            # separators for dots, then drop the file extension.
            os_module_path = os.path.join(source_directory, relative_name)
            python_module_path = ".".join(
                os.path.normpath(os_module_path).split(os.path.sep)
            )
            python_module_path = ".".join(python_module_path.split(".")[0:-1])

            for cast_module in cast_list:
                # The pipeline is run from the source directory; always
                # return to the original working directory afterwards.
                os.chdir(os.path.join(os.getcwd(), path))
                try:
                    generated_gromet = ann_cast_pipeline(
                        cast_module, gromet=True, to_file=False, from_obj=True
                    )
                finally:
                    os.chdir(cur_dir)

                # NOTE: July '23 Hackathon addition
                # If this flag is set, read the entire source file into a
                # string and store it in the GroMEt's source metadata.
                if original_source:
                    source_metadata = generated_gromet.metadata_collection[1]
                    with open(full_file) as source_handle:
                        file_text = source_handle.read()
                    source_metadata[0].files[0].source_string = file_text

                # Store the GroMEt in the 'modules' field and its dotted path
                # in the 'module_index' field.
                module_collection.modules.append(generated_gromet)
                module_collection.module_index.append(python_module_path)

                # TODO: Check for duplicate modules across files
                # TODO: Remove submodule if higher level module is included
                module_collection.module_dependencies.extend(
                    extract_imports(full_file_obj.read_text())
                )

                # A GroMEt goes in the 'executables' field when one of its
                # function networks of type "FUNCTION" is named 'main'.
                defined_functions = [
                    fn.b[0].name
                    for fn in generated_gromet.fn_array
                    if fn.b[0].function_type == "FUNCTION"
                ]
                if "main" in defined_functions:
                    # The index is taken after the append above, so it is the
                    # 1-based position of this module in 'module_index'.
                    module_collection.executables.append(
                        len(module_collection.module_index)
                    )

        except (Exception, SystemExit) as e:
            # Best effort: SystemExit is caught as well so that one bad file
            # cannot end the whole ingestion. Restore the working directory
            # and report the failure instead of dropping it silently.
            os.chdir(cur_dir)
            print(
                f"Skipping {full_file}: {type(e).__name__}: {e}",
                file=sys.stderr,
            )

    def clean_dependencies(dependencies, system_name):
        """Deduplicate dependencies and drop relative, self and submodule entries."""
        # Step 1: Remove duplicates and perform initial filtering in one step.
        # A dict keyed by name keeps one entry per name in insertion order
        # (the last duplicate's value wins).
        cleaned = {
            dep.name: dep for dep in dependencies
            if not dep.name.startswith(".") and dep.name != system_name
        }.values()

        # Step 2: Sort by the number of dots in the name, so parent modules
        # come before their submodules.
        sorted_deps = sorted(cleaned, key=lambda dep: dep.name.count('.'))

        # Step 3: Remove submodules of other modules.
        # An entry is kept if no earlier entry is its "parent" module.
        final_deps = [
            dep for i, dep in enumerate(sorted_deps)
            if not any(dep.name.startswith(other.name + ".") for other in sorted_deps[:i])
        ]

        return final_deps

    module_collection.module_dependencies = clean_dependencies(module_collection.module_dependencies, system_name)

    # NOTE: These cannot be imported at the top-level due to circular dependencies
    from skema.program_analysis.single_file_ingester import process_file
    from skema.program_analysis.easy_multi_file_ingester import easy_process_file_system
    from skema.program_analysis.url_ingester import process_git_repo, process_archive

    if dependency_depth > 0:
        # Dependencies discovered while ingesting are collected separately so
        # the list being iterated is not modified during the loop.
        to_add = []
        for dependency in module_collection.module_dependencies:
            reference = dependency.source_reference
            if reference.type == "Local":
                if Path(reference.value).is_dir():
                    dependency_gromet = easy_process_file_system(dependency.name, reference.value, False, False, dependency_depth=dependency_depth-1)
                else:
                    dependency_gromet = process_file(reference.value, False, False, dependency_depth=dependency_depth-1)
            elif reference.type == "Url":
                dependency_gromet = process_archive(reference.value, False, False, dependency_depth=dependency_depth-1)
            elif reference.type == "Repository":
                dependency_gromet = process_git_repo(reference.value, None, False, False, dependency_depth=dependency_depth-1)
            else:
                continue

            # Flatten dependency gromet onto parent Gromet
            for dependency_module in dependency_gromet.modules:
                # NOTE(review): attribute name is spelled 'is_depenency' in
                # the original code; kept unchanged - confirm against the
                # GroMEt model before renaming.
                dependency_module.is_depenency = True
            module_collection.modules.extend(dependency_gromet.modules)
            module_collection.module_index.extend([f"{element} (dependency)" for element in dependency_gromet.module_index])
            to_add.extend(dependency_gromet.module_dependencies)

        module_collection.module_dependencies.extend(to_add)

    if write_to_file:
        with open(f"{system_name}--Gromet-FN-auto.json", "w") as out_file:
            gromet_collection_dict = module_collection.to_dict()
            out_file.write(
                dictionary_to_gromet_json(del_nulls(gromet_collection_dict))
            )

    return module_collection
if __name__ == "__main__":
    # Script entry point: parse the command line, echo the inputs, then ingest.
    args = get_args()
    system_name, path, files = args.sysname, args.path, args.files

    print(
        f"Ingesting system: {system_name}",
        f"With root directory as specified in: {path}",
        f"Ingesting the files as specified in: {files}",
        sep="\n",
    )

    process_file_system(system_name, path, files, write_to_file=args.write)