diff options
-rw-r--r-- | linux/src/api/Ipc.c | 3 | ||||
-rw-r--r-- | linux/src/daemon/NameServer_daemon.c | 121 |
2 files changed, 122 insertions, 2 deletions
diff --git a/linux/src/api/Ipc.c b/linux/src/api/Ipc.c index 1b9d25f..90725d3 100644 --- a/linux/src/api/Ipc.c +++ b/linux/src/api/Ipc.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com + * Copyright (c) 2012-2016 Texas Instruments Incorporated - http://www.ti.com * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -459,6 +459,7 @@ Int Ipc_attach(UInt16 procId) status = Ipc_module.transportFactory->attachFxn(procId); if (status < 0) { + NameServer_detach(procId); status = Ipc_E_FAIL; goto done; } diff --git a/linux/src/daemon/NameServer_daemon.c b/linux/src/daemon/NameServer_daemon.c index e296176..288b39e 100644 --- a/linux/src/daemon/NameServer_daemon.c +++ b/linux/src/daemon/NameServer_daemon.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com + * Copyright (c) 2012-2016 Texas Instruments Incorporated - http://www.ti.com * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -145,6 +145,7 @@ typedef struct NameServer_ModuleObject { NameServer_Params defInstParams; /* Default instance paramters */ pthread_mutex_t modGate; + pthread_mutex_t attachGate; } NameServer_ModuleObject; #define CIRCLEQ_elemClear(elem) { \ @@ -178,6 +179,7 @@ static NameServer_ModuleObject NameServer_state = { // only _NP (non-portable) type available in CG tools which we're using .modGate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP, #endif + .attachGate = PTHREAD_MUTEX_INITIALIZER, .refCount = 0 }; @@ -241,6 +243,97 @@ static UInt32 stringHash(String s) return (hash); } +static Int NameServer_reattach(UInt16 procId) +{ + Int status = NameServer_S_SUCCESS; + UInt16 clId; + int sendSock = INVALIDSOCKET; + int recvSock = INVALIDSOCKET; + int err; + + /* procId already validated in API layer */ + clId = procId - MultiProc_getBaseIdOfCluster(); + + if (NameServer_module->comm[clId].refCount == 0) { + goto done; + } + + LOG2("NameServer_reattach: --> procId=%d, refCount=%d\n", + procId, NameServer_module->comm[clId].refCount) + + /* first create new sockets */ + sendSock = socket(AF_RPMSG, SOCK_SEQPACKET, 0); + if (sendSock < 0) { + status = NameServer_E_FAIL; + LOG2("NameServer_reattach: socket failed: %d, %s\n", errno, + strerror(errno)); + goto done; + } + LOG2("NameServer_reattach: created send socket: %d, procId %d\n", sendSock, + procId); + + err = ConnectSocket(sendSock, procId, MESSAGEQ_RPMSG_PORT); + if (err < 0) { + status = NameServer_E_FAIL; + LOG3("NameServer_reattach: connect failed: procId=%d, errno=%d (%s)\n", + procId, errno, strerror(errno)); + goto done; + } + + /* create socket for receiving messages from remote processor */ + recvSock = socket(AF_RPMSG, SOCK_SEQPACKET, 0); + if (recvSock < 0) { + status = NameServer_E_FAIL; + LOG2("NameServer_reattach: socket failed: %d, %s\n", errno, + strerror(errno)); + goto done; + } + + LOG2("NameServer_attach: created receive socket: %d, procId %d\n", recvSock, + procId); + + err = SocketBindAddr(recvSock, procId, NAME_SERVER_RPMSG_ADDR); + if (err < 0) { + status = NameServer_E_FAIL; + LOG2("NameServer_attach: bind failed: %d, %s\n", errno, + strerror(errno)); + goto done; + } + + /* then close old sockets */ + /* close the sending socket */ + LOG1("NameServer_reattach: closing socket: %d\n", + NameServer_module->comm[clId].sendSock) + close(NameServer_module->comm[clId].sendSock); + + /* close the receiving socket */ + LOG1("NameServer_reattach: closing socket: %d\n", + NameServer_module->comm[clId].recvSock) + close(NameServer_module->comm[clId].recvSock); + + /* assign new sockets */ + NameServer_module->comm[clId].sendSock = sendSock; + NameServer_module->comm[clId].recvSock = recvSock; + +done: + if (status < 0) { + if (recvSock >= 0) { + LOG1(" closing receive socket: %d\n", recvSock) + close(recvSock); + } + + if (sendSock >= 0) { + LOG1(" closing send socket: %d\n", sendSock) + close(sendSock); + } + } + + LOG2("NameServer_reattach: <-- refCount=%d, status=%d\n", + NameServer_module->comm[clId].refCount, status) + + return (status); +} + static void NameServerRemote_processMessage(NameServerRemote_Msg *msg, UInt16 procId) { @@ -330,6 +423,7 @@ static void *listener_cb(void *arg) int sock; uint64_t event; Bool run = TRUE; + Bool reconnect = FALSE; LOG0("listener_cb: Entered Listener thread.\n") @@ -384,6 +478,10 @@ static void *listener_cb(void *arg) } if (nbytes < 0) { LOG2("recvfrom failed: %s (%d)\n", strerror(errno), errno) + if (errno == ENOLINK) { + LOG0("Socket is no longer valid, MUST re-attach!\n"); + reconnect = TRUE; + } break; } else { @@ -414,6 +512,14 @@ static void *listener_cb(void *arg) } } + if (reconnect) { + reconnect = FALSE; + /* grab lock to prevent users from attach/deattach while recovering */ + pthread_mutex_lock(&NameServer_module->attachGate); + NameServer_reattach(procId); + pthread_mutex_unlock(&NameServer_module->attachGate); + } + } while (run); return ((void *)ret); @@ -1281,6 +1387,8 @@ Int NameServer_attach(UInt16 procId) /* procId already validated in API layer */ clId = procId - MultiProc_getBaseIdOfCluster(); + pthread_mutex_lock(&NameServer_module->attachGate); + LOG2("NameServer_attach: --> procId=%d, refCount=%d\n", procId, NameServer_module->comm[clId].refCount) @@ -1333,6 +1441,8 @@ Int NameServer_attach(UInt16 procId) /* getting here means we have successfully attached */ NameServer_module->comm[clId].refCount++; + pthread_mutex_unlock(&NameServer_module->attachGate); + /* tell the listener thread to add new receive sockets */ event = NameServer_Event_REFRESH; write(NameServer_module->unblockFd, &event, sizeof(event)); @@ -1340,6 +1450,8 @@ Int NameServer_attach(UInt16 procId) /* wait for ACK event */ read(NameServer_module->waitFd, &event, sizeof(event)); + pthread_mutex_lock(&NameServer_module->attachGate); + done: if (status < 0) { sock = NameServer_module->comm[clId].recvSock; @@ -1360,6 +1472,8 @@ done: LOG2("NameServer_attach: <-- refCount=%d, status=%d\n", NameServer_module->comm[clId].refCount, status) + pthread_mutex_unlock(&NameServer_module->attachGate); + return (status); } @@ -1377,11 +1491,14 @@ Int NameServer_detach(UInt16 procId) /* procId already validated in API layer */ clId = procId - MultiProc_getBaseIdOfCluster(); + pthread_mutex_lock(&NameServer_module->attachGate); + LOG2("NameServer_detach: --> procId=%d, refCount=%d\n", procId, NameServer_module->comm[clId].refCount) /* decrement reference count regardless of outcome below */ if (--NameServer_module->comm[clId].refCount > 0) { + pthread_mutex_unlock(&NameServer_module->attachGate); goto done; } @@ -1392,6 +1509,8 @@ Int NameServer_detach(UInt16 procId) recvSock = NameServer_module->comm[clId].recvSock; NameServer_module->comm[clId].recvSock = INVALIDSOCKET; + pthread_mutex_unlock(&NameServer_module->attachGate); + /* tell the listener thread to remove old sockets */ event = NameServer_Event_REFRESH; write(NameServer_module->unblockFd, &event, sizeof(event)); |