diff options
author | Ramsey Harris <ramsey@ti.com> | 2015-02-05 14:46:20 -0800 |
---|---|---|
committer | Robert Tivy <rtivy@ti.com> | 2015-02-09 10:43:08 -0800 |
commit | 56ff71a499992b756a12bd0d78dfd3030b612359 (patch) | |
tree | 96316bf6af06db07520e7eccfcfe0a5913d1384a | |
parent | 562f9627405be9fb0e8666df2e6df597341650ef (diff) | |
download | ipc-56ff71a499992b756a12bd0d78dfd3030b612359.tar.gz |
Rework MessageQ_put to prioritize transport if given
In MessageQ_put, use secondary transport (if given) regardless
of destination queue. For processor local deliver, verify that
destination queue is process local, otherwise give error. Changed
secondary transport array to base interface type. Added interface
casting as needed.
-rw-r--r-- | linux/src/api/MessageQ.c | 140 |
1 files changed, 92 insertions, 48 deletions
diff --git a/linux/src/api/MessageQ.c b/linux/src/api/MessageQ.c index b236277..1929e17 100644 --- a/linux/src/api/MessageQ.c +++ b/linux/src/api/MessageQ.c @@ -122,7 +122,7 @@ typedef struct MessageQ_ModuleObject { pthread_mutex_t gate; int seqNum; IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2]; - INetworkTransport_Handle transInst[MessageQ_MAXTRANSPORTS]; + ITransport_Handle transInst[MessageQ_MAXTRANSPORTS]; MessageQ_PutHookFxn putHookFxn; } MessageQ_ModuleObject; @@ -224,7 +224,7 @@ Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst) return MessageQ_E_ALREADYEXISTS; } - MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst; + MessageQ_module->transInst[tid] = inst; return MessageQ_S_SUCCESS; } @@ -462,7 +462,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp) Int status; MessageQ_Object *obj = NULL; IMessageQTransport_Handle transport; - INetworkTransport_Handle transInst; + INetworkTransport_Handle netTrans; + ITransport_Handle baseTrans; UInt16 queueIndex; UInt16 clusterId; Int tid; @@ -573,10 +574,21 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp) } for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) { - transInst = MessageQ_module->transInst[tid]; - if (transInst) { - /* need to check return and do something if error */ - INetworkTransport_bind((Void *)transInst, obj->queue); + baseTrans = MessageQ_module->transInst[tid]; + + if (baseTrans != NULL) { + switch (ITransport_itype(baseTrans)) { + case INetworkTransport_TypeId: + netTrans = INetworkTransport_downCast(baseTrans); + INetworkTransport_bind((void *)netTrans, obj->queue); + break; + + default: + /* error */ + printf("MessageQ_create: Error: transport id %d is an " + "unsupported transport type.\n", tid); + break; + } } } @@ -603,7 +615,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr) { MessageQ_Object *obj; IMessageQTransport_Handle transport; - INetworkTransport_Handle transInst; + INetworkTransport_Handle netTrans; + ITransport_Handle baseTrans; Int status = MessageQ_S_SUCCESS; UInt16 queueIndex; UInt16 clusterId; @@ -654,9 +667,21 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr) } for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) { - transInst = MessageQ_module->transInst[tid]; - if (transInst) { - INetworkTransport_unbind((Void *)transInst, obj->queue); + baseTrans = MessageQ_module->transInst[tid]; + + if (baseTrans != NULL) { + switch (ITransport_itype(baseTrans)) { + case INetworkTransport_TypeId: + netTrans = INetworkTransport_downCast(baseTrans); + INetworkTransport_unbind((void *)netTrans, obj->queue); + break; + + default: + /* error */ + printf("MessageQ_create: Error: transport id %d is an " + "unsupported transport type.\n", tid); + break; + } } } @@ -731,7 +756,7 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg) UInt16 dstProcId = (UInt16)(queueId >> 16); UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff); Int status = MessageQ_S_SUCCESS; - ITransport_Handle transport; + ITransport_Handle baseTrans; IMessageQTransport_Handle msgTrans; INetworkTransport_Handle netTrans; Int priority; @@ -746,9 +771,42 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg) MessageQ_module->putHookFxn(queueId, msg); } - if (dstProcId != MultiProc_self()) { - tid = MessageQ_getTransportId(msg); - if (tid == 0) { + /* extract the transport ID from the message header */ + tid = MessageQ_getTransportId(msg); + + if (tid >= MessageQ_MAXTRANSPORTS) { + printf("MessageQ_put: Error: transport id %d too big, must be < %d\n", + tid, MessageQ_MAXTRANSPORTS); + return (MessageQ_E_FAIL); + } + + /* if tid is set, use secondary transport regardless of destination */ + if (tid != 0) { + baseTrans = MessageQ_module->transInst[tid]; + + if (baseTrans == NULL) { + printf("MessageQ_put: Error: transport is null\n"); + return (MessageQ_E_FAIL); + } + + /* downcast instance pointer to transport interface */ + switch (ITransport_itype(baseTrans)) { + case INetworkTransport_TypeId: + netTrans = INetworkTransport_downCast(baseTrans); + INetworkTransport_put(netTrans, (Ptr)msg); + break; + + default: + /* error */ + printf("MessageQ_put: Error: transport id %d is an " + "unsupported transport type\n", tid); + status = MessageQ_E_FAIL; + break; + } + } + else { + /* if destination on another processor, use primary transport */ + if (dstProcId != MultiProc_self()) { priority = MessageQ_getMsgPri(msg); clusterId = dstProcId - MultiProc_getBaseIdOfCluster(); @@ -756,7 +814,7 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg) if (clusterId > MultiProc_getNumProcsInCluster()) { printf("MessageQ_put: Error: destination procId=%d is not " "in cluster. Must specify a transportId.\n", dstProcId); - return MessageQ_E_FAIL; + return (MessageQ_E_FAIL); } msgTrans = MessageQ_module->transports[clusterId][priority]; @@ -764,45 +822,31 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg) IMessageQTransport_put(msgTrans, (Ptr)msg); } else { - if (tid >= MessageQ_MAXTRANSPORTS) { - printf("MessageQ_put: transport id %d too big, must be < %d\n", - tid, MessageQ_MAXTRANSPORTS); - return MessageQ_E_FAIL; + /* check if destination queue is in this process */ + if (queueIndex >= MessageQ_module->numQueues) { + printf("MessageQ_put: Error: unable to deliver message, " + "queueIndex too large or transportId missing.\n"); + return (MessageQ_E_FAIL); } - /* use secondary transport */ - netTrans = MessageQ_module->transInst[tid]; - transport = INetworkTransport_upCast(netTrans); + obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex]; - /* downcast instance pointer to transport interface */ - switch (ITransport_itype(transport)) { - case INetworkTransport_TypeId: - INetworkTransport_put(netTrans, (Ptr)msg); - break; - - default: - /* error */ - printf("MessageQ_put: Error: transport id %d is an " - "unsupported transport type\n", tid); - status = MessageQ_E_FAIL; - break; + if (obj == NULL) { + printf("MessageQ_put: Error: unable to deliver message, " + "destination queue not in this process.\n"); + return (MessageQ_E_FAIL); } - } - } - else { - obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex]; - - pthread_mutex_lock(&MessageQ_module->gate); - /* It is a local MessageQ */ - CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem); - - pthread_mutex_unlock(&MessageQ_module->gate); - - sem_post(&obj->synchronizer); + /* deliver message to process local queue */ + pthread_mutex_lock(&MessageQ_module->gate); + CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, + elem); + pthread_mutex_unlock(&MessageQ_module->gate); + sem_post(&obj->synchronizer); + } } - return status; + return (status); } /* |