Coverage for skema/program_analysis/multi_file_ingester.py: 69%

106 statements  

coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

import argparse
import glob
import sys
import os.path
import yaml
from pathlib import Path
from typing import List

from skema.gromet import GROMET_VERSION
from skema.gromet.fn import (
    GrometFNModuleCollection,
)

from skema.program_analysis.run_ann_cast_pipeline import ann_cast_pipeline
from skema.program_analysis.python2cast import python_to_cast
from skema.program_analysis.fortran2cast import fortran_to_cast
from skema.program_analysis.matlab2cast import matlab_to_cast
from skema.utils.fold import dictionary_to_gromet_json, del_nulls
from skema.program_analysis.tree_sitter_parsers.build_parsers import LANGUAGES_YAML_FILEPATH
from skema.program_analysis.module_locate import extract_imports

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--sysname", type=str, help="The name of the system we're ingesting"
    )
    parser.add_argument(
        "--path", type=str, help="The path of the source directory"
    )
    parser.add_argument(
        "--files",
        type=str,
        help="The path to a file containing a list of files to ingest",
    )
    parser.add_argument(
        "--write",
        action="store_true",
        help="If set, the script writes the output to a JSON file",
    )

    options = parser.parse_args()
    return options
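# Note: the file passed via --files is expected to list one source file per line,
# with each path given relative to the --path root directory (the paths are joined
# onto that root in process_file_system below).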

def process_file_system(
    system_name, path, files, write_to_file=False, original_source=False, dependency_depth=0
) -> GrometFNModuleCollection:
    root_dir = path.strip()
    file_list = open(files, "r").readlines()

    module_collection = GrometFNModuleCollection(
        schema_version=GROMET_VERSION,
        name=system_name,
        modules=[],
        module_index=[],
        module_dependencies=[],
        executables=[],
    )
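    # The collection is populated below: 'modules' holds one GroMEt FN module per
    # ingested file, 'module_index' the corresponding dotted module paths,
    # 'module_dependencies' the imports found in each file, and 'executables' the
    # (1-based) positions in 'module_index' of modules that define a 'main' function.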

    language_yaml_obj = yaml.safe_load(LANGUAGES_YAML_FILEPATH.read_text())
    cur_dir = os.getcwd()
    for f in file_list:
        full_file = os.path.join(os.path.normpath(root_dir), f.strip("\n"))
        full_file_obj = Path(full_file)

        try:
            # To maintain backwards compatibility for the process_file_system function,
            # for now we determine the language by file extension.
            if full_file_obj.suffix in language_yaml_obj["python"]["extensions"]:
                cast = python_to_cast(full_file, cast_obj=True)
            elif full_file_obj.suffix in language_yaml_obj["matlab"]["extensions"]:
                cast = matlab_to_cast(full_file, cast_obj=True)
            elif full_file_obj.suffix in language_yaml_obj["fortran"]["extensions"]:
                cast = fortran_to_cast(full_file, cast_obj=True)
            else:
                print(f"File extension not supported for {full_file}")
                # Skip this file; otherwise 'cast' would be undefined (or stale from a
                # previous iteration) in the code below.
                continue

            # The Fortran CAST interface (CAST/fortran) can produce multiple CAST modules,
            # whereas the Python interface (python2cast) only returns a single module.
            # This workaround normalizes a single CAST module into a list for consistent processing.
            if isinstance(cast, List):
                cast_list = cast
            else:
                cast_list = [cast]

            for cast_module in cast_list:
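                # Temporarily switch into the source tree while generating the GroMEt
                # (presumably so relative paths resolve against it), then restore the
                # original working directory afterwards.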

                os.chdir(os.path.join(os.getcwd(), path))
                generated_gromet = ann_cast_pipeline(
                    cast_module, gromet=True, to_file=False, from_obj=True
                )
                os.chdir(cur_dir)

                # NOTE: July '23 Hackathon addition
                # If this flag is set to true, we read the entire source file into a string
                # and store it in the source metadata's source_string field.
                if original_source:
                    source_metadata = generated_gromet.metadata_collection[1]
                    # Open the original source code file, read the lines into a list,
                    # and then convert back into a string representing the full file
                    file_text = "".join(open(full_file).readlines())
                    source_metadata[0].files[0].source_string = file_text

                # Then, after we generate the GroMEt, we store it in the 'modules' field
                # and store its path in the 'module_index' field
                module_collection.modules.append(generated_gromet)

                # DONE: Change this so that it's the dotted path from the root,
                # i.e. "model.view.sir", like it shows up in Python
                source_directory = os.path.basename(
                    os.path.normpath(root_dir)
                )  # We just need the last directory of the path, not the complete path
                os_module_path = os.path.join(source_directory, f)

                # Normalize the path across operating systems and then convert to module dot notation
                python_module_path = ".".join(os.path.normpath(os_module_path).split(os.path.sep))
                python_module_path = ".".join(python_module_path.split(".")[0:-1])
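                # Illustrative example (hypothetical values): with root_dir "systems/model"
                # and f "view/sir.py", os_module_path is "model/view/sir.py", which becomes
                # "model.view.sir" once the file extension is dropped.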

                module_collection.module_index.append(python_module_path)

                # TODO: Check for duplicate modules across files
                # TODO: Remove submodule if higher-level module is included
                module_collection.module_dependencies.extend(extract_imports(full_file_obj.read_text()))

                # DONE: Determine how we know a GroMEt goes in the 'executables' field.
                # We do this by finding all user-defined top-level functions in the GroMEt
                # and checking whether the name 'main' is among them.
                function_networks = [fn for fn in generated_gromet.fn_array]
                defined_functions = [
                    fn.b[0].name
                    for fn in function_networks
                    if fn.b[0].function_type == "FUNCTION"
                ]
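                # 'executables' records the 1-based position of the current module in
                # 'module_index'; since the module was appended just above,
                # len(module_index) is exactly that position.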

135 if "main" in defined_functions: 

136 module_collection.executables.append( 

137 len(module_collection.module_index) 

138 ) 

139 

140 

141 except (Exception,SystemExit) as e: 

142 os.chdir(cur_dir) 

143 

144 def clean_dependencies(dependencies, system_name): 

145 # Step 1: Remove duplicates and perform initial filtering in one step. 

146 # This uses a dictionary to preserve insertion order (Python 3.7+ guaranteed order). 

147 cleaned = { 

148 dep.name: dep for dep in dependencies 

149 if not dep.name.startswith(".") and dep.name != system_name 

150 }.values() 

151 

152 # Step 2: Sort by the number of dots in the name. 

153 sorted_deps = sorted(cleaned, key=lambda dep: dep.name.count('.')) 

154 

155 # Step 3: Remove submodules of other modules. 

156 # This step keeps an entry if no other entry is its "parent" module. 

157 final_deps = [ 

158 dep for i, dep in enumerate(sorted_deps) 

159 if not any(dep.name.startswith(other.name + ".") for other in sorted_deps[:i]) 

160 ] 

161 

162 return final_deps 

163 
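    # Illustrative example (hypothetical dependency names): for a system named "model"
    # whose files import ".utils", "model", "numpy", and "numpy.linalg", the relative
    # import and the self-reference are filtered out and "numpy.linalg" is dropped as a
    # submodule of "numpy", leaving only "numpy".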

    module_collection.module_dependencies = clean_dependencies(module_collection.module_dependencies, system_name)

    # NOTE: These cannot be imported at the top level due to circular dependencies
    from skema.program_analysis.single_file_ingester import process_file
    from skema.program_analysis.easy_multi_file_ingester import easy_process_file_system
    from skema.program_analysis.url_ingester import process_git_repo, process_archive

    if dependency_depth > 0:
        to_add = []
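        # Each resolved dependency is ingested recursively with dependency_depth reduced
        # by one, so the recursion bottoms out once the depth reaches zero.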

        for dependency in module_collection.module_dependencies:

            if dependency.source_reference.type == "Local":
                if Path(dependency.source_reference.value).is_dir():
                    dependency_gromet = easy_process_file_system(dependency.name, dependency.source_reference.value, False, False, dependency_depth=dependency_depth-1)
                else:
                    dependency_gromet = process_file(dependency.source_reference.value, False, False, dependency_depth=dependency_depth-1)
            elif dependency.source_reference.type == "Url":
                dependency_gromet = process_archive(dependency.source_reference.value, False, False, dependency_depth=dependency_depth-1)
            elif dependency.source_reference.type == "Repository":
                dependency_gromet = process_git_repo(dependency.source_reference.value, None, False, False, dependency_depth=dependency_depth-1)
            else:
                continue

            # Flatten the dependency GroMEt onto the parent GroMEt
            for module in dependency_gromet.modules:
                module.is_depenency = True
            module_collection.modules.extend(dependency_gromet.modules)
            module_collection.module_index.extend([f"{element} (dependency)" for element in dependency_gromet.module_index])
            to_add.extend(dependency_gromet.module_dependencies)

        module_collection.module_dependencies.extend(to_add)

    if write_to_file:
        with open(f"{system_name}--Gromet-FN-auto.json", "w") as f:
            gromet_collection_dict = module_collection.to_dict()
            f.write(
                dictionary_to_gromet_json(del_nulls(gromet_collection_dict))
            )

    return module_collection


if __name__ == "__main__":
    args = get_args()

    system_name = args.sysname
    path = args.path
    files = args.files

    print(f"Ingesting system: {system_name}")
    print(f"With root directory: {path}")
    print(f"Ingesting the files as specified in: {files}")

    process_file_system(system_name, path, files, args.write)
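# Example invocation (hypothetical system name and paths, for illustration only):
#   python -m skema.program_analysis.multi_file_ingester --sysname example_sys \
#       --path ./example_sys --files example_sys_files.txt --write
# where example_sys_files.txt lists the source files to ingest, one per line,
# relative to ./example_sys.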