"""Source code for delphi.GrFN.networks."""

import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, Iterable, Set, Union
from typing import List
import importlib
import inspect
import json
import os

import networkx as nx
from networkx.algorithms.simple_paths import all_simple_paths

# from delphi.GrFN.analysis import get_max_s2_sensitivity
from delphi.GrFN.utils import ScopeNode
from delphi.utils.misc import choose_font
from delphi.translators.for2py import (
    genPGM,
    f2grfn,
)
import numpy as np

FONT = choose_font()

dodgerblue3 = "#1874CD"
forestgreen = "#228b22"


class ComputationalGraph(nx.DiGraph):
    """A directed graph of alternating variable and function nodes that
    supports execution of the wired computation.

    On construction, a function-call graph (FCG) and distance-ordered
    function sets are derived from the wiring so that ``run`` can
    execute function nodes level by level.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Derived structures used by run(): the function-call graph and
        # the topologically ordered sets of function nodes.
        self.FCG = self.to_FCG()
        self.function_sets = self.build_function_sets()
[docs] @staticmethod def var_shortname(long_var_name): ( module, var_scope, container_name, container_index, var_name, var_index, ) = long_var_name.split("::") return var_name
[docs] def get_input_nodes(self) -> List[str]: """ Get all input nodes from a network. """ return [n for n, d in self.in_degree() if d == 0]
[docs] def get_output_nodes(self) -> List[str]: """ Get all output nodes from a network. """ return [n for n, d in self.out_degree() if d == 0]
[docs] def to_FCG(self): G = nx.DiGraph() for (name, attrs) in self.nodes(data=True): if attrs["type"] == "function": for predecessor_variable in self.predecessors(name): for predecessor_function in self.predecessors( predecessor_variable ): G.add_edge(predecessor_function, name) return G
[docs] def build_function_sets(self): initial_funcs = [n for n, d in self.FCG.in_degree() if d == 0] distances = dict() def find_distances(funcs, dist): all_successors = list() for func in funcs: distances[func] = dist all_successors.extend(self.FCG.successors(func)) if len(all_successors) > 0: find_distances(list(set(all_successors)), dist + 1) find_distances(initial_funcs, 0) call_sets = dict() for func_name, call_dist in distances.items(): if call_dist in call_sets: call_sets[call_dist].add(func_name) else: call_sets[call_dist] = {func_name} function_set_dists = sorted( call_sets.items(), key=lambda t: (t[0], len(t[1])) ) function_sets = [func_set for _, func_set in function_set_dists] return function_sets
[docs] def run( self, inputs: Dict[str, Union[float, Iterable]], ) -> Union[float, Iterable]: """Executes the GrFN over a particular set of inputs and returns the result. Args: inputs: Input set where keys are the names of input nodes in the GrFN and each key points to a set of input values (or just one). Returns: A set of outputs from executing the GrFN, one for every set of inputs. """ full_inputs = {self.input_name_map[n]: v for n, v in inputs.items()} # Set input values for i in self.inputs: value = full_inputs[i] if isinstance(value, float): value = np.array([value], dtype=np.float32) if isinstance(value, int): value = np.array([value], dtype=np.int32) elif isinstance(value, list): value = np.array(value, dtype=np.float32) self.nodes[i]["value"] = value for func_set in self.function_sets: for func_name in func_set: lambda_fn = self.nodes[func_name]["lambda_fn"] output_node = list(self.successors(func_name))[0] signature = self.nodes[func_name]["func_inputs"] input_values = [self.nodes[n]["value"] for n in signature] res = lambda_fn(*input_values) # Convert output to a NumPy matrix if a constant was returned if len(input_values) == 0: res = np.array(res, dtype=np.float32) self.nodes[output_node]["value"] = res # Return the output return [self.nodes[o]["value"] for o in self.outputs]
[docs] def to_CAG(self): """ Export to a Causal Analysis Graph (CAG) PyGraphviz AGraph object. The CAG shows the influence relationships between the variables and elides the function nodes.""" G = nx.DiGraph() for (name, attrs) in self.nodes(data=True): if attrs["type"] == "variable": cag_name = attrs["cag_label"] G.add_node(cag_name, **attrs) for pred_fn in self.predecessors(name): for pred_var in self.predecessors(pred_fn): v_attrs = self.nodes[pred_var] v_name = v_attrs["cag_label"] G.add_node(v_name, **self.nodes[pred_var]) G.add_edge(v_name, cag_name) return G
class GroundedFunctionNetwork(ComputationalGraph):
    """A GrFN model: a DiGraph of alternating variable and function
    nodes with a set of input nodes and (currently) a single output
    set.

    Function nodes store an executable Python function together with
    the ordered variable nodes it reads; variable nodes store a value
    (any Python object; ``None`` when unset).  Variable nodes connect
    only to function nodes and vice versa.  Inputs may be variable or
    function nodes, but outputs are always variable nodes.
    """

    def __init__(self, G, scope_tree, outputs):
        super().__init__(G)
        self.outputs = outputs
        # Inputs are variable nodes with no incoming edges.
        self.inputs = [
            node
            for node, degree in self.in_degree()
            if degree == 0 and self.nodes[node]["type"] == "variable"
        ]
        # Short name -> fully qualified node name, used by run().
        self.input_name_map = {
            self.var_shortname(name): name for name in self.inputs
        }
        self.scope_tree = scope_tree

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return "\n".join(self.traverse_nodes(self.inputs))
[docs] def traverse_nodes(self, node_set, depth=0): """BFS traversal of nodes that returns name traversal as large string. Args: node_set: Set of input nodes to begin traversal. depth: Current traversal depth for child node viewing. Returns: type: String containing tabbed traversal view. """ tab = " " result = list() for n in node_set: repr = ( n if self.nodes[n]["type"] == "variable" else f"{n}{inspect.signature(self.nodes[n]['lambda_fn'])}" ) result.append(f"{tab * depth}{repr}") result.extend( self.traverse_nodes(self.successors(n), depth=depth + 1) ) return result
[docs] @classmethod def from_json_and_lambdas(cls, file: str, lambdas): """Builds a GrFN from a JSON object. Args: cls: The class variable for object creation. file: Filename of a GrFN JSON file. lambdas: A lambdas module Returns: type: A GroundedFunctionNetwork object. """ with open(file, "r") as f: data = json.load(f) return cls.from_dict(data, lambdas)
[docs] @classmethod def from_dict(cls, data: Dict, lambdas_path): """Builds a GrFN object from a set of extracted function data objects and an associated file of lambda functions. Args: cls: The class variable for object creation. data: A set of function data object that specify the wiring of a GrFN object. lambdas_path: Path to a lambdas file containing functions to be computed during GrFN execution. Returns: A GroundedFunctionNetwork object. """ lambdas = importlib.__import__(str(Path(lambdas_path).stem)) functions = {d["name"]: d for d in data["containers"]} occurrences = {} G = nx.DiGraph() scope_tree = nx.DiGraph() def identity(x): return x def make_identifier(scope: str, var: str): (_, name, idx) = var.split("::") return make_variable_name(scope, name, idx) def make_variable_name(parent: str, basename: str, index: str): return f"{parent}::{basename}::{index}" def add_variable_node( parent: str, basename: str, index: str, is_exit: bool = False ): full_var_name = make_variable_name(parent, basename, index) G.add_node( full_var_name, type="variable", color="crimson", fontcolor="white" if is_exit else "black", fillcolor="crimson" if is_exit else "white", style="filled" if is_exit else "", parent=parent, label=f"{basename}::{index}", cag_label=f"{basename}", basename=basename, padding=15, value=None, ) return full_var_name def process_wiring_statement(stmt, scope, inputs, cname): lambda_name = stmt["function"]["name"] lambda_node_name = f"{scope.name}::" + lambda_name stmt_type = lambda_name.split("__")[-3] if stmt_type == "assign" and len(stmt["input"]) == 0: stmt_type = "literal" for output in stmt["output"]: (_, var_name, idx) = output.split("::") node_name = add_variable_node( scope.name, var_name, idx, is_exit=var_name == "EXIT" ) G.add_edge(lambda_node_name, node_name) ordered_inputs = list() for inp in stmt["input"]: if inp.endswith("-1"): (parent, var_name, idx) = inputs[inp] else: parent = scope.name (_, var_name, idx) = inp.split("::") node_name = 
add_variable_node(parent, var_name, idx) ordered_inputs.append(node_name) G.add_edge(node_name, lambda_node_name) G.add_node( lambda_node_name, type="function", lambda_fn=getattr(lambdas, lambda_name), func_inputs=ordered_inputs, visited=False, shape="rectangle", parent=scope.name, label=stmt_type[0].upper(), padding=10, ) def process_call_statement(stmt, scope, inputs, cname): container_name = stmt["function"]["name"] if container_name not in occurrences: occurrences[container_name] = 0 new_container = functions[container_name] container_color = ( "navyblue" if new_container["repeat"] else "forestgreen" ) new_scope = ScopeNode( new_container, occurrences[container_name], parent=scope ) scope_tree.add_node(new_scope.name, color=container_color) scope_tree.add_edge(scope.name, new_scope.name) input_values = list() for inp in stmt["input"]: if inp.endswith("-1"): (parent, var_name, idx) = inputs[inp] else: parent = scope.name (_, var_name, idx) = inp.split("::") input_values.append((parent, var_name, idx)) callee_ret, callee_up = process_container( new_scope, input_values, container_name ) caller_ret, caller_up = list(), list() for var in stmt["output"]: parent = scope.name (_, var_name, idx) = var.split("::") node_name = add_variable_node(parent, var_name, idx) caller_ret.append(node_name) for var in stmt["updated"]: parent = scope.name (_, var_name, idx) = var.split("::") node_name = add_variable_node(parent, var_name, idx) caller_up.append(node_name) for callee_var, caller_var in zip(callee_ret, caller_ret): lambda_node_name = f"{callee_var}-->{caller_var}" G.add_node( lambda_node_name, type="function", lambda_fn=identity, func_inputs=[callee_var], shape="rectangle", parent=scope.name, label="A", padding=10, ) G.add_edge(callee_var, lambda_node_name) G.add_edge(lambda_node_name, caller_var) for callee_var, caller_var in zip(callee_up, caller_up): lambda_node_name = f"{callee_var}-->{caller_var}" G.add_node( lambda_node_name, type="function", lambda_fn=identity, 
func_inputs=[callee_var], shape="rectangle", parent=scope.name, label="A", padding=10, ) G.add_edge(callee_var, lambda_node_name) G.add_edge(lambda_node_name, caller_var) occurrences[container_name] += 1 def process_container(scope, input_vals, cname): if len(scope.arguments) == len(input_vals): input_vars = { a: v for a, v in zip(scope.arguments, input_vals) } elif len(scope.arguments) > 0: input_vars = { a: (scope.name,) + tuple(a.split("::")[1:]) for a in scope.arguments } for stmt in scope.body: func_def = stmt["function"] func_type = func_def["type"] if func_type == "lambda": process_wiring_statement(stmt, scope, input_vars, cname) elif func_type == "container": process_call_statement(stmt, scope, input_vars, cname) else: raise ValueError(f"Undefined function type: {func_type}") return_list, updated_list = list(), list() for var_name in scope.returns: (_, basename, idx) = var_name.split("::") return_list.append( make_variable_name(scope.name, basename, idx) ) for var_name in scope.updated: (_, basename, idx) = var_name.split("::") updated_list.append( make_variable_name(scope.name, basename, idx) ) return return_list, updated_list root = data["start"][0] occurrences[root] = 0 cur_scope = ScopeNode(functions[root], occurrences[root]) scope_tree.add_node(cur_scope.name, color="forestgreen") returns, updates = process_container(cur_scope, [], root) return cls(G, scope_tree, returns + updates)
[docs] @staticmethod def create_container_dict(G: nx.DiGraph): containers = {node_name: dict() for node_name in scope_tree.nodes}
[docs] @classmethod def from_python_file( cls, python_file, lambdas_path, json_filename: str, stem: str ): """Builds GrFN object from Python file.""" with open(python_file, "r") as f: pySrc = f.read() return cls.from_python_src(pySrc, lambdas_path, json_filename, stem)
[docs] @classmethod def from_python_src( cls, pySrc, python_file: str, fortran_file: str, module_log_file_path: str, mod_mapper_dict: list, processing_modules: bool, ): lambdas_path = python_file.replace(".py", "_lambdas.py") # Builds GrFN object from Python source code. pgm_dict = f2grfn.generate_grfn( pySrc, python_file, lambdas_path, mod_mapper_dict, fortran_file, module_log_file_path, processing_modules, ) G = cls.from_dict(pgm_dict, lambdas_path) # Cleanup intermediate files. variable_map_filename = python_file.replace(".py", "_variable_map.pkl") os.remove(variable_map_filename) rectified_xml_filename = "rectified_" + str(Path(python_file)).replace( ".py", ".xml" ) os.remove(rectified_xml_filename) return G
[docs] @classmethod def from_fortran_file(cls, fortran_file: str, tmpdir: str = "."): """Builds GrFN object from a Fortran program.""" root_dir = os.path.abspath(tmpdir) ( python_sources, translated_python_files, mod_mapper_dict, fortran_filename, module_log_file_path, processing_modules, ) = f2grfn.fortran_to_grfn( fortran_file, temp_dir=str(tmpdir), root_dir_path=root_dir, processing_modules=False, ) # For now, just taking the first translated file. # TODO - generalize this. python_file = translated_python_files[0] G = cls.from_python_src( python_sources[0][0], python_file, fortran_file, module_log_file_path, mod_mapper_dict, processing_modules, ) return G
[docs] @classmethod def from_fortran_src(cls, fortran_src: str, dir: str = "."): """ Create a GroundedFunctionNetwork instance from a string with raw Fortran code. Args: fortran_src: A string with Fortran source code. dir: (Optional) - the directory in which the temporary Fortran file will be created (make sure you have write permission!) Defaults to the current directory. Returns: A GroundedFunctionNetwork instance """ import tempfile fp = tempfile.NamedTemporaryFile("w+t", delete=False, dir=dir) fp.writelines(fortran_src) fp.close() G = cls.from_fortran_file(fp.name, dir) os.remove(fp.name) return G
[docs] def to_json(self): """Experimental outputting a GrFN to a JSON file.""" containers = { name: {"name": name, "parent": None, "exit": True, "nodes": list()} for name in self.scope_tree.nodes } nodes_json = list() for name, data in self.nodes(data=True): containers[data["parent"]]["nodes"].append(name) if data["type"] == "variable": nodes_json.append( { "name": name, "type": "variable", "reference": None, "data-type": { "name": "float32", "domain": [("-inf", "inf")], }, } ) elif data["type"] == "function": (source_list, _) = inspect.getsourcelines(data["lambda_fn"]) source_code = "".join(source_list) nodes_json.append( { "name": name, "type": "function", "reference": None, "inputs": data["func_inputs"], "lambda": source_code, } ) else: raise ValueError(f"Unrecognized node type: {data['type']}") return json.dumps( { "nodes": nodes_json, "edges": list(self.edges), "containers": list(containers.values()), } )
[docs] def to_json_file(self, filename): GrFN_json = self.to_json() json.dump(GrFN_json, open(filename, "w"))
[docs] def to_AGraph(self): """ Export to a PyGraphviz AGraph object. """ A = nx.nx_agraph.to_agraph(self) A.graph_attr.update( {"dpi": 227, "fontsize": 20, "fontname": "Menlo", "rankdir": "LR"} ) A.node_attr.update({"fontname": "Menlo"}) def build_tree(cluster_name, node_attrs, root_graph): subgraph_nodes = [ node_name for node_name, node_data in self.nodes(data=True) if node_data["parent"] == cluster_name ] root_graph.add_nodes_from(subgraph_nodes) subgraph = root_graph.add_subgraph( subgraph_nodes, name=f"cluster_{cluster_name}", label=cluster_name, style="bold, rounded", rankdir="LR", color=node_attrs[cluster_name]["color"], ) for n in self.scope_tree.successors(cluster_name): build_tree(n, node_attrs, subgraph) root = [n for n, d in self.scope_tree.in_degree() if d == 0][0] node_data = {n: d for n, d in self.scope_tree.nodes(data=True)} build_tree(root, node_data, A) return A
[docs] def CAG_to_AGraph(self): """Returns a variable-only view of the GrFN in the form of an AGraph. Returns: type: A CAG constructed via variable influence in the GrFN object. """ CAG = self.to_CAG() for name, data in CAG.nodes(data=True): CAG.nodes[name]["label"] = data["cag_label"] A = nx.nx_agraph.to_agraph(CAG) A.graph_attr.update( {"dpi": 227, "fontsize": 20, "fontname": "Menlo", "rankdir": "LR"} ) A.node_attr.update( { "shape": "rectangle", "color": "#650021", "style": "rounded", "fontname": "Menlo", } ) A.edge_attr.update({"color": "#650021", "arrowsize": 0.5}) return A
[docs] def FCG_to_AGraph(self): """ Build a PyGraphviz AGraph object corresponding to a call graph of functions. """ A = nx.nx_agraph.to_agraph(self.FCG) A.graph_attr.update( {"dpi": 227, "fontsize": 20, "fontname": "Menlo", "rankdir": "TB"} ) A.node_attr.update( {"shape": "rectangle", "color": "#650021", "style": "rounded"} ) A.edge_attr.update({"color": "#650021", "arrowsize": 0.5}) return A
class ForwardInfluenceBlanket(ComputationalGraph):
    """Forward-influence blanket of a GrFN with respect to the nodes it
    shares with a second network.

    From the shared-node list, a blanket network is created that
    includes all nodes between any shared input/output pair, plus all
    nodes required to blanket the network for forward influence.  The
    resulting object is itself an executable ComputationalGraph.
    """

    def __init__(self, G: GroundedFunctionNetwork, shared_nodes: Set[str]):
        outputs = G.outputs
        inputs = set(G.inputs).intersection(shared_nodes)

        # Get all paths from shared inputs to outputs.
        # BUGFIX: the original read ``G.output_node``, an attribute
        # GroundedFunctionNetwork never defines; G.outputs is used
        # instead.  NOTE(review): presumed single-output case — confirm.
        path_inputs = shared_nodes - set(outputs)
        io_pairs = [(inp, out) for inp in path_inputs for out in outputs]
        paths = [p for (i, o) in io_pairs for p in all_simple_paths(G, i, o)]

        # Nodes and edges lying on the retained paths.
        main_nodes = {node for path in paths for node in path}
        main_edges = {
            (n1, n2) for path in paths for n1, n2 in zip(path, path[1:])
        }

        blanket_nodes = set()
        add_nodes, add_edges = list(), list()

        def place_var_node(var_node):
            # Literal-producing ("L") functions are pulled in with
            # their output variable; anything else becomes a blanket
            # (cover) node whose value must be supplied at run time.
            prev_funcs = list(G.predecessors(var_node))
            if len(prev_funcs) > 0 and G.nodes[prev_funcs[0]]["label"] == "L":
                prev_func = prev_funcs[0]
                add_nodes.extend([var_node, prev_func])
                add_edges.append((prev_func, var_node))
            else:
                blanket_nodes.add(var_node)

        # Add any off-path inputs of retained functions to the blanket.
        for node in main_nodes:
            if G.nodes[node]["type"] == "function":
                for var_node in G.predecessors(node):
                    if var_node not in main_nodes:
                        add_edges.append((var_node, node))
                        if "::IF_" in var_node:
                            # Conditional outputs: also include the IF
                            # function node and all of its inputs.
                            if_func = list(G.predecessors(var_node))[0]
                            add_nodes.extend([if_func, var_node])
                            add_edges.append((if_func, var_node))
                            for new_var_node in G.predecessors(if_func):
                                add_edges.append((new_var_node, if_func))
                                place_var_node(new_var_node)
                        else:
                            place_var_node(var_node)

        main_nodes |= set(add_nodes)
        main_edges |= set(add_edges)
        main_nodes = main_nodes - inputs - set(outputs)

        orig_nodes = G.nodes(data=True)

        F = nx.DiGraph()

        # Shared inputs (highlighted blue).
        F.add_nodes_from([(n, d) for n, d in orig_nodes if n in inputs])
        for node in inputs:
            F.nodes[node]["color"] = dodgerblue3
            F.nodes[node]["fontcolor"] = dodgerblue3
            F.nodes[node]["penwidth"] = 3.0
            F.nodes[node]["fontname"] = FONT

        # Blanket (cover) nodes (highlighted green).
        F.add_nodes_from([(n, d) for n, d in orig_nodes if n in blanket_nodes])
        for node in blanket_nodes:
            F.nodes[node]["fontname"] = FONT
            F.nodes[node]["color"] = forestgreen
            F.nodes[node]["fontcolor"] = forestgreen

        # Interior path nodes.
        F.add_nodes_from([(n, d) for n, d in orig_nodes if n in main_nodes])
        for node in main_nodes:
            F.nodes[node]["fontname"] = FONT

        # Output nodes (highlighted blue).
        for out_var_node in outputs:
            F.add_node(out_var_node, **G.nodes[out_var_node])
            F.nodes[out_var_node]["color"] = dodgerblue3
            F.nodes[out_var_node]["fontcolor"] = dodgerblue3

        F.add_edges_from(main_edges)

        # BUGFIX: the original called ``super().__init__(F, outputs)``
        # (nx.DiGraph accepts only one positional argument) and read
        # ``F.inputs`` before anything ever set it.  Initialize from
        # the wired graph and record the attributes that
        # ComputationalGraph.run and self.run rely on.
        super().__init__(F)
        self.inputs = list(inputs)
        self.outputs = outputs
        self.input_name_map = {
            self.var_shortname(name): name for name in self.inputs
        }
        self.blanket_nodes = blanket_nodes
[docs] @classmethod def from_GrFN(cls, G1, G2): """ Creates a ForwardInfluenceBlanket object representing the intersection of this model with the other input model. Args: G1: The GrFN model to use as the basis for this FIB G2: The GroundedFunctionNetwork object to compare this model to. Returns: A ForwardInfluenceBlanket object to use for model comparison. """ if not ( isinstance(G1, GroundedFunctionNetwork) and isinstance(G2, GroundedFunctionNetwork) ): raise TypeError( f"Expected two GrFNs, but got ({type(G1)}, {type(G2)})" ) def shortname(var): return var[var.find("::") + 2 : var.rfind("_")] def shortname_vars(graph, shortname): return [v for v in graph.nodes() if shortname in v] g1_var_nodes = { shortname(n) for (n, d) in G1.nodes(data=True) if d["type"] == "variable" } g2_var_nodes = { shortname(n) for (n, d) in G2.nodes(data=True) if d["type"] == "variable" } shared_vars = { full_var for shared_var in g1_var_nodes.intersection(g2_var_nodes) for full_var in shortname_vars(G1, shared_var) } return cls(G1, shared_vars)
[docs] def run( self, inputs: Dict[str, Union[float, Iterable]], covers: Dict[str, Union[float, Iterable]], ) -> Union[float, Iterable]: """Executes the FIB over a particular set of inputs and returns the result. Args: inputs: Input set where keys are the names of input nodes in the GrFN and each key points to a set of input values (or just one). Returns: A set of outputs from executing the GrFN, one for every set of inputs. """ # Abort run if covers does not match our expected cover set if len(covers) != len(blanket_nodes): raise ValueError("Incorrect number of cover values.") # Set the cover node values for node_name, val in covers.items(): self.nodes[node_name]["value"] = val return super().run(inputs)
[docs] def to_AGraph(self): A = nx.nx_agraph.to_AGraph(self) A.graph_attr.update({"dpi": 227, "fontsize": 20}) A.node_attr.update({"shape": "rectangle", "style": "rounded"}) A.edge_attr.update({"arrowsize": 0.5}) return A