Coverage for skema/skema_py/client.py: 0%

31 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

1#!/usr/bin/env python 

2 

3"""Example Python client program to communicate with the skema-py service.""" 

4 

5import os 

6import json 

7import requests 

8import argparse 

9 

10 

def system_to_json(
    root_path: str, system_filepaths: str, system_name: str
) -> str:
    """Bundle the files of a system into a JSON request payload.

    Args:
        root_path: Directory that the listed file paths are relative to.
        system_filepaths: Path to a text file listing one relative file
            path per line. Surrounding whitespace is stripped and blank
            lines are ignored.
        system_name: Name stored under the "system_name" key.

    Returns:
        A JSON string with the keys "files" (the listed relative paths),
        "blobs" (the text content of each listed file, in the same
        order), "system_name", and "root_name" (the final component of
        the normalized ``root_path``).

    Raises:
        OSError: If the listing or any listed file cannot be opened.
    """
    with open(system_filepaths, "r") as listing:
        stripped_lines = (line.strip() for line in listing)
        # Skip blank lines: an empty entry would be joined to root_path,
        # yielding the root directory itself, which open() cannot read.
        files = [path for path in stripped_lines if path]

    blobs = []
    for file_path in files:
        full_path = os.path.join(root_path, file_path)
        with open(full_path, "r") as source_file:
            blobs.append(source_file.read())

    # normpath drops any trailing separator so basename is never empty
    # for a path such as "some/dir/".
    root_name = os.path.basename(os.path.normpath(root_path))

    return json.dumps(
        {
            "files": files,
            "blobs": blobs,
            "system_name": system_name,
            "root_name": root_name,
        }
    )

36 

37 

if __name__ == "__main__":
    # Command-line entry point: build the JSON payload from the files on
    # disk, POST it to the service, then either save or print the reply.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--url",
        default="http://localhost:8000/fn-given-filepaths",
        help="URL for the fn-given-filepaths endpoint",
    )

    parser.add_argument(
        "--write",
        action="store_true",
        help=(
            "If this flag is provided, the program writes the response "
            "to a file. Otherwise it prints the response to standard output."
        ),
    )

    parser.add_argument("root_path", type=str)
    parser.add_argument("system_filepaths", type=str)
    parser.add_argument("system_name", type=str)

    args = parser.parse_args()

    data = system_to_json(
        args.root_path, args.system_filepaths, args.system_name
    )
    response = requests.post(args.url, data=data)

    # Decode once, before opening the output file, so a body that is not
    # valid JSON does not leave an empty output file behind.
    payload = response.json()

    if args.write:
        # response.json() returns the decoded JSON value, and file.write()
        # only accepts str. A string payload is written unchanged; any
        # other value (object, array, ...) is serialized back to JSON text.
        text = payload if isinstance(payload, str) else json.dumps(payload)
        with open(f"{args.system_name}--Gromet-FN-auto.json", "w") as f:
            f.write(text)
    else:
        print(payload)