Coverage for skema/program_analysis/CAST2FN/ann_cast/ann_cast_helpers.py: 86%

242 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

1import re 

2import sys 

3import typing 

4from dataclasses import dataclass, field 

5from datetime import datetime 

6 

7from skema.model_assembly.metadata import ( 

8 CodeSpanReference, 

9 Domain, 

10 LambdaType, 

11 MetadataMethod, 

12 ProvenanceData, 

13 VariableCreationReason, 

14 VariableFromSource, 

15) 

16from skema.model_assembly.networks import ( 

17 GenericNode, 

18 LambdaNode, 

19 VariableNode, 

20 PackNode, 

21 UnpackNode, 

22) 

23from skema.model_assembly.structures import VariableIdentifier 

24from skema.program_analysis.CAST2FN.ann_cast.annotated_cast import * 

25from skema.program_analysis.CAST2FN.model.cast import SourceRef 

26 

# Separator used when a container scope (a list of scope names) is rendered
# as a single string; see ContainerScopePass `con_scope_to_str()` and
# `visit_name()`.
# NOTE: the GrFN json loading seems to rely on "." as this separator.  Within
# the Annotated Cast pipeline the separators may be changed, provided that
# they never collide with each other.
CON_STR_SEP = "."
# Separator placed between the components of a fullid
FULLID_SEP = ":"

# Names of the synthetic scopes introduced for control-flow containers.
# NOTE: the hyphens make these illegal identifiers on purpose, which prevents
# collisions with names coming from source code.
LOOPBODY = "loop-body"
ELSEBODY = "else-body"
IFBODY = "if-body"
LOOPPRE = "loop-pre"
LOOPEXPR = "loop-expr"
LOOPPOST = "loop-post"
IFEXPR = "if-expr"

MODULE_SCOPE = "module"

# Variable versions at container boundaries
VAR_INIT_VERSION = 0
VAR_EXIT_VERSION = 1

# Loops use one additional version, `LOOP_VAR_UPDATED_VERSION`, because the
# top loop interface has special semantics: it chooses between the initial
# version and the version updated after the loop body has executed.
# The top_interface_out still produces `VAR_INIT_VERSION` and the
# bot_interface_in still accepts `VAR_EXIT_VERSION`, which is consistent
# with the other containers.
LOOP_VAR_UPDATED_VERSION = 2

58 

59 

@dataclass
class GrfnContainerSrcRef:
    """
    Line range and source file of a ModelIf or Loop container.

    The values are collected here so that they can later be attached to the
    container's metadata.  Every field may be None when the information is
    not available.
    """

    # first line covered by the container
    line_begin: typing.Optional[int]
    # last line covered by the container
    line_end: typing.Optional[int]
    # name of the file the container comes from
    source_file_name: typing.Optional[str]

70 

71 

@dataclass
class GrfnAssignment:
    """
    Bundles a GrFN assignment lambda node with its inputs and outputs.
    """

    # the GrFN node representing the assignment, and its lambda type
    assignment_node: LambdaNode
    assignment_type: LambdaType
    # both dicts map a fullid to a GrFN Var uid; each instance gets its own
    # fresh dict
    inputs: typing.Dict[str, str] = field(default_factory=dict)
    outputs: typing.Dict[str, str] = field(default_factory=dict)
    # text of the lambda expression; empty by default
    lambda_expr: str = ""

80 

81 

# Maps CAST operator names to their textual symbols.  Built once at import
# time rather than on every call of `cast_op_to_str()`.
_CAST_OP_SYMBOLS = {
    "Pow": "^",
    "Mult": "*",
    "Add": "+",
    "Sub": "-",
    "Div": "/",
    "Gt": ">",
    "Gte": ">=",
    "Lt": "<",
    "Lte": "<=",
    "Eq": "==",
    "NotEq": "!=",
    "BitXor": "^",
    "BitAnd": "&",
    "BitOr": "|",
    "LShift": "<<",
    "RShift": ">>",
    "Not": "not ",
    "Invert": "~",
    "USub": "- ",
    "And": "&&",
    "Or": "||",
    "Mod": "%",
}


def cast_op_to_str(op):
    """
    Return the textual symbol for the CAST operator named `op`.

    Returns None when `op` is not one of the known operator names.
    Note that "Pow" and "BitXor" both map to "^".
    """
    return _CAST_OP_SYMBOLS.get(op)

108 

109 

110# Metadata functions 

def source_ref_dict(source_ref: SourceRef):
    """
    Return the row/column data of `source_ref` as a plain dict.

    The rows are stored under "line_begin"/"line_end" and the columns under
    "col_start"/"col_end".
    """
    return {
        "line_begin": source_ref.row_start,
        "line_end": source_ref.row_end,
        "col_start": source_ref.col_start,
        "col_end": source_ref.col_end,
    }

118 

119 

def combine_source_refs(source_refs: typing.List[SourceRef]):
    """
    Merge a list of SourceRefs into a single SourceRef whose row and column
    ranges cover every row/column range in the list.

    Attributes that are None are ignored.  A bound for which the list holds
    no usable value is None in the result.  All source file names that are
    not None must agree; the merged SourceRef carries that name.
    """
    refs = list(source_refs)

    def known(attr):
        # every value of `attr` across the refs that is not None
        values = (getattr(ref, attr) for ref in refs)
        return [value for value in values if value is not None]

    # each bound is seeded with a sentinel, so that the sentinel is what
    # remains when the list provides nothing better
    row_start = min([sys.maxsize] + known("row_start"))
    row_end = max([-1] + known("row_end"))
    col_start = min([sys.maxsize] + known("col_start"))
    col_end = max([-1] + known("col_end"))

    source_file_name = None
    for ref in refs:
        candidate = ref.source_file_name
        if candidate is None:
            continue
        assert source_file_name is None or source_file_name == candidate
        source_file_name = candidate

    # use None instead of providing incorrect data
    sentinels = (-1, sys.maxsize)
    if row_start in sentinels:
        row_start = None
    if row_end in sentinels:
        row_end = None
    if col_start in sentinels:
        col_start = None
    if col_end in sentinels:
        col_end = None

    # incomplete source ref data can leave us with an inverted range after
    # combining, i.e. an end that lies before its start; swap the two bounds
    # when that happens
    if row_start is not None and row_end is not None and row_start > row_end:
        row_start, row_end = row_end, row_start
    if col_start is not None and col_end is not None and col_start > col_end:
        col_start, col_end = col_end, col_start

    return SourceRef(source_file_name, col_start, col_end, row_start, row_end)

163 

164 

def generate_domain_metadata():
    """
    Build the default `Domain` metadata: an integer data type on a discrete
    measurement scale with no elements.
    """
    # FUTURE: this metadata needs to be updated.
    # It is only default data and is often incorrect.
    # The default was borrowed from the legacy AIR -> GrFN pipeline
    provenance = ProvenanceData.from_data(
        {
            "method": "PROGRAM_ANALYSIS_PIPELINE",
            "timestamp": datetime.now(),
        }
    )
    domain_data = {
        "type": "domain",
        "provenance": provenance,
        "data_type": "integer",
        "measurement_scale": "discrete",
        "elements": [],
    }
    return Domain.from_data(data=domain_data)

182 

183 

def generate_from_source_metadata(
    from_source: bool, reason: VariableCreationReason
):
    """
    Build `VariableFromSource` metadata recording whether a variable comes
    from source (`from_source`) and why it was created (`reason`).

    Note that `from_source` is handed over in its `str()` form.
    """
    method = MetadataMethod.PROGRAM_ANALYSIS_PIPELINE
    timestamp = ProvenanceData.get_dt_timestamp()
    return VariableFromSource.from_ann_cast_data(
        data={
            "type": "FROM_SOURCE",
            "provenance": ProvenanceData(method, timestamp),
            "from_source": str(from_source),
            "creation_reason": reason,
        }
    )

198 

199 

def generate_variable_node_span_metadata(source_refs):
    """
    Build the "identifier" `CodeSpanReference` metadata for a variable.

    When `source_refs` is empty or None, the span is an empty dict and the
    file uid is the empty string.
    """
    if source_refs:
        # FUTURE: the source_refs attribute of CAST nodes is a list, so it
        # may be good to combine/consolidate source_refs with multiple
        # elements. For now, only the first source_ref is used
        first_ref = source_refs[0]
        span = source_ref_dict(first_ref)
        file_uid = first_ref.source_file_name
    else:
        span = {}
        file_uid = ""

    return CodeSpanReference.from_air_data(
        {
            "source_ref": span,
            "file_uid": file_uid,
            "code_type": "identifier",
        }
    )

217 

218 

def add_metadata_from_name_node(grfn_var, name_node):
    """
    Attach to the GrFN VariableNode `grfn_var` the metadata inferred from
    the (Ann)CAST Name node `name_node`.

    All Name nodes are currently obtained from source, so the from source
    metadata always says so.  The span metadata is taken from the
    `source_refs` of `name_node`.
    """
    add_metadata_to_grfn_var(
        grfn_var,
        from_source_mdata=generate_from_source_metadata(
            True, VariableCreationReason.UNKNOWN
        ),
        span_mdata=generate_variable_node_span_metadata(
            name_node.source_refs
        ),
    )

232 

233 

def add_metadata_to_grfn_var(
    grfn_var, from_source_mdata=None, span_mdata=None, domain_mdata=None
):
    """
    Replace the metadata of `grfn_var` with the list
    [from_source_mdata, domain_mdata, span_mdata].

    Missing pieces get defaults: from source metadata defaults to "from
    source" with an UNKNOWN creation reason, and domain metadata defaults to
    `generate_domain_metadata()`.  Span metadata is only defaulted when the
    variable is from source; otherwise a missing span stays None in the list.
    """
    if from_source_mdata is None:
        from_source_mdata = generate_from_source_metadata(
            True, VariableCreationReason.UNKNOWN
        )

    # a variable from source without span metadata gets a span built from a
    # blank SourceRef
    if from_source_mdata.from_source and span_mdata is None:
        blank_ref = SourceRef(None, None, None, None, None)
        span_mdata = generate_variable_node_span_metadata([blank_ref])

    if domain_mdata is None:
        domain_mdata = generate_domain_metadata()

    grfn_var.metadata = [from_source_mdata, domain_mdata, span_mdata]

254 

255 

def create_lambda_node_metadata(source_refs):
    """
    Build the metadata list for a lambda node: a single "block"
    `CodeSpanReference`.

    `source_refs` is either None or a List of SourceRefs, which is what the
    spec for CAST implements.  When it is None or empty, the span is an
    empty dict and the file uid is the empty string.
    """
    if source_refs:
        # FUTURE: the source_refs attribute of CAST nodes is a list, so it
        # may be good to combine/consolidate source_refs with multiple
        # elements. For now, only the first source_ref is used
        first_ref = source_refs[0]
        span = source_ref_dict(first_ref)
        file_uid = first_ref.source_file_name
    else:
        span = {}
        file_uid = ""

    code_span = CodeSpanReference.from_air_data(
        {
            "source_ref": span,
            "file_uid": file_uid,
            "code_type": "block",
        }
    )
    return [code_span]

279 

280 

def create_container_metadata(grfn_src_ref: GrfnContainerSrcRef):
    """
    Build the metadata list for a container: a single "block"
    `CodeSpanReference` made from the line range and source file name of
    `grfn_src_ref`.
    """
    span = {
        "line_begin": grfn_src_ref.line_begin,
        "line_end": grfn_src_ref.line_end,
    }
    code_span = CodeSpanReference.from_air_data(
        {
            "source_ref": span,
            "file_uid": grfn_src_ref.source_file_name,
            "code_type": "block",
        }
    )
    return [code_span]

294 

295 

def combine_grfn_con_src_refs(source_refs: typing.List[GrfnContainerSrcRef]):
    """
    Merge a list of GrfnContainerSrcRef into a single GrfnContainerSrcRef
    whose line range covers every line range in the list.

    Attributes that are None are ignored.  A bound for which the list holds
    no usable value is None in the result.  The source file name is taken
    from the last element that has one.
    """
    refs = list(source_refs)
    begins = [ref.line_begin for ref in refs if ref.line_begin is not None]
    ends = [ref.line_end for ref in refs if ref.line_end is not None]
    names = [
        ref.source_file_name
        for ref in refs
        if ref.source_file_name is not None
    ]

    # each bound is seeded with a sentinel, so that the sentinel is what
    # remains when the list provides nothing better
    line_begin = min([sys.maxsize] + begins)
    line_end = max([-1] + ends)
    source_file_name = names[-1] if names else None

    # use None instead of providing incorrect data
    sentinels = (-1, sys.maxsize)
    if line_begin in sentinels:
        line_begin = None
    if line_end in sentinels:
        line_end = None

    # incomplete source ref data can leave us with an inverted range after
    # combining, i.e. line_end < line_begin; swap the bounds when that happens
    inverted = (
        line_begin is not None
        and line_end is not None
        and line_begin > line_end
    )
    if inverted:
        line_begin, line_end = line_end, line_begin

    return GrfnContainerSrcRef(line_begin, line_end, source_file_name)

329 

330 

331# End Metadata functions 

332 

333 

def union_dicts(dict1, dict2):
    """
    Return a new dict holding the key value pairs of both dict1 and dict2.

    Neither argument is modified.  Callers should not rely on which of two
    colliding key-value pairs ends up in the result.
    """
    merged = dict(dict1)
    merged.update(dict2)
    return merged

340 

341 

def con_scope_to_str(scope: typing.List):
    """
    Render the container scope `scope` as a single string by joining its
    elements with `CON_STR_SEP`.
    """
    separator = CON_STR_SEP
    return separator.join(scope)

344 

345 

def var_dict_to_str(str_start, vars):
    """
    Return `str_start` followed by one " value: key" entry for every item
    of the dict `vars`, with the entries separated by ", ".
    """
    entries = []
    for key, value in vars.items():
        entries.append(f" {value}: {key}")
    return str_start + ", ".join(entries)

349 

350 

def interface_to_str(str_start, interface):
    """
    Return `str_start` followed by the values of the dict `interface`,
    separated by ", ".
    """
    joined_values = ", ".join(interface.values())
    return str_start + joined_values

353 

354 

def decision_in_to_str(str_start, decision):
    """
    Return `str_start` followed by one " If: ...; Else: ..." entry for every
    value of the dict `decision`, with the entries separated by ", ".

    Each value of `decision` is indexed with `IFBODY` and `ELSEBODY`.
    """
    branches = [
        f" If: {branch[IFBODY]}; Else: {branch[ELSEBODY]}"
        for branch in decision.values()
    ]
    return str_start + ", ".join(branches)

363 

364 

def make_cond_var_name(con_scopestr):
    """
    Return the name of the condition variable for the scope string
    `con_scopestr`.

    The scope string is currently not used; the name is always "COND".
    """
    # FUTURE: potentially add scoping info
    cond_var_name = "COND"
    return cond_var_name

371 

372 

def make_loop_exit_name(con_scopestr):
    """
    Return the name of the Loop exit variable to be used for a GrFN
    condition node.

    The scope string is currently not used; the name is always "EXIT".
    """
    # FUTURE: potentially add scoping info
    exit_var_name = "EXIT"
    return exit_var_name

379 

380 

def is_literal_assignment(node):
    """
    Return True when `node` is an AnnCastLiteralValue, False otherwise.
    """
    # FUTURE: this check may need to cover more node types, e.g.
    # AnnCastList/AnnCastDict etc. (An earlier, disabled variant also listed
    # AnnCastNumber, AnnCastBoolean and AnnCastString.)
    return isinstance(node, AnnCastLiteralValue)

395 

396 

def is_func_def_main(node) -> bool:
    """
    Parameter: AnnCastFunctionDef
    Returns True when the function defined by `node` is named "main"
    """
    # FUTURE: this may need to be extended or adjusted for Python
    func_name = node.name.name
    return func_name == "main"

405 

406 

def func_def_container_name(node) -> str:
    """
    Parameter: AnnCastFunctionDef
    Returns the function container name built from the name node of `node`
    (see `func_container_name_from_name_node()`)
    """
    name_node = node.name
    return func_container_name_from_name_node(name_node)

413 

414 

def func_container_name_from_name_node(node) -> str:
    """
    Parameter: AnnCastNameNode
    Returns function container name in the form "<name>_id<id>"

    For an AnnCastAttribute, the name and id are read from its `attr`.
    """
    named = node.attr if isinstance(node, AnnCastAttribute) else node
    return f"{named.name}_id{named.id}"

424 

425 

def func_def_argument_name(node, arg_index: int) -> str:
    """
    Returns the FunctionDef argument name for argument with index
    `arg_index`, i.e. the function container name followed by
    "_arg<arg_index>".
    Used for the AnnCastCall's top interface in
    """
    container = func_def_container_name(node)
    return f"{container}_arg{arg_index}"

432 

433 

def func_def_ret_val_name(node) -> str:
    """
    Returns the FunctionDef return value name, i.e. the function container
    name followed by "_ret_val".
    Used for the AnnCastCall's bot interface out
    """
    container = func_def_container_name(node)
    return f"{container}_ret_val"

440 

441 

def specialized_global_name(node, var_name) -> str:
    """
    Parameters:
        - node: a AnnCastFunctionDef
        - var_name: the variable name for the global
    Returns the specialized global name for FunctionDef `node`, i.e. the
    function container name followed by "_<var_name>"
    """
    container = func_def_container_name(node)
    return f"{container}_{var_name}"

450 

451 

def call_argument_name(node, arg_index: int) -> str:
    """
    Returns the call site argument name for argument with index `arg_index`.
    The name is the container name of the called function followed by
    "_call<invocation_index>_arg<arg_index>".
    Used for the AnnCastCall's top interface in
    """
    callee = func_container_name_from_name_node(node.func)
    call_index = node.invocation_index
    return f"{callee}_call{call_index}_arg{arg_index}"

459 

460 

def call_param_name(node, arg_index: int) -> str:
    """
    Returns the call site parameter name for argument with index `arg_index`.
    The name is the container name of the called function followed by
    "_call<invocation_index>_param<arg_index>".
    Used for the AnnCastCall's top interface in
    """
    callee = func_container_name_from_name_node(node.func)
    call_index = node.invocation_index
    return f"{callee}_call{call_index}_param{arg_index}"

468 

469 

def call_container_name(node) -> str:
    """
    Returns the call site container name, i.e. the container name of the
    called function followed by "_call<invocation_index>".
    Used for the AnnCastCall's top interface out and bot interface in
    """
    callee = func_container_name_from_name_node(node.func)
    call_index = node.invocation_index
    return f"{callee}_call{call_index}"

477 

478 

def call_ret_val_name(node) -> str:
    """
    Returns the call site return value name, i.e. the container name of the
    called function followed by "_call<invocation_index>_ret_val".
    Used for the AnnCastCall's bot interface out
    """
    callee = func_container_name_from_name_node(node.func)
    call_index = node.invocation_index
    return f"{callee}_call{call_index}_ret_val"

486 

487 

def ann_cast_name_to_fullid(node):
    """
    Returns a string representing the fullid of the name node.
    The fullid consists of the fields
        name, id, version, con_scopestr
    joined by `build_fullid()`.
    This should only be called after both VariableVersionPass and
    ContainerScopePass have completed
    """
    scope_str = con_scope_to_str(node.con_scope)
    return build_fullid(node.name, node.id, node.version, scope_str)

499 

500 

def build_fullid(var_name: str, id: int, version: int, con_scopestr: str):
    """
    Returns a string representing the fullid.
    The fullid consists of the fields
        var_name, id, version, con_scopestr
    in that order, joined by `FULLID_SEP`.

    A `var_name` of None is rendered as the empty string.  `id` and
    `version` are converted with `str()`.
    """
    # compare to None by identity, not with `==`
    name = "" if var_name is None else var_name
    return FULLID_SEP.join([name, str(id), str(version), con_scopestr])

511 

512 

def parse_fullid(fullid: str) -> typing.Dict:
    """
    Parses the fullid, returning a dict mapping the strings
        - "var_name"
        - "id"
        - "version"
        - "con_scopestr"
    to their respective values determined by the fullid.

    All values are the substrings obtained by splitting `fullid` at
    `FULLID_SEP`; none of them is converted to another type.

    Raises AssertionError when `fullid` does not split into exactly four
    fields.
    """
    keys = ("var_name", "id", "version", "con_scopestr")
    values = fullid.split(FULLID_SEP)

    # An explicit check instead of an `assert` statement: asserts are removed
    # when Python runs with -O, and zip() below would then silently drop the
    # surplus or missing fields.  AssertionError is kept as the exception type
    # so that existing callers see the same failure.
    if len(values) != len(keys):
        raise AssertionError(
            f"fullid {fullid!r} must consist of {len(keys)} fields "
            f"separated by {FULLID_SEP!r}, found {len(values)}"
        )

    return dict(zip(keys, values))

528 

529 

def lambda_var_from_fullid(fullid: str) -> str:
    """
    Return a suitable lambda variable name for variable with fullid `fullid`

    The name is the variable name and the id from the fullid, joined by "_".
    """
    fields = parse_fullid(fullid)
    var_name = fields["var_name"]
    var_id = fields["id"]
    return f"{var_name}_{var_id}"

536 

537 

def var_name_from_fullid(fullid: str) -> str:
    """
    Return the variable name for variable with fullid `fullid`
    """
    fields = parse_fullid(fullid)
    return fields["var_name"]

543 

544 

def create_grfn_literal_node(metadata: typing.List):
    """
    Creates a GrFN `LambdaNode` with type `LITERAL` and metadata `metadata`.
    The node gets a fresh node id, an empty lambda expression string and a
    placeholder function that returns None.
    """
    return LambdaNode(
        GenericNode.create_node_id(),
        LambdaType.LITERAL,
        "",  # the lambda expression is filled out in a later pass
        lambda: None,
        metadata,
    )

558 

559 

def create_grfn_assign_node(metadata: typing.List):
    """
    Creates a GrFN `LambdaNode` with type `ASSIGN` and metadata `metadata`.
    The node gets a fresh node id, an empty lambda expression string and a
    placeholder function that returns None.
    """
    return LambdaNode(
        GenericNode.create_node_id(),
        LambdaType.ASSIGN,
        "",  # the lambda expression is filled out in a later pass
        lambda: None,
        metadata,
    )

573 

574 

def create_grfn_pack_node(metadata: typing.List):
    """
    Creates a GrFN `PackNode` with type `PACK` and metadata `metadata`.
    The node gets a fresh node id, an empty lambda expression string and a
    placeholder function that returns None.
    """
    return PackNode(
        GenericNode.create_node_id(),
        LambdaType.PACK,
        "",
        lambda: None,
        metadata,
        # the two trailing PackNode arguments are left as empty strings
        "",
        "",
    )

586 

587 

def create_grfn_unpack_node(metadata: typing.List):
    """
    Creates a GrFN `UnpackNode` with type `UNPACK` and metadata `metadata`.
    The node gets a fresh node id, an empty lambda expression string and a
    placeholder function that returns None.
    """
    return UnpackNode(
        GenericNode.create_node_id(),
        LambdaType.UNPACK,
        "",
        lambda: None,
        metadata,
        # the two trailing UnpackNode arguments are left as empty strings
        "",
        "",
    )

599 

600 

def create_grfn_var_from_name_node(node):
    """
    Creates a GrFN `VariableNode` for this `AnnCastName` node, using the
    node's name, id, version and container scope.
    """
    scope_str = con_scope_to_str(node.con_scope)
    return create_grfn_var(
        var_name=node.name,
        id=node.id,
        version=node.version,
        con_scopestr=scope_str,
    )

607 

608 

def create_grfn_var(var_name: str, id: int, version: int, con_scopestr: str):
    """
    Creates a GrFN `VariableNode` using the parameters

    The variable is identified by the namespace "default_ns", the scope
    string `con_scopestr`, `var_name` and `version`.  The `id` parameter is
    accepted but not used here.
    """
    identifier = VariableIdentifier(
        "default_ns", con_scopestr, var_name, version
    )
    node_uid = GenericNode.create_node_id()

    # the GrFN VariableNode starts out with an empty metadata list; the
    # metadata is filled in later with a call to add_metadata_to_grfn_var()
    return VariableNode(node_uid, identifier, [])