corner case: when nth is 1, n_multiplier should be 1

mqy 2023-06-28 17:00:34 +08:00
parent 4afb12fbb3
commit 76e5e2719d
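Why this matters: with a single worker thread there is nobody to steal from, so splitting that thread's work into n_multiplier chunks only adds allocator round-trips. The patch therefore clamps n_multiplier to 1 inside task_allocator_init() when nth == 1. A minimal standalone sketch of just that guard (names like cfg_init are illustrative, not the patch's API):

    #include <assert.h>
    #include <stdio.h>

    struct task_allocator_cfg {
        int nth;          // number of worker threads
        int n_multiplier; // chunks per thread
    };

    // Mirrors the patched task_allocator_init(): clamp n_multiplier to 1
    // when there is only one thread, since work stealing cannot happen.
    static void cfg_init(struct task_allocator_cfg *c, int nth, int n_multiplier) {
        assert(nth > 0 && n_multiplier > 0);
        c->nth = nth;
        c->n_multiplier = (nth == 1) ? 1 : n_multiplier;
    }

    int main(void) {
        struct task_allocator_cfg c;
        cfg_init(&c, 1, 8); // corner case: single thread
        printf("nth=%d n_multiplier=%d\n", c.nth, c.n_multiplier); // nth=1 n_multiplier=1
        cfg_init(&c, 4, 8);
        printf("nth=%d n_multiplier=%d\n", c.nth, c.n_multiplier); // nth=4 n_multiplier=8
        return 0;
    }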


@@ -1,12 +1,12 @@
 // https://github.com/ggerganov/ggml/issues/291
 // https://github.com/ggerganov/llama.cpp/pull/1507
+#include <stdbool.h>
+#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
-#include <stdbool.h>
-#include <stdint.h>
 
 #if defined(_MSC_VER) || defined(__MINGW32__)
 #include <malloc.h> // using malloc.h with MSC/MINGW
@@ -132,30 +132,36 @@ static void task_allocator_reset(struct task_allocator *a) {
     atomic_store(&a->global_counter, 0);
 }
 
+// NOTE: when nth == 1, n_multiplier is actually useless.
 static void task_allocator_init(struct task_allocator *a, int nth,
                                 int n_multiplier) {
+    GGML_ASSERT(nth > 0);
     GGML_ASSERT(nth <= MAX_THREADS);
+    GGML_ASSERT(n_multiplier > 0);
 
     a->nth = nth;
-    a->n_multiplier = n_multiplier;
+    a->n_multiplier = nth == 1 ? 1 : n_multiplier;
     task_allocator_reset(a);
 }
 
+// ith: worker id (starts from 0).
+// chunk_idx and n_chunks will be updated.
+// chunk_idx is set to -1 when there is nothing to do.
 static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
                            int *n_chunks) {
-    GGML_ASSERT(a->nth > 0);
-    GGML_ASSERT(a->n_multiplier > 0);
+    GGML_ASSERT(ith >= 0 && ith < a->nth);
+
+    *chunk_idx = -1;
+    *n_chunks = 0;
+
+    while (atomic_fetch_add(&a->lock, 1) != 0) { // lock
+        atomic_fetch_sub(&a->lock, 1);
+    }
 
     int M = a->n_multiplier;
     int nth = a->nth;
     int total_chunks = M * nth;
 
-    *chunk_idx = -1;
-    *n_chunks = total_chunks;
-
-    while (atomic_fetch_add(&a->lock, 1) != 0) { // lock
-        atomic_fetch_sub(&a->lock, 1);
-    }
-
     // all assigned?
     if (atomic_load(&a->global_counter) == total_chunks) {
         GGML_PRINT_DEBUG("[#_%d] %s(): nothing to do.\n", ith, __func__);
@@ -177,7 +183,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
         atomic_fetch_add(&a->thread_queue_heads[ith], 1);
         atomic_fetch_add(&a->global_counter, 1);
 
-        GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th trunk of its own.\n",
+        GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th chunk of its own.\n",
                          ith, __func__, head + 1);
 
         *chunk_idx = idx;
@ -188,6 +194,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
} }
// steal from others. // steal from others.
// TODO: optimize: steal from the slowest one.
for (int i = 0; i < nth; ++i) { for (int i = 0; i < nth; ++i) {
if (i == ith) { if (i == ith) {
continue; continue;
@@ -203,7 +210,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
         atomic_fetch_sub(&a->thread_queue_tails[i], 1);
         atomic_fetch_add(&a->global_counter, 1);
 
-        GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", ith,
+        GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th chunk from #_%d\n", ith,
                          __func__, tail, i);
 
         *chunk_idx = idx;
@@ -261,7 +268,7 @@ void compute_tensor(struct params params, struct ggml_tensor *node) {
     while (true) {
         allocate_chunk(params.task_allocator, ith, &chunk_idx, &n_chunks);
-        if (chunk_idx < 0 || n_chunks <= 0) {
+        if (chunk_idx < 0) {
             break;
         }
@@ -339,6 +346,144 @@ static thread_ret_t demo_compute_thread(void *data) {
     return 0;
 }
 
+static void test_task_allocator_init(void) {
+    struct task_allocator a;
+
+    task_allocator_init(&a, 1, 2);
+    GGML_ASSERT(a.nth == 1);
+    GGML_ASSERT(a.n_multiplier == 1); // when nth == 1, force n_multiplier to 1
+
+    task_allocator_init(&a, 2, 2);
+    GGML_ASSERT(a.nth == 2);
+    GGML_ASSERT(a.n_multiplier == 2); // ok
+}
+
+static void task_allocator_unit_test_no_steal(void) {
+    int chunk_idx; // out
+    int n_chunks;  // out
+
+    int n_threads = 2;
+    int n_multiplier = 2;
+    const int expected_n_slots = n_threads * n_multiplier;
+
+    struct task_allocator a;
+    task_allocator_init(&a, n_threads, n_multiplier);
+
+    struct test_data_t {
+        int ith;       // caller worker id
+        int chunk_idx; // expected
+        int n_chunks;  // expected
+    };
+
+    struct test_data_t test_data[] = {
+        //////////////////// clang format /////////////////////////
+        {
+            .ith = 0,
+            .chunk_idx = 0,
+        },
+        {
+            .ith = 1,
+            .chunk_idx = 2,
+        },
+        {
+            .ith = 0,
+            .chunk_idx = 1,
+        },
+        {
+            .ith = 1,
+            .chunk_idx = 3,
+        },
+        {
+            .ith = 0,
+            .chunk_idx = -1,
+        },
+        {
+            .ith = 1,
+            .chunk_idx = -1,
+        }};
+
+    int t_len = sizeof(test_data) / sizeof(struct test_data_t);
+
+    for (int i = 0; i < t_len; i++) {
+        allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks);
+
+        if (chunk_idx != test_data[i].chunk_idx) {
+            fprintf(stderr,
+                    "%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n",
+                    __func__, i, chunk_idx, test_data[i].chunk_idx);
+            abort();
+        }
+
+        if (n_chunks != expected_n_slots) {
+            fprintf(stderr,
+                    "%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n",
+                    __func__, i, n_chunks, expected_n_slots);
+            abort();
+        }
+    }
+}
+
+static void task_allocator_unit_test_steal(void) {
+    int chunk_idx; // out
+    int n_chunks;  // out
+
+    int n_threads = 2;
+    int n_multiplier = 2;
+    const int expected_n_slots = n_threads * n_multiplier;
+
+    struct task_allocator a;
+    task_allocator_init(&a, n_threads, n_multiplier);
+
+    struct test_data_t {
+        int ith;       // caller worker id
+        int chunk_idx; // expected
+    };
+
+    struct test_data_t test_data[] = {
+        //////////////////// clang format /////////////////////////
+        {
+            .ith = 0,
+            .chunk_idx = 0,
+        },
+        {
+            .ith = 0,
+            .chunk_idx = 1,
+        },
+        {
+            .ith = 1,
+            .chunk_idx = 2,
+        },
+        {
+            .ith = 0,
+            .chunk_idx = 4, // steal from tail
+        },
+        {
+            .ith = 0,
+            .chunk_idx = -1,
+        },
+        {
+            .ith = 1,
+            .chunk_idx = -1,
+        }};
+
+    int t_len = sizeof(test_data) / sizeof(struct test_data_t);
+
+    for (int i = 0; i < t_len; i++) {
+        allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks);
+
+        if (chunk_idx != test_data[i].chunk_idx) {
+            fprintf(stderr,
+                    "%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n",
+                    __func__, i, chunk_idx, test_data[i].chunk_idx);
+            abort();
+        }
+
+        if (n_chunks != expected_n_slots) {
+            fprintf(stderr,
+                    "%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n",
+                    __func__, i, n_chunks, expected_n_slots);
+            abort();
+        }
+    }
+}
+
+// Integration test.
 static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units,
                                 int n_multiplier) {
     fprintf(stderr,
@@ -386,36 +531,40 @@ static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units,
 // B can steal a chunk from A only if T_a > T_b + T_b_per_chunk.
 // - Saw this situation: A steal B, B steal C.
 // - n_chunks plays a key role, similar to choosing the best n_threads, it's
-//   difficult choose the ideal n_chunks value. Performance drops with per-chunk
-//   compute time exceeds the scheduling overhead.
+//   difficult to choose the ideal n_chunks value. Performance drops when
+//   per-chunk compute time exceeds the scheduling overhead.
 // - Work stealing chunked task allocator can save the response time
 //   significantly when the majority threads runs fast but a few suddenly or
 //   constantly slow.
 //
 int main(void) {
-    if (false) { // the most simple one: only main thread, one node
+    test_task_allocator_init();
+    task_allocator_unit_test_no_steal();
+    task_allocator_unit_test_steal();
+
+    // Integration tests
+    const int n_compute_units = 64;
+
+    if (false) {
         int n_threads = 1;
         int n_nodes = 1;
-        int n_multiplier = 1; // trunks per thread.
-        int n_compute_units = 1;
+        int n_multiplier = 2; // equivalent to 1
         test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
     }
-    if (false) {
+    if (true) {
         int n_threads = 2;
         int n_nodes = 2;
-        int n_multiplier = 1; // trunks per thread.
-        int n_compute_units = 2;
+        int n_multiplier = 1;
         test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
     }
     if (true) {
-        int n_threads = 4;
+        int n_threads = 2;
         int n_nodes = 2;
-        int n_multiplier = 8; // trunks per thread.
-        int n_compute_units = 32;
+        int n_multiplier = 8;
         test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
     }
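To make the ownership/stealing scheme behind these tests concrete, here is a small standalone sketch of the idea (my own illustration, not the patch's data layout): each of nth workers owns n_multiplier contiguous chunks, drains its own queue from the head, and once its queue is empty it takes a chunk from another worker's tail. It runs single-threaded for clarity; the real allocator serializes these updates with the atomic spin lock shown in the diff.

    #include <stdio.h>

    #define NTH  2
    #define MULT 2

    static int heads[NTH]; // next chunk to take from a worker's own queue
    static int tails[NTH]; // one past the last unassigned chunk in each queue

    // Returns a global chunk index, or -1 when every chunk has been handed out.
    // Chunk i of worker w is numbered w * MULT + i (illustration only).
    static int take_chunk(int ith) {
        if (heads[ith] < tails[ith]) {          // own queue first
            return ith * MULT + heads[ith]++;
        }
        for (int i = 0; i < NTH; ++i) {         // otherwise steal from a victim's tail
            if (i != ith && heads[i] < tails[i]) {
                return i * MULT + --tails[i];
            }
        }
        return -1;                              // nothing left to do
    }

    int main(void) {
        for (int i = 0; i < NTH; ++i) { heads[i] = 0; tails[i] = MULT; }

        // Worker 0 drains its own chunks, then steals worker 1's last chunk.
        printf("%d %d %d\n", take_chunk(0), take_chunk(0), take_chunk(0)); // 0 1 3
        printf("%d %d\n", take_chunk(1), take_chunk(1));                   // 2 -1
        return 0;
    }

Note that the stolen index here (3) reflects only this sketch's numbering; the patch's own task_allocator_unit_test_steal() above expects index 4 for the steal, because the real allocator numbers its slots differently.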