Coverage for skema/model_assembly/metadata.py: 69%
371 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
1from __future__ import annotations
2from abc import ABC, abstractclassmethod, abstractmethod
3from copy import deepcopy
4from enum import Enum, auto, unique
5from dataclasses import dataclass
6from datetime import datetime
7from typing import List, Union, Type, Dict
8from time import time
10from ..utils.misc import uuid
# Union of the Python types bool, str and int.
CategoricalTypes = Union[bool, str, int]
# Union of int and float; used to annotate the DomainInterval bounds.
NumericalTypes = Union[int, float]
class MissingEnumError(Exception):
    """Raised when a name or value cannot be mapped onto an enum member."""
class AutoMATESBaseEnum(Enum):
    """Base class for the enumerations defined in this module.

    Members render as their lower-cased name, and ``from_str`` performs a
    case-insensitive lookup of a member by name on a given enum class.
    """

    def __str__(self):
        """Return the member name in lower case."""
        return self.name.lower()

    # A plain ``classmethod`` replaces the deprecated ``abstractclassmethod``:
    # this class is not an ABC, so the abstract marker was never enforced, and
    # subclasses rely on this implementation through ``super().from_str``.
    @classmethod
    def from_str(cls, child_cls: Type, data: str):
        """Look up the member of ``child_cls`` named ``data`` (any case).

        Args:
            child_cls: The enum class to search.
            data: Member name; it is upper-cased before the lookup.

        Returns:
            The attribute of ``child_cls`` named ``data.upper()``.

        Raises:
            MissingEnumError: If ``child_cls`` has no such attribute, or if
                ``data`` has no ``upper`` method.
        """
        try:
            return getattr(child_cls, data.upper())
        except AttributeError as err:
            # Chain the original error so the failing lookup stays visible.
            raise MissingEnumError(
                f"No matching value found in {child_cls.__name__} for {data}"
            ) from err
@unique
class MetadataType(AutoMATESBaseEnum):
    """Enumeration of the metadata kinds handled by this module."""

    NONE = auto()
    GRFN_CREATION = auto()
    EQUATION_EXTRACTION = auto()
    TEXT_EXTRACTION = auto()
    CODE_SPAN_REFERENCE = auto()
    CODE_COLLECTION_REFERENCE = auto()
    DOMAIN = auto()
    FROM_SOURCE = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive)."""
        member = super().from_str(cls, data)
        return member

    @classmethod
    def get_metadata_class(cls, mtype: MetadataType) -> TypedMetadata:
        """Return the metadata class that corresponds to ``mtype``.

        Note that the class itself is returned, not an instance.

        Raises:
            MissingEnumError: If no class is mapped to ``mtype``.
        """
        # Guard clauses, one per supported metadata type.
        if mtype == cls.GRFN_CREATION:
            return GrFNCreation
        if mtype == cls.CODE_SPAN_REFERENCE:
            return CodeSpanReference
        if mtype == cls.CODE_COLLECTION_REFERENCE:
            return CodeCollectionReference
        if mtype == cls.DOMAIN:
            return Domain
        if mtype == cls.FROM_SOURCE:
            return VariableFromSource

        message = (
            "Unhandled MetadataType to TypedMetadata conversion "
            + f"for: {mtype}"
        )
        raise MissingEnumError(message)
@unique
class MetadataMethod(AutoMATESBaseEnum):
    """Enumeration of the methods recorded as metadata provenance."""

    NONE = auto()
    TEXT_READING_PIPELINE = auto()
    EQUATION_READING_PIPELINE = auto()
    PROGRAM_ANALYSIS_PIPELINE = auto()
    MODEL_ASSEMBLY_PIPELINE = auto()
    CODE_ROLE_ASSIGNMENT = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive)."""
        member = super().from_str(cls, data)
        return member
@unique
class MeasurementType(AutoMATESBaseEnum):
    """Measurement scales, ordered so that related scales are contiguous."""

    # NOTE: Refer to this stats data type blog post:
    # https://towardsdatascience.com/data-types-in-statistics-347e152e8bee

    # NOTE: the ordering of the values below is incredibly important!!
    # isa_categorical / isa_numerical rely on these contiguous value ranges.
    UNKNOWN = 0  # used for instances where the measurement scale is unknown
    NONE = 1  # Used for undefined variable types
    CATEGORICAL = 2  # Labels used to represent a quality
    BINARY = 3  # Categorical measure with *only two* categories
    NOMINAL = 4  # Categorical measure with *many* categories
    ORDINAL = 5  # Categorical measure with many *ordered* categories
    NUMERICAL = 6  # Numbers used to express a quantity
    DISCRETE = 7  # Numerical measure with *countably infinite* options
    CONTINUOUS = 8  # Numerical measure w/ *uncountably infinite* options
    INTERVAL = 9  # Continuous measure *without* an absolute zero
    RATIO = 10  # Continuous measure *with* an absolute zero

    @classmethod
    def from_name(cls, name: str):
        """Map a data type name (any case) onto a measurement type.

        Raises:
            ValueError: If ``name`` is not one of the recognized names.
        """
        name = name.lower()
        if name == "float":
            return cls.CONTINUOUS
        elif name == "string":
            return cls.NOMINAL
        elif name == "boolean":
            return cls.BINARY
        elif name == "integer":
            return cls.DISCRETE
        # TODO remove array after updating for2py to use list type
        elif name in ("none", "list", "array", "object"):
            return cls.NONE
        elif name == "unknown":
            return cls.UNKNOWN
        else:
            raise ValueError(f"MeasurementType unrecognized name: {name}")

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive)."""
        return super().from_str(cls, data)

    @classmethod
    def isa_categorical(cls, item: MeasurementType) -> bool:
        """Return True if ``item`` lies in CATEGORICAL..ORDINAL (inclusive).

        ``item`` may be a member of this enum or a raw integer value.
        """
        # Enum members never compare equal to plain ints, so the member's
        # value is what must be tested against the integer range.
        value = item.value if isinstance(item, cls) else item
        return value in range(cls.CATEGORICAL.value, cls.NUMERICAL.value)

    @classmethod
    def isa_numerical(cls, item: MeasurementType) -> bool:
        """Return True if ``item`` lies in NUMERICAL..RATIO (inclusive).

        ``item`` may be a member of this enum or a raw integer value.
        """
        # See isa_categorical: compare by value, not by member identity.
        value = item.value if isinstance(item, cls) else item
        return value in range(cls.NUMERICAL.value, cls.RATIO.value + 1)
@unique
class LambdaType(AutoMATESBaseEnum):
    """Enumeration of lambda function kinds."""

    ASSIGN = auto()
    LITERAL = auto()
    CONDITION = auto()
    DECISION = auto()
    INTERFACE = auto()
    EXTRACT = auto()
    PACK = auto()
    OPERATOR = auto()
    LOOP_TOP_INTERFACE = auto()
    UNPACK = auto()

    def __str__(self):
        """Return the member name unchanged (upper case)."""
        return str(self.name)

    def shortname(self):
        """Return "LTI" for LOOP_TOP_INTERFACE, else the name's first letter."""
        if self == LambdaType.LOOP_TOP_INTERFACE:
            return "LTI"
        full_name = str(self)
        return full_name[0]

    @classmethod
    def get_lambda_type(cls, type_str: str, num_inputs: int):
        """Translate a lambda type label into a member of this enum.

        An "assign" with zero inputs is reported as LITERAL.

        Raises:
            ValueError: If ``type_str`` is not a recognized label.
        """
        if type_str == "assign":
            return cls.LITERAL if num_inputs == 0 else cls.ASSIGN

        # Remaining labels map one-to-one onto members.
        labelled_members = (
            ("condition", cls.CONDITION),
            ("decision", cls.DECISION),
            ("interface", cls.INTERFACE),
            ("pack", cls.PACK),
            ("unpack", cls.UNPACK),
            ("extract", cls.EXTRACT),
            ("loop_top_interface", cls.LOOP_TOP_INTERFACE),
        )
        for label, member in labelled_members:
            if type_str == label:
                return member
        raise ValueError(f"Unrecognized lambda type name: {type_str}")

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive)."""
        member = super().from_str(cls, data)
        return member
@unique
class DataType(AutoMATESBaseEnum):
    """Enumeration of data types."""

    BOOLEAN = auto()
    STRING = auto()
    INTEGER = auto()
    SHORT = auto()
    LONG = auto()
    FLOAT = auto()
    DOUBLE = auto()
    ARRAY = auto()
    LIST = auto()
    STRUCT = auto()
    UNION = auto()
    OBJECT = auto()
    NONE = auto()
    UNKNOWN = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive)."""
        member = super().from_str(cls, data)
        return member
@unique
class CodeSpanType(AutoMATESBaseEnum):
    """Enumeration of code span kinds: an identifier or a block."""

    IDENTIFIER = auto()
    BLOCK = auto()

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive)."""
        member = super().from_str(cls, data)
        return member
@unique
class SuperSet(Enum):
    """Supersets that a DataType can belong to."""

    ALL_BOOLS = auto()
    ALL_STRS = auto()

    def __str__(self):
        """Return the member name unchanged (upper case)."""
        return str(self.name)

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive).

        This is the inverse of ``__str__``.

        Raises:
            MissingEnumError: If no member is named ``data.upper()``.
        """
        try:
            return cls[data.upper()]
        except KeyError as err:
            raise MissingEnumError(
                f"No matching value found in {cls.__name__} for {data}"
            ) from err

    @classmethod
    def from_data_type(cls, dtype: DataType):
        """Return the superset that corresponds to ``dtype``.

        Raises:
            ValueError: If ``dtype`` is neither BOOLEAN nor STRING.
        """
        if dtype == DataType.BOOLEAN:
            return cls.ALL_BOOLS
        elif dtype == DataType.STRING:
            return cls.ALL_STRS
        else:
            raise ValueError(f"No implemented superset for type: {dtype}")

    @classmethod
    def ismember(cls, item: DataType, sset: SuperSet) -> bool:
        """Return True if the data type ``item`` belongs to ``sset``.

        Raises:
            TypeError: If ``sset`` is not a member of this enum.
        """
        # Enum members are values, not classes: compare by equality, since
        # isinstance() against a member raises TypeError.
        if sset == cls.ALL_BOOLS:
            return item == DataType.BOOLEAN
        elif sset == cls.ALL_STRS:
            return item == DataType.STRING
        else:
            raise TypeError(f"Unrecognized SuperSet type: {sset}")
class BaseMetadata(ABC):
    """Abstract base for metadata objects that serialize to a dictionary."""

    @abstractmethod
    def to_dict(self) -> dict:
        """Return the contents of this metadata object as a dictionary.

        Subclasses must override this method; the base implementation only
        returns ``NotImplemented``.
        """
        return NotImplemented
@dataclass
class ProvenanceData(BaseMetadata):
    """Records which method produced a piece of metadata, and when."""

    method: MetadataMethod
    timestamp: datetime

    @staticmethod
    def get_dt_timestamp() -> datetime:
        """Return the current time as a datetime object."""
        seconds_since_epoch = time()
        return datetime.fromtimestamp(seconds_since_epoch)

    @classmethod
    def from_data(cls, data: dict) -> ProvenanceData:
        """Build an instance from a dict with "method" and "timestamp" keys.

        The method name is converted to a MetadataMethod member. The
        timestamp value is stored exactly as given, without parsing.
        """
        method = MetadataMethod.from_str(data["method"])
        return cls(method, data["timestamp"])

    def to_dict(self):
        """Return the method and timestamp, both rendered as strings."""
        serialized = {}
        serialized["method"] = str(self.method)
        serialized["timestamp"] = str(self.timestamp)
        return serialized
@dataclass
class TypedMetadata(BaseMetadata):
    """Base class for metadata that carries a type tag and provenance."""

    type: MetadataType
    provenance: ProvenanceData

    @abstractclassmethod
    def from_data(cls, data):
        """Build the concrete metadata object described by ``data``.

        Works on a deep copy of ``data``: the "type" and "provenance" entries
        are replaced by their parsed objects, and the result is handed to the
        ``from_data`` of the class selected by the metadata type.
        """
        payload = deepcopy(data)
        metadata_type = MetadataType.from_str(payload["type"])
        parsed_provenance = ProvenanceData.from_data(payload["provenance"])
        target_cls = MetadataType.get_metadata_class(metadata_type)
        payload["type"] = metadata_type
        payload["provenance"] = parsed_provenance
        return target_cls.from_data(payload)

    def to_dict(self):
        """Return the type name and the serialized provenance."""
        serialized = {"type": str(self.type)}
        serialized["provenance"] = self.provenance.to_dict()
        return serialized
@dataclass
class CodeSpan(BaseMetadata):
    """Line and column boundaries of a span of source code."""

    line_begin: int
    line_end: int
    col_begin: int
    col_end: int

    @classmethod
    def from_source_ref(cls, source_ref: Dict[str, int]) -> CodeSpan:
        """Build a span from a source reference; absent keys become None."""
        # NOTE(review): the column start is read from "col_start" although the
        # field is named col_begin -- confirm against the source_ref producer.
        wanted_keys = ("line_begin", "line_end", "col_start", "col_end")
        positions = [
            source_ref[key] if key in source_ref else None
            for key in wanted_keys
        ]
        return cls(*positions)

    @classmethod
    def from_data(cls, data: dict) -> CodeSpan:
        """Build a span from a dict of fields; None or ``{}`` yields None."""
        if data == {} or data is None:
            return None
        return cls(**data)

    def to_dict(self):
        """Return the four boundary fields as a dictionary."""
        return dict(
            line_begin=self.line_begin,
            line_end=self.line_end,
            col_begin=self.col_begin,
            col_end=self.col_end,
        )
@dataclass
class CodeFileReference(BaseMetadata):
    """A source file identified by a uid, a file name and a directory path."""

    uid: str
    name: str
    path: str

    @classmethod
    def from_str(cls, filepath: str) -> CodeFileReference:
        """Split ``filepath`` at its last "/" and assign a fresh uid.

        The directory part keeps its trailing "/"; a path without any "/"
        yields an empty directory part.
        """
        directory, separator, filename = filepath.rpartition("/")
        return cls(str(uuid.uuid4()), filename, directory + separator)

    @classmethod
    def from_data(cls, data: dict) -> CodeFileReference:
        """Build an instance from a dict keyed by the field names."""
        return cls(**data)

    def to_dict(self):
        """Return the uid, name and path as a dictionary."""
        return dict(uid=self.uid, name=self.name, path=self.path)
@dataclass
class DomainInterval(BaseMetadata):
    """An interval given by two bounds and an inclusivity flag for each."""

    l_bound: NumericalTypes
    u_bound: NumericalTypes
    l_inclusive: bool
    u_inclusive: bool

    @classmethod
    def from_data(cls, data: dict) -> DomainInterval:
        """Build an instance from a dict keyed by the field names."""
        return cls(**data)

    def to_dict(self):
        """Return the interval as a dict; the bounds are rendered as strings."""
        lower, upper = str(self.l_bound), str(self.u_bound)
        return dict(
            l_bound=lower,
            u_bound=upper,
            l_inclusive=self.l_inclusive,
            u_inclusive=self.u_inclusive,
        )
@dataclass
class DomainSet(BaseMetadata):
    """A set described by a superset and a predicate string."""

    superset: SuperSet
    predicate: str

    @classmethod
    def from_data(cls, data: dict) -> DomainSet:
        """Build an instance from a dict with "superset" and "predicate" keys.

        The superset is looked up by member name (case-insensitive), which
        inverts the ``str(self.superset)`` written by ``to_dict``.

        Raises:
            KeyError: If a key is missing or the superset name is unknown.
        """
        # Look the member up by name directly instead of calling a from_str
        # helper on SuperSet.
        superset = SuperSet[data["superset"].upper()]
        return cls(superset, data["predicate"])

    def to_dict(self):
        """Return the superset name and the predicate as a dictionary."""
        return {"superset": str(self.superset), "predicate": self.predicate}
# Element type of Domain.elements: either a DomainInterval or a DomainSet.
DomainElement = Union[DomainInterval, DomainSet]
@dataclass
class CodeSpanReference(TypedMetadata):
    """Metadata tying an item to a span of code inside a referenced file."""

    code_type: CodeSpanType
    code_file_reference_uid: str
    code_span: CodeSpan

    @classmethod
    def from_air_data(cls, data: dict) -> CodeSpanReference:
        """Build an instance from a dict with code_type, file_uid, source_ref.

        The provenance is set to the program analysis pipeline with the
        current timestamp.
        """
        return cls(
            MetadataType.CODE_SPAN_REFERENCE,
            ProvenanceData(
                MetadataMethod.PROGRAM_ANALYSIS_PIPELINE,
                ProvenanceData.get_dt_timestamp(),
            ),
            CodeSpanType.from_str(data["code_type"]),
            data["file_uid"],
            CodeSpan.from_source_ref(data["source_ref"]),
        )

    @classmethod
    def from_data(cls, data: dict) -> CodeSpanReference:
        """Build an instance from a dict whose type/provenance are parsed.

        ``data["type"]`` and ``data["provenance"]`` are passed through as-is;
        the code type and code span are parsed from their serialized forms.
        """
        return cls(
            data["type"],
            data["provenance"],
            CodeSpanType.from_str(data["code_type"]),
            data["code_file_reference_uid"],
            CodeSpan.from_data(data["code_span"]),
        )

    def to_dict(self):
        """Return the base metadata fields plus the code reference fields."""
        data = super().to_dict()
        # CodeSpan.from_data returns None for an empty span; serialize that
        # case as {} (which from_data maps back to None) instead of failing.
        if self.code_span is None:
            span_data = {}
        else:
            span_data = self.code_span.to_dict()
        data.update(
            {
                "code_type": str(self.code_type),
                "code_file_reference_uid": self.code_file_reference_uid,
                "code_span": span_data,
            }
        )
        return data
@unique
class VariableCreationReason(AutoMATESBaseEnum):
    """Enumeration of reasons for which a variable was created."""

    UNKNOWN = auto()
    LOOP_ITERATION = auto()
    TUPLE_DECONSTRUCTION = auto()
    INLINE_EXPRESSION_EXPANSION = auto()
    INLINE_CALL_RESULT = auto()
    COMPLEX_RETURN_EXPR = auto()
    CONDITION_RESULT = auto()
    LOOP_EXIT_VAR = auto()
    LITERAL_FUNCTION_ARG = auto()
    TOP_IFACE_INTRO = auto()
    BOT_IFACE_INTRO = auto()
    FUNC_RET_VAL = auto()
    FUNC_ARG = auto()
    COND_VAR = auto()
    DUP_GLOBAL = auto()
    DUMMY_ASSIGN = auto()

    def __str__(self):
        """Return the member name unchanged (upper case)."""
        member_name = self.name
        return str(member_name)

    @classmethod
    def from_str(cls, data: str):
        """Return the member whose name matches ``data`` (case-insensitive)."""
        member = super().from_str(cls, data)
        return member
@dataclass
class VariableFromSource(TypedMetadata):
    """Metadata stating whether a variable came from source code, and why."""

    from_source: bool
    creation_reason: VariableCreationReason

    @staticmethod
    def _as_bool(value) -> bool:
        """Convert a bool, or its ``str()`` form, into a real bool.

        ``to_dict`` writes ``str(self.from_source)``, so "True" / "False"
        strings must be parsed by content: any non-empty string, including
        "False", is truthy.
        """
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    @classmethod
    def from_air_data(cls, data: dict) -> VariableFromSource:
        """Build an instance with program-analysis provenance.

        NOTE(review): assumes ``data`` carries "from_source" and
        "creation_reason" keys like ``from_data`` does -- confirm against the
        AIR producer. The earlier code passed code-span arguments that this
        class has no fields for, so it could never construct an instance.
        """
        return cls(
            MetadataType.FROM_SOURCE,
            ProvenanceData(
                MetadataMethod.PROGRAM_ANALYSIS_PIPELINE,
                ProvenanceData.get_dt_timestamp(),
            ),
            cls._as_bool(data["from_source"]),
            VariableCreationReason.from_str(data["creation_reason"]),
        )

    @classmethod
    def from_data(cls, data: dict) -> VariableFromSource:
        """Build an instance from a dict whose type/provenance are parsed.

        The creation reason is parsed from its name; ``from_source`` accepts
        either a bool or its string form.
        """
        return cls(
            data["type"],
            data["provenance"],
            cls._as_bool(data["from_source"]),
            VariableCreationReason.from_str(data["creation_reason"]),
        )

    @classmethod
    def from_ann_cast_data(cls, data: dict) -> VariableFromSource:
        """Build an instance from ``data``, passing creation_reason as-is.

        Unlike ``from_data``, the creation reason is not parsed from a string.
        """
        return cls(
            data["type"],
            data["provenance"],
            cls._as_bool(data["from_source"]),
            data["creation_reason"],
        )

    def to_dict(self):
        """Return the base metadata fields plus the two fields as strings."""
        data = super().to_dict()
        data.update(
            {
                "from_source": str(self.from_source),
                "creation_reason": str(self.creation_reason),
            }
        )
        return data
@dataclass
class GrFNCreation(TypedMetadata):
    """Metadata that records a name taken from a file path."""

    name: str

    @classmethod
    def from_name(cls, filepath: str) -> GrFNCreation:
        """Build an instance named after the part of ``filepath`` after its last "/".

        The provenance is set to the model assembly pipeline with the current
        timestamp.
        """
        basename = filepath.rpartition("/")[2]
        provenance = ProvenanceData(
            MetadataMethod.MODEL_ASSEMBLY_PIPELINE,
            ProvenanceData.get_dt_timestamp(),
        )
        return cls(MetadataType.GRFN_CREATION, provenance, basename)

    @classmethod
    def from_data(cls, data: dict) -> GrFNCreation:
        """Build an instance from a dict keyed by the field names."""
        return cls(**data)

    def to_dict(self):
        """Return the base metadata fields plus the name."""
        serialized = super().to_dict()
        serialized["name"] = self.name
        return serialized
@dataclass
class CodeCollectionReference(TypedMetadata):
    """Metadata describing a collection of code files under one global uid."""

    global_reference_uid: str
    files: List[CodeFileReference]

    @classmethod
    def from_sources(cls, sources: List[str]) -> CodeCollectionReference:
        """Build a collection from file paths, assigning a fresh global uid.

        The provenance is set to the program analysis pipeline with the
        current timestamp.
        """
        provenance = ProvenanceData(
            MetadataMethod.PROGRAM_ANALYSIS_PIPELINE,
            ProvenanceData.get_dt_timestamp(),
        )
        collection_uid = str(uuid.uuid4())
        file_refs = [CodeFileReference.from_str(path) for path in sources]
        return cls(
            MetadataType.CODE_COLLECTION_REFERENCE,
            provenance,
            collection_uid,
            file_refs,
        )

    @classmethod
    def from_data(cls, data: dict) -> CodeCollectionReference:
        """Build an instance from a dict whose type/provenance are parsed."""
        file_refs = [
            CodeFileReference.from_data(entry) for entry in data["files"]
        ]
        return cls(
            data["type"],
            data["provenance"],
            data["global_reference_uid"],
            file_refs,
        )

    def to_dict(self):
        """Return the base metadata fields plus the uid and serialized files."""
        serialized = super().to_dict()
        serialized["global_reference_uid"] = self.global_reference_uid
        serialized["files"] = [ref.to_dict() for ref in self.files]
        return serialized
566# @dataclass
567# class MeasurementType(TypedMetadata):
568# data_type: MeasurementType
571# @dataclass
572# class MeasurementScale(TypedMetadata):
573# measurement_scale: str
@dataclass
class Domain(TypedMetadata):
    """Metadata holding a data type, a measurement scale and domain elements."""

    data_type: DataType
    measurement_scale: MeasurementType
    elements: List[DomainElement]

    @classmethod
    def from_data(cls, data: dict) -> Domain:
        """Build an instance from a dict whose type/provenance are parsed.

        Elements are parsed as DomainSet objects when the scale is reported
        as categorical, as DomainInterval objects when it is reported as
        numerical, and left empty otherwise.
        """
        scale = MeasurementType.from_str(data["measurement_scale"])

        # Pick the element parser from the measurement scale.
        if MeasurementType.isa_categorical(scale):
            element_cls = DomainSet
        elif MeasurementType.isa_numerical(scale):
            element_cls = DomainInterval
        else:
            element_cls = None

        if element_cls is None:
            parsed_elements = []
        else:
            parsed_elements = [
                element_cls.from_data(raw) for raw in data["elements"]
            ]

        return cls(
            data["type"],
            data["provenance"],
            DataType.from_str(data["data_type"]),
            scale,
            parsed_elements,
        )

    def to_dict(self):
        """Return the base metadata fields plus the domain description."""
        serialized = super().to_dict()
        serialized["data_type"] = str(self.data_type)
        serialized["measurement_scale"] = str(self.measurement_scale)
        serialized["elements"] = [el.to_dict() for el in self.elements]
        return serialized