Coverage for skema/gromet/query/query.py: 0%
59 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
1from typing import List
2import os
4from skema.program_analysis.JSON2GroMEt.json2gromet import json_to_gromet
7from skema.gromet.fn import GrometFNModule
9from skema.gromet.metadata import Metadata, SourceCodeReference
12def get_module_metadata(module: GrometFNModule, idx: int) -> List[Metadata]:
13 """
14 Helper to access metadata index of a GrometFNModule.
15 :param module: The GrometFNModule
16 :param idx: index into the module metadata_collection
17 :return: List of Metadata objects
18 """
19 if idx < len(module.metadata_collection):
20 return module.metadata_collection[idx]
21 else:
22 raise Exception(
23 f"GrometFNModule {module.name} as metadata_collection size {len(module.metadata_collection)}, "
24 f"but asked for index {idx}"
25 )
28def find_source_code_reference(
29 module: GrometFNModule, idx: int
30) -> SourceCodeReference:
31 """
32 Helper to find source_code_reference metadatum given an
33 index into the metadata_collection of a GrometFNModule.
34 :param module: The GrometFNModule
35 :param idx: index into the module metadata_collection
36 :return: A SourceCodeReference Metadata object (if it is in the Metadata list)
37 """
38 metadata = get_module_metadata(module, idx)
39 scr = None
40 for metadatum in metadata:
41 if isinstance(metadatum, SourceCodeReference):
42 scr = metadatum
43 return scr
46def collect_named_output_ports(module: GrometFNModule):
47 named_output_ports = list()
49 def collect_for_fn(fn):
50 """
51 Helper fn to collected all of the named output ports of a FN
52 :param fn:
53 :return:
54 """
55 if fn.pof:
56 for output_port in fn.pof:
57 if output_port.name:
58 # output_port has a name
59 name = output_port.name
60 value = None
61 source_code_reference = None
63 box = fn.bf[
64 output_port.box - 1
65 ] # the box (call) the output_port is attached to
67 # see if we can find source_code_reference for the output_port
68 if box.metadata:
69 # box has metadata, see if it has a source_code_reference (otherwise will return None)
70 source_code_reference = find_source_code_reference(
71 module, box.metadata - 1
72 )
74 # Now see if there is a LiteralValue assigned directly to this output_port
75 if box.contents:
76 attribute_contents = module.attributes[
77 box.contents - 1
78 ] # the contents of the box-call
79 if attribute_contents.type == "FN":
80 # the contents are a FN (not an import)
81 fn_contents = (
82 attribute_contents.value
83 ) # The FN itself
84 if fn_contents.b[0].function_type == "EXPRESSION":
85 # The FN is an expression, so it has a single return port (value)
86 if fn_contents.wfopo:
87 expr_box_idx = fn_contents.wfopo[
88 0
89 ].tgt # identify the value source of the output port
90 expr_box = fn_contents.bf[expr_box_idx - 1]
91 if expr_box.function_type == "LITERAL":
92 # we have a literal!
93 value = (
94 expr_box.value
95 ) # The literal value
97 named_output_ports.append(
98 (name, value, source_code_reference)
99 )
101 if module.fn:
102 collect_for_fn(module.fn)
103 for attr in module.attributes:
104 if attr.type == "FN":
105 collect_for_fn(attr.value)
107 return named_output_ports
110# -----------------------------------------------------------------------------
111# Development Script
112# NOTE: this has been replicated in <skema_root>/notebooks/gromet/gromet_query.ipynb
113# -----------------------------------------------------------------------------
115LOCAL_SKEMA_GOOGLE_DRIVE_ROOT = "/Users/claytonm/My Drive/"
116ROOT = os.path.join(LOCAL_SKEMA_GOOGLE_DRIVE_ROOT, "ASKEM-SKEMA/data/")
117EXP0_GROMET_JSON = os.path.join(
118 ROOT, "gromet/examples/exp0/FN_0.1.4/exp0--Gromet-FN-auto.json"
119)
120EXP1_GROMET_JSON = os.path.join(
121 ROOT, "gromet/examples/exp1/FN_0.1.4/exp1--Gromet-FN-auto.json"
122)
123EXP2_GROMET_JSON = os.path.join(
124 ROOT, "gromet/examples/exp2/FN_0.1.4/exp2--Gromet-FN-auto.json"
125)
126CHIME_SIR_GROMET_JSON = os.path.join(
127 ROOT,
128 "epidemiology/CHIME/CHIME_SIR_model/gromet/FN_0.1.4/CHIME_SIR--Gromet-FN-auto.json",
129)
130EXAMPLE_GROMET_JSON_FILE = (
131 "../../../notebooks/gromet/CHIME_SIR_while_loop--Gromet-FN-auto.json"
132)
135def script():
136 # module = json_to_gromet(EXP0_GROMET_JSON)
137 # module = json_to_gromet(EXP1_GROMET_JSON)
138 # module = json_to_gromet(EXP2_GROMET_JSON)
139 module = json_to_gromet(CHIME_SIR_GROMET_JSON)
140 # module = json_to_gromet(EXAMPLE_GROMET_JSON_FILE)
141 # print(module)
142 # print(len(module.metadata_collection))
143 # print(find_source_code_reference(module, 0))
144 # print(find_source_code_reference(module, 2))
146 nops = collect_named_output_ports(module)
147 for nop in nops:
148 print(nop)
151if __name__ == "__main__":
152 script()