Coverage for skema/program_analysis/script_functions.py: 0%

138 statements  

coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

import sys
import ast
import dill
import os.path
import json

from skema.gromet import GROMET_VERSION
from skema.gromet.fn import (
    GrometFNModuleCollection,
)

from skema.utils.fold import dictionary_to_gromet_json, del_nulls
from skema.program_analysis.CAST.pythonAST import py_ast_to_cast
from skema.program_analysis.CAST2FN import cast
from skema.program_analysis.CAST2FN.model.cast import SourceRef
from skema.program_analysis.CAST2FN.cast import CAST
from skema.program_analysis.CAST2FN.visitors.cast_to_agraph_visitor import (
    CASTToAGraphVisitor,
)
from skema.program_analysis.CAST2FN.ann_cast.cast_to_annotated_cast import (
    CastToAnnotatedCastVisitor,
)
from skema.program_analysis.CAST2FN.ann_cast.id_collapse_pass import (
    IdCollapsePass,
)
from skema.program_analysis.CAST2FN.ann_cast.container_scope_pass import (
    ContainerScopePass,
)
from skema.program_analysis.CAST2FN.ann_cast.variable_version_pass import (
    VariableVersionPass,
)
from skema.program_analysis.CAST2FN.ann_cast.grfn_var_creation_pass import (
    GrfnVarCreationPass,
)
from skema.program_analysis.CAST2FN.ann_cast.grfn_assignment_pass import (
    GrfnAssignmentPass,
)
from skema.program_analysis.CAST2FN.ann_cast.lambda_expression_pass import (
    LambdaExpressionPass,
)
from skema.program_analysis.CAST2FN.ann_cast.to_grfn_pass import ToGrfnPass
from skema.program_analysis.CAST2FN.ann_cast.to_gromet_pass import (
    ToGrometPass,
)


def process_file_system(system_name, path, files, write_to_file=False):
    root_dir = path.strip()
    with open(files, "r") as file_handle:
        file_list = file_handle.readlines()

    module_collection = GrometFNModuleCollection(
        schema_version=GROMET_VERSION,
        name=system_name,
        modules=[],
        module_index=[],
        executables=[],
    )

    for f in file_list:
        full_file = os.path.join(os.path.normpath(root_dir), f.rstrip("\n"))

        # Open the file
        # TODO: Do we want to open the CAST or the Python source?
        # If we open the Python source then we need to generate its CAST and then generate its GroMEt after
        # I'm thinking for now we open the CAST, and generate GroMEt
        # As a next-step we can incorporate the Python -> CAST step
        print(full_file.rstrip())

        try:
            cast = python_to_cast(full_file, cast_obj=True)
            generated_gromet = ann_cast_pipeline(
                cast, gromet=True, to_file=False, from_obj=True
            )

            # Then, after we generate the GroMEt we store it in the 'modules' field
            # and store its path in the 'module_index' field
            module_collection.modules.append(generated_gromet)

            # DONE: Change this so that it's the dotted path from the root,
            # i.e. "model.view.sir", as it shows up in Python
            source_directory = os.path.basename(
                os.path.normpath(root_dir)
            )  # We just need the last directory of the path, not the complete path
            os_module_path = os.path.join(
                source_directory, f.rstrip("\n")
            )  # strip the trailing newline from the file-list entry
            python_module_path = os_module_path.replace("/", ".").replace(
                ".py", ""
            )
            module_collection.module_index.append(python_module_path)

            # DONE: Determine how we know a GroMEt goes in the 'executables' field.
            # We do this by finding all user-defined top-level functions in the GroMEt
            # and checking whether the name 'main' is among them.
            function_networks = [
                fn.value
                for fn in generated_gromet.attributes
                if fn.type == "FN"
            ]
            defined_functions = [
                fn.b[0].name
                for fn in function_networks
                if fn.b[0].function_type == "FUNCTION"
            ]
            if "main" in defined_functions:
                module_collection.executables.append(python_module_path)

        except ImportError:
            print("FAILURE")

    # After we go through the whole system, we can then write out the module_collection
    if write_to_file:
        with open(f"{system_name}--Gromet-FN-auto.json", "w") as f:
            gromet_collection_dict = module_collection.to_dict()
            f.write(
                dictionary_to_gromet_json(del_nulls(gromet_collection_dict))
            )

    return module_collection
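
# Editorial sketch (not part of the original module): a hedged example of how
# process_file_system might be called. The system name and paths below are
# hypothetical placeholders; the file passed as `files` is expected to hold one
# Python file path per line, relative to `path`.
def _example_process_file_system():  # illustrative helper, not used by skema
    return process_file_system(
        "example-system",           # hypothetical system name
        "/path/to/example-system",  # hypothetical root directory of the system
        "/path/to/file_list.txt",   # hypothetical newline-separated list of .py files
        write_to_file=False,        # set True to also emit <system_name>--Gromet-FN-auto.json
    )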


def python_to_cast(
    pyfile_path,
    agraph=False,
    astprint=False,
    std_out=False,
    rawjson=False,
    legacy=False,
    cast_obj=False,
):
    # Open Python file as a giant string
    with open(pyfile_path) as file_handle:
        file_contents = file_handle.read()
    file_name = pyfile_path.split("/")[-1]

    # Count the number of lines in the file
    line_count = len(file_contents.splitlines())

    # Create a PyASTToCAST Object
    if legacy:
        convert = py_ast_to_cast.PyASTToCAST(file_name, legacy=True)
    else:
        convert = py_ast_to_cast.PyASTToCAST(file_name)

    # 'Root' the current working directory so that it's where the
    # source file we're generating CAST for is (for Import statements)
    old_path = os.getcwd()
    try:
        idx = pyfile_path.rfind("/")

        if idx > -1:
            curr_path = pyfile_path[0:idx]
            os.chdir(curr_path)
        else:
            curr_path = "./" + pyfile_path

        # os.chdir(curr_path)

        # Parse the python program's AST and create the CAST
        contents = ast.parse(file_contents)
        C = convert.visit(contents, {}, {})
        C.source_refs = [SourceRef(file_name, None, None, 1, line_count)]
    finally:
        os.chdir(old_path)

    out_cast = cast.CAST([C], "python")

    if agraph:
        from skema.utils.misc import test_pygraphviz

        test_pygraphviz(
            "For the agraph generation in the python_to_cast "
            "function to work, pygraphviz must be installed."
        )
        V = CASTToAGraphVisitor(out_cast)
        last_slash_idx = file_name.rfind("/")
        file_ending_idx = file_name.rfind(".")
        pdf_file_name = (
            f"{file_name[last_slash_idx + 1 : file_ending_idx]}.pdf"
        )
        V.to_pdf(pdf_file_name)

    # Then, print CAST as JSON
    if cast_obj:
        return out_cast
    else:
        if rawjson:
            print(
                json.dumps(
                    out_cast.to_json_object(), sort_keys=True, indent=None
                )
            )
        else:
            if std_out:
                print(out_cast.to_json_str())
            else:
                out_name = file_name.split(".")[0]
                print("Writing CAST to " + out_name + "--CAST.json")
                with open(out_name + "--CAST.json", "w") as out_handle:
                    out_handle.write(out_cast.to_json_str())
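
# Editorial sketch (not part of the original module): a minimal example of using
# python_to_cast on its own. With cast_obj=True the CAST object is returned instead
# of being printed or written to a <name>--CAST.json file. The path below is a
# hypothetical placeholder.
def _example_python_to_cast():  # illustrative helper, not used by skema
    return python_to_cast("/path/to/model.py", cast_obj=True)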


def ann_cast_pipeline(
    cast_instance,
    to_file=True,
    gromet=False,
    grfn_2_2=False,
    a_graph=False,
    from_obj=False,
    indent_level=0,
):
216 """cast_to_annotated.py 

217 

218 This function reads a JSON file that contains the CAST representation 

219 of a program, and transforms it to annotated CAST. It then calls a 

220 series of passes that each augment the information in the annotatd CAST nodes 

221 in preparation for the GrFN generation. 

222 

223 One command-line argument is expected, namely the name of the JSON file that 

224 contains the CAST data. 

225 TODO: Update this docstring as the program has been tweaked so that this is a function instead of 

226 the program 

227 """ 

    if from_obj:
        f_name = ""
        cast = cast_instance
    else:
        f_name = cast_instance
        f_name = f_name.split("/")[-1]
        file_contents = open(f_name, "r").read()

        cast_json = CAST([], "python")
        cast = cast_json.from_json_str(file_contents)

    visitor = CastToAnnotatedCastVisitor(cast)
    # The annotated CAST is an attribute of the PipelineState object
    pipeline_state = visitor.generate_annotated_cast(grfn_2_2)

    # TODO: make filename creation more resilient

    print("Calling IdCollapsePass------------------------")
    IdCollapsePass(pipeline_state)

    print("\nCalling ContainerScopePass-------------------")
    ContainerScopePass(pipeline_state)

    print("\nCalling VariableVersionPass-------------------")
    VariableVersionPass(pipeline_state)

    # NOTE: CASTToAGraphVisitor uses misc.uuid, so placing it here means
    # that the generated GrFN uuids will not be consistent with GrFN uuids
    # created during test runtime. So, do not use these GrFN jsons as expected
    # json for testing
    f_name = f_name.replace("--CAST.json", "")
    if a_graph:
        agraph = CASTToAGraphVisitor(pipeline_state)
        pdf_file_name = f"{f_name}-AnnCast.pdf"
        agraph.to_pdf(pdf_file_name)

    print("\nCalling GrfnVarCreationPass-------------------")
    GrfnVarCreationPass(pipeline_state)

    print("\nCalling GrfnAssignmentPass-------------------")
    GrfnAssignmentPass(pipeline_state)

    print("\nCalling LambdaExpressionPass-------------------")
    LambdaExpressionPass(pipeline_state)

    if gromet:
        print("\nCalling ToGrometPass-----------------------")
        ToGrometPass(pipeline_state)

        if to_file:
            with open(f"{f_name}--Gromet-FN-auto.json", "w") as f:
                gromet_collection_dict = (
                    pipeline_state.gromet_collection.to_dict()
                )
                f.write(
                    dictionary_to_gromet_json(
                        del_nulls(gromet_collection_dict), level=indent_level
                    )
                )
        else:
            return pipeline_state.gromet_collection
    else:
        print("\nCalling ToGrfnPass-------------------")
        ToGrfnPass(pipeline_state)
        grfn = pipeline_state.get_grfn()
        grfn.to_json_file(f"{f_name}--AC-GrFN.json")

        grfn_agraph = grfn.to_AGraph()
        grfn_agraph.draw(f"{f_name}--AC-GrFN.pdf", prog="dot")

        print("\nGenerating pickled AnnCast nodes-----------------")
        pickled_file_name = f"{f_name}--AnnCast.pickled"
        with open(pickled_file_name, "wb") as pkfile:
            dill.dump(pipeline_state, pkfile)
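
# Editorial sketch (not part of the original module): one plausible way to exercise
# the single-file path end to end, chaining python_to_cast and ann_cast_pipeline the
# same way process_file_system does internally. The command-line interface below is
# an assumption for illustration, not an established skema entry point.
if __name__ == "__main__":
    if len(sys.argv) == 2:
        # Generate CAST for the given Python file, run the AnnCast pipeline,
        # and keep the resulting GroMEt FN collection in memory.
        single_cast = python_to_cast(sys.argv[1], cast_obj=True)
        gromet_collection = ann_cast_pipeline(
            single_cast, gromet=True, to_file=False, from_obj=True
        )
        print(
            dictionary_to_gromet_json(del_nulls(gromet_collection.to_dict()))
        )
    else:
        print("Usage: python script_functions.py <file.py>")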