diff --git a/ggml.c b/ggml.c index dc084e6b6..e72ac70fb 100644 --- a/ggml.c +++ b/ggml.c @@ -2739,9 +2739,10 @@ struct ggml_context_container { // enum ggml_task_type { - GGML_TASK_INIT = 0, - GGML_TASK_COMPUTE, - GGML_TASK_FINALIZE, + GGML_TASK_UNKNOWN = 0, + GGML_TASK_INIT = 1, + GGML_TASK_COMPUTE = 2, + GGML_TASK_FINALIZE = 4, }; struct ggml_compute_params { @@ -9291,14 +9292,26 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { break; } + int type = state->params.type; if (state->node) { if (state->params.ith < state->params.nth) { - ggml_compute_forward(&state->params, state->node); + + if (type & GGML_TASK_INIT) + { + state->params.type = GGML_TASK_INIT; + ggml_compute_forward(&state->params, state->node); + } + + if (type & GGML_TASK_COMPUTE) + { + state->params.type = GGML_TASK_COMPUTE; + ggml_compute_forward(&state->params, state->node); + } } state->node = NULL; } else { - break; + continue; } } @@ -9556,6 +9569,11 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) struct ggml_tensor * node = cgraph->nodes[i]; + // this node is caculated already + if (node->n_tasks == 0) { + continue; + } + // TODO: this could be used to avoid unnecessary computations, but it needs to be improved //if (node->grad == NULL && node->perf_runs > 0) { // continue; @@ -9575,6 +9593,8 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) ggml_compute_forward(¶ms, node); + int dispath_threads = 0; + // COMPUTE if (node->n_tasks > 1) { if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { @@ -9606,13 +9626,79 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) } atomic_store(&state_shared.has_work, true); + + } else { + int start = i; + int end = i + 1; + while (end < cgraph->n_nodes && dispath_threads < n_threads && (end - start) < 4) + { + struct ggml_tensor * next = cgraph->nodes[end]; + end++; + + if (next->n_tasks != 1) + continue; + + // check src depedency + bool is_dep = false; + for (int k = start; k < end; k++) + { + struct ggml_tensor * node = cgraph->nodes[k]; + if (next->src0 == node || next->src1 == node) + { + is_dep = true; + break; + } + } + + if (is_dep) + continue; + + workers[dispath_threads].params = (struct ggml_compute_params) { + .type = GGML_TASK_COMPUTE | GGML_TASK_INIT, + .ith = 0, + .nth = 1, + .wsize = 0, // has to be 0 for single thread op + .wdata = NULL, // has to be NULL for single thread op + }; + workers[dispath_threads].node = next; + + next->n_tasks = 0; // indicate this node is caculated + dispath_threads++; + } + + if (dispath_threads > 0) + { + if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { + atomic_store(&state_shared.has_work, false); + } + + while (atomic_load(&state_shared.has_work)) { + ggml_lock_lock (&state_shared.spin); + ggml_lock_unlock(&state_shared.spin); + } + + for (int j = dispath_threads; j < n_threads - 1; j++) + { + workers[j].node = NULL; + } + + atomic_fetch_sub(&state_shared.n_ready, 1); + + while (atomic_load(&state_shared.n_ready) > 0) + { + ggml_lock_lock(&state_shared.spin); + ggml_lock_unlock(&state_shared.spin); + } + + atomic_store(&state_shared.has_work, true); + } } params.type = GGML_TASK_COMPUTE; ggml_compute_forward(¶ms, node); // wait for thread pool - if (node->n_tasks > 1) { + if (node->n_tasks > 1 || dispath_threads > 0) { if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { atomic_store(&state_shared.has_work, false); } @@ -9630,6 +9716,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) } } +#if 0 // FINALIZE if (node->n_tasks > 1) { if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) { @@ -9684,7 +9771,7 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) ggml_lock_unlock(&state_shared.spin); } } - +#endif // performance stats (node) { int64_t perf_cycles_cur = ggml_perf_cycles() - perf_node_start_cycles;