Coverage for skema/model_assembly/interpreter.py: 0%
169 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
1import os
2import re
3import sys
4import json
5import importlib
6from pathlib import Path
7from typing import Set, Dict
8from abc import ABC, abstractmethod
10from skema.program_analysis.for2py import f2grfn
11from .networks import GroundedFunctionNetwork
12from .structures import (
13 GenericContainer,
14 GenericStmt,
15 CallStmt,
16 GenericIdentifier,
17 GenericDefinition,
18 VariableDefinition,
19)
20from .code_types import CodeType, build_code_type_decision_tree
class SourceInterpreter(ABC):
    """
    Abstract base class for interpreters that load source code into
    container / variable / type / documentation dictionaries.

    Concrete subclasses must provide the two alternate constructors
    (``from_src_file`` and ``from_src_dir``) and the static
    ``interp_file_IR`` hook.
    """

    def __init__(self, C: Dict, V: Dict, T: Dict, D: Dict):
        """
        Keep the four dictionaries and build the code-type decision tree.

        Args:
            C: Stored as ``self.containers``.
            V: Stored as ``self.variables``.
            T: Stored as ``self.types``.
            D: Stored as ``self.documentation``.
        """
        (
            self.containers,
            self.variables,
            self.types,
            self.documentation,
        ) = (C, V, T, D)
        self.decision_tree = build_code_type_decision_tree()

    @classmethod
    @abstractmethod
    def from_src_file(cls, filepath):
        """Alternate constructor from a single source file (subclass hook)."""

    @classmethod
    @abstractmethod
    def from_src_dir(cls, dirpath):
        """Alternate constructor from a source directory (subclass hook)."""

    @staticmethod
    @abstractmethod
    def interp_file_IR(filepath):
        """Interpret one source file into its IR pieces (subclass hook)."""
class ImperativeInterpreter(SourceInterpreter):
    """
    Interpreter for Fortran sources (``.for`` / ``.f``) run through for2py.

    In addition to the dictionaries stored by ``SourceInterpreter``, every
    instance owns two tables keyed by container name:

    * ``container_stats`` -- counters updated by ``gather_container_stats``.
    * ``container_code_types`` -- labels written by
      ``label_container_code_types`` and read by ``build_GrFNs``.
    """

    # File-name endings accepted as Fortran source.
    _FORTRAN_SUFFIXES = (".for", ".f")

    # Container "type" values that count as a call to another container.
    _CALL_TYPES = ("container", "function")

    # Every container "type" the statistics code knows how to handle.
    _KNOWN_CONTAINER_TYPES = _CALL_TYPES + ("if-block", "select-block", "loop")

    def __init__(self, C, V, T, D):
        """
        Store the AIR dictionaries and create empty statistics/label tables.

        Args:
            C: Containers, keyed by container identifier.
            V: Variable definitions.
            T: Type definitions.
            D: Documentation (source comments).
        """
        super().__init__(C, V, T, D)
        # The statistics methods index ``self.container_stats[name][key]``
        # and the labelling methods index ``self.container_code_types``, so
        # both tables must exist before any of them is called.
        self.container_stats = {
            name: self.__new_stats() for name in self.containers
        }
        self.container_code_types = {}

    @staticmethod
    def __new_stats():
        """
        Return a zeroed counter record holding every key this class updates.

        NOTE(review): the decision-tree predicates may read further keys that
        are not visible in this file -- confirm against ``code_types``.
        """
        return {
            "num_calls": 0,
            "max_call_depth": 0,
            "num_conditionals": 0,
            "max_conditional_depth": 0,
            "num_switches": 0,
            "num_loops": 0,
            "max_loop_depth": 0,
            "num_assgs": 0,
        }

    @classmethod
    def from_src_file(cls, file):
        """
        Build an interpreter from a single Fortran file.

        Args:
            file: Path to the source file (``str`` or ``os.PathLike``).

        Raises:
            ValueError: If the path does not end in ``.for`` or ``.f``.
        """
        # os.fspath lets callers hand in pathlib.Path objects as well as str.
        path = os.fspath(file)
        if not path.endswith(cls._FORTRAN_SUFFIXES):
            raise ValueError(f"Unsupported file type ending for: {file}")

        C, V, T, D = cls.interp_file_IR(path)
        return cls(C, V, T, D)

    @classmethod
    def from_src_dir(cls, dirpath):
        """
        Build an interpreter from every ``.for`` / ``.f`` file under a directory.

        The directory is walked recursively. Results of the individual files
        are merged with ``dict.update``, so on a key collision the file
        visited later wins.
        """
        src_paths = [
            os.path.join(root, file)
            for root, _dirs, files in os.walk(dirpath)
            for file in files
            if file.endswith(cls._FORTRAN_SUFFIXES)
        ]

        C, V, T, D = {}, {}, {}, {}
        for src_path in src_paths:
            C_new, V_new, T_new, D_new = cls.interp_file_IR(src_path)
            C.update(C_new)
            V.update(V_new)
            T.update(T_new)
            D.update(D_new)

        return cls(C, V, T, D)

    @staticmethod
    def interp_file_IR(fortran_file):
        """
        Translate one Fortran file and collect its IR into four dictionaries.

        For every translated Python file the generated IR is also dumped as
        JSON next to it, under the same name with ``.py`` swapped for
        ``_AIR.json``.

        Args:
            fortran_file: Path of the Fortran source handed to for2py.

        Returns:
            Tuple ``(C, V, T, D)`` of containers, variables, types and
            documentation, each keyed as described inline below.
        """
        (
            python_sources,
            translated_python_files,
            mod_mapper_dict,
            _fortran_filename,
            module_log_file_path,
            processing_modules,
        ) = f2grfn.fortran_to_grfn(fortran_file, save_intermediate_files=True)

        C, V, T, D = {}, {}, {}, {}
        for file_num, python_file in enumerate(translated_python_files):
            # Swap only a trailing ".py". str.replace would rewrite every
            # ".py" occurrence in the path, and with no ".py" at all it would
            # leave the name unchanged so the JSON dump below would overwrite
            # the translated file itself.
            if python_file.endswith(".py"):
                stem = python_file[: -len(".py")]
            else:
                stem = python_file
            lambdas_file_path = stem + "_lambdas.py"

            ir_dict = f2grfn.generate_grfn(
                python_sources[file_num][0],
                python_file,
                lambdas_file_path,
                mod_mapper_dict[0],
                fortran_file,
                module_log_file_path,
                processing_modules,
            )

            with open(stem + "_AIR.json", "w", encoding="utf-8") as f:
                json.dump(ir_dict, f, indent=2)

            for var_data in ir_dict["variables"]:
                new_var = GenericDefinition.from_dict(var_data)
                V[new_var.identifier] = new_var

            for type_data in ir_dict["types"]:
                new_type = GenericDefinition.from_dict(type_data)
                T[new_type.identifier] = new_type

            for con_data in ir_dict["containers"]:
                new_container = GenericContainer.from_dict(con_data)
                # Make sure every container argument has a variable entry.
                for in_var in new_container.arguments:
                    if in_var not in V:
                        V[in_var] = VariableDefinition.from_identifier(in_var)
                C[new_container.identifier] = new_container

            filename = ir_dict["source"][0]

            # TODO Paul - is it fine to switch from keying by filename to
            # keying by container name? Also, lowercasing? - Adarsh
            container_name = Path(filename).stem.lower()
            # Comment keys starting with "$" are prefixed with the lowercased
            # file stem; all other keys are stored unchanged.
            for name, data in ir_dict["source_comments"].items():
                key = container_name + name if name.startswith("$") else name
                D[key] = data

        return C, V, T, D

    def __find_max_call_depth(self, depth, container, visited: Set[str]):
        """
        Return the deepest call nesting reachable from ``container``.

        Args:
            depth: Depth already reached when entering ``container``.
            container: Container whose ``"body"`` statements are scanned.
            visited: Names of containers already expanded. It is mutated in
                place and shared across the whole traversal, which is what
                prevents infinite recursion on cyclic call graphs.

        Returns:
            The maximum depth over all call chains starting here; ``depth``
            itself when the body contains no unvisited calls.
        """
        # NOTE(review): containers are subscripted here (``container["body"]``)
        # but accessed by attribute in ``find_root_container`` -- confirm that
        # the container type supports both.
        deepest = depth
        for stmt in container["body"]:
            # Not every statement carries a "function" entry.
            function = stmt.get("function")
            if function is None:
                continue
            if function["type"] not in self._CALL_TYPES:
                continue
            name = function["name"]
            if name in visited:
                continue
            visited.add(name)
            # Each sibling call starts from the same depth; taking the max
            # keeps two consecutive calls from being counted as nested.
            deepest = max(
                deepest,
                self.__find_max_call_depth(
                    depth + 1, self.containers[name], visited
                ),
            )

        return deepest

    def __find_max_cond_depth(self, depth, curr_con):
        """Placeholder: conditional-depth analysis is not implemented yet."""
        # NOTE: @Adarsh you can hold off on implementing this
        return NotImplemented

    def __find_max_loop_depth(self, depth, curr_con):
        """Placeholder: loop-depth analysis is not implemented yet."""
        # NOTE: @Adarsh you can hold off on implementing this
        return NotImplemented

    @staticmethod
    def __raise_max(stats, key, candidate):
        """
        Set ``stats[key]`` to ``candidate`` if that is at least as large.

        ``NotImplemented`` (returned by the placeholder depth finders) is
        ignored, because comparing it with an int raises ``TypeError``.
        """
        if candidate is NotImplemented:
            return
        stats[key] = max(stats[key], candidate)

    def __process_container_stmt_stats(self, stmt, con_name):
        """
        Processes through a container call statement gathering stats for the
        container referenced by con_name.

        Args:
            stmt: Statement whose ``["function"]["name"]`` names the child
                container.
            con_name: Name of the container whose counters are updated.

        Raises:
            ValueError: If the child container has an unknown ``"type"``.
        """
        # TODO Adarsh: this may need some debugging
        child_con_name = stmt["function"]["name"]
        child_con = self.containers[child_con_name]
        child_con_type = child_con["type"]
        if child_con_type not in self._KNOWN_CONTAINER_TYPES:
            raise ValueError(f"Unidentified container type: {child_con_type}")

        stats = self.container_stats[con_name]
        if child_con_type in self._CALL_TYPES:
            stats["num_calls"] += 1
            call_depth = self.__find_max_call_depth(
                1, child_con, {child_con_name}
            )
            self.__raise_max(stats, "max_call_depth", call_depth)
        elif child_con_type == "if-block":
            stats["num_conditionals"] += 1
            cond_depth = self.__find_max_cond_depth(1, child_con)
            self.__raise_max(stats, "max_conditional_depth", cond_depth)
        elif child_con_type == "select-block":
            stats["num_switches"] += 1
        else:  # "loop"
            stats["num_loops"] += 1
            loop_depth = self.__find_max_loop_depth(1, child_con)
            self.__raise_max(stats, "max_loop_depth", loop_depth)

    @staticmethod
    def __is_data_access(lambda_str):
        """
        Returns true if this lambda represents a data access, false otherwise.
        Common Fortran pattern of data access to search for:
            some_var = some_struct % some_attr
        NOTE: regex for the "%" on the RHS of the "="

        Currently a placeholder that returns ``NotImplemented``.
        """
        # TODO Adarsh: implement this
        return NotImplemented

    @staticmethod
    def __is_math_assg(lambda_str):
        """
        Returns true if any math operator func is found, false otherwise

        Only the text after the first "=" is searched; when there is no "="
        the whole string is searched.

        NOTE: need to consider refining to deal with unary minus and division
        operators as sometimes being constant creation instead of a math op
        """
        # TODO Adarsh: debug this
        rhs_lambda = lambda_str[lambda_str.find("=") + 1 :]
        math_ops = r"\+|-|/|\*\*|\*|%"
        math_funcs = (
            r"np\.maximum|np\.minimum|np\.exp|np\.log|np\.sqrt|np\.log10"
        )
        trig_funcs = (
            r"np\.sin|np\.cos|np\.tan|np\.arccos|np\.arcsin|np\.arctan"
        )
        return any(
            re.search(pattern, rhs_lambda) is not None
            for pattern in (math_ops, math_funcs, trig_funcs)
        )

    def __process_lambda_stmt_stats(self, stmt, con_name, lambda_path=None):
        """
        Count a lambda statement as an assignment of container ``con_name``.

        Args:
            stmt: The lambda statement (currently not inspected).
            con_name: Name of the container whose counters are updated.
            lambda_path: Optional path of the generated lambdas module. When
                given, its directory is put on ``sys.path`` and the module is
                imported; when omitted no import is attempted.

        Returns:
            ``NotImplemented`` -- classification of the lambda is unfinished.
        """
        # TODO finish this implementation
        self.container_stats[con_name]["num_assgs"] += 1
        if lambda_path is not None:
            lambda_path = Path(lambda_path)
            lambdas_dir = str(lambda_path.parent.resolve())
            if lambdas_dir not in sys.path:
                sys.path.insert(0, lambdas_dir)
            importlib.import_module(lambda_path.stem)
        # NOTE: use inspect.getsource(<lambda-ref>) in order to get the string
        # source
        # NOTE: We need to search for:
        # (1) assignment vs condition
        # (2) accessor assignment vs math assignment
        # (3) data change assignment vs regular math assignment
        return NotImplemented

    def find_root_container(self):
        """
        Return the identifier of the one container no ``CallStmt`` refers to.

        Raises:
            RuntimeWarning: If more than one container is never called.
            RuntimeError: If every container is called by some statement.
        """
        called_containers = {
            stmt.call_id
            for con in self.containers.values()
            for stmt in con.statements
            if isinstance(stmt, CallStmt)
        }
        possible_root_containers = list(
            set(self.containers) - called_containers
        )
        if len(possible_root_containers) > 1:
            raise RuntimeWarning(
                f"Multiple possible root containers found:\n{possible_root_containers}"
            )
        elif len(possible_root_containers) == 0:
            raise RuntimeError("No possible root containers found:.")
        return possible_root_containers[0]

    def gather_container_stats(self):
        """
        Analysis code that gathers container statistics used to determine the
        code-type of this container.

        Raises:
            ValueError: If a statement's function type is neither
                ``"container"`` nor ``"lambda"``.
        """
        for con_name, con_data in self.containers.items():
            for stmt in con_data["body"]:
                # TODO Paul/Adarsh - extend the below to deal with statements
                # that don't have the 'function' key - e.g. ones that have
                # 'condition' as a key.
                if stmt.get("function") is None:
                    continue
                stmt_type = stmt["function"]["type"]
                if stmt_type == "container":
                    self.__process_container_stmt_stats(stmt, con_name)
                elif stmt_type == "lambda":
                    self.__process_lambda_stmt_stats(stmt, con_name)
                else:
                    raise ValueError(
                        f"Unidentified statement type: {stmt_type}"
                    )

    def label_container_code_type(self, current_node, stats):
        """
        Walk the decision tree from ``current_node`` and return a leaf label.

        The node's ``"func"`` is applied to ``stats``; the outgoing edge whose
        ``"type"`` equals the result is followed. A successor whose ``"type"``
        is ``"condition"`` is evaluated recursively, any other successor type
        is returned as the label.

        Raises:
            RuntimeError: If no outgoing edge matches the node's result.
        """
        # presumably a networkx DiGraph (nodes / successors / get_edge_data)
        G = self.decision_tree
        satisfied = G.nodes[current_node]["func"](stats)
        matching = [
            successor
            for successor in G.successors(current_node)
            if G.get_edge_data(current_node, successor)["type"] == satisfied
        ]
        if not matching:
            raise RuntimeError(
                f"Decision tree node {current_node!r} has no outgoing edge "
                f"for result {satisfied!r}"
            )

        # If several edges match, the last one decides.
        successor = matching[-1]
        successor_type = G.nodes[successor]["type"]
        if successor_type != "condition":
            return successor_type
        return self.label_container_code_type(successor, stats)

    def label_container_code_types(self):
        """Label every container in ``container_stats``, starting at "C0"."""
        # TODO Adarsh: Implement the code-type decision tree here
        root = "C0"
        for container, stats in self.container_stats.items():
            self.container_code_types[
                container
            ] = self.label_container_code_type(root, stats)

    def build_GrFNs(self):
        """
        Creates the GrFNs for each container that has been determined to
        represent a scientific model.

        Containers that have not been labelled are skipped.

        Returns:
            Dict mapping container name to its ``GroundedFunctionNetwork``.
        """
        return {
            name: GroundedFunctionNetwork.from_AIR(
                name,
                self.containers,
                self.variables,
                self.types,
            )
            for name in self.containers
            if self.container_code_types.get(name) is CodeType.MODEL
        }