summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRamsey Harris <ramsey@ti.com>2015-02-19 14:56:44 -0800
committerRobert Tivy <rtivy@ti.com>2015-02-20 16:47:34 -0800
commitfbe2efb88b1822addaa99f9d3b54f3443a4356a8 (patch)
tree806903c3842ab1f6e131bc71a41448811b488442
parentf429ce401b70bb9708f77e05196899cd876f129f (diff)
downloadipc-fbe2efb88b1822addaa99f9d3b54f3443a4356a8.tar.gz
Re-work MessageQ_put to eliminate transport recursion
On Linux, when delivering an inbound message sent from another process on the same processor, the logic in MessageQ_put caused a transport infinite recursion. In other words, the message was given back to the transport instead of being delivered to the queue. The new logic is to attempt the local message deliver first; if it fails, then give it to the transport for delivery instead of failing back to the caller.
-rw-r--r--linux/src/api/MessageQ.c120
1 files changed, 66 insertions, 54 deletions
diff --git a/linux/src/api/MessageQ.c b/linux/src/api/MessageQ.c
index 9ebf006..53185fc 100644
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
@@ -772,58 +772,95 @@ Int MessageQ_close(MessageQ_QueueId *queueId)
/*
* ======== MessageQ_put ========
- * Place a message onto a message queue
+ * Deliver the given message, either locally or to the transport
*
- * Calls transport's put(), which handles the sending of the message
- * using the appropriate kernel interface (socket, device ioctl) call
- * for the remote procId encoded in the queueId argument.
+ * If the destination is a local queue, deliver the message. Otherwise,
+ * pass the message to a transport for delivery. The transport handles
+ * the sending of the message using the appropriate interface (socket,
+ * device ioctl, etc.).
*/
Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
{
+ Int status = MessageQ_S_SUCCESS;
MessageQ_Object *obj;
- UInt16 dstProcId = (UInt16)(queueId >> 16);
- UInt16 queueIndex;
- UInt16 queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
- Int status = MessageQ_S_SUCCESS;
+ UInt16 dstProcId;
+ UInt16 queueIndex;
+ UInt16 queuePort;
ITransport_Handle baseTrans;
IMessageQTransport_Handle msgTrans;
INetworkTransport_Handle netTrans;
Int priority;
UInt tid;
UInt16 clusterId;
+ Bool delivered;
+
+ /* extract destination address from the given queueId */
+ dstProcId = (UInt16)(queueId >> 16);
+ queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
- /* use the queue port # for destination address */
+ /* write the destination address into the message header */
msg->dstId = queuePort;
msg->dstProc= dstProcId;
- /* invoke put hook function after addressing the message */
+ /* invoke the hook function after addressing the message */
if (MessageQ_module->putHookFxn != NULL) {
MessageQ_module->putHookFxn(queueId, msg);
}
- /* extract the transport ID from the message header */
+ /* For an outbound message: If message destination is on this
+ * processor, then check if the destination queue is in this
+ * process (thread-to-thread messaging).
+ *
+ * For an inbound message: Check if destination queue is in this
+ * process (process-to-process messaging).
+ */
+ if (dstProcId == MultiProc_self()) {
+ queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+ if (queueIndex < MessageQ_module->numQueues) {
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ if (obj != NULL) {
+ /* deliver message to queue */
+ pthread_mutex_lock(&MessageQ_module->gate);
+ CIRCLEQ_INSERT_TAIL(&obj->msgList,
+ (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+ pthread_mutex_unlock(&MessageQ_module->gate);
+ sem_post(&obj->synchronizer);
+ goto done;
+ }
+ }
+ }
+
+ /* Getting here implies the message is outbound. Must give it to
+ * either the primary or secondary transport for delivery. Start
+ * by extracting the transport ID from the message header.
+ */
tid = MessageQ_getTransportId(msg);
if (tid >= MessageQ_MAXTRANSPORTS) {
printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
tid, MessageQ_MAXTRANSPORTS);
- return (MessageQ_E_FAIL);
+ status = MessageQ_E_FAIL;
+ goto done;
}
- /* if tid is set, use secondary transport regardless of destination */
+ /* if transportId is set, use secondary transport for message delivery */
if (tid != 0) {
baseTrans = MessageQ_module->transInst[tid];
if (baseTrans == NULL) {
printf("MessageQ_put: Error: transport is null\n");
- return (MessageQ_E_FAIL);
+ status = MessageQ_E_FAIL;
+ goto done;
}
/* downcast instance pointer to transport interface */
switch (ITransport_itype(baseTrans)) {
case INetworkTransport_TypeId:
netTrans = INetworkTransport_downCast(baseTrans);
- INetworkTransport_put(netTrans, (Ptr)msg);
+ delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+ status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
break;
default:
@@ -835,49 +872,24 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
}
}
else {
- /* if destination on another processor, use primary transport */
- if (dstProcId != MultiProc_self()) {
- priority = MessageQ_getMsgPri(msg);
- clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
-
- /* primary transport can only be used for intra-cluster delivery */
- if (clusterId > MultiProc_getNumProcsInCluster()) {
- printf("MessageQ_put: Error: destination procId=%d is not "
- "in cluster. Must specify a transportId.\n", dstProcId);
- return (MessageQ_E_FAIL);
- }
-
- msgTrans = MessageQ_module->transports[clusterId][priority];
-
- IMessageQTransport_put(msgTrans, (Ptr)msg);
+ /* use primary transport for delivery */
+ priority = MessageQ_getMsgPri(msg);
+ clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+ /* primary transport can only be used for intra-cluster delivery */
+ if (clusterId > MultiProc_getNumProcsInCluster()) {
+ printf("MessageQ_put: Error: destination procId=%d is not "
+ "in cluster. Must specify a transportId.\n", dstProcId);
+ status = MessageQ_E_FAIL;
+ goto done;
}
- else {
- /* check if destination queue is in this process */
- queueIndex = queuePort - MessageQ_PORTOFFSET;
-
- if (queueIndex >= MessageQ_module->numQueues) {
- printf("MessageQ_put: Error: unable to deliver message, "
- "queueIndex too large or transportId missing.\n");
- return (MessageQ_E_FAIL);
- }
-
- obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
- if (obj == NULL) {
- printf("MessageQ_put: Error: unable to deliver message, "
- "destination queue not in this process.\n");
- return (MessageQ_E_FAIL);
- }
-
- /* deliver message to process local queue */
- pthread_mutex_lock(&MessageQ_module->gate);
- CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
- elem);
- pthread_mutex_unlock(&MessageQ_module->gate);
- sem_post(&obj->synchronizer);
- }
+ msgTrans = MessageQ_module->transports[clusterId][priority];
+ delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+ status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
}
+done:
return (status);
}