Run several single-thread operators in worker threads

Howard Su 2023-04-08 20:46:11 +08:00
parent f2d1c47294
commit ac7a69fa33

ggml.c

@@ -2739,9 +2739,10 @@ struct ggml_context_container {
 //
 enum ggml_task_type {
-    GGML_TASK_INIT = 0,
-    GGML_TASK_COMPUTE,
-    GGML_TASK_FINALIZE,
+    GGML_TASK_UNKNOWN = 0,
+    GGML_TASK_INIT = 1,
+    GGML_TASK_COMPUTE = 2,
+    GGML_TASK_FINALIZE = 4,
 };

 struct ggml_compute_params {
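
The old enum gave the phases consecutive values, so a params.type could name only one phase at a time. The new power-of-two values turn the type into a bitmask: the dispatcher later in this diff hands a worker GGML_TASK_COMPUTE | GGML_TASK_INIT as a single assignment. A minimal sketch of how a combined mask decomposes (the run_phases helper is invented for illustration, not part of the diff):

#include <stdio.h>

// Values copied from the hunk above: distinct bits, so one task type
// can request several phases at once.
enum ggml_task_type {
    GGML_TASK_UNKNOWN  = 0,
    GGML_TASK_INIT     = 1,
    GGML_TASK_COMPUTE  = 2,
    GGML_TASK_FINALIZE = 4,
};

// Hypothetical helper: peel a combined mask back into single phases,
// in the same INIT-then-COMPUTE order the worker loop below uses.
static void run_phases(int type) {
    if (type & GGML_TASK_INIT)     printf("INIT\n");
    if (type & GGML_TASK_COMPUTE)  printf("COMPUTE\n");
    if (type & GGML_TASK_FINALIZE) printf("FINALIZE\n");
}

int main(void) {
    run_phases(GGML_TASK_INIT | GGML_TASK_COMPUTE); // prints INIT, then COMPUTE
    return 0;
}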
@@ -9291,14 +9292,26 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
             break;
         }

+        int type = state->params.type;
         if (state->node) {
             if (state->params.ith < state->params.nth) {
-                ggml_compute_forward(&state->params, state->node);
+                if (type & GGML_TASK_INIT)
+                {
+                    state->params.type = GGML_TASK_INIT;
+                    ggml_compute_forward(&state->params, state->node);
+                }
+
+                if (type & GGML_TASK_COMPUTE)
+                {
+                    state->params.type = GGML_TASK_COMPUTE;
+                    ggml_compute_forward(&state->params, state->node);
+                }
             }

             state->node = NULL;
         } else {
-            break;
+            continue;
         }
     }
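
Two changes here: a worker handed a combined task type replays each requested phase with params.type narrowed to a single phase, and a worker handed no node loops back and keeps polling (continue) instead of exiting (break), so it stays available for the next dispatch. A self-contained toy of that polling pattern, assuming C11 atomics and pthreads; all names here are invented for the sketch, and the real ggml loop synchronizes through shared state and spin locks instead:

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

// Task phases, mirroring the bitmask enum from the first hunk.
enum task_type { TASK_INIT = 1, TASK_COMPUTE = 2 };

static atomic_bool has_work;
static atomic_bool stop_flag;
static int  task_type;              // may combine TASK_INIT | TASK_COMPUTE
static void (*assigned_node)(int);  // NULL means "nothing for me this round"

static void demo_op(int phase) {
    printf("running phase %d single-threaded\n", phase);
}

// Worker in the spirit of ggml_graph_compute_thread after this change:
// replay each requested phase one at a time, and when handed no node,
// loop back and keep polling instead of exiting.
static void *worker(void *arg) {
    (void)arg;
    for (;;) {
        while (!atomic_load(&has_work)) {
            if (atomic_load(&stop_flag)) return NULL;
        }
        void (*node)(int) = assigned_node;
        if (node) {
            if (task_type & TASK_INIT)    node(TASK_INIT);
            if (task_type & TASK_COMPUTE) node(TASK_COMPUTE);
            assigned_node = NULL;
        }
        atomic_store(&has_work, false); // round done; poll for the next one
    }
}

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, worker, NULL);

    // Round 1: dispatch one single-threaded op with both phases combined.
    assigned_node = demo_op;
    task_type = TASK_INIT | TASK_COMPUTE;
    atomic_store(&has_work, true);
    while (atomic_load(&has_work)) { /* wait for the worker */ }

    // Round 2: no node assigned; the worker just loops and keeps polling.
    atomic_store(&has_work, true);
    while (atomic_load(&has_work)) { }

    atomic_store(&stop_flag, true);
    pthread_join(t, NULL);
    return 0;
}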
@@ -9556,6 +9569,11 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         struct ggml_tensor * node = cgraph->nodes[i];

+        // this node is calculated already
+        if (node->n_tasks == 0) {
+            continue;
+        }
+
         // TODO: this could be used to avoid unnecessary computations, but it needs to be improved
         //if (node->grad == NULL && node->perf_runs > 0) {
         //    continue;
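
Here n_tasks == 0 acts as a "done" sentinel: the look-ahead dispatcher later in this diff zeroes n_tasks on every node it hands to a worker, and this early continue makes the sequential walk skip those nodes. A tiny sketch of the two sides of that protocol, with a hypothetical node_t standing in for ggml_tensor:

#include <stdbool.h>

// Hypothetical minimal node: n_tasks doubles as a completion sentinel.
typedef struct { int n_tasks; } node_t;

// Dispatcher side: zero n_tasks when handing the node to a worker,
// so the main walk treats it as already calculated.
void mark_dispatched(node_t *node) { node->n_tasks = 0; }

// Main-loop side: skip anything a worker already owns.
bool should_skip(const node_t *node) { return node->n_tasks == 0; }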
@@ -9575,6 +9593,8 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
         ggml_compute_forward(&params, node);

+        int dispath_threads = 0;
+
         // COMPUTE
         if (node->n_tasks > 1) {
             if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
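
This n_ready/has_work handshake recurs throughout the function: each participant bumps the ready counter, the last arrival flips has_work, and everyone else spins until it flips. A stripped-down, one-shot sketch of the same rendezvous in portable C11 (busy-spinning directly where ggml interleaves its ggml_lock_lock/ggml_lock_unlock pair):

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define N_THREADS 4

// One-shot rendezvous in the style of state_shared.n_ready/has_work:
// whichever thread arrives last (sees n_ready == N_THREADS - 1 before
// its own increment) clears has_work, releasing all the spinners.
static atomic_int  n_ready;
static atomic_bool has_work = true;

static void rendezvous(void) {
    if (atomic_fetch_add(&n_ready, 1) == N_THREADS - 1) {
        atomic_store(&has_work, false);
    }
    while (atomic_load(&has_work)) {
        /* spin */
    }
}

static void *thread_fn(void *arg) {
    (void)arg;
    rendezvous();
    printf("thread released\n");
    return NULL;
}

int main(void) {
    pthread_t threads[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) {
        pthread_create(&threads[i], NULL, thread_fn, NULL);
    }
    for (int i = 0; i < N_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}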
@@ -9606,13 +9626,79 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
             }

             atomic_store(&state_shared.has_work, true);
-        }
+        } else {
+            int start = i;
+            int end = i + 1;
+
+            while (end < cgraph->n_nodes && dispath_threads < n_threads && (end - start) < 4)
+            {
+                struct ggml_tensor * next = cgraph->nodes[end];
+                end++;
+
+                if (next->n_tasks != 1)
+                    continue;
+
+                // check src dependency
+                bool is_dep = false;
+                for (int k = start; k < end; k++)
+                {
+                    struct ggml_tensor * node = cgraph->nodes[k];
+                    if (next->src0 == node || next->src1 == node)
+                    {
+                        is_dep = true;
+                        break;
+                    }
+                }
+                if (is_dep)
+                    continue;
+
+                workers[dispath_threads].params = (struct ggml_compute_params) {
+                    .type = GGML_TASK_COMPUTE | GGML_TASK_INIT,
+                    .ith = 0,
+                    .nth = 1,
+                    .wsize = 0, // has to be 0 for single thread op
+                    .wdata = NULL, // has to be NULL for single thread op
+                };
+                workers[dispath_threads].node = next;
+                next->n_tasks = 0; // indicate this node is calculated
+                dispath_threads++;
+            }
+
+            if (dispath_threads > 0)
+            {
+                if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
+                    atomic_store(&state_shared.has_work, false);
+                }
+
+                while (atomic_load(&state_shared.has_work)) {
+                    ggml_lock_lock  (&state_shared.spin);
+                    ggml_lock_unlock(&state_shared.spin);
+                }
+
+                for (int j = dispath_threads; j < n_threads - 1; j++)
+                {
+                    workers[j].node = NULL;
+                }
+
+                atomic_fetch_sub(&state_shared.n_ready, 1);

+                while (atomic_load(&state_shared.n_ready) > 0)
+                {
+                    ggml_lock_lock(&state_shared.spin);
+                    ggml_lock_unlock(&state_shared.spin);
+                }
+
+                atomic_store(&state_shared.has_work, true);
+            }
+        }

         params.type = GGML_TASK_COMPUTE;
         ggml_compute_forward(&params, node);

         // wait for thread pool
-        if (node->n_tasks > 1) {
+        if (node->n_tasks > 1 || dispath_threads > 0) {
             if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                 atomic_store(&state_shared.has_work, false);
             }
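
This is the core of the commit: while the main thread runs one single-threaded node, the look-ahead scans up to four nodes forward and farms any independent single-threaded ones out to idle workers, skipping any node that consumes the output of another node in the window. A stand-alone sketch of that batching rule, with a stand-in tensor struct that keeps only the fields the scan inspects:

#include <stdbool.h>
#include <stdio.h>

// Stand-in for ggml_tensor with only src0/src1 and n_tasks.
struct tensor {
    struct tensor *src0;
    struct tensor *src1;
    int n_tasks;
};

// Batching rule from the hunk above: walk forward from the node after
// `start`, collecting look-ahead nodes that are single-threaded
// (n_tasks == 1) and do not read the output of any node in the window.
// Collected nodes get n_tasks zeroed so the main loop will skip them.
static int collect_batch(struct tensor **nodes, int n_nodes, int start,
                         int max_workers, struct tensor **out) {
    int dispatched = 0;
    int end = start + 1;
    while (end < n_nodes && dispatched < max_workers && (end - start) < 4) {
        struct tensor *next = nodes[end];
        end++;
        if (next->n_tasks != 1) {
            continue; // multi-threaded node: leave it for the main loop
        }
        bool is_dep = false;
        for (int k = start; k < end; k++) {
            if (next->src0 == nodes[k] || next->src1 == nodes[k]) {
                is_dep = true; // consumes a result produced in this window
                break;
            }
        }
        if (is_dep) {
            continue;
        }
        out[dispatched++] = next;
        next->n_tasks = 0; // mark "calculated" for the main loop
    }
    return dispatched;
}

int main(void) {
    struct tensor a = {0, 0, 1}, b = {&a, 0, 1}, c = {0, 0, 1};
    struct tensor *nodes[] = { &a, &b, &c };
    struct tensor *batch[4];
    // a runs on the main thread (start = 0); b depends on a, c does not.
    int n = collect_batch(nodes, 3, 0, 4, batch);
    printf("batched %d node(s)\n", n); // prints: batched 1 node(s)
    return 0;
}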
@@ -9630,6 +9716,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
             }
         }

+#if 0
         // FINALIZE
         if (node->n_tasks > 1) {
             if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
@@ -9684,7 +9771,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
                 ggml_lock_unlock(&state_shared.spin);
             }
         }
+#endif

         // performance stats (node)
         {
             int64_t perf_cycles_cur = ggml_perf_cycles() - perf_node_start_cycles;