Vault
4.1
|
00001 /* 00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved. 00003 This file is part of the Code Vault version 4.1 00004 http://www.bombaydigital.com/ 00005 License: MIT. See LICENSE.md in the Vault top level directory. 00006 */ 00007 00010 #include "vlistenerthread.h" 00011 #include "vtypes_internal.h" 00012 00013 #include "vlistenersocket.h" 00014 #include "vsocketfactory.h" 00015 #include "vsocketthreadfactory.h" 00016 #include "vmanagementinterface.h" 00017 #include "vclientsession.h" 00018 #include "vexception.h" 00019 #include "vmutexlocker.h" 00020 #include "vlogger.h" 00021 #include "vmessageinputthread.h" 00022 #include "vmessageoutputthread.h" 00023 00024 VListenerThread::VListenerThread(const VString& threadBaseName, bool deleteSelfAtEnd, bool createDetached, VManagementInterface* manager, int portNumber, const VString& bindAddress, VSocketFactory* socketFactory, VSocketThreadFactory* threadFactory, VClientSessionFactory* sessionFactory, bool initiallyListening) 00025 : VThread(threadBaseName, VSTRING_FORMAT("vault.messages.VListenerThread.%s.%d", threadBaseName.chars(), portNumber), deleteSelfAtEnd, createDetached, manager) 00026 , mPortNumber(portNumber) 00027 , mBindAddress(bindAddress) 00028 , mShouldListen(initiallyListening) 00029 , mSocketFactory(socketFactory) 00030 , mThreadFactory(threadFactory) 00031 , mSessionFactory(sessionFactory) 00032 , mSocketThreads() 00033 , mSocketThreadsMutex(VSTRING_FORMAT("VListenerThread(%s)::mSocketThreadsMutex", threadBaseName.chars())) 00034 { 00035 } 00036 00037 VListenerThread::~VListenerThread() { 00038 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("VListenerThread '%s' ended.", mName.chars())); 00039 00040 // Make sure any of socket threads still alive no longer reference us. 00041 VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::socketThreadEnded()", this->getName().chars())); 00042 for (VSocketThreadPtrVector::const_iterator i = mSocketThreads.begin(); i != mSocketThreads.end(); ++i) { 00043 (*i)->mOwnerThread = NULL; 00044 } 00045 } 00046 00047 void VListenerThread::stop() { 00048 this->stopListening(); 00049 this->stopAllSocketThreads(); 00050 00051 VThread::stop(); 00052 } 00053 00054 void VListenerThread::run() { 00055 while (this->isRunning()) { 00056 if (mShouldListen) { 00057 this->_runListening(); 00058 } else { 00059 VThread::sleep(5 * VDuration::SECOND()); // this value limits how quickly we can be shut down 00060 } 00061 } 00062 } 00063 00064 void VListenerThread::socketThreadEnded(VSocketThread* socketThread) { 00065 VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::socketThreadEnded()", this->getName().chars())); 00066 VSocketThreadPtrVector::iterator position; 00067 00068 position = std::find(mSocketThreads.begin(), mSocketThreads.end(), socketThread); 00069 00070 if (position != mSocketThreads.end()) { 00071 mSocketThreads.erase(position); 00072 } 00073 } 00074 00075 int VListenerThread::getPortNumber() const { 00076 return mPortNumber; 00077 } 00078 00079 VSocketInfoVector VListenerThread::enumerateActiveSockets() { 00080 VSocketInfoVector info; 00081 VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::enumerateActiveSockets()", this->getName().chars())); 00082 00083 for (VSizeType i = 0; i < mSocketThreads.size(); ++i) { 00084 VSocketInfo oneSocketInfo(*(mSocketThreads[i]->getSocket())); 00085 00086 info.push_back(oneSocketInfo); 00087 } 00088 00089 return info; 00090 } 00091 00092 void VListenerThread::stopSocketThread(VSocketID socketID, int localPortNumber) { 00093 bool found = false; 00094 VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::stopSocketThread()", this->getName().chars())); 00095 00096 for (VSizeType i = 0; i < mSocketThreads.size(); ++i) { 00097 VSocketThread* thread = mSocketThreads[i]; 00098 VSocket* socket = thread->getSocket(); 00099 00100 if ((socket->getSockID() == socketID) && 00101 (socket->getPortNumber() == localPortNumber)) { 00102 found = true; 00103 thread->closeAndStop(); 00104 } 00105 } 00106 00107 if (! found) { 00108 throw VStackTraceException(VSTRING_FORMAT("VListenerThread::stopSocketThread did not find a socket with id %d and port %d.", socketID, localPortNumber)); 00109 } 00110 } 00111 00112 void VListenerThread::stopAllSocketThreads() { 00113 VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::stopAllSocketThreads()", this->getName().chars())); 00114 00115 for (VSizeType i = 0; i < mSocketThreads.size(); ++i) { 00116 VSocketThread* thread = mSocketThreads[i]; 00117 00118 thread->closeAndStop(); 00119 } 00120 } 00121 00122 void VListenerThread::_runListening() { 00123 VListenerSocket* listenerSocket = NULL; 00124 00125 if (mManager != NULL) { 00126 mManager->listenerStarting(this); 00127 } 00128 00129 VString exceptionMessage; // filled in if catch block entered 00130 try { 00131 listenerSocket = new VListenerSocket(mPortNumber, mBindAddress, mSocketFactory); 00132 listenerSocket->listen(); 00133 00134 if (mManager != NULL) { 00135 mManager->listenerListening(this); 00136 } 00137 00138 while (mShouldListen && this->isRunning()) { 00139 VSocket* theSocket = listenerSocket->accept(); 00140 00141 if (theSocket != NULL) { 00142 try { 00143 VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::_runListening()", this->getName().chars())); 00144 00145 if (mSessionFactory == NULL) { 00146 VSocketThread* thread = mThreadFactory->createThread(theSocket, this); 00147 thread->start(); // throws if can't create OS thread 00148 mSocketThreads.push_back(thread); 00149 } else { 00150 VClientSessionPtr session = mSessionFactory->createSession(theSocket, this); // throws if can't create OS thread(s) 00151 VSocketThread* thread; 00152 thread = session->getInputThread(); 00153 if (thread != NULL) { 00154 mSocketThreads.push_back(thread); 00155 } 00156 00157 thread = session->getOutputThread(); 00158 if (thread != NULL) { 00159 mSocketThreads.push_back(thread); 00160 } 00161 00162 mSessionFactory->addSessionToServer(session); 00163 } 00164 } catch (const VException& ex) { 00165 // Likely cause: Failure in starting OS thread. Log, but keep listening. 00166 VLOGGER_ERROR(VSTRING_FORMAT("[%s]VListenerThread::_runListening: Unable to create new session: Error %d. %s", this->getName().chars(), ex.getError(), ex.what())); 00167 delete theSocket; 00168 } 00169 } else { 00170 /* 00171 We timed out, which is normal if we have a timeout value. 00172 As long as we haven't been stopped, we'll try again. 00173 */ 00174 } 00175 } 00176 } catch (const VException& ex) { 00177 exceptionMessage.format("[%s]VListenerThread::_runListening() caught exception #%d '%s'.", mName.chars(), ex.getError(), ex.what()); 00178 } catch (const std::exception& ex) { 00179 exceptionMessage.format("[%s]VListenerThread::_runListening() caught exception '%s'.", mName.chars(), ex.what()); 00180 } catch (...) { 00181 exceptionMessage.format("[%s]VListenerThread::_runListening() caught unknown exception.", mName.chars()); 00182 } 00183 00184 if (exceptionMessage.isNotEmpty()) { 00185 mShouldListen = false; 00186 VLOGGER_NAMED_ERROR(mLoggerName, exceptionMessage); 00187 if (mManager != NULL) { 00188 mManager->listenerFailed(this, exceptionMessage); 00189 } 00190 } 00191 00192 delete listenerSocket; 00193 00194 if (mManager != NULL) { 00195 mManager->listenerEnded(this); 00196 } 00197 } 00198