/* 
 * @File: mqh.c
 *
 * @Contents: POSIX Message Passing helper functions
 *
 * Copyright (C) 2002 Sergio Saez <ssaez@disca.upv.es>
 * Project OCERA (Open Components for Realtime Embedded Applications)
 *
 *
 * 'mqh.c' contains the helper functions that implement the name space for message queues.
 *
 */

#define MQH_C

#define MODULE

/*
 * Required include files
 */

#include <rtl.h>

#include <string.h>
#include <stddef.h>

#include <rtl_stdlib.h>

#include <mqh.h>

/*** Local variable definitions ***/

/* Statically allocated storage for every name space entry. mq_ns_bind() takes
 * entries from the available list and mq_ns_remove() gives them back. */
static struct mq_message_queue_entry mq_queue_pool[_RTL_MQ_CREATE_MAX];
/* Head of the list of entries that currently carry a name (NULL when empty) */
static struct mq_message_queue_entry * mq_queue_used_list;
/* Head of the list of free entries (NULL when no entry is left) */
static struct mq_message_queue_entry * mq_queue_available_list;

/* Attributes used by mq_queue_init() when the caller passes no attributes */
static struct mq_attr default_mq_attr= {
    0,                          /* mq_flags: No flags */
    _RTL_MQ_ATTR_DFT_MAXMSG,         /* mq_maxmsg */
    _RTL_MQ_ATTR_DFT_MSGSIZE,        /* mq_msgsize */
    0                           /* mq_curmsgs: No messages */
};                              /* WARNING: The initializer order has to match the
                                 * 'struct mq_attr' definition in mqueue.h */

/*** Exported functions ***/

/*
 * @Func: mq_ns_init
 *
 * @Desc: initialize the message queue name space system
 */
void mq_ns_init () {
    int i;

    /* Initialize unused name space entries list */
    INIT_LIST(mq_queue_available_list, mq_queue_pool, _RTL_MQ_CREATE_MAX);

    for (i=0; i< _RTL_MQ_CREATE_MAX; i++) {
        mq_queue_pool[i].name[0]= '\0';
        mq_queue_pool[i].queue.data= NULL;
    } /* endfor */

    mq_queue_used_list= NULL;

} /* end mq_ns_init */

/*
 * @Func: mq_ns_lookup
 *
 * @#name: message queue name
 * @#mq: out parameter; receives the queue bound to 'name', or NULL when no
 *       entry carries that name. It is left untouched when the name is too long.
 *
 * @Desc: searches the used list for an entry whose name matches 'name'
 *
 * @Return: 0 when an entry is found, -ENAMETOOLONG when 'name' is longer than
 *          PATH_MAX, -ENOENT when no entry matches
 */
int mq_ns_lookup (
    const char *                name,
    struct mq_message_queue **  mq
    )
{
    struct mq_message_queue_entry * cursor= mq_queue_used_list;

    if (strlen(name) > PATH_MAX)
        return -ENAMETOOLONG;

    /* Advance until the names match or the list runs out */
    while (cursor != NULL && strncmp(name, cursor->name, PATH_MAX) != 0)
        cursor= cursor->next;

    if (cursor == NULL) {
        *mq= NULL;
        return -ENOENT;
    } /* endif */

    *mq= &(cursor->queue);

    return 0;

} /* end mq_ns_lookup */

/*
 * @Func: mq_ns_bind
 *
 * @#name: message queue name
 * @#mq: out parameter; receives the queue bound to 'name' on success and is
 *       left untouched on failure
 * @#oflags: open flags; only O_EXCL is examined here
 *
 * @Desc: binds a name to a message queue. An existing binding is reused unless
 *        O_EXCL is set; otherwise a free entry is taken from the available list
 *        and named. The queue itself is not initialized by this function.
 *
 * @Return: 0 on success, -ENAMETOOLONG when 'name' is longer than PATH_MAX,
 *          -EEXIST when the name is already bound and O_EXCL is set, -ENFILE
 *          when no free entry is left
 */
int mq_ns_bind (
    const char *                name,
    struct mq_message_queue **  mq,
    long                        oflags
    )
{
    struct mq_message_queue_entry * slot;

    if (strlen(name) > PATH_MAX)
        return -ENAMETOOLONG;

    /* Is the name already bound? */
    slot= mq_queue_used_list;
    while (slot != NULL) {
        if (strncmp(name, slot->name, PATH_MAX) == 0) {
            if (oflags & O_EXCL)
                return -EEXIST;

            *mq= &(slot->queue);
            return 0;
        } /* endif */
        slot= slot->next;
    } /* endwhile */

    /* There is no message queue with the specified name, so it has to be created.
     * This should be atomic, but it is not. Some lock-free algorithm could be used
     * for checking the names list. */

    slot= mq_queue_available_list; /* Candidate new item */
    if (slot == NULL)
        return -ENFILE;         /* Too many message queues are currently open in the
                                 * system */

    /* Detach it from the available list */
    mq_queue_available_list= slot->next;

    /* Name it and push it on the front of the used list.
     * NOTE(review): strncpy() writes no terminator when strlen(name) == PATH_MAX;
     * confirm that the 'name' field holds more than PATH_MAX bytes. */
    strncpy(slot->name, name, PATH_MAX);
    slot->next= mq_queue_used_list;
    mq_queue_used_list= slot;

    *mq= &(slot->queue);        /* Returns the message queue structure */

    return 0;

} /* end mq_ns_bind */

/*
 * @Func: mq_ns_remove
 *
 * @#mq_entry: entry to take out of the used list
 * @#mq_prev: entry that precedes 'mq_entry' in the used list, or NULL when
 *            'mq_entry' is the list head
 *
 * @Desc: actually removes a message queue entry from the used list, clears its
 *        name and pushes it on the front of the available list
 */
static inline void mq_ns_remove (
 struct mq_message_queue_entry * mq_entry,
 struct mq_message_queue_entry * mq_prev
 )
{
    /* The link that currently points at mq_entry: either the predecessor's
     * 'next' field or the used list head */
    struct mq_message_queue_entry ** link=
        (mq_prev != NULL) ? &(mq_prev->next) : &mq_queue_used_list;

    /* Bypass the entry in the used list */
    *link= mq_entry->next;

    mq_entry->name[0]= '\0';

    /* Recycle it through the available list */
    mq_entry->next= mq_queue_available_list;
    mq_queue_available_list= mq_entry;

} /* end mq_ns_remove */

/*
 * @Func: mq_ns_unlink
 *
 * @#name: message queue name
 *
 * @Desc: unlinks the queue bound to 'name'. The entry is removed from the used
 *        list only when mq_queue_unlink() returns non-zero, i.e. reports that
 *        the queue resources were released; otherwise it stays in the list.
 *
 * @Return: 0 when an entry with that name exists (whether or not it was
 *          removed), -ENAMETOOLONG when 'name' is longer than PATH_MAX, -ENOENT
 *          when no entry matches
 */
int mq_ns_unlink (
    const char *                name
    )
{
    struct mq_message_queue_entry * cur;
    struct mq_message_queue_entry * prev= NULL;

    if (strlen(name) > PATH_MAX)
        return -ENAMETOOLONG;

    for (cur= mq_queue_used_list; cur != NULL; prev= cur, cur= cur->next) {
        if (strncmp(name, cur->name, PATH_MAX) != 0)
            continue;           /* Not this one: keep walking */

        if (mq_queue_unlink(&(cur->queue)))
            mq_ns_remove(cur, prev);

        return 0;
    } /* endfor */

    return -ENOENT;         /* The mq entry is not used or already freed */

} /* end mq_ns_unlink */

/*
 * @Func: mq_ns_unbind
 *
 * @#mq: message queue whose name space entry has to be released
 *
 * @Desc: looks in the used list for the entry that embeds 'mq' and moves it to
 *        the available list
 *
 * @Return: 0 when the entry was found and removed, -ENOENT otherwise
 */
int mq_ns_unbind (
    const struct mq_message_queue * mq
    )
{
    struct mq_message_queue_entry * prev= NULL;
    struct mq_message_queue_entry * cur= mq_queue_used_list;

    while (cur != NULL) {
        /* Entries are identified by the address of their embedded queue */
        if (&(cur->queue) == mq) {
            mq_ns_remove(cur, prev);
            return 0;
        } /* endif */

        prev= cur;
        cur= cur->next;
    } /* endwhile */

    return -ENOENT;         /* The mq entry is not used or already freed */

} /* end mq_ns_unbind */

/*
 * @Func: mq_queue_init
 *
 * @#mq: message queue state
 * @#attr: requested attributes, or NULL to use default_mq_attr. Only mq_maxmsg
 *         and mq_msgsize are read.
 *
 * @Desc: Initialize a message queue: allocates the message storage and the slot
 *        descriptors, links every slot into the free slot list and sets up the
 *        priority queue and the semaphores
 *
 * @Return: 0 on success; -EINVAL when mq_maxmsg or mq_msgsize is not positive
 *          ('mq' is left untouched); -ENOSPC when the storage size is not
 *          representable, an allocation fails or mq_pq_init() fails (mq->data
 *          is NULL afterwards when an allocation or mq_pq_init() failed)
 */
int mq_queue_init (
    struct mq_message_queue *   mq,
    struct mq_attr *            attr
    )
{
    const size_t size_limit= (size_t) -1;
    int i;
    int error;

    if (!attr)
        attr= &default_mq_attr;

    /* A queue needs at least one message of at least one byte. Without this
     * check a zero or negative mq_maxmsg would make the slot list setup below
     * write before the start of the slot array. */
    if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0)
        return -EINVAL;

    /* Refuse sizes whose product would wrap around in the allocations below */
    if ((size_t) attr->mq_maxmsg > size_limit / (size_t) attr->mq_msgsize
        || (size_t) attr->mq_maxmsg > size_limit / sizeof(struct mq_msg))
        return -ENOSPC;

    mq->attr.mq_flags= 0;       /* No flags */
    mq->attr.mq_maxmsg= attr->mq_maxmsg;
    mq->attr.mq_msgsize= attr->mq_msgsize;
    mq->attr.mq_curmsgs= 0;

    mq->count= 0;

    /* Allocate message queue memory */
    mq->data= malloc(attr->mq_maxmsg * attr->mq_msgsize);
    if (!mq->data)
        return -ENOSPC;

    mq->msgs= malloc(attr->mq_maxmsg * sizeof(struct mq_msg));
    if (!mq->msgs) {
        free(mq->data);
        mq->data= NULL;
        return -ENOSPC;
    } /* endif */

    /* Asynchronous notification */
    mq->notification_thread= (pthread_t) -1;
    mq->notification_signo= 0;
    mq->notification_mqdes= (mqd_t) -1;

    /* Initialize priority queues */
    error= mq_pq_init(&mq->pq, _RTL_MQ_PRIO_MAX);
    if (error) {
        free(mq->msgs);
        mq->msgs= NULL;         /* Do not leave a dangling pointer behind */
        free(mq->data);
        mq->data= NULL;
        return -ENOSPC;
    } /* endif */

    for (i=0; i<_RTL_MQ_PRIO_MAX; i++) {
        mq->first[i]= NULL;
        mq->last[i]= NULL;
    } /* endfor */

    /* Initialize message queue slots: each slot points at its own chunk of the
     * data area and at the following slot; the last one terminates the list */
    for (i=0; i<attr->mq_maxmsg; i++) {
        mq->msgs[i].next= (i+1 < attr->mq_maxmsg) ? &(mq->msgs[i+1]) : NULL;
        mq->msgs[i].length= 0;
        mq->msgs[i].data= mq->data + i * attr->mq_msgsize;
    } /* endfor */

    mq->slots= mq->msgs;        /* &(mq->msgs[0]) */

    /* Initialize synchronization variables.
     * NOTE(review): the sem_init() results are not checked. */
    sem_init(&mq->queue_sem, 0, 1);
    sem_init(&mq->queue_receive, 0, 0); /* Number of messages that can be received
                                         * without blocking the calling thread */
    sem_init(&mq->queue_send, 0, attr->mq_maxmsg);
                                /* Number of messages that can be sent without blocking
                                 * the calling thread */
    return 0;

} /* end mq_queue_init */

/*
 * @Func: mq_queue_destroy
 *
 * @#mq: message queue state
 *
 * @Desc: Releases the message queue resources, but only when the queue has no
 *        open reference left (count <= 0) and carries the MQ_ATTR_F_UNLINKED flag
 *
 * @Return: 1 when the resources were released, 0 when the queue was kept
 *
 * NOTE(review): plain 'inline' on an external function relies on GNU89 inline
 * semantics; under C99 rules an external definition is only emitted when a
 * non-inline declaration is also visible - confirm against mqh.h.
 */
inline int mq_queue_destroy (
    struct mq_message_queue *   mq
    )
{
    /* Still referenced by some descriptor */
    if (mq->count > 0)
        return 0;

    /* Still reachable through its name */
    if ((mq->attr.mq_flags & MQ_ATTR_F_UNLINKED) == 0)
        return 0;

    /* Release resources */
    sem_destroy(&mq->queue_send);
    sem_destroy(&mq->queue_receive);
    sem_destroy(&mq->queue_sem);

    mq_pq_destroy(&mq->pq);

    free(mq->msgs);
    free(mq->data);
    mq->data= NULL;             /* This indicates that the message queue is not valid */

    return 1;

} /* end mq_queue_destroy */

/*
 * @Func: mq_queue_open
 *
 * @#mq: message queue state
 *
 * @Desc: Open a message queue: registers one more reference to it
 *
 * @Return: 0 on success, -EINVAL when the queue carries the MQ_ATTR_F_UNLINKED
 *          flag (the reference count is then left unchanged)
 */
int mq_queue_open (
    struct mq_message_queue *   mq
    )
{
    /* An unlinked message queue cannot be opened again */
    if (mq->attr.mq_flags & MQ_ATTR_F_UNLINKED)
        return -EINVAL;

    mq->count += 1;             /* Another descriptor pointing to this queue */

    return 0;

} /* end mq_queue_open */

/*
 * @Func: mq_queue_close
 *
 * @#mq: message queue state
 *
 * @Desc: Close a message queue: drops one reference and then lets
 *        mq_queue_destroy() decide whether the resources can be released
 *
 * @Return: the value returned by mq_queue_destroy()
 */
int mq_queue_close (
    struct mq_message_queue *   mq
    )
{
    int released;

    mq->count -= 1;             /* One descriptor less pointing to this queue */

    released= mq_queue_destroy(mq);

    return released;

} /* end mq_queue_close */

/*
 * @Func: mq_queue_unlink
 *
 * @#mq: message queue state
 *
 * @Desc: Unlink a message queue: marks it with MQ_ATTR_F_UNLINKED, so that it
 *        cannot be opened any more, and then lets mq_queue_destroy() release
 *        the resources if no reference is left
 *
 * @Return: the value returned by mq_queue_destroy()
 */
int mq_queue_unlink (
    struct mq_message_queue *   mq
    )
{
    /* The flag has to be OR-ed in. Masking with '&=' would never set it (the
     * flags start at 0) and would clear every other flag, so the queue could
     * never be destroyed. */
    mq->attr.mq_flags |= MQ_ATTR_F_UNLINKED;

    return mq_queue_destroy(mq);

} /* end mq_queue_unlink */

/*
 * @Func: mq_queue_get_slot
 *
 * @#mq: message queue state
 *
 * @Desc: takes the first message slot off the free slot list
 *
 * @Return: the unused message slot, or NULL when the free slot list is empty
 */
struct mq_msg * mq_queue_get_slot (
    struct mq_message_queue *   mq
    )
{
    struct mq_msg *     msg;

    msg= mq->slots;

    /* Nothing to hand out: do not dereference an empty list head */
    if (msg == NULL)
        return NULL;

    mq->slots= msg->next;

    return msg;

} /* end mq_queue_get_slot */

/*
 * @Func: mq_queue_release_slot
 *
 * @#mq: message queue state
 * @#msg: message slot to be released
 *
 * @Desc: pushes a message slot back on the front of the free slot list; the
 *        slot contents (length, data) are left as they are
 */
void mq_queue_release_slot (
    struct mq_message_queue *   mq,
    struct mq_msg *             msg
    )
{
    struct mq_msg *     old_head= mq->slots;

    /* The released slot becomes the new head and chains to the previous one */
    mq->slots= msg;
    msg->next= old_head;

} /* end mq_queue_release_slot */

