Coverage for skema/program_analysis/CAST2FN/ann_cast/ann_cast_helpers.py: 86%
242 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
1import re
2import sys
3import typing
4from dataclasses import dataclass, field
5from datetime import datetime
7from skema.model_assembly.metadata import (
8 CodeSpanReference,
9 Domain,
10 LambdaType,
11 MetadataMethod,
12 ProvenanceData,
13 VariableCreationReason,
14 VariableFromSource,
15)
16from skema.model_assembly.networks import (
17 GenericNode,
18 LambdaNode,
19 VariableNode,
20 PackNode,
21 UnpackNode,
22)
23from skema.model_assembly.structures import VariableIdentifier
24from skema.program_analysis.CAST2FN.ann_cast.annotated_cast import *
25from skema.program_analysis.CAST2FN.model.cast import SourceRef
# NOTE: the GrFN json loading seems to rely on "." as the separator for container scopes.
# For the Annotated Cast pipeline, it is fine to change these separators as long as they
# don't collide.
# Used in the ContainerScopePass functions `con_scope_to_str()` and `visit_name()`
CON_STR_SEP = "."
# delimiter between the pieces of a fullid
FULLID_SEP = ":"
# NOTE: we use hyphens between names to create illegal identifiers to prevent name collisions
LOOPBODY = "loop-body"
ELSEBODY = "else-body"
IFBODY = "if-body"
LOOPPRE = "loop-pre"
LOOPEXPR = "loop-expr"
LOOPPOST = "loop-post"
IFEXPR = "if-expr"
MODULE_SCOPE = "module"
VAR_INIT_VERSION = 0
VAR_EXIT_VERSION = 1
# The variable versions for the loop interface are extended to include
# `LOOP_VAR_UPDATED_VERSION` because the top loop interface has special
# semantics: it chooses between the initial version and the version
# updated after loop body execution.
# However, the top_interface_out produces `VAR_INIT_VERSION` and
# bot_interface_in accepts `VAR_EXIT_VERSION`, which is consistent
# with other containers.
LOOP_VAR_UPDATED_VERSION = 2
@dataclass
class GrfnContainerSrcRef:
    """
    Used to track the line begin, line end, and source file for ModelIf and Loop
    Containers. This data will eventually be added to the containers metadata.

    Every field is Optional and may be None when that piece of location
    information is not available.
    """

    # first source line covered by the container
    line_begin: typing.Optional[int]
    # last source line covered by the container
    line_end: typing.Optional[int]
    # name of the source file the container comes from
    source_file_name: typing.Optional[str]
@dataclass
class GrfnAssignment:
    """
    Groups a GrFN assignment `LambdaNode` with its `LambdaType`, its input
    and output variables, and the text of its lambda expression.
    """

    assignment_node: LambdaNode
    assignment_type: LambdaType
    # `inputs` and `outputs` each map a fullid to a GrFN Var uid;
    # both start out as empty dicts
    inputs: typing.Dict[str, str] = field(default_factory=dict)
    outputs: typing.Dict[str, str] = field(default_factory=dict)
    # lambda expression string; empty by default
    lambda_expr: str = ""
def cast_op_to_str(op):
    """
    Translate a CAST operator name (e.g. "Add") into its operator string.

    Returns None when `op` is not one of the known operator names.
    """
    symbols = {
        # arithmetic
        "Add": "+",
        "Sub": "-",
        "Mult": "*",
        "Div": "/",
        "Mod": "%",
        "Pow": "^",
        "USub": "- ",
        # comparison
        "Eq": "==",
        "NotEq": "!=",
        "Lt": "<",
        "Lte": "<=",
        "Gt": ">",
        "Gte": ">=",
        # bitwise
        "BitAnd": "&",
        "BitOr": "|",
        "BitXor": "^",
        "LShift": "<<",
        "RShift": ">>",
        "Invert": "~",
        # boolean
        "And": "&&",
        "Or": "||",
        "Not": "not ",
    }
    return symbols.get(op)
110# Metadata functions
def source_ref_dict(source_ref: SourceRef):
    """
    Build a dict holding the row and column bounds of `source_ref`.

    The rows are stored under "line_begin"/"line_end" and the columns
    under "col_start"/"col_end".
    """
    return {
        "line_begin": source_ref.row_start,
        "line_end": source_ref.row_end,
        "col_start": source_ref.col_start,
        "col_end": source_ref.col_end,
    }
def combine_source_refs(source_refs: typing.List[SourceRef]):
    """
    Merge a list of SourceRefs into a single SourceRef whose row and
    column range covers every row/column range in the list.

    A bound that no element supplies is returned as None. Every element
    that names a source file must name the same file (checked with an
    assert).
    """
    # sentinels meaning "nothing seen yet" for start and end bounds
    no_start, no_end = sys.maxsize, -1
    first_row, last_row = no_start, no_end
    first_col, last_col = no_start, no_end
    file_name = None

    for ref in source_refs:
        if ref.row_start is not None:
            first_row = min(first_row, ref.row_start)
        if ref.row_end is not None:
            last_row = max(last_row, ref.row_end)
        if ref.col_start is not None:
            first_col = min(first_col, ref.col_start)
        if ref.col_end is not None:
            last_col = max(last_col, ref.col_end)
        if ref.source_file_name is not None:
            assert (
                file_name is None or file_name == ref.source_file_name
            )
            file_name = ref.source_file_name

    # use None instead of providing incorrect data
    unset = (no_end, no_start)
    if first_row in unset:
        first_row = None
    if last_row in unset:
        last_row = None
    if first_col in unset:
        first_col = None
    if last_col in unset:
        last_col = None

    # incomplete source ref data can leave us with an inverted range
    # (end < start); when that happens the two bounds are swapped
    if first_row is not None and last_row is not None and last_row < first_row:
        first_row, last_row = last_row, first_row
    if first_col is not None and last_col is not None and last_col < first_col:
        first_col, last_col = last_col, first_col

    return SourceRef(file_name, first_col, last_col, first_row, last_row)
def generate_domain_metadata():
    """
    Create the default Domain metadata (used for GrFN variables that are
    given no explicit domain metadata).

    FUTURE: this metadata needs to be updated. It is only default data,
    borrowed from the legacy AIR -> GrFN pipeline, and is often incorrect.
    """
    provenance = ProvenanceData.from_data(
        {
            "method": "PROGRAM_ANALYSIS_PIPELINE",
            "timestamp": datetime.now(),
        }
    )
    domain_data = {
        "type": "domain",
        "provenance": provenance,
        "data_type": "integer",
        "measurement_scale": "discrete",
        "elements": [],
    }
    return Domain.from_data(data=domain_data)
def generate_from_source_metadata(
    from_source: bool, reason: VariableCreationReason
):
    """
    Create VariableFromSource metadata of type "FROM_SOURCE".

    `from_source` is stored in its string form and `reason` is stored as the
    creation reason. The provenance uses the PROGRAM_ANALYSIS_PIPELINE method
    and the timestamp given by `ProvenanceData.get_dt_timestamp()`.
    """
    return VariableFromSource.from_ann_cast_data(
        data={
            "type": "FROM_SOURCE",
            "provenance": ProvenanceData(
                MetadataMethod.PROGRAM_ANALYSIS_PIPELINE,
                ProvenanceData.get_dt_timestamp(),
            ),
            "from_source": str(from_source),
            "creation_reason": reason,
        }
    )
def generate_variable_node_span_metadata(source_refs):
    """
    Create the "identifier" code span metadata for a variable node.

    When `source_refs` is empty or None, the span dict is empty and the
    file uid is the empty string.
    """
    if source_refs:
        # FUTURE: the source_refs attribute of CAST nodes is a list. Because
        # of this, it may be good to combine/consolidate source_refs that have
        # multiple elements. For now, we just take the first source_ref
        first_ref = source_refs[0]
        span, file_uid = source_ref_dict(first_ref), first_ref.source_file_name
    else:
        span, file_uid = {}, ""

    return CodeSpanReference.from_air_data(
        {
            "source_ref": span,
            "file_uid": file_uid,
            "code_type": "identifier",
        }
    )
def add_metadata_from_name_node(grfn_var, name_node):
    """
    Attach to the GrFN VariableNode `grfn_var` the metadata inferred from
    the (Ann)CAST Name node `name_node`.

    All Name nodes are currently obtained from source, so the from-source
    metadata is always generated with `from_source` set to True. The span
    metadata is built from `name_node.source_refs`.
    """
    add_metadata_to_grfn_var(
        grfn_var,
        from_source_mdata=generate_from_source_metadata(
            True, VariableCreationReason.UNKNOWN
        ),
        span_mdata=generate_variable_node_span_metadata(
            name_node.source_refs
        ),
    )
def add_metadata_to_grfn_var(
    grfn_var, from_source_mdata=None, span_mdata=None, domain_mdata=None
):
    """
    Set `grfn_var.metadata` to the list
    [from_source_mdata, domain_mdata, span_mdata].

    Defaults are generated for missing pieces: from-source metadata marked
    as from source with an UNKNOWN creation reason, and the default domain
    metadata. Span metadata is only generated (from a blank SourceRef) when
    the variable is from source; otherwise a missing span stays None.
    """
    if from_source_mdata is None:
        from_source_mdata = generate_from_source_metadata(
            True, VariableCreationReason.UNKNOWN
        )

    # a from-source variable without span metadata gets a span built
    # from a blank SourceRef
    if from_source_mdata.from_source and span_mdata is None:
        blank_ref = SourceRef(None, None, None, None, None)
        span_mdata = generate_variable_node_span_metadata([blank_ref])

    if domain_mdata is None:
        domain_mdata = generate_domain_metadata()

    grfn_var.metadata = [from_source_mdata, domain_mdata, span_mdata]
def create_lambda_node_metadata(source_refs):
    """
    Create the metadata list for a lambda node: a single "block" code span.

    `source_refs` is either None or a List of SourceRefs, which is what the
    spec for CAST implements. When it is empty or None, the span dict is
    empty and the file uid is the empty string.
    """
    if source_refs:
        # FUTURE: the source_refs attribute of CAST nodes is a list. Because
        # of this, it may be good to combine/consolidate source_refs that have
        # multiple elements. For now, we just take the first source_ref
        first_ref = source_refs[0]
        span, file_uid = source_ref_dict(first_ref), first_ref.source_file_name
    else:
        span, file_uid = {}, ""

    code_span = CodeSpanReference.from_air_data(
        {
            "source_ref": span,
            "file_uid": file_uid,
            "code_type": "block",
        }
    )
    return [code_span]
def create_container_metadata(grfn_src_ref: GrfnContainerSrcRef):
    """
    Create the metadata list for a container: a single "block" code span
    built from the line range and source file name of `grfn_src_ref`.
    """
    code_span = CodeSpanReference.from_air_data(
        {
            "source_ref": {
                "line_begin": grfn_src_ref.line_begin,
                "line_end": grfn_src_ref.line_end,
            },
            "file_uid": grfn_src_ref.source_file_name,
            "code_type": "block",
        }
    )
    return [code_span]
def combine_grfn_con_src_refs(source_refs: typing.List[GrfnContainerSrcRef]):
    """
    Merge a list of GrfnContainerSrcRef into a single GrfnContainerSrcRef
    whose line range covers every line range in the list.

    A bound that no element supplies is returned as None. The source file
    name is taken from the last element that has one.
    """
    # sentinels meaning "nothing seen yet" for the begin and end bounds
    no_begin, no_end = sys.maxsize, -1
    first_line, last_line = no_begin, no_end
    file_name = None

    for ref in source_refs:
        if ref.line_begin is not None:
            first_line = min(first_line, ref.line_begin)
        if ref.line_end is not None:
            last_line = max(last_line, ref.line_end)
        if ref.source_file_name is not None:
            file_name = ref.source_file_name

    # use None instead of providing incorrect data
    unset = (no_end, no_begin)
    if first_line in unset:
        first_line = None
    if last_line in unset:
        last_line = None

    # incomplete source ref data can leave us with an inverted range
    # (line_end < line_begin); when that happens the bounds are swapped
    if (
        first_line is not None
        and last_line is not None
        and last_line < first_line
    ):
        first_line, last_line = last_line, first_line

    return GrfnContainerSrcRef(first_line, last_line, file_name)
331# End Metadata functions
def union_dicts(dict1, dict2):
    """
    Return a new dict holding the key value pairs of both dict1 and dict2.
    Callers should not rely on which pair is kept when a key appears in both.
    """
    merged = dict(dict1)
    merged.update(dict2)
    return merged
def con_scope_to_str(scope: typing.List):
    """
    Join the elements of the container scope list `scope` into a single
    string, separated by `CON_STR_SEP`.
    """
    separator = CON_STR_SEP
    return separator.join(scope)
def var_dict_to_str(str_start, vars):
    """
    Return `str_start` followed by a comma separated " name: id" entry for
    every item of `vars`, whose keys are ids and whose values are names.
    """
    entries = []
    for var_id, var_name in vars.items():
        entries.append(f" {var_name}: {var_id}")
    return str_start + ", ".join(entries)
def interface_to_str(str_start, interface):
    """
    Return `str_start` followed by the values of `interface` joined
    with ", ".
    """
    joined_values = ", ".join(interface.values())
    return str_start + joined_values
def decision_in_to_str(str_start, decision):
    """
    Return `str_start` followed by a comma separated " If: ...; Else: ..."
    entry for every value of `decision`, using that value's `IFBODY` and
    `ELSEBODY` items.
    """
    branches = [
        f" If: {entry[IFBODY]}; Else: {entry[ELSEBODY]}"
        for entry in decision.values()
    ]
    return str_start + ", ".join(branches)
def make_cond_var_name(con_scopestr):
    """
    Return the name used for a condition variable.

    The scope string `con_scopestr` is currently ignored; the name is
    always "COND".
    """
    # FUTURE: potentially add scoping info
    return "COND"
def make_loop_exit_name(con_scopestr):
    """
    Return the name of the Loop exit variable used for the GrFN condition
    node.

    The scope string `con_scopestr` is currently ignored; the name is
    always "EXIT".
    """
    # FUTURE: potentially add scoping info
    return "EXIT"
def is_literal_assignment(node):
    """
    Return True when `node` is an AnnCastLiteralValue, False otherwise.
    This may need to be updated later.
    """
    # FUTURE: may need to augment this check e.g. AnnCastList/AnnCastDict etc
    # (an earlier version of the check also listed AnnCastNumber,
    # AnnCastBoolean, and AnnCastString)
    return isinstance(node, AnnCastLiteralValue)
def is_func_def_main(node) -> bool:
    """
    Parameter: AnnCastFunctionDef
    Tells whether `node` is the FunctionDef named "main"
    """
    # FUTURE: this may need to be extended or adjusted for Python
    func_name = node.name.name
    return func_name == "main"
def func_def_container_name(node) -> str:
    """
    Parameter: AnnCastFunctionDef
    Returns the function container name derived from the FunctionDef's
    name node, in the form "name_id"
    """
    name_node = node.name
    return func_container_name_from_name_node(name_node)
def func_container_name_from_name_node(node) -> str:
    """
    Parameter: AnnCastNameNode (or an AnnCastAttribute)
    Returns function container name in the form "name_id"
    """
    # for an AnnCastAttribute the name and id are read from its `attr`
    named = node.attr if isinstance(node, AnnCastAttribute) else node
    return f"{named.name}_id{named.id}"
def func_def_argument_name(node, arg_index: int) -> str:
    """
    Builds the FunctionDef argument name for the argument at position
    `arg_index`: the function container name followed by "_arg<arg_index>".
    Used for the AnnCastCall's top interface in
    """
    container = func_def_container_name(node)
    return f"{container}_arg{arg_index}"
def func_def_ret_val_name(node) -> str:
    """
    Builds the FunctionDef return value name: the function container name
    followed by "_ret_val".
    Used for the AnnCastCall's bot interface out
    """
    container = func_def_container_name(node)
    return f"{container}_ret_val"
def specialized_global_name(node, var_name) -> str:
    """
    Parameters:
        - node: a AnnCastFunctionDef
        - var_name: the variable name for the global
    Builds the specialized global name for FunctionDef `node`: the function
    container name followed by "_<var_name>"
    """
    container = func_def_container_name(node)
    return f"{container}_{var_name}"
def call_argument_name(node, arg_index: int) -> str:
    """
    Builds the call site argument name for the argument at position
    `arg_index`, made of the container name of the called function, the
    call's invocation index, and the argument index.
    Used for the AnnCastCall's top interface in
    """
    return (
        f"{func_container_name_from_name_node(node.func)}"
        f"_call{node.invocation_index}_arg{arg_index}"
    )
def call_param_name(node, arg_index: int) -> str:
    """
    Builds the call site parameter name for the argument at position
    `arg_index`, made of the container name of the called function, the
    call's invocation index, and the argument index.
    Used for the AnnCastCall's top interface in
    """
    return (
        f"{func_container_name_from_name_node(node.func)}"
        f"_call{node.invocation_index}_param{arg_index}"
    )
def call_container_name(node) -> str:
    """
    Builds the call site container name from the container name of the
    called function and the call's invocation index.
    Used for the AnnCastCall's top interface out and bot interface in
    """
    return (
        f"{func_container_name_from_name_node(node.func)}"
        f"_call{node.invocation_index}"
    )
def call_ret_val_name(node) -> str:
    """
    Builds the call site return value name from the container name of the
    called function and the call's invocation index.
    Used for the AnnCastCall's bot interface out
    """
    return (
        f"{func_container_name_from_name_node(node.func)}"
        f"_call{node.invocation_index}_ret_val"
    )
def ann_cast_name_to_fullid(node):
    """
    Builds the fullid string of the name node `node`.
    The fullid is made of the pieces
        name, id, version, con_scopestr
    joined by `FULLID_SEP`.
    This should only be called after both VariableVersionPass and
    ContainerScopePass have completed
    """
    con_scopestr = con_scope_to_str(node.con_scope)
    return build_fullid(node.name, node.id, node.version, con_scopestr)
def build_fullid(var_name: str, id: int, version: int, con_scopestr: str):
    """
    Returns a string representing the fullid.
    The fullid is made of the pieces
        var_name, id, version, con_scopestr
    joined by `FULLID_SEP`. `id` and `version` are converted with `str()`.
    A `var_name` of None is written as the empty string.
    """
    # identity test: None is a singleton, and `==` could be overridden
    name_piece = "" if var_name is None else var_name
    return FULLID_SEP.join([name_piece, str(id), str(version), con_scopestr])
def parse_fullid(fullid: str) -> typing.Dict:
    """
    Splits `fullid` on `FULLID_SEP` and returns a dict mapping the strings
        - "var_name"
        - "id"
        - "version"
        - "con_scopestr"
    to the corresponding pieces. All values in the dict are strings.
    Asserts that the fullid has exactly four pieces.
    """
    field_names = ("var_name", "id", "version", "con_scopestr")
    pieces = fullid.split(FULLID_SEP)

    assert len(field_names) == len(pieces)

    return dict(zip(field_names, pieces))
def lambda_var_from_fullid(fullid: str) -> str:
    """
    Builds a lambda variable name for the variable with fullid `fullid`,
    in the form "<var_name>_<id>"
    """
    fields = parse_fullid(fullid)
    var_name, var_id = fields["var_name"], fields["id"]
    return f"{var_name}_{var_id}"
def var_name_from_fullid(fullid: str) -> str:
    """
    Extracts the variable name piece of the fullid `fullid`
    """
    fields = parse_fullid(fullid)
    return fields["var_name"]
def create_grfn_literal_node(metadata: typing.List):
    """
    Builds a GrFN `LambdaNode` of type `LITERAL` carrying `metadata`.
    The node gets a fresh node id, an empty lambda expression string, and a
    placeholder function that returns None.
    """
    return LambdaNode(
        GenericNode.create_node_id(),
        LambdaType.LITERAL,
        "",  # the lambda expression is filled out in a later pass
        lambda: None,
        metadata,
    )
def create_grfn_assign_node(metadata: typing.List):
    """
    Builds a GrFN `LambdaNode` of type `ASSIGN` carrying `metadata`.
    The node gets a fresh node id, an empty lambda expression string, and a
    placeholder function that returns None.
    """
    return LambdaNode(
        GenericNode.create_node_id(),
        LambdaType.ASSIGN,
        "",  # the lambda expression is filled out in a later pass
        lambda: None,
        metadata,
    )
def create_grfn_pack_node(metadata: typing.List):
    """
    Builds a GrFN `PackNode` with lambda type `PACK` carrying `metadata`.
    The node gets a fresh node id, an empty lambda expression string, and a
    placeholder function that returns None.
    """
    return PackNode(
        GenericNode.create_node_id(),
        LambdaType.PACK,
        "",
        lambda: None,
        metadata,
        # PackNode takes two further positional arguments, both passed as
        # empty strings here
        "",
        "",
    )
def create_grfn_unpack_node(metadata: typing.List):
    """
    Builds a GrFN `UnpackNode` with lambda type `UNPACK` carrying `metadata`.
    The node gets a fresh node id, an empty lambda expression string, and a
    placeholder function that returns None.
    """
    return UnpackNode(
        GenericNode.create_node_id(),
        LambdaType.UNPACK,
        "",
        lambda: None,
        metadata,
        # UnpackNode takes two further positional arguments, both passed as
        # empty strings here
        "",
        "",
    )
def create_grfn_var_from_name_node(node):
    """
    Builds a GrFN `VariableNode` from the name, id, version, and container
    scope of the `AnnCastName` node `node`.
    """
    return create_grfn_var(
        node.name, node.id, node.version, con_scope_to_str(node.con_scope)
    )
def create_grfn_var(var_name: str, id: int, version: int, con_scopestr: str):
    """
    Builds a GrFN `VariableNode` whose identifier is made of the namespace
    "default_ns", `con_scopestr`, `var_name`, and `version`.
    The `id` parameter is accepted but not used here.
    """
    var_identifier = VariableIdentifier(
        "default_ns", con_scopestr, var_name, version
    )
    # the VariableNode starts with an empty metadata list; the metadata is
    # filled in later with a call to add_metadata_to_grfn_var()
    return VariableNode(GenericNode.create_node_id(), var_identifier, [])