Coverage for skema/utils/script_functions.py: 35%

134 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

1import ast 

2import dill 

3import os.path 

4import json 

5 

6from skema.gromet.fn import ( 

7 GrometFNModuleCollection, 

8) 

9 

10from skema.utils.fold import dictionary_to_gromet_json, del_nulls 

11from skema.program_analysis.CAST.pythonAST import py_ast_to_cast 

12from skema.program_analysis.CAST2FN import cast 

13from skema.program_analysis.CAST2FN.model.cast import SourceRef 

14from skema.program_analysis.CAST2FN.cast import CAST 

15from skema.program_analysis.CAST2FN.visitors.cast_to_agraph_visitor import ( 

16 CASTToAGraphVisitor, 

17) 

18from skema.program_analysis.CAST2FN.ann_cast.cast_to_annotated_cast import ( 

19 CastToAnnotatedCastVisitor, 

20) 

21from skema.program_analysis.CAST2FN.ann_cast.id_collapse_pass import ( 

22 IdCollapsePass, 

23) 

24from skema.program_analysis.CAST2FN.ann_cast.container_scope_pass import ( 

25 ContainerScopePass, 

26) 

27from skema.program_analysis.CAST2FN.ann_cast.variable_version_pass import ( 

28 VariableVersionPass, 

29) 

30from skema.program_analysis.CAST2FN.ann_cast.grfn_var_creation_pass import ( 

31 GrfnVarCreationPass, 

32) 

33from skema.program_analysis.CAST2FN.ann_cast.grfn_assignment_pass import ( 

34 GrfnAssignmentPass, 

35) 

36from skema.program_analysis.CAST2FN.ann_cast.lambda_expression_pass import ( 

37 LambdaExpressionPass, 

38) 

39from skema.program_analysis.CAST2FN.ann_cast.to_grfn_pass import ToGrfnPass 

40from skema.program_analysis.CAST2FN.ann_cast.to_gromet_pass import ( 

41 ToGrometPass, 

42) 

43 

44 

45def process_file_system(system_name, path, files, write_to_file=False): 

46 root_dir = path.strip() 

47 file_list = open(files, "r").readlines() 

48 

49 module_collection = GrometFNModuleCollection( 

50 schema_version="0.1.6", 

51 name=system_name, 

52 modules=[], 

53 module_index=[], 

54 executables=[], 

55 ) 

56 

57 for f in file_list: 

58 full_file = os.path.join(os.path.normpath(root_dir), f.rstrip("\n")) 

59 

60 # Open the file 

61 # TODO: Do we want to open the CAST or the Python source? 

62 # If we open the Python source then we need to generate its CAST and then generate its GroMEt after 

63 # I'm thinking for now we open the CAST, and generate GroMEt 

64 # As a next-step we can incorporate the Python -> CAST step 

65 print(full_file.rstrip()) 

66 

67 try: 

68 cast = python_to_cast(full_file, cast_obj=True) 

69 generated_gromet = ann_cast_pipeline( 

70 cast, gromet=True, to_file=False, from_obj=True 

71 ) 

72 

73 # Then, after we generate the GroMEt we store it in the 'modules' field 

74 # and store its path in the 'module_index' field 

75 module_collection.modules.append(generated_gromet) 

76 

77 # DONE: Change this so that it's the dotted path from the root 

78 # i.e. like model.view.sir" like it shows up in Python 

79 source_directory = os.path.basename( 

80 os.path.normpath(root_dir) 

81 ) # We just need the last directory of the path, not the complete path 

82 os_module_path = os.path.join(source_directory, f) 

83 python_module_path = os_module_path.replace("/", ".").replace( 

84 ".py", "" 

85 ) 

86 module_collection.module_index.append(python_module_path) 

87 

88 # Done: Determine how we know a gromet goes in the 'executable' field 

89 # We do this by finding all user_defined top level functions in the Gromet 

90 # and check if the name 'main' is among them 

91 function_networks = [ 

92 fn.value 

93 for fn in generated_gromet.attributes 

94 if fn.type == "FN" 

95 ] 

96 defined_functions = [ 

97 fn.b[0].name 

98 for fn in function_networks 

99 if fn.b[0].function_type == "FUNCTION" 

100 ] 

101 if "main" in defined_functions: 

102 module_collection.executables.append(python_module_path) 

103 

104 except ImportError: 

105 print("FAILURE") 

106 

107 # After we go through the whole system, we can then write out the module_collection 

108 if write_to_file: 

109 with open(f"{system_name}--Gromet-FN-auto.json", "w") as f: 

110 gromet_collection_dict = module_collection.to_dict() 

111 f.write( 

112 dictionary_to_gromet_json(del_nulls(gromet_collection_dict)) 

113 ) 

114 

115 return module_collection 

116 

117 

118def python_to_cast( 

119 pyfile_path, 

120 agraph=False, 

121 astprint=False, 

122 std_out=False, 

123 rawjson=False, 

124 legacy=False, 

125 cast_obj=False, 

126): 

127 # Open Python file as a giant string 

128 file_handle = open(pyfile_path) 

129 file_contents = file_handle.read() 

130 file_handle.close() 

131 file_name = pyfile_path.split("/")[-1] 

132 

133 # Count the number of lines in the file 

134 file_handle = open(pyfile_path) 

135 file_list = file_handle.readlines() 

136 line_count = 0 

137 for l in file_list: 

138 line_count += 1 

139 file_handle.close() 

140 

141 # Create a PyASTToCAST Object 

142 if legacy: 

143 convert = py_ast_to_cast.PyASTToCAST(file_name, legacy=True) 

144 else: 

145 convert = py_ast_to_cast.PyASTToCAST(file_name) 

146 

147 # 'Root' the current working directory so that it's where the 

148 # Source file we're generating CAST for is (for Import statements) 

149 old_path = os.getcwd() 

150 try: 

151 idx = pyfile_path.rfind("/") 

152 

153 if idx > -1: 

154 curr_path = pyfile_path[0:idx] 

155 os.chdir(curr_path) 

156 else: 

157 curr_path = "./" + pyfile_path 

158 

159 # os.chdir(curr_path) 

160 

161 # Parse the python program's AST and create the CAST 

162 contents = ast.parse(file_contents) 

163 C = convert.visit(contents, {}, {}) 

164 C.source_refs = [SourceRef(file_name, None, None, 1, line_count)] 

165 finally: 

166 os.chdir(old_path) 

167 

168 out_cast = cast.CAST([C], "python") 

169 

170 if agraph: 

171 V = CASTToAGraphVisitor(out_cast) 

172 last_slash_idx = file_name.rfind("/") 

173 file_ending_idx = file_name.rfind(".") 

174 pdf_file_name = ( 

175 f"{file_name[last_slash_idx + 1 : file_ending_idx]}.pdf" 

176 ) 

177 V.to_pdf(pdf_file_name) 

178 

179 # Then, print CAST as JSON 

180 if cast_obj: 

181 return out_cast 

182 else: 

183 if rawjson: 

184 print( 

185 json.dumps( 

186 out_cast.to_json_object(), sort_keys=True, indent=None 

187 ) 

188 ) 

189 else: 

190 if std_out: 

191 print(out_cast.to_json_str()) 

192 else: 

193 out_name = file_name.split(".")[0] 

194 print("Writing CAST to " + out_name + "--CAST.json") 

195 out_handle = open(out_name + "--CAST.json", "w") 

196 out_handle.write(out_cast.to_json_str()) 

197 

198 

199def ann_cast_pipeline( 

200 cast_instance, 

201 to_file=True, 

202 gromet=False, 

203 grfn_2_2=False, 

204 a_graph=False, 

205 from_obj=False, 

206 indent_level=0, 

207): 

208 """cast_to_annotated.py 

209 

210 This function reads a JSON file that contains the CAST representation 

211 of a program, and transforms it to annotated CAST. It then calls a 

212 series of passes that each augment the information in the annotatd CAST nodes 

213 in preparation for the GrFN generation. 

214 

215 One command-line argument is expected, namely the name of the JSON file that 

216 contains the CAST data. 

217 TODO: Update this docstring as the program has been tweaked so that this is a function instead of 

218 the program 

219 """ 

220 

221 if from_obj: 

222 f_name = "" 

223 cast = cast_instance 

224 else: 

225 f_name = cast_instance 

226 f_name = f_name.split("/")[-1] 

227 file_contents = open(f_name, "r").read() 

228 

229 cast_json = CAST([], "python") 

230 cast = cast_json.from_json_str(file_contents) 

231 

232 visitor = CastToAnnotatedCastVisitor(cast) 

233 # The Annotated Cast is an attribute of the PipelineState object 

234 pipeline_state = visitor.generate_annotated_cast(grfn_2_2) 

235 

236 # TODO: make filename creation more resilient 

237 

238 print("Calling IdCollapsePass------------------------") 

239 IdCollapsePass(pipeline_state) 

240 

241 print("\nCalling ContainerScopePass-------------------") 

242 ContainerScopePass(pipeline_state) 

243 

244 print("\nCalling VariableVersionPass-------------------") 

245 VariableVersionPass(pipeline_state) 

246 

247 # NOTE: CASTToAGraphVisitor uses misc.uuid, so placing it here means 

248 # that the generated GrFN uuids will not be consistent with GrFN uuids 

249 # created during test runtime. So, do not use these GrFN jsons as expected 

250 # json for testing 

251 f_name = f_name.replace("--CAST.json", "") 

252 if a_graph: 

253 agraph = CASTToAGraphVisitor(pipeline_state) 

254 pdf_file_name = f"{f_name}-AnnCast.pdf" 

255 agraph.to_pdf(pdf_file_name) 

256 

257 

258 print("\nCalling GrfnVarCreationPass-------------------") 

259 GrfnVarCreationPass(pipeline_state) 

260 

261 print("\nCalling GrfnAssignmentPass-------------------") 

262 GrfnAssignmentPass(pipeline_state) 

263 

264 print("\nCalling LambdaExpressionPass-------------------") 

265 LambdaExpressionPass(pipeline_state) 

266 

267 if gromet: 

268 print("\nCalling ToGrometPass-----------------------") 

269 ToGrometPass(pipeline_state) 

270 

271 if to_file: 

272 with open(f"{f_name}--Gromet-FN-auto.json", "w") as f: 

273 gromet_collection_dict = ( 

274 pipeline_state.gromet_collection.to_dict() 

275 ) 

276 f.write( 

277 dictionary_to_gromet_json( 

278 del_nulls(gromet_collection_dict), level=indent_level 

279 ) 

280 ) 

281 else: 

282 return pipeline_state.gromet_collection 

283 else: 

284 print("\nCalling ToGrfnPass-------------------") 

285 ToGrfnPass(pipeline_state) 

286 grfn = pipeline_state.get_grfn() 

287 grfn.to_json_file(f"{f_name}--AC-GrFN.json") 

288 

289 grfn_agraph = grfn.to_AGraph() 

290 grfn_agraph.draw(f"{f_name}--AC-GrFN.pdf", prog="dot") 

291 

292 print("\nGenerating pickled AnnCast nodes-----------------") 

293 pickled_file_name = f"{f_name}--AnnCast.pickled" 

294 with open(pickled_file_name, "wb") as pkfile: 

295 dill.dump(pipeline_state, pkfile)