Add message queue support for Minix

Implement message queues support for Minix.

Pending tasks:
Blocking or non blocking for send receive?
Sort by priority and time?
Test codes?
Bug fixes?
This commit is contained in:
Sanchayan Maity 2016-03-07 17:41:07 +05:30
parent 6f36b466f3
commit 63062e7e15
17 changed files with 619 additions and 2 deletions

View file

@ -1231,6 +1231,39 @@ typedef struct {
} mess_lsys_krn_sys_vumap;
_ASSERT_MSG_SIZE(mess_lsys_krn_sys_vumap);
typedef struct {
char name[32];
int oflag;
uint8_t padding[20];
} mess_lsys_krn_sys_mqueue_open;
_ASSERT_MSG_SIZE(mess_lsys_krn_sys_mqueue_open);
typedef struct {
int mqdes;
uint8_t padding[52];
} mess_lsys_krn_sys_mqueue_close;
_ASSERT_MSG_SIZE(mess_lsys_krn_sys_mqueue_close);
typedef struct {
int mqdes;
char msg[32];
unsigned int msg_prio;
endpoint_t dst[4];
} mess_lsys_krn_sys_mqueue_send;
_ASSERT_MSG_SIZE(mess_lsys_krn_sys_mqueue_send);
typedef struct {
int mqdes;
char msg[32];
unsigned int msg_prio;
endpoint_t dst;
uint8_t padding[12];
} mess_lsys_krn_sys_mqueue_receive;
_ASSERT_MSG_SIZE(mess_lsys_krn_sys_mqueue_receive);
typedef struct {
void *vec_addr;
int vec_size;
@ -2248,6 +2281,10 @@ typedef struct noxfer_message {
mess_vm_vfs_mmap m_vm_vfs_mmap;
mess_vmmcp m_vmmcp;
mess_vmmcp_reply m_vmmcp_reply;
mess_lsys_krn_sys_mqueue_open m_lsys_krn_sys_mqueue_open;
mess_lsys_krn_sys_mqueue_close m_lsys_krn_sys_mqueue_close;
mess_lsys_krn_sys_mqueue_send m_lsys_krn_sys_mqueue_send;
mess_lsys_krn_sys_mqueue_receive m_lsys_krn_sys_mqueue_receive;
u8_t size[56]; /* message payload may have 56 bytes at most */
};

View file

@ -275,5 +275,11 @@ int copyfd(endpoint_t endpt, int fd, int what);
#define COPYFD_TO 1 /* copy file descriptor to remote process */
#define COPYFD_CLOSE 2 /* close file descriptor in remote process */
int sys_mq_open(const char *name, int oflag);
int sys_mq_send(int mqdes, const char *msg_ptr, pid_t dst[], unsigned int msg_prio);
int sys_mq_receive(int mqdes, char *msg_ptr, unsigned int msg_prio);
int sys_mq_close(int mqdes);
int sys_endpoint_from_pid(pid_t pid, endpoint_t *endpoint);
#endif /* _SYSLIB_H */

View file

@ -13,7 +13,7 @@ DBG=-O0
.include "arch/${MACHINE_ARCH}/Makefile.inc"
SRCS+= clock.c cpulocals.c interrupt.c main.c proc.c system.c \
table.c utility.c usermapped_data.c
table.c utility.c usermapped_data.c mqueue.c
LDADD+= -ltimers -lsys -lexec

View file

@ -23,6 +23,7 @@
#include "direct_utils.h"
#include "hw_intr.h"
#include "arch_proto.h"
#include "mqueue.h"
#ifdef CONFIG_SMP
#include "smp.h"
@ -304,6 +305,8 @@ void kmain(kinfo_t *local_cbi)
*/
add_memmap(&kinfo, kinfo.bootstrap_start, kinfo.bootstrap_len);
initialize_message_queues();
#ifdef CONFIG_SMP
if (config_no_apic) {
DEBUGBASIC(("APIC disabled, disables SMP, using legacy PIC\n"));
@ -376,6 +379,8 @@ void minix_shutdown(minix_timer_t *tp)
*/
int how;
deinitialize_message_queues();
#ifdef CONFIG_SMP
/*
* FIXME

202
minix/kernel/mqueue.c Normal file
View file

@ -0,0 +1,202 @@
#include <string.h>
#include <sys/errno.h>
#include <minix/sysutil.h>
#include "mqueue.h"
#include "kernel/kernel.h"
#include <minix/endpoint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "clock.h"
#include "proto.h"
message_queue mq;
int initialize_message_queues(void)
{
mq.num_queues = 0;
for (int i = 0; i < MAX_QUEUES; i++) {
mq.queue_slot_empty[i] = EMPTY;
mq.msg[i].num_msgs = 0;
}
return 0;
}
int deinitialize_message_queues(void)
{
mq.num_queues = -1;
for (int i = 0; i < MAX_QUEUES; i++) {
mq.msg[i].num_msgs = -1;
mq.queue_slot_empty[i] = -1;
}
return 0;
}
int mq_open(const char *name, int oflag)
{
mqd_t mqd;
if (mq.num_queues == MAX_QUEUES)
return (mqd_t) -1;
if (strlen(name) > NAME_SIZE)
return (mqd_t) -1;
for (int i = 0; i < MAX_QUEUES; i++)
if (mq.queue_slot_empty[i] == NOT_EMPTY)
if (strcmp(mq.msg[i].name, name) == 0)
return (mqd_t) i;
for (int i = 0; i < MAX_QUEUES; i++) {
if (mq.queue_slot_empty[i] == EMPTY) {
mqd = i;
mq.queue_slot_empty[i] = NOT_EMPTY;
mq.num_queues++;
strncpy(mq.msg[i].name, name, strlen(name));
for (int j = 0; j < MAX_MESSAGES; j++) {
mq.msg[i].msg_slot_empty[j] = NOT_EMPTY;
mq.msg[i].msge[j].priority = DEFAULT_PRIO;
for (int k = 0; k < MAX_RECEIVERS; k++)
mq.msg[i].msge[j].dst[k] = EMPTY;
}
break;
}
}
return mqd;
}
int mq_close(mqd_t mqdes)
{
if (mqdes < 0 || mqdes >= MAX_QUEUES)
return EINVAL;
if (mq.queue_slot_empty[mqdes] == EMPTY)
return EMSGNOTFOUND;
mq.msg[mqdes].num_msgs = 0;
mq.queue_slot_empty[mqdes] = EMPTY;
mq.num_queues--;
return 0;
}
int mq_send(mqd_t mqdes, const char *msg_ptr, unsigned int msg_prio, endpoint_t src, endpoint_t dst[])
{
if (mqdes < 0 || mqdes >= MAX_QUEUES)
return EINVAL;
if (mq.queue_slot_empty[mqdes] == EMPTY)
return EMSGNOTFOUND;
if (mq.msg[mqdes].num_msgs >= MAX_MESSAGES)
return EMSGFULL;
int empty_slot_pos;
for (int i = 0; i < MAX_MESSAGES; i++)
if (mq.msg[mqdes].msg_slot_empty[i] == EMPTY) {
empty_slot_pos = i;
break;
}
memcpy(mq.msg[mqdes].msge[empty_slot_pos].msg,
msg_ptr, MAX_PAYLOAD);
mq.msg[mqdes].msge[empty_slot_pos].src = src;
mq.msg[mqdes].msg_slot_empty[empty_slot_pos] = NOT_EMPTY;
for (int i = 0; i < MAX_RECEIVERS; i++)
mq.msg[mqdes].msge[empty_slot_pos].dst[i] = dst[i];
mq.msg[mqdes].msge[empty_slot_pos].timestamp = get_monotonic();
mq.msg[mqdes].num_msgs++;
return 0;
}
size_t mq_receive(mqd_t mqdes, char *msg_ptr, unsigned int msg_prio, endpoint_t dst)
{
if (mqdes < 0 || mqdes >= MAX_QUEUES)
return EINVAL;
if (mq.queue_slot_empty[mqdes] == EMPTY)
return EMSGNOTFOUND;
if (mq.msg[mqdes].num_msgs >= MAX_MESSAGES)
return EMSGFULL;
if (mq.msg[mqdes].num_msgs == 0)
return EMSGEMPTY;
int index = message_index_with_highprio(mqdes, dst);
if (index == -1)
return EMSGEMPTY;
for (int i = 0; i < MAX_RECEIVERS; i++) {
if (mq.msg[mqdes].msge[index].dst[i] == dst) {
memcpy(msg_ptr, mq.msg[mqdes].msge[index].msg, MAX_PAYLOAD);
mq.msg[mqdes].msge[index].dst[i] = EMPTY;
clean_message_queue(mqdes);
}
}
return 0;
}
int find_max(int n1, int n2)
{
if (n1 > n2)
return n1;
return n2;
}
int message_index_with_highprio(mqdes, dst)
{
int prio;
int max_prio = -1;
int index = -1;
for (int i = 0; i < MAX_MESSAGES; i++) {
prio = mq.msg[mqdes].msge[i].priority;
if (mq.msg[mqdes].msg_slot_empty[i] == NOT_EMPTY) {
for (int j = 0; j < MAX_RECEIVERS; j++) {
if (mq.msg[mqdes].msge[i].dst[j] == dst) {
if (max_prio < find_max(max_prio, prio)) {
index = i;
max_prio = find_max(max_prio, prio);
}
}
}
}
}
return index;
}
int clean_message_queue(mqd_t mqdes)
{
int flag = 1;
for (int i = 0; i < MAX_MESSAGES; i++) {
if (mq.msg[mqdes].msg_slot_empty[i] == NOT_EMPTY) {
for (int j = 0; j < MAX_RECEIVERS; j++)
if (mq.msg[mqdes].msge[i].dst[j] != EMPTY)
flag = 0;
}
if (flag == 1) {
mq.msg[mqdes].msg_slot_empty[i] = EMPTY;
mq.msg[mqdes].num_msgs--;
}
}
return 0;
}

56
minix/kernel/mqueue.h Normal file
View file

@ -0,0 +1,56 @@
#ifndef MQUEUE_H
#define MQUEUE_H
/*
* This header file defines constants and function declarations used for
* MINIX interprocess message queues. These are used primarily in file
* mqueue.c
*/
#include <minix/type.h>
#define MAX_RECEIVERS 4
#define MAX_MESSAGES 64
#define MAX_QUEUES 64
#define MAX_PAYLOAD 32
#define NAME_SIZE 32
#define DEFAULT_PRIO 0
#define EMPTY -1
#define NOT_EMPTY 1
typedef int mqd_t;
typedef struct message_entity {
u64_t timestamp;
char msg[MAX_PAYLOAD];
endpoint_t src;
endpoint_t dst[MAX_RECEIVERS];
unsigned int priority;
} message_entity;
typedef struct messageq {
char name[NAME_SIZE];
int num_msgs;
message_entity msge[MAX_MESSAGES];
int msg_slot_empty[MAX_MESSAGES];
} messageq;
typedef struct message_queue {
int num_queues;
messageq msg[MAX_QUEUES];
int queue_slot_empty[MAX_QUEUES];
} message_queue;
int initialize_message_queues(void);
int deinitialize_message_queues(void);
mqd_t mq_open(const char *name, int oflag);
int mq_send(mqd_t mqdes, const char *msg_ptr, unsigned int msg_prio, endpoint_t src, endpoint_t dst[]);
size_t mq_receive(mqd_t mqdes, char *msg_ptr, unsigned int msg_prio, endpoint_t dst);
int mq_close(mqd_t mqdes);
int clean_message_queue(mqd_t mqdes);
int message_index_with_highprio(mqd_t mqdes, endpoint_t dst);
#endif /* MQUEUE_H */

View file

@ -0,0 +1,26 @@
/* The kernel call implemented in this file:
* m_type: SYS_MQ_CLOSE
*
*/
#include "kernel/mqueue.h"
#include "kernel/system.h"
#include "kernel/vm.h"
#include <signal.h>
#include <string.h>
#include <assert.h>
#include <minix/endpoint.h>
#include <minix/u64.h>
#if USE_MQ_IPC
/*===========================================================================*
* do_mq_close *
*===========================================================================*/
int do_mq_close(struct proc * caller, message * m_ptr)
{
return mq_close(m_ptr->m_lsys_krn_sys_mqueue_close.mqdes);
}
#endif /* USE_MQ_IPC */

View file

@ -0,0 +1,28 @@
/* The kernel call implemented in this file:
* m_type: SYS_MQ_OPEN
*
*/
#include "kernel/mqueue.h"
#include "kernel/system.h"
#include "kernel/vm.h"
#include <signal.h>
#include <string.h>
#include <assert.h>
#include <minix/endpoint.h>
#include <minix/u64.h>
#if USE_MQ_IPC
/*===========================================================================*
* do_mq_open *
*===========================================================================*/
int do_mq_open(struct proc * caller, message * m_ptr)
{
return mq_open(m_ptr->m_lsys_krn_sys_mqueue_open.name,
m_ptr->m_lsys_krn_sys_mqueue_open.oflag);
}
#endif /* USE_MQ_IPC */

View file

@ -0,0 +1,29 @@
/* The kernel call implemented in this file:
* m_type: SYS_MQ_REC
*
*/
#include "kernel/mqueue.h"
#include "kernel/system.h"
#include "kernel/vm.h"
#include <signal.h>
#include <string.h>
#include <assert.h>
#include <minix/endpoint.h>
#include <minix/u64.h>
#if USE_MQ_IPC
/*===========================================================================*
* do_mq_rec *
*===========================================================================*/
int do_mq_rec(struct proc * caller, message * m_ptr)
{
return mq_receive(m_ptr->m_lsys_krn_sys_mqueue_receive.mqdes,
m_ptr->m_lsys_krn_sys_mqueue_receive.msg,
m_ptr->m_lsys_krn_sys_mqueue_receive.msg_prio,
caller->p_endpoint);
}
#endif /* USE_MQ_IPC */

View file

@ -0,0 +1,30 @@
/* The kernel call implemented in this file:
* m_type: SYS_MQ_SEND
*
*/
#include "kernel/mqueue.h"
#include "kernel/system.h"
#include "kernel/vm.h"
#include <signal.h>
#include <string.h>
#include <assert.h>
#include <minix/endpoint.h>
#include <minix/u64.h>
#if USE_MQ_IPC
/*===========================================================================*
* do_mq_send *
*===========================================================================*/
int do_mq_send(struct proc * caller, message * m_ptr)
{
return mq_send(m_ptr->m_lsys_krn_sys_mqueue_send.mqdes,
m_ptr->m_lsys_krn_sys_mqueue_send.msg,
m_ptr->m_lsys_krn_sys_mqueue_send.msg_prio,
caller->p_endpoint,
m_ptr->m_lsys_krn_sys_mqueue_send.dst);
}
#endif /* USE_MQ_IPC */

View file

@ -88,6 +88,11 @@ SRCS+= \
sys_vsafecopy.c \
sys_vtimer.c \
sys_vumap.c \
sys_mq_open.c \
sys_mq_close.c \
sys_mq_send.c \
sys_mq_receive.c \
sys_endpoint_from_pid.c \
taskcall.c \
tickdelay.c \
timers.c \

View file

@ -0,0 +1,108 @@
#include <minix/procfs.h>
#include <string.h>
#include "syslib.h"
struct pstat { /* structure filled by pstat() */
struct pstat *ps_next; /* next in process list */
int ps_task; /* is this process a task or not? */
int ps_endpt; /* process endpoint (NONE means unused slot) */
dev_t ps_dev; /* major/minor of controlling tty */
uid_t ps_ruid; /* real uid */
uid_t ps_euid; /* effective uid */
pid_t ps_pid; /* process id */
pid_t ps_ppid; /* parent process id */
int ps_pgrp; /* process group id */
char ps_state; /* process state */
char ps_pstate; /* sleep state */
char ps_fstate; /* VFS block state */
int ps_ftask; /* VFS suspend task (endpoint) */
vir_bytes ps_memory; /* memory usage */
int ps_recv; /* process number to receive from (endpoint) */
unsigned int ps_utime; /* accumulated user time */
unsigned int ps_stime; /* accumulated system time */
char ps_name[PROC_NAME_LEN + 1];/* process name */
char *ps_args; /* concatenated argument string */
};
int pstat(struct pstat *ps, pid_t pid);
int sys_endpoint_from_pid(pid_t pid, endpoint_t *endpoint) {
struct pstat ps;
int ret = pstat(&ps, pid);
if(!ret)
*endpoint = ps.ps_endpt;
return ret;
}
/* Taken from minix/commands/ps/ps.c */
int pstat(struct pstat *ps, pid_t pid) {
FILE *fp;
int version, ruid, euid, dev;
char type, path[PATH_MAX], name[256];
ps->ps_pid = pid;
ps->ps_next = NULL;
sprintf(path, "/proc/%d/psinfo", pid);
if ((fp = fopen(path, "r")) == NULL)
return -1;
if (fscanf(fp, "%d", &version) != 1) {
fclose(fp);
return -1;
}
/* The psinfo file's version must match what we expect. */
if (version != PSINFO_VERSION) {
fputs("procfs version mismatch!\n", stderr);
return -1;
}
if (fscanf(fp, " %c %d %255s %c %d %*d %u %u %*u %*u", &type, &ps->ps_endpt,
name, &ps->ps_state, &ps->ps_recv, &ps->ps_utime, &ps->ps_stime)
!= 7) {
fclose(fp);
return -1;
}
strncpy(ps->ps_name, name, sizeof(ps->ps_name) - 1);
ps->ps_name[sizeof(ps->ps_name) - 1] = 0;
ps->ps_task = type == TYPE_TASK;
if (!ps->ps_task) {
if (fscanf(fp, " %lu %*u %*u %c %d %u %u %u %*d %c %d %u",
&ps->ps_memory, &ps->ps_pstate, &ps->ps_ppid, &ruid, &euid,
&ps->ps_pgrp, &ps->ps_fstate, &ps->ps_ftask, &dev) != 9) {
fclose(fp);
return -1;
}
ps->ps_ruid = ruid;
ps->ps_euid = euid;
ps->ps_dev = dev;
} else {
ps->ps_memory = 0L;
ps->ps_pstate = PSTATE_NONE;
ps->ps_ppid = 0;
ps->ps_ruid = 0;
ps->ps_euid = 0;
ps->ps_pgrp = 0;
ps->ps_fstate = FSTATE_NONE;
ps->ps_ftask = NONE;
ps->ps_dev = NO_DEV;
}
fclose(fp);
if (ps->ps_state == STATE_ZOMBIE)
strncpy(ps->ps_args, "<defunct>", 9);
else
ps->ps_args = NULL;
return OK;
}

View file

@ -0,0 +1,10 @@
#include "syslib.h"
int sys_mq_close(int mqdes)
{
message m;
m.m_lsys_krn_sys_mqueue_close.mqdes = mqdes;
return _kernel_call(SYS_MQ_CLOSE, &m);
}

View file

@ -0,0 +1,14 @@
# include "syslib.h"
# include <string.h>
#define MAX_PAYLOAD 32
int sys_mq_open(const char *name, int oflag)
{
message m;
m.m_lsys_krn_sys_mqueue_open.oflag = oflag;
strncpy(m.m_lsys_krn_sys_mqueue_open.name, name, MAX_PAYLOAD);
return _kernel_call(SYS_MQ_OPEN, &m);
}

View file

@ -0,0 +1,24 @@
#include "syslib.h"
#include <string.h>
#include <machine/archtypes.h>
#include <minix/timers.h>
#include <minix/sysutil.h>
#include <minix/vm.h>
#define MAX_PAYLOAD 32
int sys_mq_receive(int mqdes, char *msg_ptr, unsigned int msg_prio)
{
int ret;
message m;
m.m_lsys_krn_sys_mqueue_receive.mqdes = mqdes;
m.m_lsys_krn_sys_mqueue_receive.msg_prio = msg_prio;
ret = _kernel_call(SYS_MQ_REC, &m);
if (ret == 0)
memcpy(msg_ptr, m.m_lsys_krn_sys_mqueue_receive.msg, MAX_PAYLOAD);
return ret;
}

View file

@ -0,0 +1,31 @@
#include "syslib.h"
#include <string.h>
#include <machine/archtypes.h>
#include <minix/timers.h>
#include <minix/sysutil.h>
#include <minix/vm.h>
#define MAX_RECEIVERS 4
#define MAX_PAYLOAD 32
int sys_mq_send(int mqdes, const char *msg_ptr, pid_t dst[], unsigned int msg_prio)
{
message m;
m.m_lsys_krn_sys_mqueue_send.mqdes = mqdes;
for (int i = 0; i < MAX_RECEIVERS; i++) {
if (dst[i] == -1) {
m.m_lsys_krn_sys_mqueue_send.dst[i] = dst[i];
break;
}
endpoint_t endpoint;
int ret = sys_endpoint_from_pid(dst[i], &endpoint);
if (ret != 0)
endpoint = -1;
m.m_lsys_krn_sys_mqueue_send.dst[i] = endpoint;
}
memcpy(m.m_lsys_krn_sys_mqueue_send.msg, msg_ptr, MAX_PAYLOAD);
return (_kernel_call(SYS_MQ_SEND, &m));
}

View file

@ -172,7 +172,13 @@
#define ENOLINK (_SIGN 95 ) /* Link has been severed */
#define EPROTO (_SIGN 96 ) /* Protocol error */
#define ELAST (_SIGN 96 ) /* Must equal largest errno */
#define EMQUEUEFULL (_SIGN 97 ) /* Message queue is full */
#define EMSGNOTFOUND (_SIGN 98 ) /* Message queue descriptor does not exist */
#define EMSGFULL (_SIGN 99 ) /* Message queue with given descriptor is full */
#define EMSGNONE (_SIGN 100 ) /* No message for process to retrieve */
#define EMSGEMPTY (_SIGN 101 ) /* Message queue empty */
#define ELAST (_SIGN 101 ) /* Must equal largest errno */
#if defined(_KERNEL) || defined(_KMEMUSER)
/* pseudo-errors returned inside kernel to modify return to process */