minix/servers/avfs/worker.c
Thomas Veerman abd6043a2f AVFS: fix various system call interruption issues
- When cancelling ioctls, VFS did not remember which file descriptor
   to cancel and sent bogus to the driver.
 - Select state was not cleaned up when select()ing process was
   interrupted.
 - Process trying to do a system call at the exact same time as a user
   trying to interrupt the process, could cause the system call worker
   thread to overwrite state belonging to the worker thread trying to
   exit the process. This led to hanging threads and eventual system hang
   when this happens often enough.
2012-02-09 14:24:28 +00:00

407 lines
12 KiB
C

#include "fs.h"
#include "glo.h"
#include "fproc.h"
#include "threads.h"
#include "job.h"
#include <assert.h>
FORWARD _PROTOTYPE( void append_job, (struct job *job,
void *(*func)(void *arg)) );
FORWARD _PROTOTYPE( void get_work, (struct worker_thread *worker) );
FORWARD _PROTOTYPE( void *worker_main, (void *arg) );
FORWARD _PROTOTYPE( void worker_sleep, (struct worker_thread *worker) );
FORWARD _PROTOTYPE( void worker_wake, (struct worker_thread *worker) );
FORWARD _PROTOTYPE( int worker_waiting_for, (struct worker_thread *worker,
endpoint_t proc_e) );
PRIVATE int init = 0;
PRIVATE mthread_attr_t tattr;
#ifdef MKCOVERAGE
# define TH_STACKSIZE (10 * 1024)
#else
# define TH_STACKSIZE (7 * 1024)
#endif
#define ASSERTW(w) assert((w) == &sys_worker || (w) == &dl_worker || \
((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]));
/*===========================================================================*
* worker_init *
*===========================================================================*/
PUBLIC void worker_init(struct worker_thread *wp)
{
/* Initialize worker thread */
if (!init) {
threads_init();
if (mthread_attr_init(&tattr) != 0)
panic("failed to initialize attribute");
if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
panic("couldn't set default thread stack size");
if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0)
panic("couldn't set default thread detach state");
pending = 0;
init = 1;
}
ASSERTW(wp);
wp->w_job.j_func = NULL; /* Mark not in use */
wp->w_next = NULL;
if (mutex_init(&wp->w_event_mutex, NULL) != 0)
panic("failed to initialize mutex");
if (cond_init(&wp->w_event, NULL) != 0)
panic("failed to initialize conditional variable");
if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
panic("unable to start thread");
yield();
}
/*===========================================================================*
* get_work *
*===========================================================================*/
PRIVATE void get_work(struct worker_thread *worker)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
* latter case wait for new work to come in. */
struct job *new_job;
struct fproc *rfp;
ASSERTW(worker);
self = worker;
/* Do we have queued work to do? */
if ((new_job = worker->w_job.j_next) != NULL) {
worker->w_job = *new_job;
free(new_job);
return;
} else if (worker != &sys_worker && worker != &dl_worker && pending > 0) {
/* Find pending work */
for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
if (rfp->fp_flags & FP_PENDING) {
worker->w_job = rfp->fp_job;
rfp->fp_job.j_func = NULL;
rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
pending--;
assert(pending >= 0);
return;
}
}
panic("Pending work inconsistency");
}
/* Wait for work to come to us */
worker_sleep(worker);
}
/*===========================================================================*
* worker_available *
*===========================================================================*/
PUBLIC int worker_available(void)
{
int busy, i;
busy = 0;
for (i = 0; i < NR_WTHREADS; i++) {
if (workers[i].w_job.j_func != NULL)
busy++;
}
return(NR_WTHREADS - busy);
}
/*===========================================================================*
* worker_main *
*===========================================================================*/
PRIVATE void *worker_main(void *arg)
{
/* Worker thread main loop */
struct worker_thread *me;
me = (struct worker_thread *) arg;
ASSERTW(me);
while(TRUE) {
get_work(me);
/* Register ourselves in fproc table if possible */
if (me->w_job.j_fp != NULL) {
me->w_job.j_fp->fp_wtid = me->w_tid;
}
/* Carry out work */
me->w_job.j_func(&me->w_job);
/* Mark ourselves as done */
me->w_job.j_func = NULL;
}
return(NULL); /* Unreachable */
}
/*===========================================================================*
* dl_worker_start *
*===========================================================================*/
PUBLIC void dl_worker_start(void *(*func)(void *arg))
{
/* Start the deadlock resolving worker. This worker is reserved to run in case
* all other workers are busy and we have to have an additional worker to come
* to the rescue. */
assert(dl_worker.w_job.j_func == NULL);
if (dl_worker.w_job.j_func == NULL) {
dl_worker.w_job.j_fp = fp;
dl_worker.w_job.j_m_in = m_in;
dl_worker.w_job.j_func = func;
worker_wake(&dl_worker);
}
}
/*===========================================================================*
* sys_worker_start *
*===========================================================================*/
PUBLIC void sys_worker_start(void *(*func)(void *arg))
{
/* Carry out work for the system (i.e., kernel or PM). If this thread is idle
* do it right away, else create new job and append it to the queue. */
if (sys_worker.w_job.j_func == NULL) {
sys_worker.w_job.j_fp = fp;
sys_worker.w_job.j_m_in = m_in;
sys_worker.w_job.j_func = func;
worker_wake(&sys_worker);
} else {
append_job(&sys_worker.w_job, func);
}
}
/*===========================================================================*
* append_job *
*===========================================================================*/
PRIVATE void append_job(struct job *job, void *(*func)(void *arg))
{
/* Append a job */
struct job *new_job, *tail;
/* Create new job */
new_job = calloc(1, sizeof(struct job));
assert(new_job != NULL);
new_job->j_fp = fp;
new_job->j_m_in = m_in;
new_job->j_func = func;
new_job->j_next = NULL;
new_job->j_err_code = OK;
/* Append to queue */
tail = job;
while (tail->j_next != NULL) tail = tail->j_next;
tail->j_next = new_job;
}
/*===========================================================================*
* worker_start *
*===========================================================================*/
PUBLIC void worker_start(void *(*func)(void *arg))
{
/* Find an available worker or wait for one */
int i;
struct worker_thread *worker;
if (fp->fp_flags & FP_DROP_WORK) {
return; /* This process is not allowed to accept new work */
}
worker = NULL;
for (i = 0; i < NR_WTHREADS; i++) {
if (workers[i].w_job.j_func == NULL) {
worker = &workers[i];
break;
}
}
if (worker != NULL) {
worker->w_job.j_fp = fp;
worker->w_job.j_m_in = m_in;
worker->w_job.j_func = func;
worker->w_job.j_next = NULL;
worker->w_job.j_err_code = OK;
worker_wake(worker);
return;
}
/* No worker threads available, let's wait for one to finish. */
/* If this process already has a job scheduled, forget about this new
* job;
* - the new job is do_dummy and we have already scheduled an actual job
* - the new job is an actual job and we have already scheduled do_dummy in
* order to exit this proc, so doing the new job is pointless. */
if (fp->fp_job.j_func == NULL) {
assert(!(fp->fp_flags & FP_PENDING));
fp->fp_job.j_fp = fp;
fp->fp_job.j_m_in = m_in;
fp->fp_job.j_func = func;
fp->fp_job.j_next = NULL;
fp->fp_job.j_err_code = OK;
fp->fp_flags |= FP_PENDING;
pending++;
}
}
/*===========================================================================*
* worker_sleep *
*===========================================================================*/
PRIVATE void worker_sleep(struct worker_thread *worker)
{
ASSERTW(worker);
assert(self == worker);
if (mutex_lock(&worker->w_event_mutex) != 0)
panic("unable to lock event mutex");
if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
panic("could not wait on conditional variable");
if (mutex_unlock(&worker->w_event_mutex) != 0)
panic("unable to unlock event mutex");
self = worker;
}
/*===========================================================================*
* worker_wake *
*===========================================================================*/
PRIVATE void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
ASSERTW(worker);
if (mutex_lock(&worker->w_event_mutex) != 0)
panic("unable to lock event mutex");
if (cond_signal(&worker->w_event) != 0)
panic("unable to signal conditional variable");
if (mutex_unlock(&worker->w_event_mutex) != 0)
panic("unable to unlock event mutex");
}
/*===========================================================================*
* worker_wait *
*===========================================================================*/
PUBLIC void worker_wait(void)
{
struct worker_thread *worker;
worker = worker_self();
worker->w_job.j_m_in = m_in; /* Store important global data */
worker->w_job.j_err_code = err_code;
assert(fp == worker->w_job.j_fp);
worker_sleep(worker);
/* We continue here after waking up */
fp = worker->w_job.j_fp; /* Restore global data */
m_in = worker->w_job.j_m_in;
err_code = worker->w_job.j_err_code;
assert(worker->w_next == NULL);
}
/*===========================================================================*
* worker_signal *
*===========================================================================*/
PUBLIC void worker_signal(struct worker_thread *worker)
{
ASSERTW(worker); /* Make sure we have a valid thread */
worker_wake(worker);
}
/*===========================================================================*
* worker_stop *
*===========================================================================*/
PUBLIC void worker_stop(struct worker_thread *worker)
{
ASSERTW(worker); /* Make sure we have a valid thread */
if (worker->w_job.j_fp)
worker->w_job.j_fp->fp_sendrec->m_type = EIO;
else
worker->w_job.j_m_in.m_type = EIO;
worker_wake(worker);
}
/*===========================================================================*
* worker_stop_by_endpt *
*===========================================================================*/
PUBLIC void worker_stop_by_endpt(endpoint_t proc_e)
{
struct worker_thread *worker;
int i;
if (proc_e == NONE) return;
if (worker_waiting_for(&sys_worker, proc_e)) worker_stop(&sys_worker);
if (worker_waiting_for(&dl_worker, proc_e)) worker_stop(&dl_worker);
for (i = 0; i < NR_WTHREADS; i++) {
worker = &workers[i];
if (worker_waiting_for(worker, proc_e))
worker_stop(worker);
}
}
/*===========================================================================*
* worker_self *
*===========================================================================*/
PUBLIC struct worker_thread *worker_self(void)
{
struct worker_thread *worker;
worker = worker_get(mthread_self());
assert(worker != NULL);
return(worker);
}
/*===========================================================================*
* worker_get *
*===========================================================================*/
PUBLIC struct worker_thread *worker_get(thread_t worker_tid)
{
int i;
struct worker_thread *worker;
worker = NULL;
if (worker_tid == sys_worker.w_tid)
worker = &sys_worker;
else if (worker_tid == dl_worker.w_tid)
worker = &dl_worker;
else {
for (i = 0; i < NR_WTHREADS; i++) {
if (workers[i].w_tid == worker_tid) {
worker = &workers[i];
break;
}
}
}
return(worker);
}
/*===========================================================================*
* worker_getjob *
*===========================================================================*/
PUBLIC struct job *worker_getjob(thread_t worker_tid)
{
struct worker_thread *worker;
if ((worker = worker_get(worker_tid)) != NULL)
return(&worker->w_job);
return(NULL);
}
/*===========================================================================*
* worker_waiting_for *
*===========================================================================*/
PRIVATE int worker_waiting_for(struct worker_thread *worker, endpoint_t proc_e)
{
ASSERTW(worker); /* Make sure we have a valid thread */
if (worker->w_job.j_func != NULL) {
if (worker->w_job.j_fp != NULL) {
return(worker->w_job.j_fp->fp_task == proc_e);
}
}
return(0);
}