Coverage for skema/skema_py/server.py: 99%
177 statements
coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
import json
import yaml
import os
import tempfile
import asyncio
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from io import BytesIO
from zipfile import ZipFile
from fastapi import APIRouter, FastAPI, status, Body, File, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

import skema.skema_py.acsets
import skema.skema_py.petris

import skema.program_analysis.comment_extractor.server as comment_service
from skema.utils.fold import del_nulls
from skema.gromet.fn.gromet_fn_module_collection import GrometFNModuleCollection
from skema.gromet.metadata.debug import Debug
from skema.program_analysis.multi_file_ingester import process_file_system
from skema.program_analysis.snippet_ingester import process_snippet
from skema.program_analysis.fn_unifier import align_full_system
from skema.program_analysis.JSON2GroMEt.json2gromet import json_to_gromet
from skema.program_analysis.comment_extractor.model import (
    SingleFileCommentRequest,
    SingleFileCommentResponse,
    MultiFileCommentRequest,
    MultiFileCommentResponse,
    CodeComments,
)
from skema.program_analysis.tree_sitter_parsers.build_parsers import (
    LANGUAGES_YAML_FILEPATH,
)


def get_supported_languages() -> Tuple[List, Dict]:
    """Return the supported file extensions and a mapping from extension to language."""
    # We calculate the supported file extensions and the mapping between extension and language by reading the languages.yaml file from tree_sitter_parsers
    languages_obj = yaml.safe_load(LANGUAGES_YAML_FILEPATH.read_text())

    supported_file_extensions = []
    extension_to_language = {}
    for language, language_dict in languages_obj.items():
        if language_dict["supports_fn_extraction"]:
            supported_file_extensions.extend(language_dict["extensions"])
            extension_to_language.update(
                {extension: language for extension in language_dict["extensions"]}
            )

    return supported_file_extensions, extension_to_language


SUPPORTED_FILE_EXTENSIONS, EXTENSION_TO_LANGUAGE = get_supported_languages()
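
# For illustration only (hypothetical values; the real contents come from languages.yaml):
#   SUPPORTED_FILE_EXTENSIONS -> [".py", ".f", ".f90", ...]
#   EXTENSION_TO_LANGUAGE -> {".py": "python", ".f": "fortran", ".f90": "fortran", ...}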


class Ports(BaseModel):
    opis: List[str]
    opos: List[str]


class System(BaseModel):
    files: List[str] = Field(
        description="The relative file path from the directory specified by `root_name`, corresponding to each entry in `blobs`",
        examples=[["example1.py", "dir/example2.py"]],
    )
    blobs: List[str] = Field(
        description="Contents of each file to be analyzed",
        examples=[[
            "greet = lambda: print('howdy!')\ngreet()",
            "#Variable declaration\nx=2\n#Function definition\ndef foo(x):\n '''Increment the input variable'''\n return x+1",
        ]],
    )
    model: Optional[str] = Field(
        default="petrinet",
        description="The type of AMR to produce (e.g. petrinet, regnet)",
        examples=["regnet"],
    )
    system_name: Optional[str] = Field(
        default=None,
        description="A model name to associate with the provided code",
        examples=["example-system"],
    )
    root_name: Optional[str] = Field(
        default=None,
        description="The name of the code system's root directory.",
        examples=["example-system"],
    )
    comments: Optional[CodeComments] = Field(
        default=None,
        description="A CodeComments object representing the comments extracted from the source code in `blobs`. Can provide comments for a single file (SingleFileCodeComments) or multiple files (MultiFileCodeComments)",
        examples=[{
            "files": {
                "example-system/dir/example2.py": {
                    "single": [
                        {"content": "Variable declaration", "line_number": 0},
                        {"content": "Function definition", "line_number": 2},
                    ],
                    "multi": [],
                    "docstring": [
                        {
                            "content": ["Increment the input variable"],
                            "function_name": "foo",
                            "start_line_number": 5,
                            "end_line_number": 6,
                        }
                    ],
                }
            }
        }],
    )
    dependency_depth: Optional[int] = Field(
        default=0,
        ge=0,
        le=1,
        description="The depth at which to ingest dependencies into Gromet, i.e. 0=ingest no dependencies, 1=ingest system dependencies. Accepted values: [0, 1]",
        examples=[1],
    )
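
# Constructing a System directly (a sketch reusing the field examples above):
#   system = System(
#       files=["example1.py"],
#       blobs=["greet = lambda: print('howdy!')\ngreet()"],
#   )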


class MML_System(BaseModel):
    """
    Pydantic BaseModel representing a system with MML (MathML).

    Attributes:
        mml (str): The MML equation.
        system (System): An instance of the System class representing system details.
    """
    mml: str
    system: System


class Equation_Extraction_System(BaseModel):
    """
    Pydantic BaseModel representing a system with equation extraction.

    Attributes:
        pdf_local_path (str): the local path to the PDF file
        save_folder (str): the folder path where images will be saved
        gpt_key (str): your OpenAI API key
    """

    pdf_local_path: str
    save_folder: str
    gpt_key: str


async def system_to_enriched_system(system: System) -> System:
    """Takes a System as input and enriches it with comments by running the tree-sitter comment extractor."""

    # Instead of making each proxy call separately, we gather them so they run concurrently
    coroutines = []
    file_paths = []
    for file, blob in zip(system.files, system.blobs):
        file_path = Path(system.root_name or "") / file
        if file_path.suffix not in SUPPORTED_FILE_EXTENSIONS:
            # Since we are enriching a system for unification, we only want to extract comments from source files we can also extract Gromet FN from.
            continue

        request = SingleFileCommentRequest(
            source=blob, language=EXTENSION_TO_LANGUAGE[file_path.suffix]
        )
        coroutines.append(comment_service.comments_extract(request))
        file_paths.append(file_path)
    results = await asyncio.gather(*coroutines)

    # Due to the nested structure of MultiFileCodeComments, it is easier to work with a Dict.
    # Then, we can convert it using MultiFileCodeComments.model_validate()
    comments = {"files": {}}
    for file_path, result in zip(file_paths, results):
        comments["files"][str(file_path)] = result
    system.comments = MultiFileCommentResponse(**comments)

    return system
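
# Illustrative usage (a sketch; the file name and contents are hypothetical):
#   system = System(files=["m.py"], blobs=["x = 1  # initial value"])
#   system = await system_to_enriched_system(system)
#   # system.comments now holds a MultiFileCommentResponse keyed by file path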


# Returns an abbreviated Dict representing a GrometFNModuleCollection
async def system_to_gromet(system: System):
    """Convert a System to Gromet JSON"""

    # We maintain a log of warnings and errors to pass back to the user.
    # This allows us to warn the user about unsupported file extensions.
    server_log = []
    to_remove = []
    for index, (file, blob) in enumerate(zip(system.files, system.blobs)):
        valid = True

        # Iterate over parent directories to check for unsupported file paths
        path_obj = Path(file)
        for parent in path_obj.parents:
            if parent.name == "_MACOSX":
                unsupported_file_str = f"WARNING: Ingestion of files in _MACOSX directory not supported. File {file} will be skipped."
                valid = False
            elif parent.name.startswith("."):
                unsupported_file_str = f"WARNING: File {file} is in a hidden directory and will be skipped."
                valid = False

        # Check that the file extension is in the list of supported file extensions
        if Path(file).suffix not in SUPPORTED_FILE_EXTENSIONS:
            unsupported_file_str = f"WARNING: Ingestion of file extension {Path(file).suffix} for file {file} is not supported and will be skipped."
            valid = False

        if not valid:
            to_remove.append(index)
            print(unsupported_file_str)
            server_log.append(unsupported_file_str)

    # Remove files to prevent ingestion. Removed in reverse sorted order, since each removal shifts the indices.
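    # e.g. with to_remove == [0, 2], popping index 0 first would shift the element
    # originally at index 2 down to index 1; popping 2 then 0 avoids this.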
    for index in sorted(to_remove, reverse=True):
        system.files.pop(index)
        system.blobs.pop(index)

    # If there are no supported files, then we will return an empty GrometFNModuleCollection with top-level Debug metadata
    if len(system.files) == 0:
        no_supported_file_str = "ERROR: The system does not contain any files with supported file extensions. All files will be skipped."
        print(no_supported_file_str)
        gromet_collection = GrometFNModuleCollection(
            metadata_collection=[[]], metadata=0
        )
        gromet_collection.metadata_collection[0].append(
            Debug(
                debug_type="code2fn", severity="ERROR", message=no_supported_file_str
            ).to_dict()  # There is a bug in Swagger that requires us to manually to_dict() this
        )
        return gromet_collection.to_dict()

    # The CODE2FN pipeline requires a file path as input.
    # We are receiving a serialized version of the code system as input, so we must store the files in a temporary directory.
    # This temp directory only persists during execution of the CODE2FN pipeline.
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)

        # Create files and intermediate directories
        for index, file in enumerate(system.files):
            file_path = Path(tmp_path, system.root_name or "", file)
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_text(system.blobs[index])

        # Create system_filepaths.txt
        system_filepaths = Path(tmp_path, "system_filepaths.txt")
        system_filepaths.write_text("\n".join(system.files))

        # Run pipeline
        gromet_collection = process_file_system(
            system.system_name or "",
            str(Path(tmp_path, system.root_name or "")),
            str(system_filepaths),
            dependency_depth=system.dependency_depth,
        )

    # Attempt to enrich the system with comments. May return the same system if Rust isn't installed.
    if not system.comments:
        system = await system_to_enriched_system(system)

    # If comments are included in the request or added in the enriching process, run the unifier to add them to the Gromet
    if system.comments:
        align_full_system(gromet_collection, system.comments)

    # Explicitly call to_dict on any metadata object
    # NOTE: Only required because of a fault in swagger-codegen
    for i, module in enumerate(gromet_collection.modules):
        for j, metadata_list in enumerate(module.metadata_collection):
            for k, metadata in enumerate(metadata_list):
                gromet_collection.modules[i].metadata_collection[j][
                    k
                ] = metadata.to_dict()

    # Add debug Metadata to Gromet objects
    if not gromet_collection.metadata_collection:
        gromet_collection.metadata_collection = [[]]
    for log in server_log:
        gromet_collection.metadata_collection[0].append(
            Debug(
                debug_type="code2fn", severity="WARNING", message=log
            ).to_dict()  # There is a bug in Swagger that requires us to manually to_dict() this
        )

    # Convert Gromet data-model to dict for return
    return gromet_collection.to_dict()
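
# Direct (non-HTTP) usage sketch, reusing the single-file example from the endpoint
# docstrings below:
#   gromet_dict = await system_to_gromet(System(files=["exp1.py"], blobs=["x=2"]))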


router = APIRouter()


@router.get(
    "/healthcheck",
    summary="Ping endpoint to test health of service",
    status_code=status.HTTP_200_OK,
    response_model=int,
)
def healthcheck() -> int:
    return status.HTTP_200_OK


@router.get(
    "/fn-supported-file-extensions",
    summary="Endpoint for checking which file extensions are currently supported by the code2fn pipeline.",
    response_model=List[str],
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": [
                        ".py",
                        ".f",
                        ".f90",
                    ]
                }
            }
        }
    },
)
def fn_supported_file_extensions():
    """
    Returns a List[str] where each entry in the list represents a supported file extension.

    ### Python example
    ```
    import requests

    response = requests.get("http://0.0.0.0:8000/code2fn/fn-supported-file-extensions")
    supported_extensions = response.json()
    ```
    """
    return SUPPORTED_FILE_EXTENSIONS


@router.post(
    "/fn-given-filepaths",
    summary=(
        "Send a system of code and filepaths of interest,"
        " get a GroMEt FN Module collection back."
    ),
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": {
                        "schema": "FN",
                        "schema_version": "0.1.10",
                        "name": "gromet_name",
                        "modules": [],
                        "module_index": [],
                        "module_dependencies": [],
                        "executables": [],
                        "metadata_collection": [],
                        "gromet_type": "GrometFNModuleCollection",
                    }
                }
            }
        }
    },
)
async def fn_given_filepaths(system: System):
    """
    Endpoint for generating Gromet JSON from a serialized code system.

    ### Python example
    ```
    import requests

    # Single file
    system = {
        "files": ["exp1.py"],
        "blobs": ["x=2"]
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths", json=system)
    gromet_json = response.json()

    # Multi file
    system = {
        "files": ["exp1.py", "exp1.f"],
        "blobs": ["x=2", "program exp1\\ninteger::x=2\\nend program exp1"],
        "system_name": "exp1",
        "root_name": "exp1"
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths", json=system)
    gromet_json = response.json()
    ```
    """
    return await system_to_gromet(system)


@router.post(
    "/fn-given-filepaths-zip",
    summary=(
        "Send a zip file containing a code system,"
        " get a GroMEt FN Module collection back."
    ),
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": {
                        "schema": "FN",
                        "schema_version": "0.1.10",
                        "name": "gromet_name",
                        "modules": [],
                        "module_index": [],
                        "module_dependencies": [],
                        "executables": [],
                        "metadata_collection": [],
                        "gromet_type": "GrometFNModuleCollection",
                    }
                }
            }
        }
    },
)
async def fn_given_filepaths_zip(zip_file: UploadFile = File()):
    """
    Endpoint for generating Gromet JSON from a zip archive of arbitrary depth and structure.
    All source files with a supported file extension (see /fn-supported-file-extensions) will be processed as a single GrometFNModuleCollection.

    ### Python example
    ```
    import requests
    import shutil
    from pathlib import Path

    # Format input/output paths
    input_name = "system_test"
    output_name = "system_test.zip"
    input_path = Path("/data") / "skema" / "code" / input_name
    output_path = Path("/data") / "skema" / "code" / output_name

    # Convert source directory to zip archive
    shutil.make_archive(input_path, "zip", input_path)

    files = {
        "zip_file": open(output_path, "rb"),
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths-zip", files=files)
    gromet_json = response.json()
    ```
    """

    # To process a zip file, we first convert it to a System object and then pass it to system_to_gromet.
    files = []
    blobs = []
    with ZipFile(BytesIO(zip_file.file.read()), "r") as zip:
        for file in zip.namelist():
            file_obj = Path(file)
            if file_obj.suffix in SUPPORTED_FILE_EXTENSIONS:
                files.append(file)
                blobs.append(zip.open(file).read())

    zip_obj = Path(zip_file.filename)
    system_name = zip_obj.stem
    root_name = zip_obj.stem

    system = System(
        files=files, blobs=blobs, system_name=system_name, root_name=root_name
    )

    return await system_to_gromet(system)


@router.post(
    "/gromet-object-count",
    summary="Endpoint for counting the number of boxes, wires, and ports in a Gromet object.",
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": {
                        "b": 0,
                        "bf": 0,
                        "opi": 0,
                        "opo": 0,
                        "pil": 0,
                        "pol": 0,
                        "wlopi": 0,
                        "wll": 0,
                        "wlf": 0,
                        "wlc": 0,
                        "wlopo": 0,
                        "pof": 0,
                        "pif": 0,
                        "wfopi": 0,
                        "wfl": 0,
                        "wff": 0,
                        "wfc": 0,
                        "wfopo": 0,
                        "pic": 0,
                        "poc": 0,
                        "wcopi": 0,
                        "wcl": 0,
                        "wcf": 0,
                        "wcc": 0,
                        "wcopo": 0,
                    }
                }
            }
        }
    },
)
async def gromet_object_count(gromet_object: Dict):
    """
    Endpoint for counting the number of boxes, wires, and ports in a Gromet object.

    ### Python example
    ```
    import requests

    system = {
        "files": ["example1.py"],
        "blobs": [
            "greet = lambda: print('howdy!')"
        ],
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths", json=system)
    gromet_collection = response.json()

    response = requests.post("http://0.0.0.0:8000/code2fn/gromet-object-count", json=gromet_collection)
    gromet_object_count = response.json()
    ```
    """

    gromet_keys = {
        "b": 0,
        "bf": 0,
        "opi": 0,
        "opo": 0,
        "pil": 0,
        "pol": 0,
        "wlopi": 0,
        "wll": 0,
        "wlf": 0,
        "wlc": 0,
        "wlopo": 0,
        "pof": 0,
        "pif": 0,
        "wfopi": 0,
        "wfl": 0,
        "wff": 0,
        "wfc": 0,
        "wfopo": 0,
        "pic": 0,
        "poc": 0,
        "wcopi": 0,
        "wcl": 0,
        "wcf": 0,
        "wcc": 0,
        "wcopo": 0,
    }

    def recurse(gromet_object: Dict):
        """Recursive walking function for Gromet"""
        for key, value in gromet_object.items():
            if key in gromet_keys:
                gromet_keys[key] += len(value)
            elif isinstance(value, List):
                for element in value:
                    if isinstance(element, Dict):
                        recurse(element)
            elif isinstance(value, Dict):
                recurse(value)

    # It's likely that the Gromet passed to this endpoint will not have None values removed.
    # So, we need to remove None values ahead of time.
    del_nulls(gromet_object)
    recurse(gromet_object)

    # We also aggregate the boxes, wires, and ports to better support MORAE use cases
    gromet_keys["boxes"] = sum(
        val for key, val in gromet_keys.items() if key.startswith("b")
    )
    gromet_keys["wires"] = sum(
        val for key, val in gromet_keys.items() if key.startswith("w")
    )
    gromet_keys["ports"] = sum(
        val for key, val in gromet_keys.items() if key.startswith(("p", "o"))
    )

    return gromet_keys


@router.post(
    "/get-pyacset",
    summary="Get PyACSet for a given model",
)
async def get_pyacset(ports: Ports):
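    """
    Endpoint for building a Petri-net PyACSet from the given input/output ports.

    ### Python example (a sketch; the port names below are hypothetical)
    ```
    import requests

    ports = {"opis": ["birth", "death"], "opos": ["S", "I"]}
    response = requests.post("http://0.0.0.0:8000/code2fn/get-pyacset", json=ports)
    pyacset = response.json()
    ```
    """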
    opis, opos = ports.opis, ports.opos
    petri = skema.skema_py.petris.Petri()
    petri.add_species(len(opos))
    trans = skema.skema_py.petris.Transition
    petri.add_parts(trans, len(opis))

    for i, tran in enumerate(opis):
        petri.set_subpart(i, skema.skema_py.petris.attr_tname, tran)

    for j, spec in enumerate(opos):
        petri.set_subpart(j, skema.skema_py.petris.attr_sname, spec)

    return petri.write_json()


app = FastAPI()
app.include_router(
    router,
    prefix="/code2fn",
    tags=["code2fn"],
)
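
# To serve this app locally (a sketch; assumes uvicorn is available):
#   uvicorn skema.skema_py.server:app --host 0.0.0.0 --port 8000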