Coverage for skema/gromet/query/query.py: 0%

59 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

1from typing import List 

2import os 

3 

4from skema.program_analysis.JSON2GroMEt.json2gromet import json_to_gromet 

5 

6 

7from skema.gromet.fn import GrometFNModule 

8 

9from skema.gromet.metadata import Metadata, SourceCodeReference 

10 

11 

12def get_module_metadata(module: GrometFNModule, idx: int) -> List[Metadata]: 

13 """ 

14 Helper to access metadata index of a GrometFNModule. 

15 :param module: The GrometFNModule 

16 :param idx: index into the module metadata_collection 

17 :return: List of Metadata objects 

18 """ 

19 if idx < len(module.metadata_collection): 

20 return module.metadata_collection[idx] 

21 else: 

22 raise Exception( 

23 f"GrometFNModule {module.name} as metadata_collection size {len(module.metadata_collection)}, " 

24 f"but asked for index {idx}" 

25 ) 

26 

27 

28def find_source_code_reference( 

29 module: GrometFNModule, idx: int 

30) -> SourceCodeReference: 

31 """ 

32 Helper to find source_code_reference metadatum given an 

33 index into the metadata_collection of a GrometFNModule. 

34 :param module: The GrometFNModule 

35 :param idx: index into the module metadata_collection 

36 :return: A SourceCodeReference Metadata object (if it is in the Metadata list) 

37 """ 

38 metadata = get_module_metadata(module, idx) 

39 scr = None 

40 for metadatum in metadata: 

41 if isinstance(metadatum, SourceCodeReference): 

42 scr = metadatum 

43 return scr 

44 

45 

46def collect_named_output_ports(module: GrometFNModule): 

47 named_output_ports = list() 

48 

49 def collect_for_fn(fn): 

50 """ 

51 Helper fn to collected all of the named output ports of a FN 

52 :param fn: 

53 :return: 

54 """ 

55 if fn.pof: 

56 for output_port in fn.pof: 

57 if output_port.name: 

58 # output_port has a name 

59 name = output_port.name 

60 value = None 

61 source_code_reference = None 

62 

63 box = fn.bf[ 

64 output_port.box - 1 

65 ] # the box (call) the output_port is attached to 

66 

67 # see if we can find source_code_reference for the output_port 

68 if box.metadata: 

69 # box has metadata, see if it has a source_code_reference (otherwise will return None) 

70 source_code_reference = find_source_code_reference( 

71 module, box.metadata - 1 

72 ) 

73 

74 # Now see if there is a LiteralValue assigned directly to this output_port 

75 if box.contents: 

76 attribute_contents = module.attributes[ 

77 box.contents - 1 

78 ] # the contents of the box-call 

79 if attribute_contents.type == "FN": 

80 # the contents are a FN (not an import) 

81 fn_contents = ( 

82 attribute_contents.value 

83 ) # The FN itself 

84 if fn_contents.b[0].function_type == "EXPRESSION": 

85 # The FN is an expression, so it has a single return port (value) 

86 if fn_contents.wfopo: 

87 expr_box_idx = fn_contents.wfopo[ 

88 0 

89 ].tgt # identify the value source of the output port 

90 expr_box = fn_contents.bf[expr_box_idx - 1] 

91 if expr_box.function_type == "LITERAL": 

92 # we have a literal! 

93 value = ( 

94 expr_box.value 

95 ) # The literal value 

96 

97 named_output_ports.append( 

98 (name, value, source_code_reference) 

99 ) 

100 

101 if module.fn: 

102 collect_for_fn(module.fn) 

103 for attr in module.attributes: 

104 if attr.type == "FN": 

105 collect_for_fn(attr.value) 

106 

107 return named_output_ports 

108 

109 

110# ----------------------------------------------------------------------------- 

111# Development Script 

112# NOTE: this has been replicated in <skema_root>/notebooks/gromet/gromet_query.ipynb 

113# ----------------------------------------------------------------------------- 

114 

115LOCAL_SKEMA_GOOGLE_DRIVE_ROOT = "/Users/claytonm/My Drive/" 

116ROOT = os.path.join(LOCAL_SKEMA_GOOGLE_DRIVE_ROOT, "ASKEM-SKEMA/data/") 

117EXP0_GROMET_JSON = os.path.join( 

118 ROOT, "gromet/examples/exp0/FN_0.1.4/exp0--Gromet-FN-auto.json" 

119) 

120EXP1_GROMET_JSON = os.path.join( 

121 ROOT, "gromet/examples/exp1/FN_0.1.4/exp1--Gromet-FN-auto.json" 

122) 

123EXP2_GROMET_JSON = os.path.join( 

124 ROOT, "gromet/examples/exp2/FN_0.1.4/exp2--Gromet-FN-auto.json" 

125) 

126CHIME_SIR_GROMET_JSON = os.path.join( 

127 ROOT, 

128 "epidemiology/CHIME/CHIME_SIR_model/gromet/FN_0.1.4/CHIME_SIR--Gromet-FN-auto.json", 

129) 

130EXAMPLE_GROMET_JSON_FILE = ( 

131 "../../../notebooks/gromet/CHIME_SIR_while_loop--Gromet-FN-auto.json" 

132) 

133 

134 

135def script(): 

136 # module = json_to_gromet(EXP0_GROMET_JSON) 

137 # module = json_to_gromet(EXP1_GROMET_JSON) 

138 # module = json_to_gromet(EXP2_GROMET_JSON) 

139 module = json_to_gromet(CHIME_SIR_GROMET_JSON) 

140 # module = json_to_gromet(EXAMPLE_GROMET_JSON_FILE) 

141 # print(module) 

142 # print(len(module.metadata_collection)) 

143 # print(find_source_code_reference(module, 0)) 

144 # print(find_source_code_reference(module, 2)) 

145 

146 nops = collect_named_output_ports(module) 

147 for nop in nops: 

148 print(nop) 

149 

150 

151if __name__ == "__main__": 

152 script()