PIKApp/app/core/pika-parallel.cc

557 lines
15 KiB
C++
Raw Normal View History

2023-09-26 00:35:21 +02:00
/* PIKA - Photo and Image Kooker Application
* a rebranding of The GNU Image Manipulation Program (created with heckimp)
* A derived work which may be trivial. However, any changes may be (C)2023 by Aldercone Studio
*
* Original copyright, applying to most contents (license remains unchanged):
* Copyright (C) 1995 Spencer Kimball and Peter Mattis
*
* pika-parallel.c
* Copyright (C) 2018 Ell
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "config.h"
#include <gio/gio.h>
#include <gegl.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef G_OS_WIN32
#include <windows.h>
#endif
extern "C"
{
#include "core-types.h"
#include "config/pikageglconfig.h"
#include "pika.h"
#include "pika-parallel.h"
#include "pikaasync.h"
#include "pikacancelable.h"
#define PIKA_PARALLEL_MAX_THREADS 64
#define PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS 1
typedef struct
{
PikaAsync *async;
gint priority;
PikaRunAsyncFunc func;
gpointer user_data;
GDestroyNotify user_data_destroy_func;
} PikaParallelRunAsyncTask;
typedef struct
{
GThread *thread;
gboolean quit;
PikaAsync *current_async;
} PikaParallelRunAsyncThread;
/* local function prototypes */
static void pika_parallel_notify_num_processors (PikaGeglConfig *config);
static void pika_parallel_set_n_threads (gint n_threads,
gboolean finish_tasks);
static void pika_parallel_run_async_set_n_threads (gint n_threads,
gboolean finish_tasks);
static gpointer pika_parallel_run_async_thread_func (PikaParallelRunAsyncThread *thread);
static void pika_parallel_run_async_enqueue_task (PikaParallelRunAsyncTask *task);
static PikaParallelRunAsyncTask * pika_parallel_run_async_dequeue_task (void);
static gboolean pika_parallel_run_async_execute_task (PikaParallelRunAsyncTask *task);
static void pika_parallel_run_async_abort_task (PikaParallelRunAsyncTask *task);
static void pika_parallel_run_async_cancel (PikaAsync *async);
static void pika_parallel_run_async_waiting (PikaAsync *async);
/* local variables */
static gint pika_parallel_run_async_n_threads = 0;
static PikaParallelRunAsyncThread pika_parallel_run_async_threads[PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS];
static GMutex pika_parallel_run_async_mutex;
static GCond pika_parallel_run_async_cond;
static GQueue pika_parallel_run_async_queue = G_QUEUE_INIT;
/* public functions */
void
pika_parallel_init (Pika *pika)
{
PikaGeglConfig *config;
g_return_if_fail (PIKA_IS_PIKA (pika));
config = PIKA_GEGL_CONFIG (pika->config);
g_signal_connect (config, "notify::num-processors",
G_CALLBACK (pika_parallel_notify_num_processors),
NULL);
pika_parallel_notify_num_processors (config);
}
void
pika_parallel_exit (Pika *pika)
{
g_return_if_fail (PIKA_IS_PIKA (pika));
g_signal_handlers_disconnect_by_func (pika->config,
(gpointer) pika_parallel_notify_num_processors,
NULL);
/* stop all threads */
pika_parallel_set_n_threads (0, /* finish_tasks = */ FALSE);
}
PikaAsync *
pika_parallel_run_async (PikaRunAsyncFunc func,
gpointer user_data)
{
return pika_parallel_run_async_full (0, func, user_data, NULL);
}
PikaAsync *
pika_parallel_run_async_full (gint priority,
PikaRunAsyncFunc func,
gpointer user_data,
GDestroyNotify user_data_destroy_func)
{
PikaAsync *async;
PikaParallelRunAsyncTask *task;
g_return_val_if_fail (func != NULL, NULL);
async = pika_async_new ();
task = g_slice_new (PikaParallelRunAsyncTask);
task->async = PIKA_ASYNC (g_object_ref (async));
task->priority = priority;
task->func = func;
task->user_data = user_data;
task->user_data_destroy_func = user_data_destroy_func;
if (pika_parallel_run_async_n_threads > 0)
{
g_signal_connect_after (async, "cancel",
G_CALLBACK (pika_parallel_run_async_cancel),
NULL);
g_signal_connect_after (async, "waiting",
G_CALLBACK (pika_parallel_run_async_waiting),
NULL);
g_mutex_lock (&pika_parallel_run_async_mutex);
pika_parallel_run_async_enqueue_task (task);
g_cond_signal (&pika_parallel_run_async_cond);
g_mutex_unlock (&pika_parallel_run_async_mutex);
}
else
{
while (pika_parallel_run_async_execute_task (task));
}
return async;
}
PikaAsync *
pika_parallel_run_async_independent (PikaRunAsyncFunc func,
gpointer user_data)
{
return pika_parallel_run_async_independent_full (0, func, user_data);
}
PikaAsync *
pika_parallel_run_async_independent_full (gint priority,
PikaRunAsyncFunc func,
gpointer user_data)
{
PikaAsync *async;
PikaParallelRunAsyncTask *task;
GThread *thread;
g_return_val_if_fail (func != NULL, NULL);
async = pika_async_new ();
task = g_slice_new0 (PikaParallelRunAsyncTask);
task->async = PIKA_ASYNC (g_object_ref (async));
task->priority = priority;
task->func = func;
task->user_data = user_data;
thread = g_thread_new (
"async-ind",
[] (gpointer data) -> gpointer
{
PikaParallelRunAsyncTask *task = (PikaParallelRunAsyncTask *) data;
/* adjust the thread's priority */
#if defined (G_OS_WIN32)
if (task->priority < 0)
{
SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_ABOVE_NORMAL);
}
else if (task->priority > 0)
{
SetThreadPriority (GetCurrentThread (), THREAD_MODE_BACKGROUND_BEGIN);
}
#elif defined (HAVE_UNISTD_H) && defined (__gnu_linux__)
if (task->priority)
{
nice (task->priority);
}
#endif
while (pika_parallel_run_async_execute_task (task));
return NULL;
},
task);
pika_async_add_callback (async,
[] (PikaAsync *async,
gpointer thread)
{
g_thread_join ((GThread *) thread);
},
thread);
return async;
}
/* private functions */
static void
pika_parallel_notify_num_processors (PikaGeglConfig *config)
{
pika_parallel_set_n_threads (config->num_processors,
/* finish_tasks = */ TRUE);
}
static void
pika_parallel_set_n_threads (gint n_threads,
gboolean finish_tasks)
{
pika_parallel_run_async_set_n_threads (n_threads, finish_tasks);
}
static void
pika_parallel_run_async_set_n_threads (gint n_threads,
gboolean finish_tasks)
{
gint i;
n_threads = CLAMP (n_threads, 0, PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS);
if (n_threads > pika_parallel_run_async_n_threads) /* need more threads */
{
for (i = pika_parallel_run_async_n_threads; i < n_threads; i++)
{
PikaParallelRunAsyncThread *thread =
&pika_parallel_run_async_threads[i];
thread->quit = FALSE;
thread->thread = g_thread_new (
"async",
(GThreadFunc) pika_parallel_run_async_thread_func,
thread);
}
}
else if (n_threads < pika_parallel_run_async_n_threads) /* need less threads */
{
g_mutex_lock (&pika_parallel_run_async_mutex);
for (i = n_threads; i < pika_parallel_run_async_n_threads; i++)
{
PikaParallelRunAsyncThread *thread =
&pika_parallel_run_async_threads[i];
thread->quit = TRUE;
if (thread->current_async && ! finish_tasks)
pika_cancelable_cancel (PIKA_CANCELABLE (thread->current_async));
}
g_cond_broadcast (&pika_parallel_run_async_cond);
g_mutex_unlock (&pika_parallel_run_async_mutex);
for (i = n_threads; i < pika_parallel_run_async_n_threads; i++)
{
PikaParallelRunAsyncThread *thread =
&pika_parallel_run_async_threads[i];
g_thread_join (thread->thread);
}
}
pika_parallel_run_async_n_threads = n_threads;
if (n_threads == 0)
{
PikaParallelRunAsyncTask *task;
/* finish remaining tasks */
while ((task = pika_parallel_run_async_dequeue_task ()))
{
if (finish_tasks)
while (pika_parallel_run_async_execute_task (task));
else
pika_parallel_run_async_abort_task (task);
}
}
}
static gpointer
pika_parallel_run_async_thread_func (PikaParallelRunAsyncThread *thread)
{
g_mutex_lock (&pika_parallel_run_async_mutex);
while (TRUE)
{
PikaParallelRunAsyncTask *task;
while (! thread->quit &&
(task = pika_parallel_run_async_dequeue_task ()))
{
gboolean resume;
thread->current_async = PIKA_ASYNC (g_object_ref (task->async));
do
{
g_mutex_unlock (&pika_parallel_run_async_mutex);
resume = pika_parallel_run_async_execute_task (task);
g_mutex_lock (&pika_parallel_run_async_mutex);
}
while (resume &&
(g_queue_is_empty (&pika_parallel_run_async_queue) ||
task->priority <
((PikaParallelRunAsyncTask *)
g_queue_peek_head (
&pika_parallel_run_async_queue))->priority));
g_clear_object (&thread->current_async);
if (resume)
pika_parallel_run_async_enqueue_task (task);
}
if (thread->quit)
break;
g_cond_wait (&pika_parallel_run_async_cond,
&pika_parallel_run_async_mutex);
}
g_mutex_unlock (&pika_parallel_run_async_mutex);
return NULL;
}
static void
pika_parallel_run_async_enqueue_task (PikaParallelRunAsyncTask *task)
{
GList *link;
GList *iter;
if (pika_async_is_canceled (task->async))
{
pika_parallel_run_async_abort_task (task);
return;
}
link = g_list_alloc ();
link->data = task;
g_object_set_data (G_OBJECT (task->async),
"pika-parallel-run-async-link", link);
for (iter = g_queue_peek_tail_link (&pika_parallel_run_async_queue);
iter;
iter = g_list_previous (iter))
{
PikaParallelRunAsyncTask *other_task =
(PikaParallelRunAsyncTask *) iter->data;
if (other_task->priority <= task->priority)
break;
}
if (iter)
{
link->prev = iter;
link->next = iter->next;
iter->next = link;
if (link->next)
link->next->prev = link;
else
pika_parallel_run_async_queue.tail = link;
pika_parallel_run_async_queue.length++;
}
else
{
g_queue_push_head_link (&pika_parallel_run_async_queue, link);
}
}
static PikaParallelRunAsyncTask *
pika_parallel_run_async_dequeue_task (void)
{
PikaParallelRunAsyncTask *task;
task = (PikaParallelRunAsyncTask *) g_queue_pop_head (
&pika_parallel_run_async_queue);
if (task)
{
g_object_set_data (G_OBJECT (task->async),
"pika-parallel-run-async-link", NULL);
}
return task;
}
static gboolean
pika_parallel_run_async_execute_task (PikaParallelRunAsyncTask *task)
{
if (pika_async_is_canceled (task->async))
{
pika_parallel_run_async_abort_task (task);
return FALSE;
}
task->func (task->async, task->user_data);
if (pika_async_is_stopped (task->async))
{
g_object_unref (task->async);
g_slice_free (PikaParallelRunAsyncTask, task);
return FALSE;
}
return TRUE;
}
static void
pika_parallel_run_async_abort_task (PikaParallelRunAsyncTask *task)
{
if (task->user_data && task->user_data_destroy_func)
task->user_data_destroy_func (task->user_data);
pika_async_abort (task->async);
g_object_unref (task->async);
g_slice_free (PikaParallelRunAsyncTask, task);
}
static void
pika_parallel_run_async_cancel (PikaAsync *async)
{
GList *link;
PikaParallelRunAsyncTask *task = NULL;
link = (GList *) g_object_get_data (G_OBJECT (async),
"pika-parallel-run-async-link");
if (! link)
return;
g_mutex_lock (&pika_parallel_run_async_mutex);
link = (GList *) g_object_get_data (G_OBJECT (async),
"pika-parallel-run-async-link");
if (link)
{
g_object_set_data (G_OBJECT (async),
"pika-parallel-run-async-link", NULL);
task = (PikaParallelRunAsyncTask *) link->data;
g_queue_delete_link (&pika_parallel_run_async_queue, link);
}
g_mutex_unlock (&pika_parallel_run_async_mutex);
if (task)
pika_parallel_run_async_abort_task (task);
}
static void
pika_parallel_run_async_waiting (PikaAsync *async)
{
GList *link;
link = (GList *) g_object_get_data (G_OBJECT (async),
"pika-parallel-run-async-link");
if (! link)
return;
g_mutex_lock (&pika_parallel_run_async_mutex);
link = (GList *) g_object_get_data (G_OBJECT (async),
"pika-parallel-run-async-link");
if (link)
{
PikaParallelRunAsyncTask *task = (PikaParallelRunAsyncTask *) link->data;
task->priority = G_MININT;
g_queue_unlink (&pika_parallel_run_async_queue, link);
g_queue_push_head_link (&pika_parallel_run_async_queue, link);
}
g_mutex_unlock (&pika_parallel_run_async_mutex);
}
} /* extern "C" */