Coverage for skema/model_assembly/ 0%

169 statements  

« prev     ^ index     » next v7.5.0, created at 2024-04-30 17:15 +0000

1import os 

2import re 

3import sys 

4import json 

5import importlib 

6from pathlib import Path 

7from typing import Set, Dict 

8from abc import ABC, abstractmethod 


10from skema.program_analysis.for2py import f2grfn 

11from .networks import GroundedFunctionNetwork 

12from .structures import ( 

13 GenericContainer, 

14 GenericStmt, 

15 CallStmt, 

16 GenericIdentifier, 

17 GenericDefinition, 

18 VariableDefinition, 


20from .code_types import CodeType, build_code_type_decision_tree 



class SourceInterpreter(ABC):
    """Abstract base class for source-code interpreters.

    An interpreter stores four dictionaries (containers, variables, types
    and documentation) plus the decision tree returned by
    ``build_code_type_decision_tree``.  Concrete subclasses must provide the
    two alternate constructors and the per-file interpretation routine
    declared below.
    """

    def __init__(self, C: Dict, V: Dict, T: Dict, D: Dict):
        """Store the interpreted data and build the code-type decision tree.

        Args:
            C: Stored as ``self.containers``.
            V: Stored as ``self.variables``.
            T: Stored as ``self.types``.
            D: Stored as ``self.documentation``.
        """
        self.containers = C
        self.variables = V
        self.types = T
        self.documentation = D
        self.decision_tree = build_code_type_decision_tree()

    @classmethod
    @abstractmethod
    def from_src_file(cls, filepath):
        """Alternate constructor: build an interpreter from one source file."""

    @classmethod
    @abstractmethod
    def from_src_dir(cls, dirpath):
        """Alternate constructor: build an interpreter from a directory."""

    @staticmethod
    @abstractmethod
    def interp_file_IR(filepath):
        """Interpret a single file into the data an interpreter stores."""



class ImperativeInterpreter(SourceInterpreter):
    """Interpreter for Fortran sources (``.for`` / ``.f``) processed by f2grfn.

    Besides the data held by ``SourceInterpreter`` the instance keeps:

    * ``container_stats``: per-container counters filled in by
      ``gather_container_stats``.
    * ``container_code_types``: per-container labels filled in by
      ``label_container_code_types``.

    NOTE(review): ``interp_file_IR`` stores objects built with
    ``GenericContainer.from_dict`` in the containers dictionary and
    ``find_root_container`` reads them through attributes, while the
    statistics methods index containers and statements like dictionaries
    (``container["body"]``, ``stmt["function"]``).  Confirm which shape is
    intended before relying on the statistics code.
    """

    # File-name endings accepted as Fortran sources.
    _FORTRAN_SUFFIXES = (".for", ".f")

    def __init__(self, C, V, T, D):
        """Store the interpreted data and create the empty analysis results."""
        super().__init__(C, V, T, D)
        # Created here so the analysis methods do not fail with
        # AttributeError on a freshly constructed interpreter.
        self.container_stats = {}
        self.container_code_types = {}

    @classmethod
    def from_src_file(cls, file):
        """Build an interpreter from a single Fortran source file.

        Args:
            file: Path string of the source file; it must end with ``.for``
                or ``.f``.

        Raises:
            ValueError: If ``file`` has a different ending.
        """
        if not file.endswith(cls._FORTRAN_SUFFIXES):
            raise ValueError(f"Unsupported file type ending for: {file}")

        C, V, T, D = cls.interp_file_IR(file)
        return cls(C, V, T, D)

    @classmethod
    def from_src_dir(cls, dirpath):
        """Build an interpreter from every Fortran file found under a directory.

        The directory is walked recursively with ``os.walk``.  Results of the
        individual files are merged with ``dict.update``, so an entry from a
        file processed later replaces an entry with the same key from an
        earlier file.
        """
        src_paths = [
            os.path.join(root, file)
            for root, _, files in os.walk(dirpath)
            for file in files
            if file.endswith(cls._FORTRAN_SUFFIXES)
        ]

        C, V, T, D = {}, {}, {}, {}
        for src_path in src_paths:
            C_new, V_new, T_new, D_new = cls.interp_file_IR(src_path)
            C.update(C_new)
            V.update(V_new)
            T.update(T_new)
            D.update(D_new)

        return cls(C, V, T, D)

    @staticmethod
    def interp_file_IR(fortran_file):
        """Translate one Fortran file and collect its intermediate data.

        For every translated Python file reported by
        ``f2grfn.fortran_to_grfn`` the dictionary returned by
        ``f2grfn.generate_grfn`` is written next to that file as
        ``<name>_AIR.json`` and then folded into the returned dictionaries.

        Returns:
            A ``(C, V, T, D)`` tuple of dictionaries: containers, variables
            and types keyed by their ``identifier``, and the source comments.
        """
        (
            python_sources,
            translated_python_files,
            mod_mapper_dict,
            fortran_filename,
            module_log_file_path,
            processing_modules,
        ) = f2grfn.fortran_to_grfn(fortran_file, save_intermediate_files=True)

        C, V, T, D = {}, {}, {}, {}
        for file_num, python_file in enumerate(translated_python_files):
            # Drop only a trailing ".py".  str.replace would also rewrite a
            # ".py" that happens to occur elsewhere in the path.
            if python_file.endswith(".py"):
                base_path = python_file[: -len(".py")]
            else:
                base_path = python_file

            ir_dict = f2grfn.generate_grfn(
                python_sources[file_num][0],
                python_file,
                base_path,
                mod_mapper_dict[0],
                fortran_file,
                module_log_file_path,
                processing_modules,
            )

            with open(base_path + "_AIR.json", "w") as f:
                json.dump(ir_dict, f, indent=2)

            for var_data in ir_dict["variables"]:
                new_var = GenericDefinition.from_dict(var_data)
                V[new_var.identifier] = new_var

            for type_data in ir_dict["types"]:
                new_type = GenericDefinition.from_dict(type_data)
                T[new_type.identifier] = new_type

            for con_data in ir_dict["containers"]:
                new_container = GenericContainer.from_dict(con_data)
                # Arguments that have no definition yet get one derived from
                # their identifier.
                for in_var in new_container.arguments:
                    if in_var not in V:
                        V[in_var] = VariableDefinition.from_identifier(in_var)
                C[new_container.identifier] = new_container

            filename = ir_dict["source"][0]

            # TODO Paul - is it fine to switch from keying by filename to keying by
            # container name? Also, lowercasing? - Adarsh
            container_name = Path(filename).stem.lower()
            # Comment keys starting with "$" are prefixed with the lower-cased
            # stem of the source file name; all other keys are kept unchanged.
            D.update(
                {
                    n if not n.startswith("$") else container_name + n: data
                    for n, data in ir_dict["source_comments"].items()
                }
            )

        return C, V, T, D

    @staticmethod
    def _empty_container_stats():
        """Return a zeroed record holding every counter this class updates.

        NOTE(review): the decision-tree predicates may read further keys;
        verify against ``build_code_type_decision_tree``.
        """
        return {
            "num_calls": 0,
            "max_call_depth": 0,
            "num_conditionals": 0,
            "max_conditional_depth": 0,
            "num_switches": 0,
            "num_loops": 0,
            "max_loop_depth": 0,
            "num_assgs": 0,
        }

    def __find_max_call_depth(self, depth, container, visited: Set[str]):
        """Return the deepest call level reachable from ``container``.

        Args:
            depth: Call level of ``container`` itself.
            container: Container whose ``"body"`` statements are scanned.
            visited: Names already expanded.  The set is shared across the
                whole recursion and mutated in place, which stops recursive
                call chains from looping forever.
        """
        max_depth = depth
        for stmt in container["body"]:
            # Statements without a "function" entry cannot be calls.
            function = stmt.get("function")
            if function is None:
                continue
            if (
                function["type"] in ("container", "function")
                and function["name"] not in visited
            ):
                visited.add(function["name"])
                # Every callee sits exactly one level below this container.
                # Keep the maximum over the siblings; passing one sibling's
                # result on to the next would add the siblings up instead.
                callee_depth = self.__find_max_call_depth(
                    depth + 1, self.containers[function["name"]], visited
                )
                max_depth = max(max_depth, callee_depth)

        return max_depth

    def __find_max_cond_depth(self, depth, curr_con):
        """Stub: always returns ``NotImplemented``."""
        # NOTE: @Adarsh you can hold off on implementing this
        return NotImplemented

    def __find_max_loop_depth(self, depth, curr_con):
        """Stub: always returns ``NotImplemented``."""
        # NOTE: @Adarsh you can hold off on implementing this
        return NotImplemented

    def __process_container_stmt_stats(self, stmt, con_name):
        """
        Processes through a container call statement gathering stats for the
        container referenced by con_name.

        Raises:
            ValueError: If the referenced container has an unknown type.
        """
        # TODO Adarsh: this may need some debugging
        child_con_name = stmt["function"]["name"]
        child_con = self.containers[child_con_name]
        child_con_type = child_con["type"]
        stats = self.container_stats[con_name]

        if child_con_type in ("container", "function"):
            stats["num_calls"] += 1
            call_depth = self.__find_max_call_depth(
                1, child_con, {child_con_name}
            )
            if call_depth >= stats["max_call_depth"]:
                stats["max_call_depth"] = call_depth
        elif child_con_type == "if-block":
            stats["num_conditionals"] += 1
            # TODO: record "max_conditional_depth" once
            # __find_max_cond_depth is implemented (it is still a stub).
        elif child_con_type == "select-block":
            stats["num_switches"] += 1
        elif child_con_type == "loop":
            stats["num_loops"] += 1
            loop_depth = self.__find_max_loop_depth(1, child_con)
            # The helper is still a stub that returns NotImplemented, and
            # comparing that sentinel with an int raises TypeError.  Only
            # record real depths.
            if (
                loop_depth is not NotImplemented
                and loop_depth >= stats["max_loop_depth"]
            ):
                stats["max_loop_depth"] = loop_depth
        else:
            raise ValueError(f"Unidentified container type: {child_con_type}")

    @staticmethod
    def __is_data_access(lambda_str):
        """
        Returns true if this lambda represents a data access, false otherwise.
        Common Fortran pattern of data access to search for:
        some_var = some_struct % some_attr
        NOTE: regex for the "%" on the RHS of the "="

        Not implemented yet: currently always returns ``NotImplemented``.
        """
        # TODO Adarsh: implement this
        return NotImplemented

    @staticmethod
    def __is_math_assg(lambda_str):
        """
        Returns true if any math operator func is found, false otherwise

        Only the text after the first "=" is searched; the whole string is
        searched when it contains no "=".

        NOTE: need to consider refining to deal with unary minus and division
        operators as sometimes being constant creation instead of a math op
        """
        # TODO Adarsh: debug this
        rhs_lambda = lambda_str[lambda_str.find("=") + 1 :]
        math_ops = r"\+|-|/|\*\*|\*|%"
        math_funcs = (
            r"np\.maximum|np\.minimum|np\.exp|np\.log|np\.sqrt|np\.log10"
        )
        trig_funcs = (
            r"np\.sin|np\.cos|np\.tan|np\.arccos|np\.arcsin|np\.arctan"
        )
        return any(
            re.search(pattern, rhs_lambda) is not None
            for pattern in (math_ops, math_funcs, trig_funcs)
        )

    def __process_lambda_stmt_stats(self, stmt, con_name):
        """Count a lambda statement as an assignment of ``con_name``.

        Unfinished: after the counter update the method tries to import the
        module holding the lambdas and finally returns ``NotImplemented``.
        """
        # TODO finish this implementation
        self.container_stats[con_name]["num_assgs"] += 1
        lambda_name = stmt["function"]["name"]
        # NOTE(review): `lambda_path` is not defined anywhere in this method
        # or class, so the next line raises NameError unless the module
        # defines it elsewhere - confirm where the lambdas file path should
        # come from.
        lambdas_dir = str(lambda_path.parent.resolve())
        if lambdas_dir not in sys.path:
            sys.path.insert(0, lambdas_dir)
        lambdas = importlib.import_module(lambda_path.stem)
        # NOTE: use inspect.getsource(<lambda-ref>) in order to get the string source
        # NOTE: We need to search for:
        #   (1) assignment vs condition
        #   (2) accessor assignment vs math assignment
        #   (3) data change assignment vs regular math assignment
        return NotImplemented

    def find_root_container(self):
        """Return the identifier of the one container that is never called.

        A container counts as called when its identifier is the ``call_id``
        of a ``CallStmt`` among the ``statements`` of any container.

        Raises:
            RuntimeWarning: If more than one container is never called.
            RuntimeError: If every container is called by some container.
        """
        called_containers = {
            stmt.call_id
            for con in self.containers.values()
            for stmt in con.statements
            if isinstance(stmt, CallStmt)
        }
        possible_root_containers = list(
            set(self.containers) - called_containers
        )
        if len(possible_root_containers) > 1:
            raise RuntimeWarning(
                f"Multiple possible root containers found:\n{possible_root_containers}"
            )
        elif len(possible_root_containers) == 0:
            raise RuntimeError("No possible root containers found:.")
        return possible_root_containers[0]

    def gather_container_stats(self):
        """
        Analysis code that gathers container statistics used to determine the
        code-type of this container.

        Raises:
            ValueError: If a statement's function type is neither
                "container" nor "lambda".
        """
        for con_name, con_data in self.containers.items():
            # Every container gets a zeroed record before its statements are
            # counted; an already existing record is left as it is.
            self.container_stats.setdefault(
                con_name, self._empty_container_stats()
            )
            for stmt in con_data["body"]:
                # TODO Paul/Adarsh - extend the below to deal with statements that don't
                # have the 'function' key - e.g. ones that have 'condition' as
                # a key.
                function = stmt.get("function")
                if function is None:
                    continue

                stmt_type = function["type"]
                if stmt_type == "container":
                    self.__process_container_stmt_stats(stmt, con_name)
                elif stmt_type == "lambda":
                    self.__process_lambda_stmt_stats(stmt, con_name)
                else:
                    raise ValueError(
                        f"Unidentified statement type: {stmt_type}"
                    )

    def label_container_code_type(self, current_node, stats):
        """Walk the decision tree from ``current_node`` and return the label.

        The node's ``"func"`` is evaluated on ``stats`` and the outgoing edge
        whose ``"type"`` equals the result is followed.  A successor whose
        node ``"type"`` is ``"condition"`` is evaluated recursively; any
        other successor ``"type"`` is returned as the label.

        Raises:
            ValueError: If no outgoing edge matches the evaluated result.
        """
        G = self.decision_tree
        satisfied = G.nodes[current_node]["func"](stats)
        matching = [
            successor
            for successor in G.successors(current_node)
            if G.get_edge_data(current_node, successor)["type"] == satisfied
        ]
        if not matching:
            raise ValueError(
                f"No decision-tree edge from {current_node} matches "
                f"result: {satisfied}"
            )

        # Should several edges match, the last one decides.
        successor = matching[-1]
        successor_type = G.nodes[successor]["type"]
        if successor_type != "condition":
            return successor_type
        return self.label_container_code_type(successor, stats)

    def label_container_code_types(self):
        """Label every container in ``container_stats``, starting at node "C0"."""
        # TODO Adarsh: Implement the code-type decision tree here
        root = "C0"
        for container, stats in self.container_stats.items():
            self.container_code_types[
                container
            ] = self.label_container_code_type(root, stats)

    def build_GrFNs(self):
        """
        Creates the GrFNs for each container that has been determined to
        represent a scientific model.

        Returns:
            A dictionary mapping the name of each container labelled
            ``CodeType.MODEL`` to the result of
            ``GroundedFunctionNetwork.from_AIR`` for that container.
        """
        return {
            name: GroundedFunctionNetwork.from_AIR(
                name,
                self.containers,
                self.variables,
                self.types,
            )
            for name in self.containers
            if self.container_code_types[name] is CodeType.MODEL
        }
319 }