minix/minix/kernel/mqueue.c
Sanchayan Maity fe71ea4bc1 First round of assorted bug fixes
Fixed the following bugs:
1. Message priority was not set at all at two places
2. Limits check was not correct while returning error values
3. We are supposed to allow multiple users to operate on the same queue
using a named descriptor. However, any one user calling close before
the others would result in closing the queue even when other users
might be using it. Track the number of users and close only if the number
of users is zero. Functional tests for this are yet to be done, so this is
untested, but introduce it anyway at this juncture.
2016-03-19 11:49:48 +05:30

210 lines
4.2 KiB
C

#include <string.h>
#include <sys/errno.h>
#include <minix/sysutil.h>
#include "mqueue.h"
#include "kernel/kernel.h"
#include <minix/endpoint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "clock.h"
#include "proto.h"
/*
 * Global message-queue table shared by every routine in this file: it holds
 * the count of queues in use (num_queues), a per-slot occupancy flag
 * (queue_slot_empty[]) and the queues themselves (msg[]).
 */
message_queue mq;
/*
 * Put the global queue table into its start-up state: no queues in use,
 * every slot flagged EMPTY, and zero users and zero messages per slot.
 *
 * Always returns 0.
 */
int initialize_message_queues(void)
{
	int slot = 0;

	mq.num_queues = 0;

	while (slot < MAX_QUEUES) {
		mq.msg[slot].num_msgs = 0;
		mq.msg[slot].num_users = 0;
		mq.queue_slot_empty[slot] = EMPTY;
		slot++;
	}

	return 0;
}
/*
 * Tear down the global queue table: every slot is flagged EMPTY and the
 * queue, user and message counters are all set to -1.
 *
 * Always returns 0.
 */
int deinitialize_message_queues(void)
{
	int slot;

	mq.num_queues = -1;

	for (slot = 0; slot != MAX_QUEUES; ++slot) {
		mq.queue_slot_empty[slot] = EMPTY;
		mq.msg[slot].num_users = -1;
		mq.msg[slot].num_msgs = -1;
	}

	return 0;
}
int mq_open(const char *name, int oflag)
{
mqd_t mqd;
if (mq.num_queues >= MAX_QUEUES)
return EMQUEUEFULL;
if (strlen(name) > NAME_SIZE)
return EINVAL;
for (int i = 0; i < MAX_QUEUES; i++)
if (mq.queue_slot_empty[i] == NOT_EMPTY)
if (strcmp(mq.msg[i].name, name) == 0) {
mq.msg[i].num_users++;
return (mqd_t) i;
}
for (int i = 0; i < MAX_QUEUES; i++) {
if (mq.queue_slot_empty[i] == EMPTY) {
mqd = i;
mq.queue_slot_empty[i] = NOT_EMPTY;
mq.num_queues++;
mq.msg[i].num_users++;
strncpy(mq.msg[i].name, name, strlen(name));
for (int j = 0; j < MAX_MESSAGES; j++) {
mq.msg[i].msg_slot_empty[j] = EMPTY;
mq.msg[i].msge[j].priority = DEFAULT_PRIO;
for (int k = 0; k < MAX_RECEIVERS; k++)
mq.msg[i].msge[j].dst[k] = EMPTY;
}
break;
}
}
return mqd;
}
/*
 * Drop one user from the queue identified by `mqdes`.
 *
 * The queue slot is released (flagged EMPTY, message count zeroed, global
 * queue count decremented) only when the user count reaches zero, so other
 * users of the same queue are not affected by an early close.
 *
 * Returns 0 on success, EINVAL when `mqdes` is out of range, or
 * EMSGNOTFOUND when the slot is not in use.
 */
int mq_close(mqd_t mqdes)
{
	if (!(mqdes >= 0 && mqdes < MAX_QUEUES))
		return EINVAL;

	if (mq.queue_slot_empty[mqdes] == EMPTY)
		return EMSGNOTFOUND;

	mq.msg[mqdes].num_users -= 1;

	/* Somebody else still has the queue open: leave it alone. */
	if (mq.msg[mqdes].num_users != 0)
		return 0;

	mq.num_queues -= 1;
	mq.queue_slot_empty[mqdes] = EMPTY;
	mq.msg[mqdes].num_msgs = 0;

	return 0;
}
/*
 * Store a message in the queue identified by `mqdes`.
 *
 * mqdes     Queue descriptor as returned by mq_open().
 * msg_ptr   Payload; exactly MAX_PAYLOAD bytes are copied from it, so the
 *           caller's buffer must be at least that large.
 * msg_prio  Priority recorded with the message.
 * src       Sender endpoint recorded with the message.
 * dst       Receiver endpoints; the first MAX_RECEIVERS entries are copied.
 *
 * Returns 0 on success, EINVAL for an out-of-range descriptor or a NULL
 * pointer argument, EMSGNOTFOUND when the queue is not open, or EMSGFULL
 * when the queue has no free message slot.
 */
int mq_send(mqd_t mqdes, const char *msg_ptr, unsigned int msg_prio, endpoint_t src, endpoint_t dst[])
{
	int slot = -1;

	if (mqdes < 0 || mqdes >= MAX_QUEUES)
		return EINVAL;
	if (msg_ptr == NULL || dst == NULL)
		return EINVAL;
	if (mq.queue_slot_empty[mqdes] == EMPTY)
		return EMSGNOTFOUND;

	/* A queue holding MAX_MESSAGES messages is already full. */
	if (mq.msg[mqdes].num_msgs >= MAX_MESSAGES)
		return EMSGFULL;

	for (int i = 0; i < MAX_MESSAGES; i++) {
		if (mq.msg[mqdes].msg_slot_empty[i] == EMPTY) {
			slot = i;
			break;
		}
	}

	/*
	 * Never index with an unset slot: if the counter and the slot flags
	 * disagree, report the queue as full instead of corrupting memory.
	 */
	if (slot < 0)
		return EMSGFULL;

	/* The copy overwrites the whole payload area, no pre-clearing needed. */
	memcpy(mq.msg[mqdes].msge[slot].msg, msg_ptr, MAX_PAYLOAD);
	mq.msg[mqdes].msge[slot].src = src;
	mq.msg[mqdes].msge[slot].priority = msg_prio;
	for (int i = 0; i < MAX_RECEIVERS; i++)
		mq.msg[mqdes].msge[slot].dst[i] = dst[i];
	mq.msg[mqdes].msge[slot].timestamp = get_monotonic();

	mq.msg[mqdes].msg_slot_empty[slot] = NOT_EMPTY;
	mq.msg[mqdes].num_msgs++;

	return 0;
}
/*
 * Deliver to receiver `dst` the message selected for it by
 * message_index_with_highprio() from the queue identified by `mqdes`.
 *
 * MAX_PAYLOAD bytes are copied into `msg_ptr`, the receiver's entry in the
 * message's destination list is cleared, and clean_message_queue() is run
 * so messages without remaining receivers can be reclaimed.
 *
 * Returns 0 on success, EINVAL for an out-of-range descriptor,
 * EMSGNOTFOUND when the queue is not open, or EMSGEMPTY when the queue holds
 * no message addressed to `dst`.
 *
 * NOTE(review): `msg_prio` is not used by this function, and the error
 * codes are returned through a size_t - confirm callers cope with that.
 */
size_t mq_receive(mqd_t mqdes, char *msg_ptr, unsigned int msg_prio, endpoint_t dst)
{
	int chosen;

	if (mqdes < 0 || mqdes >= MAX_QUEUES)
		return EINVAL;

	if (mq.queue_slot_empty[mqdes] == EMPTY)
		return EMSGNOTFOUND;

	if (mq.msg[mqdes].num_msgs == 0)
		return EMSGEMPTY;

	chosen = message_index_with_highprio(mqdes, dst);
	if (chosen == -1)
		return EMSGEMPTY;

	for (int r = 0; r < MAX_RECEIVERS; r++) {
		/* Skip destination entries that belong to someone else. */
		if (mq.msg[mqdes].msge[chosen].dst[r] != dst)
			continue;

		memcpy(msg_ptr, mq.msg[mqdes].msge[chosen].msg, MAX_PAYLOAD);
		mq.msg[mqdes].msge[chosen].dst[r] = EMPTY;
		clean_message_queue(mqdes);
	}

	return 0;
}
/*
 * Return the larger of the two integers `n1` and `n2`
 * (either one when they are equal).
 */
int find_max(int n1, int n2)
{
	return (n1 > n2) ? n1 : n2;
}
/*
 * Find, in queue `mqdes`, the occupied message slot with the highest
 * priority among those that list `dst` as one of their receivers.
 *
 * When several candidates share the highest priority the one in the lowest
 * slot wins, because a later slot only replaces the current choice when its
 * priority is strictly greater.
 *
 * Returns the slot index, or -1 when no message is addressed to `dst`.
 * The caller is expected to pass a valid descriptor; it is not checked here.
 */
int message_index_with_highprio(int mqdes, endpoint_t dst)
{
	int best_slot = -1;
	int best_prio = -1;

	for (int m = 0; m < MAX_MESSAGES; m++) {
		int addressed = 0;
		int prio;

		if (mq.msg[mqdes].msg_slot_empty[m] != NOT_EMPTY)
			continue;

		/* One matching destination entry is enough. */
		for (int r = 0; r < MAX_RECEIVERS && !addressed; r++)
			addressed = (mq.msg[mqdes].msge[m].dst[r] == dst);

		prio = mq.msg[mqdes].msge[m].priority;
		if (addressed && prio > best_prio) {
			best_slot = m;
			best_prio = prio;
		}
	}

	return best_slot;
}
/*
 * Reclaim every message in queue `mqdes` that has no receiver left.
 *
 * A slot is released (flagged EMPTY and num_msgs decremented) only when it
 * currently holds a message and all entries of that message's destination
 * list are EMPTY.  The decision is taken per message: one message that
 * still has pending receivers does not keep other, fully delivered messages
 * alive, and slots that are already EMPTY are never counted again.
 *
 * Returns 0 on success or EINVAL when `mqdes` is out of range.
 */
int clean_message_queue(mqd_t mqdes)
{
	if (mqdes < 0 || mqdes >= MAX_QUEUES)
		return EINVAL;

	for (int i = 0; i < MAX_MESSAGES; i++) {
		int pending = 0;

		/* Only occupied slots can be reclaimed. */
		if (mq.msg[mqdes].msg_slot_empty[i] != NOT_EMPTY)
			continue;

		for (int j = 0; j < MAX_RECEIVERS; j++) {
			if (mq.msg[mqdes].msge[i].dst[j] != EMPTY) {
				pending = 1;
				break;
			}
		}

		if (!pending) {
			mq.msg[mqdes].msg_slot_empty[i] = EMPTY;
			mq.msg[mqdes].num_msgs--;
		}
	}

	return 0;
}