Coverage for skema/skema_py/client.py: 0%
31 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-04-30 17:15 +0000
1#!/usr/bin/env python
3"""Example Python client program to communicate with the skema-py service."""
5import os
6import json
7import requests
8import argparse
def system_to_json(
    root_path: str, system_filepaths: str, system_name: str
) -> str:
    """Bundle the source files of a system into a JSON request body.

    Args:
        root_path: Directory that the listed file paths are relative to.
        system_filepaths: Path to a text file listing one relative file
            path per line.
        system_name: Name of the system; copied into the payload as-is.

    Returns:
        A JSON string with the keys ``files`` (the listed relative paths),
        ``blobs`` (the text content of each file, in the same order),
        ``system_name`` and ``root_name`` (the last component of
        ``root_path``).

    Raises:
        OSError: If the listing file or any listed file cannot be opened.
    """
    # Skip blank lines: joining an empty name onto root_path yields the
    # directory itself, which open() cannot read.
    with open(system_filepaths, "r", encoding="utf-8") as f:
        files = [entry for entry in (line.strip() for line in f) if entry]

    blobs = []
    for file_path in files:
        full_path = os.path.join(root_path, file_path)
        # Read with an explicit encoding so the payload does not depend on
        # the platform's default locale.
        with open(full_path, "r", encoding="utf-8") as f:
            blobs.append(f.read())

    # normpath drops any trailing separator so basename is never empty
    # for a path such as "project/".
    root_name = os.path.basename(os.path.normpath(root_path))

    return json.dumps(
        {
            "files": files,
            "blobs": blobs,
            "system_name": system_name,
            "root_name": root_name,
        }
    )
if __name__ == "__main__":
    # Command-line entry point: build the request body from the listed
    # files, POST it to the service, then print or save the reply.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--url",
        default="http://localhost:8000/fn-given-filepaths",
        help="URL for the fn-given-filepaths endpoint",
    )

    parser.add_argument(
        "--write",
        action="store_true",
        help=(
            "If this flag is provided, the program writes the response "
            "to a file. Otherwise it prints the response to standard output."
        ),
    )

    parser.add_argument("root_path", type=str)
    parser.add_argument("system_filepaths", type=str)
    parser.add_argument("system_name", type=str)

    args = parser.parse_args()

    data = system_to_json(
        args.root_path, args.system_filepaths, args.system_name
    )
    response = requests.post(args.url, data=data)

    # Decode once, before opening the output file, so a decoding failure
    # does not leave an empty file behind.
    payload = response.json()

    if args.write:
        # response.json() returns whatever the JSON document decodes to.
        # f.write() accepts only str, so serialise dicts/lists first and
        # pass an already-decoded string through unchanged.
        text = payload if isinstance(payload, str) else json.dumps(payload)
        with open(f"{args.system_name}--Gromet-FN-auto.json", "w") as f:
            f.write(text)
    else:
        print(payload)