minix/servers/vfs/worker.c
David van Moolenbroek 723e51327f VFS: worker thread model overhaul
The main purpose of this patch is to fix handling of unpause calls
from PM while another call is ongoing. The solution to this problem
sparked a full revision of the threading model, consisting of a large
number of related changes:

- all active worker threads are now always associated with a process,
  and every process has at most one active thread working for it;
- the process lock is always held by a process's worker thread;
- a process can now have both normal work and postponed PM work
  associated with it;
- timer expiry and non-postponed PM work is done from the main thread;
- filp garbage collection is done from a thread associated with VFS;
- reboot calls from PM are now done from a thread associated with PM;
- the DS events handler is protected from starting multiple threads;
- support for a system worker thread has been removed;
- the deadlock recovery thread has been replaced by a parameter to the
  worker_start() function; the number of worker threads has
  consequently been increased by one;
- saving and restoring of global but per-thread variables is now
  centralized in worker_suspend() and worker_resume(); err_code is now
  saved and restored in all cases;
- the concept of jobs has been removed, and job_m_in now points to a
  message stored in the worker thread structure instead;
- the PM lock has been removed;
- the separate exec lock has been replaced by a lock on the VM
  process, which was already being locked for exec calls anyway;
- PM_UNPAUSE is now processed as a postponed PM request, from a thread
  associated with the target process;
- the FP_DROP_WORK flag has been removed, since it is no longer more
  than just an optimization and only applied to processes operating on
  a pipe when getting killed;
- assignment to "fp" now takes place only when obtaining new work in
  the main thread or a worker thread, when resuming execution of a
  thread, and in the special case of exiting processes during reboot;
- there are no longer special cases where the yield() call is used to
  force a thread to run.

Change-Id: I7a97b9b95c2450454a9b5318dfa0e6150d4e6858
2014-02-18 11:25:03 +01:00

469 lines
14 KiB
C

#include "fs.h"
#include <assert.h>
static void worker_get_work(void);
static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);
static mthread_attr_t tattr;
#ifdef MKCOVERAGE
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif
#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])
/*===========================================================================*
* worker_init *
*===========================================================================*/
void worker_init(void)
{
/* Initialize worker thread */
struct worker_thread *wp;
int i;
threads_init();
if (mthread_attr_init(&tattr) != 0)
panic("failed to initialize attribute");
if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
panic("couldn't set default thread stack size");
if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0)
panic("couldn't set default thread detach state");
pending = 0;
for (i = 0; i < NR_WTHREADS; i++) {
wp = &workers[i];
wp->w_fp = NULL; /* Mark not in use */
wp->w_next = NULL;
if (mutex_init(&wp->w_event_mutex, NULL) != 0)
panic("failed to initialize mutex");
if (cond_init(&wp->w_event, NULL) != 0)
panic("failed to initialize conditional variable");
if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
panic("unable to start thread");
}
/* Let all threads get ready to accept work. */
yield_all();
}
/*===========================================================================*
 *				worker_get_work				     *
 *===========================================================================*/
static void worker_get_work(void)
{
/* Obtain a process to work for. If any process is marked as having pending
 * work, claim the first such process. Otherwise, block until the main thread
 * hands this thread a process directly.
 */
  struct fproc *proc;

  if (pending > 0) {
	for (proc = &fproc[0]; proc < &fproc[NR_PROCS]; proc++) {
		if (!(proc->fp_flags & FP_PENDING))
			continue;

		/* Attach this thread to the process, and take the process
		 * out of the pending set.
		 */
		self->w_fp = proc;
		proc->fp_worker = self;
		proc->fp_flags &= ~FP_PENDING;
		pending--;
		assert(pending >= 0);
		return;
	}

	/* The pending counter said there was work, but no process carried
	 * the FP_PENDING flag: the administration is corrupt.
	 */
	panic("Pending work inconsistency");
  }

  /* No queued work; wait for work to come to us. */
  worker_sleep();
}
/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
int worker_available(void)
{
/* Return the number of worker threads that are not currently attached to a
 * process, and are thus available to take on new work.
 */
  int slot, idle;

  idle = 0;
  for (slot = 0; slot < NR_WTHREADS; slot++) {
	if (workers[slot].w_fp == NULL)
		idle++;
  }

  return(idle);
}
/*===========================================================================*
* worker_main *
*===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop: repeatedly obtain a process to work for, perform
 * that process's normal and/or postponed PM work while holding the process
 * lock, and then detach from the process again.
 */
self = (struct worker_thread *) arg;
ASSERTW(self);
while(TRUE) {
/* Block until a process has been attached to this thread, either by the
 * main thread or by picking one up from the pending set.
 */
worker_get_work();
fp = self->w_fp;
assert(fp->fp_worker == self);
/* Lock the process. */
lock_proc(fp);
/* The following two blocks could be run in a loop until both the
* conditions are no longer met, but it is currently impossible that
* more normal work is present after postponed PM work has been done.
*/
/* Perform normal work, if any. The message is copied into this thread's
 * structure so the handler can access it through job_m_in.
 */
if (fp->fp_func != NULL) {
self->w_msg = fp->fp_msg;
err_code = OK;
fp->fp_func();
fp->fp_func = NULL; /* deliberately unset AFTER the call */
}
/* Perform postponed PM work, if any. */
if (fp->fp_flags & FP_PM_WORK) {
self->w_msg = fp->fp_pm_msg;
service_pm_postponed();
fp->fp_flags &= ~FP_PM_WORK;
}
/* Perform cleanup actions. */
thread_cleanup();
unlock_proc(fp);
/* Detach this thread from the process, making the thread available for
 * new work again.
 */
fp->fp_worker = NULL;
self->w_fp = NULL;
}
return(NULL); /* Unreachable */
}
/*===========================================================================*
 *				worker_can_start			     *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * not entirely safe for other cases, as explained below.
 */
  int queued, active;

  queued = (rfp->fp_flags & FP_PENDING) != 0;
  active = (rfp->fp_worker != NULL);

  /* No work scheduled for the process at all: work can be started. */
  if (!queued && !active)
	return TRUE;

  /* Only one normal job per process is supported, so if normal work is
   * already scheduled, no more can be added.
   */
  if (rfp->fp_func != NULL)
	return FALSE;

  /* The process has pending PM work but no normal work; the normal work can
   * be added and will be picked up when the worker starts.
   */
  if (queued)
	return TRUE;

  /* However, if a worker is already active for PM work, work added now would
   * not be considered. For this reason, this function must not be used for
   * processes that can possibly get postponed PM work. It is still safe for
   * core system processes, though.
   */
  return FALSE;
}
/*===========================================================================*
* worker_try_activate *
*===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
* process. If not, mark the process as having pending work for later.
*/
int i, available, needed;
struct worker_thread *worker;
/* Use the last available thread only if requested. Otherwise, leave at least
* one spare thread for deadlock resolution.
*/
needed = use_spare ? 1 : 2;
worker = NULL;
for (i = available = 0; i < NR_WTHREADS; i++) {
if (workers[i].w_fp == NULL) {
if (worker == NULL)
worker = &workers[i];
if (++available >= needed)
break;
}
}
if (available >= needed) {
assert(worker != NULL);
rfp->fp_worker = worker;
worker->w_fp = rfp;
worker_wake(worker);
} else {
rfp->fp_flags |= FP_PENDING;
pending++;
}
}
/*===========================================================================*
* worker_start *
*===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
* process. If a function pointer is given, the work is considered normal work,
* and the function will be called to handle it. If the function pointer is
* NULL, the work is considered postponed PM work, and service_pm_postponed
* will be called to handle it. The input message will be a copy of the given
* message. Optionally, the last spare (deadlock-resolving) thread may be used
* to execute the work immediately.
*/
int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;
assert(rfp != NULL);
/* Classify both the new work and the process's current work administration. */
is_pm_work = (func == NULL);
is_pending = (rfp->fp_flags & FP_PENDING);
is_active = (rfp->fp_worker != NULL);
has_normal_work = (rfp->fp_func != NULL);
has_pm_work = (rfp->fp_flags & FP_PM_WORK);
/* Sanity checks. If any of these trigger, someone messed up badly! */
if (is_pending || is_active) {
if (is_pending && is_active)
panic("work cannot be both pending and active");
/* The process cannot make more than one call at once. */
if (!is_pm_work && has_normal_work)
panic("process has two calls (%x, %x)",
rfp->fp_msg.m_type, m_ptr->m_type);
/* PM will not send more than one job per process to us at once. */
if (is_pm_work && has_pm_work)
panic("got two calls from PM (%x, %x)",
rfp->fp_pm_msg.m_type, m_ptr->m_type);
/* Despite PM's sys_delay_stop() system, it is possible that normal
* work (in particular, do_pending_pipe) arrives after postponed PM
* work has been scheduled for execution, so we don't check for that.
*/
#if 0
printf("VFS: adding %s work to %s thread\n",
is_pm_work ? "PM" : "normal",
is_pending ? "pending" : "active");
#endif
} else {
/* Some cleanup step forgotten somewhere? */
if (has_normal_work || has_pm_work)
panic("worker administration error");
}
/* Save the work to be performed in the process structure; the worker thread
* will copy the message into its own structure once it runs for the process.
*/
if (!is_pm_work) {
rfp->fp_msg = *m_ptr;
rfp->fp_func = func;
} else {
rfp->fp_pm_msg = *m_ptr;
rfp->fp_flags |= FP_PM_WORK;
}
/* If we have not only added to existing work, go look for a free thread.
* Note that we won't be using the spare thread for normal work if there is
* already PM work pending, but that situation will never occur in practice.
*/
if (!is_pending && !is_active)
worker_try_activate(rfp, use_spare);
}
/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
static void worker_sleep(void)
{
/* Block the calling worker thread until another thread signals its event
 * condition variable.
 *
 * NOTE(review): there is no predicate loop around cond_wait(); this
 * presumably relies on MINIX's cooperative mthread library never producing
 * spurious wakeups -- confirm against the mthread implementation.
 */
  struct worker_thread *wp = self;

  ASSERTW(wp);

  if (mutex_lock(&wp->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_wait(&wp->w_event, &wp->w_event_mutex) != 0)
	panic("could not wait on conditional variable");
  if (mutex_unlock(&wp->w_event_mutex) != 0)
	panic("unable to unlock event mutex");

  /* Restore the global 'self' pointer, which may have been reassigned by
   * other code that ran while this thread was blocked.
   */
  self = wp;
}
/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
static void worker_wake(struct worker_thread *wp)
{
/* Wake up the given worker thread by signaling its event condition variable,
 * with the corresponding mutex held around the signal.
 */
  ASSERTW(wp);

  if (mutex_lock(&wp->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_signal(&wp->w_event) != 0)
	panic("unable to signal conditional variable");
  if (mutex_unlock(&wp->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
}
/*===========================================================================*
 *				worker_suspend				     *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Prepare the current thread for suspension by saving the global per-thread
 * state into its worker structure. Return that structure, to be passed to
 * worker_resume() later.
 */
  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  /* err_code must survive across the suspension. */
  self->w_err_code = err_code;

  return self;
}
/*===========================================================================*
 *				worker_resume				     *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension: restore the global per-thread
 * state that worker_suspend() saved earlier.
 */
  ASSERTW(org_self);

  self = org_self;
  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}
/*===========================================================================*
 *				worker_wait				     *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until the main thread wakes it up again. */
  (void) worker_suspend();	/* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* Execution resumes here once we have been woken up. */
  worker_resume(self);

  assert(self->w_next == NULL);
}
/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
void worker_signal(struct worker_thread *wp)
{
/* Wake up the given worker thread, after checking that it is valid. */
  ASSERTW(wp);

  worker_wake(wp);
}
/*===========================================================================*
 *				worker_stop				     *
 *===========================================================================*/
void worker_stop(struct worker_thread *wp)
{
/* Abort the given worker thread's ongoing task communication by storing an
 * EIO reply in its result buffer, and wake the thread up.
 */
  ASSERTW(wp);		/* Make sure we have a valid thread */

  if (wp->w_task == NONE) {
	/* This shouldn't happen at all... */
	printf("VFS: stopping worker not blocked on any task?\n");
	util_stacktrace();
  } else if (wp->w_drv_sendrec != NULL) {
	/* The thread is communicating with a driver. */
	wp->w_drv_sendrec->m_type = EIO;
  } else if (wp->w_fs_sendrec != NULL) {
	/* The thread is communicating with a file server. */
	wp->w_fs_sendrec->m_type = EIO;
  } else {
	panic("reply storage consistency error"); /* Oh dear */
  }

  worker_wake(wp);
}
/*===========================================================================*
 *				worker_stop_by_endpt			     *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
/* Stop all worker threads that are currently communicating with the given
 * task endpoint.
 */
  struct worker_thread *wp;
  int slot;

  if (proc_e == NONE) return;

  for (slot = 0; slot < NR_WTHREADS; slot++) {
	wp = &workers[slot];
	if (wp->w_fp != NULL && wp->w_task == proc_e)
		worker_stop(wp);
  }
}
/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
/* Map a thread identifier to the corresponding worker structure. Return NULL
 * if no worker thread has the given identifier.
 */
  int slot;

  for (slot = 0; slot < NR_WTHREADS; slot++) {
	if (workers[slot].w_tid == worker_tid)
		return(&workers[slot]);
  }

  return(NULL);
}
/*===========================================================================*
* worker_set_proc *
*===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
* model: change the current working thread's process context to another
* process. The caller is expected to hold the lock to both the calling and the
* target process, and neither process is expected to continue regular
* operation when done. This code is here *only* and *strictly* for the reboot
* code, and *must not* be used for anything else.
*/
/* Already operating on behalf of the target process? Then nothing to do. */
if (fp == rfp) return;
/* The target process must not have a worker thread of its own. */
if (rfp->fp_worker != NULL)
panic("worker_set_proc: target process not idle");
/* Detach from the current process, then attach to the target process. */
fp->fp_worker = NULL;
fp = rfp;
self->w_fp = rfp;
fp->fp_worker = self;
}