threadpool : skip polling for unused threads (#9461)

* threadpool: skip polling for unused threads

Currently, all threads do N polling rounds even if only one thread is active (n_threads_cur == 1).
This commit adds a check that skips polling for unused threads (ith >= n_threads_cur).

n_threads_cur is now an atomic_int to explicitly tell thread-sanitizer that it is written
from one thread and read from other threads (not a race condition).
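
For illustration, a minimal C11 sketch of the pattern (helper names are hypothetical, not the ggml API): one writer, many relaxed readers, with the atomic type serving mainly as the annotation:

    #include <stdatomic.h>
    #include <stdbool.h>

    atomic_int n_threads_cur;

    // main thread: publish the number of active threads (relaxed is enough here,
    // the kickoff path provides the actual synchronization)
    static void set_active_threads(int n) {
        atomic_store_explicit(&n_threads_cur, n, memory_order_relaxed);
    }

    // worker thread: cheap relaxed read; no ordering is needed just to check activity
    static bool thread_is_active(int ith) {
        return ith < atomic_load_explicit(&n_threads_cur, memory_order_relaxed);
    }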

* threadpool: further simplify and improve ggml_barrier

Avoid using strict memory order while polling, yet make sure that all threads go through
a full memory barrier (memory fence) on ggml_barrier entrance and exit.
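
A standalone sketch of the scheme, simplified from the ggml_barrier change in the diff below (type and function names are illustrative):

    #include <stdatomic.h>

    typedef struct {
        atomic_int n_arrived; // threads that have entered the current round
        atomic_int n_passed;  // completed rounds (generation counter)
    } barrier_t;

    static void barrier_wait(barrier_t * b, int n_threads) {
        int passed = atomic_load_explicit(&b->n_passed, memory_order_relaxed);

        // enter barrier (the seq-cst RMW acts as a full fence)
        int arrived = atomic_fetch_add_explicit(&b->n_arrived, 1, memory_order_seq_cst);

        int last = 0;
        if (arrived == n_threads - 1) {
            // last thread: reset the counter for the next round
            atomic_store_explicit(&b->n_arrived, 0, memory_order_relaxed);
            last = 1;
        } else {
            // cheap relaxed polling until the generation counter advances
            while (atomic_load_explicit(&b->n_passed, memory_order_relaxed) == passed) {
                // spin
            }
        }

        // exit barrier (full fence again); only the last thread bumps the counter
        atomic_fetch_add_explicit(&b->n_passed, last, memory_order_seq_cst);
    }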

* threads: add simple barrier test

This test does lots of small, parallel matmul ops where the barriers in between dominate the overhead.

* threadpool: improve thread sync for new-graphs

Using the same tricks as ggml_barrier: all the polling is done with relaxed memory order
to keep it efficient; once the new graph is detected, we do a full fence using a
read-modify-write with strict memory order.

* threadpool: improve abort handling

Do not use threadpool->ec (exit code) to decide whether to exit the compute loop.
threadpool->ec is not atomic, which makes thread-sanitizer rightfully unhappy about it.

Instead, introduce an atomic threadpool->abort flag for this purpose. This is consistent with
how we handle threadpool->stop and threadpool->pause.

While at it, add an explicit atomic_load of n_threads_cur for consistency.
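
Schematically (illustrative names, not the exact ggml code):

    #include <stdatomic.h>
    #include <stdbool.h>

    atomic_bool abort_flag; // replaces the non-atomic exit code for cross-thread control flow

    static void compute_loop(int ith, int n_nodes, bool (*should_abort)(void)) {
        for (int i = 0; i < n_nodes && !atomic_load(&abort_flag); i++) {
            // ... compute node i, then synchronize at the barrier ...
            if (ith == 0 && should_abort && should_abort()) {
                atomic_store(&abort_flag, true); // every thread exits the loop
            }
        }
    }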

* test-barrier: release threadpool before releasing the context

Fixes a use-after-free detected by the GCC thread-sanitizer on x86-64;
for some reason the LLVM sanitizer does not detect this issue.
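
The fix is simply the teardown order, as in the test below:

    ggml_threadpool_free(threadpool); // join/release worker threads first
    ggml_free(ctx);                   // only then release the context they were using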
Max Krasnyansky 2024-09-17 01:19:46 -07:00 committed by GitHub
parent 503147a9f9
commit 0226613853
3 changed files with 170 additions and 50 deletions

ggml/src/ggml.c

@@ -2013,10 +2013,11 @@ struct ggml_threadpool {
     // these are atomic as an annotation for thread-sanitizer
     atomic_bool stop;           // Used for stopping the threadpool altogether
     atomic_bool pause;          // Used for pausing the threadpool or individual threads
+    atomic_bool abort;          // Used for aborting processing of a graph

     struct ggml_compute_state * workers;   // per thread state
     int          n_threads_max; // number of threads in the pool
-    int          n_threads_cur; // number of threads used in the current graph
+    atomic_int   n_threads_cur; // number of threads used in the current graph

     int32_t      prio;          // Scheduling priority
     uint32_t     poll;          // Polling level (0 - no polling)
@@ -3178,41 +3179,36 @@ inline static void ggml_critical_section_start(void) {
     }
 }

+static void ggml_barrier(struct ggml_threadpool * tp) {
+    int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+    if (n_threads == 1) {
+        return;
+    }
+
 #ifdef GGML_USE_OPENMP
-static void ggml_barrier(struct ggml_threadpool * threadpool) {
-    if (threadpool->n_threads_cur == 1) {
-        return;
-    }
-
     #pragma omp barrier
-}
 #else
-static void ggml_barrier(struct ggml_threadpool * threadpool) {
-    if (threadpool->n_threads_cur == 1) {
-        return;
-    }
-
-    atomic_int * n_barrier = &threadpool->n_barrier;
-    atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
+    int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);

-    int n_threads = threadpool->n_threads_cur;
-    int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
+    // enter barrier (full seq-cst fence)
+    int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst);

-    if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+    int last = 0;
+    if (n_barrier == (n_threads - 1)) {
         // last thread
-        atomic_store(n_barrier, 0);
-        atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed);
+        atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
+        last = 1;
     } else {
         // wait for other threads
-        while (true) {
-            if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
-                return;
-            }
+        while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
             ggml_thread_cpu_relax();
         }
     }
-}
+
+    // exit barrier (full seq-cst fence)
+    atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
 #endif
+}

 // TODO: make this somehow automatically executed
 //       some sort of "sentry" mechanism
@@ -19933,34 +19929,33 @@ struct ggml_cplan ggml_graph_plan(
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
+    struct ggml_threadpool    * tp    = state->threadpool;

-    const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
-    const struct ggml_cplan  * cplan  = state->threadpool->cplan;
+    const struct ggml_cgraph * cgraph = tp->cgraph;
+    const struct ggml_cplan  * cplan  = tp->cplan;

     set_numa_thread_affinity(state->ith);

     struct ggml_compute_params params = {
         /*.ith       =*/ state->ith,
-        /*.nth       =*/ state->threadpool->n_threads_cur,
+        /*.nth       =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
         /*.wsize     =*/ cplan->work_size,
         /*.wdata     =*/ cplan->work_data,
-        /*.threadpool=*/ state->threadpool,
+        /*.threadpool=*/ tp,
     };

-    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
+    for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort; node_n++) {
         struct ggml_tensor * node = cgraph->nodes[node_n];

         ggml_compute_forward(&params, node);

-        if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->threadpool->ec = GGML_STATUS_ABORTED;
+        if (state->ith == 0 && cplan->abort_callback &&
+                cplan->abort_callback(cplan->abort_callback_data)) {
+            tp->abort = true;
+            tp->ec    = GGML_STATUS_ABORTED;
         }

         ggml_barrier(state->threadpool);
-
-        if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
-            break;
-        }
     }

     return 0;
@@ -19968,7 +19963,15 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {

 #ifndef GGML_USE_OPENMP

-static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
+// check if thread is active
+static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
+    struct ggml_threadpool * threadpool = state->threadpool;
+    int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
+    return (state->ith < n_threads);
+}
+
+// check if thread is ready to proceed (exit from polling or sleeping)
+static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;

     if (state->pending || threadpool->stop || threadpool->pause) { return true; }
@@ -19976,21 +19979,34 @@ static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
     // check for new graph/work
     int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
     if (new_graph != state->last_graph) {
-        state->pending    = (state->ith < threadpool->n_threads_cur);
+        state->pending    = ggml_graph_compute_thread_active(state);
         state->last_graph = new_graph;
     }

     return state->pending;
 }

+// sync thread state after polling
+static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
+    struct ggml_threadpool * threadpool = state->threadpool;
+    // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
+    // so instead we just use a dummy read-modify-write
+    atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
+}
+
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;

+    // Skip polling for unused threads
+    if (!ggml_graph_compute_thread_active(state)) {
+        return state->pending;
+    }
+
     // This seems to make 0 ... 100 a decent range for polling level across modern processors.
     // Perhaps, we can adjust it dynamically based on load and things.
     const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;

-    for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
+    for (uint64_t i=0; !ggml_graph_compute_thread_ready(state) && i < n_rounds; i++) {
         // No new work. Keep polling.
         ggml_thread_cpu_relax();
     }
@@ -20002,13 +20018,14 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
     struct ggml_threadpool * threadpool = state->threadpool;

     if (ggml_graph_compute_poll_for_work(state)) {
+        ggml_graph_compute_thread_sync(state);
         return state->pending;
     }

     ggml_mutex_lock_shared(&threadpool->mutex);
-    while (!ggml_graph_compute_ready(state)) {
+    while (!ggml_graph_compute_thread_ready(state)) {
         // No new work. Wait for the signal.
-        GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
+        GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping)\n", state->ith);
         ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
     }
     ggml_mutex_unlock_shared(&threadpool->mutex);
@@ -20055,13 +20072,20 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
 }

 // Start processing new graph
-static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
+static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads)
 {
-    // always take the mutex here because the worker threads are doing hybrid poll/wait
+    // Always take the mutex here because the worker threads are doing hybrid poll/wait

     ggml_mutex_lock(&threadpool->mutex);

-    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
+    GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+
+    // Update the number of active threads
+    atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+
+    // Indicate the graph is ready to be processed
+    // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
+    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);

     if (threadpool->pause) {
        // Update main thread prio and affinity to match the threadpool settings
@@ -20120,6 +20144,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
     threadpool->current_chunk   = 0;
     threadpool->stop            = false;
     threadpool->pause           = tpp->paused;
+    threadpool->abort           = false;
     threadpool->workers         = NULL;
     threadpool->n_threads_max   = tpp->n_threads;
     threadpool->n_threads_cur   = tpp->n_threads;
@@ -20195,15 +20220,11 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         // No worker threads should be accessing the parameters below at this stage
         threadpool->cgraph        = cgraph;
         threadpool->cplan         = cplan;
-        threadpool->n_threads_cur = n_threads;
         threadpool->current_chunk = 0;
+        threadpool->abort         = false;
         threadpool->ec            = GGML_STATUS_SUCCESS;
     }

-    if (n_threads > threadpool->n_threads_max) {
-        GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
-    }
-
 #ifdef GGML_USE_OPENMP
     if (n_threads > 1) {
         #pragma omp parallel num_threads(n_threads)
@@ -20212,7 +20233,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         {
             // update the number of threads from the actual number of threads that we got from OpenMP
             n_threads = omp_get_num_threads();
-            threadpool->n_threads_cur = n_threads;
+            atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
         }

         ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20221,8 +20242,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         ggml_graph_compute_thread(&threadpool->workers[0]);
     }
 #else
+    if (n_threads > threadpool->n_threads_max) {
+        GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
+        n_threads = threadpool->n_threads_max;
+    }
+
     // Kick all threads to start the new graph
-    ggml_graph_compute_kickoff(threadpool);
+    ggml_graph_compute_kickoff(threadpool, n_threads);

     // This is a work thread too
     ggml_graph_compute_thread(&threadpool->workers[0]);

tests/CMakeLists.txt

@@ -119,6 +119,7 @@ llama_target_and_test(test-grammar-parser.cpp)
 llama_target_and_test(test-llama-grammar.cpp)
 llama_target_and_test(test-grammar-integration.cpp)
 llama_target_and_test(test-grad0.cpp)
+llama_target_and_test(test-barrier.cpp)
 # llama_target_and_test(test-opt.cpp) # SLOW
 llama_target_and_test(test-backend-ops.cpp)

tests/test-barrier.cpp (new file, 93 lines)

@@ -0,0 +1,93 @@
#include "ggml.h"
#include "ggml-backend.h"
#include <chrono>
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <vector>
#define MAX_NARGS 2
int main(int argc, char *argv[]) {
int n_threads = 4;
int n_rounds = 100;
if (argc > 1) {
n_threads = std::atoi(argv[1]);
}
if (argc > 2) {
n_rounds = std::atoi(argv[2]);
}
struct ggml_init_params params = {
/* .mem_size = */ 1024*1024*1024,
/* .mem_buffer = */ NULL,
/* .no_alloc = */ false,
};
struct ggml_context * ctx = ggml_init(params);
// Create graph
struct ggml_cgraph * gf = ggml_new_graph(ctx);
// Lots of small, parallel ops where barriers in between will dominate
struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
for (int i = 0; i < 1000; i++) {
struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
out = ggml_mul_mat(ctx, a, out);
struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
out = ggml_mul_mat(ctx, d, out);
}
ggml_build_forward_expand(gf, out);
int n_nodes = ggml_graph_n_nodes(gf);
// Create threadpool
struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
if (!threadpool) {
fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
exit(1);
}
// Create compute plan
struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);
std::vector<uint8_t> work_data(cplan.work_size);
cplan.work_data = work_data.data();
std::cerr << "graph-compute with"
<< "\n n_threads: " << n_threads
<< "\n n_nodes: " << n_nodes
<< "\n n_rounds: " << n_rounds
<< "\n";
// ggml_graph_print(gf);
// Warmup
ggml_graph_compute(gf, &cplan);
auto t0 = std::chrono::high_resolution_clock::now();
for (int i=0; i < n_rounds; i++) {
ggml_graph_compute(gf, &cplan);
}
auto t1 = std::chrono::high_resolution_clock::now();
auto usec = std::chrono::duration_cast<std::chrono::microseconds>(t1-t0).count();
auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(t1-t0).count();
std::cerr << "graph-compute took " << usec << " usec "
<< "\n " << (float) usec / n_rounds << " usec per-iter"
<< "\n " << (float) nsec / (n_rounds * n_nodes) << " nsec per-node"
<< "\n";
ggml_threadpool_free(threadpool);
ggml_free(ctx);
return 0;
}
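
With the standard CMake test build this should produce a test-barrier binary that takes the thread count and round count as optional positional arguments (defaults: 4 threads, 100 rounds); for example, running test-barrier 8 200 stresses the barrier with 8 threads for 200 rounds.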