/*
 * @File: mqueue.c
 *
 * @Contents: POSIX Message Passing related functions
 *
 * Copyright (C) 2002 Sergio Saez <ssaez@disca.upv.es>
 * Project OCERA (Open Components for Realtime Embedded Applications)
 *
 *
 * 'mqueue.c' has the functions that implement message passing facilities for RTLinux
 * kernel.
 *
 */

#define MQUEUE_C

#define MODULE

/*
 * Required include files
 */

#include <rtl.h>

#include <errno.h>
#include <stdarg.h>

#include <mqueue.h>
#include <mqh.h>

/*** Definitions ***/

/*** Local variable definitions ***/

static struct mq_description mq_desc[_RTL_MQ_OPEN_MAX];
static mqd_t mq_desc_available_vect[_RTL_MQ_OPEN_MAX];
static int mq_desc_available_count;

#define MQ_DESC_PTR(mqd) (&mq_desc[mqd])

#define MQ_SEM_WAIT(sem, abs_timeout)           \
(((abs_timeout) != NULL)                        \
 ? (sem_timedwait((sem), (abs_timeout)))        \
 : (sem_wait(sem))                              \
    )

/*** Exported functions ***/

/*
 * @Func: init_mqueue
 *
 * @Desc: initialize the message queue system
 */
int init_mqueue ()
{
    int i;

    /* Initialize available queue descriptions vector  */
    for (i=0; i< _RTL_MQ_OPEN_MAX; i++) mq_desc_available_vect[i]= _RTL_MQ_OPEN_MAX-i-1;
    mq_desc_available_count= _RTL_MQ_OPEN_MAX;

    /* Initialize name space */
    mq_ns_init();

    return 0;

} /* end init_mqueue */

/*
 * @Func: mq_open
 *
 * @#name: string naming a message queue.
 * @#oflags: opening message queue flags
 * @#mode: permission mode (creation only)
 * @#attr: message queue attributes (creation only)
 *
 * @Desc: open a message queue
 */
mqd_t mq_open (
    const char *        name, 
    int                 oflags, 
    ...
    ) 
{
    int error;
    struct mq_message_queue *   mq;
    mqd_t mqdes;

    va_list ap;
    mode_t mode;                /* Additional parameters for creation */
    struct mq_attr * attr;

    if (mq_desc_available_count <= 0) 
        __reterror(EMFILE, (mqd_t)-1); /* Too many message queue descriptors are
                                        * currently in use by this process. */

    if ((oflags & O_CREAT) != 0) {

        va_start(ap, oflags);   /* Extract the additional parameters */
        mode= va_arg(ap, int);  /* To avoid warning: "`mode_t' is promoted to `int' when
                                 * passed through `...' (so you should pass `int' not
                                 * `mode_t' to `va_arg') */
        attr= va_arg(ap, struct mq_attr *); 
        va_end(ap);

        if (attr && (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0))
            __reterror(EINVAL, (mqd_t)-1);
        
        error= mq_ns_bind(name, &mq, oflags);
        if (error)
            __reterror(-error, (mqd_t)-1);

        if (!IS_VALID_MESSAGE_QUEUE(mq)) {            
            /* A new message queue */
            error= mq_queue_init(mq, attr);
            if (error)
                __reterror(-error, (mqd_t)-1);
        } /* endif */

    } else {
        error= mq_ns_lookup(name, &mq);
        if (error)
            __reterror(-error, (mqd_t)-1);
    } /* endif */

    error= mq_queue_open(mq);
    if (error)
        __reterror(-error, (mqd_t)-1);

    /* If this part is reached, the message queue has been successfully open */

    mq_desc_available_count--;
    mqdes= mq_desc_available_vect[mq_desc_available_count];
    
    MQ_DESC_PTR(mqdes)->flags= oflags & ~(O_CREAT | O_EXCL); 
				/* WARNING: It is assumed this removes all unnecesary
                                 * flags from 'oflags' parameter */
    MQ_DESC_PTR(mqdes)->queue= mq;

    return mqdes;

} /* end mq_open */

/*
 * @Func: mq_close
 *
 * @#mqdes: message queue descriptor.
 *
 * @Desc: close a message queue
 */
int mq_close (
    mqd_t   mqdes
    ) 
{    
    int error;
    struct mq_message_queue *   mq;

    if (mqdes < 0 || mqdes >= _RTL_MQ_OPEN_MAX || !IS_VALID_DESCRIPTION(MQ_DESC_PTR(mqdes)))
        __reterror(EBADF, (int)-1);
    
    mq= MQ_DESC_PTR(mqdes)->queue;
    
    /*** ENTER ***/
    if (sem_wait(&mq->queue_sem)) 
        return (ssize_t)-1;     /* An error has occurred while locking the
                                 * semaphore. EINTR can be produced, but POSIX does not
                                 * specify this possibility */

    if (mq->notification_thread == pthread_self() &&
        mq->notification_mqdes == mqdes ) { 

        mq->notification_mqdes= (mqd_t) -1;        
        mq->notification_signo= 0;
        mq->notification_thread= (pthread_t) -1;
    } /* endif */

    sem_post(&mq->queue_sem);
    /*** EXIT ***/
    
    /* Free message queue descriptor */
    MQ_DESC_PTR(mqdes)->queue= NULL;

    mq_desc_available_vect[mq_desc_available_count]= mqdes;
    mq_desc_available_count++;
    
    /* Updates the open descriptors counter, destroying the message queue if unlink is
     * pending */
    if (mq_queue_close(mq)) {
	error= mq_ns_unbind(mq);
	if (error)
	    __reterror(-error, (int)-1);
    } /* endif */

    return 0;

} /* end mq_close */

/*
 * @Func: mq_unlink
 *
 * @#name: string naming a message queue.
 *
 * @Desc: removes a message queue
 */
int mq_unlink (
    const char *        name
    ) 
{
    int error;

    error= mq_ns_unlink(name);
    if (error)
	__reterror(-error, (int)-1);

    return 0;

} /* end mq_unlink */

/*
 * @Func: mq_getattr
 *
 * @#mqdes: message queue descriptor.
 * @#mqstat: message queue attributes. 
 *
 * @Desc: get message queue attributes
 */
int mq_getattr (
    mqd_t               mqdes, 
    struct mq_attr *    mqstat
    ) 
{
    struct mq_message_queue *   mq;

    if (mqdes < 0 || mqdes >= _RTL_MQ_OPEN_MAX || !IS_VALID_DESCRIPTION(MQ_DESC_PTR(mqdes)))
        __reterror(EBADF, (int)-1);

    mq= MQ_DESC_PTR(mqdes)->queue;

    /* Flags associated with the open message queue description */
    mqstat->mq_flags= MQ_DESC_PTR(mqdes)->flags; 

    /* Attributes of the message queue */
    mqstat->mq_maxmsg= mq->attr.mq_maxmsg; /* Maximum number of messages. */
    mqstat->mq_msgsize= mq->attr.mq_msgsize; /* Maximum message size. */

    /* Number of messages currently on the queue */
    mqstat->mq_curmsgs= mq->attr.mq_curmsgs;

    return 0;

} /* end mq_getattr */

/*
 * @Func: mq_setattr
 *
 * @#mqdes: message queue descriptor.
 * @#mqstat: new message queue attributes (only mq_flags member is used)
 * @#omqstat: old message queue attributes
 *
 * @Desc: set the description flags and get message queue attributes
 */
int mq_setattr (
    mqd_t                       mqdes, 
    const struct mq_attr *      mqstat,
    struct mq_attr *            omqstat
    ) 
{
    struct mq_message_queue *   mq;

    if (mqdes < 0 || mqdes >= _RTL_MQ_OPEN_MAX || !IS_VALID_DESCRIPTION(MQ_DESC_PTR(mqdes)))
        __reterror(EBADF, (int)-1);

    mq= MQ_DESC_PTR(mqdes)->queue;

    if (omqstat) {
        /* Flags associated with the open message queue description */
	omqstat->mq_flags= MQ_DESC_PTR(mqdes)->flags; 
                                /* Message queue flags. Depends on the message queue
                                   descriptor. */

        /* Attributes of the message queue */
	omqstat->mq_maxmsg= mq->attr.mq_maxmsg; /* Maximum number of messages. */
	omqstat->mq_msgsize= mq->attr.mq_msgsize; /* Maximum message size. */

        /* Number of messages currently on the queue */
	omqstat->mq_curmsgs= mq->attr.mq_curmsgs;
    } /* endif */

    /* New flags associated with the open message queue description */
    MQ_DESC_PTR(mqdes)->flags= mqstat->mq_flags; 

    return 0;

} /* end mq_setattr */

/*
 * @Func: __mq_timedsend
 *
 * @#mqdes: message queue descriptor.
 * @#msg_ptr: message
 * @#msg_len: length of the message
 * @#msg_prio: priority of the message
 * @#abs_timeout: timeout 
 *
 * @Desc: send a message to a message queue within a given timeout
 */
int __mq_timedsend (
    mqd_t                       mqdes, 
    const char *                msg_ptr, 
    size_t                      msg_len, 
    unsigned                    msg_prio,
    const struct timespec *     abs_timeout
    ) 
{
    struct mq_message_queue *   mq;
    struct mq_msg *             msg;

    if (mqdes < 0 || mqdes >= _RTL_MQ_OPEN_MAX || !IS_VALID_DESCRIPTION(MQ_DESC_PTR(mqdes)))
        __reterror(EBADF, (int)-1);

    mq= MQ_DESC_PTR(mqdes)->queue;

    if (msg_len > mq->attr.mq_msgsize)
        __reterror(EMSGSIZE, (int) -1);

    if (msg_prio >= _RTL_MQ_PRIO_MAX)
        __reterror(EINVAL, (int) -1);

    /*** ENTER ***/
    if (MQ_SEM_WAIT(&mq->queue_sem, abs_timeout)) 
        return (ssize_t)-1;     /* An error has occurred while locking the
                                 * semaphore. Only EINTR or ETIMEDOUT are expected */

    if ((MQ_DESC_PTR(mqdes)->flags & O_NONBLOCK) != 0) {
        if (sem_trywait(&mq->queue_send)) {
            sem_post(&mq->queue_sem);
            /*** EXIT ***/
	    return -1;
        } /* endif */
    } else {
        sem_post(&mq->queue_sem);
        /*** EXIT ***/

        if (MQ_SEM_WAIT(&mq->queue_send, abs_timeout)) /* Gets a token from sending
                                                     * bucket */
	    return (ssize_t)-1; /* An error has occurred while locking the
                                 * semaphore. Only EINTR or ETIMEDOUT are expected */
        
        /*** ENTER ***/
        if (MQ_SEM_WAIT(&mq->queue_sem, abs_timeout)) {
            sem_post(&mq->queue_send); /* Returns the token */
            return (ssize_t)-1; /* An error has occurred while locking the
                                 * semaphore. Only EINTR or ETIMEDOUT are expected */
        } /* endif */
    } /* endif */

    /* Insert the message in the queue */
    msg= mq_queue_get_slot(mq);
    
    if (!mq->first[msg_prio]) {  /* There are no pending messages of that priority */
        mq_pq_push(&mq->pq, msg_prio); /* Add a priority token to the awaiting
                                        * priorities */
        mq->first[msg_prio]= msg;
    } else {
        mq->last[msg_prio]->next= msg;
    } /* endif */

    mq->last[msg_prio]= msg;   
    msg->next= NULL;            /* It is the last message in the queue */

    /* Copy message data and information */
    memcpy(MQ_MSG_DATA_PTR(msg), msg_ptr, msg_len);
    msg->length= msg_len;

    if (mq->notification_thread != (pthread_t) -1) {
        int sem_val;

        sem_getvalue(&mq->queue_receive, &sem_val);
        if (sem_val >= 0) { /* Nobody is waiting for messages */
            pthread_kill(mq->notification_thread, mq->notification_signo);
            mq->notification_mqdes= (mqd_t) -1;        
            mq->notification_signo= 0;
            mq->notification_thread= (pthread_t) -1;
        } /* endif */
    } /* endif */

    mq->attr.mq_curmsgs++;
    sem_post(&mq->queue_receive); /* Pass the token to the receiving bucket */

    sem_post(&mq->queue_sem);
    /*** EXIT ***/

    return 0;

} /* end __mq_timedsend */

/*
 * @Func: __mq_timedreceive
 *
 * @#mqdes: message queue descriptor.
 * @#msg_ptr: message buffer
 * @#msg_len: size of the buffer
 * @#msg_prio: priority of the received message
 * @#abs_timeout: timeout 
 *
 * @Desc: receive a message from a message queue within a given timeout
 */
ssize_t __mq_timedreceive (
    mqd_t                       mqdes, 
    char *                      msg_ptr, 
    size_t                      msg_len, 
    unsigned *                  msg_prio,
    const struct timespec *     abs_timeout
    ) 
{
    struct mq_message_queue *   mq;
    unsigned                    prio;
    struct mq_msg *             msg;

    if (mqdes < 0 || mqdes >= _RTL_MQ_OPEN_MAX || !IS_VALID_DESCRIPTION(MQ_DESC_PTR(mqdes)))
        __reterror(EBADF, (ssize_t)-1);

    mq= MQ_DESC_PTR(mqdes)->queue;

    if (msg_len < mq->attr.mq_msgsize)
        __reterror(EMSGSIZE, (ssize_t)-1);
    
    /*** ENTER ***/
    if (MQ_SEM_WAIT(&mq->queue_sem, abs_timeout)) 
        return (ssize_t)-1;     /* An error has occurred while locking the
                                 * semaphore. Only EINTR or ETIMEDOUT are expected */

    if ((MQ_DESC_PTR(mqdes)->flags & O_NONBLOCK) != 0) {
        if (sem_trywait(&mq->queue_receive)) {
            sem_post(&mq->queue_sem);
            /*** EXIT ***/
	    return -1;
        } /* endif */
    } else {
        sem_post(&mq->queue_sem);
        /*** EXIT ***/

        if (MQ_SEM_WAIT(&mq->queue_receive, abs_timeout)) /* Gets a token from receiving
                                                           * bucket */
	    return (ssize_t)-1; /* An error has occurred while locking the
                                 * semaphore. Only EINTR or ETIMEDOUT are expected */
        
        /*** ENTER ***/
        if (MQ_SEM_WAIT(&mq->queue_sem, abs_timeout)) {
            sem_post(&mq->queue_receive); /* Returns the token */
            return (ssize_t)-1; /* An error has occurred while locking the
                                 * semaphore. Only EINTR or ETIMEDOUT are expected */
        } /* endif */
    } /* endif */

    /* Remove a message from corresponding queue */
    prio= mq_pq_top(&mq->pq);   /* Find out the highest priority */
    msg= mq->first[prio];

    mq->first[prio]= msg->next;
    if (mq->last[prio] == msg) { /* It was the last message on that priority */
        mq->last[prio]= NULL;
        mq_pq_pop(&mq->pq);     /* Remove the priority token from awaiting priorities */
    } /* endif */
        
    /* Copy message data and information */
    memcpy(msg_ptr, MQ_MSG_DATA_PTR(msg), msg->length);
    if (msg_prio)
        *msg_prio= prio;

    mq_queue_release_slot(mq, msg);    

    mq->attr.mq_curmsgs--;        
    sem_post(&mq->queue_send); /* Pass the token to the sending bucket */

    sem_post(&mq->queue_sem);
    /*** EXIT ***/
    
    return msg->length;

} /* end __mq_timedreceive */

        
/*
 * @Func: mq_notify
 *
 * @#mqdes: message queue descriptor.
 * @#notification: notification to be sent to the process 
 *
 * @Desc: register a process to be notified of a message arrival
 */
int mq_notify (
    mqd_t                       mqdes, 
    const struct sigevent *     notification
    ) 
{
    struct mq_message_queue *   mq;

    if (mqdes < 0 || mqdes >= _RTL_MQ_OPEN_MAX || !IS_VALID_DESCRIPTION(MQ_DESC_PTR(mqdes)))
        __reterror(EBADF, (int)-1);

    if (notification != NULL && notification->sigev_notify != SIGEV_SIGNAL)
        __reterror(EBADF, (int)-1); /* Not specified in POSIX message queues */

    mq= MQ_DESC_PTR(mqdes)->queue;

    if (sem_wait(&mq->queue_sem)) 
        return (ssize_t)-1;     /* An error has occurred while locking the
                                 * semaphore. Only EINTR is expected */

    if (notification != NULL) { 
        if (mq->notification_thread != (pthread_t) -1) {
            /* Someone is already registered for notification */
            sem_post(&mq->queue_sem);
            __reterror(EBUSY, (int)-1);
        } /* endif */
    
        /* Registers this thread for notification */
        mq->notification_thread= pthread_self();
        mq->notification_signo= notification->sigev_signo;
                                /* Only non-real-time signals */
        mq->notification_mqdes= mqdes;        
    } else {                    /* notification == NULL */
        if (mq->notification_thread == pthread_self()) {
            mq->notification_mqdes= (mqd_t) -1;
            mq->notification_signo= 0;
            mq->notification_thread= (pthread_t) -1;
        } else {
            sem_post(&mq->queue_sem);
            __reterror(EBUSY, (int)-1);
        } /* endif */
    } /* endif */
    
    sem_post(&mq->queue_sem);

    return 0;

} /* end mq_notify */

int init_module(void){
    init_mqueue();
    return 0;
} /* end init_module */

void cleanup_module(void) {

} /* end cleanup_module */
