minix/servers/avfs/worker.c
2011-12-09 14:46:10 +00:00

402 lines
12 KiB
C

#include "fs.h"
#include "glo.h"
#include "fproc.h"
#include "threads.h"
#include "job.h"
#include <assert.h>
/* Prototypes for the helper routines that are private to this file. */
FORWARD _PROTOTYPE( void append_job, (struct job *job,
void *(*func)(void *arg)) );
FORWARD _PROTOTYPE( void get_work, (struct worker_thread *worker) );
FORWARD _PROTOTYPE( void *worker_main, (void *arg) );
FORWARD _PROTOTYPE( void worker_sleep, (struct worker_thread *worker) );
FORWARD _PROTOTYPE( void worker_wake, (struct worker_thread *worker) );
FORWARD _PROTOTYPE( int worker_waiting_for, (struct worker_thread *worker,
endpoint_t proc_e) );
/* Nonzero once the one-time setup at the top of worker_init() has been done. */
PRIVATE int init = 0;
/* Thread attributes (stack size, detach state) that worker_init() configures
 * once and then passes to mthread_create() for every worker it starts.
 */
PRIVATE mthread_attr_t tattr;
/* Stack size handed to mthread_attr_setstacksize() for worker threads. A larger
 * value is selected when MKCOVERAGE is defined.
 */
#ifdef MKCOVERAGE
# define TH_STACKSIZE (10 * 1024)
#else
# define TH_STACKSIZE (6 * 1024)
#endif
/* Sanity check on a worker pointer: 'w' must be the address of sys_worker, the
 * address of dl_worker, or point at an element of the workers[] array.
 *
 * The definition deliberately carries no trailing semicolon; the caller's own
 * ';' terminates the statement. That way "ASSERTW(w);" expands to exactly one
 * statement and stays valid as the body of an unbraced if that has an else.
 */
#define ASSERTW(w) assert((w) == &sys_worker || (w) == &dl_worker || \
    ((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]))
/*===========================================================================*
 *				worker_init				     *
 *===========================================================================*/
PUBLIC void worker_init(struct worker_thread *wp)
{
/* Set up the worker structure 'wp' and start a thread running worker_main()
 * on it. The first call also performs the one-time setup of the thread
 * library and of the attributes shared by all worker threads. Any failure is
 * fatal (panic).
 */

  if (init == 0) {
    /* First call only: prepare the thread library and shared attributes. */
    threads_init();

    if (mthread_attr_init(&tattr) != 0) {
      panic("failed to initialize attribute");
    }
    if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0) {
      panic("couldn't set default thread stack size");
    }
    if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0) {
      panic("couldn't set default thread detach state");
    }

    init = 1;
    pending = 0;
  }

  ASSERTW(wp);

  /* A NULL job function is what marks a worker as not in use. */
  wp->w_next = NULL;
  wp->w_job.j_func = NULL;

  if (mutex_init(&wp->w_event_mutex, NULL) != 0) {
    panic("failed to initialize mutex");
  }
  if (cond_init(&wp->w_event, NULL) != 0) {
    panic("failed to initialize conditional variable");
  }
  if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0) {
    panic("unable to start thread");
  }

  /* NOTE(review): presumably this lets the freshly created thread run until
   * it blocks waiting for work -- confirm against the thread library.
   */
  yield();
}
/*===========================================================================*
 *				get_work				     *
 *===========================================================================*/
PRIVATE void get_work(struct worker_thread *worker)
{
/* Obtain the next job for 'worker'. There are three possibilities, tried in
 * this order:
 *   1. a job is queued behind the worker's current job (w_job.j_next);
 *   2. some process has a pending job (FP_PENDING); this source is not used
 *      for sys_worker and dl_worker;
 *   3. there is nothing to do, so sleep until another thread wakes us.
 * In cases 1 and 2 the job is copied into worker->w_job before returning.
 */
  struct job *queued;
  struct fproc *candidate;
  int is_regular;

  ASSERTW(worker);
  self = worker;

  /* Case 1: take the next queued job and release its queue node. */
  queued = worker->w_job.j_next;
  if (queued != NULL) {
    worker->w_job = *queued;
    free(queued);
    return;
  }

  /* Case 2: look for a process whose job is still waiting for a worker. */
  is_regular = (worker != &sys_worker && worker != &dl_worker);
  if (is_regular && pending > 0) {
    for (candidate = &fproc[0]; candidate < &fproc[NR_PROCS]; candidate++) {
      if (!(candidate->fp_flags & FP_PENDING)) continue;

      worker->w_job = candidate->fp_job;
      candidate->fp_job.j_func = NULL;
      candidate->fp_flags &= ~FP_PENDING;	/* No longer pending */
      pending--;
      assert(pending >= 0);
      return;
    }

    /* The counter claimed pending work, yet no process is flagged. */
    panic("Pending work inconsistency");
  }

  /* Case 3: nothing to do right now; wait until work is handed to us. */
  worker_sleep(worker);
}
/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
PUBLIC int worker_available(void)
{
/* Return how many entries of workers[] currently have no job function set,
 * i.e., how many regular workers are idle.
 */
  int idle, slot;

  idle = 0;
  for (slot = 0; slot < NR_WTHREADS; slot++) {
    if (workers[slot].w_job.j_func == NULL) idle++;
  }

  return(idle);
}
/*===========================================================================*
* worker_main *
*===========================================================================*/
PRIVATE void *worker_main(void *arg)
{
/* Worker thread main loop */
struct worker_thread *me;
me = (struct worker_thread *) arg;
ASSERTW(me);
while(TRUE) {
get_work(me);
/* Register ourselves in fproc table if possible */
if (me->w_job.j_fp != NULL) {
me->w_job.j_fp->fp_wtid = me->w_tid;
}
/* Carry out work */
me->w_job.j_func(&me->w_job);
/* Mark ourselves as done */
me->w_job.j_func = NULL;
}
return(NULL); /* Unreachable */
}
/*===========================================================================*
* dl_worker_start *
*===========================================================================*/
PUBLIC void dl_worker_start(void *(*func)(void *arg))
{
/* Start the deadlock resolving worker. This worker is reserved to run in case
* all other workers are busy and we have to have an additional worker to come
* to the rescue. */
assert(dl_worker.w_job.j_func == NULL);
if (dl_worker.w_job.j_func == NULL) {
dl_worker.w_job.j_fp = fp;
dl_worker.w_job.j_m_in = m_in;
dl_worker.w_job.j_func = func;
worker_wake(&dl_worker);
}
}
/*===========================================================================*
* sys_worker_start *
*===========================================================================*/
PUBLIC void sys_worker_start(void *(*func)(void *arg))
{
/* Carry out work for the system (i.e., kernel or PM). If this thread is idle
* do it right away, else create new job and append it to the queue. */
if (sys_worker.w_job.j_func == NULL) {
sys_worker.w_job.j_fp = fp;
sys_worker.w_job.j_m_in = m_in;
sys_worker.w_job.j_func = func;
worker_wake(&sys_worker);
} else {
append_job(&sys_worker.w_job, func);
}
}
/*===========================================================================*
 *				append_job				     *
 *===========================================================================*/
PRIVATE void append_job(struct job *job, void *(*func)(void *arg))
{
/* Append a new job that will run 'func' to the end of the queue that starts at
 * 'job'. The new job captures the current values of the globals 'fp' and
 * 'm_in'. The queue node is heap allocated here; get_work() copies it into the
 * worker and frees it when the job is dequeued.
 */
  struct job *new_job, *tail;

  /* Create the new job. The result is checked explicitly instead of through
   * assert(): with NDEBUG defined an assert() vanishes and a failed allocation
   * would be dereferenced below.
   */
  new_job = calloc(1, sizeof(*new_job));
  if (new_job == NULL)
    panic("unable to allocate memory for queued job");

  new_job->j_fp = fp;
  new_job->j_m_in = m_in;
  new_job->j_func = func;
  new_job->j_next = NULL;
  new_job->j_err_code = OK;

  /* Walk to the last job in the queue and link the new one behind it. */
  tail = job;
  while (tail->j_next != NULL) tail = tail->j_next;
  tail->j_next = new_job;
}
/*===========================================================================*
* worker_start *
*===========================================================================*/
PUBLIC void worker_start(void *(*func)(void *arg))
{
/* Find an available worker or wait for one */
int i;
struct worker_thread *worker;
worker = NULL;
for (i = 0; i < NR_WTHREADS; i++) {
if (workers[i].w_job.j_func == NULL) {
worker = &workers[i];
break;
}
}
if (worker != NULL) {
worker->w_job.j_fp = fp;
worker->w_job.j_m_in = m_in;
worker->w_job.j_func = func;
worker->w_job.j_next = NULL;
worker->w_job.j_err_code = OK;
worker_wake(worker);
return;
}
/* No worker threads available, let's wait for one to finish. */
/* If this process already has a job scheduled, forget about this new
* job;
* - the new job is do_dummy and we have already scheduled an actual job
* - the new job is an actual job and we have already scheduled do_dummy in
* order to exit this proc, so doing the new job is pointless. */
if (fp->fp_job.j_func == NULL) {
assert(!(fp->fp_flags & FP_PENDING));
fp->fp_job.j_fp = fp;
fp->fp_job.j_m_in = m_in;
fp->fp_job.j_func = func;
fp->fp_job.j_next = NULL;
fp->fp_job.j_err_code = OK;
fp->fp_flags |= FP_PENDING;
pending++;
}
}
/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
PRIVATE void worker_sleep(struct worker_thread *worker)
{
/* Block the calling worker on its own event condition variable until it is
 * signalled. 'worker' must be the calling thread's own structure (it must
 * equal 'self'). Any locking or waiting failure is fatal.
 */

  ASSERTW(worker);
  assert(self == worker);

  if (mutex_lock(&worker->w_event_mutex) != 0) {
    panic("unable to lock event mutex");
  }

  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0) {
    panic("could not wait on conditional variable");
  }

  if (mutex_unlock(&worker->w_event_mutex) != 0) {
    panic("unable to unlock event mutex");
  }

  /* Other threads may have changed 'self' while we slept; reclaim it. */
  self = worker;
}
/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
PRIVATE void worker_wake(struct worker_thread *worker)
{
/* Wake up 'worker' by signalling its event condition variable while holding
 * the associated mutex. Any locking or signalling failure is fatal.
 */

  ASSERTW(worker);

  if (mutex_lock(&worker->w_event_mutex) != 0) {
    panic("unable to lock event mutex");
  }

  if (cond_signal(&worker->w_event) != 0) {
    panic("unable to signal conditional variable");
  }

  if (mutex_unlock(&worker->w_event_mutex) != 0) {
    panic("unable to unlock event mutex");
  }
}
/*===========================================================================*
* worker_wait *
*===========================================================================*/
PUBLIC void worker_wait(void)
{
struct worker_thread *worker;
worker = worker_self();
worker->w_job.j_m_in = m_in; /* Store important global data */
worker->w_job.j_err_code = err_code;
assert(fp == worker->w_job.j_fp);
worker_sleep(worker);
/* We continue here after waking up */
fp = worker->w_job.j_fp; /* Restore global data */
m_in = worker->w_job.j_m_in;
err_code = worker->w_job.j_err_code;
assert(worker->w_next == NULL);
}
/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
PUBLIC void worker_signal(struct worker_thread *worker)
{
/* Public entry point for waking up a sleeping worker. */

  ASSERTW(worker);	/* Reject anything that is not one of our workers */

  worker_wake(worker);
}
/*===========================================================================*
* worker_stop *
*===========================================================================*/
PUBLIC void worker_stop(struct worker_thread *worker)
{
ASSERTW(worker); /* Make sure we have a valid thread */
if (worker->w_job.j_fp)
worker->w_job.j_fp->fp_sendrec->m_type = EIO;
else
worker->w_job.j_m_in.m_type = EIO;
worker_wake(worker);
}
/*===========================================================================*
 *				worker_stop_by_endpt			     *
 *===========================================================================*/
PUBLIC void worker_stop_by_endpt(endpoint_t proc_e)
{
/* Stop every worker (system, deadlock resolver and regular ones) for which
 * worker_waiting_for() reports a match with endpoint 'proc_e'. Nothing is
 * done when 'proc_e' is NONE.
 */
  struct worker_thread *wp;

  if (proc_e == NONE) return;

  /* The two special workers first ... */
  if (worker_waiting_for(&sys_worker, proc_e)) worker_stop(&sys_worker);
  if (worker_waiting_for(&dl_worker, proc_e)) worker_stop(&dl_worker);

  /* ... followed by all regular workers. */
  for (wp = &workers[0]; wp < &workers[NR_WTHREADS]; wp++) {
    if (!worker_waiting_for(wp, proc_e)) continue;
    worker_stop(wp);
  }
}
/*===========================================================================*
 *				worker_self				     *
 *===========================================================================*/
PUBLIC struct worker_thread *worker_self(void)
{
/* Return the worker structure that belongs to the calling thread, looked up
 * by the thread id that mthread_self() reports. The lookup is asserted to
 * succeed.
 */
  struct worker_thread *me;

  me = worker_get(mthread_self());
  assert(me != NULL);

  return(me);
}
/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
PUBLIC struct worker_thread *worker_get(thread_t worker_tid)
{
/* Map thread id 'worker_tid' to its worker structure. The system worker is
 * checked first, then the deadlock resolving worker, then the regular workers
 * in array order. Return NULL when no worker has that thread id.
 */
  int slot;

  if (worker_tid == sys_worker.w_tid) return(&sys_worker);
  if (worker_tid == dl_worker.w_tid) return(&dl_worker);

  for (slot = 0; slot < NR_WTHREADS; slot++) {
    if (workers[slot].w_tid == worker_tid) return(&workers[slot]);
  }

  return(NULL);
}
/*===========================================================================*
 *				worker_getjob				     *
 *===========================================================================*/
PUBLIC struct job *worker_getjob(thread_t worker_tid)
{
/* Return a pointer to the job slot of the worker with thread id 'worker_tid',
 * or NULL when worker_get() finds no such worker.
 */
  struct worker_thread *owner;

  owner = worker_get(worker_tid);

  return(owner != NULL ? &owner->w_job : NULL);
}
/*===========================================================================*
 *				worker_waiting_for			     *
 *===========================================================================*/
PRIVATE int worker_waiting_for(struct worker_thread *worker, endpoint_t proc_e)
{
/* Return nonzero when 'worker' has a job, that job belongs to a process, and
 * the fp_task field of that process equals 'proc_e'. Return 0 otherwise.
 */

  ASSERTW(worker);	/* Reject anything that is not one of our workers */

  if (worker->w_job.j_func == NULL) return(0);	/* Worker is idle */
  if (worker->w_job.j_fp == NULL) return(0);	/* Job has no process */

  return(worker->w_job.j_fp->fp_task == proc_e);
}