summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRamsey Harris <ramsey@ti.com>2015-02-05 14:46:20 -0800
committerRobert Tivy <rtivy@ti.com>2015-02-09 10:43:08 -0800
commit56ff71a499992b756a12bd0d78dfd3030b612359 (patch)
tree96316bf6af06db07520e7eccfcfe0a5913d1384a
parent562f9627405be9fb0e8666df2e6df597341650ef (diff)
downloadipc-56ff71a499992b756a12bd0d78dfd3030b612359.tar.gz
Rework MessageQ_put to prioritize transport if given
In MessageQ_put, use secondary transport (if given) regardless of destination queue. For processor local deliver, verify that destination queue is process local, otherwise give error. Changed secondary transport array to base interface type. Added interface casting as needed.
-rw-r--r--linux/src/api/MessageQ.c140
1 files changed, 92 insertions, 48 deletions
diff --git a/linux/src/api/MessageQ.c b/linux/src/api/MessageQ.c
index b236277..1929e17 100644
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
@@ -122,7 +122,7 @@ typedef struct MessageQ_ModuleObject {
pthread_mutex_t gate;
int seqNum;
IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
- INetworkTransport_Handle transInst[MessageQ_MAXTRANSPORTS];
+ ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
MessageQ_PutHookFxn putHookFxn;
} MessageQ_ModuleObject;
@@ -224,7 +224,7 @@ Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
return MessageQ_E_ALREADYEXISTS;
}
- MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
+ MessageQ_module->transInst[tid] = inst;
return MessageQ_S_SUCCESS;
}
@@ -462,7 +462,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
Int status;
MessageQ_Object *obj = NULL;
IMessageQTransport_Handle transport;
- INetworkTransport_Handle transInst;
+ INetworkTransport_Handle netTrans;
+ ITransport_Handle baseTrans;
UInt16 queueIndex;
UInt16 clusterId;
Int tid;
@@ -573,10 +574,21 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
}
for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
- transInst = MessageQ_module->transInst[tid];
- if (transInst) {
- /* need to check return and do something if error */
- INetworkTransport_bind((Void *)transInst, obj->queue);
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans != NULL) {
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ INetworkTransport_bind((void *)netTrans, obj->queue);
+ break;
+
+ default:
+ /* error */
+ printf("MessageQ_create: Error: transport id %d is an "
+ "unsupported transport type.\n", tid);
+ break;
+ }
}
}
@@ -603,7 +615,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
{
MessageQ_Object *obj;
IMessageQTransport_Handle transport;
- INetworkTransport_Handle transInst;
+ INetworkTransport_Handle netTrans;
+ ITransport_Handle baseTrans;
Int status = MessageQ_S_SUCCESS;
UInt16 queueIndex;
UInt16 clusterId;
@@ -654,9 +667,21 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
}
for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
- transInst = MessageQ_module->transInst[tid];
- if (transInst) {
- INetworkTransport_unbind((Void *)transInst, obj->queue);
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans != NULL) {
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ INetworkTransport_unbind((void *)netTrans, obj->queue);
+ break;
+
+ default:
+ /* error */
+ printf("MessageQ_create: Error: transport id %d is an "
+ "unsupported transport type.\n", tid);
+ break;
+ }
}
}
@@ -731,7 +756,7 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
UInt16 dstProcId = (UInt16)(queueId >> 16);
UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
Int status = MessageQ_S_SUCCESS;
- ITransport_Handle transport;
+ ITransport_Handle baseTrans;
IMessageQTransport_Handle msgTrans;
INetworkTransport_Handle netTrans;
Int priority;
@@ -746,9 +771,42 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
MessageQ_module->putHookFxn(queueId, msg);
}
- if (dstProcId != MultiProc_self()) {
- tid = MessageQ_getTransportId(msg);
- if (tid == 0) {
+ /* extract the transport ID from the message header */
+ tid = MessageQ_getTransportId(msg);
+
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
+ tid, MessageQ_MAXTRANSPORTS);
+ return (MessageQ_E_FAIL);
+ }
+
+ /* if tid is set, use secondary transport regardless of destination */
+ if (tid != 0) {
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans == NULL) {
+ printf("MessageQ_put: Error: transport is null\n");
+ return (MessageQ_E_FAIL);
+ }
+
+ /* downcast instance pointer to transport interface */
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ INetworkTransport_put(netTrans, (Ptr)msg);
+ break;
+
+ default:
+ /* error */
+ printf("MessageQ_put: Error: transport id %d is an "
+ "unsupported transport type\n", tid);
+ status = MessageQ_E_FAIL;
+ break;
+ }
+ }
+ else {
+ /* if destination on another processor, use primary transport */
+ if (dstProcId != MultiProc_self()) {
priority = MessageQ_getMsgPri(msg);
clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
@@ -756,7 +814,7 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
if (clusterId > MultiProc_getNumProcsInCluster()) {
printf("MessageQ_put: Error: destination procId=%d is not "
"in cluster. Must specify a transportId.\n", dstProcId);
- return MessageQ_E_FAIL;
+ return (MessageQ_E_FAIL);
}
msgTrans = MessageQ_module->transports[clusterId][priority];
@@ -764,45 +822,31 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
IMessageQTransport_put(msgTrans, (Ptr)msg);
}
else {
- if (tid >= MessageQ_MAXTRANSPORTS) {
- printf("MessageQ_put: transport id %d too big, must be < %d\n",
- tid, MessageQ_MAXTRANSPORTS);
- return MessageQ_E_FAIL;
+ /* check if destination queue is in this process */
+ if (queueIndex >= MessageQ_module->numQueues) {
+ printf("MessageQ_put: Error: unable to deliver message, "
+ "queueIndex too large or transportId missing.\n");
+ return (MessageQ_E_FAIL);
}
- /* use secondary transport */
- netTrans = MessageQ_module->transInst[tid];
- transport = INetworkTransport_upCast(netTrans);
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
- /* downcast instance pointer to transport interface */
- switch (ITransport_itype(transport)) {
- case INetworkTransport_TypeId:
- INetworkTransport_put(netTrans, (Ptr)msg);
- break;
-
- default:
- /* error */
- printf("MessageQ_put: Error: transport id %d is an "
- "unsupported transport type\n", tid);
- status = MessageQ_E_FAIL;
- break;
+ if (obj == NULL) {
+ printf("MessageQ_put: Error: unable to deliver message, "
+ "destination queue not in this process.\n");
+ return (MessageQ_E_FAIL);
}
- }
- }
- else {
- obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
-
- pthread_mutex_lock(&MessageQ_module->gate);
- /* It is a local MessageQ */
- CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
-
- pthread_mutex_unlock(&MessageQ_module->gate);
-
- sem_post(&obj->synchronizer);
+ /* deliver message to process local queue */
+ pthread_mutex_lock(&MessageQ_module->gate);
+ CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
+ elem);
+ pthread_mutex_unlock(&MessageQ_module->gate);
+ sem_post(&obj->synchronizer);
+ }
}
- return status;
+ return (status);
}
/*