Coverage for skema/model_assembly/metadata.py: 69%

371 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

1from __future__ import annotations 

2from abc import ABC, abstractclassmethod, abstractmethod 

3from copy import deepcopy 

4from enum import Enum, auto, unique 

5from dataclasses import dataclass 

6from datetime import datetime 

7from typing import List, Union, Type, Dict 

8from time import time 

9 

10from ..utils.misc import uuid 

11 

# Union of the Python types used to annotate categorical values.
CategoricalTypes = Union[bool, str, int]
# Union of the Python types used to annotate numerical values
# (e.g. the bounds of a DomainInterval).
NumericalTypes = Union[int, float]

14 

15 

class MissingEnumError(Exception):
    """Raised when a value cannot be mapped onto an enum member or class."""

18 

19 

class AutoMATESBaseEnum(Enum):
    """Base enum whose members print as, and parse from, their names."""

    def __str__(self):
        """Return the member name in lower case."""
        return self.name.lower()

    # ``abstractclassmethod`` is deprecated; stacking ``classmethod`` on
    # ``abstractmethod`` is the documented replacement and keeps the
    # ``__isabstractmethod__`` marker on the method.
    @classmethod
    @abstractmethod
    def from_str(cls, child_cls: Type, data: str):
        """Return the member of ``child_cls`` whose name matches ``data``.

        The lookup upper-cases ``data`` first, so it is case-insensitive.
        Subclasses call this through ``super().from_str(cls, data)``.

        Args:
            child_cls: The enum class to search.
            data: The member name, in any letter case.

        Raises:
            MissingEnumError: If ``data`` names no member of ``child_cls``
                (this includes ``data`` values without an ``upper`` method).
        """
        try:
            # Look in the member table only, so an unrelated upper-case
            # class attribute can never be returned by mistake.
            return child_cls.__members__[data.upper()]
        except (AttributeError, KeyError) as err:
            raise MissingEnumError(
                f"No matching value found in {child_cls.__name__} for {data}"
            ) from err

32 

33 

@unique
class MetadataType(AutoMATESBaseEnum):
    """Kinds of metadata, some of which map to a TypedMetadata subclass."""

    NONE = auto()
    GRFN_CREATION = auto()
    EQUATION_EXTRACTION = auto()
    TEXT_EXTRACTION = auto()
    CODE_SPAN_REFERENCE = auto()
    CODE_COLLECTION_REFERENCE = auto()
    DOMAIN = auto()
    FROM_SOURCE = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        Raises:
            MissingEnumError: If no member has that name.
        """
        return super().from_str(cls, data)

    @classmethod
    def get_metadata_class(cls, mtype: MetadataType) -> Type[TypedMetadata]:
        """Return the TypedMetadata subclass that implements ``mtype``.

        The result is the class itself, not an instance.

        Raises:
            MissingEnumError: If ``mtype`` has no associated class (NONE,
                EQUATION_EXTRACTION, TEXT_EXTRACTION, or any other value).
        """
        # Built at call time because the classes are defined further down
        # in the module than this enum.
        class_for_type = (
            (cls.GRFN_CREATION, GrFNCreation),
            (cls.CODE_SPAN_REFERENCE, CodeSpanReference),
            (cls.CODE_COLLECTION_REFERENCE, CodeCollectionReference),
            (cls.DOMAIN, Domain),
            (cls.FROM_SOURCE, VariableFromSource),
        )
        for member, metadata_class in class_for_type:
            if mtype == member:
                return metadata_class
        raise MissingEnumError(
            "Unhandled MetadataType to TypedMetadata conversion "
            + f"for: {mtype}"
        )

66 

67 

@unique
class MetadataMethod(AutoMATESBaseEnum):
    """Names of the methods recorded as the provenance of metadata."""

    NONE = auto()
    TEXT_READING_PIPELINE = auto()
    EQUATION_READING_PIPELINE = auto()
    PROGRAM_ANALYSIS_PIPELINE = auto()
    MODEL_ASSEMBLY_PIPELINE = auto()
    CODE_ROLE_ASSIGNMENT = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        Raises:
            MissingEnumError: If no member has that name.
        """
        member = super().from_str(cls, data)
        return member

80 

81 

@unique
class MeasurementType(AutoMATESBaseEnum):
    """Measurement scales, ordered so that related scales are contiguous.

    NOTE: Refer to this stats data type blog post:
    https://towardsdatascience.com/data-types-in-statistics-347e152e8bee
    """

    # NOTE: the ordering of the values below is incredibly important!!
    # isa_categorical() and isa_numerical() classify a scale by testing
    # which contiguous range of values it falls into.
    UNKNOWN = 0  # used for instances where the measurement scale is unknown
    NONE = 1  # Used for undefined variable types
    CATEGORICAL = 2  # Labels used to represent a quality
    BINARY = 3  # Categorical measure with *only two* categories
    NOMINAL = 4  # Categorical measure with *many* categories
    ORDINAL = 5  # Categorical measure with many *ordered* categories
    NUMERICAL = 6  # Numbers used to express a quantity
    DISCRETE = 7  # Numerical measure with *countably infinite* options
    CONTINUOUS = 8  # Numerical measure w/ *uncountably infinite* options
    INTERVAL = 9  # Continuous measure *without* an absolute zero
    RATIO = 10  # Continuous measure *with* an absolute zero

    @classmethod
    def from_name(cls, name: str):
        """Map a data type name (any letter case) to a measurement scale.

        Raises:
            ValueError: If ``name`` is not one of the recognized names.
        """
        name = name.lower()
        if name == "float":
            return cls.CONTINUOUS
        if name == "string":
            return cls.NOMINAL
        if name == "boolean":
            return cls.BINARY
        if name == "integer":
            return cls.DISCRETE
        # TODO remove array after updating for2py to use list type
        if name in ("none", "list", "array", "object"):
            return cls.NONE
        if name == "unknown":
            return cls.UNKNOWN
        raise ValueError(f"MeasurementType unrecognized name: {name}")

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        Raises:
            MissingEnumError: If no member has that name.
        """
        return super().from_str(cls, data)

    @classmethod
    def _scale_value(cls, item):
        """Return the integer value of a member; pass other objects through.

        Members of a plain ``Enum`` never compare equal to integers, so the
        range checks below must look at ``item.value``. Raw values are
        passed through unchanged so callers that hand in an int still work.
        """
        return item.value if isinstance(item, cls) else item

    @classmethod
    def isa_categorical(cls, item: MeasurementType) -> bool:
        """Return True if ``item`` is CATEGORICAL, BINARY, NOMINAL or ORDINAL."""
        categorical_values = range(cls.CATEGORICAL.value, cls.NUMERICAL.value)
        return cls._scale_value(item) in categorical_values

    @classmethod
    def isa_numerical(cls, item: MeasurementType) -> bool:
        """Return True if ``item`` lies in NUMERICAL..RATIO (inclusive)."""
        numerical_values = range(cls.NUMERICAL.value, cls.RATIO.value + 1)
        return cls._scale_value(item) in numerical_values

140 

141 

@unique
class LambdaType(AutoMATESBaseEnum):
    """Enumeration of lambda types; members print as upper-case names."""

    ASSIGN = auto()
    LITERAL = auto()
    CONDITION = auto()
    DECISION = auto()
    INTERFACE = auto()
    EXTRACT = auto()
    PACK = auto()
    OPERATOR = auto()
    LOOP_TOP_INTERFACE = auto()
    UNPACK = auto()

    def __str__(self):
        """Return the member name unchanged (upper case)."""
        return self.name

    def shortname(self):
        """Return "LTI" for LOOP_TOP_INTERFACE, else the name's first letter."""
        if self is LambdaType.LOOP_TOP_INTERFACE:
            return "LTI"
        return str(self)[0]

    @classmethod
    def get_lambda_type(cls, type_str: str, num_inputs: int):
        """Map a lower-case type name to a member.

        An "assign" with zero inputs is reported as LITERAL rather than
        ASSIGN.

        Raises:
            ValueError: If ``type_str`` is not a recognized name.
        """
        if type_str == "assign":
            return cls.LITERAL if num_inputs == 0 else cls.ASSIGN

        # Remaining names map one-to-one onto members.
        direct_matches = (
            ("condition", cls.CONDITION),
            ("decision", cls.DECISION),
            ("interface", cls.INTERFACE),
            ("pack", cls.PACK),
            ("unpack", cls.UNPACK),
            ("extract", cls.EXTRACT),
            ("loop_top_interface", cls.LOOP_TOP_INTERFACE),
        )
        for label, member in direct_matches:
            if type_str == label:
                return member
        raise ValueError(f"Unrecognized lambda type name: {type_str}")

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        Raises:
            MissingEnumError: If no member has that name.
        """
        member = super().from_str(cls, data)
        return member

189 

190 

@unique
class DataType(AutoMATESBaseEnum):
    """Enumeration of data type names."""

    BOOLEAN = auto()
    STRING = auto()
    INTEGER = auto()
    SHORT = auto()
    LONG = auto()
    FLOAT = auto()
    DOUBLE = auto()
    ARRAY = auto()
    LIST = auto()
    STRUCT = auto()
    UNION = auto()
    OBJECT = auto()
    NONE = auto()
    UNKNOWN = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        Raises:
            MissingEnumError: If no member has that name.
        """
        member = super().from_str(cls, data)
        return member

211 

212 

@unique
class CodeSpanType(AutoMATESBaseEnum):
    """Kind of code a span refers to: IDENTIFIER or BLOCK."""

    IDENTIFIER = auto()
    BLOCK = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        Raises:
            MissingEnumError: If no member has that name.
        """
        member = super().from_str(cls, data)
        return member

221 

222 

@unique
class SuperSet(Enum):
    """Named supersets that a DomainSet can be drawn from."""

    ALL_BOOLS = auto()
    ALL_STRS = auto()

    def __str__(self):
        """Return the member name unchanged (upper case)."""
        return str(self.name)

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        This is the inverse of ``str(member)`` and is the method that
        ``DomainSet.from_data`` calls.

        Raises:
            MissingEnumError: If no member has that name.
        """
        try:
            return cls[data.upper()]
        except (AttributeError, KeyError) as err:
            raise MissingEnumError(
                f"No matching value found in {cls.__name__} for {data}"
            ) from err

    @classmethod
    def from_data_type(cls, dtype: DataType):
        """Return the superset that corresponds to ``dtype``.

        Raises:
            ValueError: If ``dtype`` is neither BOOLEAN nor STRING.
        """
        if dtype == DataType.BOOLEAN:
            return cls.ALL_BOOLS
        elif dtype == DataType.STRING:
            return cls.ALL_STRS
        else:
            raise ValueError(f"No implemented superset for type: {dtype}")

    @classmethod
    def ismember(cls, item: DataType, sset: SuperSet) -> bool:
        """Return True if data type ``item`` belongs to superset ``sset``.

        Raises:
            TypeError: If ``sset`` is not a known superset.
        """
        # Enum members are values, not types, so they are compared with ==;
        # isinstance() against a member raises TypeError.
        if sset == cls.ALL_BOOLS:
            return item == DataType.BOOLEAN
        elif sset == cls.ALL_STRS:
            return item == DataType.STRING
        else:
            raise TypeError(f"Unrecognized SuperSet type: {sset}")

248 

249 

class BaseMetadata(ABC):
    """Abstract base for metadata objects that serialize to a dictionary."""

    @abstractmethod
    def to_dict(self) -> dict:
        """Return the contents of this metadata object as a dictionary.

        Concrete subclasses must override this method; the base
        implementation only returns ``NotImplemented``.
        """
        return NotImplemented

255 

256 

@dataclass
class ProvenanceData(BaseMetadata):
    """Records which method produced a piece of metadata, and when."""

    method: MetadataMethod
    timestamp: datetime

    @staticmethod
    def get_dt_timestamp() -> datetime:
        """Return a datetime timestamp for the current time."""
        seconds_since_epoch = time()
        return datetime.fromtimestamp(seconds_since_epoch)

    @classmethod
    def from_data(cls, data: dict) -> ProvenanceData:
        """Build an instance from a dict with "method" and "timestamp" keys.

        The method name is converted to a MetadataMethod; the timestamp
        value is stored exactly as given (it is not parsed).
        """
        method = MetadataMethod.from_str(data["method"])
        return cls(method, data["timestamp"])

    def to_dict(self):
        """Return the method and timestamp, both rendered as strings."""
        return dict(method=str(self.method), timestamp=str(self.timestamp))

273 

274 

@dataclass
class TypedMetadata(BaseMetadata):
    """Base class for metadata that carries a type tag and provenance."""

    type: MetadataType
    provenance: ProvenanceData

    # ``abstractclassmethod`` is deprecated; ``classmethod`` stacked on
    # ``abstractmethod`` is the documented equivalent.
    @classmethod
    @abstractmethod
    def from_data(cls, data):
        """Build the right TypedMetadata subclass from a serialized dict.

        Works on a deep copy of ``data``: the "type" and "provenance"
        entries are replaced by their parsed objects, and the result is
        handed to the ``from_data`` of the class chosen by
        ``MetadataType.get_metadata_class``.

        Raises:
            MissingEnumError: If the type name is unknown or has no
                associated metadata class.
        """
        parsed = deepcopy(data)  # never mutate the caller's dict
        mtype = MetadataType.from_str(parsed["type"])
        provenance = ProvenanceData.from_data(parsed["provenance"])
        child_metadata_class = MetadataType.get_metadata_class(mtype)
        parsed.update({"type": mtype, "provenance": provenance})
        return child_metadata_class.from_data(parsed)

    def to_dict(self):
        """Return the type (as a string) and the provenance (as a dict)."""
        return {
            "type": str(self.type),
            "provenance": self.provenance.to_dict(),
        }

294 

295 

@dataclass
class CodeSpan(BaseMetadata):
    """Begin/end line and column numbers of a span of code."""

    line_begin: int
    line_end: int
    col_begin: int
    col_end: int

    @classmethod
    def from_source_ref(cls, source_ref: Dict[str, int]) -> CodeSpan:
        """Build a span from a source reference; absent keys become None.

        Note that the begin column is read from the "col_start" key.
        """
        ref_keys = ("line_begin", "line_end", "col_start", "col_end")
        positions = [
            source_ref[key] if key in source_ref else None for key in ref_keys
        ]
        return cls(*positions)

    @classmethod
    def from_data(cls, data: dict) -> CodeSpan:
        """Build a span from its dict form; None or ``{}`` yields None."""
        if data is None or data == {}:
            return None
        return cls(**data)

    def to_dict(self):
        """Return the four positions keyed by field name."""
        field_names = ("line_begin", "line_end", "col_begin", "col_end")
        return {name: getattr(self, name) for name in field_names}

329 

330 

@dataclass
class CodeFileReference(BaseMetadata):
    """A code file identified by a uid, its file name and directory path."""

    uid: str
    name: str
    path: str

    @classmethod
    def from_str(cls, filepath: str) -> CodeFileReference:
        """Split ``filepath`` at its last "/" and assign a fresh uid.

        The directory part keeps its trailing "/"; a path with no "/"
        yields an empty directory part.
        """
        directory, slash, filename = filepath.rpartition("/")
        return cls(str(uuid.uuid4()), filename, directory + slash)

    @classmethod
    def from_data(cls, data: dict) -> CodeFileReference:
        """Build an instance from a dict keyed by field name."""
        return cls(**data)

    def to_dict(self):
        """Return the uid, name and path keyed by field name."""
        return dict(uid=self.uid, name=self.name, path=self.path)

350 

351 

@dataclass
class DomainInterval(BaseMetadata):
    """An interval with lower/upper bounds and per-bound inclusivity."""

    l_bound: NumericalTypes
    u_bound: NumericalTypes
    l_inclusive: bool
    u_inclusive: bool

    @classmethod
    def from_data(cls, data: dict) -> DomainInterval:
        """Build an instance from a dict keyed by field name."""
        return cls(**data)

    def to_dict(self):
        """Return the interval; the two bounds are rendered as strings."""
        return dict(
            l_bound=str(self.l_bound),
            u_bound=str(self.u_bound),
            l_inclusive=self.l_inclusive,
            u_inclusive=self.u_inclusive,
        )

370 

371 

@dataclass
class DomainSet(BaseMetadata):
    """A set described by a superset and a predicate string."""

    superset: SuperSet
    predicate: str

    @classmethod
    def from_data(cls, data: dict) -> DomainSet:
        """Build an instance from a dict with "superset" and "predicate".

        The superset may be given as a SuperSet member or as a member name
        in any letter case (the form ``to_dict`` emits).

        Raises:
            MissingEnumError: If the superset name matches no member.
        """
        superset = data["superset"]
        if not isinstance(superset, SuperSet):
            # Look the member up by name directly instead of relying on a
            # ``from_str`` helper on SuperSet.
            try:
                superset = SuperSet[superset.upper()]
            except (AttributeError, KeyError) as err:
                raise MissingEnumError(
                    f"No matching value found in SuperSet for {superset}"
                ) from err
        return cls(superset, data["predicate"])

    def to_dict(self):
        """Return the superset name and the predicate."""
        return {"superset": str(self.superset), "predicate": self.predicate}

383 

384 

# A single entry of a Domain's ``elements`` list: an interval or a set.
DomainElement = Union[DomainInterval, DomainSet]

386 

387 

@dataclass
class CodeSpanReference(TypedMetadata):
    """Metadata that ties an object to a span of code in a code file."""

    code_type: CodeSpanType
    code_file_reference_uid: str
    code_span: CodeSpan

    @classmethod
    def from_air_data(cls, data: dict) -> CodeSpanReference:
        """Build an instance from a dict with "code_type", "file_uid" and
        "source_ref" keys.

        Provenance is set to PROGRAM_ANALYSIS_PIPELINE with the current
        timestamp.
        """
        return cls(
            MetadataType.CODE_SPAN_REFERENCE,
            ProvenanceData(
                MetadataMethod.PROGRAM_ANALYSIS_PIPELINE,
                ProvenanceData.get_dt_timestamp(),
            ),
            CodeSpanType.from_str(data["code_type"]),
            data["file_uid"],
            CodeSpan.from_source_ref(data["source_ref"]),
        )

    @classmethod
    def from_data(cls, data: dict) -> CodeSpanReference:
        """Build an instance from its dict form.

        ``data["type"]`` and ``data["provenance"]`` are passed through
        unchanged, so they are expected to be parsed objects already (as
        ``TypedMetadata.from_data`` provides).
        """
        return cls(
            data["type"],
            data["provenance"],
            CodeSpanType.from_str(data["code_type"]),
            data["code_file_reference_uid"],
            CodeSpan.from_data(data["code_span"]),
        )

    def to_dict(self):
        """Return the base fields plus code type, file uid and code span.

        A missing span is emitted as ``{}``, which ``CodeSpan.from_data``
        maps back to None.
        """
        span = self.code_span
        data = super().to_dict()
        data.update(
            {
                "code_type": str(self.code_type),
                "code_file_reference_uid": self.code_file_reference_uid,
                # CodeSpan.from_data can return None, so guard the call.
                "code_span": span.to_dict() if span is not None else {},
            }
        )
        return data

427 

428 

@unique
class VariableCreationReason(AutoMATESBaseEnum):
    """Reasons recorded for a variable's creation; printed in upper case."""

    UNKNOWN = auto()
    LOOP_ITERATION = auto()
    TUPLE_DECONSTRUCTION = auto()
    INLINE_EXPRESSION_EXPANSION = auto()
    INLINE_CALL_RESULT = auto()
    COMPLEX_RETURN_EXPR = auto()
    CONDITION_RESULT = auto()
    LOOP_EXIT_VAR = auto()
    LITERAL_FUNCTION_ARG = auto()
    TOP_IFACE_INTRO = auto()
    BOT_IFACE_INTRO = auto()
    FUNC_RET_VAL = auto()
    FUNC_ARG = auto()
    COND_VAR = auto()
    DUP_GLOBAL = auto()
    DUMMY_ASSIGN = auto()

    def __str__(self):
        """Return the member name unchanged (upper case)."""
        return self.name

    @classmethod
    def from_str(cls, data: str):
        """Return the member named by ``data`` (case-insensitive).

        Raises:
            MissingEnumError: If no member has that name.
        """
        member = super().from_str(cls, data)
        return member

454 

455 

@dataclass
class VariableFromSource(TypedMetadata):
    """Metadata recording whether a variable comes from source code."""

    from_source: bool
    creation_reason: VariableCreationReason

    @staticmethod
    def _as_bool(value) -> bool:
        """Interpret a bool, or its string form as emitted by ``to_dict``.

        Strings are True only when they read "true" (any letter case), so
        the serialized "False" no longer counts as truthy.
        """
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    @classmethod
    def from_air_data(cls, data: dict) -> VariableFromSource:
        """Build an instance with PROGRAM_ANALYSIS_PIPELINE provenance.

        NOTE(review): the previous body passed CodeSpanReference's
        arguments and could never construct this class. The
        "from_source" and "creation_reason" keys used here mirror
        ``from_data`` -- confirm against the actual AIR schema.
        """
        return cls(
            MetadataType.FROM_SOURCE,
            ProvenanceData(
                MetadataMethod.PROGRAM_ANALYSIS_PIPELINE,
                ProvenanceData.get_dt_timestamp(),
            ),
            cls._as_bool(data["from_source"]),
            VariableCreationReason.from_str(data["creation_reason"]),
        )

    @classmethod
    def from_data(cls, data: dict) -> VariableFromSource:
        """Build an instance from its dict form.

        ``data["type"]`` and ``data["provenance"]`` are passed through
        unchanged; the creation reason is parsed from its name.

        Raises:
            MissingEnumError: If the creation reason name is unknown.
        """
        return cls(
            data["type"],
            data["provenance"],
            cls._as_bool(data["from_source"]),
            VariableCreationReason.from_str(data["creation_reason"]),
        )

    @classmethod
    def from_ann_cast_data(cls, data: dict) -> VariableFromSource:
        """Build an instance from a dict whose creation reason is used as
        given (it is not parsed from a string).
        """
        return cls(
            data["type"],
            data["provenance"],
            cls._as_bool(data["from_source"]),
            data["creation_reason"],
        )

    def to_dict(self):
        """Return the base fields plus both attributes as strings."""
        data = super().to_dict()
        data.update(
            {
                "from_source": str(self.from_source),
                "creation_reason": str(self.creation_reason),
            }
        )
        return data

501 

502 

@dataclass
class GrFNCreation(TypedMetadata):
    """Metadata holding the name associated with a GrFN's creation."""

    name: str

    @classmethod
    def from_name(cls, filepath: str) -> GrFNCreation:
        """Build an instance named after the part of ``filepath`` that
        follows its last "/".

        Provenance is set to MODEL_ASSEMBLY_PIPELINE with the current
        timestamp.
        """
        _, _, filename = filepath.rpartition("/")
        provenance = ProvenanceData(
            MetadataMethod.MODEL_ASSEMBLY_PIPELINE,
            ProvenanceData.get_dt_timestamp(),
        )
        return cls(MetadataType.GRFN_CREATION, provenance, filename)

    @classmethod
    def from_data(cls, data: dict) -> GrFNCreation:
        """Build an instance from a dict keyed by field name."""
        return cls(**data)

    def to_dict(self):
        """Return the base fields plus the name."""
        return {**super().to_dict(), "name": self.name}

527 

528 

@dataclass
class CodeCollectionReference(TypedMetadata):
    """Metadata listing the code files that make up a collection."""

    global_reference_uid: str
    files: List[CodeFileReference]

    @classmethod
    def from_sources(cls, sources: List[str]) -> CodeCollectionReference:
        """Build a collection from file paths, assigning a fresh uid.

        Provenance is set to PROGRAM_ANALYSIS_PIPELINE with the current
        timestamp.
        """
        provenance = ProvenanceData(
            MetadataMethod.PROGRAM_ANALYSIS_PIPELINE,
            ProvenanceData.get_dt_timestamp(),
        )
        collection_uid = str(uuid.uuid4())
        file_refs = [CodeFileReference.from_str(fpath) for fpath in sources]
        return cls(
            MetadataType.CODE_COLLECTION_REFERENCE,
            provenance,
            collection_uid,
            file_refs,
        )

    @classmethod
    def from_data(cls, data: dict) -> CodeCollectionReference:
        """Build an instance from its dict form.

        ``data["type"]`` and ``data["provenance"]`` are passed through
        unchanged; each entry of ``data["files"]`` is parsed into a
        CodeFileReference.
        """
        mtype = data["type"]
        provenance = data["provenance"]
        collection_uid = data["global_reference_uid"]
        file_refs = [CodeFileReference.from_data(d) for d in data["files"]]
        return cls(mtype, provenance, collection_uid, file_refs)

    def to_dict(self):
        """Return the base fields plus the uid and the serialized files."""
        serialized = super().to_dict()
        serialized["global_reference_uid"] = self.global_reference_uid
        serialized["files"] = [ref.to_dict() for ref in self.files]
        return serialized

564 

565 

566# @dataclass 

567# class MeasurementType(TypedMetadata): 

568# data_type: MeasurementType 

569 

570 

571# @dataclass 

572# class MeasurementScale(TypedMetadata): 

573# measurement_scale: str 

574 

575 

@dataclass
class Domain(TypedMetadata):
    """Metadata giving a data type, a measurement scale and its elements."""

    data_type: DataType
    measurement_scale: MeasurementType
    elements: List[DomainElement]

    @classmethod
    def from_data(cls, data: dict) -> Domain:
        """Build an instance from its dict form.

        Elements are parsed as DomainSet when
        ``MeasurementType.isa_categorical`` accepts the scale, as
        DomainInterval when ``MeasurementType.isa_numerical`` accepts it,
        and are otherwise left as an empty list.
        """
        scale = MeasurementType.from_str(data["measurement_scale"])

        # Pick the element parser from the scale; None means "no elements".
        if MeasurementType.isa_categorical(scale):
            element_class = DomainSet
        elif MeasurementType.isa_numerical(scale):
            element_class = DomainInterval
        else:
            element_class = None

        if element_class is None:
            parsed_elements = []
        else:
            parsed_elements = [
                element_class.from_data(raw) for raw in data["elements"]
            ]

        return cls(
            data["type"],
            data["provenance"],
            DataType.from_str(data["data_type"]),
            scale,
            parsed_elements,
        )

    def to_dict(self):
        """Return the base fields plus data type, scale and elements."""
        serialized = super().to_dict()
        serialized["data_type"] = str(self.data_type)
        serialized["measurement_scale"] = str(self.measurement_scale)
        serialized["elements"] = [el.to_dict() for el in self.elements]
        return serialized