diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c
index 7a96774a9..f9b7e3feb 100644
--- a/examples/task-allocator/task-allocator.c
+++ b/examples/task-allocator/task-allocator.c
@@ -1,12 +1,12 @@
 // https://github.com/ggerganov/ggml/issues/291
 // https://github.com/ggerganov/llama.cpp/pull/1507
+#include
+#include
 #include
 #include
 #include
 #include
-#include
-#include
 
 #if defined(_MSC_VER) || defined(__MINGW32__)
 #include <malloc.h> // using malloc.h with MSC/MINGW
@@ -132,30 +132,36 @@ static void task_allocator_reset(struct task_allocator *a) {
   atomic_store(&a->global_counter, 0);
 }
 
+// NOTE: when nth == 1, n_multiplier is actually useless.
 static void task_allocator_init(struct task_allocator *a, int nth,
                                 int n_multiplier) {
+  GGML_ASSERT(nth > 0);
   GGML_ASSERT(nth <= MAX_THREADS);
+  GGML_ASSERT(n_multiplier > 0);
+
   a->nth = nth;
-  a->n_multiplier = n_multiplier;
+  a->n_multiplier = nth == 1 ? 1 : n_multiplier;
   task_allocator_reset(a);
 }
 
+// ith: worker id (start from 0).
+// chunk_idx and n_chunks will be updated.
+// chunk_idx is set as -1 when nothing to do.
 static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
                            int *n_chunks) {
-  GGML_ASSERT(a->nth > 0);
-  GGML_ASSERT(a->n_multiplier > 0);
-
-  *chunk_idx = -1;
-  *n_chunks = 0;
-
-  while (atomic_fetch_add(&a->lock, 1) != 0) { // lock
-    atomic_fetch_sub(&a->lock, 1);
-  }
+  GGML_ASSERT(ith >= 0 && ith < a->nth);
 
   int M = a->n_multiplier;
   int nth = a->nth;
   int total_chunks = M * nth;
 
+  *chunk_idx = -1;
+  *n_chunks = total_chunks;
+
+  while (atomic_fetch_add(&a->lock, 1) != 0) { // lock
+    atomic_fetch_sub(&a->lock, 1);
+  }
+
   // all assigned?
   if (atomic_load(&a->global_counter) == total_chunks) {
     GGML_PRINT_DEBUG("[#_%d] %s(): nothing to do.\n", ith, __func__);
@@ -177,7 +183,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
     atomic_fetch_add(&a->thread_queue_heads[ith], 1);
     atomic_fetch_add(&a->global_counter, 1);
 
-    GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th trunk of its own.\n",
+    GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th chunk of its own.\n",
                      ith, __func__, head + 1);
 
     *chunk_idx = idx;
@@ -188,6 +194,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
   }
 
   // steal from others.
+  // TODO: optimize: steal from the slowest one.
   for (int i = 0; i < nth; ++i) {
     if (i == ith) {
       continue;
@@ -203,7 +210,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
     atomic_fetch_sub(&a->thread_queue_tails[i], 1);
     atomic_fetch_add(&a->global_counter, 1);
 
-    GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", ith,
+    GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th chunk from #_%d\n", ith,
                      __func__, tail, i);
 
     *chunk_idx = idx;
@@ -261,7 +268,7 @@ void compute_tensor(struct params params, struct ggml_tensor *node) {
   while (true) {
     allocate_chunk(params.task_allocator, ith, &chunk_idx, &n_chunks);
-    if (chunk_idx < 0 || n_chunks <= 0) {
+    if (chunk_idx < 0) {
       break;
     }
@@ -339,6 +346,144 @@ static thread_ret_t demo_compute_thread(void *data) {
   return 0;
 }
 
+static void test_task_allocator_init(void) {
+  struct task_allocator a;
+
+  task_allocator_init(&a, 1, 2);
+  GGML_ASSERT(a.nth == 1);
+  GGML_ASSERT(a.n_multiplier == 1); // when nth == 1, force n_multiplier as 1
+
+  task_allocator_init(&a, 2, 2);
+  GGML_ASSERT(a.nth == 2);
+  GGML_ASSERT(a.n_multiplier == 2); // ok
+}
+
+static void task_allocator_unit_test_no_steal(void) {
+  int chunk_idx; // out
+  int n_chunks;  // out
+
+  int n_threads = 2;
+  int n_multiplier = 2;
+  const int expected_n_slots = n_threads * n_multiplier;
+
+  struct task_allocator a;
+  task_allocator_init(&a, n_threads, n_multiplier);
+
+  struct test_data_t {
+    int ith;       // call by
+    int chunk_idx; // expected
+    int n_chunks;  // expected
+  };
+
+  struct test_data_t test_data[] = {
+      //////////////////// clang format /////////////////////////
+      {
+          .ith = 0,
+          .chunk_idx = 0,
+      },
+      {
+          .ith = 1,
+          .chunk_idx = 2,
+      },
+      {
+          .ith = 0,
+          .chunk_idx = 1,
+      },
+      {
+          .ith = 1,
+          .chunk_idx = 3,
+      },
+      {
+          .ith = 0,
+          .chunk_idx = -1,
+      },
+      {
+          .ith = 1,
+          .chunk_idx = -1,
+      }};
+
+  int t_len = sizeof(test_data) / sizeof(struct test_data_t);
+
+  for (int i = 0; i < t_len; i++) {
+    allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks);
+    if (chunk_idx != test_data[i].chunk_idx) {
+      fprintf(stderr,
+              "%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n",
+              __func__, i, chunk_idx, test_data[i].chunk_idx);
+      abort();
+    }
+    if (n_chunks != expected_n_slots) {
+      fprintf(stderr,
+              "%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n",
+              __func__, i, n_chunks, expected_n_slots);
+      abort();
+    }
+  }
+}
+
+static void task_allocator_unit_test_steal(void) {
+  int chunk_idx; // out
+  int n_chunks;  // out
+
+  int n_threads = 2;
+  int n_multiplier = 2;
+  const int expected_n_slots = n_threads * n_multiplier;
+
+  struct task_allocator a;
+  task_allocator_init(&a, n_threads, n_multiplier);
+
+  struct test_data_t {
+    int ith;       // call by
+    int chunk_idx; // expected
+  };
+
+  struct test_data_t test_data[] = {
+      //////////////////// clang format /////////////////////////
+      {
+          .ith = 0,
+          .chunk_idx = 0,
+      },
+      {
+          .ith = 0,
+          .chunk_idx = 1,
+      },
+      {
+          .ith = 1,
+          .chunk_idx = 2,
+      },
+      {
+          .ith = 0,
+          .chunk_idx = 4, // steal from tail
+      },
+      {
+          .ith = 0,
+          .chunk_idx = -1,
+      },
+      {
+          .ith = 1,
+          .chunk_idx = -1,
+      }};
+
+  int t_len = sizeof(test_data) / sizeof(struct test_data_t);
+
+  for (int i = 0; i < t_len; i++) {
+    allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks);
+    if (chunk_idx != test_data[i].chunk_idx) {
+      fprintf(stderr,
+              "%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n",
+              __func__, i, chunk_idx, test_data[i].chunk_idx);
+      abort();
+    }
+    if (n_chunks != expected_n_slots) {
+      fprintf(stderr,
+              "%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n",
+              __func__, i, n_chunks, expected_n_slots);
+      abort();
+    }
+  }
+}
+
+// Integration test.
 static void test_task_allocator(int n_threads, int n_nodes,
                                 int n_compute_units, int n_multiplier) {
   fprintf(stderr,
@@ -386,36 +531,40 @@ static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units,
 //   B can steal a chunk from A only if T_a > T_b + T_b_per_chunk.
 //   - Saw this situation: A steal B, B steal C.
 // - n_chunks plays a key role, similar to choosing the best n_threads, it's
-//   difficult choose the ideal n_chunks value. Performance drops with per-chunk
-//   compute time exceeds the scheduling overhead.
+//   difficult to choose the ideal n_chunks value. Performance drops when
+//   per-chunk compute time exceeds the scheduling overhead.
 // - Work stealing chunked task allocator can save the response time
 //   significantly when the majority threads runs fast but a few suddenly or
 //   constantly slow.
 //
 int main(void) {
-  if (false) { // the most simple one: only main thread, one node
-    int n_threads = 1;
-    int n_nodes = 1;
-    int n_multiplier = 1; // trunks per thread.
-    int n_compute_units = 1;
+  test_task_allocator_init();
+  task_allocator_unit_test_no_steal();
+  task_allocator_unit_test_steal();
 
-    test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
-  }
+  // Integration tests
+  const int n_compute_units = 64;
 
   if (false) {
-    int n_threads = 2;
-    int n_nodes = 2;
-    int n_multiplier = 1; // trunks per thread.
-    int n_compute_units = 2;
+    int n_threads = 1;
+    int n_nodes = 1;
+    int n_multiplier = 2; // equivalent to 1
 
     test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
   }
 
   if (true) {
-    int n_threads = 4;
+    int n_threads = 2;
     int n_nodes = 2;
-    int n_multiplier = 8; // trunks per thread.
-    int n_compute_units = 32;
+    int n_multiplier = 1;
+
+    test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
+  }
+
+  if (true) {
+    int n_threads = 2;
+    int n_nodes = 2;
+    int n_multiplier = 8;
 
     test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
  }
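
For readers who want to poke at the scheduling logic outside of ggml, below is a minimal, self-contained C11 sketch of the control flow the patch relies on: the spin lock built from atomic_fetch_add, the fast path that pops a chunk from the worker's own queue head, and the slow path that steals from another worker's tail. The names (struct chunk_allocator, take_chunk, the file name in the build hint) are illustrative rather than taken from the patch, and the index arithmetic is deliberately normalized to the range [0, nth * n_multiplier); the unit tests in the diff above pin down the real allocator's exact values, which differ on the steal path.

// Sketch only; not the patch's implementation. Build: cc -std=c11 chunk-alloc-sketch.c
#include <stdatomic.h>
#include <stdio.h>

#define MAX_THREADS 16

struct chunk_allocator {
    atomic_int lock;                // 0 = unlocked; see take_chunk()
    atomic_int global_counter;      // chunks handed out so far
    atomic_int heads[MAX_THREADS];  // per worker: own chunks taken from the front
    atomic_int tails[MAX_THREADS];  // per worker: chunks stolen from the back
    int nth;                        // number of workers
    int n_multiplier;               // chunks per worker
};

// Hand one chunk to worker `ith`; returns an index in [0, nth * n_multiplier),
// or -1 when every chunk is assigned.
static int take_chunk(struct chunk_allocator *a, int ith) {
    int M = a->n_multiplier;
    int total = M * a->nth;
    int idx = -1;

    // Spin lock: only the caller that moves `lock` from 0 to 1 proceeds;
    // everyone else undoes its increment and retries.
    while (atomic_fetch_add(&a->lock, 1) != 0) {
        atomic_fetch_sub(&a->lock, 1);
    }

    if (atomic_load(&a->global_counter) < total) {
        int head = atomic_load(&a->heads[ith]);
        int tail = atomic_load(&a->tails[ith]);
        if (head + tail < M) {
            // Fast path: pop from the head of our own queue.
            idx = ith * M + head;
            atomic_fetch_add(&a->heads[ith], 1);
            atomic_fetch_add(&a->global_counter, 1);
        } else {
            // Slow path: steal the last unassigned chunk of the first
            // victim whose queue is not yet drained.
            for (int i = 0; i < a->nth && idx < 0; i++) {
                if (i == ith) continue;
                int h = atomic_load(&a->heads[i]);
                int t = atomic_load(&a->tails[i]);
                if (h + t < M) {
                    idx = i * M + (M - 1 - t);
                    atomic_fetch_add(&a->tails[i], 1);
                    atomic_fetch_add(&a->global_counter, 1);
                }
            }
        }
    }

    atomic_fetch_sub(&a->lock, 1); // unlock
    return idx;
}

int main(void) {
    struct chunk_allocator a = {.nth = 2, .n_multiplier = 2};
    // Replays the shape of the "steal" unit test: worker 0 drains its own
    // queue (chunks 0, 1), worker 1 takes chunk 2, then worker 0 steals
    // worker 1's remaining chunk 3, and both get -1 afterwards.
    int order[] = {0, 0, 1, 0, 0, 1};
    for (int i = 0; i < 6; i++) {
        printf("worker %d -> chunk %d\n", order[i], take_chunk(&a, order[i]));
    }
    return 0;
}

Popping from the owner's head while thieves take from the tail is the usual work-stealing arrangement: the two ends only meet on the victim's last chunk, and the owner keeps walking through contiguous, cache-friendly indices of its own range.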