Coverage for skema/rest/llm_proxy.py: 23%

91 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 17:15 +0000

1from langchain.chat_models import ChatOpenAI 

2from langchain.prompts import ( 

3 ChatPromptTemplate, 

4 SystemMessagePromptTemplate, 

5 HumanMessagePromptTemplate, 

6) 

7from langchain.output_parsers import ( 

8 StructuredOutputParser, 

9 ResponseSchema 

10) 

11from fastapi import APIRouter, FastAPI, File, UploadFile 

12from io import BytesIO 

13from zipfile import ZipFile 

14from pathlib import Path 

15from pydantic import BaseModel, Field 

16from typing import List, Optional 

17from skema.skema_py import server as code2fn 

18from skema.rest.proxies import SKEMA_OPENAI_KEY 

19import time 

20 

# Router collecting the LLM-proxy endpoints; included into `app` at the bottom of this file.
router = APIRouter()

22 

class Dynamics(BaseModel):
    """
    Dynamics Data Model for capturing dynamics within a CodeFile.
    """

    # Set by the endpoint to the path of the code file inside the uploaded zip.
    name: Optional[str] = Field(None, description="Name of the dynamics section.")
    # None on success; set to an error message when dynamics extraction fails.
    description: Optional[str] = Field(None, description="Description of the dynamics.")
    # Required field: one or more "L<begin>-L<end>" span strings (an import span
    # may precede the dynamics span; callers concatenate the slices).
    block: List[str] = Field(
        description="A list containing strings indicating the line numbers in the file that contain the dynamics, e.g., ['L205-L213', 'L225-L230']."
    )

33 

def _is_valid_member(path: Path) -> bool:
    """Return False for zip members inside hidden directories or macOS resource-fork artifacts."""
    for parent in path.parents:
        # macOS zips store resource forks under "__MACOSX" (two underscores).
        # The previous check compared against "_MACOSX", which never matched
        # the real artifact directory, so those files leaked through.
        if parent.name == "__MACOSX" or parent.name.startswith("."):
            return False
    return True


@router.post(
    "/linespan-given-filepaths-zip",
    summary=(
        "Send a zip file containing a code file,"
        " get a line span of the dynamics back. One for each code file."
    ),
)
async def get_lines_of_model(zip_file: UploadFile = File()) -> List[Dynamics]:
    """
    Endpoint for generating a line span containing the dynamics from a zip archive. Currently
    it only expects there to be one python file in the zip. There can be other files, such as a
    README.md, but only one .py. Future versions will generalize support to arbitrary zip contents.

    ### Python example
    ```
    import requests

    files = {
        "zip_file": open(zip_path, "rb"),
    }

    response = requests.post(f"{ENDPOINT}/morae/linespan-given-filepaths-zip", files=files)
    gromet_json = response.json()
    ```
    """
    # Collect the candidate .py files (paths and decoded contents) from the archive.
    files = []
    blobs = []
    with ZipFile(BytesIO(zip_file.file.read()), "r") as zip_archive:
        for member in zip_archive.namelist():
            member_path = Path(member)
            if member_path.suffix == ".py" and _is_valid_member(member_path):
                files.append(member)
                blobs.append(zip_archive.open(member).read().decode("utf-8"))

    # Structured-output formatting instructions: we ask the LLM for a single
    # field naming the function that contains the model dynamics.
    response_schemas = [
        ResponseSchema(
            name="model_function",
            description="The name of the function that contains the model dynamics",
        )
    ]
    output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
    format_instructions = output_parser.get_format_instructions()

    # Temperature 0: this is an extraction task, not a generative one.
    openai = ChatOpenAI(
        temperature=0.0,
        model_name="gpt-3.5-turbo-0613",
        openai_api_key=SKEMA_OPENAI_KEY,
    )

    system_message_prompt = SystemMessagePromptTemplate.from_template(
        "You are a assistant that answers questions about code."
    )
    human_message_prompt = HumanMessagePromptTemplate.from_template(
        "Find the function that contains the model dynamics in {code} \n{format_instructions}"
    )
    chat_prompt = ChatPromptTemplate.from_messages(
        [system_message_prompt, human_message_prompt]
    )

    outputs = []
    for file, code in zip(files, blobs):
        # Payload for the code2fn (function-network) service for this one file.
        single_snippet_payload = {
            "files": [file],
            "blobs": [code],
        }

        formatted_prompt = chat_prompt.format_prompt(
            code=code, format_instructions=format_instructions
        ).to_messages()
        output = openai(formatted_prompt)

        block = []
        description = None
        try:
            parsed_output = output_parser.parse(output.content)
            function_name = parsed_output["model_function"]

            # FIXME: we should rewrite things to avoid this need
            # time.sleep(0.5)
            system = code2fn.System(**single_snippet_payload)
            print(f"System:\t{system}")
            response_zip = await code2fn.fn_given_filepaths(system)
            module = response_zip["modules"][0]

            # Find the metadata index of the function the LLM identified.
            # metadata_idx stays None when nothing matches; previously it was
            # left unbound, producing a NameError further down.
            metadata_idx = None
            for entry in module["fn_array"]:
                try:
                    print(
                        f"Entry trying to get name and metadata idx from: {entry['b'][0]} looking for {function_name}"
                    )
                    if entry["b"][0]["name"][0 : len(function_name)] == function_name:
                        metadata_idx = entry["b"][0]["metadata"]
                except (KeyError, IndexError, TypeError):
                    # Entry lacks the expected 'b'/'name'/'metadata' shape.
                    print(f"failed to find {function_name}")
            if metadata_idx is None:
                raise ValueError(f"no fn_array entry matched {function_name!r}")

            # Line span of the dynamics function (metadata_idx is 1-based).
            metadata = module["metadata_collection"][metadata_idx - 1]
            line_begin = metadata[0]["line_begin"]
            line_end = metadata[0]["line_end"]

            # If the imports (metadata entry 2, base 0) start before the code
            # body (metadata entry 3, base 0), prepend their span so callers
            # can concatenate both slices to capture all the imports.
            file_line_begin = module["metadata_collection"][2][0]["line_begin"]
            code_line_begin = module["metadata_collection"][3][0]["line_begin"] - 1
            if code_line_begin > file_line_begin:
                block.append(f"L{file_line_begin}-L{code_line_begin}")

            block.append(f"L{line_begin}-L{line_end}")
        except Exception as e:
            # Best-effort: report a zero span instead of failing the request.
            print("Failed to parse dynamics")
            print(f"e:\t{e}")
            description = "Failed to parse dynamics"
            block.append("L0-L0")

        outputs.append(Dynamics(name=file, description=description, block=block))

    return outputs

181 

182 

# Expose the router as a standalone FastAPI application.
app = FastAPI()
app.include_router(router)