Coverage for skema/skema_py/server.py: 99%

177 statements  

coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

import json
import yaml
import os
import tempfile
import asyncio
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from io import BytesIO
from zipfile import ZipFile
from fastapi import APIRouter, FastAPI, status, Body, File, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

import skema.skema_py.acsets
import skema.skema_py.petris

import skema.program_analysis.comment_extractor.server as comment_service
from skema.utils.fold import del_nulls
from skema.gromet.fn.gromet_fn_module_collection import GrometFNModuleCollection
from skema.gromet.metadata.debug import Debug
from skema.program_analysis.multi_file_ingester import process_file_system
from skema.program_analysis.snippet_ingester import process_snippet
from skema.program_analysis.fn_unifier import align_full_system
from skema.program_analysis.JSON2GroMEt.json2gromet import json_to_gromet
from skema.program_analysis.comment_extractor.model import (
    SingleFileCommentRequest,
    SingleFileCommentResponse,
    MultiFileCommentRequest,
    MultiFileCommentResponse,
    CodeComments,
)
from skema.program_analysis.tree_sitter_parsers.build_parsers import (
    LANGUAGES_YAML_FILEPATH,
)


def get_supported_languages() -> Tuple[List[str], Dict[str, str]]:
    """Return the supported file extensions and a mapping from extension to language."""
    # We calculate the supported file extensions and the mapping between extension
    # and language by reading the languages.yaml file from tree_sitter_parsers.
    languages_obj = yaml.safe_load(LANGUAGES_YAML_FILEPATH.read_text())

    supported_file_extensions = []
    extension_to_language = {}
    for language, language_dict in languages_obj.items():
        if language_dict["supports_fn_extraction"]:
            supported_file_extensions.extend(language_dict["extensions"])
            extension_to_language.update(
                {extension: language for extension in language_dict["extensions"]}
            )

    return supported_file_extensions, extension_to_language


SUPPORTED_FILE_EXTENSIONS, EXTENSION_TO_LANGUAGE = get_supported_languages()
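
# Illustrative sketch of the shapes these module-level values take. The concrete
# entries depend on the languages.yaml shipped with the installed tree-sitter
# parsers; the language names below are assumptions, the extensions come from the
# endpoint examples later in this file:
#   SUPPORTED_FILE_EXTENSIONS  -> [".py", ".f", ".f90", ...]
#   EXTENSION_TO_LANGUAGE      -> {".py": "python", ".f": "fortran", ...}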


class Ports(BaseModel):
    opis: List[str]
    opos: List[str]


class System(BaseModel):
    files: List[str] = Field(
        description="The relative file path from the directory specified by `root_name`, corresponding to each entry in `blobs`",
        examples=[["example1.py", "dir/example2.py"]],
    )
    blobs: List[str] = Field(
        description="Contents of each file to be analyzed",
        examples=[
            [
                "greet = lambda: print('howdy!')\ngreet()",
                "#Variable declaration\nx=2\n#Function definition\ndef foo(x):\n '''Increment the input variable'''\n return x+1",
            ]
        ],
    )
    model: Optional[str] = Field(
        default="petrinet",
        description="The type of AMR to produce from the code (e.g. 'petrinet' or 'regnet')",
        examples=["regnet"],
    )
    system_name: Optional[str] = Field(
        default=None,
        description="A model name to associate with the provided code",
        examples=["example-system"],
    )
    root_name: Optional[str] = Field(
        default=None,
        description="The name of the code system's root directory.",
        examples=["example-system"],
    )
    comments: Optional[CodeComments] = Field(
        default=None,
        description="A CodeComments object representing the comments extracted from the source code in `blobs`. Can provide comments for a single file (SingleFileCodeComments) or multiple files (MultiFileCodeComments)",
        examples=[
            {
                "files": {
                    "example-system/dir/example2.py": {
                        "single": [
                            {"content": "Variable declaration", "line_number": 0},
                            {"content": "Function definition", "line_number": 2},
                        ],
                        "multi": [],
                        "docstring": [
                            {
                                "content": ["Increment the input variable"],
                                "function_name": "foo",
                                "start_line_number": 5,
                                "end_line_number": 6,
                            }
                        ],
                    }
                }
            }
        ],
    )
    dependency_depth: Optional[int] = Field(
        default=0,
        ge=0,
        le=1,
        description="The depth at which to ingest dependencies into the Gromet, i.e. 0 = ingest no dependencies, 1 = ingest system dependencies. Accepted values: [0, 1]",
        examples=[1],
    )
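
# Minimal construction sketch (file name and contents are illustrative, taken
# from the Field examples above):
#   system = System(
#       files=["example1.py"],
#       blobs=["greet = lambda: print('howdy!')\ngreet()"],
#       root_name="example-system",
#   )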


class MML_System(BaseModel):
    """
    Pydantic BaseModel representing a system with MML (MathML).

    Attributes:
        mml (str): The MML equation.
        system (System): An instance of the System class representing system details.
    """

    mml: str
    system: System


class Equation_Extraction_System(BaseModel):
    """
    Pydantic BaseModel representing a system for equation extraction from a PDF.

    Attributes:
        pdf_local_path (str): The local path to the PDF file.
        save_folder (str): The folder path where images will be saved.
        gpt_key (str): Your OpenAI API key.
    """

    pdf_local_path: str
    save_folder: str
    gpt_key: str


async def system_to_enriched_system(system: System) -> System:
    """Take a System as input and enrich it with comments by running the tree-sitter comment extractor."""

    # Instead of making each proxy call separately, we gather them so they can run concurrently.
    coroutines = []
    file_paths = []
    for file, blob in zip(system.files, system.blobs):
        file_path = Path(system.root_name or "") / file
        if file_path.suffix not in SUPPORTED_FILE_EXTENSIONS:
            # Since we are enriching a system for unification, we only want to extract
            # comments from source files we can also extract Gromet FN from.
            continue

        request = SingleFileCommentRequest(
            source=blob, language=EXTENSION_TO_LANGUAGE[file_path.suffix]
        )
        coroutines.append(comment_service.comments_extract(request))
        file_paths.append(file_path)
    results = await asyncio.gather(*coroutines)

    # Due to the nested structure of MultiFileCodeComments, it is easier to work with a Dict.
    # Then, we can convert it using MultiFileCodeComments.model_validate()
    comments = {"files": {}}
    for file_path, result in zip(file_paths, results):
        comments["files"][str(file_path)] = result
    system.comments = MultiFileCommentResponse(**comments)

    return system
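
# Usage sketch (must be awaited from within an event loop; `system` is the
# hypothetical instance sketched after the System model above):
#   enriched = await system_to_enriched_system(system)
#   # enriched.comments is now a MultiFileCommentResponse keyed by file path,
#   # covering every supported file in the system.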


# Returns an abbreviated Dict representing a GrometFNModuleCollection.
async def system_to_gromet(system: System):
    """Convert a System to Gromet JSON."""

    # We maintain a log of warnings and errors to pass back to the user.
    # This allows us to warn the user about unsupported file extensions.
    server_log = []
    to_remove = []
    for index, (file, blob) in enumerate(zip(system.files, system.blobs)):
        valid = True

        # Iterate over parent directories to check for unsupported file paths.
        path_obj = Path(file)
        for parent in path_obj.parents:
            if parent.name == "_MACOSX":
                unsupported_file_str = f"WARNING: Ingestion of files in _MACOSX directory not supported. File {file} will be skipped."
                valid = False
            elif parent.name.startswith("."):
                unsupported_file_str = f"WARNING: File {file} is in a hidden directory and will be skipped."
                valid = False

        # Check that the file extension is in the list of supported file extensions.
        if Path(file).suffix not in SUPPORTED_FILE_EXTENSIONS:
            unsupported_file_str = f"WARNING: Ingestion of file extension {Path(file).suffix} for file {file} is not supported and will be skipped."
            valid = False

        if not valid:
            to_remove.append(index)
            print(unsupported_file_str)
            server_log.append(unsupported_file_str)

    # Remove files to prevent ingestion. Removed in reverse sorted order since each
    # removal shifts the indices.
    for index in sorted(to_remove, reverse=True):
        system.files.pop(index)
        system.blobs.pop(index)

    # If there are no supported files, then we return an empty GrometFNModuleCollection
    # with a top-level Debug metadata.
    if len(system.files) == 0:
        no_supported_file_str = "ERROR: The system does not contain any files with supported file extensions. All files will be skipped."
        print(no_supported_file_str)
        gromet_collection = GrometFNModuleCollection(
            metadata_collection=[[]], metadata=0
        )
        gromet_collection.metadata_collection[0].append(
            Debug(
                debug_type="code2fn", severity="ERROR", message=no_supported_file_str
            ).to_dict()  # There is a bug in Swagger that requires us to manually to_dict() this
        )
        return gromet_collection.to_dict()

    # The CODE2FN pipeline requires a file path as input.
    # We are receiving a serialized version of the code system as input, so we must
    # store the files in a temporary directory.
    # This temp directory only persists during execution of the CODE2FN pipeline.
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)

        # Create files and intermediate directories
        for index, file in enumerate(system.files):
            file_path = Path(tmp_path, system.root_name or "", file)
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_text(system.blobs[index])

        # Create system_filepaths.txt
        system_filepaths = Path(tmp_path, "system_filepaths.txt")
        system_filepaths.write_text("\n".join(system.files))

        # Run pipeline
        gromet_collection = process_file_system(
            system.system_name or "",
            str(Path(tmp_path, system.root_name or "")),
            str(system_filepaths),
            dependency_depth=system.dependency_depth,
        )

    # Attempt to enrich the system with comments. May return the same system if Rust isn't installed.
    if not system.comments:
        system = await system_to_enriched_system(system)

    # If comments are included in the request or added in the enriching process,
    # run the unifier to add them to the Gromet.
    if system.comments:
        align_full_system(gromet_collection, system.comments)

    # Explicitly call to_dict on any metadata object.
    # NOTE: Only required because of a fault in swagger-codegen.
    for i, module in enumerate(gromet_collection.modules):
        for j, metadata_list in enumerate(module.metadata_collection):
            for k, metadata in enumerate(metadata_list):
                gromet_collection.modules[i].metadata_collection[j][
                    k
                ] = metadata.to_dict()

    # Add Debug metadata to the Gromet object.
    if not gromet_collection.metadata_collection:
        gromet_collection.metadata_collection = [[]]
    for log in server_log:
        gromet_collection.metadata_collection[0].append(
            Debug(
                debug_type="code2fn", severity="WARNING", message=log
            ).to_dict()  # There is a bug in Swagger that requires us to manually to_dict() this
        )

    # Convert the Gromet data model to a dict for return.
    return gromet_collection.to_dict()
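
# Usage sketch (await from within an event loop; `system` as sketched earlier):
#   gromet_dict = await system_to_gromet(system)
#   gromet_dict["gromet_type"]          # "GrometFNModuleCollection"
#   gromet_dict["metadata_collection"]  # skipped-file warnings appear here as Debug entries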


router = APIRouter()


@router.get(
    "/healthcheck",
    summary="Ping endpoint to test health of service",
    status_code=status.HTTP_200_OK,
    response_model=int,
)
def healthcheck() -> int:
    """Return HTTP 200 so callers can verify that the service is up."""
    return status.HTTP_200_OK


@router.get(
    "/fn-supported-file-extensions",
    summary="Endpoint for checking which file extensions are currently supported by the code2fn pipeline.",
    response_model=List[str],
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": [
                        ".py",
                        ".f",
                        ".f90",
                    ]
                }
            }
        }
    },
)
def fn_supported_file_extensions():
    """
    Returns a List[str] where each entry in the list represents a supported file extension.

    ### Python example
    ```
    import requests

    response = requests.get("http://0.0.0.0:8000/code2fn/fn-supported-file-extensions")
    supported_extensions = response.json()
    ```
    """
    return SUPPORTED_FILE_EXTENSIONS


@router.post(
    "/fn-given-filepaths",
    summary=(
        "Send a system of code and filepaths of interest,"
        " get a GroMEt FN Module collection back."
    ),
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": {
                        "schema": "FN",
                        "schema_version": "0.1.10",
                        "name": "gromet_name",
                        "modules": [],
                        "module_index": [],
                        "module_dependencies": [],
                        "executables": [],
                        "metadata_collection": [],
                        "gromet_type": "GrometFNModuleCollection",
                    }
                }
            }
        }
    },
)
async def fn_given_filepaths(system: System):
    """
    Endpoint for generating Gromet JSON from a serialized code system.

    ### Python example
    ```
    import requests

    # Single file
    system = {
        "files": ["exp1.py"],
        "blobs": ["x=2"],
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths", json=system)
    gromet_json = response.json()

    # Multi file
    system = {
        "files": ["exp1.py", "exp1.f"],
        "blobs": ["x=2", "program exp1\\ninteger::x=2\\nend program exp1"],
        "system_name": "exp1",
        "root_name": "exp1",
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths", json=system)
    gromet_json = response.json()
    ```
    """
    return await system_to_gromet(system)


@router.post(
    "/fn-given-filepaths-zip",
    summary=(
        "Send a zip file containing a code system,"
        " get a GroMEt FN Module collection back."
    ),
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": {
                        "schema": "FN",
                        "schema_version": "0.1.10",
                        "name": "gromet_name",
                        "modules": [],
                        "module_index": [],
                        "module_dependencies": [],
                        "executables": [],
                        "metadata_collection": [],
                        "gromet_type": "GrometFNModuleCollection",
                    }
                }
            }
        }
    },
)
async def fn_given_filepaths_zip(zip_file: UploadFile = File()):
    """
    Endpoint for generating Gromet JSON from a zip archive of arbitrary depth and structure.
    All source files with a supported file extension (see /fn-supported-file-extensions) will be processed as a single GrometFNModuleCollection.

    ### Python example
    ```
    import requests
    import shutil
    from pathlib import Path

    # Format input/output paths
    input_name = "system_test"
    output_name = "system_test.zip"
    input_path = Path("/data") / "skema" / "code" / input_name
    output_path = Path("/data") / "skema" / "code" / output_name

    # Convert source directory to zip archive
    shutil.make_archive(input_path, "zip", input_path)

    files = {
        "zip_file": open(output_path, "rb"),
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths-zip", files=files)
    gromet_json = response.json()
    ```
    """

    # To process a zip file, we first convert it to a System object and then pass it to system_to_gromet.
    files = []
    blobs = []
    with ZipFile(BytesIO(zip_file.file.read()), "r") as zip:
        for file in zip.namelist():
            file_obj = Path(file)
            if file_obj.suffix in SUPPORTED_FILE_EXTENSIONS:
                files.append(file)
                blobs.append(zip.open(file).read())

    zip_obj = Path(zip_file.filename)
    system_name = zip_obj.stem
    root_name = zip_obj.stem

    system = System(
        files=files, blobs=blobs, system_name=system_name, root_name=root_name
    )

    return await system_to_gromet(system)


@router.post(
    "/gromet-object-count",
    summary="Endpoint for counting the number of boxes, wires, and ports in a Gromet object.",
    responses={
        200: {
            "content": {
                "application/json": {
                    "example": {
                        "b": 0,
                        "bf": 0,
                        "opi": 0,
                        "opo": 0,
                        "pil": 0,
                        "pol": 0,
                        "wlopi": 0,
                        "wll": 0,
                        "wlf": 0,
                        "wlc": 0,
                        "wlopo": 0,
                        "pof": 0,
                        "pif": 0,
                        "wfopi": 0,
                        "wfl": 0,
                        "wff": 0,
                        "wfc": 0,
                        "wfopo": 0,
                        "pic": 0,
                        "poc": 0,
                        "wcopi": 0,
                        "wcl": 0,
                        "wcf": 0,
                        "wcc": 0,
                        "wcopo": 0,
                    }
                }
            }
        }
    },
)
async def gromet_object_count(gromet_object: Dict):
    """
    Endpoint for counting the number of boxes, wires, and ports in a Gromet object.

    ### Python example
    ```
    import requests

    system = {
        "files": ["example1.py"],
        "blobs": [
            "greet = lambda: print('howdy!')"
        ],
    }
    response = requests.post("http://0.0.0.0:8000/code2fn/fn-given-filepaths", json=system)
    gromet_collection = response.json()
    response = requests.post("http://0.0.0.0:8000/code2fn/gromet-object-count", json=gromet_collection)
    gromet_object_count = response.json()
    ```
    """

    gromet_keys = {
        "b": 0,
        "bf": 0,
        "opi": 0,
        "opo": 0,
        "pil": 0,
        "pol": 0,
        "wlopi": 0,
        "wll": 0,
        "wlf": 0,
        "wlc": 0,
        "wlopo": 0,
        "pof": 0,
        "pif": 0,
        "wfopi": 0,
        "wfl": 0,
        "wff": 0,
        "wfc": 0,
        "wfopo": 0,
        "pic": 0,
        "poc": 0,
        "wcopi": 0,
        "wcl": 0,
        "wcf": 0,
        "wcc": 0,
        "wcopo": 0,
    }

    def recurse(gromet_object: Dict):
        """Recursive walking function for Gromet"""
        for key, value in gromet_object.items():
            if key in gromet_keys:
                gromet_keys[key] += len(value)
            elif isinstance(value, List):
                for element in value:
                    if isinstance(element, Dict):
                        recurse(element)
            elif isinstance(value, Dict):
                recurse(value)

    # It's likely that the Gromet passed to this endpoint will not have None values removed.
    # So, we need to remove None values ahead of time.
    del_nulls(gromet_object)
    recurse(gromet_object)

    # We also aggregate the boxes, wires, and ports to better support MORAE use cases.
    gromet_keys["boxes"] = sum(
        [val for key, val in gromet_keys.items() if key.startswith("b")]
    )
    gromet_keys["wires"] = sum(
        [val for key, val in gromet_keys.items() if key.startswith("w")]
    )
    gromet_keys["ports"] = sum(
        [val for key, val in gromet_keys.items() if key.startswith(("p", "o"))]
    )

    return gromet_keys
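
# Worked sketch of the count/aggregate logic on an assumed toy input:
#   {"fn": {"b": [{}], "wff": [{}, {}], "opo": [{}]}}
# recurse() tallies b=1, wff=2, opo=1, and the aggregates then come out as
#   boxes=1 (keys starting with "b"), wires=2 ("w"), ports=1 ("p"/"o").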


@router.post(
    "/get-pyacset",
    summary="Get PyACSet for a given model",
)
async def get_pyacset(ports: Ports):
    """Build a Petri net PyACSet from the given ports and return it as JSON."""
    opis, opos = ports.opis, ports.opos
    petri = skema.skema_py.petris.Petri()
    petri.add_species(len(opos))
    trans = skema.skema_py.petris.Transition
    petri.add_parts(trans, len(opis))

    for i, tran in enumerate(opis):
        petri.set_subpart(i, skema.skema_py.petris.attr_tname, opis[i])

    for j, spec in enumerate(opos):
        petri.set_subpart(j, skema.skema_py.petris.attr_sname, opos[j])

    return petri.write_json()
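
# Request sketch (port names are illustrative; per the code above, `opos` become
# species names and `opis` become transition names in the Petri net):
#   import requests
#   requests.post(
#       "http://0.0.0.0:8000/code2fn/get-pyacset",
#       json={"opis": ["infect"], "opos": ["S", "I"]},
#   )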


app = FastAPI()
app.include_router(
    router,
    prefix="/code2fn",
    tags=["code2fn"],
)
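
# Serving sketch (assumes uvicorn is installed; host and port are illustrative):
#   uvicorn skema.skema_py.server:app --host 0.0.0.0 --port 8000
# All routes are then available under the /code2fn prefix, e.g.
#   GET http://0.0.0.0:8000/code2fn/healthcheck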