diff --git a/ggml.c b/ggml.c index 8a60bc383..43039c098 100644 --- a/ggml.c +++ b/ggml.c @@ -35,12 +35,21 @@ #include #endif +// if C11 or above use stdatomic.h +#if __STDC_VERSION__ >= 201112L +#include +#else typedef volatile LONG atomic_int; +typedef volatile char* atomic_uintptr_t; typedef atomic_int atomic_bool; static void atomic_store(atomic_int* ptr, LONG val) { InterlockedExchange(ptr, val); } +static int atomic_compare_exchange_strong(atomic_int* ptr, int* expected, int desired) { + int old_val = InterlockedCompareExchange(ptr, desired, *expected); + return old_val == *expected; +} static LONG atomic_load(atomic_int* ptr) { return InterlockedCompareExchange(ptr, 0, 0); } @@ -50,6 +59,7 @@ static LONG atomic_fetch_add(atomic_int* ptr, LONG inc) { static LONG atomic_fetch_sub(atomic_int* ptr, LONG dec) { return atomic_fetch_add(ptr, -(dec)); } +#endif typedef HANDLE pthread_t; @@ -2929,6 +2939,116 @@ static inline int ggml_up(int n, int m) { //////////////////////////////////////////////////////////////////////////////// +// +// thread data +// + +#if defined _MSC_VER || defined(__MINGW32__) +// (Windows headers included above) + +struct ggml_lock_t { + CRITICAL_SECTION CritSection; + CONDITION_VARIABLE ConditionVar; +}; +void ggml_lock_init(struct ggml_lock_t* lock) { + memset(lock, 0, sizeof(lock)); + InitializeCriticalSection(&lock->CritSection); + InitializeConditionVariable(&lock->ConditionVar); +} +void ggml_lock_destroy(struct ggml_lock_t* lock) { + DeleteCriticalSection(&lock->CritSection); + // condition variable has no deleter +} +void ggml_lock_notify_all(struct ggml_lock_t* lock) { + EnterCriticalSection(&lock->CritSection); + WakeAllConditionVariable(&lock->ConditionVar); + LeaveCriticalSection(&lock->CritSection); +} +void ggml_lock_wait_while(struct ggml_lock_t* lock, bool(*while_what_predicate)(void*), void* params) { + EnterCriticalSection(&lock->CritSection); + while (while_what_predicate(params)) { + SleepConditionVariableCS(&lock->ConditionVar, &lock->CritSection, INFINITE); + } + LeaveCriticalSection(&lock->CritSection); +} + +#else +// (pthread.h included above) + +struct ggml_lock_t { + pthread_mutex_t mut; + pthread_cond_t cv; +}; +void ggml_lock_init(struct ggml_lock_t* lock) { + memset(lock, 0, sizeof(lock)); + pthread_mutex_init(&lock->mut, NULL); + pthread_cond_init(&lock->cv, NULL); +} +void ggml_lock_destroy(struct ggml_lock_t* lock) { + pthread_mutex_destroy(&lock->mut); + pthread_cond_destroy(&lock->cv); +} +void ggml_lock_notify_all(struct ggml_lock_t* lock) { + pthread_mutex_lock(&lock->mut); + pthread_cond_broadcast(&lock->cv); + pthread_mutex_unlock(&lock->mut); +} +void ggml_lock_wait_while(struct ggml_lock_t* lock, bool(*while_what_predicate)(void*), void* params) { + pthread_mutex_lock(&lock->mut); + while (while_what_predicate(params)) { + pthread_cond_wait(&lock->cv, &lock->mut); + } + pthread_mutex_unlock(&lock->mut); +} + +#endif + +typedef pthread_t ggml_thread_t; + +//static_assert(sizeof(ggml_thread_t) <= sizeof(atomic_uintptr_t)); +//struct ggml_thread_pool { +// atomic_uintptr_t threads[GGML_MAX_THREADS]; +// bool used_flags[GGML_MAX_THREADS]; +// atomic_int free_threads; +// atomic_int free_slot_magic_val; +//}; +// +//static struct ggml_thread_pool g_thread_pool; + +static void ggml_thread_pool_init() { + //atomic_store(&g_thread_pool.free_slot_magic_val, 1); +} +static void ggml_thread_pool_cleanup() { + +} +static int ggml_thread_create(pthread_t* out, void* unused, thread_ret_t(*func)(void*), void* arg) { + //if (!atomic_fetch_sub(&g_thread_pool.free_threads, 1) <= 0) { + // uintptr_t thread_slot_magic = atomic_fetch_add(&g_thread_pool.free_slot_magic_val, 1); + // for (int i = 0; i < GGML_MAX_THREADS; ++i) { + // int expected = 0; + // if (atomic_compare_exchange_strong(&g_thread_pool.threads[i], &expected, thread_slot_magic)) { + // atomic_uintptr_t new_thread = pthread_create(out, unused, func, arg); + // atomic_store(&g_thread_pool.threads[i], new_thread); + // atomic_fetch_add(&g_thread_pool.free_threads, 1); + // g_thread_pool.used_flags[i] = true; + // return new_thread; + // } + // } + // GGML_PRINT_DEBUG("%s: no unused thread slot found\n", __func__); + // return NULL; + //} + + //for (int i = 0; i < GGML_MAX_THREADS; ++i) { + // + //} + return pthread_create(out, unused, func, arg); +}; +static int ggml_thread_join(pthread_t thread, void* unused) { + return pthread_join(thread, unused); +}; + +//////////////////////////////////////////////////////////////////////////////// + struct ggml_context * ggml_init(struct ggml_init_params params) { // make this function thread safe ggml_critical_section_start(); @@ -2975,6 +3095,11 @@ struct ggml_context * ggml_init(struct ggml_init_params params) { GGML_PRINT_DEBUG("%s: g_state initialized in %f ms\n", __func__, (t_end - t_start)/1000.0f); } + // initialize thread pool + { + ggml_thread_pool_init(); + } + is_first_call = false; } @@ -3023,11 +3148,16 @@ struct ggml_context * ggml_init(struct ggml_init_params params) { return ctx; } +static void ggml_at_exit() { + ggml_thread_pool_cleanup(); +} + void ggml_free(struct ggml_context * ctx) { // make this function thread safe ggml_critical_section_start(); bool found = false; + bool is_last_context = true; for (int i = 0; i < GGML_MAX_CONTEXTS; i++) { if (&g_state.contexts[i].context == ctx) { @@ -3049,7 +3179,9 @@ void ggml_free(struct ggml_context * ctx) { } found = true; - break; + } + else if (&g_state.contexts[i].context) { + is_last_context = false; } } @@ -3057,6 +3189,10 @@ void ggml_free(struct ggml_context * ctx) { GGML_PRINT_DEBUG("%s: context not found\n", __func__); } + if (is_last_context) { + ggml_at_exit(); + } + ggml_critical_section_end(); } @@ -9180,67 +9316,8 @@ struct ggml_cgraph ggml_build_backward(struct ggml_context * ctx, struct ggml_cg return result; } -// -// thread data -// -// synchronization is done via busy loops -// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops -// - -#ifdef __APPLE__ - -//#include -// -//typedef os_unfair_lock ggml_lock_t; -// -//#define ggml_lock_init(x) UNUSED(x) -//#define ggml_lock_destroy(x) UNUSED(x) -//#define ggml_lock_lock os_unfair_lock_lock -//#define ggml_lock_unlock os_unfair_lock_unlock -// -//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT - -typedef int ggml_lock_t; - -#define ggml_lock_init(x) UNUSED(x) -#define ggml_lock_destroy(x) UNUSED(x) -#define ggml_lock_lock(x) UNUSED(x) -#define ggml_lock_unlock(x) UNUSED(x) - -#define GGML_LOCK_INITIALIZER 0 - -typedef pthread_t ggml_thread_t; - -#define ggml_thread_create pthread_create -#define ggml_thread_join pthread_join - -#else - -//typedef pthread_spinlock_t ggml_lock_t; - -//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE) -//#define ggml_lock_destroy pthread_spin_destroy -//#define ggml_lock_lock pthread_spin_lock -//#define ggml_lock_unlock pthread_spin_unlock - -typedef int ggml_lock_t; - -#define ggml_lock_init(x) UNUSED(x) -#define ggml_lock_destroy(x) UNUSED(x) -#define ggml_lock_lock(x) UNUSED(x) -#define ggml_lock_unlock(x) UNUSED(x) - -#define GGML_LOCK_INITIALIZER 0 - -typedef pthread_t ggml_thread_t; - -#define ggml_thread_create pthread_create -#define ggml_thread_join pthread_join - -#endif - struct ggml_compute_state_shared { - ggml_lock_t spin; + struct ggml_lock_t lock; int n_threads; @@ -9259,6 +9336,66 @@ struct ggml_compute_state { struct ggml_compute_state_shared * shared; }; +struct ggml_wait_while_predicate_params_t { + atomic_bool* condition; + bool invert; +}; + +static bool ggml_wait_while_predicate(void* cond_ptr) { + struct ggml_wait_while_predicate_params_t* pred = cond_ptr; + + if (pred->invert) { + return !atomic_load(pred->condition); + } + else { + return atomic_load(pred->condition); + } +} + +static bool ggml_wait_while(struct ggml_lock_t* lock, atomic_bool* condition, bool invert) { + struct ggml_wait_while_predicate_params_t params = { + .condition = condition, + .invert = invert + }; + ggml_lock_wait_while(lock, ggml_wait_while_predicate, ¶ms); +} + +struct ggml_wait_while_not_equal_params_t { + atomic_int* value; + int not_equal_to_what; +}; + +static bool ggml_wait_while_not_equal_predicate(void* cond_ptr) { + struct ggml_wait_while_not_equal_params_t* pred = cond_ptr; + return atomic_load(pred->value) != pred->not_equal_to_what; +} + +static bool ggml_wait_while_not_equal(struct ggml_lock_t* lock, atomic_int* value, int not_equal_to_what) { + struct ggml_wait_while_not_equal_params_t params = { + .value = value, + .not_equal_to_what = not_equal_to_what + }; + ggml_lock_wait_while(lock, ggml_wait_while_not_equal_predicate, ¶ms); +} + +struct ggml_wait_while_greater_than_params_t { + atomic_int* value; + int greater_than_what; +}; + +static bool ggml_wait_while_greater_than_predicate(void* cond_ptr) { + struct ggml_wait_while_greater_than_params_t* pred = cond_ptr; + return atomic_load(pred->value) > pred->greater_than_what; +} + +static bool ggml_wait_while_greater_than(struct ggml_lock_t* lock, atomic_int* value, int greater_than_what) { + struct ggml_wait_while_greater_than_params_t params = { + .value = value, + .greater_than_what = greater_than_what + }; + ggml_lock_wait_while(lock, ggml_wait_while_greater_than_predicate, ¶ms); +} + static thread_ret_t ggml_graph_compute_thread(void * data) { struct ggml_compute_state * state = (struct ggml_compute_state *) data; @@ -9268,29 +9405,20 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) { atomic_store(&state->shared->has_work, false); } else { - while (atomic_load(&state->shared->has_work)) { - if (atomic_load(&state->shared->stop)) { - return 0; - } - ggml_lock_lock (&state->shared->spin); - ggml_lock_unlock(&state->shared->spin); + ggml_wait_while(&state->shared->lock, &state->shared->has_work, false); + if (atomic_load(&state->shared->stop)) { + return 0; } } atomic_fetch_sub(&state->shared->n_ready, 1); + ggml_lock_notify_all(&state->shared->lock); // wait for work - while (!atomic_load(&state->shared->has_work)) { - if (atomic_load(&state->shared->stop)) { - return 0; - } - ggml_lock_lock (&state->shared->spin); - ggml_lock_unlock(&state->shared->spin); - } + ggml_wait_while(&state->shared->lock, &state->shared->has_work, true); - // check if we should stop if (atomic_load(&state->shared->stop)) { - break; + return 0; } if (state->node) { @@ -9311,7 +9439,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) const int n_threads = cgraph->n_threads; struct ggml_compute_state_shared state_shared = { - /*.spin =*/ GGML_LOCK_INITIALIZER, + /*.lock =*/ {}, /*.n_threads =*/ n_threads, /*.n_ready =*/ 0, /*.has_work =*/ false, @@ -9321,7 +9449,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) // create thread pool if (n_threads > 1) { - ggml_lock_init(&state_shared.spin); + ggml_lock_init(&state_shared.lock); atomic_store(&state_shared.has_work, true); @@ -9581,12 +9709,10 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) if (node->n_tasks > 1) { if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { atomic_store(&state_shared.has_work, false); + ggml_lock_notify_all(&state_shared.lock); } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while(&state_shared.lock, &state_shared.has_work, false); // launch thread pool for (int j = 0; j < n_threads - 1; j++) { @@ -9602,12 +9728,10 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) atomic_fetch_sub(&state_shared.n_ready, 1); - while (atomic_load(&state_shared.n_ready) > 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while_greater_than(&state_shared.lock, &state_shared.n_ready, 0); atomic_store(&state_shared.has_work, true); + ggml_lock_notify_all(&state_shared.lock); } params.type = GGML_TASK_COMPUTE; @@ -9617,31 +9741,24 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) if (node->n_tasks > 1) { if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { atomic_store(&state_shared.has_work, false); + ggml_lock_notify_all(&state_shared.lock); } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while(&state_shared.lock, &state_shared.has_work, false); atomic_fetch_sub(&state_shared.n_ready, 1); - while (atomic_load(&state_shared.n_ready) != 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while_not_equal(&state_shared.lock, &state_shared.n_ready, 0); } // FINALIZE if (node->n_tasks > 1) { if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { atomic_store(&state_shared.has_work, false); + ggml_lock_notify_all(&state_shared.lock); } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while(&state_shared.lock, &state_shared.has_work, false); // launch thread pool for (int j = 0; j < n_threads - 1; j++) { @@ -9657,12 +9774,10 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) atomic_fetch_sub(&state_shared.n_ready, 1); - while (atomic_load(&state_shared.n_ready) > 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while_greater_than(&state_shared.lock, &state_shared.n_ready, 0); atomic_store(&state_shared.has_work, true); + ggml_lock_notify_all(&state_shared.lock); } params.type = GGML_TASK_FINALIZE; @@ -9671,20 +9786,15 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) // wait for thread pool if (node->n_tasks > 1) { if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { - atomic_store(&state_shared.has_work, false); + atomic_store(&state_shared.has_work, false); + ggml_lock_notify_all(&state_shared.lock); } - while (atomic_load(&state_shared.has_work)) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while(&state_shared.lock, &state_shared.has_work, false); atomic_fetch_sub(&state_shared.n_ready, 1); - while (atomic_load(&state_shared.n_ready) != 0) { - ggml_lock_lock (&state_shared.spin); - ggml_lock_unlock(&state_shared.spin); - } + ggml_wait_while_not_equal(&state_shared.lock, &state_shared.n_ready, 0); } // performance stats (node) @@ -9702,6 +9812,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) if (n_threads > 1) { atomic_store(&state_shared.stop, true); atomic_store(&state_shared.has_work, true); + ggml_lock_notify_all(&state_shared.lock); for (int j = 0; j < n_threads - 1; j++) { int rc = ggml_thread_join(workers[j].thrd, NULL); @@ -9709,7 +9820,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) UNUSED(rc); } - ggml_lock_destroy(&state_shared.spin); + ggml_lock_destroy(&state_shared.lock); } // performance stats (graph)