tool-call: adapt very simple agent + docker isolation from https://github.com/ggerganov/llama.cpp/pull/6389

ochafik 2024-09-26 21:07:46 +01:00
parent 10f9fe8d49
commit 8299fac07c
4 changed files with 414 additions and 0 deletions


@@ -0,0 +1,33 @@
# Agents / Tool Calling w/ llama.cpp

- Install prerequisite: [uv](https://docs.astral.sh/uv/) (used to simplify python deps)
- Run `llama-server` w/ jinja templates:

  ```bash
  # make -j LLAMA_CURL=1 llama-server
  ./llama-server \
    -mu https://huggingface.co/lmstudio-community/Meta-Llama-3.1-70B-Instruct-GGUF/resolve/main/Meta-Llama-3.1-70B-Instruct-Q4_K_M.gguf \
    --jinja \
    -c 8192 -fa
  ```

- Run some tools inside a docker container:

  ```bash
  docker run --rm -it \
    -p "8088:8088" \
    -v $PWD/examples/tool-call:/src \
    ghcr.io/astral-sh/uv:python3.12-alpine \
    uv run /src/fastify.py --port 8088 /src/tools.py
  ```

- Verify which tools have been exposed: http://localhost:8088/docs (a programmatic check is sketched right after this list)
- Run the agent with a given goal:

  ```bash
  uv run examples/tool-call/agent.py \
    --tool-endpoint http://localhost:8088 \
    --goal "What is the sum of 2535 squared and 32222000403 then multiplied by one and a half. What's a third of the result?"
  ```
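
As a programmatic version of the verification step above, you can inspect the same OpenAPI catalog that `agent.py` uses for tool discovery. A minimal sketch, assuming the dockerized tool server from the steps above is listening on port 8088 (the `check_tools.py` name is illustrative, not part of this commit):

```python
# check_tools.py (illustrative): list the tools exposed by fastify.py by
# reading the same /openapi.json catalog that agent.py fetches.
import requests

catalog = requests.get("http://localhost:8088/openapi.json").json()
for path, descriptor in catalog["paths"].items():
    # fastify.py binds every tool as a POST route, e.g. /python
    description = descriptor.get("post", {}).get("description", "")
    print(f"{path}: {description}")
```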

examples/tool-call/agent.py

@@ -0,0 +1,189 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "fastapi",
# "openai",
# "pydantic",
# "requests",
# "uvicorn",
# "typer",
# ]
# ///
import json
import openai
from pydantic import BaseModel
import requests
import sys
import typer
from typing import Annotated, List, Optional
import urllib.parse


# Wraps a single POST operation from an OpenAPI catalog as a Python callable
# that the model can invoke as a tool.
class OpenAPIMethod:
    def __init__(self, url, name, descriptor, catalog):
        self.url = url
        self.__name__ = name

        assert 'post' in descriptor, 'Only POST methods are supported'
        post_descriptor = descriptor['post']

        self.__doc__ = post_descriptor.get('description', '')
        parameters = post_descriptor.get('parameters', [])
        request_body = post_descriptor.get('requestBody')

        self.parameters = {p['name']: p for p in parameters}
        assert all(param['in'] == 'query' for param in self.parameters.values()), f'Only query path parameters are supported (path: {url}, descriptor: {json.dumps(descriptor)})'

        self.body = None
        if request_body:
            assert 'application/json' in request_body['content'], f'Only application/json is supported for request body (path: {url}, descriptor: {json.dumps(descriptor)})'

            # Pick a name for the body argument that doesn't clash with a query parameter.
            body_name = 'body'
            i = 2
            while body_name in self.parameters:
                body_name = f'body{i}'
                i += 1

            self.body = dict(
                name=body_name,
                required=request_body['required'],
                schema=request_body['content']['application/json']['schema'],
            )

        # JSON schema advertised to the model: the body (if any) plus all query parameters.
        self.parameters_schema = dict(
            type='object',
            properties={
                **({
                    self.body['name']: self.body['schema']
                } if self.body else {}),
                **{
                    name: param['schema']
                    for name, param in self.parameters.items()
                }
            },
            components=catalog.get('components'),
            required=[name for name, param in self.parameters.items() if param['required']] + ([self.body['name']] if self.body and self.body['required'] else [])
        )

    def __call__(self, **kwargs):
        if self.body:
            body = kwargs.pop(self.body['name'], None)
            if self.body['required']:
                assert body is not None, f'Missing required body parameter: {self.body["name"]}'
        else:
            body = None

        query_params = {}
        for name, param in self.parameters.items():
            value = kwargs.pop(name, None)
            if param['required']:
                assert value is not None, f'Missing required parameter: {name}'

            assert param['in'] == 'query', 'Only query parameters are supported'
            query_params[name] = value

        params = "&".join(f"{name}={urllib.parse.quote(value)}" for name, value in query_params.items())
        url = f'{self.url}?{params}'
        response = requests.post(url, json=body)
        response.raise_for_status()
        response_json = response.json()

        return response_json


def main(
    goal: Annotated[str, typer.Option()],
    api_key: Optional[str] = None,
    tool_endpoint: Optional[List[str]] = None,
    format: Annotated[Optional[str], typer.Option(help="The output format: either a Python type (e.g. 'float' or a Pydantic model defined in one of the tool files), or a JSON schema, e.g. '{\"format\": \"date\"}'")] = None,
    max_iterations: Optional[int] = 10,
    parallel_calls: Optional[bool] = False,
    verbose: bool = False,
    # endpoint: Optional[str] = None,
    endpoint: str = "http://localhost:8080/v1/",
):
    openai.api_key = api_key
    openai.base_url = endpoint

    # Tool discovery: fetch each endpoint's OpenAPI catalog and wrap every path.
    tool_map = {}
    tools = []
    for url in (tool_endpoint or []):
        assert url.startswith('http://') or url.startswith('https://'), f'Tools must be URLs, not local files: {url}'

        catalog_url = f'{url}/openapi.json'
        catalog_response = requests.get(catalog_url)
        catalog_response.raise_for_status()
        catalog = catalog_response.json()

        for path, descriptor in catalog['paths'].items():
            fn = OpenAPIMethod(url=f'{url}{path}', name=path.replace('/', ' ').strip().replace(' ', '_'), descriptor=descriptor, catalog=catalog)
            tool_map[fn.__name__] = fn
            if verbose:
                sys.stderr.write(f'# PARAMS SCHEMA ({fn.__name__}): {json.dumps(fn.parameters_schema, indent=2)}\n')
            tools.append(dict(
                type="function",
                function=dict(
                    name=fn.__name__,
                    description=fn.__doc__ or '',
                    parameters=fn.parameters_schema,
                )
            ))

    sys.stdout.write(f'🛠️ {", ".join(tool_map.keys())}\n')

    messages = [
        dict(
            role="user",
            content=goal,
        )
    ]

    # Agent loop: the model either requests tool calls (which are executed and fed
    # back as "tool" messages) or produces a final answer.
    i = 0
    while (max_iterations is None or i < max_iterations):
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
        )

        if verbose:
            sys.stderr.write(f'# RESPONSE: {response}\n')

        assert len(response.choices) == 1
        choice = response.choices[0]

        content = choice.message.content
        if choice.finish_reason == "tool_calls":
            messages.append(choice.message)
            for tool_call in choice.message.tool_calls:
                if content:
                    print(f'💭 {content}')

                args = json.loads(tool_call.function.arguments)
                pretty_call = f'{tool_call.function.name}({", ".join(f"{k}={v.model_dump_json() if isinstance(v, BaseModel) else json.dumps(v)}" for k, v in args.items())})'
                sys.stdout.write(f'⚙️ {pretty_call}')
                sys.stdout.flush()
                tool_result = tool_map[tool_call.function.name](**args)
                sys.stdout.write(f"{tool_result}\n")
                messages.append(dict(
                    tool_call_id=tool_call.id,
                    role="tool",
                    name=tool_call.function.name,
                    content=f'{tool_result}',
                    # content=f'{pretty_call} = {tool_result}',
                ))
        else:
            assert content
            print(content)
            return

        i += 1

    if max_iterations is not None:
        raise Exception(f"Failed to get a valid response after {max_iterations} tool calls")


if __name__ == '__main__':
    typer.run(main)
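
For context, here is roughly what one entry of the `tools` list built above looks like for the `python` tool defined in `tools.py`. This is an illustrative sketch only, not part of this commit; the exact property titles and `components` value are whatever FastAPI emits in `openapi.json`:

```python
# Illustrative shape of a single tool definition passed to the chat completions API,
# assuming fastify.py exposed tools.py's `python(source: str)` as POST /python.
example_tool = {
    "type": "function",
    "function": {
        "name": "python",
        "description": "Evaluate a Python program and return the globals it declared. ...",
        "parameters": {
            "type": "object",
            "properties": {
                # a plain `str` parameter is advertised as a query parameter,
                # so OpenAPIMethod folds it into the schema as a string property
                "source": {"type": "string", "title": "Source"},
            },
            "components": None,  # copied from the catalog's top-level components, if present
            "required": ["source"],
        },
    },
}
```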

examples/tool-call/fastify.py

@@ -0,0 +1,76 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "fastapi",
# "uvicorn",
# "typer",
# ]
# ///
'''
Binds the functions of a python script as a FastAPI server.
'''
import os
import sys
import fastapi, uvicorn
from pathlib import Path
import typer
from typing import List
import importlib.util
def _load_source_as_module(source):
    i = 0
    while (module_name := f'mod_{i}') in sys.modules:
        i += 1

    spec = importlib.util.spec_from_file_location(module_name, source)
    assert spec, f'Failed to load {source} as module'
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    assert spec.loader, f'{source} spec has no loader'
    spec.loader.exec_module(module)
    return module


def _load_module(f: str):
    if f.endswith('.py'):
        sys.path.insert(0, str(Path(f).parent))
        return _load_source_as_module(f)
    else:
        return importlib.import_module(f)


def main(files: List[str], host: str = '0.0.0.0', port: int = 8000):
    app = fastapi.FastAPI()

    for f in files:
        print(f'Binding functions from {f}')
        module = _load_module(f)
        for k in dir(module):
            if k.startswith('_'):
                continue
            if k == k.capitalize():
                continue
            v = getattr(module, k)
            if not callable(v) or isinstance(v, type):
                continue
            if not hasattr(v, '__annotations__'):
                continue

            vt = type(v)
            if vt.__module__ == 'langchain_core.tools' and vt.__name__.endswith('Tool') and hasattr(v, 'func') and callable(v.func):
                v = v.func

            print(f'INFO: Binding /{k}')
            try:
                app.post('/' + k)(v)
            except Exception as e:
                print(f'WARNING: Failed to bind /{k}\n\t{e}')

    print(f'INFO: CWD = {os.getcwd()}')
    uvicorn.run(app, host=host, port=port)


if __name__ == '__main__':
    typer.run(main)
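
Since `fastify.py` binds any annotated, module-level callable it finds, new tools can be added without touching `agent.py`. A minimal sketch using a hypothetical `my_tools.py` (not part of this commit):

```python
# my_tools.py (hypothetical): any module-level callable with type annotations
# is bound by fastify.py as a POST endpoint named after the function (POST /add).
def add(a: float, b: float) -> float:
    'Add two numbers.'
    return a + b

# Serve it the same way tools.py is served in the README, e.g.:
#   uv run examples/tool-call/fastify.py --port 8088 my_tools.py
```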

examples/tool-call/tools.py

@@ -0,0 +1,116 @@
from datetime import date
import datetime
import json
from pydantic import BaseModel
import subprocess
import sys
import time
import typer
from typing import Union, Optional, Dict
import types
class Duration(BaseModel):
    seconds: Optional[int] = None
    minutes: Optional[int] = None
    hours: Optional[int] = None
    days: Optional[int] = None
    months: Optional[int] = None
    years: Optional[int] = None

    def __str__(self) -> str:
        return ', '.join([
            x
            for x in [
                f"{self.years} years" if self.years else None,
                f"{self.months} months" if self.months else None,
                f"{self.days} days" if self.days else None,
                f"{self.hours} hours" if self.hours else None,
                f"{self.minutes} minutes" if self.minutes else None,
                f"{self.seconds} seconds" if self.seconds else None,
            ]
            if x is not None
        ])

    @property
    def get_total_seconds(self) -> int:
        return sum([
            self.seconds or 0,
            (self.minutes or 0)*60,
            (self.hours or 0)*3600,
            (self.days or 0)*86400,
            (self.months or 0)*2592000,
            (self.years or 0)*31536000,
        ])


class WaitForDuration(BaseModel):
    duration: Duration

    def __call__(self):
        sys.stderr.write(f"Waiting for {self.duration}...\n")
        time.sleep(self.duration.get_total_seconds)


# The module-level functions below are what fastify.py picks up and exposes as tools.
def wait_for_duration(duration: Duration) -> None:
    'Wait for a certain amount of time before continuing.'
    # sys.stderr.write(f"Waiting for {duration}...\n")
    time.sleep(duration.get_total_seconds)


def wait_for_date(target_date: date) -> None:
    f'''
    Wait until a specific date is reached before continuing.
    Today's date is {datetime.date.today()}
    '''
    # Get the current date
    current_date = datetime.date.today()

    if target_date < current_date:
        raise ValueError("Target date cannot be in the past.")

    time_diff = datetime.datetime.combine(target_date, datetime.time.min) - datetime.datetime.combine(current_date, datetime.time.min)
    days, seconds = time_diff.days, time_diff.seconds

    # sys.stderr.write(f"Waiting for {days} days and {seconds} seconds until {target_date}...\n")
    time.sleep(days * 86400 + seconds)
    # sys.stderr.write(f"Reached the target date: {target_date}\n")


def _is_serializable(obj) -> bool:
    try:
        json.dumps(obj)
        return True
    except Exception as e:
        return False


def python(source: str) -> Union[Dict, str]:
    """
    Evaluate a Python program and return the globals it declared.
    Can be used to compute mathematical expressions (e.g. after importing math module).

    Args:
        source: contain valid, executable and pure Python code. Should also import any required Python packages.
            For example: "import math\nresult = math.cos(2) * 10"

    Returns:
        dict | str: A dictionary containing variables declared, or an error message if an exception occurred.
    """
    try:
        namespace = {}
        sys.stderr.write(f"Executing Python program:\n{source}\n")
        exec(source, namespace)
        results = {
            k: v
            for k, v in namespace.items()
            if not k.startswith('_')
            and not isinstance(v, type)
            and not isinstance(v, types.ModuleType)
            and not callable(v)
            and _is_serializable(v)
        }
        sys.stderr.write(f"Results: {json.dumps(results, indent=2)}\n")
        return results
    except Exception as e:
        msg = f"Error: {sys.exc_info()[1]}"
        sys.stderr.write(f"{msg}\n")
        return msg
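
To see what `agent.py`'s `OpenAPIMethod.__call__` ends up doing for the `python` tool above, here is a rough equivalent of a single tool invocation. A sketch only, assuming the dockerized server from the README is up on port 8088 and that FastAPI maps the plain `str` parameter `source` to a query parameter:

```python
# Hypothetical direct call to the /python tool endpoint (not part of this commit):
# POST with `source` as a URL query parameter, then parse the JSON response,
# mirroring what OpenAPIMethod.__call__ does on the agent's behalf.
import requests

resp = requests.post(
    "http://localhost:8088/python",
    params={"source": "import math\nresult = math.sqrt(2) * 10"},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"result": 14.142135623730951}
```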