/* PIKA - Photo and Image Kooker Application * a rebranding of The GNU Image Manipulation Program (created with heckimp) * A derived work which may be trivial. However, any changes may be (C)2023 by Aldercone Studio * * Original copyright, applying to most contents (license remains unchanged): * Copyright (C) 1995 Spencer Kimball and Peter Mattis * * pika-parallel.c * Copyright (C) 2018 Ell * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "config.h" #include #include #ifdef HAVE_UNISTD_H #include #endif #ifdef G_OS_WIN32 #include #endif extern "C" { #include "core-types.h" #include "config/pikageglconfig.h" #include "pika.h" #include "pika-parallel.h" #include "pikaasync.h" #include "pikacancelable.h" #define PIKA_PARALLEL_MAX_THREADS 64 #define PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS 1 typedef struct { PikaAsync *async; gint priority; PikaRunAsyncFunc func; gpointer user_data; GDestroyNotify user_data_destroy_func; } PikaParallelRunAsyncTask; typedef struct { GThread *thread; gboolean quit; PikaAsync *current_async; } PikaParallelRunAsyncThread; /* local function prototypes */ static void pika_parallel_notify_num_processors (PikaGeglConfig *config); static void pika_parallel_set_n_threads (gint n_threads, gboolean finish_tasks); static void pika_parallel_run_async_set_n_threads (gint n_threads, gboolean finish_tasks); static gpointer pika_parallel_run_async_thread_func (PikaParallelRunAsyncThread *thread); static void pika_parallel_run_async_enqueue_task (PikaParallelRunAsyncTask *task); static PikaParallelRunAsyncTask * pika_parallel_run_async_dequeue_task (void); static gboolean pika_parallel_run_async_execute_task (PikaParallelRunAsyncTask *task); static void pika_parallel_run_async_abort_task (PikaParallelRunAsyncTask *task); static void pika_parallel_run_async_cancel (PikaAsync *async); static void pika_parallel_run_async_waiting (PikaAsync *async); /* local variables */ static gint pika_parallel_run_async_n_threads = 0; static PikaParallelRunAsyncThread pika_parallel_run_async_threads[PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS]; static GMutex pika_parallel_run_async_mutex; static GCond pika_parallel_run_async_cond; static GQueue pika_parallel_run_async_queue = G_QUEUE_INIT; /* public functions */ void pika_parallel_init (Pika *pika) { PikaGeglConfig *config; g_return_if_fail (PIKA_IS_PIKA (pika)); config = PIKA_GEGL_CONFIG (pika->config); g_signal_connect (config, "notify::num-processors", G_CALLBACK (pika_parallel_notify_num_processors), NULL); pika_parallel_notify_num_processors (config); } void pika_parallel_exit (Pika *pika) { g_return_if_fail (PIKA_IS_PIKA (pika)); g_signal_handlers_disconnect_by_func (pika->config, (gpointer) pika_parallel_notify_num_processors, NULL); /* stop all threads */ pika_parallel_set_n_threads (0, /* finish_tasks = */ FALSE); } PikaAsync * pika_parallel_run_async (PikaRunAsyncFunc func, gpointer user_data) { return pika_parallel_run_async_full (0, func, user_data, NULL); } PikaAsync * pika_parallel_run_async_full (gint priority, PikaRunAsyncFunc func, gpointer user_data, GDestroyNotify user_data_destroy_func) { PikaAsync *async; PikaParallelRunAsyncTask *task; g_return_val_if_fail (func != NULL, NULL); async = pika_async_new (); task = g_slice_new (PikaParallelRunAsyncTask); task->async = PIKA_ASYNC (g_object_ref (async)); task->priority = priority; task->func = func; task->user_data = user_data; task->user_data_destroy_func = user_data_destroy_func; if (pika_parallel_run_async_n_threads > 0) { g_signal_connect_after (async, "cancel", G_CALLBACK (pika_parallel_run_async_cancel), NULL); g_signal_connect_after (async, "waiting", G_CALLBACK (pika_parallel_run_async_waiting), NULL); g_mutex_lock (&pika_parallel_run_async_mutex); pika_parallel_run_async_enqueue_task (task); g_cond_signal (&pika_parallel_run_async_cond); g_mutex_unlock (&pika_parallel_run_async_mutex); } else { while (pika_parallel_run_async_execute_task (task)); } return async; } PikaAsync * pika_parallel_run_async_independent (PikaRunAsyncFunc func, gpointer user_data) { return pika_parallel_run_async_independent_full (0, func, user_data); } PikaAsync * pika_parallel_run_async_independent_full (gint priority, PikaRunAsyncFunc func, gpointer user_data) { PikaAsync *async; PikaParallelRunAsyncTask *task; GThread *thread; g_return_val_if_fail (func != NULL, NULL); async = pika_async_new (); task = g_slice_new0 (PikaParallelRunAsyncTask); task->async = PIKA_ASYNC (g_object_ref (async)); task->priority = priority; task->func = func; task->user_data = user_data; thread = g_thread_new ( "async-ind", [] (gpointer data) -> gpointer { PikaParallelRunAsyncTask *task = (PikaParallelRunAsyncTask *) data; /* adjust the thread's priority */ #if defined (G_OS_WIN32) if (task->priority < 0) { SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_ABOVE_NORMAL); } else if (task->priority > 0) { SetThreadPriority (GetCurrentThread (), THREAD_MODE_BACKGROUND_BEGIN); } #elif defined (HAVE_UNISTD_H) && defined (__gnu_linux__) if (task->priority) { nice (task->priority); } #endif while (pika_parallel_run_async_execute_task (task)); return NULL; }, task); pika_async_add_callback (async, [] (PikaAsync *async, gpointer thread) { g_thread_join ((GThread *) thread); }, thread); return async; } /* private functions */ static void pika_parallel_notify_num_processors (PikaGeglConfig *config) { pika_parallel_set_n_threads (config->num_processors, /* finish_tasks = */ TRUE); } static void pika_parallel_set_n_threads (gint n_threads, gboolean finish_tasks) { pika_parallel_run_async_set_n_threads (n_threads, finish_tasks); } static void pika_parallel_run_async_set_n_threads (gint n_threads, gboolean finish_tasks) { gint i; n_threads = CLAMP (n_threads, 0, PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS); if (n_threads > pika_parallel_run_async_n_threads) /* need more threads */ { for (i = pika_parallel_run_async_n_threads; i < n_threads; i++) { PikaParallelRunAsyncThread *thread = &pika_parallel_run_async_threads[i]; thread->quit = FALSE; thread->thread = g_thread_new ( "async", (GThreadFunc) pika_parallel_run_async_thread_func, thread); } } else if (n_threads < pika_parallel_run_async_n_threads) /* need less threads */ { g_mutex_lock (&pika_parallel_run_async_mutex); for (i = n_threads; i < pika_parallel_run_async_n_threads; i++) { PikaParallelRunAsyncThread *thread = &pika_parallel_run_async_threads[i]; thread->quit = TRUE; if (thread->current_async && ! finish_tasks) pika_cancelable_cancel (PIKA_CANCELABLE (thread->current_async)); } g_cond_broadcast (&pika_parallel_run_async_cond); g_mutex_unlock (&pika_parallel_run_async_mutex); for (i = n_threads; i < pika_parallel_run_async_n_threads; i++) { PikaParallelRunAsyncThread *thread = &pika_parallel_run_async_threads[i]; g_thread_join (thread->thread); } } pika_parallel_run_async_n_threads = n_threads; if (n_threads == 0) { PikaParallelRunAsyncTask *task; /* finish remaining tasks */ while ((task = pika_parallel_run_async_dequeue_task ())) { if (finish_tasks) while (pika_parallel_run_async_execute_task (task)); else pika_parallel_run_async_abort_task (task); } } } static gpointer pika_parallel_run_async_thread_func (PikaParallelRunAsyncThread *thread) { g_mutex_lock (&pika_parallel_run_async_mutex); while (TRUE) { PikaParallelRunAsyncTask *task; while (! thread->quit && (task = pika_parallel_run_async_dequeue_task ())) { gboolean resume; thread->current_async = PIKA_ASYNC (g_object_ref (task->async)); do { g_mutex_unlock (&pika_parallel_run_async_mutex); resume = pika_parallel_run_async_execute_task (task); g_mutex_lock (&pika_parallel_run_async_mutex); } while (resume && (g_queue_is_empty (&pika_parallel_run_async_queue) || task->priority < ((PikaParallelRunAsyncTask *) g_queue_peek_head ( &pika_parallel_run_async_queue))->priority)); g_clear_object (&thread->current_async); if (resume) pika_parallel_run_async_enqueue_task (task); } if (thread->quit) break; g_cond_wait (&pika_parallel_run_async_cond, &pika_parallel_run_async_mutex); } g_mutex_unlock (&pika_parallel_run_async_mutex); return NULL; } static void pika_parallel_run_async_enqueue_task (PikaParallelRunAsyncTask *task) { GList *link; GList *iter; if (pika_async_is_canceled (task->async)) { pika_parallel_run_async_abort_task (task); return; } link = g_list_alloc (); link->data = task; g_object_set_data (G_OBJECT (task->async), "pika-parallel-run-async-link", link); for (iter = g_queue_peek_tail_link (&pika_parallel_run_async_queue); iter; iter = g_list_previous (iter)) { PikaParallelRunAsyncTask *other_task = (PikaParallelRunAsyncTask *) iter->data; if (other_task->priority <= task->priority) break; } if (iter) { link->prev = iter; link->next = iter->next; iter->next = link; if (link->next) link->next->prev = link; else pika_parallel_run_async_queue.tail = link; pika_parallel_run_async_queue.length++; } else { g_queue_push_head_link (&pika_parallel_run_async_queue, link); } } static PikaParallelRunAsyncTask * pika_parallel_run_async_dequeue_task (void) { PikaParallelRunAsyncTask *task; task = (PikaParallelRunAsyncTask *) g_queue_pop_head ( &pika_parallel_run_async_queue); if (task) { g_object_set_data (G_OBJECT (task->async), "pika-parallel-run-async-link", NULL); } return task; } static gboolean pika_parallel_run_async_execute_task (PikaParallelRunAsyncTask *task) { if (pika_async_is_canceled (task->async)) { pika_parallel_run_async_abort_task (task); return FALSE; } task->func (task->async, task->user_data); if (pika_async_is_stopped (task->async)) { g_object_unref (task->async); g_slice_free (PikaParallelRunAsyncTask, task); return FALSE; } return TRUE; } static void pika_parallel_run_async_abort_task (PikaParallelRunAsyncTask *task) { if (task->user_data && task->user_data_destroy_func) task->user_data_destroy_func (task->user_data); pika_async_abort (task->async); g_object_unref (task->async); g_slice_free (PikaParallelRunAsyncTask, task); } static void pika_parallel_run_async_cancel (PikaAsync *async) { GList *link; PikaParallelRunAsyncTask *task = NULL; link = (GList *) g_object_get_data (G_OBJECT (async), "pika-parallel-run-async-link"); if (! link) return; g_mutex_lock (&pika_parallel_run_async_mutex); link = (GList *) g_object_get_data (G_OBJECT (async), "pika-parallel-run-async-link"); if (link) { g_object_set_data (G_OBJECT (async), "pika-parallel-run-async-link", NULL); task = (PikaParallelRunAsyncTask *) link->data; g_queue_delete_link (&pika_parallel_run_async_queue, link); } g_mutex_unlock (&pika_parallel_run_async_mutex); if (task) pika_parallel_run_async_abort_task (task); } static void pika_parallel_run_async_waiting (PikaAsync *async) { GList *link; link = (GList *) g_object_get_data (G_OBJECT (async), "pika-parallel-run-async-link"); if (! link) return; g_mutex_lock (&pika_parallel_run_async_mutex); link = (GList *) g_object_get_data (G_OBJECT (async), "pika-parallel-run-async-link"); if (link) { PikaParallelRunAsyncTask *task = (PikaParallelRunAsyncTask *) link->data; task->priority = G_MININT; g_queue_unlink (&pika_parallel_run_async_queue, link); g_queue_push_head_link (&pika_parallel_run_async_queue, link); } g_mutex_unlock (&pika_parallel_run_async_mutex); } } /* extern "C" */