server: concurrency fix + monitoring - add /metrics prometheus compatible endpoint (#5708)

* server: monitoring - add /metrics prometheus compatible endpoint

* server: concurrency issue: when 2 tasks are waiting for results, only one calling thread is notified (see the sketch below)

* server: metrics - move to a dedicated struct
Pierrick Hymbert, 2024-02-25 13:49:43 +01:00 (committed by GitHub)
parent 1289408817
commit d52d7819b8
7 changed files with 191 additions and 8 deletions
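The concurrency half of the change is the switch from `notify_one()` to `notify_all()` in the `llama_server_response` result queue (final hunk of the diff): when two tasks wait on the same condition variable with different predicates, a single notification can wake the waiter whose result has not arrived yet while the waiter that could proceed stays blocked. The standalone sketch below is not the server's code, just a minimal reproduction of that pattern; the `wait_for`/`publish` helpers are hypothetical stand-ins for the queue's receive and send paths.

```cpp
// Minimal sketch of the wake-up problem (not the server's code): two waiters
// share one condition variable but wait on different predicates. If publish()
// used notify_one(), posting result 1 could wake the thread waiting for
// result 2, which re-checks its predicate and goes back to sleep, while the
// thread that could make progress is never woken. notify_all() avoids this.
#include <algorithm>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

static std::mutex              mtx;
static std::condition_variable cv;
static std::vector<int>        results;  // ids of tasks whose result is ready

static void wait_for(int task_id) {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [&] {
        return std::find(results.begin(), results.end(), task_id) != results.end();
    });
    std::printf("task %d: result received\n", task_id);
}

static void publish(int task_id) {
    {
        std::lock_guard<std::mutex> lock(mtx);
        results.push_back(task_id);
    }
    cv.notify_all();  // notify_one() here is the failure mode this commit fixes
}

int main() {
    std::thread w1(wait_for, 1);
    std::thread w2(wait_for, 2);
    std::thread p1(publish, 1);
    std::thread p2(publish, 2);
    w1.join(); w2.join(); p1.join(); p2.join();
    return 0;
}
```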


@@ -41,6 +41,7 @@ see https://github.com/ggerganov/llama.cpp/issues/1437
 - `--grp-attn-w`: Set the group attention width to extend context size through self-extend(default: 512), used together with group attention factor `--grp-attn-n`
 - `-n, --n-predict`: Set the maximum tokens to predict (default: -1)
 - `--slots-endpoint-disable`: To disable slots state monitoring endpoint. Slots state may contain user data, prompts included.
+- `--metrics`: enable prometheus `/metrics` compatible endpoint (default: disabled)
 - `--chat-template JINJA_TEMPLATE`: Set custom jinja chat template. This parameter accepts a string, not a file name (default: template taken from model's metadata). We only support [some pre-defined templates](https://github.com/ggerganov/llama.cpp/wiki/Templates-supported-by-llama_chat_apply_template)
 ## Build
@@ -457,6 +458,18 @@ Notice that each `probs` is an array of length `n_probs`.
     ]
 ```
+- **GET** `/metrics`: [Prometheus](https://prometheus.io/) compatible metrics exporter endpoint if `--metrics` is enabled:
+
+  Available metrics:
+  - `llamacpp:prompt_tokens_total`: Number of prompt tokens processed.
+  - `llamacpp:tokens_predicted_total`: Number of generation tokens processed.
+  - `llamacpp:prompt_tokens_seconds`: Average prompt throughput in tokens/s.
+  - `llamacpp:predicted_tokens_seconds`: Average generation throughput in tokens/s.
+  - `llamacpp:kv_cache_usage_ratio`: KV-cache usage. 1 means 100 percent usage.
+  - `llamacpp:kv_cache_tokens`: KV-cache tokens.
+  - `llamacpp:requests_processing`: Number of request processing.
+  - `llamacpp:requests_deferred`: Number of request deferred.
 ## More examples
 ### Change system prompt on runtime
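For a quick manual check of the endpoint documented above, outside of a Prometheus scrape, `curl http://localhost:8080/metrics` works once the server was started with `--metrics`. The sketch below does the same from C++ with the same `httplib` header the server code uses; the host, port, and include path are assumptions, not part of this commit.

```cpp
// Illustrative only: fetch and print the Prometheus exposition text from a
// llama.cpp server started with --metrics. Assumes the default localhost:8080.
#include "httplib.h"  // cpp-httplib, the header the server example builds against
#include <cstdio>

int main() {
    httplib::Client cli("localhost", 8080);
    auto res = cli.Get("/metrics");
    if (res && res->status == 200) {
        // Plain-text exposition format: "# HELP", "# TYPE", then one sample
        // per metric, e.g. "llamacpp:prompt_tokens_total 128".
        std::printf("%s", res->body.c_str());
        return 0;
    }
    std::fprintf(stderr, "no /metrics response - was the server started with --metrics?\n");
    return 1;
}
```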


@@ -43,6 +43,7 @@ struct server_params
     int32_t read_timeout = 600;
     int32_t write_timeout = 600;
     bool slots_endpoint = true;
+    bool metrics_endpoint = false;
 };
 bool server_verbose = false;
@@ -310,6 +311,39 @@ struct llama_client_slot
     }
 };

+struct llama_metrics {
+    uint64_t n_prompt_tokens_processed_total = 0;
+    uint64_t n_tokens_predicted_total = 0;
+
+    uint64_t n_prompt_tokens_processed = 0;
+    uint64_t t_prompt_processing = 0;
+
+    uint64_t n_tokens_predicted = 0;
+    uint64_t t_tokens_generation = 0;
+
+    void on_prompt_eval(const llama_client_slot &slot) {
+        n_prompt_tokens_processed_total += slot.num_prompt_tokens_processed;
+        n_prompt_tokens_processed += slot.num_prompt_tokens_processed;
+        t_prompt_processing += slot.t_prompt_processing;
+    }
+
+    void on_prediction(const llama_client_slot &slot) {
+        n_tokens_predicted_total += slot.n_decoded;
+        n_tokens_predicted += slot.n_decoded;
+        t_tokens_generation += slot.t_token_generation;
+    }
+
+    void reset_bucket() {
+        n_prompt_tokens_processed = 0;
+        t_prompt_processing = 0;
+        n_tokens_predicted = 0;
+        t_tokens_generation = 0;
+    }
+};
+
 struct llama_server_context
 {
     llama_model *model = nullptr;
@@ -344,6 +378,8 @@ struct llama_server_context
     llama_server_queue queue_tasks;
     llama_server_response queue_results;

+    llama_metrics metrics;
+
     ~llama_server_context()
     {
         if (ctx)
@@ -1404,7 +1440,7 @@ struct llama_server_context
        case TASK_TYPE_NEXT_RESPONSE: {
            // do nothing
        } break;
-       case TASK_TYPE_SLOTS_DATA: {
+       case TASK_TYPE_METRICS: {
            json slots_data = json::array();
            int n_idle_slots = 0;
            int n_processing_slots = 0;
@@ -1438,10 +1474,24 @@ struct llama_server_context
            res.stop = true;
            res.error = false;
            res.result_json = {
                { "idle", n_idle_slots },
                { "processing", n_processing_slots },
-               { "slots", slots_data }
+               { "deferred", queue_tasks.queue_tasks_deferred.size() },
+
+               { "n_prompt_tokens_processed_total", metrics.n_prompt_tokens_processed_total},
+               { "n_tokens_predicted_total", metrics.n_tokens_predicted_total},
+
+               { "n_prompt_tokens_processed", metrics.n_prompt_tokens_processed},
+               { "t_prompt_processing", metrics.t_prompt_processing},
+               { "n_tokens_predicted", metrics.n_tokens_predicted},
+               { "t_tokens_generation", metrics.t_tokens_generation},
+
+               { "kv_cache_tokens_count", llama_get_kv_cache_token_count(ctx)},
+               { "kv_cache_used_cells", llama_get_kv_cache_used_cells(ctx)},
+
+               { "slots", slots_data },
            };
+           metrics.reset_bucket();
            queue_results.send(res);
        } break;
    }
@@ -1849,6 +1899,7 @@ struct llama_server_context
                {
                    slot.t_start_genereration = ggml_time_us();
                    slot.t_prompt_processing = (slot.t_start_genereration - slot.t_start_process_prompt) / 1e3;
+                   metrics.on_prompt_eval(slot);
                }
                llama_token_data_array cur_p = { slot.ctx_sampling->cur.data(), slot.ctx_sampling->cur.size(), false };
@@ -1871,6 +1922,7 @@ struct llama_server_context
                    slot.release();
                    slot.print_timings();
                    send_final_response(slot);
+                   metrics.on_prediction(slot);
                }
                slot.i_batch = -1;
@@ -1955,6 +2007,7 @@ static void server_print_usage(const char *argv0, const gpt_params &params,
    printf("  --mmproj MMPROJ_FILE      path to a multimodal projector file for LLaVA.\n");
    printf("  --log-disable             disables logging to a file.\n");
    printf("  --slots-endpoint-disable  disables slots monitoring endpoint.\n");
+   printf("  --metrics                 enable prometheus compatible metrics endpoint (default: %s).\n", sparams.metrics_endpoint ? "enabled" : "disabled");
    printf("\n");
    printf("  -n, --n-predict           maximum tokens to predict (default: %d)\n", params.n_predict);
    printf("  --override-kv KEY=TYPE:VALUE\n");
@@ -2414,6 +2467,10 @@ static void server_params_parse(int argc, char **argv, server_params &sparams,
        {
            sparams.slots_endpoint = false;
        }
+       else if (arg == "--metrics")
+       {
+           sparams.metrics_endpoint = true;
+       }
        else if (arg == "--chat-template")
        {
            if (++i >= argc)
@@ -2621,7 +2678,7 @@ int main(int argc, char **argv)
            // request slots data using task queue
            task_server task;
            task.id = llama.queue_tasks.get_new_id();
-           task.type = TASK_TYPE_SLOTS_DATA;
+           task.type = TASK_TYPE_METRICS;
            task.target_id = -1;
            llama.queue_results.add_waiting_task_id(task.id);
@@ -2668,7 +2725,7 @@ int main(int argc, char **argv)
            // request slots data using task queue
            task_server task;
            task.id = llama.queue_tasks.get_new_id();
-           task.type = TASK_TYPE_SLOTS_DATA;
+           task.type = TASK_TYPE_METRICS;
            task.target_id = -1;
            llama.queue_results.add_waiting_task_id(task.id);
@@ -2683,6 +2740,87 @@ int main(int argc, char **argv)
        });
    }

+   if (sparams.metrics_endpoint) {
+       svr.Get("/metrics", [&](const httplib::Request&, httplib::Response& res) {
+           // request slots data using task queue
+           task_server task;
+           task.id = llama.queue_tasks.get_new_id();
+           task.type = TASK_TYPE_METRICS;
+           task.target_id = -1;
+
+           llama.queue_results.add_waiting_task_id(task.id);
+           llama.queue_tasks.post(task);
+
+           // get the result
+           task_result result = llama.queue_results.recv(task.id);
+           llama.queue_results.remove_waiting_task_id(task.id);
+
+           json data = result.result_json;
+
+           uint64_t n_prompt_tokens_processed = data["n_prompt_tokens_processed"];
+           uint64_t t_prompt_processing = data["t_prompt_processing"];
+
+           uint64_t n_tokens_predicted = data["n_tokens_predicted"];
+           uint64_t t_tokens_generation = data["t_tokens_generation"];
+
+           int32_t kv_cache_used_cells = data["kv_cache_used_cells"];
+
+           // metrics definition: https://prometheus.io/docs/practices/naming/#metric-names
+           json all_metrics_def = json {
+               {"counter", {{
+                       {"name",  "prompt_tokens_total"},
+                       {"help",  "Number of prompt tokens processed."},
+                       {"value", data["n_prompt_tokens_processed_total"]}
+               }, {
+                       {"name",  "tokens_predicted_total"},
+                       {"help",  "Number of generation tokens processed."},
+                       {"value", data["n_tokens_predicted_total"]}
+               }}},
+               {"gauge", {{
+                       {"name",  "prompt_tokens_seconds"},
+                       {"help",  "Average prompt throughput in tokens/s."},
+                       {"value", n_prompt_tokens_processed ? 1e3 / t_prompt_processing * n_prompt_tokens_processed : 0}
+               },{
+                       {"name",  "predicted_tokens_seconds"},
+                       {"help",  "Average generation throughput in tokens/s."},
+                       {"value", n_tokens_predicted ? 1e3 / t_tokens_generation * n_tokens_predicted : 0}
+               },{
+                       {"name",  "kv_cache_usage_ratio"},
+                       {"help",  "KV-cache usage. 1 means 100 percent usage."},
+                       {"value", 1. * kv_cache_used_cells / params.n_ctx}
+               },{
+                       {"name",  "kv_cache_tokens"},
+                       {"help",  "KV-cache tokens."},
+                       {"value", data["kv_cache_tokens_count"]}
+               },{
+                       {"name",  "requests_processing"},
+                       {"help",  "Number of request processing."},
+                       {"value", data["processing"]}
+               },{
+                       {"name",  "requests_deferred"},
+                       {"help",  "Number of request deferred."},
+                       {"value", data["deferred"]}
+               }}}
+           };
+
+           std::stringstream prometheus;
+           for (const auto& el : all_metrics_def.items()) {
+               const auto& type = el.key();
+               const auto& metrics_def = el.value();
+               for (const auto& metric_def : metrics_def) {
+                   std::string name = metric_def["name"];
+                   std::string help = metric_def["help"];
+                   prometheus << "# HELP llamacpp:" << name << " " << help << "\n"
+                              << "# TYPE llamacpp:" << name << " " << type << "\n"
+                              << "llamacpp:" << name << " " << metric_def["value"] << "\n";
+               }
+           }
+
+           res.set_content(prometheus.str(), "text/plain; version=0.0.4");
+           res.status = 200; // HTTP OK
+       });
+   }
+
    svr.set_logger(log_server_request);
    svr.set_exception_handler([](const httplib::Request &, httplib::Response &res, std::exception_ptr ep)
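A note on how the handler above turns the raw counters into the `*_seconds` gauges: `t_prompt_processing` and `t_tokens_generation` accumulate milliseconds and, together with the matching token counts, are cleared by `reset_bucket()` each time a `TASK_TYPE_METRICS` task is processed, so each gauge reports the average throughput since the previous such request. A small worked check of the same expression, with made-up numbers:

```cpp
// Worked check of the gauge formula used in the /metrics handler, with
// made-up numbers: 96 prompt tokens processed in 480 ms since the last scrape.
#include <cstdint>
#include <cstdio>

int main() {
    uint64_t n_prompt_tokens_processed = 96;   // bucket counter (reset each scrape)
    uint64_t t_prompt_processing       = 480;  // bucket timer, in milliseconds

    // Same expression as the handler, guarded against an empty bucket.
    double prompt_tokens_seconds = n_prompt_tokens_processed
        ? 1e3 / t_prompt_processing * n_prompt_tokens_processed
        : 0.0;

    std::printf("llamacpp:prompt_tokens_seconds %g\n", prompt_tokens_seconds);  // 200 tok/s
    return 0;
}
```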


@@ -16,6 +16,8 @@ def before_scenario(context, scenario):
 def after_scenario(context, scenario):
+    if context.server_process is None:
+        return
     if scenario.status == "failed":
         if 'GITHUB_ACTIONS' in os.environ:
             print(f"\x1b[33;101mSCENARIO FAILED: {scenario.name} server logs:\x1b[0m\n\n")


@@ -13,6 +13,7 @@ Feature: llama.cpp server
     And 1 slots
     And embeddings extraction
     And 32 server max tokens to predict
+    And prometheus compatible metrics exposed
     Then the server is starting
     Then the server is healthy
@@ -25,6 +26,7 @@ Feature: llama.cpp server
     And <n_predict> max tokens to predict
     And a completion request with no api error
     Then <n_predicted> tokens are predicted matching <re_content>
+    And prometheus metrics are exposed

     Examples: Prompts
       | prompt | n_predict | re_content | n_predicted |


@@ -13,6 +13,7 @@ import aiohttp
 import openai
 from behave import step
 from behave.api.async_step import async_run_until_complete
+from prometheus_client import parser

 @step(u"a server listening on {server_fqdn}:{server_port}")
@@ -34,6 +35,8 @@ def step_server_config(context, server_fqdn, server_port):
     context.server_api_key = None
     context.server_continuous_batching = False
     context.server_embeddings = False
+    context.server_metrics = False
+    context.server_process = None
     context.server_seed = None
     context.user_api_key = None
@@ -82,6 +85,11 @@ def step_server_embeddings(context):
     context.server_embeddings = True

+
+@step(u'prometheus compatible metrics exposed')
+def step_server_metrics(context):
+    context.server_metrics = True
+
 @step(u"the server is starting")
 def step_start_server(context):
     start_server_background(context)
@@ -424,6 +432,23 @@ def step_check_options_header_value(context, cors_header, cors_header_value):
     assert context.options_response.headers[cors_header] == cors_header_value

+
+@step(u'prometheus metrics are exposed')
+@async_run_until_complete
+async def step_prometheus_metrics_exported(context):
+    async with aiohttp.ClientSession() as session:
+        async with await session.get(f'{context.base_url}/metrics') as metrics_response:
+            assert metrics_response.status == 200
+            assert metrics_response.headers['Content-Type'] == "text/plain; version=0.0.4"
+            metrics_raw = await metrics_response.text()
+            metric_exported = False
+            for metric in parser.text_string_to_metric_families(metrics_raw):
+                match metric.name:
+                    case "llamacpp:kv_cache_usage_ratio":
+                        assert len(metric.samples) > 0
+                        metric_exported = True
+            assert metric_exported, "No metrics exported"
+
+
 async def concurrent_requests(context, f_completion, *args, **kwargs):
     n_prompts = len(context.prompts)
     if context.debug:
@@ -753,6 +778,8 @@ def start_server_background(context):
         server_args.append('--cont-batching')
     if context.server_embeddings:
         server_args.append('--embedding')
+    if context.server_metrics:
+        server_args.append('--metrics')
     if context.model_alias is not None:
         server_args.extend(['--alias', context.model_alias])
     if context.n_ctx is not None:


@@ -1,3 +1,4 @@
 aiohttp~=3.9.3
 behave~=1.2.6
 openai~=0.25.0
+prometheus-client~=0.20.0


@@ -50,7 +50,7 @@ enum task_type {
     TASK_TYPE_COMPLETION,
     TASK_TYPE_CANCEL,
     TASK_TYPE_NEXT_RESPONSE,
-    TASK_TYPE_SLOTS_DATA
+    TASK_TYPE_METRICS
 };

 struct task_server {
@@ -441,7 +441,7 @@ struct llama_server_response {
        {
            LOG_VERBOSE("queue_results.push_back", {});
            queue_results.push_back(result);
-           condition_results.notify_one();
+           condition_results.notify_all();
            return;
        }
    }