From b1d402d5fbc2f53e29a65d2122c8d64e10e52662 Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 03:26:39 +0800 Subject: [PATCH] work stealing chunked task allocator example for issue #291 --- examples/CMakeLists.txt | 1 + examples/task-allocator/CMakeLists.txt | 7 + examples/task-allocator/task-allocator.c | 433 +++++++++++++++++++++++ 3 files changed, 441 insertions(+) create mode 100644 examples/task-allocator/CMakeLists.txt create mode 100644 examples/task-allocator/task-allocator.c diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index cf9c4a223..2c4dd1a63 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -39,6 +39,7 @@ else() add_subdirectory(baby-llama) add_subdirectory(train-text-from-scratch) add_subdirectory(simple) + add_subdirectory(task-allocator) if (LLAMA_METAL) add_subdirectory(metal) endif() diff --git a/examples/task-allocator/CMakeLists.txt b/examples/task-allocator/CMakeLists.txt new file mode 100644 index 000000000..3d10456a1 --- /dev/null +++ b/examples/task-allocator/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET task-allocator) +add_executable(${TARGET} task-allocator.c) +target_link_libraries(${TARGET} PRIVATE) +target_compile_features(${TARGET} PRIVATE c_std_11) +if(TARGET BUILD_INFO) + add_dependencies(${TARGET} BUILD_INFO) +endif() diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c new file mode 100644 index 000000000..4d6957f0d --- /dev/null +++ b/examples/task-allocator/task-allocator.c @@ -0,0 +1,433 @@ +// https://github.com/ggerganov/ggml/issues/291 +// https://github.com/ggerganov/llama.cpp/pull/1507 + +#include +#include +#include +#include + +#if defined(_MSC_VER) || defined(__MINGW32__) +#include // using malloc.h with MSC/MINGW +#elif !defined(__FreeBSD__) && !defined(__NetBSD__) && !defined(__OpenBSD__) +#include +#endif + +#define GGML_ASSERT(x) \ + do { \ + if (!(x)) { \ + fprintf(stderr, "GGML_ASSERT: %s:%d: %s\n", __FILE__, __LINE__, \ + #x); \ + abort(); \ + } \ + } while (0) + +#define GGML_DEBUG 1 +#if (GGML_DEBUG >= 1) +#define GGML_PRINT_DEBUG(...) printf(__VA_ARGS__) +#else +#define GGML_PRINT_DEBUG(...) +#endif + +#if (GGML_DEBUG >= 5) +#define GGML_PRINT_DEBUG_5(...) printf(__VA_ARGS__) +#else +#define GGML_PRINT_DEBUG_5(...) +#endif + +#define UNUSED(x) (void)(x) +#define MIN(a, b) ((a) < (b) ? (a) : (b)) + +#if defined(_WIN32) + +#include + +typedef volatile LONG atomic_int; +typedef atomic_int atomic_bool; + +typedef HANDLE pthread_t; +typedef int thread_ret_t; + +static void atomic_store(atomic_int *ptr, LONG val) { + Intechan_lockedExchange(ptr, val); +} + +static LONG atomic_load(atomic_int *ptr) { + return Intechan_lockedCompareExchange(ptr, 0, 0); +} + +static LONG atomic_fetch_add(atomic_int *ptr, LONG inc) { + return Intechan_lockedExchangeAdd(ptr, inc); +} + +static LONG atomic_fetch_sub(atomic_int *ptr, LONG dec) { + return atomic_fetch_add(ptr, -(dec)); +} + +static int pthread_create(pthread_t *out, void *unused, + thread_ret_t (*func)(void *), void *arg) { + (void)unused; + HANDLE handle = + CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)func, arg, 0, NULL); + if (handle == NULL) { + return EAGAIN; + } + + *out = handle; + return 0; +} + +static int pthread_join(pthread_t thread, void *unused) { + (void)unused; + return (int)WaitForSingleObject(thread, INFINITE); +} + +static int sched_yield(void) { + // https://learn.microsoft.com/en-us/windows/win32/api/winnt/nf-winnt-yieldprocessor + YieldProcessor(); + return 0; +} + +#else // ! _WIN32 + +typedef void *thread_ret_t; + +#include +#include +#include + +#endif + +typedef pthread_t ggml_thread_t; + +//----------------------------------------------------------------------------- +/// Most of the above codes are taken from +/// https://github.com/ggerganov/llama.cpp/tree/master/ggml.c +/// Copyright original authors. +//----------------------------------------------------------------------------- + +_Thread_local int32_t thread_local_id; + +#define MAX_THREADS 16 + +struct task_allocator { + int nth; + + int n_multiplier; // >= 1 + + atomic_int lock; // 0 unlocked, 1 locked + + // total assigned. + atomic_int global_counter; + + atomic_int thread_queue_heads[MAX_THREADS]; + atomic_int thread_queue_tails[MAX_THREADS]; +}; + +static void task_allocator_reset(struct task_allocator *a) { + for (int i = 0; i < a->nth; ++i) { + atomic_store(&a->thread_queue_heads[i], 0); + atomic_store(&a->thread_queue_tails[i], a->n_multiplier); + } + + atomic_store(&a->lock, 0); + atomic_store(&a->global_counter, 0); +} + +static void task_allocator_init(struct task_allocator *a, int nth, + int n_multiplier) { + GGML_ASSERT(nth <= MAX_THREADS); + a->nth = nth; + a->n_multiplier = n_multiplier; + task_allocator_reset(a); +} + +static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, + int *n_chunks) { + GGML_ASSERT(a->nth > 0); + GGML_ASSERT(a->n_multiplier > 0); + + *chunk_idx = -1; + *n_chunks = 0; + + while (atomic_fetch_add(&a->lock, 1) != 0) { // lock + atomic_fetch_sub(&a->lock, 1); + } + + int M = a->n_multiplier; + int nth = a->nth; + int total_chunks = M * nth; + + // all assigned? + if (atomic_load(&a->global_counter) == total_chunks) { + GGML_PRINT_DEBUG_5("[#_%d] %s(): nothing to do.\n", thread_local_id, + __func__); + atomic_fetch_sub(&a->lock, 1); // unlock + return; + } + + // try take its own, pop front. + { + int head = atomic_load(&a->thread_queue_heads[ith]); + int tail = atomic_load(&a->thread_queue_tails[ith]); + + GGML_PRINT_DEBUG_5("[#_%d] %s(): head: %d, tail: %d.\n", + thread_local_id, __func__, head, tail); + + if (head < tail) { + int idx = ith * M + head; + + atomic_fetch_add(&a->thread_queue_heads[ith], 1); + atomic_fetch_add(&a->global_counter, 1); + + GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th trunk of its own.\n", + thread_local_id, __func__, head + 1); + + *chunk_idx = idx; + *n_chunks = total_chunks; + atomic_fetch_sub(&a->lock, 1); // unlock + return; + } + } + + // steal from others. + for (int i = 0; i < nth; ++i) { + if (i == ith) { + continue; + } + + int tail = atomic_load(&a->thread_queue_tails[i]); + if (tail == atomic_load(&a->thread_queue_heads[i])) { + continue; + } + + // pop back + int idx = i * M + tail; + atomic_fetch_sub(&a->thread_queue_tails[i], 1); + atomic_fetch_add(&a->global_counter, 1); + + GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", + thread_local_id, __func__, tail, i); + + *chunk_idx = idx; + *n_chunks = total_chunks; + atomic_fetch_sub(&a->lock, 1); // unlock + return; + } + + fprintf(stderr, "%s:%d should be unreachable!\n", __FILE__, __LINE__); + abort(); +} + +struct state_shared { + int n_threads; + int n_multiplier; + + int n_nodes; + struct ggml_tensor *nodes; + + // thread done counter for single node + atomic_int done_counter; + + struct task_allocator task_allocator; +}; + +struct state { + ggml_thread_t thrd; + int ith; + struct state_shared *shared; +}; + +// simulate tensor that can be compute in parallel +struct ggml_tensor { + // simulate actual compute workload, e.g. src0 rows + int n_compute_units; +}; + +struct params { + int ith; + int nth; + + // simulate performance jitters related to: OS workload, thread affinity, + // economic cores, ... + int jitter_percent; + + struct task_allocator *task_allocator; +}; + +void compute_tensor(struct params params, struct ggml_tensor *node) { + GGML_PRINT_DEBUG_5("[#_%d] %s(): enter.\n", thread_local_id, __func__); + + const int ith = params.ith; + int chunk_idx; + int n_chunks; + + while (true) { + allocate_chunk(params.task_allocator, ith, &chunk_idx, &n_chunks); + if (chunk_idx < 0 || n_chunks <= 0) { + break; + } + + const int nr = node->n_compute_units; + const int dr = (nr + n_chunks - 1) / n_chunks; + const int ir0 = dr * chunk_idx; + const int ir1 = MIN(ir0 + dr, nr); + const int n_loops = 10000 * (100 + params.jitter_percent); + + volatile int64_t x = 0; + + for (int i = ir0; i <= ir1; ++i) { + for (int j = 0; j < n_loops; ++j) { + ++x; + } + } + UNUSED(x); + } + + GGML_PRINT_DEBUG_5("[#_%d] %s(): exit.\n", thread_local_id, __func__); +} + +static thread_ret_t demo_compute_thread(void *data) { + struct state *state = (struct state *)data; + GGML_ASSERT(state); + + struct state_shared *shared = state->shared; + GGML_ASSERT(shared); + + struct task_allocator *allocator = &shared->task_allocator; + GGML_ASSERT(allocator); + + int ith = state->ith; + int n_threads = shared->n_threads; + + thread_local_id = ith; + + atomic_int *done_counter = &shared->done_counter; + + for (int i = 0; i < shared->n_nodes; ++i) { + // Just slow down the last thread. + struct params params = { + .ith = state->ith, + .nth = n_threads, // suppose parallel + .task_allocator = allocator, + .jitter_percent = ith + 1 < n_threads ? 0 : 50, + }; + + struct ggml_tensor *node = &shared->nodes[i]; + + compute_tensor(params, node); + atomic_fetch_add(done_counter, 1); + + while (atomic_load(done_counter) != n_threads) { + sched_yield(); + // main: go here --> later, main saw cond matched, break out loop + // --> reset counter + // current: go here --stall for a thousand years --> check condition + // OOPS! will never break out. + + // So we have to check if the counter has been reset. + if (atomic_load(done_counter) == 0) { + break; + } + } + + GGML_PRINT_DEBUG_5( + "[#_%d] %s(): saw all threads finished computing the node.\n", + thread_local_id, __func__); + + if (ith == 0) { + task_allocator_reset(allocator); + atomic_store(done_counter, 0); + } else { + while (atomic_load(done_counter) != 0) { + sched_yield(); + } + } + } + + GGML_PRINT_DEBUG_5("[#_%d] %s(): exited\n", thread_local_id, __func__); + + return 0; +} + +static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units, + int n_multiplier) { + fprintf(stderr, + "\n[#_%d] %s(): n_threads: %d, n_nodes: %d, n_compute_units: %d, " + "n_multiplier: %d ===>\n\n", + thread_local_id, __func__, n_threads, n_nodes, n_compute_units, + n_multiplier); + + struct ggml_tensor *nodes = alloca(n_nodes * sizeof(struct ggml_tensor)); + + for (int i = 0; i < n_nodes; ++i) { + nodes[i].n_compute_units = n_compute_units; + } + + struct state_shared shared = { + .n_threads = n_threads, + .n_nodes = n_nodes, + .nodes = nodes, + .done_counter = 0, + }; + + task_allocator_init(&shared.task_allocator, n_threads, n_multiplier); + + struct state *workers = alloca(n_threads * sizeof(struct state)); + + for (int i = 0; i < n_threads; ++i) { + workers[i].ith = i; + workers[i].shared = &shared; + if (i > 0) { + pthread_create(&workers[i].thrd, NULL, demo_compute_thread, + &workers[i]); + } + } + + demo_compute_thread(&workers[0]); + + for (int i = 1; i < n_threads; ++i) { + pthread_join(workers[i].thrd, NULL); + } +} + +// +// Conclusions: +// +// - Given workers A and B, and the accumulated time T_a and T_b: +// B can steal a chunk from A only if T_a > T_b + T_b_per_chunk. +// - Saw this situation: A steal B, B steal C. +// - n_chunks plays a key role, similar to choosing the best n_threads, it's +// difficult choose the ideal n_chunks value. Performance drops with per-chunk +// compute time exceeds the scheduling overhead. +// - Work stealing chunked task allocator can save the response time +// significantly when the majority threads runs fast but a few suddenly or +// constantly slow. +// +int main(void) { + if (false) { // the most simple one: only main thread, one node + int n_threads = 1; + int n_nodes = 1; + int n_multiplier = 1; // trunks per thread. + int n_compute_units = 1; + + test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); + } + + if (false) { + int n_threads = 2; + int n_nodes = 2; + int n_multiplier = 1; // trunks per thread. + int n_compute_units = 2; + + test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); + } + + if (true) { + int n_threads = 4; + int n_nodes = 2; + int n_multiplier = 8; // trunks per thread. + int n_compute_units = 32; + + test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); + } +}