mirror of
https://github.com/ggerganov/llama.cpp.git
synced 2024-11-11 21:39:52 +00:00
22f281aa16
Changes: - Move each example into its own function. This makes the code much easier to read and understand. - Make it easy to run only one test by commenting out function calls in main(). - Make the output easy to parse by indenting the output for each example. - Add shebang and +x bit to make it clear it's an executable. - Make the host configurable via --host with a default 127.0.0.1:8080. - Make the code look in the tools list to call the registered tool, instead of hardcoding the returned values. This makes the code more copy-pastable. - Add error checking, so that the program exits 1 if the LLM didn't return the expected values. It's super useful to check for correctness. Testing: - Tested with Mistral-7B-Instruct-v0.3 in F16 and Q5_K_M and Meta-Llama-3-8B-Instruct in F16 and Q5_K_M. - I did not observe a single failure with Mistral-7B-Instruct-v0.3. - Llama-3 failed about a third of the time in example_concurrent: it only returned one call instead of 3, even for F16. Potential follow-ups: - Do not fix the prompt encoding yet. Surprisingly it mostly works even if the prompt encoding is not model optimized. - Add chained answer and response. Test-only change.
313 lines
13 KiB
Python
Executable File
313 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
"""Function calling example using pydantic models."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import datetime
|
||
import json
|
||
import logging
|
||
import textwrap
|
||
import sys
|
||
from enum import Enum
|
||
from typing import Optional, Union
|
||
|
||
import requests
|
||
from pydantic import BaseModel, Field
|
||
from pydantic_models_to_grammar import (add_run_method_to_dynamic_model, convert_dictionary_to_pydantic_model,
|
||
create_dynamic_model_from_function, generate_gbnf_grammar_and_documentation)
|
||
|
||
|
||
def create_completion(host, prompt, gbnf_grammar):
    """Call the /completion API on llama-server and return the generated text.

    See
    https://github.com/ggerganov/llama.cpp/tree/HEAD/examples/server#api-endpoints

    Args:
        host: "host:port" of the server; it is interpolated into
            ``http://{host}/completion``.
        prompt: The full prompt text sent to the model.
        gbnf_grammar: GBNF grammar sent along with the prompt.

    Returns:
        The "content" entry of the server's JSON response. It is parsed as
        JSON here for pretty-printing, so it must itself be a JSON document.

    Raises:
        AssertionError: If the server's response carries an "error" entry.
    """
    print(f" Request:\n Grammar:\n{textwrap.indent(gbnf_grammar, ' ')}\n Prompt:\n{textwrap.indent(prompt.rstrip(), ' ')}")
    headers = {"Content-Type": "application/json"}
    data = {"prompt": prompt, "grammar": gbnf_grammar}
    result = requests.post(f"http://{host}/completion", headers=headers, json=data).json()
    # Inspect the *response*: the request payload never has an "error" key, so
    # checking it could not detect a server-side failure. Raised explicitly
    # (rather than via `assert`) so the check also survives `python -O`.
    if result.get("error") is not None:
        raise AssertionError(result)
    logging.info("Result: %s", result)
    content = result["content"]
    print(f" Model: {result['model']}")
    print(f" Result:\n{textwrap.indent(json.dumps(json.loads(content), indent=2), ' ')}")
    return content
|
||
|
||
|
||
# Tool model that lets the agent send a message to the user.
# NOTE: the class docstring and the field descriptions are kept verbatim; the
# comment on the Calculator tool in this file says they are used in the system
# prompt.
class SendMessageToUser(BaseModel):
    """Send a message to the User."""

    # Free-form reasoning the model writes before the actual message.
    chain_of_thought: str = Field(..., description="Your chain of thought while sending the message.")
    # The text that is shown to the user.
    message: str = Field(..., description="Message you want to send to the user.")

    def run(self):
        # "Sending" is simulated by printing the message to stdout.
        outgoing = self.message
        print(f"SendMessageToUser: {outgoing}")
|
||
|
||
|
||
def example_rce(host):
    """Minimal test case where the LLM calls an arbitrary Python function.

    Returns 0 after running the tool the model selected, or 1 when the model
    names a tool that is not registered.
    """
    print("- example_rce")
    tools = [SendMessageToUser]
    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(
        pydantic_model_list=tools,
        outer_object_name="function",
        outer_object_content="function_parameters",
        model_prefix="Function",
        fields_prefix="Parameters",
    )
    system_message = (
        "You are an advanced AI, tasked to assist the user by calling functions in JSON format. "
        "The following are the available functions and their parameters and types:\n\n"
    ) + documentation
    user_message = "What is 42 * 42?"
    prompt = (
        f"<|im_start|>system\n{system_message}<|im_end|>\n"
        f"<|im_start|>user\n{user_message}<|im_end|>\n"
        "<|im_start|>assistant"
    )
    completion = create_completion(host, prompt, gbnf_grammar)
    call = json.loads(completion)

    # Look the requested tool up by class name; this finds "SendMessageToUser".
    registry = {candidate.__name__: candidate for candidate in tools}
    requested = call["function"]
    handler = registry.get(requested)
    if not handler:
        print(f"Error: unknown tool {requested}")
        return 1
    handler(**call["function_parameters"]).run()
    return 0
|
||
|
||
|
||
# Operations supported by the calculator tool. The values are the strings the
# model is expected to emit.
class MathOperation(Enum):
    ADD = "add"
    SUBTRACT = "subtract"
    MULTIPLY = "multiply"
    DIVIDE = "divide"
|
||
|
||
|
||
# Simple pydantic calculator tool for the agent that can add, subtract,
# multiply, and divide. Docstring and description of fields will be used in
# system prompt.
class Calculator(BaseModel):
    """Perform a math operation on two numbers."""

    number_one: Union[int, float] = Field(..., description="First number.")
    operation: MathOperation = Field(..., description="Math operation to perform.")
    number_two: Union[int, float] = Field(..., description="Second number.")

    def run(self):
        # Map each operation to a thunk so only the selected one is evaluated
        # (division is never attempted unless it was requested).
        lhs, rhs = self.number_one, self.number_two
        handlers = {
            MathOperation.ADD: lambda: lhs + rhs,
            MathOperation.SUBTRACT: lambda: lhs - rhs,
            MathOperation.MULTIPLY: lambda: lhs * rhs,
            MathOperation.DIVIDE: lambda: lhs / rhs,
        }
        compute = handlers.get(self.operation)
        if compute is None:
            raise ValueError("Unknown operation.")
        return compute()
|
||
|
||
|
||
def example_calculator(host):
    """Have the LLM ask to get a calculation done.

    The grammar is generated by passing the available function models to
    generate_gbnf_grammar_and_documentation, which also generates
    documentation usable by the LLM. Its arguments are:

    - pydantic_model_list: the list of pydantic models.
    - outer_object_name: an optional name for an outer object around the
      actual model object, like a "function" object with
      "function_parameters" which contains the actual model object. If None,
      no outer object will be generated.
    - outer_object_content: the name of the outer object content.
    - model_prefix: the optional prefix for models in the documentation.
      (Default="Output Model")
    - fields_prefix: the prefix for the model fields in the documentation.
      (Default="Output Fields")

    Returns:
        0 when the model produced exactly the expected Calculator call, 1 when
        the call differs from the expectation or names an unknown tool.
    """
    print("- example_calculator")
    tools = [SendMessageToUser, Calculator]
    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(
        pydantic_model_list=tools, outer_object_name="function",
        outer_object_content="function_parameters", model_prefix="Function", fields_prefix="Parameters")
    system_message = "You are an advanced AI, tasked to assist the user by calling functions in JSON format. The following are the available functions and their parameters and types:\n\n" + documentation
    user_message1 = "What is 42 * 42?"
    prompt = f"<|im_start|>system\n{system_message}<|im_end|>\n<|im_start|>user\n{user_message1}<|im_end|>\n<|im_start|>assistant"
    text = create_completion(host, prompt, gbnf_grammar)
    json_data = json.loads(text)
    expected = {
        "function": "Calculator",
        "function_parameters": {
            "number_one": 42,
            "operation": "multiply",
            "number_two": 42,
        },
    }
    res = 0
    if json_data != expected:
        print(" Result is not as expected!")
        # Report the mismatch through the return code as well (same handling
        # as example_concurrent), but still run the call the model asked for
        # so its output can be inspected.
        res = 1
    tools_map = {tool.__name__: tool for tool in tools}
    # This finds "Calculator":
    tool = tools_map.get(json_data["function"])
    if not tool:
        print(f"Error: unknown tool {json_data['function']}")
        return 1
    result = tool(**json_data["function_parameters"]).run()
    print(f" Call {json_data['function']} gave result {result}")
    return res
|
||
|
||
|
||
class Category(Enum):
    """The category of the book."""

    # Member values are the exact strings used in the JSON output.
    Fiction = "Fiction"
    NonFiction = "Non-Fiction"
|
||
|
||
|
||
# Structured-output model used by example_struct. Field order is kept as
# declared; every field is required (`...`), including published_year, whose
# value may be None.
class Book(BaseModel):
    """Represents an entry about a book."""

    title: str = Field(..., description="Title of the book.")
    author: str = Field(..., description="Author of the book.")
    published_year: Optional[int] = Field(..., description="Publishing year of the book.")
    keywords: list[str] = Field(..., description="A list of keywords.")
    category: Category = Field(..., description="Category of the book.")
    summary: str = Field(..., description="Summary of the book.")
|
||
|
||
|
||
def example_struct(host):
    """An example of structured output based on pydantic models.

    The LLM creates an entry for a Book database out of an unstructured text.
    No parameters other than the list of pydantic models are needed to build
    the grammar.

    Returns 0 when the reply has exactly the Book fields, 1 otherwise.
    """
    print("- example_struct")
    tools = [Book]
    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(pydantic_model_list=tools)
    system_message = (
        "You are an advanced AI, tasked to create a dataset entry in JSON for a Book. "
        "The following is the expected output model:\n\n"
    ) + documentation
    source_text = """The Feynman Lectures on Physics is a physics textbook based on some lectures by Richard Feynman, a Nobel laureate who has sometimes been called "The Great Explainer". The lectures were presented before undergraduate students at the California Institute of Technology (Caltech), during 1961–1963. The book's co-authors are Feynman, Robert B. Leighton, and Matthew Sands."""
    prompt = (
        f"<|im_start|>system\n{system_message}<|im_end|>\n"
        f"<|im_start|>user\n{source_text}<|im_end|>\n"
        "<|im_start|>assistant"
    )
    completion = create_completion(host, prompt, gbnf_grammar)
    entry = json.loads(completion)

    # In this case, there's no function nor function_parameters.
    # Here the result will vary based on the LLM used, so only the set of keys
    # is checked.
    wanted_keys = sorted(["title", "author", "published_year", "keywords", "category", "summary"])
    found_keys = sorted(entry.keys())
    if wanted_keys != found_keys:
        print(f"Unexpected result: {found_keys}")
        return 1
    book = Book(**entry)
    print(" As a Book object: %s" % book)
    return 0
|
||
|
||
|
||
def get_current_datetime(output_format: Optional[str] = None):
    """Get the current date and time in the given format.

    Args:
        output_format: formatting string for the date and time, defaults to '%Y-%m-%d %H:%M:%S'
    """
    # Any falsy format (None or an empty string) selects the default.
    if not output_format:
        output_format = "%Y-%m-%d %H:%M:%S"
    now = datetime.datetime.now()
    return now.strftime(output_format)
|
||
|
||
|
||
# Example function to get the weather.
def get_current_weather(location, unit=None):
    """Get the current weather in a given location

    This is a stub that returns canned readings for a few known places.

    Args:
        location: Free-form location text; a known city is matched by
            substring (e.g. "London, UK" matches "London").
        unit: Temperature unit. May be an enum member (its ``.value`` is
            used), a plain string, or None when the caller supplied no unit.

    Returns:
        A JSON string with "location", "temperature" and "unit" for a known
        city, or with "location" and a temperature of "unknown" otherwise.
    """
    # Checked in this order; the first city found in `location` wins.
    known_readings = (
        ("London", "42"),
        ("New York", "24"),
        ("North Pole", "-42"),
    )
    # The tool schema in this file only requires "location", so `unit` may be
    # missing. Reading `.value` with a fallback avoids an AttributeError on
    # None and also accepts plain strings.
    unit_name = getattr(unit, "value", unit)
    for city, temperature in known_readings:
        if city in location:
            return json.dumps({"location": city, "temperature": temperature, "unit": unit_name})
    return json.dumps({"location": location, "temperature": "unknown"})
|
||
|
||
|
||
def example_concurrent(host):
    """An example for parallel function calling with a Python function, a pydantic
    function model and an OpenAI like function definition.

    Returns 0 when the model produced exactly the expected list of calls, and
    1 when the list differs or when a call names an unknown tool.
    """
    print("- example_concurrent")

    # Function definition in OpenAI style.
    current_weather_tool = {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g. San Francisco, CA",
                    },
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["location"],
            },
        },
    }
    # Convert OpenAI function definition into pydantic model, then attach the
    # actual function to that model.
    current_weather_tool_model = add_run_method_to_dynamic_model(
        convert_dictionary_to_pydantic_model(current_weather_tool),
        get_current_weather,
    )

    # Convert normal Python function to a pydantic model.
    current_datetime_model = create_dynamic_model_from_function(get_current_datetime)

    tools = [SendMessageToUser, Calculator, current_datetime_model, current_weather_tool_model]
    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(
        pydantic_model_list=tools,
        outer_object_name="function",
        outer_object_content="params",
        model_prefix="Function",
        fields_prefix="Parameters",
        list_of_outputs=True,
    )
    system_message = (
        "You are an advanced AI assistant. You are interacting with the user and with your environment by calling functions. "
        "You call functions by writing JSON objects, which represent specific function calls.\n"
        "Below is a list of your available function calls:\n\n"
    ) + documentation
    request_text = """Get the date and time, get the current weather in celsius in London and solve the following calculation: 42 * 42"""
    prompt = (
        f"<|im_start|>system\n{system_message}<|im_end|>\n"
        f"<|im_start|>user\n{request_text}<|im_end|>\n"
        "<|im_start|>assistant"
    )
    completion = create_completion(host, prompt, gbnf_grammar)
    calls = json.loads(completion)

    expected = [
        {
            "function": "get_current_datetime",
            "params": {"output_format": "%Y-%m-%d %H:%M:%S"},
        },
        {
            "function": "get_current_weather",
            "params": {"location": "London", "unit": "celsius"},
        },
        {
            "function": "Calculator",
            "params": {"number_one": 42, "operation": "multiply", "number_two": 42},
        },
    ]
    res = 0
    if calls != expected:
        print(" Result is not as expected!")
        print(" This can happen on highly quantized models")
        res = 1

    # Run every requested call, even after a mismatch, so the output shows
    # what the model actually asked for.
    registry = {candidate.__name__: candidate for candidate in tools}
    for call in calls:
        name = call["function"]
        handler = registry.get(name)
        if not handler:
            print(f"Error: unknown tool {name}")
            return 1
        outcome = handler(**call["params"]).run()
        print(f" Call {name} returned {outcome}")
    # Should output something like this:
    # Call get_current_datetime returned 2024-07-15 09:50:38
    # Call get_current_weather returned {"location": "London", "temperature": "42", "unit": "celsius"}
    # Call Calculator returned 1764
    return res
|
||
|
||
|
||
def main():
    """Parse the command line and run the examples in order.

    Stops at the first example that returns a truthy status and returns that
    status; otherwise returns the status of the last example run.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--host", default="localhost:8080", help="llama.cpp server")
    parser.add_argument("-v", "--verbose", action="store_true", help="enables logging")
    args = parser.parse_args()

    log_level = logging.INFO if args.verbose else logging.ERROR
    logging.basicConfig(level=log_level)

    # Comment out entries below to only run the example you want.
    examples = (
        example_rce,
        example_calculator,
        example_struct,
        example_concurrent,
    )
    ret = 0
    for example in examples:
        ret = example(args.host)
        if ret:
            # A failing example short-circuits the remaining ones.
            break
    return ret
|
||
|
||
|
||
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|