#include #include #include #include "mqueue.h" #include "kernel/kernel.h" #include #include #include #include #include "clock.h" #include "proto.h" message_queue mq; int initialize_message_queues(void) { mq.num_queues = 0; number_of_queues = MAX_QUEUES; number_of_messages = MAX_MESSAGES; mq_blocking = MQ_NON_BLOCKING; notify = NOTIFY_OFF; for (int i = 0; i < number_of_queues; i++) { mq.queue_slot_empty[i] = EMPTY; mq.msg[i].num_users = 0; mq.msg[i].num_msgs = 0; } return 0; } int deinitialize_message_queues(void) { mq.num_queues = -1; for (int i = 0; i < number_of_queues; i++) { mq.msg[i].num_msgs = -1; mq.msg[i].num_users = -1; mq.queue_slot_empty[i] = EMPTY; } return 0; } int mq_get_attributes(int *no_of_msgs, int *no_of_queues, int *blocking) { *no_of_msgs = number_of_messages; *no_of_queues = number_of_queues; *blocking = MQ_NON_BLOCKING; return 0; } int mq_set_attributes(int no_of_msgs, int no_of_queues, int blocking) { if (no_of_msgs <= MIN_LIMIT || no_of_msgs > MAX_LIMIT) return EINVAL; if (no_of_queues <= MIN_LIMIT || no_of_queues > MAX_LIMIT) return EINVAL; number_of_messages = no_of_msgs; number_of_queues = no_of_queues; mq_blocking = NON_BLOCKING; return 0; } int mq_open(const char *name, int oflag) { mqd_t mqd; if (mq.num_queues >= number_of_queues) return EMQUEUEFULL; if (strlen(name) > NAME_SIZE) return EINVAL; for (int i = 0; i < number_of_queues; i++) if (mq.queue_slot_empty[i] == NOT_EMPTY) if (strncmp(mq.msg[i].name, name, NAME_SIZE) == 0) { mq.msg[i].num_users++; return (mqd_t) i; } for (int i = 0; i < number_of_queues; i++) { if (mq.queue_slot_empty[i] == EMPTY) { mqd = i; mq.queue_slot_empty[i] = NOT_EMPTY; mq.num_queues++; mq.msg[i].num_users++; memset(mq.msg[i].name, '\0', NAME_SIZE); strncpy(mq.msg[i].name, name, strlen(name)); for (int j = 0; j < number_of_messages; j++) { mq.msg[i].msg_slot_empty[j] = EMPTY; mq.msg[i].msge[j].priority = DEFAULT_PRIO; for (int k = 0; k < MAX_RECEIVERS; k++) mq.msg[i].msge[j].dst[k] = EMPTY; } break; } } return mqd; } int mq_close(mqd_t mqdes) { if (mqdes < 0 || mqdes >= number_of_queues) return EINVAL; if (mq.queue_slot_empty[mqdes] == EMPTY) return EMSGNOTFOUND; mq.msg[mqdes].num_users--; if (mq.msg[mqdes].num_users == 0) { mq.msg[mqdes].num_msgs = 0; mq.queue_slot_empty[mqdes] = EMPTY; mq.num_queues--; } return 0; } int mq_send(mqd_t mqdes, const char *msg_ptr, unsigned int msg_prio, endpoint_t src, endpoint_t dst[]) { if (mqdes < 0 || mqdes >= number_of_queues) return EINVAL; if (mq.queue_slot_empty[mqdes] == EMPTY) return EMSGNOTFOUND; if (mq.msg[mqdes].num_msgs > number_of_messages) return EMSGFULL; int empty_slot_pos; for (int i = 0; i < number_of_messages; i++) if (mq.msg[mqdes].msg_slot_empty[i] == EMPTY) { empty_slot_pos = i; break; } memset(mq.msg[mqdes].msge[empty_slot_pos].msg, 0, MAX_PAYLOAD); memcpy(mq.msg[mqdes].msge[empty_slot_pos].msg, msg_ptr, MAX_PAYLOAD); mq.msg[mqdes].msge[empty_slot_pos].src = src; mq.msg[mqdes].msge[empty_slot_pos].priority = msg_prio; mq.msg[mqdes].msg_slot_empty[empty_slot_pos] = NOT_EMPTY; for (int i = 0; i < MAX_RECEIVERS; i++) mq.msg[mqdes].msge[empty_slot_pos].dst[i] = dst[i]; mq.msg[mqdes].msge[empty_slot_pos].timestamp = get_monotonic(); mq.msg[mqdes].num_msgs++; if (notify) { proc_nr_t proc_nr; for (int i = 0; i < MAX_RECEIVERS; i++) { if (dst[i] != -1) { /* Translate endpoint to process number */ if (!isokendpt(dst[i], &proc_nr)) goto exit; else cause_sig(proc_nr, SIGALRM); } } } exit: return 0; } size_t mq_receive(mqd_t mqdes, char *msg_ptr, unsigned int msg_prio, endpoint_t dst) { if (mqdes < 0 || mqdes >= number_of_queues) return EINVAL; if (mq.queue_slot_empty[mqdes] == EMPTY) return EMSGNOTFOUND; if (mq.msg[mqdes].num_msgs == 0) return EMSGEMPTY; int index = message_index_with_highprio(mqdes, dst); if (index == -1) return EMSGEMPTY; for (int i = 0; i < MAX_RECEIVERS; i++) { if (mq.msg[mqdes].msge[index].dst[i] == dst) { memcpy(msg_ptr, mq.msg[mqdes].msge[index].msg, MAX_PAYLOAD); mq.msg[mqdes].msge[index].dst[i] = EMPTY; clean_message_queue(mqdes); } } return 0; } int message_index_with_highprio(int mqdes, endpoint_t dst) { int prio; int max_prio = -1; int index = -1; for (int i = 0; i < number_of_messages; i++) { if (mq.msg[mqdes].msg_slot_empty[i] == NOT_EMPTY) { prio = mq.msg[mqdes].msge[i].priority; for (int j = 0; j < MAX_RECEIVERS; j++) { if (mq.msg[mqdes].msge[i].dst[j] == dst) { if (max_prio == prio) { if (mq.msg[mqdes].msge[i].timestamp > mq.msg[mqdes].msge[index].timestamp) index = i; } else if (max_prio < MAX(max_prio, prio)) { index = i; max_prio = MAX(max_prio, prio); } } } } } return index; } int clean_message_queue(mqd_t mqdes) { int flag = 1; for (int i = 0; i < number_of_messages; i++) { if (mq.msg[mqdes].msg_slot_empty[i] == NOT_EMPTY) { for (int j = 0; j < MAX_RECEIVERS; j++) if (mq.msg[mqdes].msge[i].dst[j] != EMPTY) flag = 0; } if (flag) { mq.msg[mqdes].msg_slot_empty[i] = EMPTY; mq.msg[mqdes].num_msgs--; } } return 0; } int mq_notify(int notify_on_off) { if (notify_on_off) notify = NOTIFY_ON; else notify = NOTIFY_OFF; return 0; }