Source code for delphi.apps.rest_api.api

# -*- coding: utf-8 -*-
import os
import json
import multiprocessing
from math import sqrt
from uuid import uuid4

import numpy as np
from flask import jsonify, request, Blueprint, current_app

from delphi.cpp.DelphiPython import AnalysisGraph, InitialBeta, InitialDerivative
from delphi.apps.rest_api.models import *

# "fork" (rather than "spawn") is used so that the child processes launched
# below inherit the in-memory AnalysisGraph and Flask application objects.
multiprocessing.set_start_method('fork')

# from time import perf_counter

bp = Blueprint("rest_api", __name__)

# ============
# CauseMos API
# ============


def train_model(G, modelID, sampling_resolution, burn, app):
    G.run_train_model(
        sampling_resolution, burn, InitialBeta.ZERO, InitialDerivative.DERI_ZERO
    )
    # G.write_model_to_db(modelID)
    with app.app_context():
        model = DelphiModel(
            id=modelID, model=G.serialize_to_json_string(verbose=False)
        )
        db.session.merge(model)
        db.session.commit()

[docs]@bp.route("/delphi/create-model", methods=["POST"]) def createNewModel(): """ Create a new Delphi model. """ if os.environ.get("CI") == "true": # When running in a continuous integration run, we set the sampling # resolution to be small to prevent timeouts. kde_kernels = 5 sampling_resolution = 5 burn = 5 elif os.environ.get("DELPHI_N_SAMPLES") is not None: # We also enable setting the sampling resolution through the # environment variable "DELPHI_N_SAMPLES", for development and testing # purposes. # NOTE: When creating, this environment variable is named incorrectly! kde_kernels = int(os.environ["DELPHI_N_SAMPLES"]) sampling_resolution = 100 burn = 100 else: # If neither "CI" or "DELPHI_N_SAMPLES" is set, we default to a # sampling resolution of 1000. # TODO - we might want to set the default sampling resolution with some # kind of heuristic, based on the number of nodes and edges. - Adarsh kde_kernels = 1000 sampling_resolution = 200 burn = 10000 data = json.loads(request.data) G = AnalysisGraph.from_causemos_json_string(request.data, belief_score_cutoff=0, grounding_score_cutoff=0, kde_kernels=kde_kernels) model = DelphiModel( id=data["id"], model=G.serialize_to_json_string(verbose=False) ) db.session.merge(model) db.session.commit() response = json.loads(G.generate_create_model_response()) # executor.submit_stored(data["id"], train_model, G, data["id"]) try: proc = multiprocessing.Process(target=train_model, args=(G, data["id"], sampling_resolution, burn, current_app), name='training') proc.start() except multiprocessing.ProcessError: print("Error: unable to start training process") response['status'] = 'server error: training' return jsonify(response)
[docs]@bp.route("/delphi/models/<string:modelID>", methods=["GET"]) def getModelStatus(modelID): query_result = DelphiModel.query.filter_by(id=modelID).first() if not query_result: return jsonify(json.loads('{"status": "invalid model id"}')) model = query_result.model # t1_start = perf_counter() G = AnalysisGraph.deserialize_from_json_string(model, verbose=False) # t1_stop = perf_counter() # print("\nElapsed time to deserialize (seconds):", t1_stop-t1_start) response = json.loads(G.generate_create_model_response()) return jsonify(response)
def runProjectionExperiment(request, experiment_id, G, app):
    request_body = request.get_json()
    startTime = request_body["experimentParam"]["startTime"]
    endTime = request_body["experimentParam"]["endTime"]
    numTimesteps = request_body["experimentParam"]["numTimesteps"]

    causemos_experiment_result = (
        G.run_causemos_projection_experiment_from_json_string(request.data)
    )

    with app.app_context():
        result = CauseMosAsyncExperimentResult.query.filter_by(
            id=experiment_id
        ).first()

        # A rudimentary test to see if the projection failed: we check whether
        # the number of time steps is equal to the number of elements in the
        # first concept's time series.
        if len(list(causemos_experiment_result.values())[0]) < numTimesteps:
            result.status = "failed"
        else:
            result.status = "completed"
            timesteps_nparr = np.round(
                np.linspace(startTime, endTime, numTimesteps)
            )

            # The calculation of the 95% confidence interval about the median
            # is taken from:
            # https://www.ucl.ac.uk/child-health/short-courses-events/about-statistical-courses/research-methods-and-statistics/chapter-8-content-8
            n = G.get_res()
            lower_rank = int((n - 1.96 * sqrt(n)) / 2)
            upper_rank = int((2 + n + 1.96 * sqrt(n)) / 2)

            # Clamp the ranks to valid indices into the sorted samples.
            lower_rank = 0 if lower_rank < 0 else lower_rank
            upper_rank = n - 1 if upper_rank >= n else upper_rank

            result.results = {"data": []}

            for (
                conceptname,
                timestamp_sample_matrix,
            ) in causemos_experiment_result.items():
                data_dict = {
                    "concept": conceptname,
                    "values": [],
                    "confidenceInterval": {"upper": [], "lower": []},
                }

                for i, time_step in enumerate(timestamp_sample_matrix):
                    time_step.sort()
                    mid = len(time_step) // 2
                    median_value = (
                        time_step[mid]
                        if len(time_step) % 2
                        else (time_step[mid] + time_step[mid - 1]) / 2
                    )
                    lower_limit = time_step[lower_rank]
                    upper_limit = time_step[upper_rank]

                    value_dict = {
                        "timestamp": timesteps_nparr[i],
                        "value": median_value,
                    }
                    data_dict["values"].append(value_dict.copy())

                    value_dict.update({"value": lower_limit})
                    data_dict["confidenceInterval"]["lower"].append(
                        value_dict.copy()
                    )

                    value_dict.update({"value": upper_limit})
                    data_dict["confidenceInterval"]["upper"].append(
                        value_dict.copy()
                    )

                result.results["data"].append(data_dict)

    with app.app_context():
        db.session.merge(result)
        db.session.commit()

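# Worked example of the rank arithmetic above (illustrative numbers only):
# with n = 200 samples per time step,
#
#   lower_rank = int((200 - 1.96 * sqrt(200)) / 2)      # = 86
#   upper_rank = int((2 + 200 + 1.96 * sqrt(200)) / 2)  # = 114
#
# so the 95% confidence interval about the median is read off the 86th and
# 114th order statistics of each time step's sorted samples.
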
def runExperiment(request, experiment_id, experiment_type, model, app):
    G = AnalysisGraph.deserialize_from_json_string(model, verbose=False)

    if experiment_type == "PROJECTION":
        runProjectionExperiment(request, experiment_id, G, app)
    elif experiment_type == "GOAL_OPTIMIZATION":
        # Not yet implemented
        pass
    elif experiment_type == "SENSITIVITY_ANALYSIS":
        # Not yet implemented
        pass
    elif experiment_type == "MODEL_VALIDATION":
        # Not yet implemented
        pass
    elif experiment_type == "BACKCASTING":
        # Not yet implemented
        pass
    else:
        # Unknown experiment type
        pass

[docs]@bp.route("/delphi/models/<string:modelID>/experiments", methods=["POST"]) def createCausemosExperiment(modelID): request_body = request.get_json() query_result = DelphiModel.query.filter_by(id=modelID).first() if not query_result: return jsonify({"experimentId": "invalid model id"}) model = query_result.model trained = json.loads(model)["trained"] if not trained: return jsonify({"experimentId": "model not trained"}) experiment_type = request_body["experimentType"] experiment_id = str(uuid4()) result = CauseMosAsyncExperimentResult( id=experiment_id, baseType="CauseMosAsyncExperimentResult", experimentType=experiment_type, status="in progress", results={} ) db.session.add(result) db.session.commit() # executor.submit_stored(experiment_id, runExperiment, request, # experiment_id, experiment_type, model) try: proc = multiprocessing.Process(target=runExperiment, args=(request, experiment_id, experiment_type, model, current_app), name='experiment') proc.start() except multiprocessing.ProcessError: print("Error: unable to start experiment process") return jsonify({"experimentId": 'server error: experiment'}) return jsonify({"experimentId": experiment_id})
@bp.route(
    "/delphi/models/<string:modelID>/experiments/<string:experimentID>",
    methods=["GET"],
)
def getExperimentResults(modelID: str, experimentID: str):
    """ Fetch experiment results """
    # NOTE: I saw some weird behavior when we request results for an invalid
    # experiment ID just after running an experiment. The trained model seemed
    # not to be saved to the database, and it got re-trained from scratch on a
    # subsequent experiment after the initial experiment and the invalid
    # experiment result request. When I added a sleep between the initial
    # create experiment and the invalid result request, this re-training did
    # not occur.
    result = CauseMosAsyncExperimentResult.query.filter_by(
        id=experimentID
    ).first()

    if result:
        experimentType = result.experimentType
        status = result.status
        results = result.results
    else:
        # experimentID is not in the database, so it is presumably invalid.
        experimentType = "UNKNOWN"
        status = "invalid experiment id"
        results = {}

    response = {
        "modelId": modelID,
        "experimentId": experimentID,
        "experimentType": experimentType,
        "status": status,
        "results": results,
    }
    return jsonify(response)

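# Example usage (hypothetical host/port): fetching experiment results.
#
#   curl http://localhost:5000/delphi/models/<modelID>/experiments/<experimentID>
#
# The response always carries "modelId", "experimentId", "experimentType",
# "status", and "results"; "status" starts at "in progress" and is updated to
# "completed" or "failed" by the background experiment process.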