New Data Store server.

This commit is contained in:
Jorrit Herder 2005-10-20 20:29:52 +00:00
parent 288860f6e6
commit d1f2ba26b0
7 changed files with 407 additions and 0 deletions

43
servers/ds/Makefile Normal file
View file

@ -0,0 +1,43 @@
# Makefile for Data Store Server (DS)
SERVER = ds
# directories
u = /usr
i = $u/include
s = $i/sys
m = $i/minix
b = $i/ibm
k = $u/src/kernel
p = $u/src/servers/pm
f = $u/src/servers/fs
# programs, flags, etc.
CC = exec cc
CFLAGS = -I$i
LDFLAGS = -i
LIBS = -lsys -lsysutil
OBJ = main.o store.o
# build local binary
all build: $(SERVER)
$(SERVER): $(OBJ)
$(CC) -o $@ $(LDFLAGS) $(OBJ) $(LIBS)
# install -S 256w $@
# install with other servers
install: /sbin/$(SERVER)
/sbin/$(SERVER): $(SERVER)
install -o root -c $? $@
# install -o root -cs $? $@
# clean up local files
clean:
rm -f $(SERVER) *.o *.bak
depend:
/usr/bin/mkdep "$(CC) -E $(CPPFLAGS)" *.c > .depend
# Include generated dependencies.
include .depend

7
servers/ds/glo.h Normal file
View file

@ -0,0 +1,7 @@
/* Global variables. */
/* The parameters of the call are kept here. */
extern int who; /* caller's proc number */
extern int callnr; /* system call number */
extern int dont_reply; /* normally 0; set to 1 to inhibit reply */

30
servers/ds/inc.h Normal file
View file

@ -0,0 +1,30 @@
/* Header file including all needed system headers. */
#define _SYSTEM 1 /* get OK and negative error codes */
#define _MINIX 1 /* tell headers to include MINIX stuff */
#include <ansi.h>
#include <sys/types.h>
#include <limits.h>
#include <errno.h>
#include <minix/callnr.h>
#include <minix/config.h>
#include <minix/type.h>
#include <minix/const.h>
#include <minix/com.h>
#include <minix/syslib.h>
#include <minix/sysutil.h>
#include <minix/keymap.h>
#include <minix/bitmap.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include "proto.h"
#include "glo.h"
#include "store.h"

135
servers/ds/main.c Normal file
View file

@ -0,0 +1,135 @@
/* Data Store Server.
* This service implements a little publish/subscribe data store that is
* crucial for the system's fault tolerance. Components that require state
* can store it here, for later retrieval, e.g., after a crash and subsequent
* restart by the reincarnation server.
*
* Created:
* Oct 19, 2005 by Jorrit N. Herder
*/
#include "inc.h" /* include master header file */
/* Allocate space for the global variables. */
int who; /* caller's proc number */
int callnr; /* system call number */
int sys_panic; /* flag to indicate system-wide panic */
extern int errno; /* error number set by system library */
/* Declare some local functions. */
FORWARD _PROTOTYPE(void init_server, (int argc, char **argv) );
FORWARD _PROTOTYPE(void exit_server, (void) );
FORWARD _PROTOTYPE(void get_work, (message *m_ptr) );
FORWARD _PROTOTYPE(void reply, (int whom, message *m_ptr) );
/*===========================================================================*
* main *
*===========================================================================*/
PUBLIC int main(int argc, char **argv)
{
/* This is the main routine of this service. The main loop consists of
* three major activities: getting new work, processing the work, and
* sending the reply. The loop never terminates, unless a panic occurs.
*/
message m;
int result;
sigset_t sigset;
/* Initialize the server, then go to work. */
init_server(argc, argv);
/* Main loop - get work and do it, forever. */
while (TRUE) {
/* Wait for incoming message, sets 'callnr' and 'who'. */
get_work(&m);
switch (callnr) {
case SYS_SIG:
sigset = (sigset_t) m.NOTIFY_ARG;
if (sigismember(&sigset,SIGTERM) || sigismember(&sigset,SIGKSTOP)) {
exit_server();
}
continue;
case DS_PUBLISH:
result = do_publish(&m);
break;
case DS_RETRIEVE:
result = do_retrieve(&m);
break;
case DS_SUBSCRIBE:
result = do_subscribe(&m);
break;
case GETSYSINFO:
result = do_getsysinfo(&m);
break;
default:
report("DS","warning, got illegal request from:", m.m_source);
result = EINVAL;
}
/* Finally send reply message, unless disabled. */
if (result != EDONTREPLY) {
m.m_type = result; /* build reply message */
reply(who, &m); /* send it away */
}
}
return(OK); /* shouldn't come here */
}
/*===========================================================================*
* init_server *
*===========================================================================*/
PRIVATE void init_server(int argc, char **argv)
{
/* Initialize the data store server. */
int i, s;
struct sigaction sigact;
/* Install signal handler. Ask PM to transform signal into message. */
sigact.sa_handler = SIG_MESS;
sigact.sa_mask = ~0; /* block all other signals */
sigact.sa_flags = 0; /* default behaviour */
if (sigaction(SIGTERM, &sigact, NULL) < 0)
report("DS","warning, sigaction() failed", errno);
}
/*===========================================================================*
* exit_server *
*===========================================================================*/
PRIVATE void exit_server()
{
/* Shut down the information service. */
/* Done. Now exit. */
exit(0);
}
/*===========================================================================*
* get_work *
*===========================================================================*/
PRIVATE void get_work(m_ptr)
message *m_ptr; /* message buffer */
{
int status = 0;
status = receive(ANY, m_ptr); /* this blocks until message arrives */
if (OK != status)
panic("DS","failed to receive message!", status);
who = m_ptr->m_source; /* message arrived! set sender */
callnr = m_ptr->m_type; /* set function call number */
}
/*===========================================================================*
* reply *
*===========================================================================*/
PRIVATE void reply(who, m_ptr)
int who; /* destination */
message *m_ptr; /* message buffer */
{
int s;
s = send(who, m_ptr); /* send the message */
if (OK != s)
panic("DS", "unable to send reply!", s);
}

10
servers/ds/proto.h Normal file
View file

@ -0,0 +1,10 @@
/* Function prototypes. */
/* main.c */
_PROTOTYPE(int main, (int argc, char **argv));
/* store.c */
_PROTOTYPE(int do_publish, (message *m_ptr));
_PROTOTYPE(int do_retrieve, (message *m_ptr));
_PROTOTYPE(int do_subscribe, (message *m_ptr));
_PROTOTYPE(int do_getsysinfo, (message *m_ptr));

163
servers/ds/store.c Normal file
View file

@ -0,0 +1,163 @@
/* Implementation of the Data Store. */
#include "inc.h"
/* Allocate space for the data store. */
PRIVATE struct data_store ds_store[NR_DS_KEYS];
PRIVATE int nr_in_use;
PRIVATE _PROTOTYPE(int find_key, (int key, struct data_store **dsp));
PRIVATE _PROTOTYPE(int set_owner, (struct data_store *dsp, void *auth_ptr));
PRIVATE _PROTOTYPE(int is_authorized, (struct data_store *dsp, void *auth_ptr));
PRIVATE int set_owner(dsp, ap)
struct data_store *dsp; /* data store structure */
void *ap; /* authorization pointer */
{
/* Authorize the caller. */
return(TRUE);
}
PRIVATE int is_authorized(dsp, ap)
struct data_store *dsp; /* data store structure */
void *ap; /* authorization pointer */
{
/* Authorize the caller. */
return(TRUE);
}
PRIVATE int find_key(key, dsp)
int key; /* key to look up */
struct data_store **dsp; /* store pointer here */
{
register int i;
*dsp = NULL;
for (i=0; i<NR_DS_KEYS; i++) {
if ((ds_store[i].ds_flags & DS_IN_USE) && ds_store[i].ds_key == key) {
*dsp = &ds_store[i];
return(TRUE); /* report success */
}
}
return(FALSE); /* report not found */
}
PUBLIC int do_publish(m_ptr)
message *m_ptr; /* request message */
{
struct data_store *dsp;
/* Store (key,value)-pair. First see if key already exists. If so,
* check if the caller is allowed to overwrite the value. Otherwise
* find a new slot and store the new value.
*/
if (find_key(m_ptr->DS_KEY, &dsp)) { /* look up key */
if (! is_authorized(dsp,m_ptr->DS_AUTH)) { /* check if owner */
return(EPERM);
}
}
else { /* find a new slot */
if (nr_in_use >= NR_DS_KEYS) {
return(EAGAIN); /* store is full */
} else {
dsp = &ds_store[nr_in_use]; /* new slot found */
dsp->ds_key = m_ptr->DS_KEY;
if (! set_owner(dsp,m_ptr->DS_AUTH)) { /* associate owner */
return(EINVAL);
}
dsp->ds_nr_subs = 0; /* nr of subscribers */
dsp->ds_flags = DS_IN_USE; /* initialize slot */
nr_in_use ++;
}
}
/* At this point we have a data store pointer and know the caller is
* authorize to write to it. Set all fields as requested.
*/
dsp->ds_val_l1 = m_ptr->DS_VAL_L1; /* store all data */
dsp->ds_val_l2 = m_ptr->DS_VAL_L2;
/* If the data is public. Check if there are any subscribers to this key.
* If so, notify all subscribers so that they can retrieve the data, if
* they're still interested.
*/
if ((dsp->ds_flags & DS_PUBLIC) && dsp->ds_nr_subs > 0) {
/* Subscriptions are not yet implemented. */
}
return(OK);
}
PUBLIC int do_retrieve(m_ptr)
message *m_ptr; /* request message */
{
struct data_store *dsp;
/* Retrieve data. Look up the key in the data store. Return an error if it
* is not found. If this data is private, only the owner may retrieve it.
*/
if (find_key(m_ptr->DS_KEY, &dsp)) { /* look up key */
/* If the data is not public, the caller must be authorized. */
if (! dsp->ds_flags & DS_PUBLIC) { /* check if private */
if (! is_authorized(dsp,m_ptr->DS_AUTH)) { /* authorize call */
return(EPERM); /* not allowed */
}
}
/* Data is public or the caller is authorized to retrieve it. */
printf("DS retrieves data: key %d (found %d), l1 %u, l2 %u\n",
m_ptr->DS_KEY, dsp->ds_key, dsp->ds_val_l1, dsp->ds_val_l2);
m_ptr->DS_VAL_L1 = dsp->ds_val_l1; /* return value */
m_ptr->DS_VAL_L2 = dsp->ds_val_l2; /* return value */
return(OK); /* report success */
}
return(ESRCH); /* key not found */
}
PUBLIC int do_subscribe(m_ptr)
message *m_ptr; /* request message */
{
/* Subscribe to a key of interest. Only existing and public keys can be
* subscribed to. All updates to the key will cause a notification message
* to be sent to the subscribed. On success, directly return a copy of the
* data for the given key.
*/
return(ENOSYS);
}
/*===========================================================================*
* do_getsysinfo *
*===========================================================================*/
PUBLIC int do_getsysinfo(m_ptr)
message *m_ptr;
{
vir_bytes src_addr, dst_addr;
int dst_proc;
size_t len;
int s;
switch(m_ptr->m1_i1) {
case SI_DATA_STORE:
src_addr = (vir_bytes) ds_store;
len = sizeof(struct data_store) * NR_DS_KEYS;
break;
default:
return(EINVAL);
}
dst_proc = m_ptr->m_source;
dst_addr = (vir_bytes) m_ptr->m1_p1;
if (OK != (s=sys_datacopy(SELF, src_addr, dst_proc, dst_addr, len)))
return(s);
return(OK);
}

19
servers/ds/store.h Normal file
View file

@ -0,0 +1,19 @@
/* Type definitions for the Data Store Server. */
struct data_store {
int ds_flags; /* flags for this store */
int ds_key; /* key to lookup information */
long ds_val_l1; /* data associated with key */
long ds_val_l2; /* data associated with key */
long ds_auth; /* secret given by owner of data */
int ds_nr_subs; /* number of subscribers for key */
};
/* Flag values. */
#define DS_IN_USE 0x01
#define DS_PUBLIC 0x02
/* Constants for the Data Store Server. */
#define NR_DS_KEYS 64 /* reserve space for so many items */