557 lines
15 KiB
C++
557 lines
15 KiB
C++
|
/* PIKA - Photo and Image Kooker Application
|
||
|
* a rebranding of The GNU Image Manipulation Program (created with heckimp)
|
||
|
* A derived work which may be trivial. However, any changes may be (C)2023 by Aldercone Studio
|
||
|
*
|
||
|
* Original copyright, applying to most contents (license remains unchanged):
|
||
|
* Copyright (C) 1995 Spencer Kimball and Peter Mattis
|
||
|
*
|
||
|
* pika-parallel.c
|
||
|
* Copyright (C) 2018 Ell
|
||
|
*
|
||
|
* This program is free software: you can redistribute it and/or modify
|
||
|
* it under the terms of the GNU General Public License as published by
|
||
|
* the Free Software Foundation; either version 3 of the License, or
|
||
|
* (at your option) any later version.
|
||
|
*
|
||
|
* This program is distributed in the hope that it will be useful,
|
||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
* GNU General Public License for more details.
|
||
|
*
|
||
|
* You should have received a copy of the GNU General Public License
|
||
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
*/
|
||
|
|
||
|
|
||
|
#include "config.h"
|
||
|
|
||
|
#include <gio/gio.h>
|
||
|
#include <gegl.h>
|
||
|
|
||
|
#ifdef HAVE_UNISTD_H
|
||
|
#include <unistd.h>
|
||
|
#endif
|
||
|
|
||
|
#ifdef G_OS_WIN32
|
||
|
#include <windows.h>
|
||
|
#endif
|
||
|
|
||
|
extern "C"
|
||
|
{
|
||
|
|
||
|
#include "core-types.h"
|
||
|
|
||
|
#include "config/pikageglconfig.h"
|
||
|
|
||
|
#include "pika.h"
|
||
|
#include "pika-parallel.h"
|
||
|
#include "pikaasync.h"
|
||
|
#include "pikacancelable.h"
|
||
|
|
||
|
|
||
|
#define PIKA_PARALLEL_MAX_THREADS 64
|
||
|
#define PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS 1
|
||
|
|
||
|
|
||
|
typedef struct
|
||
|
{
|
||
|
PikaAsync *async;
|
||
|
gint priority;
|
||
|
PikaRunAsyncFunc func;
|
||
|
gpointer user_data;
|
||
|
GDestroyNotify user_data_destroy_func;
|
||
|
} PikaParallelRunAsyncTask;
|
||
|
|
||
|
typedef struct
|
||
|
{
|
||
|
GThread *thread;
|
||
|
|
||
|
gboolean quit;
|
||
|
|
||
|
PikaAsync *current_async;
|
||
|
} PikaParallelRunAsyncThread;
|
||
|
|
||
|
|
||
|
/* local function prototypes */
|
||
|
|
||
|
static void pika_parallel_notify_num_processors (PikaGeglConfig *config);
|
||
|
|
||
|
static void pika_parallel_set_n_threads (gint n_threads,
|
||
|
gboolean finish_tasks);
|
||
|
|
||
|
static void pika_parallel_run_async_set_n_threads (gint n_threads,
|
||
|
gboolean finish_tasks);
|
||
|
static gpointer pika_parallel_run_async_thread_func (PikaParallelRunAsyncThread *thread);
|
||
|
static void pika_parallel_run_async_enqueue_task (PikaParallelRunAsyncTask *task);
|
||
|
static PikaParallelRunAsyncTask * pika_parallel_run_async_dequeue_task (void);
|
||
|
static gboolean pika_parallel_run_async_execute_task (PikaParallelRunAsyncTask *task);
|
||
|
static void pika_parallel_run_async_abort_task (PikaParallelRunAsyncTask *task);
|
||
|
static void pika_parallel_run_async_cancel (PikaAsync *async);
|
||
|
static void pika_parallel_run_async_waiting (PikaAsync *async);
|
||
|
|
||
|
|
||
|
/* local variables */
|
||
|
|
||
|
static gint pika_parallel_run_async_n_threads = 0;
|
||
|
static PikaParallelRunAsyncThread pika_parallel_run_async_threads[PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS];
|
||
|
|
||
|
static GMutex pika_parallel_run_async_mutex;
|
||
|
static GCond pika_parallel_run_async_cond;
|
||
|
static GQueue pika_parallel_run_async_queue = G_QUEUE_INIT;
|
||
|
|
||
|
|
||
|
/* public functions */
|
||
|
|
||
|
|
||
|
void
|
||
|
pika_parallel_init (Pika *pika)
|
||
|
{
|
||
|
PikaGeglConfig *config;
|
||
|
|
||
|
g_return_if_fail (PIKA_IS_PIKA (pika));
|
||
|
|
||
|
config = PIKA_GEGL_CONFIG (pika->config);
|
||
|
|
||
|
g_signal_connect (config, "notify::num-processors",
|
||
|
G_CALLBACK (pika_parallel_notify_num_processors),
|
||
|
NULL);
|
||
|
|
||
|
pika_parallel_notify_num_processors (config);
|
||
|
}
|
||
|
|
||
|
void
|
||
|
pika_parallel_exit (Pika *pika)
|
||
|
{
|
||
|
g_return_if_fail (PIKA_IS_PIKA (pika));
|
||
|
|
||
|
g_signal_handlers_disconnect_by_func (pika->config,
|
||
|
(gpointer) pika_parallel_notify_num_processors,
|
||
|
NULL);
|
||
|
|
||
|
/* stop all threads */
|
||
|
pika_parallel_set_n_threads (0, /* finish_tasks = */ FALSE);
|
||
|
}
|
||
|
|
||
|
PikaAsync *
|
||
|
pika_parallel_run_async (PikaRunAsyncFunc func,
|
||
|
gpointer user_data)
|
||
|
{
|
||
|
return pika_parallel_run_async_full (0, func, user_data, NULL);
|
||
|
}
|
||
|
|
||
|
PikaAsync *
|
||
|
pika_parallel_run_async_full (gint priority,
|
||
|
PikaRunAsyncFunc func,
|
||
|
gpointer user_data,
|
||
|
GDestroyNotify user_data_destroy_func)
|
||
|
{
|
||
|
PikaAsync *async;
|
||
|
PikaParallelRunAsyncTask *task;
|
||
|
|
||
|
g_return_val_if_fail (func != NULL, NULL);
|
||
|
|
||
|
async = pika_async_new ();
|
||
|
|
||
|
task = g_slice_new (PikaParallelRunAsyncTask);
|
||
|
|
||
|
task->async = PIKA_ASYNC (g_object_ref (async));
|
||
|
task->priority = priority;
|
||
|
task->func = func;
|
||
|
task->user_data = user_data;
|
||
|
task->user_data_destroy_func = user_data_destroy_func;
|
||
|
|
||
|
if (pika_parallel_run_async_n_threads > 0)
|
||
|
{
|
||
|
g_signal_connect_after (async, "cancel",
|
||
|
G_CALLBACK (pika_parallel_run_async_cancel),
|
||
|
NULL);
|
||
|
g_signal_connect_after (async, "waiting",
|
||
|
G_CALLBACK (pika_parallel_run_async_waiting),
|
||
|
NULL);
|
||
|
|
||
|
g_mutex_lock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
pika_parallel_run_async_enqueue_task (task);
|
||
|
|
||
|
g_cond_signal (&pika_parallel_run_async_cond);
|
||
|
|
||
|
g_mutex_unlock (&pika_parallel_run_async_mutex);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
while (pika_parallel_run_async_execute_task (task));
|
||
|
}
|
||
|
|
||
|
return async;
|
||
|
}
|
||
|
|
||
|
PikaAsync *
|
||
|
pika_parallel_run_async_independent (PikaRunAsyncFunc func,
|
||
|
gpointer user_data)
|
||
|
{
|
||
|
return pika_parallel_run_async_independent_full (0, func, user_data);
|
||
|
}
|
||
|
|
||
|
PikaAsync *
|
||
|
pika_parallel_run_async_independent_full (gint priority,
|
||
|
PikaRunAsyncFunc func,
|
||
|
gpointer user_data)
|
||
|
{
|
||
|
PikaAsync *async;
|
||
|
PikaParallelRunAsyncTask *task;
|
||
|
GThread *thread;
|
||
|
|
||
|
g_return_val_if_fail (func != NULL, NULL);
|
||
|
|
||
|
async = pika_async_new ();
|
||
|
|
||
|
task = g_slice_new0 (PikaParallelRunAsyncTask);
|
||
|
|
||
|
task->async = PIKA_ASYNC (g_object_ref (async));
|
||
|
task->priority = priority;
|
||
|
task->func = func;
|
||
|
task->user_data = user_data;
|
||
|
|
||
|
thread = g_thread_new (
|
||
|
"async-ind",
|
||
|
[] (gpointer data) -> gpointer
|
||
|
{
|
||
|
PikaParallelRunAsyncTask *task = (PikaParallelRunAsyncTask *) data;
|
||
|
|
||
|
/* adjust the thread's priority */
|
||
|
#if defined (G_OS_WIN32)
|
||
|
if (task->priority < 0)
|
||
|
{
|
||
|
SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_ABOVE_NORMAL);
|
||
|
}
|
||
|
else if (task->priority > 0)
|
||
|
{
|
||
|
SetThreadPriority (GetCurrentThread (), THREAD_MODE_BACKGROUND_BEGIN);
|
||
|
}
|
||
|
#elif defined (HAVE_UNISTD_H) && defined (__gnu_linux__)
|
||
|
if (task->priority)
|
||
|
{
|
||
|
nice (task->priority);
|
||
|
}
|
||
|
#endif
|
||
|
|
||
|
while (pika_parallel_run_async_execute_task (task));
|
||
|
|
||
|
return NULL;
|
||
|
},
|
||
|
task);
|
||
|
|
||
|
pika_async_add_callback (async,
|
||
|
[] (PikaAsync *async,
|
||
|
gpointer thread)
|
||
|
{
|
||
|
g_thread_join ((GThread *) thread);
|
||
|
},
|
||
|
thread);
|
||
|
|
||
|
return async;
|
||
|
}
|
||
|
|
||
|
|
||
|
/* private functions */
|
||
|
|
||
|
|
||
|
static void
|
||
|
pika_parallel_notify_num_processors (PikaGeglConfig *config)
|
||
|
{
|
||
|
pika_parallel_set_n_threads (config->num_processors,
|
||
|
/* finish_tasks = */ TRUE);
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
pika_parallel_set_n_threads (gint n_threads,
|
||
|
gboolean finish_tasks)
|
||
|
{
|
||
|
pika_parallel_run_async_set_n_threads (n_threads, finish_tasks);
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
pika_parallel_run_async_set_n_threads (gint n_threads,
|
||
|
gboolean finish_tasks)
|
||
|
{
|
||
|
gint i;
|
||
|
|
||
|
n_threads = CLAMP (n_threads, 0, PIKA_PARALLEL_RUN_ASYNC_MAX_THREADS);
|
||
|
|
||
|
if (n_threads > pika_parallel_run_async_n_threads) /* need more threads */
|
||
|
{
|
||
|
for (i = pika_parallel_run_async_n_threads; i < n_threads; i++)
|
||
|
{
|
||
|
PikaParallelRunAsyncThread *thread =
|
||
|
&pika_parallel_run_async_threads[i];
|
||
|
|
||
|
thread->quit = FALSE;
|
||
|
|
||
|
thread->thread = g_thread_new (
|
||
|
"async",
|
||
|
(GThreadFunc) pika_parallel_run_async_thread_func,
|
||
|
thread);
|
||
|
}
|
||
|
}
|
||
|
else if (n_threads < pika_parallel_run_async_n_threads) /* need less threads */
|
||
|
{
|
||
|
g_mutex_lock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
for (i = n_threads; i < pika_parallel_run_async_n_threads; i++)
|
||
|
{
|
||
|
PikaParallelRunAsyncThread *thread =
|
||
|
&pika_parallel_run_async_threads[i];
|
||
|
|
||
|
thread->quit = TRUE;
|
||
|
|
||
|
if (thread->current_async && ! finish_tasks)
|
||
|
pika_cancelable_cancel (PIKA_CANCELABLE (thread->current_async));
|
||
|
}
|
||
|
|
||
|
g_cond_broadcast (&pika_parallel_run_async_cond);
|
||
|
|
||
|
g_mutex_unlock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
for (i = n_threads; i < pika_parallel_run_async_n_threads; i++)
|
||
|
{
|
||
|
PikaParallelRunAsyncThread *thread =
|
||
|
&pika_parallel_run_async_threads[i];
|
||
|
|
||
|
g_thread_join (thread->thread);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pika_parallel_run_async_n_threads = n_threads;
|
||
|
|
||
|
if (n_threads == 0)
|
||
|
{
|
||
|
PikaParallelRunAsyncTask *task;
|
||
|
|
||
|
/* finish remaining tasks */
|
||
|
while ((task = pika_parallel_run_async_dequeue_task ()))
|
||
|
{
|
||
|
if (finish_tasks)
|
||
|
while (pika_parallel_run_async_execute_task (task));
|
||
|
else
|
||
|
pika_parallel_run_async_abort_task (task);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static gpointer
|
||
|
pika_parallel_run_async_thread_func (PikaParallelRunAsyncThread *thread)
|
||
|
{
|
||
|
g_mutex_lock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
while (TRUE)
|
||
|
{
|
||
|
PikaParallelRunAsyncTask *task;
|
||
|
|
||
|
while (! thread->quit &&
|
||
|
(task = pika_parallel_run_async_dequeue_task ()))
|
||
|
{
|
||
|
gboolean resume;
|
||
|
|
||
|
thread->current_async = PIKA_ASYNC (g_object_ref (task->async));
|
||
|
|
||
|
do
|
||
|
{
|
||
|
g_mutex_unlock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
resume = pika_parallel_run_async_execute_task (task);
|
||
|
|
||
|
g_mutex_lock (&pika_parallel_run_async_mutex);
|
||
|
}
|
||
|
while (resume &&
|
||
|
(g_queue_is_empty (&pika_parallel_run_async_queue) ||
|
||
|
task->priority <
|
||
|
((PikaParallelRunAsyncTask *)
|
||
|
g_queue_peek_head (
|
||
|
&pika_parallel_run_async_queue))->priority));
|
||
|
|
||
|
g_clear_object (&thread->current_async);
|
||
|
|
||
|
if (resume)
|
||
|
pika_parallel_run_async_enqueue_task (task);
|
||
|
}
|
||
|
|
||
|
if (thread->quit)
|
||
|
break;
|
||
|
|
||
|
g_cond_wait (&pika_parallel_run_async_cond,
|
||
|
&pika_parallel_run_async_mutex);
|
||
|
}
|
||
|
|
||
|
g_mutex_unlock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
pika_parallel_run_async_enqueue_task (PikaParallelRunAsyncTask *task)
|
||
|
{
|
||
|
GList *link;
|
||
|
GList *iter;
|
||
|
|
||
|
if (pika_async_is_canceled (task->async))
|
||
|
{
|
||
|
pika_parallel_run_async_abort_task (task);
|
||
|
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
link = g_list_alloc ();
|
||
|
link->data = task;
|
||
|
|
||
|
g_object_set_data (G_OBJECT (task->async),
|
||
|
"pika-parallel-run-async-link", link);
|
||
|
|
||
|
for (iter = g_queue_peek_tail_link (&pika_parallel_run_async_queue);
|
||
|
iter;
|
||
|
iter = g_list_previous (iter))
|
||
|
{
|
||
|
PikaParallelRunAsyncTask *other_task =
|
||
|
(PikaParallelRunAsyncTask *) iter->data;
|
||
|
|
||
|
if (other_task->priority <= task->priority)
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
if (iter)
|
||
|
{
|
||
|
link->prev = iter;
|
||
|
link->next = iter->next;
|
||
|
|
||
|
iter->next = link;
|
||
|
|
||
|
if (link->next)
|
||
|
link->next->prev = link;
|
||
|
else
|
||
|
pika_parallel_run_async_queue.tail = link;
|
||
|
|
||
|
pika_parallel_run_async_queue.length++;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
g_queue_push_head_link (&pika_parallel_run_async_queue, link);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static PikaParallelRunAsyncTask *
|
||
|
pika_parallel_run_async_dequeue_task (void)
|
||
|
{
|
||
|
PikaParallelRunAsyncTask *task;
|
||
|
|
||
|
task = (PikaParallelRunAsyncTask *) g_queue_pop_head (
|
||
|
&pika_parallel_run_async_queue);
|
||
|
|
||
|
if (task)
|
||
|
{
|
||
|
g_object_set_data (G_OBJECT (task->async),
|
||
|
"pika-parallel-run-async-link", NULL);
|
||
|
}
|
||
|
|
||
|
return task;
|
||
|
}
|
||
|
|
||
|
static gboolean
|
||
|
pika_parallel_run_async_execute_task (PikaParallelRunAsyncTask *task)
|
||
|
{
|
||
|
if (pika_async_is_canceled (task->async))
|
||
|
{
|
||
|
pika_parallel_run_async_abort_task (task);
|
||
|
|
||
|
return FALSE;
|
||
|
}
|
||
|
|
||
|
task->func (task->async, task->user_data);
|
||
|
|
||
|
if (pika_async_is_stopped (task->async))
|
||
|
{
|
||
|
g_object_unref (task->async);
|
||
|
|
||
|
g_slice_free (PikaParallelRunAsyncTask, task);
|
||
|
|
||
|
return FALSE;
|
||
|
}
|
||
|
|
||
|
return TRUE;
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
pika_parallel_run_async_abort_task (PikaParallelRunAsyncTask *task)
|
||
|
{
|
||
|
if (task->user_data && task->user_data_destroy_func)
|
||
|
task->user_data_destroy_func (task->user_data);
|
||
|
|
||
|
pika_async_abort (task->async);
|
||
|
|
||
|
g_object_unref (task->async);
|
||
|
|
||
|
g_slice_free (PikaParallelRunAsyncTask, task);
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
pika_parallel_run_async_cancel (PikaAsync *async)
|
||
|
{
|
||
|
GList *link;
|
||
|
PikaParallelRunAsyncTask *task = NULL;
|
||
|
|
||
|
link = (GList *) g_object_get_data (G_OBJECT (async),
|
||
|
"pika-parallel-run-async-link");
|
||
|
|
||
|
if (! link)
|
||
|
return;
|
||
|
|
||
|
g_mutex_lock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
link = (GList *) g_object_get_data (G_OBJECT (async),
|
||
|
"pika-parallel-run-async-link");
|
||
|
|
||
|
if (link)
|
||
|
{
|
||
|
g_object_set_data (G_OBJECT (async),
|
||
|
"pika-parallel-run-async-link", NULL);
|
||
|
|
||
|
task = (PikaParallelRunAsyncTask *) link->data;
|
||
|
|
||
|
g_queue_delete_link (&pika_parallel_run_async_queue, link);
|
||
|
}
|
||
|
|
||
|
g_mutex_unlock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
if (task)
|
||
|
pika_parallel_run_async_abort_task (task);
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
pika_parallel_run_async_waiting (PikaAsync *async)
|
||
|
{
|
||
|
GList *link;
|
||
|
|
||
|
link = (GList *) g_object_get_data (G_OBJECT (async),
|
||
|
"pika-parallel-run-async-link");
|
||
|
|
||
|
if (! link)
|
||
|
return;
|
||
|
|
||
|
g_mutex_lock (&pika_parallel_run_async_mutex);
|
||
|
|
||
|
link = (GList *) g_object_get_data (G_OBJECT (async),
|
||
|
"pika-parallel-run-async-link");
|
||
|
|
||
|
if (link)
|
||
|
{
|
||
|
PikaParallelRunAsyncTask *task = (PikaParallelRunAsyncTask *) link->data;
|
||
|
|
||
|
task->priority = G_MININT;
|
||
|
|
||
|
g_queue_unlink (&pika_parallel_run_async_queue, link);
|
||
|
g_queue_push_head_link (&pika_parallel_run_async_queue, link);
|
||
|
}
|
||
|
|
||
|
g_mutex_unlock (&pika_parallel_run_async_mutex);
|
||
|
}
|
||
|
|
||
|
} /* extern "C" */
|